scala – groupBy的子流是否可以依赖于它们生成的密钥?
发布时间:2020-12-16 18:49:38 所属栏目:安全 来源:网络整理
导读:我有一个与用户相关的数据流.我也为每个用户都有一个状态,我可以从DB异步获取. 我想用每个用户的一个子流分离我的流,并在实现子流时为每个用户加载状态,以便可以相对于该状态处理子流的元素. 如果我不想合并下游的子流,我可以使用groupBy和Sink.lazyInit做一
我有一个与用户相关的数据流.我也为每个用户都有一个状态,我可以从DB异步获取.
我想用每个用户的一个子流分离我的流,并在实现子流时为每个用户加载状态,以便可以相对于该状态处理子流的元素. 如果我不想合并下游的子流,我可以使用groupBy和Sink.lazyInit做一些事情: def getState(userId: UserId): Future[UserState] = ... def getUserId(element: Element): UserId = ... def treatUser(state: UserState): Sink[Element,_] = ... val treatByUser: Sink[Element] = Flow[Element].groupBy( Int.MaxValue,getUserId ).to( Sink.lazyInit( elt => getState(getUserId(elt)).map(treatUser),??? // this is never called,since the subflow is created when an element comes ) ) 但是,如果treatUser成为Flow,则这不起作用,因为Sink.lazyInit没有等价物. 由于groupBy的子流仅在推送新元素时才具体化,因此应该可以使用此元素来实现子流,但是我无法调整groupBy的源代码以使此工作始终如一.同样,Sink.lazyInitdo似乎不容易转换为Flow案例. 有关如何解决这个问题的任何想法? 解决方法
您需要关注的相关Akka问题是
#20129: add Sink.dynamic and Flow.dynamic.
在相关的PR #20579中,他们实际上实现了LazySink的东西. 他们计划下一步做LazyFlow:
不幸的是,你必须等待在Akka中实现该功能或自己编写(然后考虑公关到Akka). (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |
推荐文章
站长推荐
热点阅读