c# – Reactive框架消除了Observable.Create中的while循环
发布时间:2020-12-15 17:16:42 所属栏目:百科 来源:网络整理
导读:我正在使用一个可观察的字节流,从网络出来,我想把它带到一层抽象.格式有两个字节,包含下一条消息的长度.我想很好地适应反应框架.到目前为止,我感觉不太正确,所以我想知道我可能错过了什么技巧来消除这里的while循环. 这是我想到的概念: public static IObse
我正在使用一个可观察的字节流,从网络出来,我想把它带到一层抽象.格式有两个字节,包含下一条消息的长度.我想很好地适应反应框架.到目前为止,我感觉不太正确,所以我想知道我可能错过了什么技巧来消除这里的while循环.
这是我想到的概念: public static IObservable<Stream> GetToplevelStreams(IObservable<byte> byteStreamArg) { return Observable.Create((IObserver<Stream>o)=>{ bool done = false; var byteStream = byteStreamArg.Do( b => { },(ex) => { done = true; },() => { done = true; }); while (!done) { var size = byteStream.Take(2). Aggregate(0,(n,b) => (n << 8) + b).Single(); var buf = byteStream.Skip(2).Take(size); var stream = new MemoryStream(buf.ToEnumerable().ToArray()); if (!done) { o.OnNext(stream); } } return (() => {}); }); } 解决方法
IObservable在这里有点奇怪 – 记住你正在返回一个“未来流列表” – 我实际上只是返回一个Stream,或者可能是IObservable< byte []>,其中每个数组代表一条消息.或者做得更好,并返回IObservable< ParsedMessage>
此外,你的While循环使这非同步并且行为奇怪.怎么样更像这样的东西: public static IObservable<System.IO.Stream> GetToplevelStreams(IObservable<byte> byteStream) { return Observable.Create((IObserver<System.IO.Stream> o) => { int? size1=null; int? size=null; var buf = new MemoryStream(); var subscription = byteStream.Subscribe(v => { if (!size1.HasValue) { size1 = ((int)v) << 8; } else if (!size.HasValue) { size = size1.Value + v; } else { buf.WriteByte(v); } if (size.HasValue && buf.Length == size) { buf.Position = 0; o.OnNext(buf); buf.SetLength(0); size1 = null; size = null; } },(ex)=>o.OnError(ex),()=>o.OnCompleted()); return () => subscription.Dispose(); }); } (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |
推荐文章
站长推荐
热点阅读