scala – 使用喷雾客户端进行多次请求时的akka??超时
发布时间:2020-12-16 18:56:14 所属栏目:安全 来源:网络整理
导读:使用喷雾1.3.2和akka 2.3.6. (akka仅用于喷雾). 我需要读取大文件,并为每一行发出一个http请求. 我用迭代器逐行读取文件,并为每个项目发出请求. 它成功运行了一些线路,但有时它开始失败: akka.pattern.AskTimeoutException:在[60000 ms]之后询问[Actor [ak
使用喷雾1.3.2和akka 2.3.6. (akka仅用于喷雾).
我需要读取大文件,并为每一行发出一个http请求. 我用迭代器逐行读取文件,并为每个项目发出请求. 它成功运行了一些线路,但有时它开始失败: akka.pattern.AskTimeoutException:在[60000 ms]之后询问[Actor [akka:// default / user / IO-HTTP#-35162984]]的超时时间. 我首先想到我重载了服务,所以我将“spray.can.host-connector.max-connections”设置为1.它运行得慢得多,但我得到了同样的错误. 这里的代码: import spray.http.MediaTypes._ val EdnType = register( MediaType.custom( mainType = "application",subType = "edn",compressible = true,binary = false,fileExtensions = Seq("edn"))) val pipeline = ( addHeader("Accept","application/json") ~> sendReceive ~> unmarshal[PipelineResponse]) def postData(data: String) = { val request = Post(pipelineUrl).withEntity(HttpEntity.apply(EdnType,data)) val responseFuture: Future[PipelineResponse] = pipeline(request) responseFuture } dataLines.map { d => val f = postData(d) f.onFailure { case e => println("Error - "+e)} // This is where the errors are display f.map { p => someMoreLogic(d,p) } } aggrigateResults(dataLines) 我是这样做的,因为我不需要整个数据,只需要一些聚合. 我怎样才能解决这个问题并保持完全异步? 解决方法
Akka ask timeout是通过firstCompletedOf实现的,所以定时器在初始化ask时启动.
你似乎正在做的是为每一行产生一个Future(在地图中) – 所以你的所有调用几乎同时执行.在初始化期货时,超时开始计算,但没有遗留给所有衍生的演员执行其工作的执行程序线程.因此要求超时. 我建议采用更灵活的方法,而不是处理“一次性全部”,有点类似于使用iteratees或akka-streams:Work Pulling Pattern.(Github) 你提供了你已经拥有的Epic迭代器.介绍一个工作人员演员,该演员将执行呼叫&一些逻辑.如果你产生N个工人,那么最多将同时处理N行(并且处理管道可能涉及多个步骤).这样,您可以确保不会使执行程序超载,并且不应发生超时. (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |