scala – 你如何在最新的Akka(2.4.6)中限制流量?
发布时间:2020-12-16 08:45:11 所属栏目:安全 来源:网络整理
导读:你如何在最新的Akka(2.4.6)中限制Flow?我想限制Http客户端流量,以限制每秒3个请求的请求数.我在网上找到了以下示例,但是对于旧的Akka和akka-streams API变化太大,我无法弄清楚如何重写它. def throttled[T](rate: FiniteDuration): Flow[T,T] = { val tickS
你如何在最新的Akka(2.4.6)中限制Flow?我想限制Http客户端流量,以限制每秒3个请求的请求数.我在网上找到了以下示例,但是对于旧的Akka和akka-streams API变化太大,我无法弄清楚如何重写它.
def throttled[T](rate: FiniteDuration): Flow[T,T] = { val tickSource: Source[Unit] = TickSource(rate,rate,() => ()) val zip = Zip[T,Unit] val in = UndefinedSource[T] val out = UndefinedSink[T] PartialFlowGraph { implicit builder => import FlowGraphImplicits._ in ~> zip.left ~> Flow[(T,Unit)].map { case (t,_) => t } ~> out tickSource ~> zip.right }.toFlow(in,out) } 这是迄今为止我最好的尝试 def throttleFlow[T](rate: FiniteDuration) = Flow.fromGraph(GraphDSL.create() { implicit builder => import GraphDSL.Implicits._ val ticker = Source.tick(rate,Unit) val zip = builder.add(Zip[T,Unit.type]) val map = Flow[(T,Unit.type)].map { case (value,_) => value } val messageExtractor = builder.add(map) val in = Inlet[T]("Req.in") val out = Outlet[T]("Req.out") out ~> zip.in0 ticker ~> zip.in1 zip.out ~> messageExtractor.in FlowShape.of(in,messageExtractor.out) }) 它在我的主要流程中抛出异常:) private val queueHttp = Source.queue[(HttpRequest,(Any,Promise[(Try[HttpResponse],Any)]))](1000,OverflowStrategy.backpressure) .via(throttleFlow(rate)) .via(poolClientFlow) .mapAsync(4) { case (util.Success(resp),any) => val strictFut = resp.entity.toStrict(5 seconds) strictFut.map(ent => (util.Success(resp.copy(entity = ent)),any)) case other => Future.successful(other) } .toMat(Sink.foreach({ case (triedResp,(value: Any,p: Promise[(Try[HttpResponse],Any)])) => p.success(triedResp -> value) case _ => throw new RuntimeException() }))(Keep.left) .run 其中poolClientFlow是Http()(系统).cachedHostConnectionPool [Any](baseDomain) 例外情况是: Caused by: java.lang.IllegalArgumentException: requirement failed: The output port [Req.out] is not part of the underlying graph. at scala.Predef$.require(Predef.scala:219) at akka.stream.impl.StreamLayout$Module$class.wire(StreamLayout.scala:204) 解决方法
这是一个使用@Qingwei提到的节流方法的尝试.关键是不要使用bindAndHandle(),而是在处理它们之前使用bind()并限制传入连接的流.代码取自
implementation of
bindAndHandle() ,但为了简单起见,省略了一些错误处理.请不要在生产中这样做.
implicit val system = ActorSystem("test") implicit val mat = ActorMaterializer() import system.dispatcher val maxConcurrentConnections = 4 val handler: Flow[HttpRequest,HttpResponse,NotUsed] = complete(LocalDateTime.now().toString) def handleOneConnection(incomingConnection: IncomingConnection): Future[Done] = incomingConnection.flow .watchTermination()(Keep.right) .joinMat(handler)(Keep.left) .run() Http().bind("127.0.0.1",8080) .throttle(3,1.second,1,ThrottleMode.Shaping) .mapAsyncUnordered(maxConcurrentConnections)(handleOneConnection) .to(Sink.ignore) .run() (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |