斯卡拉 – 为什么我的演员调度在Akka缩小?
发布时间:2020-12-16 09:00:03 所属栏目:安全 来源:网络整理
导读:我有一个100个运行Actors的actor池,它共享一个工作窃取调度程序,其CorePoolSize设置为100.但是现在当向一个Actors发送19条消息时,19条消息没有并行化到19个Actors,只有5条消息正在运行在平行下.当这5条消息完成后,接下来的5条消息将由这些相同的5个Actors再
我有一个100个运行Actors的actor池,它共享一个工作窃取调度程序,其CorePoolSize设置为100.但是现在当向一个Actors发送19条消息时,19条消息没有并行化到19个Actors,只有5条消息正在运行在平行下.当这5条消息完成后,接下来的5条消息将由这些相同的5个Actors再次处理,依此类推.为什么我的19条消息并非并行运行,我在这里缺少什么?
我的代码基本上看起来像这样: object TestActor { val dispatcher = Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher("pool") .setCorePoolSize(100) .setMaxPoolSize(100) .build } class TestActor(val name: Integer) extends Actor { self.lifeCycle = Permanent self.dispatcher = TestActor.dispatcher def receive = { case num: Integer => { println("Actor: " + name + " Received: " + num) Thread.sleep(10000) } } } trait CyclicLoadBalancing extends LoadBalancer { this: Actor => val testActors: List[ActorRef] val seq = new CyclicIterator[ActorRef](testActors) } trait TestActorManager extends Actor { self.lifeCycle = Permanent self.faultHandler = OneForOneStrategy(List(classOf[Exception]),5,5000) val testActors: List[ActorRef] override def preStart = testActors foreach { self.startLink(_) } override def postStop = self.shutdownLinkedActors() } val supervisor = actorOf(new TestActorManager with CyclicLoadBalancing { val testActors = (1 until 100 toList) map (i => actorOf(new TestActor(i))) }).start println("Number of Actors: " + Actor.registry.actorsFor(classOf[TestActor]).length) val testActor = Actor.registry.actorsFor(classOf[TestActor]).head (1 until 20 toList) foreach { testActor ! _ } 输出: Actor: 4 Received: 16 Actor: 3 Received: 17 Actor: 1 Received: 19 Actor: 59 Received: 1 Actor: 2 Received: 18 // 10 secs. are passing.. Actor: 4 Received: 15 Actor: 3 Received: 14 Actor: 1 Received: 13 Actor: 59 Received: 2 Actor: 2 Received: 12 // 10 secs. are passing.. Actor: 4 Received: 11 Actor: 3 Received: 10 Actor: 59 Received: 3 Actor: 2 Received: 8 Actor: 1 Received: 9 // 10 secs. are passing.. Actor: 4 Received: 7 Actor: 3 Received: 6 Actor: 59 Received: 4 Actor: 2 Received: 5 编辑:我正在使用Akka 1.0 解决方法
感谢您的询问,我对瓶颈进行了本地化并将其修改为Akka master:
https://github.com/akka/akka/commit/e4e99ef56399e892206ce4a46b9a9107da6c7770 它将在Akka 1.1-RC1中发布 干杯,√ (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |