scala – Akka Stream连接到多个接收器
发布时间:2020-12-16 18:09:27 所属栏目:安全 来源:网络整理
导读:我在akka流中实现了一个自定义组件,它将元素作为输入,组合并根据键合并它们并通过十几个出口中的一个发送出去.您可以将此组件视为一种GroupBy组件,它不会将流分为子流,而是实际流.除了对传入元素进行分区之外,它还将它们合并为一个元素,即在组件内部发生一些
我在akka流中实现了一个自定义组件,它将元素作为输入,组合并根据键合并它们并通过十几个出口中的一个发送出去.您可以将此组件视为一种GroupBy组件,它不会将流分为子流,而是实际流.除了对传入元素进行分区之外,它还将它们合并为一个元素,即在组件内部发生一些缓冲,使得1个元素不一定意味着通过插座输出1个元素.
以下是所述组件的简化实现. class CustomGroupBy[A,B](k: Int,f: A => Int) extends GraphStage[FlowShape[B,B]] { val in = Inlet[A]("CustomGroupBy.in") val outs = (0 until k).map(i => Outlet[B](s"CustomGroupBy.$i.out")) override val shape = new AmorphousShape(scala.collection.immutable.Seq(in),outs) /* ... */ } 我现在将该组件的每个插座连接到不同的接收器并将所有这些接收器的物化值组合在一起. 我已经用图形DSL尝试了一些东西,但还没有完全设法让它工作.有人会非常友好地为我提供一个片段来指导我或指向正确的方向吗? 提前致谢! 解决方法
您最有可能想要内置的
broadcast舞台.示例用法可以在
here找到:
val bcast = builder.add(Broadcast[Int](2)) in ~> f1 ~> bcast ~> f2 ~> merge ~> f3 ~> out bcast ~> f4 ~> merge (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |