加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – 在Akka流中创建来自Future的背压

发布时间:2020-12-16 19:14:34 所属栏目:安全 来源:网络整理
导读:我是Akka流和流的新手,因此我可能在概念层面上完全误解了某些东西,但是有什么方法可以创建背压直到未来结算?基本上我想做的是这样的: object Parser { def parseBytesToSeq(buffer: ByteBuffer): Seq[ExampleObject] = ???}val futures = FileIO.fromPath(
我是Akka流和流的新手,因此我可能在概念层面上完全误解了某些东西,但是有什么方法可以创建背压直到未来结算?基本上我想做的是这样的:

object Parser {
    def parseBytesToSeq(buffer: ByteBuffer): Seq[ExampleObject] = ???
}

val futures = FileIO.fromPath(path)
  .map(st => Parser.parseBytesToSeq(st.toByteBuffer))
  .batch(1000,x => x)(_ ++ _)
  .map(values => doAsyncOp(values))
  .runWith(Sink.seq)

def doAsyncOp(Seq[ExampleObject]) : Future[Any] = ???

从文件中读取字节并将其流式传输到解析器,解析器将发出ExampleObjects的Seqs,并将这些字节流式传输到返回Future的异步操作.我想这样做,直到Future解析,流的其余部分得到反压,然后在Future解析后恢复,将另一个Seq [ExampleObject]传递给doAsyncOp,它恢复背压等等.

现在我有这个工作:

Await.result(doAsyncOp(values),10 seconds)

但我的理解是,这会锁定整个线程并且很糟糕.有没有更好的方法呢?

如果它有帮助,那么大局是我正在尝试用Jawn解析一个非常大的JSON文件(太大而不适合内存),然后将对象传递给ElasticSearch,以便在它们被解析时编入索引 – ElasticSearch有一个包含50个待处理操作的队列,如果溢出则开始拒绝新对象.

解决方法

这很容易.你需要使用mapAync

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读