Flink window Function - ProcessAllWindowFunction
发布时间:2020-12-14 04:29:35 所属栏目:Windows 来源:网络整理
导读:package window.non_keyedimport org.apache.flink.api.common.functions.FlatMapFunctionimport org.apache.flink.streaming.api.functions.source.SourceFunctionimport org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentimport org.apach
package window.non_keyedimport org.apache.flink.api.common.functions.FlatMapFunctionimport org.apache.flink.streaming.api.functions.source.SourceFunctionimport org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentimport org.apache.flink.streaming.api.scala.function.ProcessAllWindowFunctionimport org.apache.flink.streaming.api.windowing.time.Timeimport org.apache.flink.streaming.api.windowing.windows.TimeWindowimport org.apache.flink.util.Collectorimport org.apache.flink.api.scala._import scala.collection.mutable/** * @author: create by maoxiangyi * @version: v1.0 * @description: window * @date:2019 /6/4 */object ProcessAllWindowWordCount { def main(args: Array[String]): Unit = { //设置环境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment() //设置数据源 env.addSource(new SourceFunction[String] { override def run(ctx: SourceFunction.SourceContext[String]): Unit = { while (true) { ctx.collect("hello hadoop hello storm hello spark") Thread.sleep(1000) } } override def cancel(): Unit = {} }) //计算逻辑 .flatMap(_.split(" ")) .map((_,1)) .timeWindowAll(Time.seconds(10),Time.seconds(10)) .process(new ProcessAllWindowFunction[(String,Int),mutable.Map[String,Int],TimeWindow] { override def process(context: Context,elements: Iterable[(String,Int)],out: Collector[mutable.Map[String,Int]]): Unit = { val wordCountMap = mutable.Map[String,Int]() elements.foreach(kv => { wordCountMap.put(kv._1,wordCountMap.get(kv._1).getOrElse(0) + kv._2) }) out.collect(wordCountMap) } }).flatMap(new FlatMapFunction[mutable.Map[String,(String,Int)] { override def flatMap(value: mutable.Map[String,out: Collector[(String,Int)]): Unit = { value.foreach(out.collect(_)) } }) .print() //提交任务 env.execute("word count") }} (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |
相关内容
- windows – 如何在批处理脚本中使用通配符和需要特定文件名
- iis – Windows Web Server是否应该是Active Directory域的
- Windows – 硬盘坏扇区/故障分析工具
- windows-7 – 我应该在开发PC上安装哪个版本的Windows 7?
- 如何在Windows Azure中安装和使用COM对象?
- windows – 命令行 – 仅在使用for命令循环时如何提取文件名
- windows – 如何在不同的子网中设置VPN
- windows-server-2003 – 用户无法访问Windows Server 2003中
- windows – 无法将2012服务器添加到2003域,因为它表示forre
- Node的安装,Windows下常用的命令行操作
推荐文章
站长推荐
- windows-services – 在Windows服务中调用时ssh失
- windows-server-2008 – 使用perfmon监控服务
- Windows下安装Redis
- windows – 如何阻止单个GPO的继承/应用?
- MSBuild由于错误APPX0002失败:任务’GenerateAp
- 从Windows C#Canon SDK与PTP或MTP对齐图片
- 实体框架 – 使用Microsoft Access的实体框架
- 如何在Windows Phone 8上使用NFC标签启动应用程序
- xaml – x中ElementName的替代:与DataTemplates
- 如何检查已修补已知的Windows漏洞?
热点阅读