使用Scala actor执行CPU绑定任务?
发布时间:2020-12-16 19:15:21 所属栏目:安全 来源:网络整理
导读:假设我必须执行几个CPU绑定的任务.例如,如果我有4个CPU,我可能会创建一个4-5个工作线程的固定大小的线程池,等待队列并将任务放入队列中.在 Java中,我可以使用java.util.concurrent(可能是ThreadPoolExecutor)来实现这种机制. 你会如何用Scala演员实现它? 解
假设我必须执行几个CPU绑定的任务.例如,如果我有4个CPU,我可能会创建一个4-5个工作线程的固定大小的线程池,等待队列并将任务放入队列中.在
Java中,我可以使用java.util.concurrent(可能是ThreadPoolExecutor)来实现这种机制.
你会如何用Scala演员实现它? 解决方法
所有参与者基本上都是由调度程序执行的线程.调度程序创建一个线程池来执行大致绑定到您的内核数量的actor.这意味着您可以根据需要执行的每个任务创建一个actor,并将其余部分留给Scala:
for(i <- 1 to 20) { actor { print(i); Thread.sleep(1000); } } 这里的缺点取决于任务的数量,为每个任务创建线程的成本可能非常昂贵,因为线程在Java中并不便宜. 创建有界工作者角色池然后通过消息传递将任务分发给他们的简单方法是: import scala.actors.Actor._ val numWorkers = 4 val pool = (1 to numWorkers).map { i => actor { loop { react { case x: String => println(x) } } } } for(i <- 1 to 20) { val r = (new util.Random).nextInt(numWorkers) pool(r) ! "task "+i } 我们想要创建多个actor的原因是因为一个actor一次只处理一个消息(即任务),所以为你需要创建多个任务获得并行性. 旁注:当涉及到I / O绑定任务时,默认调度程序变得尤为重要,因为在这种情况下,您肯定希望更改线程池的大小.关于此的详细信息有两个很好的博客文章:Explore the Scheduling of Scala Actors和Scala actors thread pool pitfall. 话虽如此,Akka是一个Actor框架,它为使用Actors的更高级工作流提供工具,我将在任何实际应用程序中使用它.这是一个负载平衡(而不是随机)任务执??行器: import akka.actor.Actor import Actor._ import akka.routing.{LoadBalancer,CyclicIterator} class TaskHandler extends Actor { def receive = { case t: Task => // some computationally expensive thing t.execute case _ => println("default case is required in Akka...") } } class TaskRouter(numWorkers: Int) extends Actor with LoadBalancer { val workerPool = Vector.fill(numWorkers)(actorOf[TaskHandler].start()) val seq = new CyclicIterator(workerPool) } val router = actorOf(new TaskRouter(4)).start() for(i <- 1 to 20) { router ! Task(..) } 您可以使用不同类型的负载平衡(CyclicIterator是循环分配),因此您可以查看文档here以获取更多信息. (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |