scala – 从期货中观察 – 从多个线程开始
我想从期货清单的结果中实时生成一个Observable.
在最简单的情况下,假设我有一个我正在使用Future.sequence运行的未来列表,我只是用一个Observable监视它们的进度,它告诉我每次完成一个.我基本上是这样做的: def observeFuturesProgress(futs: List[Future[Int]]) : Observable[String] = { Observable[String](observer => { val loudFutures: List[Future[Int]] = futs.map(f => { f onComplete { case Success(a) => observer.onNext(s"just did $a more") case Failure(e) => observer.onError(e) } f }) Future.sequence(loudFutures) onComplete { case Success(_) => observer.onCompleted() case Failure(e) => observer.onError(e) } }) } 这在我的测试环境中工作正常.但我刚刚读到不应该从不同的线程调用onNext,至少不要小心没有重叠的调用.建议的解决方法是什么?似乎Observables的许多实际使用都需要从这样的异步代码中调用onNext,但我在文档中找不到类似的例子. 解决方法
The Observable Contract
看看Serialize
(编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |