scala – 如何动态地将Source添加到现有Graph?
发布时间:2020-12-16 09:55:43 所属栏目:安全 来源:网络整理
导读:什么可以替代动态更改运行图?这是我的情况.我有图表将文章摄入DB.文章来自3种不同格式的插件.因此,我有几个流程 val converterFlow1: Flow[ImpArticle,Article,NotUsed]val converterFlow2: Flow[NewsArticle,NotUsed]val sinkDB: Sink[Article,Future[Done
什么可以替代动态更改运行图?这是我的情况.我有图表将文章摄入DB.文章来自3种不同格式的插件.因此,我有几个流程
val converterFlow1: Flow[ImpArticle,Article,NotUsed] val converterFlow2: Flow[NewsArticle,NotUsed] val sinkDB: Sink[Article,Future[Done]] // These are being created every time I poll plugins val sourceContentProvider : Source[ImpArticle,NotUsed] val sourceNews : Source[NewsArticle,NotUsed] val sourceCit : Source[Article,NotUsed] val merged = Source.combine( sourceContentProvider.via(converterFlow1),sourceNews.via(converterFlow2),sourceCit)(Merge(_)) val res = merged .buffer(10,OverflowStrategy.backpressure) .toMat(sinkDB)(Keep.both) .run() 问题是我每24小时从内容提供商处获取一次数据,每2小时一次从新闻获取数据,最后一个来源可能随时来自,因为它来自人类. 我意识到图形是不可变的,但我如何定期将Source的新实例附加到我的图形中,以便我对摄取过程进行单点限制? 更新:您可以说我的数据是Source-s的流,在我的情况下有三个来源.但是我不能改变它,因为我从外部类(所谓的插件)获取Source的实例.这些插件独立于我的摄取类.我无法将它们组合成一个巨大的类来拥有单一的Source. 解决方法
好的,一般来说,正确的方法是将一个源流加入一个源,即从Source [Source [T,_],Whatever]到Source [T,Whatever].这可以使用
flatMapConcat 或
flatMapMerge 来完成.因此,如果您可以获取Source [Source [Article,NotUsed],您可以使用flatMap *变体之一并获取最终的Source [Article,NotUsed].为你的每个来源(没有双关语)做,然后你的原始方法应该工作.
(编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |
相关内容