加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – Akka Http – 主机级客户端API Source.queue模式

发布时间:2020-12-16 18:32:02 所属栏目:安全 来源:网络整理
导读:我们开始实现docs: http://doc.akka.io/docs/akka-http/current/scala/http/client-side/host-level.html#examples中提到的Source.queue [HttpRequest]模式 这是文档中的(简化)示例 val poolClientFlow = Http() .cachedHostConnectionPool[Promise[HttpRes
我们开始实现docs: http://doc.akka.io/docs/akka-http/current/scala/http/client-side/host-level.html#examples中提到的Source.queue [HttpRequest]模式

这是文档中的(简化)示例

val poolClientFlow = Http()
  .cachedHostConnectionPool[Promise[HttpResponse]]("akka.io")

val queue =
  Source.queue[(HttpRequest,Promise[HttpResponse])](
     QueueSize,OverflowStrategy.dropNew
  )
    .via(poolClientFlow)
    .toMat(Sink.foreach({
      case ((Success(resp),p)) => p.success(resp)
      case ((Failure(e),p))    => p.failure(e)
    }))(Keep.left)
    .run()

def queueRequest(request: HttpRequest): Future[HttpResponse] = {
  val responsePromise = Promise[HttpResponse]()
  queue.offer(request -> responsePromise).flatMap {
    case QueueOfferResult.Enqueued    => responsePromise.future
    case QueueOfferResult.Dropped     => Future.failed(new RuntimeException("Queue overflowed. Try again later."))
    case QueueOfferResult.Failure(ex) => Future.failed(ex)
    case QueueOfferResult.QueueClosed => Future.failed(new RuntimeException("Queue was closed (pool shut down) while running the request. Try again later."))
  }
}

val responseFuture: Future[HttpResponse] = queueRequest(HttpRequest(uri = "/"))

文档声明使用Source.single(request)是一个反模式,应该避免.但是,它没有说明使用Source.queue的原因和含义.

At this place we previously showed an example that used the Source.single(request).via(pool).runWith(Sink.head).
In fact,this is an anti-pattern that doesn’t perform well. Please either supply requests using a queue or in a streamed fashion as shown below.

Source.queue的优点

>流程只实现一次(可能是性能提升?).但是,如果我正确理解了akka-http实现,那么每个连接都会实现一个新流程,所以这似乎不是一个很大的问题
>使用OverflowStrategy进行显式背压处理并匹配QueueOfferResult

Source.queue的问题

当我们在应用程序中开始实现此模式时,会出现这些问题.

Source.queue不是线程安全的

The queue implementation is not thread safe.当我们在不同的路线/演员中使用队列时,我们有这样的场景:

排队的请求可以覆盖最新的排队请求,从而导致未解决的Future.

UPDATE

此问题已在akka/akka/issues/23081中解决.队列实际上是线程安全的.

过滤?

过滤请求时会发生什么?例如.当有人改变实施时

Source.queue[(HttpRequest,Promise[HttpResponse])](
    QueueSize,OverflowStrategy.dropNew)
  .via(poolClientFlow)
  // only successful responses
  .filter(_._1.isSuccess)
  // failed won't arrive here
  .to(Sink.foreach({
    case ((Success(resp),p)) => p.success(resp)
    case ((Failure(e),p)) => p.failure(e)
  }))

未来会不会解决?使用单个请求流程,这很简单:

Source.single(request).via(poolClientFlow).runWith(Sink.headOption)

QueueSize vs max-open-request?

QueueSize和max-open-requests之间的区别尚不清楚.最后,两者都是缓冲区.我们的实现最终使用QueueSize == max-open-requests

Source.single()的缺点是什么?

到目前为止,我发现在Source.single上使用Source.queue有两个原因

>性能 – 仅实现流程一次.但是根据this answer,它应该不是问题
>明确配置背压并处理故障情况.在我看来,ConnectionPool具有足够的负载处理能力.可以映射结果的未来并处理异常.

提前致谢,
缪奇

解决方法

我将直接回答您的每个问题,然后对整体问题给出一般的间接答案.

可能性能提升?

您是正确的,因为每个IncomingConnection都有一个Flow,但如果Connection有多个请求,则仍然有性能提升.

过滤请求时会发生什么?

通常,流元素和接收元素之间没有1:1映射.可以有1:0,如在您的示例中,或者可以有1:多个,如果单个请求以某种方式产生多个响应.

QueueSize vs max-open-request?

该比率将取决于向队列提供元素的速度以及将http请求处理为响应的速度.没有预定义的理想解决方案.

GENERAL REDESIGN

在大多数情况下,使用Source.queue是因为某些上游函数动态地创建输入元素,然后将它们提供给队列,例如,

val queue = ??? //as in the example in your question

queue.offer(httpRequest1)
queue.offer(httpRequest2)
queue.offer(httpRequest3)

这是糟糕的设计,因为用于创建每个输入元素的任何实体或功能本身可以是流源的一部分,

val allRequests = Iterable(httpRequest1,httpRequest2,httpRequest3)

//no queue necessary
val allResponses : Future[Seq[HttpResponse]] = 
  Source(allRequests)
    .via(poolClientFlow)
    .to(Sink.seq[HttpResponse])
    .run()

现在没有必要担心队列,最大队列大小等.一切都被捆绑成一个很好的紧凑流.

即使请求源是动态的,您仍然可以使用Source.假设我们从控制台stdin获取请求路径,这仍然是一个完整的流:

import scala.io.{Source => ioSource}

val consoleLines : () => Iterator[String] = 
  () => ioSource.stdin.getLines()

Source
  .fromIterator(consoleLines)
  .map(consoleLine => HttpRequest(GET,uri = Uri(consoleLine)))
  .via(poolClientFlow)
  .to(Sink.foreach[HttpResponse](println))
  .run()

现在,即使每行都以随机间隔键入控制台,流仍然可以在没有队列的情况下进行反应.

我必须创建一个传递给第三方API的回调函数,我唯一看到队列的实例或Source.ActorRef是绝对必要的.此回调函数必须将传入元素提供给队列.

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读