scala – akka stream asyncBoundary vs mapAsync
我试图理解asyncBoundary和mapAsync之间的区别.从一目了然,我猜他们应该是一样的.但是,当我运行代码时,看起来asyncBoundary的性能比mapAsync快
这是代码 implicit val system = ActorSystem("sourceDemo") implicit val materializer = ActorMaterializer() Source(1 to 100).mapAsync(100)(t => Future {t + 1}).mapAsync(100)(t => Future {t * 2}).map(println).to(Sink.ignore).run() Source(1 to 100).map(_ + 1).withAttributes(Attributes.asyncBoundary).map(_ * 2).map(t => println("async boundary",t)).to(Sink.ignore).run() 输出: 从描述的有关asyncBoundary(https://doc.akka.io/docs/akka-stream-and-http-experimental/current/scala/stream-flows-and-basics.html)的文档中,我可以看到它在不同的CPU上运行,但mapAsync是使用Future的多线程.未来也是异步的. 请问有关这两个API的更多说明? 解决方法
异步
正确指出这会强制在两个阶段之间插入异步边界.在你的例子中 Source(1 to 100).map(_ + 1).withAttributes(Attributes.asyncBoundary).map(_ * 2).map(t => println("async boundary",t)).to(Sink.ignore).run() 这实际上意味着1操作和* 2操作将由分离的参与者运行.这使得流水线操作成为可能,同时元素移动到* 2阶段,同时另一个元素可以被引入1阶段.如果您不强制执行异步边界,则在从上游请求新的操作之前,同一个actor将对操作进行顺序操作并对一个元素执行操作. 顺便说一句,您的示例可以使用异步组合器以较短的格式重写: Source(1 to 100).map(_ + 1).async.map(_ * 2).map(t => println("async boundary",t)).to(Sink.ignore).run() mapAsync 这是并行执行异步操作的阶段.并行因子允许您指定要旋转以提供传入元素的最大并行actor数.并行计算的结果由mapAsync阶段按顺序跟踪和发出. 在你的例子中 Source(1 to 100).mapAsync(100)(t => Future {t + 1}).mapAsync(100)(t => Future {t * 2}).map(println).to(Sink.ignore).run() 可以并行运行最多100个操作(即所有操作),并按顺序收集结果.随后,可以并行运行多达100 * 2个操作,并且结果再次按顺序收集并向下游发射. 在您的示例中,您正在运行CPU限制的快速操作,这些操作无法使用mapAsync,因为此阶段所需的基础架构很可能比并行运行其中100个操作的优势要昂贵得多. mapAsync在处理IO绑定的慢速操作时特别有用,其中并行化非常方便. 另请注意,mapAsync也按定义引入了异步边界,因此您可能会将其视为async的“扩展”,您可以在其中指定大于1的并行度. (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |