scala – 在Akka流中创建来自Future的背压
发布时间:2020-12-16 19:14:34 所属栏目:安全 来源:网络整理
导读:我是Akka流和流的新手,因此我可能在概念层面上完全误解了某些东西,但是有什么方法可以创建背压直到未来结算?基本上我想做的是这样的: object Parser { def parseBytesToSeq(buffer: ByteBuffer): Seq[ExampleObject] = ???}val futures = FileIO.fromPath(
我是Akka流和流的新手,因此我可能在概念层面上完全误解了某些东西,但是有什么方法可以创建背压直到未来结算?基本上我想做的是这样的:
object Parser { def parseBytesToSeq(buffer: ByteBuffer): Seq[ExampleObject] = ??? } val futures = FileIO.fromPath(path) .map(st => Parser.parseBytesToSeq(st.toByteBuffer)) .batch(1000,x => x)(_ ++ _) .map(values => doAsyncOp(values)) .runWith(Sink.seq) def doAsyncOp(Seq[ExampleObject]) : Future[Any] = ??? 从文件中读取字节并将其流式传输到解析器,解析器将发出ExampleObjects的Seqs,并将这些字节流式传输到返回Future的异步操作.我想这样做,直到Future解析,流的其余部分得到反压,然后在Future解析后恢复,将另一个Seq [ExampleObject]传递给doAsyncOp,它恢复背压等等. 现在我有这个工作: Await.result(doAsyncOp(values),10 seconds) 但我的理解是,这会锁定整个线程并且很糟糕.有没有更好的方法呢? 如果它有帮助,那么大局是我正在尝试用Jawn解析一个非常大的JSON文件(太大而不适合内存),然后将对象传递给ElasticSearch,以便在它们被解析时编入索引 – ElasticSearch有一个包含50个待处理操作的队列,如果溢出则开始拒绝新对象. 解决方法
这很容易.你需要使用mapAync (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |