加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – Akka Stream Option输出

发布时间:2020-12-16 08:49:00 所属栏目:安全 来源:网络整理
导读:我创建了一个Akka Stream,它有一个简单的Source,Flow和Sink.有了这个,我可以轻松地通过它发送元素.现在我想更改此流,以便Flow返回一个Option.根据选项的结果,我想更改Flow的输出. 是否有可能创建这样的结构? 解决方法 目前给出的答案都涉及广播.请注意,它可
我创建了一个Akka Stream,它有一个简单的Source,Flow和Sink.有了这个,我可以轻松地通过它发送元素.现在我想更改此流,以便Flow返回一个Option.根据选项的结果,我想更改Flow的输出.

enter image description here

是否有可能创建这样的结构?

解决方法

目前给出的答案都涉及广播.请注意,它可能适用于此特定示例,但在更复杂的图形中,广播可能不是一个明智的选择.
原因是如果至少有一个下游背压,广播总是背压.
最好的背压感知解决方案是Partition,它能够从分区器功能选择的分支中选择性地传播背压.

以下示例(详细说明T-Fowl的答案之一)

def splittingSink[T,M1,M2,Mat](f: T ? Option[T],someSink: Sink[T,M1],noneSink: Sink[None.type,M2],combineMat: (M1,M2) ? Mat): Sink[T,Mat] = {
    val graph = GraphDSL.create(someSink,noneSink)(combineMat) { implicit builder ?
      (sink1,sink2) ? {
        import GraphDSL.Implicits._

        def partitioner(o: Option[T]) = o.map(_ => 0).getOrElse(1)
        val partition = builder.add(Partition[Option[T]](2,partitioner))
        partition.out(0) ~> Flow[Option[T]].collect { case Some(t) ? t } ~> sink1.in
        partition.out(1) ~> Flow[Option[T]].collect { case None ? None } ~> sink2.in

        val mapper = builder.add(Flow.fromFunction(f))
        mapper.out ~> partition.in

        SinkShape(mapper.in)
      }
    }
    Sink.fromGraph(graph)
  }

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读