c# – 观察者可以安全地用Rx监听多个可观察量吗?
我试图看看如何使用Rx将多个可观察事件流式传输到一组事件中.但是当我运行下面的代码时,我得到一个例外.那么这是否意味着由于违反Rx语法,多个观察者总是容易出现异常?我的意思是如果这些多个观察者中的两个偶然同时生成一个事件(任何两个可观察者将同时产生一些概率),它应该给出一个例外.
DateTimeOffset start; object sync = new object(); var subject = new Subject<long>(); var observer = Observer.Create<long>(c => { lock (sync) { Console.WriteLine(c); } }) ; var observable1 = Observable.Interval(TimeSpan.FromSeconds(2)); var observable2 = Observable.Interval(TimeSpan.FromSeconds(5)); var observable3 = Observable.Never<long>().Timeout (start = DateTimeOffset.Now.AddSeconds(15),(new long[] { 1 }).ToObservable()); var observable4 = Observable.Never<long>().Timeout(start); observable1.Subscribe(observer); observable2.Subscribe(observer); observable3.Subscribe(observer); observable4.Subscribe(observer); Thread.Sleep(20000); 感谢Gideon的解释.这是我得到的例外.你是对的,这是一个时间的例子.这是一个编码错误.谢谢. System.TimeoutException: The operation has timed out. at System.Reactive.Observer.<Create>b__8[T](Exception e) at System.Reactive.AnonymousObserver`1.Error(Exception exception) at System.Reactive.AbstractObserver`1.OnError(Exception error) at System.Reactive.Subjects.Subject`1.OnError(Exception error) at System.Reactive.AnonymousObservable`1.AutoDetachObserver.Error(Exception e xception) at System.Reactive.AbstractObserver`1.OnError(Exception error) at System.Reactive.AnonymousObservable`1.AutoDetachObserver.Error(Exception e xception) at System.Reactive.AbstractObserver`1.OnError(Exception error) at System.Reactive.Linq.Observable.<>c__DisplayClass28c`1.<>c__DisplayClass28 e.<Throw>b__28b() at System.Reactive.Concurrency.Scheduler.Invoke(IScheduler scheduler,Action action) at System.Reactive.Concurrency.ImmediateScheduler.Schedule[TState](TState sta te,Func`3 action) at System.Reactive.Concurrency.Scheduler.Schedule(IScheduler scheduler,Actio n action) at System.Reactive.Linq.Observable.<>c__DisplayClass28c`1.<Throw>b__28a(IObse rver`1 observer) at System.Reactive.AnonymousObservable`1.<>c__DisplayClass1.<Subscribe>b__0() at System.Reactive.Concurrency.Scheduler.Invoke(IScheduler scheduler,Action action) at System.Reactive.Concurrency.ScheduledItem`2.InvokeCore() at System.Reactive.Concurrency.ScheduledItem`1.Invoke() at System.Reactive.Concurrency.CurrentThreadScheduler.Trampoline.Run() at System.Reactive.Concurrency.CurrentThreadScheduler.Schedule[TState](TState state,TimeSpan dueTime,Func`3 action) at System.Reactive.Concurrency.CurrentThreadScheduler.Schedule[TState](TState state,Actio n action) at System.Reactive.AnonymousObservable`1.Subscribe(IObserver`1 observer) at System.Reactive.Linq.Observable.<>c__DisplayClass543`1.<>c__DisplayClass54 5.<Timeout>b__53f() at System.Reactive.Concurrency.Scheduler.Invoke(IScheduler scheduler,Action action) at System.Reactive.Concurrency.ThreadPoolScheduler.<>c__DisplayClass8`1.<Sche dule>b__6(Object _) at System.Threading._TimerCallback.TimerCallback_Context(Object state) at System.Threading.ExecutionContext.Run(ExecutionContext executionContext,C ontextCallback callback,Object state,Boolean ignoreSyncCtx) at System.Threading._TimerCallback.PerformTimerCallback(Object state) 解决方法
是的,观察者可以听多个可观察者.最好的例子是Merge运算符.内置的运算符都将遵循RX语法,并且通常会在没有的语法上执行它.
从Observer.Create获得的IObserver就是这样一种情况.一旦调用OnError或OnCompleted,它将忽略将来对OnNext的任何调用.这意味着使用相同的观察者来订阅一个observable,然后在第一个observable之后另一个observable将无法工作,因为来自第一个observable的终止消息将导致观察者忽略来自第二个observable的消息.为了解决这个问题,像Merge,Concat和OnErrorResumeNext(以及其他)这样的运算符在内部使用多个观察者,并且不会从最后一个observable到外部的任何一个传递完成消息(OnError和/或OnCompleted,具体取决于运算符的语义)观察者. 你没有提到你得到的异常,但我猜这是你从observable4获得的超时错误.如果没有提供另一个observable用于超时,则会调用观察者的OnError,并且不带错误处理程序的Subscribe和Observer.Create重载的默认OnError就是抛出异常. 虽然这显然是示例/测试代码,但我想指出即使您不再将消息传递给OnNext,所有其他可观察对象也会在此异常后继续运行.可以使用Merge为您跟踪此信息,也可以跟踪描述中的所有一次性用品,并在完成消息时自行处理. CompositeDisposable(在System.Reactive.Disposables中)对此有好处. (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |