加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

Scala Actors而不是Java Futures

发布时间:2020-12-16 10:01:41 所属栏目:安全 来源:网络整理
导读:问题:我需要编写一个应用程序来处理几百个文件,每个文件需要几百兆字节才能完成.我使用Executors.newFixedThreadPool()创建的Future [Report]对象编写了它,但由于ExecutorService.invokeAll()返回的List [Future [Report]]对象保留在中间体上,因此出现了内
问题:我需要编写一个应用程序来处理几百个文件,每个文件需要几百兆字节才能完成.我使用Executors.newFixedThreadPool()创建的Future [Report]对象编写了它,但由于ExecutorService.invokeAll()返回的List [Future [Report]]对象保留在中间体上,因此出现了内存不足错误每个进程使用的内存.我通过在计算报告值(每个报告只有几百行)之后从处理器中的本地方法返回Report对象而不是在call方法中进行计算(来自Callable接口)来解决问题.

我想尝试使用Scala Actors来解决这个问题.我创建了一个类,它接受一系列作业(作业,结果和处理函数的参数化类型),并在一个可配置数量的Worker实例(Actor的子类)中处理每个作业.代码如下.

问题:

>我不确定我的处理是什么
正确.
>我不喜欢使用CountDownLatch来延迟从调度程序返回结果.
>我更愿意编写一个更加“功能”的调度程序版本,它不会修改jobsQueue列表或者worker hashmap,也许是从Clojure借用尾递归循环结构(我在其他Scala中使用了@tailrec def循环方法)码).

我焦急地等待Philipp Haller和Frank Sommers出版的“Actors in Scala”.

这是代码:

package multi_worker

import scala.actors.Actor
import java.util.concurrent.CountDownLatch

object MultiWorker {
  private val megabyte = 1024 * 1024
  private val runtime = Runtime.getRuntime
}

class MultiWorker[A,B](jobs: List[A],actorCount: Int)(process: (A) => B) {
  import MultiWorker._

  sealed abstract class Message

  // Dispatcher -> Worker: Run this job and report results
  case class Process(job: A) extends Message

  // Worker -> Dispatcher: Result of processing
  case class ReportResult(id: Int,result: B) extends Message

  // Worker -> Dispatcher: I need work -- send me a job
  case class SendJob(id: Int) extends Message

  // Worker -> Dispatcher: I have stopped as requested
  case class Stopped(id: Int) extends Message

  // Dispatcher -> Worker: Stop working -- all jobs done
  case class StopWorking extends Message

  /**
   * A simple logger that can be sent text messages that will be written to the
   * console. Used so that messages from the actors do not step on each other.
   */
  object Logger
  extends Actor {
    def act() {
      loop {
        react {
          case text: String => println(text)
          case StopWorking => exit()
        }
      }
    }
  }
  Logger.start()

  /**
   * A worker actor that will process jobs and return results to the
   * dispatcher.
   */
  class Worker(id: Int)
  extends Actor{
    def act() {
      // Ask the dispatcher for an initial job
      dispatcher ! SendJob(id)

      loop {
        react {
          case Process(job) =>
            val startTime = System.nanoTime
            dispatcher ! ReportResult(id,process(job))

            val endTime = System.nanoTime
            val totalMemory = (runtime.totalMemory / megabyte)
            val usedMemory = totalMemory - (runtime.freeMemory / megabyte)
            val message = "Finished job " + job + " in " +
              ((endTime - startTime) / 1000000000.0) +
              " seconds using " + usedMemory +
              "MB out of total " + totalMemory + "MB"
            Logger ! message

            dispatcher ! SendJob(id)

          case StopWorking =>
            dispatcher ! Stopped(id)
            exit()
        }
      }
    }
  }

  val latch = new CountDownLatch(1)
  var res = List.empty[B]

  /**
   * The job dispatcher that sends jobs to the worker until the job queue
   * (jobs: TraversableOnce[A]) is empty. It then tells the workers to
   * stop working and returns the List[B] results to the caller.
   */
  val dispatcher = new Actor {
    def act() {
      var jobQueue = jobs
      var workers = (0 until actorCount).map(id => (id,new Worker(id))).toMap
      workers.values.foreach(_.start())

      loop {
        react {
          case ReportResult(id,result) =>
            res = result :: res
            if (jobQueue.isEmpty && workers.isEmpty) {
              latch.countDown()
              exit()
            }

          case SendJob(id) =>
            if (!jobQueue.isEmpty) {
              workers(id) ! Process(jobQueue.head)
              jobQueue = jobQueue.tail
            }

          case Stopped(id) =>
            workers = workers - id
        }
      }
    }
  }
  dispatcher.start()

  /**
   * Get the results of the processing -- wait for the dispatcher to finish
   * before returning.
   */
  def results: List[B] = {
    latch.await()
    res
  }
}

解决方法

快速浏览一下后,我会提出以下更新:

val resultsChannel = new Channel[List[B]] // used instead of countdown latch to get the results

val dispatcher = new Actor {

  def act = loop(Nil,(0 to actorCount).map(id =>
      (id,new Worker(id).start.asInstanceOf[Worker])).toMap,Nil)

  @tailrec
  def loop(jobQueue: List[A],// queue,workers and results are immutable lists,passed recursively through the loop
           workers: Map[Int,Worker],res: List[B]):Unit = react {
    case ReportResult(id,result) =>
      val results = result :: res
      if (results.size == jobs.size) { // when the processing is finished,sends results to the output channel        
        resultsChannel ! results
      }
      loop(jobQueue,workers,results)

    case SendJob(id) =>
      if (!jobQueue.isEmpty) {
        workers(id) ! Process(jobQueue.head)
        loop(jobQueue.tail,res)
      }

    case Stopped(id) =>
      loop(jobQueue,workers - id,res)
  }

}
dispatcher.start()

def results: List[B] = {
  resultsChannel.receive {
    case results => results // synchronously wait for the data in the channel
  }
}

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读