scala – 如何在Spark Streaming中将RDD转换为DataFrame,而不仅
发布时间:2020-12-16 18:34:40 所属栏目:安全 来源:网络整理
导读:如何在Spark Streaming中将RDD转换为DataFrame,而不仅仅是Spark? 我看到了这个例子,但它需要SparkContext. val sqlContext = new SQLContext(sc) import sqlContext.implicits._rdd.toDF() 在我的情况下,我有StreamingContext.我应该在foreach中创建SparkCo
如何在Spark Streaming中将RDD转换为DataFrame,而不仅仅是Spark?
我看到了这个例子,但它需要SparkContext. val sqlContext = new SQLContext(sc) import sqlContext.implicits._ rdd.toDF() 在我的情况下,我有StreamingContext.我应该在foreach中创建SparkContext吗?它看起来太疯狂……那么,如何处理这个问题呢?我的最终目标(如果它可能有用)是使用rdd.toDF.write.format(“json”).saveAsTextFile(“s3://iiiii /ttttt.json”);来保存Amazon S3中的DataFrame,这是如果没有将RDD转换为DataFrame,则无法实现RDD(据我所知). myDstream.foreachRDD { rdd => val conf = new SparkConf().setMaster("local").setAppName("My App") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) import sqlContext.implicits._ rdd.toDF() } 解决方法
在foreachRDD之外创建sqlContext,一旦使用sqlContext将rdd转换为DF,就可以写入S3.
例如: val conf = new SparkConf().setMaster("local").setAppName("My App") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) import sqlContext.implicits._ myDstream.foreachRDD { rdd => val df = rdd.toDF() df.write.format("json").saveAsTextFile("s3://iiiii/ttttt.json") } 更新: 甚至你可以在foreachRDD中创建sqlContext,它将在Driver上执行. (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |