scala – akka:用于组合来自多个孩子的消息的模式

发布时间:2020-12-16 08:59:50
导读:这是我遇到的模式: 演员A有多个孩子C1,…,Cn.在收到消息时,A将其发送给每个子节点,每个子节点对消息进行一些计算,并在完成时将其发送回A.然后,A会将所有子节点的结果组合传递给另一个actor. 这个问题的解决方案是什么样的?或者这是反模式?在哪种情况下应




import akka.actor.{Props,Actor}

case class Tagged[T](value: T,id: Int)

class A extends Actor {
  import C1._
  import C2._

  val c1 = context.actorOf(Props[C1],"C1")
  val c2 = context.actorOf(Props[C2],"C2")
  var uid = 0
  var c1Results = Map[Int,Int]()
  var c2Results = Map[Int,Int]()

  def receive = {
    case n: Int => {
      c1 ! Tagged(n,uid)
      c2 ! Tagged(n,uid)
      uid += 1
    case Tagged(C1Result(n),id) => c2Results get id match {
      case None => c1Results += (id -> n)
      case Some(m) => {
        c2Results -= id
        context.parent ! (n,m)
    case Tagged(C2Result(n),id) => c1Results get id match {
      case None => c2Results += (id -> n)
      case Some(m) => {
        c1Results -= id
        context.parent ! (m,n)

class C1 extends Actor {
  import C1._

  def receive = {
    case Tagged(n: Int,id) => Tagged(C1Result(n),id)

object C1 {
  case class C1Result(n: Int)

class C2 extends Actor {
  import C2._

  def receive = {
    case Tagged(n: Int,id) => Tagged(C2Result(n),id)

object C2 {
  case class C2Result(n: Int)



在许多 – 或不同数量 – 的儿童演员的情况下,Zim-Zam建议的 ask pattern将很快失控.

aggregator pattern旨在帮助解决这种情况.它提供了一个聚合器特征,您可以在actor中使用它来执行聚合逻辑.



下面列出了这种模式的示例,该模式用于对由Child类表示的actor所持有的整数值求和. (请注意,所有孩子都不需要由同一个父actor监督:SummationAggregator只需要一个ActorRef集合.)

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

import akka.actor._
import akka.contrib.pattern.Aggregator

object Child {
  def props(value: Int): Props = Props(new Child(value))

  case object GetValue
  case class GetValueResult(value: Int)

class Child(value: Int) extends Actor {
  import Child._

  def receive = { case GetValue => sender ! GetValueResult(value) }

object SummationAggregator {
  def props = Props(new SummationAggregator)

  case object TimedOut
  case class StartAggregation(targets: Seq[ActorRef])
  case object BadCommand
  case class AggregationResult(sum: Int)

class SummationAggregator extends Actor with Aggregator {
  import Child._
  import SummationAggregator._

  expectOnce {
    case StartAggregation(targets) =>
      // Could do what this handler does in line but handing off to a 
      // separate class encapsulates the state a little more cleanly
      new Handler(targets,sender())
    case _ =>
      sender ! BadCommand
      context stop self

  class Handler(targets: Seq[ActorRef],originalSender: ActorRef) {
    // Could just store a running total and keep track of the number of responses 
    // that we are awaiting...
    var valueResults = Set.empty[GetValueResult]


    expect {
      case TimedOut =>
        // It might make sense to respond with what we have so far if some responses are still awaited...
        respondIfDone(respondAnyway = true)

    if (targets.isEmpty)
      targets.foreach { t =>
        t ! GetValue
        expectOnce {
          case vr: GetValueResult =>
            valueResults += vr

    def respondIfDone(respondAnyway: Boolean = false) = {
      if (respondAnyway || valueResults.size == targets.size) {
        originalSender ! AggregationResult(valueResults.foldLeft(0) { case (acc,GetValueResult(v)) => acc + v })
        context stop self


context.actorOf(SummationAggregator.props) ! StartAggregation(children)



