.net – Rx框架中的内容允许我在创建过程中等待其他方法时返回IO
我一直在努力使用
Reactive Extensions for
Twitter’s streaming APIs创建
IObservable<T> 实现.
从高级别发送HTTP请求并保持连接打开.以长度为前缀的项目被发送到消费. 基本上,它是使用 但是,在此循环开始处理之前必须完成许多事情,这需要调用 public async Task<IObservable<string>> Create(string parameter,CancellationToken cancellationToken) { // Make some call that requires await. var response = await _httpClient.SendAsync(parameter,cancellationToken). ConfigureAwait(false); // Create a BufferBlock which will be the observable. var block = new BufferBlock<T>(); // Start some background task which will use the block and publish to it // on a separate task. This is not something that is awaited for. ConsumeResponse(response,block,cancellationToken); // Create the observable. return block.AsObservable(); } 也就是说,我正在返回一个任务< IObservable< T>>从我的方法,但我觉得我在Reactive Extensions中缺少一些东西,这将允许我使用await来促进我需要进行的调用,但也返回一个IObservable< T>而不是任务< IObservable< T>>. Reactive Extensions中的哪个方法允许我创建一个observable,它需要在从创建方法返回之前等待方法? 我发现最接近的是Observable.DeferAsync.假设对我的方法的调用和observable的使用是这样的: public async Task Observe() { // NOT the real name of the interface,but explains it's role here. IObservableFactory factory; // Create is really named something else. IObservable<string> observable = factory.Create("parameter"); // Subscribe. observable.Subscribe(o => Console.WriteLine("Observed: {0}",o)); // Wait. await observable; } 使用DeferAsync在这里不起作用,因为对Subscribe的调用将发送第一个请求,然后读取它,然后对observable的await调用将创建第二个订阅,但是创建一个不同的observable. 或者,最终返回任务< IObservable< T>>在Reactive Framework中执行此操作的适当方法? 随后,由于该方法返回任务< T>,因此最好通过 我的直觉说不,因为这里存在违反问题分离以及取消的DRY原则: >取消创建和取消观察是两个不同的事情. 解决方法
我不会返回任务< IObservable< T>>.在公共API中混合任务和Observable最终会让人感到困惑.请记住,任务可以被视为产生单个值的可观察对象.这也意味着不要将CancellationTokens与公共API中的observable混合使用.您可以通过订阅和取消订阅来控制可观察对象.
这并不意味着你不能混合幕后的概念.以下是使用Observable.Using,Task.ToObservable和CancellationDisposable执行所需操作的方法 首先,修改您的方法以返回任务< ISourceBlock< string>>: public async Task<ISourceBlock<string>> CreateBlock(string parameter,cancellationToken).ConfigureAwait(false); // Create a BufferBlock which will be the observable. var block = new BufferBlock<T>(); // Start some background task which will use the block and publish to it // on a separate task. This is not something that is awaited for. ConsumeResponse(response,cancellationToken); return block; } 现在,这是使用上述方法的新Create方法: public IObservable<string> Create(string parameter) { // Create a cancellation token that will be canceled when the observable is unsubscribed,use this token in your call to CreateBlock. // Use ToObservable() to convert the Task to an observable so we can then // use SelectMany to subscribe to the block itself once it is available return Observable.Using(() => new CancellationDisposable(),cd => CreateBlock(parameter,cd.Token) .ToObservable() .SelectMany(block => block.AsObservable())); } 编辑:我发现Rx已经使用FromAsync实现了这种模式: public IObservable<string> Create(string parameter) { return Observable.FromAsync(token => CreateBlock(parameter,token)) .SelectMany(block => block.AsObservable()); } 还有,DeferAsync,这更合适,因为你的Task实际上是在创建你真正想要观察的Observable(例如你的块): public IObservable<string> Create(string parameter) { return Observable.DeferAsync(async token => (await CreateBlock(parameter,token)).AsObservable()); } (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |