scala – akka:用于组合来自多个孩子的消息的模式
这是我遇到的模式:
演员A有多个孩子C1,…,Cn.在收到消息时,A将其发送给每个子节点,每个子节点对消息进行一些计算,并在完成时将其发送回A.然后,A会将所有子节点的结果组合传递给另一个actor. 这个问题的解决方案是什么样的?或者这是反模式?在哪种情况下应该如何处理这个问题? 这是一个简单的例子,希望能够说明我目前的解决方案.我担心的是重复的代码(直到对称);对很多孩子来说并不是很好;并且很难看出发生了什么. import akka.actor.{Props,Actor} case class Tagged[T](value: T,id: Int) class A extends Actor { import C1._ import C2._ val c1 = context.actorOf(Props[C1],"C1") val c2 = context.actorOf(Props[C2],"C2") var uid = 0 var c1Results = Map[Int,Int]() var c2Results = Map[Int,Int]() def receive = { case n: Int => { c1 ! Tagged(n,uid) c2 ! Tagged(n,uid) uid += 1 } case Tagged(C1Result(n),id) => c2Results get id match { case None => c1Results += (id -> n) case Some(m) => { c2Results -= id context.parent ! (n,m) } } case Tagged(C2Result(n),id) => c1Results get id match { case None => c2Results += (id -> n) case Some(m) => { c1Results -= id context.parent ! (m,n) } } } } class C1 extends Actor { import C1._ def receive = { case Tagged(n: Int,id) => Tagged(C1Result(n),id) } } object C1 { case class C1Result(n: Int) } class C2 extends Actor { import C2._ def receive = { case Tagged(n: Int,id) => Tagged(C2Result(n),id) } } object C2 { case class C2Result(n: Int) } 如果您认为代码看起来很糟糕,请放轻松我,我刚刚开始学习akka;) 解决方法
在许多 – 或不同数量 – 的儿童演员的情况下,Zim-Zam建议的
ask pattern将很快失控.
aggregator pattern旨在帮助解决这种情况.它提供了一个聚合器特征,您可以在actor中使用它来执行聚合逻辑. 想要执行聚合的客户端actor可以启动基于聚合器的actor实例,并向其发送一条消息,该消息将启动聚合过程. 应为每个聚合操作创建一个新的聚合器,并在发送回结果时终止(当它收到所有响应或超时时). 下面列出了这种模式的示例,该模式用于对由Child类表示的actor所持有的整数值求和. (请注意,所有孩子都不需要由同一个父actor监督:SummationAggregator只需要一个ActorRef集合.) import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.duration._ import akka.actor._ import akka.contrib.pattern.Aggregator object Child { def props(value: Int): Props = Props(new Child(value)) case object GetValue case class GetValueResult(value: Int) } class Child(value: Int) extends Actor { import Child._ def receive = { case GetValue => sender ! GetValueResult(value) } } object SummationAggregator { def props = Props(new SummationAggregator) case object TimedOut case class StartAggregation(targets: Seq[ActorRef]) case object BadCommand case class AggregationResult(sum: Int) } class SummationAggregator extends Actor with Aggregator { import Child._ import SummationAggregator._ expectOnce { case StartAggregation(targets) => // Could do what this handler does in line but handing off to a // separate class encapsulates the state a little more cleanly new Handler(targets,sender()) case _ => sender ! BadCommand context stop self } class Handler(targets: Seq[ActorRef],originalSender: ActorRef) { // Could just store a running total and keep track of the number of responses // that we are awaiting... var valueResults = Set.empty[GetValueResult] context.system.scheduler.scheduleOnce(1.second,self,TimedOut) expect { case TimedOut => // It might make sense to respond with what we have so far if some responses are still awaited... respondIfDone(respondAnyway = true) } if (targets.isEmpty) respondIfDone() else targets.foreach { t => t ! GetValue expectOnce { case vr: GetValueResult => valueResults += vr respondIfDone() } } def respondIfDone(respondAnyway: Boolean = false) = { if (respondAnyway || valueResults.size == targets.size) { originalSender ! AggregationResult(valueResults.foldLeft(0) { case (acc,GetValueResult(v)) => acc + v }) context stop self } } } } 要从您的父actor使用此SummationAggregator,您可以执行以下操作: context.actorOf(SummationAggregator.props) ! StartAggregation(children) 然后在父接收的某个地方处理AggregationResult. (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |