加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – Flink:如何将弃用的折叠转换为聚合?

发布时间:2020-12-16 18:13:36 所属栏目:安全 来源:网络整理
导读:我正在关注Flink: Monitoring the Wikipedia Edit Stream的快速启动示例. 这个例子是用Java编写的,我在Scala中实现它,如下所示: /** * Wikipedia Edit Monitoring */object WikipediaEditMonitoring { def main(args: Array[String]) { // set up the exec
我正在关注Flink: Monitoring the Wikipedia Edit Stream的快速启动示例.

这个例子是用Java编写的,我在Scala中实现它,如下所示:

/**
 * Wikipedia Edit Monitoring
 */
object WikipediaEditMonitoring {
  def main(args: Array[String]) {
    // set up the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val edits: DataStream[WikipediaEditEvent] = env.addSource(new WikipediaEditsSource)

    val result = edits.keyBy( _.getUser )
      .timeWindow(Time.seconds(5))
      .fold(("",0L)) {
        (acc: (String,Long),event: WikipediaEditEvent) => {
          (event.getUser,acc._2 + event.getByteDiff)
        }
      }

    result.print

    // execute program
    env.execute("Wikipedia Edit Monitoring")
  }
}

但是,Flink中的fold函数已被弃用,建议使用聚合函数.

enter image description here

但是我没有找到关于如何将弃用的折叠转换为聚合的示例或教程.

知道怎么做吗?可能不仅仅是通过应用聚合.

UPDATE

我有另一个实现如下:

/**
 * Wikipedia Edit Monitoring
 */
object WikipediaEditMonitoring {
  def main(args: Array[String]) {
    // set up the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val edits: DataStream[WikipediaEditEvent] = env.addSource(new WikipediaEditsSource)

    val result = edits
      .map( e => UserWithEdits(e.getUser,e.getByteDiff) )
      .keyBy( "user" )
      .timeWindow(Time.seconds(5))
      .sum("edits")

    result.print

    // execute program
    env.execute("Wikipedia Edit Monitoring")
  }

  /** Data type for words with count */
  case class UserWithEdits(user: String,edits: Long)
}

我也想知道如何使用自定义AggregateFunction进行实现.

UPDATE

我按照此文档:AggregateFunction,但有以下问题:

在版本1.3的接口AggregateFunction的源代码中,您将看到add确实返回void:

void add(IN value,ACC accumulator);

但是对于版本1.4 AggregateFunction,正在返回:

ACC add(IN value,ACC accumulator);

我该怎么处理?

我使用的Flink版本是1.3.2,此版本的文档没有AggregateFunction,但是还没有版本1.4.

enter image description here

解决方法

您将找到AggregateFunction in the Flink 1.4 docs的一些文档,包括一个示例.

1.3.2中包含的版本仅限于与可变累加器类型一起使用,其中add操作修改累加器.这已经是fixed for Flink 1.4,但还没有发布.

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读