scala – 对于DStream中的每个RDD,如何将其转换为数组或其他典型
发布时间:2020-12-16 19:07:59 所属栏目:安全 来源:网络整理
导读:我想将DStream转换为数组,列表等,然后我可以将其转换为json并在端点上提供它.我正在使用apache spark,注入twitter数据.如何在Dstream状态下执行此操作?除了print()之外,我似乎无法获得任何工作. import org.apache.spark._import org.apache.spark.SparkCon
我想将DStream转换为数组,列表等,然后我可以将其转换为json并在端点上提供它.我正在使用apache spark,注入twitter数据.如何在Dstream状态下执行此操作?除了print()之外,我似乎无法获得任何工作.
import org.apache.spark._ import org.apache.spark.SparkContext._ import org.apache.spark.streaming._ import org.apache.spark.streaming.twitter._ import org.apache.spark.streaming.StreamingContext._ import TutorialHelper._ object Tutorial { def main(args: Array[String]) { // Location of the Spark directory val sparkHome = "/opt/spark" // URL of the Spark cluster val sparkUrl = "local[8]" // Location of the required JAR files val jarFile = "target/scala-2.10/tutorial_2.10-0.1-SNAPSHOT.jar" // HDFS directory for checkpointing val checkpointDir = "/tmp" // Configure Twitter credentials using twitter.txt TutorialHelper.configureTwitterCredentials() val ssc = new StreamingContext(sparkUrl,"Tutorial",Seconds(1),sparkHome,Seq(jarFile)) val filters = Array("#americasgottalent","iamawesome") val tweets = TwitterUtils.createStream(ssc,None,filters) val statuses = tweets.map(status => status.getText()) val arry = Array("firstval") statuses.foreachRDD { arr :+ _.collect() } ssc.checkpoint(checkpointDir) ssc.start() ssc.awaitTermination() } } 解决方法
如果你的RDD是你可以做的状态.
val arr = new ArrayBuffer[String](); statuses.foreachRDD { arr ++= _.collect() //you can now put it in an array or d w/e you want with it ... } 请记住,由于DStream可能很大,因此最终可能会比您在驱动程序中拥有的数据更多. (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |