加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > Windows > 正文

Flink Time 与 Window

发布时间:2020-12-14 05:33:14 所属栏目:Windows 来源:网络整理
导读:6.Time 与 Window 6.1 Time 在 Flink 的流式处理中,会涉及到时间的不同概念,如下图所示: ? ? Event Time :是事件创建的时间。它通常由事件中的时间戳描述,例如采集的 日志数据中,每一条日志都会记录自己的生成时间,Flink 通过时间戳分配器访问事 件时

6.Time 与 Window

6.1 Time
在 Flink 的流式处理中,会涉及到时间的不同概念,如下图所示:
?

?
   Event Time:是事件创建的时间。它通常由事件中的时间戳描述,例如采集的
日志数据中,每一条日志都会记录自己的生成时间,Flink 通过时间戳分配器访问事
件时间戳。
   Ingestion Time:是数据进入 Flink 的时间。
   Processing Time:是每一个执行基于时间操作的算子的本地系统时间,与机器
相关,默认的时间属性就是 Processing Time。
  例如,一条日志进入 Flink 的时间为 2017-11-12 10:00:00.123,到达 Window 的
系统时间为 2017-11-12 10:00:01.234,日志的内容如下:
2017-11-02 18:37:15.624 INFO Fail over to rm2
  对于业务来说,要统计 1min 内的故障日志个数,哪个时间是最有意义的?——
eventTime,因为我们要根据日志的生成时间进行统计。
?
?
?
?

6.2 Window

6.2.1 Window 概述

  streaming 流式计算是一种被设计用于处理无限数据集的数据处理引擎,而无限
数据集是指一种不断增长的本质上无限的数据集,而 window 是一种切割无限数据
为有限块进行处理的手段。
  Window 是无限数据流处理的核心,Window 将一个无限的 stream 拆分成有限大
小的”buckets”桶,我们可以在这些桶上做计算操作。
?
?

6.2.2 Window 类型

  Window 可以分成两类:
  ? CountWindow:按照指定的数据条数生成一个 Window,与时间无关。
  ? TimeWindow:按照时间生成 Window。
  对于 TimeWindow,可以根据窗口实现原理的不同分成三类:滚动窗口(Tumbling
Window)、滑动窗口(Sliding Window)和会话窗口(Session Window)。
?
   1. 滚动窗口(Tumbling Windows)
   将数据依据固定的窗口长度对数据进行切片。
   特点: 时间对齐,窗口长度固定,没有重叠。
  滚动窗口分配器将每个元素分配到一个指定窗口大小的窗口中,滚动窗口有一
个固定的大小,并且不会出现重叠。例如:如果你指定了一个 5 分钟大小的滚动窗
口,窗口的创建如下图所示:

   适用场景:适合做 BI 统计等(做每个时间段的聚合计算)。
?
?
   2. 滑动窗口(Sliding Windows)
  滑动窗口是固定窗口的更广义的一种形式,滑动窗口由固定的窗口长度和滑动
间隔组成。
   特点: 时间对齐,窗口长度固定,有重叠。
  滑动窗口分配器将元素分配到固定长度的窗口中,与滚动窗口类似,窗口的大
小由窗口大小参数来配置,另一个窗口滑动参数控制滑动窗口开始的频率。因此,
滑动窗口如果滑动参数小于窗口大小的话,窗口是可以重叠的,在这种情况下元素
会被分配到多个窗口中。
  例如,你有 10 分钟的窗口和 5 分钟的滑动,那么每个窗口中 5 分钟的窗口里包
含着上个 10 分钟产生的数据,如下图所示:
?

   适用场景:对最近一个时间段内的统计(求某接口最近 5min 的失败率来决定
是否要报警)。
?
?
  3. 会话窗口(Session Windows)
   由一系列事件组合一个指定时间长度的 timeout 间隙组成,类似于 web 应用的
session,也就是一段时间没有接收到新数据就会生成新的窗口。
   特点: 时间无对齐。
  session 窗口分配器通过 session 活动来对元素进行分组,session 窗口跟滚动窗
口和滑动窗口相比,不会有重叠和固定的开始时间和结束时间的情况,相反, 当它
在一个固定的时间周期内不再收到元素,即非活动间隔产生,那个这个窗口就会关
。一个 session 窗口通过一个 session 间隔来配置,这个 session 间隔定义了非活跃
周期的长度,当这个非活跃周期产生,那么当前的 session 将关闭并且后续的元素将
被分配到新的 session 窗口中去。
?

?

?
?

6.3 Window API

6.3.1 CountWindow

  CountWindow 根据窗口中相同 key 元素的数量来触发执行,执行时只计算元素
数量达到窗口大小的 key 对应的结果。
  注意:CountWindow 的 window_size 指的是相同 Key 的元素的个数,不是输
入的所有元素的总数。
?
1 滚动窗口
  默认的 CountWindow 是一个滚动窗口,只需要指定窗口大小即可,当元素数量
