c# – 使用BlockingCollection的活动框架作为消息队列
我最近一直在与“反应框架”一起做了一些工作,到目前为止绝对喜欢.我正在考虑用一些过滤的IObservables替换传统的轮询消息队列,以清理我的服务器操作.以旧的方式,我处理了像这样进入服务器的消息:
// Start spinning the process message loop Task.Factory.StartNew(() => { while (true) { Command command = m_CommandQueue.Take(); ProcessMessage(command); } },TaskCreationOptions.LongRunning); 这导致连续轮询线程将命令从客户机委派到ProcessMessage方法,其中我有一系列if / else-if语句来确定命令的类型,并根据其类型委派工作 我正在用一个事件驱动的系统替换这个,使用了Reactive,我写了以下代码: private BlockingCollection<BesiegedMessage> m_MessageQueue = new BlockingCollection<BesiegedMessage>(); private IObservable<BesiegedMessage> m_MessagePublisher; m_MessagePublisher = m_MessageQueue .GetConsumingEnumerable() .ToObservable(TaskPoolScheduler.Default); // All generic Server messages (containing no properties) will be processed here IDisposable genericServerMessageSubscriber = m_MessagePublisher .Where(message => message is GenericServerMessage) .Subscribe(message => { // do something with the generic server message here } 我的问题是,尽管如此,使用阻塞集合作为IObservable的支持是很好的做法吗?我没有看到Take()曾经被称为这种方式,这使我认为消息将在处理完毕后不被删除而在队列上堆积? 查看Subjects作为后备集合来驱动已经过滤的IObservables将会收到这些消息会更有效率吗?有没有我在这里错过的可能会有益于这个系统的架构? 解决方法
这里有一些直接从我的后方获得的东西 – 任何真正的解决方案都将取决于你的实际使用情况,但是这里是“最便宜的伪消息队列系统”:
思想/动机: >有意识地暴露IObservable< T>使得订阅者可以执行他们想要的任何过滤/交叉订阅 代码: public class TheCheapestPubSubEver { private Subject<object> _inner = new Subject<object>(); public IObservable<T> Register<T>() { return _inner.OfType<T>().Publish().RefCount(); } public void Publish<T>(T message) { _inner.OnNext(message); } } 用法: void Main() { var queue = new TheCheapestPubSubEver(); var ofString = queue.Register<string>(); var ofInt = queue.Register<int>(); using(ofInt.Subscribe(i => Console.WriteLine("An int! {0}",i))) using(ofString.Subscribe(s => Console.WriteLine("A string! {0}",s))) { queue.Publish("Foo"); queue.Publish(1); Console.ReadLine(); } } 输出: A string! Foo An int! 1 然而,这并不是严格执行“消费消费者” – 特定类型的多个注册表将导致多个观察者呼叫 – 即: var queue = new TheCheapestPubSubEver(); var ofString = queue.Register<string>(); var anotherOfString = queue.Register<string>(); var ofInt = queue.Register<int>(); using(ofInt.Subscribe(i => Console.WriteLine("An int! {0}",i))) using(ofString.Subscribe(s => Console.WriteLine("A string! {0}",s))) using(anotherOfString.Subscribe(s => Console.WriteLine("Another string! {0}",s))) { queue.Publish("Foo"); queue.Publish(1); Console.ReadLine(); } 结果是: A string! Foo Another string! Foo An int! 1 (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |