加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – 从期货中观察 – 从多个线程开始

发布时间:2020-12-16 18:31:29 所属栏目:安全 来源:网络整理
导读:我想从期货清单的结果中实时生成一个Observable. 在最简单的情况下,假设我有一个我正在使用Future.sequence运行的未来列表,我只是用一个Observable监视它们的进度,它告诉我每次完成一个.我基本上是这样做的: def observeFuturesProgress(futs: List[Future[
我想从期货清单的结果中实时生成一个Observable.

在最简单的情况下,假设我有一个我正在使用Future.sequence运行的未来列表,我只是用一个Observable监视它们的进度,它告诉我每次完成一个.我基本上是这样做的:

def observeFuturesProgress(futs: List[Future[Int]]) : Observable[String] = {
    Observable[String](observer => {
      val loudFutures: List[Future[Int]] = futs.map(f => {
            f onComplete {
              case Success(a) => observer.onNext(s"just did $a more")
              case Failure(e) => observer.onError(e)
            }
            f
        })
      Future.sequence(loudFutures) onComplete {
          case Success(_) => observer.onCompleted()
          case Failure(e) => observer.onError(e)
      }
    })
  }

这在我的测试环境中工作正常.但我刚刚读到不应该从不同的线程调用onNext,至少不要小心没有重叠的调用.建议的解决方法是什么?似乎Observables的许多实际使用都需要从这样的异步代码中调用onNext,但我在文档中找不到类似的例子.

解决方法

The Observable Contract

Observables must issue notifications to observers serially (not in
parallel). They may issue these notifications from different threads,
but there must be a formal happens-before relationship between the
notifications.

看看Serialize

It is possible for an Observable to invoke its observers’ methods
asynchronously,perhaps from different threads. This could make such
an Observable violate the Observable contract,in that it might try to
send an OnCompleted or OnError notification before one of its OnNext
notifications,or it might make an OnNext notification from two
different threads concurrently. You can force such an Observable to be well-behaved and synchronous by applying the Serialize operator to it.

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读