scala – 如何透明地将输入元素与输出元素相关联
发布时间:2020-12-16 08:46:32 所属栏目:安全 来源:网络整理
导读:我得到一个流 A,B (这是一个奇特的流/图形东西,见 https://doc.akka.io/api/akka/current/akka/stream/scaladsl/Flow.html)来自我控制之外的一些外部代码.我需要包装该流并对每个输入元素和每个输出元素进行一些处理.我可以通过在它上面放置一个 BidiFlow 来
我得到一个流< A,B> (这是一个奇特的流/图形东西,见
https://doc.akka.io/api/akka/current/akka/stream/scaladsl/Flow.html)来自我控制之外的一些外部代码.我需要包装该流并对每个输入元素和每个输出元素进行一些处理.我可以通过在它上面放置一个
BidiFlow 来轻松实现这一点:
Flow<I,O,Unused> flow = ...; // external source BidiFlow<I,I,Unused> bidi = BidiFlow.fromFunctions(i -> preprocess(i),o -> postprocess(o)); // do something on every input and every output Flow<I,Unused> newFlow = bidi.join(flow); 所以这是扭曲:要正确地后处理输出元素o,我需要生成该输出元素的输入.由于我无法控制底层流,因此我无法重构它以返回,例如,输入和输出的元组.由于Akka的异步和并行特性,我不能做任何技巧,比如将输入存储在本地线程或静态字段或类似的东西上. 所以我的问题是:是否有一些Akka Streams魔术可以应用于某种方式获取生成输出的输入元素? 解决方法
这是使用GraphDsl,Broadcast和Zip阶段的解决方案.
val externalFlow: Flow[Int,String,NotUsed] = Flow[Int].map(i => i.toString + "-external") def zipInAndOut[I,O](flow: Flow[I,NotUsed]): Flow[I,(I,O),NotUsed] = { Flow.fromGraph(GraphDSL.create() { implicit b => import GraphDSL.Implicits._ val broadcast = b.add(Broadcast[I](2)) val zip = b.add(Zip[I,O]) val theFlow = b.add(flow) broadcast.out(0) ~> zip.in0 broadcast.out(1) ~> theFlow ~> zip.in1 new FlowShape(broadcast.in,zip.out) }) } Source .fromIterator(() => (1 until 10).iterator) .via(zipInAndOut(externalFlow)) .runWith(Sink.foreach(println)) 版画 (1,1-external) (2,2-external) (3,3-external) (4,4-external) (5,5-external) (6,6-external) (7,7-external) (8,8-external) (9,9-external) (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |