c# – 如何通过在原始序列的值上运行任务来创建Rx序列?
我有一个IObservable类型的序列< T>以及将T,CancellationToken映射到任务< U>的函数.获得IObservable< U>的最简洁方法是什么?出来的?
我需要以下语义: >每个任务在上一个项目的任务完成后开始 这是我看到的签名: public static IObservable<U> Select<T,U> ( this IObservable<T> source,Func<T,CancellationToken,Task<U>> selector ); 我还没有写任何代码,但我会除非有人打败我. 我需要C#4中的解决方案,但为了比较,也欢迎C#5答案. 如果你很好奇,下面是我的真实场景,或多或少: Dropbox.GetImagesRecursively () .ObserveOn (SynchronizationContext.Current) .Select (DownloadImage) .Subscribe (AddImageToFilePicker); 解决方法
到目前为止,这似乎对我有用:
public static IObservable<U> Select<T,Task<U>> selector) { return source .Select (item => Observable.Defer (() => Observable.StartAsync (ct => selector (item,ct)) .Catch (Observable.Empty<U> ()) )) .Concat (); } 我们将一个延迟的基于任务的异常吞咽可观察对象映射到每个项目,然后将它们连接起来. 我的思维过程就像这样. 我注意到其中一个SelectMany重载几乎完全符合我的要求,甚至具有完全相同的签名.但它并不能满足我的需求: >它会在原始项目出现时创建任务,而我需要等待每项任务完成 我查看了这个重载的实现,发现它使用FromAsync来处理任务创建和取消: public virtual IObservable<TResult> SelectMany<TSource,TTaskResult,TResult> (IObservable<TSource> source,Func<TSource,Task<TTaskResult>> taskSelector,TResult> resultSelector) { return SelectMany_<TSource,TResult> ( source,x => FromAsync (ct => taskSelector (x,ct)),resultSelector ); } 我把目光转向了FromAsync以了解它是如何实现的,并且惊喜地发现它也是可组合的: public virtual IObservable<TResult> FromAsync<TResult> (Func<CancellationToken,Task<TResult>> functionAsync) { return Defer (() => StartAsync (functionAsync)); } 我重用了Defer和StartAsync,同时还添加了Catch来吞下错误. Defer和Concat的组合可确保任务彼此等待并以原始顺序开始. (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |