加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 百科 > 正文

c# – 使用NetMQ接收的IObservable

发布时间:2020-12-15 23:23:17 所属栏目:百科 来源:网络整理
导读:我正在尝试编写一个典型的股票交易程序,它从netmq接收股票代码/订单/交易,将流转换为IObservable,并在 WPF前端显示它们.我尝试使用async / await与NetMQ阻塞ReceiveString(假设我期待一些字符串输入),以便ReceiveString循环不会阻止主(UI)线程.由于我还是C#
我正在尝试编写一个典型的股票交易程序,它从netmq接收股票代码/订单/交易,将流转换为IObservable,并在 WPF前端显示它们.我尝试使用async / await与NetMQ阻塞ReceiveString(假设我期待一些字符串输入),以便ReceiveString循环不会阻止主(UI)线程.由于我还是C#的新手,我在这篇文章中接受了Dave Sexton的回答:( https://social.msdn.microsoft.com/Forums/en-US/b0cf96b0-d23e-4461-9d2b-ca989be678dc/where-is-iasyncenumerable-in-the-lastest-release?forum=rx)并尝试写一些这样的例子:

using System;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;
using NetMQ;
using NetMQ.Sockets;
using System.Reactive;
using System.Reactive.Linq;

namespace App1
{
    class MainClass
    {
        // publisher for testing,should be an external data publisher in real environment
        public static Thread StartPublisher(PublisherSocket s)
        {
            s.Bind("inproc://test");
            var thr = new Thread(() => {
                Console.WriteLine("Start publishing...");
                while (true) {
                    Thread.Sleep(500);
                    s.Send("hello");
                }
            });
            thr.Start();
            return thr;
        }

        public static IObservable<string> Receive(SubscriberSocket s)
        {
            s.Connect("inproc://test");
            s.Subscribe("");
            return Observable.Create<string>(
                async observer =>
                {
                    while (true)
                    {
                        var result = await s.ReceiveString();
                        observer.OnNext(result);
                    }
                });
        }

        public static void Main(string[] args)
        {
            var ctx = NetMQContext.Create();
            var sub = ctx.CreateSubscriberSocket();
            var pub = ctx.CreatePublisherSocket();
            StartPublisher(pub);

            Receive(sub).Subscribe(Console.WriteLine);
            Console.ReadLine();
        }
    }
}

它无法使用“无法等待字符串”进行编译.虽然我知道它可能会期待一个任务,但我并不清楚如何完成整个任务.

再次换行:我想要实现的只是使用简单的阻塞apis从netmq获取IObservable的股票代码/订单/交易流,但没有真正阻止主线程.

我能用它做什么吗?非常感谢.

解决方法

我不熟悉NetMQ,但你真的应该构建你这样可观察的:

public static IObservable<string> Receive(NetMQContext ctx)
    {
        return Observable
            .Create<string>(o =>
                Observable.Using<string,SubscriberSocket>(() =>
                {
                    var sub = ctx.CreateSubscriberSocket();
                    sub.Connect("inproc://test");
                    sub.Subscribe("");
                    return sub;
                },sub =>
                Observable
                    .FromEventPattern<EventHandler<NetMQSocketEventArgs>,NetMQSocketEventArgs>(
                        h => sub.ReceiveReady += h,h => sub.ReceiveReady -= h)
                    .Select(x => sub.ReceiveString()))
            .Subscribe(o));
    }

这将自动为您创建SubscriberSocket,当observable结束时,将自动在您的套接字上调用.Dispose().

就像我说的,我不熟悉NetMQ,所以上面的代码没有收到任何已发布的消息,所以你需要摆弄它才能让它工作,但这是一个很好的起点.

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读