加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 百科 > 正文

斯卡拉 – 阿卡流.分组,聚合一段时间并发出结果

发布时间:2020-12-14 05:02:18 所属栏目:百科 来源:网络整理
导读:我有一个无限的元素流,我想按id和聚合组进行分组,让我们说2秒,然后将它们发送到下游.这是一个不起作用的代码,但可以更好地解释我想要的东西: Source .tick(0 second,50 millis,() = if (Random.nextBoolean) (1,s"A") else (2,s"B")) .map { f = f() } .gro
我有一个无限的元素流,我想按id和聚合组进行分组,让我们说2秒,然后将它们发送到下游.这是一个不起作用的代码,但可以更好地解释我想要的东西:

Source
    .tick(0 second,50 millis,() => if (Random.nextBoolean) (1,s"A") else (2,s"B"))
    .map { f => f() }
    .groupBy(10,_._1)
    // how to aggregate grouped elements here for two seconds?
    .scan(Seq[String]()) { (x,y) => x ++ Seq(y._2) }
    .to(Sink.foreach(println))

期望的输出应该如下所示:

Seq(A,A,A)
Seq(B,B,B)
Seq(A,B)
// and so on

如何使用流实现此类功能?

解决方法

你需要在你的流程中分组:

http://doc.akka.io/docs/akka/2.4.17/scala/stream/stages-overview.html#groupedwithin

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读