加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – 从线程模型转换为actor

发布时间:2020-12-16 19:13:14 所属栏目:安全 来源:网络整理
导读:试图了解如何用演员而不是线程来思考.我对以下用例感到有点困惑: Consider a system that has a producer process that creates work (e.g. by reading data from a file),and a number of worker processes that consume the work (e.g. by parsing the da
试图了解如何用演员而不是线程来思考.我对以下用例感到有点困惑:

Consider a system that has a producer process that creates work (e.g. by reading data from a file),and a number of worker processes that consume the work (e.g. by parsing the data and writing it to a database). The rates at which work is produced and consumed can vary,and the system should remain robust to this. For example,if the workers can’t keep up,the producer should detect this and eventually slow down or wait.

使用线程很容易实现:

val producer:Iterator[Work] = createProducer()
val queue = new LinkedBlockingQueue[Work](QUEUE_SIZE)
val workers = (0 until NUM_WORKERS) map { i =>
  new Thread() { 
    override def run() = {
      while (true) {
        try {
          // take next unit of work,waiting if necessary
          val work = queue.take()
          process(work)
        }
        catch {
          case e:InterruptedException => return
        }
      }
    }
  }
}

// start the workers
workers.foreach(_.start())

while (producer.hasNext) {
  val work = producer.next()
  // add new unit of work,waiting if necessary
  queue.put(work)
}

while (!queue.isEmpty) {
  // wait until queue is drained
  queue.wait()
}

// stop the workers
workers.foreach(_.interrupt())

这个模型没有什么问题,我以前成功地使用过它.这个例子可能过于冗长,因为使用Executor或CompletionService可以很好地适应这个任务.但我喜欢演员抽象,并认为在很多情况下更容易推理.有没有办法使用actor重写这个例子,特别是确保没有缓冲区溢出(例如完整邮箱,丢弃的消息等)?

解决方法

因为演员处理消息“离线”(即消息的消费与他们被接收的消息无关),所以很难看出你如何能够准确地模拟“生产者等待消费者赶上”.

我唯一能想到的是消费者从生产者演员(使用回复)请求作品:

case object MoreWorkPlease
class Consumer(prod : Producer) extends Actor {
  def act = {
    prod ! MoreWorkPlease
    loop {
      react {
        case Work(payload) => doStuff(payload); reply(MoreWorkPlease)
      }
    }
  }
}

class Producer extends Actor {
  def act = loop {
    react {
      case MoreWorkPlease => reply(Work(getNextItem))
    }
  }
}

当然,这并不完美,因为制作人不会“向前阅读”,只有在消费者做好准备时才能开始工作.用法如下:

val prod = new Producer
(1 to NUM_ACTORS).map(new Consumer(prod)).foreach(_.start())
prod.start()

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读