c# – 在Rx.Net中,如何在反馈耗尽之前实现可观察的反馈循环?
发布时间:2020-12-15 22:22:32 所属栏目:百科 来源:网络整理
导读:我有以下API: IObservableIListSqlDataRecord WriteToDBAndGetFailedSource(SqlConnection conn,IListSqlDataRecord batch) 它试图将批处理写入数据库.如果失败,则返回整个批处理,否则返回的observable为空. 我也有一个产生批次的来源: IObservableIListSq
我有以下API:
IObservable<IList<SqlDataRecord>> WriteToDBAndGetFailedSource(SqlConnection conn,IList<SqlDataRecord> batch) 它试图将批处理写入数据库.如果失败,则返回整个批处理,否则返回的observable为空. 我也有一个产生批次的来源: IObservable<IList<SqlDataRecord>> GetDataSource(string filePath,int bufferThreshold) 现在,我可以像这样组合它们: var failedBatchesSource = GetDataSource(filePath,1048576) .Select(batch => WriteToDBAndGetFailedSource(conn,batch)) .Merge(100); 这将编写所有批次(最多100个并发)并返回可观察到的失败批次. 我真正想要的是在一段暂停之后将失败的批次送回批次的来源,可能是在原始来源仍在生产批次时.当然,我可以这样写: var failedBatchesSource = GetDataSource(filePath,batch)) .Merge(100) .Select(batch => WriteToDBAndGetFailedSource(conn,batch)) .Merge(100); 但当然,这是错误的,因为: >这打破了在再次处理失败的批次之前暂停的要求. 一旦我收集了所有的失败并在循环中重新开始,我也可以打破可观察的monad: var src = GetDataSource(filePath,1048576); for (;;) { var failed = await src .Select(batch => WriteToDBAndGetFailedSource(conn,batch)) .Merge(100) .ToList(); if (failed.Count == 0) { break; } src = failed.ToObservable(); } 但是我想知道我是否可以在可观察的monad中保持更好的状态. 解决方法
我认为这可能会成功
public static IObservable<T> ProcessAll<T>(this IObservable<T> source,Func<T,IObservable<T>> processor,int mergeCount,TimeSpan failureDelay) { return Observable.Create<T>( observer => { var failed = new Subject<T>(); return source.Merge(failed) .Select(processor) .Merge(mergeCount) .Delay(failureDelay) .Subscribe(failed.OnNext,observer.OnError,observer.OnCompleted); }); } 并像这样使用它: GetDataSource(filePath,1048576) .ProcessAll(batch => WriteToDBAndGetFailedSource(conn,batch),100,TimeSpan.FromMilliseconds(500)) .Subscribe(); ProcessAll是一个可怕的名字,但它是星期五晚上,我想不出更好的名字. (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |