加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – 处理Akka演员的错误

发布时间:2020-12-16 18:54:06 所属栏目:安全 来源:网络整理
导读:我有一个非常简单的例子,我有一个Actor(SimpleActor),通过向自己发送消息来执行周期性任务.消息在actor的构造函数中调度.在正常情况下(即没有故障)一切正常. 但是如果演员必须处理错误呢?我有另一个Actor(SimpleActorWithFault).这个演员可能有错.在这种情
我有一个非常简单的例子,我有一个Actor(SimpleActor),通过向自己发送消息来执行周期性任务.消息在actor的构造函数中调度.在正常情况下(即没有故障)一切正常.

但是如果演员必须处理错误呢?我有另一个Actor(SimpleActorWithFault).这个演员可能有错.在这种情况下,我通过抛出异常来生成一个.当发生故障时(即SimpleActorWithFault抛出异常),它会自动重启.但是,这次重启会扰乱Actor内部的调度程序,该调度程序不再作为例外.如果故障发生得足够快,就会产生更多的意外行为.

我的问题是在这种情况下处理故障的首选方法是什么?我知道我可以使用Try块来处理异常.但是,如果我扩展另一个演员,我不能在超类中放置一个演员,或者某些情况下,当我是一个例外的故障发生在演员身上.

import akka.actor.{Props,ActorLogging}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import akka.actor.Actor

case object MessageA

case object MessageToSelf


class SimpleActor extends Actor with ActorLogging {

  //schedule a message to self every second
  context.system.scheduler.schedule(0 seconds,1 seconds,self,MessageToSelf)

  //keeps track of some internal state
  var count: Int = 0

  def receive: Receive = {
    case MessageA => {
      log.info("[SimpleActor] Got MessageA at %d".format(count))
    }
    case MessageToSelf => {
      //update state and tell the world about its current state 
      count = count + 1
      log.info("[SimpleActor] Got scheduled message at %d".format(count))

    }
  }

}


class SimpleActorWithFault extends Actor with ActorLogging {

  //schedule a message to self every second
  context.system.scheduler.schedule(0 seconds,MessageToSelf)

  var count: Int = 0

  def receive: Receive = {
    case MessageA => {
      log.info("[SimpleActorWithFault] Got MessageA at %d".format(count))
    }
    case MessageToSelf => {
      count = count + 1
      log.info("[SimpleActorWithFault] Got scheduled message at %d".format(count))

      //at some point generate a fault
      if (count > 5) {
        log.info("[SimpleActorWithFault] Going to throw an exception now %d".format(count))
        throw new Exception("Excepttttttiooooooon")
      }
    }
  }

}


object MainApp extends App {
  implicit val akkaSystem = akka.actor.ActorSystem()
  //Run the Actor without any faults or exceptions 
  akkaSystem.actorOf(Props(classOf[SimpleActor]))

  //comment the above line and uncomment the following to run the actor with faults  
  //akkaSystem.actorOf(Props(classOf[SimpleActorWithFault]))

}

解决方法

为了避免弄乱调度程序:

class SimpleActor extends Actor with ActorLogging {

  private var cancellable: Option[Cancellable] = None

  override def preStart() = {
    //schedule a message to self every second
    cancellable = Option(context.system.scheduler.schedule(0 seconds,MessageToSelf))
  }

  override def postStop() = {
    cancellable.foreach(_.cancel())
    cancellable = None
  }
...

要正确处理异常(akka.actor.Status.Failure用于在发件人使用Ask模式时正确回答问题):

...
def receive: Receive = {
    case MessageA => {
      try {
        log.info("[SimpleActor] Got MessageA at %d".format(count))
      } catch {
        case e: Exception =>
          sender ! akka.actor.Status.Failure(e)
          log.error(e.getMessage,e)
      }
    }
...

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读