加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – akka actor中的增量处理

发布时间:2020-12-16 19:07:54 所属栏目:安全 来源:网络整理
导读:我有演员需要做很长时间运行和计算上昂贵的工作,但计算本身可以逐步完成.因此,虽然完整的计算本身需要数小时才能完成,但中间结果实际上非常有用,我希望能够响应它们的任何请求.这是我想要做的伪代码: var intermediateResult = ...loop { while (mailbox.is
我有演员需要做很长时间运行和计算上昂贵的工作,但计算本身可以逐步完成.因此,虽然完整的计算本身需要数小时才能完成,但中间结果实际上非常有用,我希望能够响应它们的任何请求.这是我想要做的伪代码:

var intermediateResult = ...
loop {
     while (mailbox.isEmpty && computationNotFinished)
       intermediateResult = computationStep(intermediateResult)


     receive {
         case GetCurrentResult => sender ! intermediateResult
         ...other messages...
     }
 }

解决方法

我假设从你对Roland Kuhn的评论回答你有一些可以被认为是递归的工作,至少在块中.如果不是这种情况,我认为没有任何干净的解决方案来处理您的问题,您将不得不处理复杂的模式匹配块.

如果我的假设是正确的,我会异步安排计算,让演员可以自由地回答其他消息.关键是使用Future monadic功能并具有简单的接收块.你必须处理三个消息(startComputation,changeState,getState)

您将获得以下收到:

def receive {
  case StartComputation(myData) =>expensiveStuff(myData)
  case ChangeState(newstate) = this.state = newstate
  case GetState => sender ! this.state
}

然后,您可以通过定义自己的递归映射来利用Future上的map方法:

def mapRecursive[A](f:Future[A],handler: A => A,exitConditions: A => Boolean):Future[A] = {
    f.flatMap {  a=>
                 if (exitConditions(a))
                   f
                 else {
                     val newFuture = f.flatMap{ a=> Future(handler(a))}
                     mapRecursive(newFuture,handler,exitConditions)
                 }

              }
  }

一旦你有了这个工具,一切都会变得更容易.如果您查看以下示例:

def main(args:Array[String]){
    val baseFuture:Future[Int] = Promise.successful(64)
    val newFuture:Future[Int] = mapRecursive(baseFuture,(a:Int) => {
                                   val result = a/2
                                   println("Additional step done: the current a is " + result)
                                   result
                                 },(a:Int) => (a<=1))

    val one = Await.result(newFuture,Duration.Inf)
    println("Computation finished,result = " + one)



  }

它的输出是:

Additional step done: the current a is 32

Additional step done: the current a is 16

Additional step done: the current a is 8

Additional step done: the current a is 4

Additional step done: the current a is 2

Additional step done: the current a is 1

Computation finished,result = 1

你知道你可以在昂贵的Stuffmethod中做同样的事情

def expensiveStuff(myData:MyData):Future[MyData]= {
    val firstResult = Promise.successful(myData)
    val handler : MyData => MyData = (myData) => {
      val result = myData.copy(myData.value/2)
      self ! ChangeState(result)
      result
    }
    val exitCondition : MyData => Boolean = (myData:MyData) => myData.value==1
    mapRecursive(firstResult,exitCondition)
  }

编辑 – 更详细

如果您不想阻止以线程安全且同步的方式处理来自其邮箱的消息的Actor,那么您唯一能做的就是在不同的线程上执行计算.这正是一种高性能的非阻塞接收.

但是,你说得对,我提出的方法会给你带来很高的性能损失.每一步都是在不同的未来完成的,这可能根本就没有必要.因此,您可以递归处理程序以获取单线程或多线程执行.毕竟没有神奇的公式:

>如果要异步安排并最小化成本,则所有工作都应由单个线程完成
>然而,这可能会阻止其他工作开始,因为如果线程池中的所有线程都被占用,则期货将排队.因此,您可能希望将操作分解为多个期货,以便即使在完全使用时也可以在旧工作完成之前安排一些新工作.

def recurseFuture[A](entryFuture: Future[A],exitCondition: A => Boolean,maxNestedRecursion: Long = Long.MaxValue): Future[A] = {
        def recurse(a:A,maxNestedRecursion: Long,currentStep: Long): Future[A] = {
          if (exitCondition(a))
            Promise.successful(a)
          else
            if (currentStep==maxNestedRecursion)
              Promise.successful(handler(a)).flatMap(a => recurse(a,exitCondition,maxNestedRecursion,0))
            else{
              recurse(handler(a),currentStep+1)
            }
        }
        entryFuture.flatMap { a => recurse(a,0) }
      }

我为测试目的增强了我的处理程序方法:

val handler: Int => Int = (a: Int) => {
      val result = a / 2
      println("Additional step done: the current a is " + result + " on thread " + Thread.currentThread().getName)
      result
    }

方法1:将处理程序递归到自身,以便在单个线程上执行所有操作.

println("Starting strategy with all the steps on the same thread")
    val deepestRecursion: Future[Int] = recurseFuture(baseFuture,exitCondition)
    Await.result(deepestRecursion,Duration.Inf)
    println("Completed strategy with all the steps on the same thread")
    println("")

方法2:递归处理器本身的有限深度

println("Starting strategy with the steps grouped by three")
val threeStepsInSameFuture: Future[Int] = recurseFuture(baseFuture,3)
val threeStepsInSameFuture2: Future[Int] = recurseFuture(baseFuture,4)
Await.result(threeStepsInSameFuture,Duration.Inf)
Await.result(threeStepsInSameFuture2,Duration.Inf)
println("Completed strategy with all the steps grouped by three")
executorService.shutdown()

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读