加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – groupBy的子流是否可以依赖于它们生成的密钥?

发布时间:2020-12-16 18:49:38 所属栏目:安全 来源:网络整理
导读:我有一个与用户相关的数据流.我也为每个用户都有一个状态,我可以从DB异步获取. 我想用每个用户的一个子流分离我的流,并在实现子流时为每个用户加载状态,以便可以相对于该状态处理子流的元素. 如果我不想合并下游的子流,我可以使用groupBy和Sink.lazyInit做一
我有一个与用户相关的数据流.我也为每个用户都有一个状态,我可以从DB异步获取.

我想用每个用户的一个子流分离我的流,并在实现子流时为每个用户加载状态,以便可以相对于该状态处理子流的元素.

如果我不想合并下游的子流,我可以使用groupBy和Sink.lazyInit做一些事情:

def getState(userId: UserId): Future[UserState] = ...
def getUserId(element: Element): UserId = ...
def treatUser(state: UserState): Sink[Element,_] = ...

val treatByUser: Sink[Element] = Flow[Element].groupBy(
  Int.MaxValue,getUserId
).to(
  Sink.lazyInit(
    elt => getState(getUserId(elt)).map(treatUser),??? // this is never called,since the subflow is created when an element comes
  )
)

但是,如果treatUser成为Flow,则这不起作用,因为Sink.lazyInit没有等价物.

由于groupBy的子流仅在推送新元素时才具体化,因此应该可以使用此元素来实现子流,但是我无法调整groupBy的源代码以使此工作始终如一.同样,Sink.lazyInitdo似乎不容易转换为Flow案例.

有关如何解决这个问题的任何想法?

解决方法

您需要关注的相关Akka问题是 #20129: add Sink.dynamic and Flow.dynamic.

在相关的PR #20579中,他们实际上实现了LazySink的东西.

他们计划下一步做LazyFlow:

Will do next lazyFlow with similar signature.

不幸的是,你必须等待在Akka中实现该功能或自己编写(然后考虑公关到Akka).

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读