scala – Akka Http – 主机级客户端API Source.queue模式
我们开始实现docs:
http://doc.akka.io/docs/akka-http/current/scala/http/client-side/host-level.html#examples中提到的Source.queue [HttpRequest]模式
这是文档中的(简化)示例 val poolClientFlow = Http() .cachedHostConnectionPool[Promise[HttpResponse]]("akka.io") val queue = Source.queue[(HttpRequest,Promise[HttpResponse])]( QueueSize,OverflowStrategy.dropNew ) .via(poolClientFlow) .toMat(Sink.foreach({ case ((Success(resp),p)) => p.success(resp) case ((Failure(e),p)) => p.failure(e) }))(Keep.left) .run() def queueRequest(request: HttpRequest): Future[HttpResponse] = { val responsePromise = Promise[HttpResponse]() queue.offer(request -> responsePromise).flatMap { case QueueOfferResult.Enqueued => responsePromise.future case QueueOfferResult.Dropped => Future.failed(new RuntimeException("Queue overflowed. Try again later.")) case QueueOfferResult.Failure(ex) => Future.failed(ex) case QueueOfferResult.QueueClosed => Future.failed(new RuntimeException("Queue was closed (pool shut down) while running the request. Try again later.")) } } val responseFuture: Future[HttpResponse] = queueRequest(HttpRequest(uri = "/")) 文档声明使用Source.single(request)是一个反模式,应该避免.但是,它没有说明使用Source.queue的原因和含义.
Source.queue的优点 >流程只实现一次(可能是性能提升?).但是,如果我正确理解了akka-http实现,那么每个连接都会实现一个新流程,所以这似乎不是一个很大的问题 Source.queue的问题 当我们在应用程序中开始实现此模式时,会出现这些问题. Source.queue不是线程安全的 The queue implementation is not thread safe.当我们在不同的路线/演员中使用队列时,我们有这样的场景: 排队的请求可以覆盖最新的排队请求,从而导致未解决的Future. UPDATE 此问题已在akka/akka/issues/23081中解决.队列实际上是线程安全的. 过滤? 过滤请求时会发生什么?例如.当有人改变实施时 Source.queue[(HttpRequest,Promise[HttpResponse])]( QueueSize,OverflowStrategy.dropNew) .via(poolClientFlow) // only successful responses .filter(_._1.isSuccess) // failed won't arrive here .to(Sink.foreach({ case ((Success(resp),p)) => p.success(resp) case ((Failure(e),p)) => p.failure(e) })) 未来会不会解决?使用单个请求流程,这很简单: Source.single(request).via(poolClientFlow).runWith(Sink.headOption) QueueSize vs max-open-request? QueueSize和max-open-requests之间的区别尚不清楚.最后,两者都是缓冲区.我们的实现最终使用QueueSize == max-open-requests Source.single()的缺点是什么? 到目前为止,我发现在Source.single上使用Source.queue有两个原因 >性能 – 仅实现流程一次.但是根据this answer,它应该不是问题 提前致谢, 解决方法
我将直接回答您的每个问题,然后对整体问题给出一般的间接答案.
可能性能提升? 您是正确的,因为每个IncomingConnection都有一个Flow,但如果Connection有多个请求,则仍然有性能提升. 过滤请求时会发生什么? 通常,流元素和接收元素之间没有1:1映射.可以有1:0,如在您的示例中,或者可以有1:多个,如果单个请求以某种方式产生多个响应. QueueSize vs max-open-request? 该比率将取决于向队列提供元素的速度以及将http请求处理为响应的速度.没有预定义的理想解决方案. GENERAL REDESIGN 在大多数情况下,使用Source.queue是因为某些上游函数动态地创建输入元素,然后将它们提供给队列,例如, val queue = ??? //as in the example in your question queue.offer(httpRequest1) queue.offer(httpRequest2) queue.offer(httpRequest3) 这是糟糕的设计,因为用于创建每个输入元素的任何实体或功能本身可以是流源的一部分, val allRequests = Iterable(httpRequest1,httpRequest2,httpRequest3) //no queue necessary val allResponses : Future[Seq[HttpResponse]] = Source(allRequests) .via(poolClientFlow) .to(Sink.seq[HttpResponse]) .run() 现在没有必要担心队列,最大队列大小等.一切都被捆绑成一个很好的紧凑流. 即使请求源是动态的,您仍然可以使用Source.假设我们从控制台stdin获取请求路径,这仍然是一个完整的流: import scala.io.{Source => ioSource} val consoleLines : () => Iterator[String] = () => ioSource.stdin.getLines() Source .fromIterator(consoleLines) .map(consoleLine => HttpRequest(GET,uri = Uri(consoleLine))) .via(poolClientFlow) .to(Sink.foreach[HttpResponse](println)) .run() 现在,即使每行都以随机间隔键入控制台,流仍然可以在没有队列的情况下进行反应. 我必须创建一个传递给第三方API的回调函数,我唯一看到队列的实例或Source.ActorRef是绝对必要的.此回调函数必须将传入元素提供给队列. (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |