Scala Actors instead of Java Futures
Published: 2020-12-16 10:01:41 Category: Security Source: compiled from the web
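Problem: I need to write an application that processes several hundred files, each of which takes several hundred megabytes to process. I wrote it with Future[Report] objects created through Executors.newFixedThreadPool(), but I ran into out-of-memory errors because the List[Future[Report]] returned by ExecutorService.invokeAll() was holding on to the intermediate memory used by each process. I solved that by having local methods in the processors compute the Report values (each report is only a few hundred lines) and return the Report objects, instead of doing the calculations in the call method (from the Callable interface).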
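The workaround described above can be sketched roughly as follows. This is only an illustration under assumptions: the original Report class and file processing are not shown, so Report, buildReport and runAll here are hypothetical stand-ins, and a byte array plays the role of the large intermediate data. The point is that the large data lives only in the locals of a helper method, so each Future ends up referencing nothing but the small result.

```scala
import java.util.concurrent.{Callable, Executors}
import java.util.{ArrayList => JArrayList}

// Hypothetical small result type; the original Report class is not shown.
final case class Report(name: String, lineCount: Int)

object FutureMemorySketch {
  // All large intermediate data lives only in this method's locals, so it
  // becomes unreachable as soon as the small Report has been returned.
  def buildReport(name: String, sizeInBytes: Int): Report = {
    val buffer = new Array[Byte](sizeInBytes) // stand-in for a large parsed file
    Report(name, buffer.length / 1024)
  }

  def runAll(names: List[String], threads: Int): List[Report] = {
    val pool = Executors.newFixedThreadPool(threads)
    try {
      val tasks = new JArrayList[Callable[Report]]()
      names.foreach { n =>
        tasks.add(new Callable[Report] {
          // call() only delegates; it keeps no reference to the big buffer
          def call(): Report = buildReport(n, 1024 * 1024)
        })
      }
      val futures = pool.invokeAll(tasks) // blocks until every task has completed
      // invokeAll returns the futures in the same order as the submitted tasks
      (0 until futures.size).map(i => futures.get(i).get()).toList
    } finally {
      pool.shutdown()
    }
  }
}
```

Calling FutureMemorySketch.runAll(List("a", "b", "c"), 2) returns one Report per name, in submission order.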
I would like to try solving this with Scala Actors instead. I created a class that takes a sequence of jobs (with parameterized types for the job, the result, and the processing function) and processes each job in one of a configurable number of Worker instances (a subclass of Actor). The code follows.

Concerns: I am not sure that my processing is correct.

I am eagerly awaiting the publication of "Actors in Scala" by Philipp Haller and Frank Sommers.

Here is the code:

    package multi_worker

    import scala.actors.Actor
    import java.util.concurrent.CountDownLatch

    object MultiWorker {
      private val megabyte = 1024 * 1024
      private val runtime = Runtime.getRuntime
    }

    class MultiWorker[A, B](jobs: List[A], actorCount: Int)(process: (A) => B) {
      import MultiWorker._

      sealed abstract class Message

      // Dispatcher -> Worker: Run this job and report results
      case class Process(job: A) extends Message

      // Worker -> Dispatcher: Result of processing
      case class ReportResult(id: Int, result: B) extends Message

      // Worker -> Dispatcher: I need work -- send me a job
      case class SendJob(id: Int) extends Message

      // Worker -> Dispatcher: I have stopped as requested
      case class Stopped(id: Int) extends Message

      // Dispatcher -> Worker: Stop working -- all jobs done
      // (a case object, so that `case StopWorking =>` matches the message itself)
      case object StopWorking extends Message

      /**
       * A simple logger that can be sent text messages that will be written to the
       * console. Used so that messages from the actors do not step on each other.
       */
      object Logger extends Actor {
        def act() {
          loop {
            react {
              case text: String => println(text)
              case StopWorking => exit()
            }
          }
        }
      }
      Logger.start()

      /**
       * A worker actor that will process jobs and return results to the
       * dispatcher.
       */
      class Worker(id: Int) extends Actor {
        def act() {
          // Ask the dispatcher for an initial job
          dispatcher ! SendJob(id)

          loop {
            react {
              case Process(job) =>
                val startTime = System.nanoTime
                dispatcher ! ReportResult(id, process(job))
                val endTime = System.nanoTime

                val totalMemory = (runtime.totalMemory / megabyte)
                val usedMemory = totalMemory - (runtime.freeMemory / megabyte)
                val message = "Finished job " + job + " in " +
                  ((endTime - startTime) / 1000000000.0) +
                  " seconds using " + usedMemory +
                  "MB out of total " + totalMemory + "MB"
                Logger ! message

                dispatcher ! SendJob(id)

              case StopWorking =>
                dispatcher ! Stopped(id)
                exit()
            }
          }
        }
      }

      val latch = new CountDownLatch(1)
      var res = List.empty[B]

      /**
       * The job dispatcher that sends jobs to the worker until the job queue
       * (jobs: List[A]) is empty. It then tells the workers to
       * stop working and returns the List[B] results to the caller.
       */
      val dispatcher = new Actor {
        def act() {
          var jobQueue = jobs
          var workers = (0 until actorCount).map(id => (id, new Worker(id))).toMap
          workers.values.foreach(_.start())

          loop {
            react {
              case ReportResult(id, result) =>
                res = result :: res
                if (jobQueue.isEmpty && workers.isEmpty) {
                  latch.countDown()
                  exit()
                }

              case SendJob(id) =>
                if (!jobQueue.isEmpty) {
                  workers(id) ! Process(jobQueue.head)
                  jobQueue = jobQueue.tail
                }

              case Stopped(id) =>
                workers = workers - id
            }
          }
        }
      }
      dispatcher.start()

      /**
       * Get the results of the processing -- wait for the dispatcher to finish
       * before returning.
       */
      def results: List[B] = {
        latch.await()
        res
      }
    }

Solution
After a quick look, I would suggest the following update:
    import scala.actors.Channel

    // Used instead of the countdown latch to get the results
    val resultsChannel = new Channel[List[B]]

    val dispatcher = new Actor {

      // Start from the full job list; `0 until actorCount` yields exactly actorCount workers
      def act = loop(jobs,
        (0 until actorCount).map(id => (id, new Worker(id).start.asInstanceOf[Worker])).toMap,
        Nil)

      // The queue, the workers and the results are immutable values passed
      // recursively through the loop. @tailrec is left off because the recursive
      // calls sit inside the react handler rather than in tail position of loop
      // itself; react does not return normally, so the stack should not grow.
      def loop(jobQueue: List[A],
               workers: Map[Int, Worker],
               res: List[B]): Unit = react {
        case ReportResult(id, result) =>
          val results = result :: res
          if (results.size == jobs.size) {
            // When the processing is finished, send the results to the output channel
            resultsChannel ! results
          }
          loop(jobQueue, workers, results)

        case SendJob(id) =>
          if (!jobQueue.isEmpty) {
            workers(id) ! Process(jobQueue.head)
            loop(jobQueue.tail, workers, res)
          } else {
            // Keep handling messages (outstanding results) once the queue is empty
            loop(jobQueue, workers, res)
          }

        case Stopped(id) =>
          loop(jobQueue, workers - id, res)
      }
    }
    dispatcher.start()

    def results: List[B] = {
      resultsChannel.receive {
        case results => results // synchronously wait for the data in the channel
      }
    }
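Since scala.actors is no longer shipped with current Scala releases, the same state-passing idea can be tried out with plain threads and blocking queues. The sketch below is an assumption-laden illustration, not the answer's code: StatePassingDispatcher, dispatch, and the simplified message types are hypothetical names, and a LinkedBlockingQueue stands in for each actor mailbox. The dispatcher loop carries the remaining jobs, the number of active workers, and the accumulated results as arguments, so no var is needed and @tailrec applies.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.annotation.tailrec

object StatePassingDispatcher {
  // Hypothetical message types, simplified from the ones in the question.
  sealed trait Msg[+B]
  final case class SendJob(workerId: Int) extends Msg[Nothing]
  final case class ReportResult[B](workerId: Int, result: B) extends Msg[B]

  def dispatch[A, B](jobs: List[A], workerCount: Int)(process: A => B): List[B] = {
    require(workerCount > 0, "at least one worker is needed")
    val inbox = new LinkedBlockingQueue[Msg[B]]()
    // One mailbox per worker; None tells the worker to stop.
    val mailboxes = Vector.fill(workerCount)(new LinkedBlockingQueue[Option[A]]())

    val threads = mailboxes.zipWithIndex.map { case (mailbox, id) =>
      val t = new Thread(new Runnable {
        def run(): Unit = {
          inbox.put(SendJob(id)) // ask the dispatcher for an initial job
          @tailrec def work(): Unit = mailbox.take() match {
            case Some(job) =>
              inbox.put(ReportResult(id, process(job)))
              inbox.put(SendJob(id)) // result first, then ask for more work
              work()
            case None => ()
          }
          work()
        }
      })
      t.start()
      t
    }

    // Remaining jobs, active worker count and results are passed as arguments.
    @tailrec
    def loop(queue: List[A], active: Int, res: List[B]): List[B] =
      if (active == 0) res
      else inbox.take() match {
        case ReportResult(_, r) => loop(queue, active, r :: res)
        case SendJob(id) => queue match {
          case job :: rest =>
            mailboxes(id).put(Some(job))
            loop(rest, active, res)
          case Nil =>
            mailboxes(id).put(None) // no work left: stop this worker
            loop(queue, active - 1, res)
        }
      }

    val results = loop(jobs, workerCount, Nil)
    threads.foreach(_.join())
    results
  }
}
```

Each worker posts its result before asking for more work, so by the time a worker is told to stop, all of its results have already been consumed. The order of the returned list depends on scheduling, and the sketch does not handle exceptions thrown by process.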