c# – 如何使用Reactive Extensions对状态进行轮询?
使用Reactive(
Database polling with Reactive Extensions)进行数据库轮询已经有了一个很好的问题
我有一个类似的问题,但有一个转折:我需要将前一个结果中的值提供给下一个请求.基本上,我想对此进行调查: interface ResultSet<T> { int? CurrentAsOfHandle {get;} IList<T> Results {get;} } Task<ResultSet<T>> GetNewResultsAsync<T>(int? previousRequestHandle); 我们的想法是返回自上一个请求以来的所有新项目 >每分钟我都想调用GetNewResultsAsync 基本上,有一个更好的方法: return Observable.Create<IMessage>(async (observer,cancellationToken) => { int? currentVersion = null; while (!cancellationToken.IsCancellationRequested) { MessageResultSet messageResultSet = await ReadLatestMessagesAsync(currentVersion); currentVersion = messageResultSet.CurrentVersionHandle; foreach (IMessage resultMessage in messageResultSet.Messages) observer.OnNext(resultMessage); await Task.Delay(TimeSpan.FromMinutes(1),cancellationToken); } }); 另请注意,此版本允许在等待下一次迭代时收集messageResultSet(例如,我想也许我可以使用Scan将先前的结果集对象传递到下一次迭代) 解决方法
您的问题基本上减少到:有一个带签名的扫描功能:
IObservable<TAccumulate> Scan<TSource,TAccumulate>(this IObservable<TSource> source,TAccumulate initialValue,Func<TAccumulate,TSource,TAccumulate> accumulator) 但你需要类似的东西 IObservable<TAccumulate> Scan<TSource,IObservable<TAccumulate>> accumulator) …累加器函数返回一个observable,Scan函数自动减少它以传入下一个调用. 这是一个穷人的扫描功能实现: public static IObservable<TAccumulate> MyScan<TSource,TAccumulate> accumulator) { return source .Publish(_source => _source .Take(1) .Select(s => accumulator(initialValue,s)) .SelectMany(m => _source.MyScan(m,accumulator).StartWith(m)) ); } 鉴于此,我们可以稍微改变一下以结合减少功能: public static IObservable<TAccumulate> MyObservableScan<TSource,IObservable<TAccumulate>> accumulator) { return source .Publish(_source => _source .Take(1) .Select(s => accumulator(initialValue,s)) .SelectMany(async o => (await o.LastOrDefaultAsync()) .Let(m => _source .MyObservableScan(m,accumulator) .StartWith(m) ) ) .Merge() ); } //Wrapper to accommodate easy Task -> Observable transformations public static IObservable<TAccumulate> MyObservableScan<TSource,Task<TAccumulate>> accumulator) { return source.MyObservableScan(initialValue,(a,s) => Observable.FromAsync(() => accumulator(a,s))); } //Function to prevent re-evaluation in functional scenarios public static U Let<T,U>(this T t,Func<T,U> selector) { return selector(t); } 现在我们有了这个花哨的MyObservableScan操作符,我们可以相对轻松地解决您的问题: var o = Observable.Interval(TimeSpan.FromMinutes(1)) .MyObservableScan<long,ResultSet<string>>(null,(r,_) => Methods.GetNewResultsAsync<string>(r?.CurrentAsOfHandle)) 请注意,在测试中我注意到如果累加器Task / Observable函数的时间超过源中的间隔,则observable将终止.我不知道为什么.如果有人可以纠正,那就很有必要了. (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |