Scala: How do I set up fault tolerance for Akka Actors?
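I am trying to get fault-tolerant behavior in Akka Actors. I am working on some code that depends on the Actors in the system being available for a long processing run. My processing stops after a couple of hours (it should take about 10 hours) and then not much happens. I believe my Actors are not recovering from exceptions.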
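What do I need to do to restart Actors permanently on a one-for-one basis? I expect this can be done by following this documentation: http://akka.io/docs/akka/1.1.3/scala/fault-tolerance

I am using Akka 1.1.3 and Scala 2.9.

    import akka.actor.Actor
    import akka.actor.Actor._
    import akka.actor.ActorRef
    import akka.actor.MaximumNumberOfRestartsWithinTimeRangeReached
    import akka.dispatch.Dispatchers
    import akka.routing.CyclicIterator
    import akka.routing.LoadBalancer
    import akka.config.Supervision._

    object TestActor {
      // Shared work-stealing dispatcher for all TestActor instances
      val dispatcher = Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher("pool")
        .setCorePoolSize(100)
        .setMaxPoolSize(100)
        .build
    }

    class TestActor(val name: Integer) extends Actor {
      self.lifeCycle = Permanent
      self.dispatcher = TestActor.dispatcher

      def receive = {
        case num: Integer => {
          // Simulate a failure on every even number
          if( num % 2 == 0 )
            throw new Exception("This is a simulated failure")
          println("Actor: " + name + " Received: " + num)
          //Thread.sleep(100)
        }
      }

      override def postStop(){
        println("TestActor post Stop ")
      }

      //callback method for restart handling
      override def preRestart(reason: Throwable){
        println("TestActor "+ name + " restarting after shutdown because of " + reason)
      }

      //callback method for restart handling
      override def postRestart(reason: Throwable){
        println("Restarting TestActor "+name+" after shutdown because of " + reason)
      }
    }

    trait CyclicLoadBalancing extends LoadBalancer { this: Actor =>
      val testActors: List[ActorRef]
      val seq = new CyclicIterator[ActorRef](testActors)
    }

    trait TestActorManager extends Actor {
      self.lifeCycle = Permanent
      self.faultHandler = OneForOneStrategy(List(classOf[Exception]),1000,5000)
      val testActors: List[ActorRef]

      // Start each worker and link it to this supervisor
      override def preStart = testActors foreach { self.startLink(_) }
      override def postStop = { System.out.println("postStop") }
    }

    object FaultTest {
      def main(args : Array[String]) : Unit = {
        println("starting FaultTest.main()")
        val numOfActors = 5
        val supervisor = actorOf(
          new TestActorManager with CyclicLoadBalancing {
            val testActors = (0 until numOfActors toList) map (i => actorOf(new TestActor(i)));
          }
        )
        supervisor.start();

        println("Number of Actors: " + Actor.registry.actorsFor(classOf[TestActor]).length)
        val testActor = Actor.registry.actorsFor(classOf[TestActor]).head

        (1 until 200 toList) foreach { testActor ! _ }
      }
    }

This code sets up 5 Actors behind a LoadBalancer. They just print out the integers sent to them, except that they throw an exception on even numbers to simulate a failure. The integers 1 through 199 are sent to these Actors (the code uses `1 until 200`). I expect the odd numbers to be printed, but everything seems to shut down after a few failures on even numbers. Running this code with sbt produces this output:

    [info] Running FaultTest
    starting FaultTest.main()
    Loading config [akka.conf] from the application classpath.
    Number of Actors: 5
    Actor: 2 Received: 1
    Actor: 2 Received: 9
    Actor: 1 Received: 3
    Actor: 3 Received: 7
    [info] == run ==
    [success] Successful.
    [info]
    [info] Total time: 13 s, completed Aug 16, 2011 11:00:23 AM

I think what is happening here is that the 5 Actors start up, the first 5 even numbers put them out of business, and they are not being restarted.

How can this code be changed so that the Actors recover from exceptions?

I expect this to print out all the odd numbers between 1 and 200. I think each Actor would fail on even numbers but be restarted with an intact mailbox after the exception. I expect to see the println output from preRestart and postRestart. What needs to be configured in this code sample to make these things happen?

Here are some additional assumptions about Akka and Actors that may be behind my misunderstanding. I assume an Actor can be configured with a Supervisor or a faultHandler so that it is restarted and remains available when an exception is thrown during receive. I assume that a message sent to the actor is lost if an exception is thrown while receiving it. I assume that preRestart() and postRestart() will be called on the actor that throws the exception.

The code sample represents what I am trying to do and is based on "Why is my Dispatching on Actors scaled down in Akka?"

**Another code example**

Here is another, simpler code sample. I create a single actor that throws an exception on even numbers. There is no load balancer or anything else in the way. I try to print out information about the actor. After sending the messages to the Actor, I wait a minute before exiting the program and monitor what is going on.

I expect this to print out the odd numbers, but it looks like the Actor just sits there with messages in its mailbox.

Do I have the OneForOneStrategy set wrong? Do I need to link the Actor to something? Is this kind of configuration fundamentally misguided? Does the Dispatcher need to be set up for fault tolerance, and if so, how? Could I be messing up the threads in the Dispatcher?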
    import akka.actor.Actor
    import akka.actor.Actor._
    import akka.actor.ActorRef
    import akka.actor.ActorRegistry
    import akka.config.Supervision._

    class SingleActor(val name: Integer) extends Actor {
      self.lifeCycle = Permanent
      self.faultHandler = OneForOneStrategy(List(classOf[Exception]),30,1000)

      def receive = {
        case num: Integer => {
          // Simulate a failure on every even number
          if( num % 2 == 0 )
            throw new Exception("This is a simulated failure,where does this get logged?")
          println("Actor: " + name + " Received: " + num)
        }
      }

      override def postStop(){
        println("TestActor post Stop ")
      }

      override def preRestart(reason: Throwable){
        println("TestActor "+ name + " restarting after shutdown because of " + reason)
      }

      override def postRestart(reason: Throwable){
        println("Restarting TestActor "+name+" after shutdown because of " + reason)
      }
    }

    object TestSingleActor{
      def main(args : Array[String]) : Unit = {
        println("starting TestSingleActor.main()")
        val testActor = Actor.actorOf( new SingleActor(1) ).start()

        println("number of actors: " + registry.actors.size)
        printAllActorsInfo

        (1 until 20 toList) foreach { testActor ! _ }

        // Poll the registry every 500 ms for about a minute
        for( i <- 1 until 120 ){
          Thread.sleep(500)
          printAllActorsInfo
        }
      }

      def printAllActorsInfo() ={
        registry.actors.foreach( (a) =>
          println("Actor hash: %d has mailbox %d isRunning: %b isShutdown: %b isBeingRestarted: %b "
            .format(a.hashCode(),a.mailboxSize,a.isRunning,a.isShutdown,a.isBeingRestarted)))
      }
    }

The output I get looks like this:

    [info] Running TestSingleActor
    starting TestSingleActor.main()
    Loading config [akka.conf] from the application classpath.
    number of actors: 1
    Actor hash: -1537745664 has mailbox 0 isRunning: true isShutdown: false isBeingRestarted: false
    Actor: 1 Received: 1
    Actor hash: -1537745664 has mailbox 17 isRunning: true isShutdown: false isBeingRestarted: false
    ... 117 more of these lines repeated ...
    Actor hash: -1537745664 has mailbox 17 isRunning: true isShutdown: false isBeingRestarted: false
    [info] == run ==
    [success] Successful.
    [info]
    [info] Total time: 70 s, completed Aug 17, 2011 2:24:49 PM

Solution
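The problem was my akka.conf file. I was using the reference 1.1.3 akka.conf file, except for the line that configures the event handlers.

Mine (broken):

    event-handlers = ["akka.event.slf4j.Slf4jEventHandler"]

Reference 1.1.3 (working):

    event-handlers = ["akka.event.EventHandler$DefaultListener"]

With my event-handlers line, Actor restarts do not happen. With the reference 1.1.3 line, restarts work wonderfully.

I had made this change based on these instructions: http://akka.io/docs/akka/1.1.3/general/slf4j.html

So, by removing the suggestion from that page and going back to the reference 1.1.3 akka.conf, I was able to get fault-tolerant Actors.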
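As a rough illustration of the behavior the question expected once restarts work (a message that throws is dropped, the actor is restarted, and the rest of the mailbox is still processed), here is a toy model in plain Scala. It does not use Akka at all: `RestartModel`, `handle`, and `run` are hypothetical names, and the "restart" is only a counter, so treat it as a sketch of the expected semantics and not of Akka's implementation.

```scala
import scala.collection.mutable

// Toy model only: no Akka involved. All names here are hypothetical.
object RestartModel {
  final case class Result(processed: List[Int], restarts: Int)

  // Same rule as the actors in the question: even numbers fail.
  def handle(num: Int): Int = {
    if (num % 2 == 0) throw new Exception("This is a simulated failure")
    num
  }

  // Drain the mailbox in order. A failing message is dropped, a
  // simulated restart is counted, and the remaining messages are kept.
  def run(messages: Seq[Int]): Result = {
    val mailbox = mutable.Queue.empty[Int]
    mailbox ++= messages
    val processed = mutable.ListBuffer.empty[Int]
    var restarts = 0
    while (mailbox.nonEmpty) {
      val msg = mailbox.dequeue()
      try {
        processed += handle(msg)
      } catch {
        case _: Exception => restarts += 1
      }
    }
    Result(processed.toList, restarts)
  }

  def main(args: Array[String]): Unit = {
    // Same message range as the single-actor example: 1 until 20
    val result = run(1 until 20)
    println("Processed: " + result.processed.mkString(", "))
    println("Simulated restarts: " + result.restarts)
  }
}
```

For the `1 until 20` range used in the single-actor example, this model processes the ten odd numbers and counts nine simulated restarts, which is the pattern to look for in the real output once the reference event-handlers line is back in akka.conf.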