加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – 你如何在最新的Akka(2.4.6)中限制流量?

发布时间:2020-12-16 08:45:11 所属栏目:安全 来源:网络整理
导读:你如何在最新的Akka(2.4.6)中限制Flow?我想限制Http客户端流量,以限制每秒3个请求的请求数.我在网上找到了以下示例,但是对于旧的Akka和akka-streams API变化太大,我无法弄清楚如何重写它. def throttled[T](rate: FiniteDuration): Flow[T,T] = { val tickS
你如何在最新的Akka(2.4.6)中限制Flow?我想限制Http客户端流量,以限制每秒3个请求的请求数.我在网上找到了以下示例,但是对于旧的Akka和akka-streams API变化太大,我无法弄清楚如何重写它.

def throttled[T](rate: FiniteDuration): Flow[T,T] = {
  val tickSource: Source[Unit] = TickSource(rate,rate,() => ())
  val zip = Zip[T,Unit]
  val in = UndefinedSource[T]
  val out = UndefinedSink[T]
  PartialFlowGraph { implicit builder =>
    import FlowGraphImplicits._
    in ~> zip.left ~> Flow[(T,Unit)].map { case (t,_) => t } ~> out
    tickSource ~> zip.right
  }.toFlow(in,out)
}

这是迄今为止我最好的尝试

def throttleFlow[T](rate: FiniteDuration) = Flow.fromGraph(GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._

  val ticker = Source.tick(rate,Unit)

  val zip = builder.add(Zip[T,Unit.type])
  val map = Flow[(T,Unit.type)].map { case (value,_) => value }
  val messageExtractor = builder.add(map)

  val in = Inlet[T]("Req.in")
  val out = Outlet[T]("Req.out")

  out ~> zip.in0
  ticker ~> zip.in1
  zip.out ~> messageExtractor.in

  FlowShape.of(in,messageExtractor.out)
})

它在我的主要流程中抛出异常:)

private val queueHttp = Source.queue[(HttpRequest,(Any,Promise[(Try[HttpResponse],Any)]))](1000,OverflowStrategy.backpressure)
  .via(throttleFlow(rate))
  .via(poolClientFlow)
  .mapAsync(4) {
    case (util.Success(resp),any) =>
      val strictFut = resp.entity.toStrict(5 seconds)
      strictFut.map(ent => (util.Success(resp.copy(entity = ent)),any))
    case other =>
      Future.successful(other)
  }
  .toMat(Sink.foreach({
    case (triedResp,(value: Any,p: Promise[(Try[HttpResponse],Any)])) =>
      p.success(triedResp -> value)
    case _ =>
      throw new RuntimeException()
  }))(Keep.left)
  .run

其中poolClientFlow是Http()(系统).cachedHostConnectionPool [Any](baseDomain)

例外情况是:

Caused by: java.lang.IllegalArgumentException: requirement failed: The output port [Req.out] is not part of the underlying graph.
    at scala.Predef$.require(Predef.scala:219)
    at akka.stream.impl.StreamLayout$Module$class.wire(StreamLayout.scala:204)

解决方法

这是一个使用@Qingwei提到的节流方法的尝试.关键是不要使用bindAndHandle(),而是在处理它们之前使用bind()并限制传入连接的流.代码取自 implementation of bindAndHandle(),但为了简单起见,省略了一些错误处理.请不要在生产中这样做.

implicit val system = ActorSystem("test")
implicit val mat = ActorMaterializer()
import system.dispatcher
val maxConcurrentConnections = 4

val handler: Flow[HttpRequest,HttpResponse,NotUsed] = complete(LocalDateTime.now().toString)

def handleOneConnection(incomingConnection: IncomingConnection): Future[Done] =
  incomingConnection.flow
        .watchTermination()(Keep.right)
        .joinMat(handler)(Keep.left)
        .run()

Http().bind("127.0.0.1",8080)
  .throttle(3,1.second,1,ThrottleMode.Shaping)
  .mapAsyncUnordered(maxConcurrentConnections)(handleOneConnection)
  .to(Sink.ignore)
  .run()

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读