scala – Flink:如何将弃用的折叠转换为聚合?
发布时间:2020-12-16 18:13:36 所属栏目:安全 来源:网络整理
导读:我正在关注Flink: Monitoring the Wikipedia Edit Stream的快速启动示例. 这个例子是用Java编写的,我在Scala中实现它,如下所示: /** * Wikipedia Edit Monitoring */object WikipediaEditMonitoring { def main(args: Array[String]) { // set up the exec
我正在关注Flink:
Monitoring the Wikipedia Edit Stream的快速启动示例.
这个例子是用Java编写的,我在Scala中实现它,如下所示: /** * Wikipedia Edit Monitoring */ object WikipediaEditMonitoring { def main(args: Array[String]) { // set up the execution environment val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment val edits: DataStream[WikipediaEditEvent] = env.addSource(new WikipediaEditsSource) val result = edits.keyBy( _.getUser ) .timeWindow(Time.seconds(5)) .fold(("",0L)) { (acc: (String,Long),event: WikipediaEditEvent) => { (event.getUser,acc._2 + event.getByteDiff) } } result.print // execute program env.execute("Wikipedia Edit Monitoring") } } 但是,Flink中的fold函数已被弃用,建议使用聚合函数. 但是我没有找到关于如何将弃用的折叠转换为聚合的示例或教程. 知道怎么做吗?可能不仅仅是通过应用聚合. UPDATE 我有另一个实现如下: /** * Wikipedia Edit Monitoring */ object WikipediaEditMonitoring { def main(args: Array[String]) { // set up the execution environment val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment val edits: DataStream[WikipediaEditEvent] = env.addSource(new WikipediaEditsSource) val result = edits .map( e => UserWithEdits(e.getUser,e.getByteDiff) ) .keyBy( "user" ) .timeWindow(Time.seconds(5)) .sum("edits") result.print // execute program env.execute("Wikipedia Edit Monitoring") } /** Data type for words with count */ case class UserWithEdits(user: String,edits: Long) } 我也想知道如何使用自定义 UPDATE 我按照此文档:AggregateFunction,但有以下问题: 在版本1.3的接口 void add(IN value,ACC accumulator); 但是对于版本1.4 ACC add(IN value,ACC accumulator); 我该怎么处理? 我使用的Flink版本是1.3.2,此版本的文档没有AggregateFunction,但是还没有版本1.4. 解决方法
您将找到AggregateFunction
in the Flink 1.4 docs的一些文档,包括一个示例.
1.3.2中包含的版本仅限于与可变累加器类型一起使用,其中add操作修改累加器.这已经是fixed for Flink 1.4,但还没有发布. (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |
相关内容
- angularjs – Internet Explorer 8上的Angular JS ui-route
- 利用shell的expect实现自动登录服务器
- Unix/Linux编程实践教程–cat在OS X的实现
- angularjs – 如何使用ui-grid在网格中在运行时添加列
- 部署YUM软件仓库
- 在jar中包含Hyperic Sigar库,同时为Scala项目使用sbt程序集
- node.js – 学习NodeJS和MongoDB Docker组成
- angularjs+bootstrap实现自定义分页的实例代码
- 使用Shell脚本递归删除Linux中的所有备份文件[复制]
- scala – 用Akka HTTP创建基本的HTTP Post请求的惯用方式