加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > Windows > 正文

Flink window Function - ProcessAllWindowFunction

发布时间:2020-12-14 04:29:35 所属栏目:Windows 来源:网络整理
导读:package window.non_keyedimport org.apache.flink.api.common.functions.FlatMapFunctionimport org.apache.flink.streaming.api.functions.source.SourceFunctionimport org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentimport org.apach
package window.non_keyed

import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.function.ProcessAllWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import org.apache.flink.api.scala._

import scala.collection.mutable

/**
 * Word count over a non-keyed stream using a `ProcessAllWindowFunction`.
 *
 * A custom source emits one fixed sentence per second; the sentence is split
 * into words, every word is paired with a count of 1, and all pairs that fall
 * into the same time window are aggregated into a single word -> count map,
 * which is then flattened back into `(word, count)` records and printed.
 *
 * @author: create by maoxiangyi
 * @version: v1.0
 * @description: window
 * @date:2019 /6/4
 */
object ProcessAllWindowWordCount {

  /**
   * Builds the streaming job and submits it under the name "word count".
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    // Set up the execution environment.
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment()

    // Set up the data source.
    env.addSource(new SourceFunction[String] {
      // Cleared by cancel() so that the emit loop in run() can terminate;
      // volatile because cancel() may be invoked from a different thread than run().
      @volatile private var running = true

      override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
        while (running) {
          ctx.collect("hello hadoop hello storm hello spark")
          Thread.sleep(1000)
        }
      }

      override def cancel(): Unit = {
        running = false
      }
    })
      // Computation logic.
      .flatMap(_.split(" "))
      .map((_, 1))
      // Window size and slide are both 10 seconds.
      .timeWindowAll(Time.seconds(10), Time.seconds(10))
      .process(new ProcessAllWindowFunction[(String, Int), mutable.Map[String, Int], TimeWindow] {
        override def process(
            context: Context,
            elements: Iterable[(String, Int)],
            out: Collector[mutable.Map[String, Int]]): Unit = {
          // Sum the counts of every word seen in this window.
          val wordCountMap = mutable.Map[String, Int]()
          for ((word, count) <- elements) {
            wordCountMap.update(word, wordCountMap.getOrElse(word, 0) + count)
          }
          out.collect(wordCountMap)
        }
      })
      // Flatten each per-window map back into individual (word, count) records.
      .flatMap(new FlatMapFunction[mutable.Map[String, Int], (String, Int)] {
        override def flatMap(value: mutable.Map[String, Int], out: Collector[(String, Int)]): Unit = {
          value.foreach(kv => out.collect(kv))
        }
      })
      .print()

    // Submit the job.
    env.execute("word count")
  }
}

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时联系站长删除相关内容!

    推荐文章
      热点阅读