达到窗口大小时,就会触发窗口的执行。
// 获取执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 创建 SocketSource val stream = env.socketTextStream("localhost",22222)
// 对 stream 进行处理并按 key 聚合 val streamKeyBy = stream.map(item => (item.split(" ")(0),item.split(" ")(1).toLong)).keyBy(0)
// 引入滚动窗口 // 这里的 5 指的是 5 个相同 key 的元素计算一次 val streamWindow = streamKeyBy.countWindow(5)
// 执行聚合操作 val streamReduce = streamWindow.reduce(   (item1,item2) => (item1._1,item1._2 + item2._2) )
// 将聚合数据写入文件 streamReduce.print()
// 执行程序 env.execute("TumblingWindow")

?

?

2 滑动窗口
  滑动窗口和滚动窗口的函数名是完全一致的,只是在传参数时需要传入两个参
数,一个是 window_size,一个是 sliding_size。
  下面代码中的 sliding_size 设置为了 2,也就是说,每收到两个相同 key 的数据
就计算一次,每一次计算的 window 范围是 5 个元素。
// 获取执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 创建 SocketSource val stream = env.socketTextStream("localhost",item.split(" ")(1).toLong)).keyBy(0)
// 引入滚动窗口 // 当相同 key 的元素个数达到 2 个时,触发窗口计算,计算的窗口范围为 5 val streamWindow = streamKeyBy.countWindow(5,2)
// 执行聚合操作 val streamReduce = streamWindow.reduce(   (item1,item1._2 + item2._2) )
// 将聚合数据写入文件 streamReduce.print()
// 执行程序 env.execute("TumblingWindow") }

?

?
?

6.3.2 TimeWindow

  TimeWindow 是将指定时间范围内的所有数据组成一个 window,一次对一个
window 里面的所有数据进行计算。
?
1. 滚动窗口
  Flink 默认的时间窗口根据 Processing Time 进行窗口的划分,将 Flink 获取到的
数据根据进入 Flink 的时间划分到不同的窗口中。
// 获取执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 创建 SocketSource val stream = env.socketTextStream("localhost",22222)
// 对 stream 进行处理并按 key 聚合 val streamKeyBy = stream.map(item => (item,1)).keyBy(0)
// 引入时间窗口 val streamWindow = streamKeyBy.timeWindow(Time.seconds(5))
// 执行聚合操作 val streamReduce = streamWindow.reduce(   (item1,item1._2 + item2._2) )
// 将聚合数据写入文件 streamReduce.print()
// 执行程序 env.execute("TumblingWindow")
  时间间隔可以通过 Time.milliseconds(x),Time.seconds(x),Time.minutes(x)等其
中的一个来指定。
?
?
2. 滑动窗口(SlidingEventTimeWindows)
  滑动窗口和滚动窗口的函数名是完全一致的,只是在传参数时需要传入两个参
数,一个是 window_size,一个是 sliding_size。
  下面代码中的 sliding_size 设置为了 2s,也就是说,窗口每 2s 就计算一次,每
一次计算的 window 范围是 5s 内的所有元素。
// 获取执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 创建 SocketSource val stream = env.socketTextStream("localhost",1)).keyBy(0)
// 引入滚动窗口 val streamWindow = streamKeyBy.timeWindow(Time.seconds(5),Time.seconds(2))
// 执行聚合操作 val streamReduce = streamWindow.reduce(   (item1,item1._2 + item2._2) )
  
// 将聚合数据写入文件 streamReduce.print()
// 执行程序 env.execute("TumblingWindow")
  时间间隔可以通过 Time.milliseconds(x),Time.seconds(x),Time.minutes(x)等其
中的一个来指定。

?

?
?

6.3.3 Window Reduce

  WindowedStream → DataStream:给 window 赋一个 reduce 功能的函数,并
返回一个聚合的结果。
// 获取执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 创建 SocketSource val stream = env.socketTextStream("localhost",item1._2 + item2._2) )
// 将聚合数据写入文件 streamReduce.print()
// 执行程序 env.execute("TumblingWindow")

?

?

?

6.3.4 Window Fold

   WindowedStream → DataStream:给窗口赋一个 fold 功能的函数,并返回一
个 fold 后的结果。
// 获取执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment

// 创建 SocketSource
val stream = env.socketTextStream("localhost",22222,‘n‘,3)

// 对 stream 进行处理并按 key 聚合
val streamKeyBy = stream.map(item => (item,1)).keyBy(0)

// 引入滚动窗口
val streamWindow = streamKeyBy.timeWindow(Time.seconds(5))

// 执行 fold 操作
val streamFold = streamWindow.fold(100){
  (begin,item) =>
    begin + item._2
}

// 将聚合数据写入文件
streamFold.print()

// 执行程序
env.execute("TumblingWindow")

?

?

6.3.5 Aggregation on Window

   WindowedStream → DataStream:对一个 window 内的所有元素做聚合操作。
min 和 minBy 的区别是 min 返回的是最小值,而 minBy 返回的是包含最小值字段
的元素(同样的原理适用于 max 和 maxBy)。
// 获取执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 创建 SocketSource val stream = env.socketTextStream("localhost",item.split(" ")(1))).keyBy(0)
// 引入滚动窗口 val streamWindow = streamKeyBy.timeWindow(Time.seconds(5))
// 执行聚合操作 val streamMax = streamWindow.max(1)
// 将聚合数据写入文件 streamMax.print()
// 执行程序 env.execute("TumblingWindow")

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读