斯卡拉 – 阿卡流.分组,聚合一段时间并发出结果
发布时间:2020-12-14 05:02:18 所属栏目:百科 来源:网络整理
导读:我有一个无限的元素流,我想按id和聚合组进行分组,让我们说2秒,然后将它们发送到下游.这是一个不起作用的代码,但可以更好地解释我想要的东西: Source .tick(0 second,50 millis,() = if (Random.nextBoolean) (1,s"A") else (2,s"B")) .map { f = f() } .gro
我有一个无限的元素流,我想按id和聚合组进行分组,让我们说2秒,然后将它们发送到下游.这是一个不起作用的代码,但可以更好地解释我想要的东西:
Source .tick(0 second,50 millis,() => if (Random.nextBoolean) (1,s"A") else (2,s"B")) .map { f => f() } .groupBy(10,_._1) // how to aggregate grouped elements here for two seconds? .scan(Seq[String]()) { (x,y) => x ++ Seq(y._2) } .to(Sink.foreach(println)) 期望的输出应该如下所示: Seq(A,A,A) Seq(B,B,B) Seq(A,B) // and so on 如何使用流实现此类功能? 解决方法
你需要在你的流程中分组:
http://doc.akka.io/docs/akka/2.4.17/scala/stream/stages-overview.html#groupedwithin (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |