加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – 如何使用akka流中的mapAsync使用分组子流

发布时间:2020-12-16 18:36:26 所属栏目:安全 来源:网络整理
导读:我需要做一些与此 https://github.com/typesafehub/activator-akka-stream-scala/blob/master/src/main/scala/sample/stream/GroupLogFile.scala非常相似的事情 我的问题是我有一个未知数量的组,如果mapAsync的并行数少于我得到的组数和最后一个数组中的错误
我需要做一些与此 https://github.com/typesafehub/activator-akka-stream-scala/blob/master/src/main/scala/sample/stream/GroupLogFile.scala非常相似的事情

我的问题是我有一个未知数量的组,如果mapAsync的并行数少于我得到的组数和最后一个数组中的错误

Tearing down
SynchronousFileSink(/Users/sam/dev/projects/akka-streams/target/log-ERROR.txt)
due to upstream error
(akka.stream.impl.StreamSubscriptionTimeoutSupport$$anon$2)

我试图在akka streams http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/scala/stream-cookbook.html的模式指南中建议使用缓冲区

groupBy {
  case LoglevelPattern(level) => level
  case other                  => "OTHER"
}.buffer(1000,OverflowStrategy.backpressure).
  // write lines of each group to a separate file
  mapAsync(parallelism = 2) {....

但结果相同

解决方法

扩展jrudolph的评论是完全正确的……

在这个实例中,您不需要mapAsync.作为一个基本示例,假设您有元组的来源

import akka.stream.scaladsl.{Source,Sink}

def data() = List(("foo",1),("foo",2),("bar",3),2))

val originalSource = Source(data)

然后,您可以执行groupBy以创建Source of Sources

def getID(tuple : (String,Int)) = tuple._1

//a Source of (String,Source[(String,Int),_])
val groupedSource = originalSource groupBy getID

每个分组的源可以仅与地图并行处理,不需要任何花哨的东西.以下是每个分组在独立流中求和的示例:

import akka.actor.ActorSystem
import akka.stream.ACtorMaterializer

implicit val actorSystem = ActorSystem()
implicit val mat = ActorMaterializer()
import actorSystem.dispatcher

def getValues(tuple : (String,Int)) = tuple._2

//does not have to be a def,we can re-use the same sink over-and-over
val sumSink = Sink.fold[Int,Int](0)(_ + _)

//a Source of (String,Future[Int])
val sumSource  = 
  groupedSource map { case (id,src) => 
    id -> {src map getValues runWith sumSink} //calculate sum in independent stream
  }

现在所有的“foo”数字都与所有“bar”数字并行求和.

当你有一个返回Future [T]的封装函数并且你试图发出一个T时,会使用mapAsync;在你的问题中并非如此.此外,mapAsync涉及waiting for results,而不是reactive ……

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读