scala – akka actor中的增量处理
我有演员需要做很长时间运行和计算上昂贵的工作,但计算本身可以逐步完成.因此,虽然完整的计算本身需要数小时才能完成,但中间结果实际上非常有用,我希望能够响应它们的任何请求.这是我想要做的伪代码:
var intermediateResult = ... loop { while (mailbox.isEmpty && computationNotFinished) intermediateResult = computationStep(intermediateResult) receive { case GetCurrentResult => sender ! intermediateResult ...other messages... } } 解决方法
我假设从你对Roland Kuhn的评论回答你有一些可以被认为是递归的工作,至少在块中.如果不是这种情况,我认为没有任何干净的解决方案来处理您的问题,您将不得不处理复杂的模式匹配块.
如果我的假设是正确的,我会异步安排计算,让演员可以自由地回答其他消息.关键是使用Future monadic功能并具有简单的接收块.你必须处理三个消息(startComputation,changeState,getState) 您将获得以下收到: def receive { case StartComputation(myData) =>expensiveStuff(myData) case ChangeState(newstate) = this.state = newstate case GetState => sender ! this.state } 然后,您可以通过定义自己的递归映射来利用Future上的map方法: def mapRecursive[A](f:Future[A],handler: A => A,exitConditions: A => Boolean):Future[A] = { f.flatMap { a=> if (exitConditions(a)) f else { val newFuture = f.flatMap{ a=> Future(handler(a))} mapRecursive(newFuture,handler,exitConditions) } } } 一旦你有了这个工具,一切都会变得更容易.如果您查看以下示例: def main(args:Array[String]){ val baseFuture:Future[Int] = Promise.successful(64) val newFuture:Future[Int] = mapRecursive(baseFuture,(a:Int) => { val result = a/2 println("Additional step done: the current a is " + result) result },(a:Int) => (a<=1)) val one = Await.result(newFuture,Duration.Inf) println("Computation finished,result = " + one) } 它的输出是:
你知道你可以在昂贵的Stuffmethod中做同样的事情 def expensiveStuff(myData:MyData):Future[MyData]= { val firstResult = Promise.successful(myData) val handler : MyData => MyData = (myData) => { val result = myData.copy(myData.value/2) self ! ChangeState(result) result } val exitCondition : MyData => Boolean = (myData:MyData) => myData.value==1 mapRecursive(firstResult,exitCondition) } 编辑 – 更详细 如果您不想阻止以线程安全且同步的方式处理来自其邮箱的消息的Actor,那么您唯一能做的就是在不同的线程上执行计算.这正是一种高性能的非阻塞接收. 但是,你说得对,我提出的方法会给你带来很高的性能损失.每一步都是在不同的未来完成的,这可能根本就没有必要.因此,您可以递归处理程序以获取单线程或多线程执行.毕竟没有神奇的公式: >如果要异步安排并最小化成本,则所有工作都应由单个线程完成 def recurseFuture[A](entryFuture: Future[A],exitCondition: A => Boolean,maxNestedRecursion: Long = Long.MaxValue): Future[A] = { def recurse(a:A,maxNestedRecursion: Long,currentStep: Long): Future[A] = { if (exitCondition(a)) Promise.successful(a) else if (currentStep==maxNestedRecursion) Promise.successful(handler(a)).flatMap(a => recurse(a,exitCondition,maxNestedRecursion,0)) else{ recurse(handler(a),currentStep+1) } } entryFuture.flatMap { a => recurse(a,0) } } 我为测试目的增强了我的处理程序方法: val handler: Int => Int = (a: Int) => { val result = a / 2 println("Additional step done: the current a is " + result + " on thread " + Thread.currentThread().getName) result } 方法1:将处理程序递归到自身,以便在单个线程上执行所有操作. println("Starting strategy with all the steps on the same thread") val deepestRecursion: Future[Int] = recurseFuture(baseFuture,exitCondition) Await.result(deepestRecursion,Duration.Inf) println("Completed strategy with all the steps on the same thread") println("") 方法2:递归处理器本身的有限深度 println("Starting strategy with the steps grouped by three") val threeStepsInSameFuture: Future[Int] = recurseFuture(baseFuture,3) val threeStepsInSameFuture2: Future[Int] = recurseFuture(baseFuture,4) Await.result(threeStepsInSameFuture,Duration.Inf) Await.result(threeStepsInSameFuture2,Duration.Inf) println("Completed strategy with all the steps grouped by three") executorService.shutdown() (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |