scala – Spark Streaming累计字数
发布时间:2020-12-16 08:55:22 所属栏目:安全 来源:网络整理
导读:这是一个用 scala编写的火花流程序.它计算每1秒钟来自套接字的字数.结果将是单词count,例如,从0到1的单词计数,以及从1到2的单词计数.但是我想知道是否有某种方法可以改变这个程序以便我们可以累积字数?也就是说,字数从时间0到现在为止. val sparkConf = new
这是一个用
scala编写的火花流程序.它计算每1秒钟来自套接字的字数.结果将是单词count,例如,从0到1的单词计数,以及从1到2的单词计数.但是我想知道是否有某种方法可以改变这个程序以便我们可以累积字数?也就是说,字数从时间0到现在为止.
val sparkConf = new SparkConf().setAppName("NetworkWordCount") val ssc = new StreamingContext(sparkConf,Seconds(1)) // Create a socket stream on target ip:port and count the // words in input stream of n delimited text (eg. generated by 'nc') // Note that no duplication in storage level only for running locally. // Replication necessary in distributed scenario for fault tolerance. val lines = ssc.socketTextStream(args(0),args(1).toInt,StorageLevel.MEMORY_AND_DISK_SER) val words = lines.flatMap(_.split(" ")) val wordCounts = words.map(x => (x,1)).reduceByKey(_ + _) wordCounts.print() ssc.start() ssc.awaitTermination() 解决方法
您可以使用
StateDStream .有一个
example of stateful word count from sparks examples.
object StatefulNetworkWordCount { def main(args: Array[String]) { if (args.length < 2) { System.err.println("Usage: StatefulNetworkWordCount <hostname> <port>") System.exit(1) } StreamingExamples.setStreamingLogLevels() val updateFunc = (values: Seq[Int],state: Option[Int]) => { val currentCount = values.foldLeft(0)(_ + _) val previousCount = state.getOrElse(0) Some(currentCount + previousCount) } val sparkConf = new SparkConf().setAppName("StatefulNetworkWordCount") // Create the context with a 1 second batch size val ssc = new StreamingContext(sparkConf,Seconds(1)) ssc.checkpoint(".") // Create a NetworkInputDStream on target ip:port and count the // words in input stream of n delimited test (eg. generated by 'nc') val lines = ssc.socketTextStream(args(0),args(1).toInt) val words = lines.flatMap(_.split(" ")) val wordDstream = words.map(x => (x,1)) // Update the cumulative count using updateStateByKey // This will give a Dstream made of state (which is the cumulative count of the words) val stateDstream = wordDstream.updateStateByKey[Int](updateFunc) stateDstream.print() ssc.start() ssc.awaitTermination() } } 它的工作方式是为每个批次获得一个Seq [T],然后更新一个类似累加器的Option [T].它是一个选项的原因是因为在第一批它将是None并保持这种方式,除非它被更新.在这个例子中,count是一个int,如果你正在处理很多数据,你可能想要一个Long或BigInt (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |