加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – 使用喷雾客户端进行多次请求时的akka??超时

发布时间:2020-12-16 18:56:14 所属栏目:安全 来源:网络整理
导读:使用喷雾1.3.2和akka 2.3.6. (akka仅用于喷雾). 我需要读取大文件,并为每一行发出一个http请求. 我用迭代器逐行读取文件,并为每个项目发出请求. 它成功运行了一些线路,但有时它开始失败: akka.pattern.AskTimeoutException:在[60000 ms]之后询问[Actor [ak
使用喷雾1.3.2和akka 2.3.6. (akka仅用于喷雾).
我需要读取大文件,并为每一行发出一个http请求.
我用迭代器逐行读取文件,并为每个项目发出请求.
它成功运行了一些线路,但有时它开始失败:
akka.pattern.AskTimeoutException:在[60000 ms]之后询问[Actor [akka:// default / user / IO-HTTP#-35162984]]的超时时间.
我首先想到我重载了服务,所以我将“spray.can.host-connector.max-connections”设置为1.它运行得慢得多,但我得到了同样的错误.
这里的代码:

import spray.http.MediaTypes._
val EdnType = register(
MediaType.custom(
  mainType = "application",subType = "edn",compressible = true,binary = false,fileExtensions = Seq("edn")))

val pipeline = (
  addHeader("Accept","application/json")
  ~> sendReceive
  ~> unmarshal[PipelineResponse])

def postData(data: String) = {
  val request = Post(pipelineUrl).withEntity(HttpEntity.apply(EdnType,data))
  val responseFuture: Future[PipelineResponse] = pipeline(request)
  responseFuture
}

dataLines.map { d =>
  val f = postData(d)
  f.onFailure { case e => println("Error - "+e)} // This is where the errors are display
  f.map { p => someMoreLogic(d,p) }
}

aggrigateResults(dataLines)

我是这样做的,因为我不需要整个数据,只需要一些聚合.

我怎样才能解决这个问题并保持完全异步?

解决方法

Akka ask timeout是通过firstCompletedOf实现的,所以定时器在初始化ask时启动.

你似乎正在做的是为每一行产生一个Future(在地图中) – 所以你的所有调用几乎同时执行.在初始化期货时,超时开始计算,但没有遗留给所有衍生的演员执行其工作的执行程序线程.因此要求超时.

我建议采用更灵活的方法,而不是处理“一次性全部”,有点类似于使用iteratees或akka-streams:Work Pulling Pattern.(Github)

你提供了你已经拥有的Epic迭代器.介绍一个工作人员演员,该演员将执行呼叫&一些逻辑.如果你产生N个工人,那么最多将同时处理N行(并且处理管道可能涉及多个步骤).这样,您可以确保不会使执行程序超载,并且不应发生超时.

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读