加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

斯卡拉 – Akka Streams的工人池

发布时间:2020-12-16 18:14:19 所属栏目:安全 来源:网络整理
导读:如 akka streams documentation所述,我尝试创建一个工作池(流): def balancer[In,Out](worker: Flow[In,Out,NotUsed],workerCount: Int): Flow[In,NotUsed] = { import GraphDSL.Implicits._ Flow.fromGraph(GraphDSL.create() { implicit b = val balancer
如 akka streams documentation所述,我尝试创建一个工作池(流):

def balancer[In,Out](worker: Flow[In,Out,NotUsed],workerCount: Int): Flow[In,NotUsed] = {
    import GraphDSL.Implicits._

    Flow.fromGraph(GraphDSL.create() { implicit b =>
      val balancer = b.add(Balance[In](workerCount))
      val merge = b.add(Merge[Out](workerCount))

      for (_ <- 1 to workerCount) {
        balancer ~> worker ~> merge
      }
      FlowShape(balancer.in,merge.out)
    })
  }

然后我用这个函数并行运行一个流程:

def main(args: Array[String]) {
    val system = ActorSystem()
    implicit val mat = ActorMaterializer.create(system)

    val flow = Flow[Int].map(e => {
      println(e)
      Thread.sleep(1000) // 1 second
      e
    })

    Source(Range.apply(1,10).toList)
      .via(balancer(flow,3))
      .runForeach(e => {})
  }

我得到预期的输出1,2,3,4,5,6,7,8,9,但数字以每秒1的速率出现(没有并行性).我做错了什么?

解决方法

该部分的文档已过时,将在下一版本中修复.基本上你只需要在流本身上调用.async.通过这样做,你可以在流程周围绘制一个“盒子”(你可以想象它是一个带有一个输入和输出端口的盒子),它可以防止在那个盒子上融合.通过这样做,基本上所有工人都将成为专职演员.图的其余部分(广播和合并阶段)将共享另一个actor(它们不会在不同的actor上运行,异步盒只保护流,外部的东西仍然会被融合).

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读