c# – 使用NetMQ接收的IObservable
发布时间:2020-12-15 23:23:17 所属栏目:百科 来源:网络整理
导读:我正在尝试编写一个典型的股票交易程序,它从netmq接收股票代码/订单/交易,将流转换为IObservable,并在 WPF前端显示它们.我尝试使用async / await与NetMQ阻塞ReceiveString(假设我期待一些字符串输入),以便ReceiveString循环不会阻止主(UI)线程.由于我还是C#
我正在尝试编写一个典型的股票交易程序,它从netmq接收股票代码/订单/交易,将流转换为IObservable,并在
WPF前端显示它们.我尝试使用async / await与NetMQ阻塞ReceiveString(假设我期待一些字符串输入),以便ReceiveString循环不会阻止主(UI)线程.由于我还是C#的新手,我在这篇文章中接受了Dave Sexton的回答:(
https://social.msdn.microsoft.com/Forums/en-US/b0cf96b0-d23e-4461-9d2b-ca989be678dc/where-is-iasyncenumerable-in-the-lastest-release?forum=rx)并尝试写一些这样的例子:
using System; using System.Threading; using System.Threading.Tasks; using System.Collections.Generic; using NetMQ; using NetMQ.Sockets; using System.Reactive; using System.Reactive.Linq; namespace App1 { class MainClass { // publisher for testing,should be an external data publisher in real environment public static Thread StartPublisher(PublisherSocket s) { s.Bind("inproc://test"); var thr = new Thread(() => { Console.WriteLine("Start publishing..."); while (true) { Thread.Sleep(500); s.Send("hello"); } }); thr.Start(); return thr; } public static IObservable<string> Receive(SubscriberSocket s) { s.Connect("inproc://test"); s.Subscribe(""); return Observable.Create<string>( async observer => { while (true) { var result = await s.ReceiveString(); observer.OnNext(result); } }); } public static void Main(string[] args) { var ctx = NetMQContext.Create(); var sub = ctx.CreateSubscriberSocket(); var pub = ctx.CreatePublisherSocket(); StartPublisher(pub); Receive(sub).Subscribe(Console.WriteLine); Console.ReadLine(); } } } 它无法使用“无法等待字符串”进行编译.虽然我知道它可能会期待一个任务,但我并不清楚如何完成整个任务. 再次换行:我想要实现的只是使用简单的阻塞apis从netmq获取IObservable的股票代码/订单/交易流,但没有真正阻止主线程. 我能用它做什么吗?非常感谢. 解决方法
我不熟悉NetMQ,但你真的应该构建你这样可观察的:
public static IObservable<string> Receive(NetMQContext ctx) { return Observable .Create<string>(o => Observable.Using<string,SubscriberSocket>(() => { var sub = ctx.CreateSubscriberSocket(); sub.Connect("inproc://test"); sub.Subscribe(""); return sub; },sub => Observable .FromEventPattern<EventHandler<NetMQSocketEventArgs>,NetMQSocketEventArgs>( h => sub.ReceiveReady += h,h => sub.ReceiveReady -= h) .Select(x => sub.ReceiveString())) .Subscribe(o)); } 这将自动为您创建SubscriberSocket,当observable结束时,将自动在您的套接字上调用.Dispose(). 就像我说的,我不熟悉NetMQ,所以上面的代码没有收到任何已发布的消息,所以你需要摆弄它才能让它工作,但这是一个很好的起点. (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |