加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – 在Akka / Spray上的油门HTTP请求

发布时间:2020-12-16 21:32:51 所属栏目:安全 来源:网络整理
导读:我在 Scala中使用Akka演员从外部服务(HTTP获取请求)下载资源.外部服务的响应是 JSON,我必须使用分页(提供程序很慢).我想在10个线程中同时下载所有分页结果.我使用这样的URL下载块: http://service.com/itmes?limit=50offset=1000 我创建了以下管道: Scatte
我在 Scala中使用Akka演员从外部服务(HTTP获取请求)下载资源.外部服务的响应是 JSON,我必须使用分页(提供程序很慢).我想在10个线程中同时下载所有分页结果.我使用这样的URL下载块: http://service.com/itmes?limit=50&offset=1000

我创建了以下管道:

ScatterActor => RoundRobinPool[10](LoadChunkActor) => Aggreator

ScatterActor需要总共下载的项目数量并将其划分成块.我创建了10个LoadChunkActor来同时处理任务.

override def receive: Receive = {
    case LoadMessage(limit) =>
    val offsets: IndexedSeq[Int] = 0 until limit by chunkSize
    offsets.foreach(offset => context.system.actorSelection(pipe) !
    LoadMessage(chunkSize,offset))
 }

LoadChunkActor使用Spray发送请求.演员看起来像这样:

val pipeline = sendReceive ~> unmarshal[List[Items]]
override def receive: Receive = {
  case LoadMessage(limit,offset) =>
    val uri: String = s"http://service.com/items?limit=50&offset=$offset"
    val responseFuture = pipeline {Get(uri)}
    responseFuture onComplete {
      case Success(items) => aggregator ! Loaded(items)
    }
 }

正如你可以看到的,LoadChunkActor正在从外部服务请求块,并添加回调来运行onComplete.演员现在准备再发一个消息,他正在请求另一个大块. Spray正在使用非阻塞API来下载块.结果外部服务充斥着我的请求,我得到超时.

如何安排任务列表,但是我想同时处理最多10个?

解决方法

我创建了以下解决方案(类似于拉 http://www.michaelpollmeier.com/akka-work-pulling-pattern/:

ScatterActor (10000x messages) => 
  ThrottleActor => LoadChunkActor => ThrottleMonitorActor => Aggregator
         ^                                    |
         |<--------WorkDoneMessage------------|

> ThrottleActor pub消息到ListBuffer并发送给LoadChunkActor最大N个消息计数.>当LoadChunkActor通过ThrottleMonitorActor向Aggregator发送消息时.> ThrottleMonitorActor向ThrottleActor发送确认.> ThrottleActor向LoadChunkActor发送下一条消息.

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读