Akka实战:分散、聚合模式
分散与聚合:简单说就是一个任务需要拆分成多个小任务,每个小任务执行完后再把结果聚合在一起返回。 代码 http://git.oschina.net/yangbajing/akka-action 实例背景本实例来自一个真实的线上产品,现将其需求简化如下:
设计根据需求,一个简化的分散、聚合模式可以使用两个
实现NewsTask
override def metricPreStart(): Unit = { context.system.scheduler.scheduleOnce(doneDuration,self,TaskDelay) } override def metricReceive: Receive = { case StartFetchNews => _receipt = sender() (0 until NewsTask.TASK_SIZE).foreach { i => context.actorOf(SearchPageTask.props(self),"scatter-" + i) ! SearchPage(key) } case GetNewsItem(newsItem) => _newses ::= newsItem if (_newses.size == NewsTask.TASK_SIZE) { logger.debug(s"分散任务,${NewsTask.TASK_SIZE}个已全部完成") if (_receipt != null) { _receipt ! NewsResult(key,_newses) _receipt = null } self ! PoisonPill } case TaskDelay => if (_receipt != null) { _receipt ! NewsResult(key,_newses) _receipt = null } }
在 在收到
SearchPageTask
override def metricReceive: Receive = { case SearchPage(key) => // XXX 模拟抓取新闻时间 TimeUtils.sleep(Random.nextInt(20).seconds) val item = NewsItem( "http://newssite/news/" + self.path.name,"测试新闻" + self.path.name,self.path.name,TimeUtils.now().toString,"内容简介","新闻正文") taskRef ! GetNewsItem(item) context.stop(self) }
执行测试./sbt akka-action > testOnly me.yangbajing.akkaaction.scattergather.ScatterGatherTest 总结这是一个简单的
(编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |