斯卡拉 – Akka Streams的工人池
发布时间:2020-12-16 18:14:19 所属栏目:安全 来源:网络整理
导读:如 akka streams documentation所述,我尝试创建一个工作池(流): def balancer[In,Out](worker: Flow[In,Out,NotUsed],workerCount: Int): Flow[In,NotUsed] = { import GraphDSL.Implicits._ Flow.fromGraph(GraphDSL.create() { implicit b = val balancer
如
akka streams documentation所述,我尝试创建一个工作池(流):
def balancer[In,Out](worker: Flow[In,Out,NotUsed],workerCount: Int): Flow[In,NotUsed] = { import GraphDSL.Implicits._ Flow.fromGraph(GraphDSL.create() { implicit b => val balancer = b.add(Balance[In](workerCount)) val merge = b.add(Merge[Out](workerCount)) for (_ <- 1 to workerCount) { balancer ~> worker ~> merge } FlowShape(balancer.in,merge.out) }) } 然后我用这个函数并行运行一个流程: def main(args: Array[String]) { val system = ActorSystem() implicit val mat = ActorMaterializer.create(system) val flow = Flow[Int].map(e => { println(e) Thread.sleep(1000) // 1 second e }) Source(Range.apply(1,10).toList) .via(balancer(flow,3)) .runForeach(e => {}) } 我得到预期的输出1,2,3,4,5,6,7,8,9,但数字以每秒1的速率出现(没有并行性).我做错了什么? 解决方法
该部分的文档已过时,将在下一版本中修复.基本上你只需要在流本身上调用.async.通过这样做,你可以在流程周围绘制一个“盒子”(你可以想象它是一个带有一个输入和输出端口的盒子),它可以防止在那个盒子上融合.通过这样做,基本上所有工人都将成为专职演员.图的其余部分(广播和合并阶段)将共享另一个actor(它们不会在不同的actor上运行,异步盒只保护流,外部的东西仍然会被融合).
(编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |