scala – Akka Stream Option输出
发布时间:2020-12-16 08:49:00 所属栏目:安全 来源:网络整理
导读:我创建了一个Akka Stream,它有一个简单的Source,Flow和Sink.有了这个,我可以轻松地通过它发送元素.现在我想更改此流,以便Flow返回一个Option.根据选项的结果,我想更改Flow的输出. 是否有可能创建这样的结构? 解决方法 目前给出的答案都涉及广播.请注意,它可
我创建了一个Akka Stream,它有一个简单的Source,Flow和Sink.有了这个,我可以轻松地通过它发送元素.现在我想更改此流,以便Flow返回一个Option.根据选项的结果,我想更改Flow的输出.
是否有可能创建这样的结构? 解决方法
目前给出的答案都涉及广播.请注意,它可能适用于此特定示例,但在更复杂的图形中,广播可能不是一个明智的选择.
原因是如果至少有一个下游背压,广播总是背压. 最好的背压感知解决方案是Partition,它能够从分区器功能选择的分支中选择性地传播背压. 以下示例(详细说明T-Fowl的答案之一) def splittingSink[T,M1,M2,Mat](f: T ? Option[T],someSink: Sink[T,M1],noneSink: Sink[None.type,M2],combineMat: (M1,M2) ? Mat): Sink[T,Mat] = { val graph = GraphDSL.create(someSink,noneSink)(combineMat) { implicit builder ? (sink1,sink2) ? { import GraphDSL.Implicits._ def partitioner(o: Option[T]) = o.map(_ => 0).getOrElse(1) val partition = builder.add(Partition[Option[T]](2,partitioner)) partition.out(0) ~> Flow[Option[T]].collect { case Some(t) ? t } ~> sink1.in partition.out(1) ~> Flow[Option[T]].collect { case None ? None } ~> sink2.in val mapper = builder.add(Flow.fromFunction(f)) mapper.out ~> partition.in SinkShape(mapper.in) } } Sink.fromGraph(graph) } (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |
推荐文章
站长推荐
- angularjs – 角度ng点击与调用控制器功能不工作
- 使用AngularJS从Node.JS服务器下载文件
- angularjs – 在指令测试中模拟所需的控制器
- Redis系列五:redis键管理和redis数据库管理
- angular2 material paginator MatPaginator MatP
- 彻底理解webservice SOAP WSDL
- 【数据结构】·【图】
- 【OSChina-MoPaaS应用开发大赛】JeeSite(JES)
- WCF4.0 –- RESTful WCF Services (4) (Basic Se
- angular – 错误TS2300:node_modules/@types/co
热点阅读