加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – 如何清理连续的Akka流中的子流

发布时间:2020-12-16 19:17:16 所属栏目:安全 来源:网络整理
导读:鉴于我有很长的事件流经过如下所示的事物.经过很长时间后,将会有很多不再需要的子流. Is there a way to clean up a specific substream at a given time,for example the substream created by id 3 should be cleaned and the state in the scan method lo
鉴于我有很长的事件流经过如下所示的事物.经过很长时间后,将会有很多不再需要的子流.

Is there a way to clean up a specific substream at a given time,for
example the substream created by id 3 should be cleaned and the state
in the scan method lost at 13Pm (expires property of Wid)?

case class Wid(id: Int,v: String,expires: LocalDateTime)
test("Substream with scan") {
  val (pub,sub) = TestSource.probe[Wid]
    .groupBy(Int.MaxValue,_.id)
    .scan("")((a: String,b: Wid) => a + b.v)
    .mergeSubstreams
    .toMat(TestSink.probe[String])(Keep.both)
    .run()
}

解决方法

TL; DR您可以在一段时间后关闭子流.但是,使用输入来动态设置内置阶段的时间是另一回事.

关闭子流

要关闭流程,您通常会完成它(从上游),但您也可以取消它(从下游).例如,一旦n个元素通过,take(n:Int)流将取消.

现在,在groupBy情况下,您无法完成子流,因为上游由所有子流共享,但您可以取消它.如何取决于你想要放在什么条件.

但是,请注意groupBy删除已经关闭的子流的输入:如果一个id为3的新元素在3子流关闭后从上游到groupBy,它将被忽略,下一个元素将被拉出这可能是因为在关闭和重新打开子流之间的过程中可能会丢失一些元素.此外,如果您的流应该运行很长时间,这将影响性能,因为在转发到相关(实时)子流之前,将针对关闭的子流列表检查每个元素.如果您对此的性能不满意,您可能希望实现自己的有状态过滤器(例如,使用布隆过滤器).

要关闭一个子流,我通常使用take(如果你只想要一定数量的元素,但在无限流中可能不是这种情况),或者某种超时:如果你想要从物化到固定的时间,则需要completionTimeout如果要在没有元素通过一段时间后关闭,则关闭或idleTimeout.请注意,这些流不会取消流但会使其失败,因此您必须使用recover或recoverWith阶段捕获异常以将失败更改为取消(recoverWith允许您通过使用Source恢复来取消而不发送任何最后一个元素.空).

动态设置超时

现在你想要的是根据第一个传递元素动态设置关闭时间.这更复杂,因为流的实现与通过它们的元素无关.实际上,在通常的(没有groupBy)情况下,流在任何元素通过之前都已实现,因此使用元素实现它们是没有意义的.

我在that question中遇到了类似的问题,最后使用了带签名的groupBy的修改版本

paramGroupBy[K,OO,MM](maxSubstreams: Int,f: Out => K,paramSubflow: K => Flow[Out,MM])

允许使用定义它的键定义每个子流.这可以修改为具有第一个元素(而不是键)作为参数.

另一个(可能更简单,在你的情况下)方式是编写你自己的阶段,完全符合你的要求:从第一个元素获取结束时间并在那时取消流.这是一个示例实现(我使用调度程序而不是设置状态):

object CancelAfterTimer

class CancelAfter[T](getTimeout: T => FiniteDuration) extends GraphStage[FlowShape[T,T]] {
  val in = Inlet[T]("CancelAfter.in")
  val out = Outlet[T]("CancelAfter.in")
  override val shape: FlowShape[T,T] = FlowShape(in,out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) with InHandler with OutHandler  {
    override def onPush(): Unit = {
      val elem = grab(in)
      if (!isTimerActive(CancelAfterTimer))
        scheduleOnce(CancelAfterTimer,getTimeout(elem))
      push(out,elem)
    }

    override def onTimer(timerKey: Any): Unit = 
      completeStage() //this will cancel the upstream and close the downstrean

    override def onPull(): Unit = pull(in)

    setHandlers(in,out,this)
  }
}

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读