加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – 如何动态地将Source添加到现有Graph?

发布时间:2020-12-16 09:55:43 所属栏目:安全 来源:网络整理
导读:什么可以替代动态更改运行图?这是我的情况.我有图表将文章摄入DB.文章来自3种不同格式的插件.因此,我有几个流程 val converterFlow1: Flow[ImpArticle,Article,NotUsed]val converterFlow2: Flow[NewsArticle,NotUsed]val sinkDB: Sink[Article,Future[Done
什么可以替代动态更改运行图?这是我的情况.我有图表将文章摄入DB.文章来自3种不同格式的插件.因此,我有几个流程

val converterFlow1: Flow[ImpArticle,Article,NotUsed]
val converterFlow2: Flow[NewsArticle,NotUsed]
val sinkDB: Sink[Article,Future[Done]]

// These are being created every time I poll plugins    
val sourceContentProvider : Source[ImpArticle,NotUsed]
val sourceNews : Source[NewsArticle,NotUsed]
val sourceCit : Source[Article,NotUsed]

val merged = Source.combine(
    sourceContentProvider.via(converterFlow1),sourceNews.via(converterFlow2),sourceCit)(Merge(_))

val res = merged
  .buffer(10,OverflowStrategy.backpressure)
  .toMat(sinkDB)(Keep.both)
  .run()

问题是我每24小时从内容提供商处获取一次数据,每2小时一次从新闻获取数据,最后一个来源可能随时来自,因为它来自人类.

我意识到图形是不可变的,但我如何定期将Source的新实例附加到我的图形中,以便我对摄取过程进行单点限制?

更新:您可以说我的数据是Source-s的流,在我的情况下有三个来源.但是我不能改变它,因为我从外部类(所谓的插件)获取Source的实例.这些插件独立于我的摄取类.我无法将它们组合成一个巨大的类来拥有单一的Source.

解决方法

好的,一般来说,正确的方法是将一个源流加入一个源,即从Source [Source [T,_],Whatever]到Source [T,Whatever].这可以使用 flatMapConcatflatMapMerge来完成.因此,如果您可以获取Source [Source [Article,NotUsed],您可以使用flatMap *变体之一并获取最终的Source [Article,NotUsed].为你的每个来源(没有双关语)做,然后你的原始方法应该工作.

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读