斯卡拉 – 如何从外面正确阻止阿卡流
发布时间:2020-12-16 19:21:01 所属栏目:安全 来源:网络整理
导读:我正在设计一个可以生成CSV测试数据的小工具.我想使用Akka Streams(1.0-RC4)来实现数据流.将有一个Source生成随机数,转换为CSV字符串,一些速率限制器和一个写入文件的接收器. 还应该有一个使用小型REST接口停止工具的干净方法. 这是我在努力的地方.在流启动
我正在设计一个可以生成CSV测试数据的小工具.我想使用Akka Streams(1.0-RC4)来实现数据流.将有一个Source生成随机数,转换为CSV字符串,一些速率限制器和一个写入文件的接收器.
还应该有一个使用小型REST接口停止工具的干净方法. 这是我在努力的地方.在流启动后(RunnableFlow.run())似乎无法阻止它. Source和Sink是无限的(至少在磁盘运行完全:) :)所以他们不会停止流. 向Source或Sink添加控制逻辑会感觉不对.也使用ActorSystem.shutdown().什么是阻止流的好方法? 解决方法
好的,我找到了一个不错的解决方案.它已经坐在我的鼻子底下,我只是没有看到它. Source.lazyEmpty实现了一个承诺,即完成后将终止Source及其后面的流.
剩下的问题是,如何将其包含在无限的随机数流中.我试过Zip.结果是没有随机数通过流,因为lazyEmpty从不发出值(doh).我尝试了Merge,但是流程从未终止,因为Merge一直持续到所有来源都已完成. 所以我写了自己的合并.它从一个输入端口转发所有值,并在任何源完成时终止. object StopperFlow { private class StopperMergeShape[A](_init: Init[A] = Name("StopperFlow")) extends FanInShape[A](_init) { val in = newInlet[A]("in") val stopper = newInlet[Unit]("stopper") override protected def construct(init: Init[A]): FanInShape[A] = new StopperMergeShape[A](init) } private class StopperMerge[In] extends FlexiMerge[In,StopperMergeShape[In]]( new StopperMergeShape(),Attributes.name("StopperMerge")) { import FlexiMerge._ override def createMergeLogic(p: PortT) = new MergeLogic[In] { override def initialState = State[In](Read(p.in)) { (ctx,input,element) => ctx.emit(element) SameState } override def initialCompletionHandling = eagerClose } } def apply[In](): Flow[In,In,Promise[Unit]] = { val stopperSource = Source.lazyEmpty[Unit] Flow(stopperSource) { implicit builder => stopper => val stopperMerge = builder.add(new StopperMerge[In]()) stopper ~> stopperMerge.stopper (stopperMerge.in,stopperMerge.out) } } } 流可以插入任何流.实现后,它将返回一个Promise,在完成时终止流.这是我对它的测试. implicit val system = ActorSystem() implicit val materializer = ActorMaterializer() val startTime = System.currentTimeMillis() def dumpToConsole(f: Float) = { val timeSinceStart = System.currentTimeMillis() - startTime System.out.println(s"[$timeSinceStart] - Random number: $f") } val randomSource = Source(() => Iterator.continually(Random.nextFloat())) val consoleSink = Sink.foreach(dumpToConsole) val flow = randomSource.viaMat(StopperFlow())(Keep.both).to(consoleSink) val (_,promise) = flow.run() Thread.sleep(1000) val _ = promise.success(()) Thread.sleep(1000) 我希望这对其他人也有用.仍然让我感到困惑的是,为什么没有内置的方法来终止来自流外部的流. (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |