加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – 如何在Spark Streaming中将RDD转换为DataFrame,而不仅

发布时间:2020-12-16 18:34:40 所属栏目:安全 来源:网络整理
导读:如何在Spark Streaming中将RDD转换为DataFrame,而不仅仅是Spark? 我看到了这个例子,但它需要SparkContext. val sqlContext = new SQLContext(sc) import sqlContext.implicits._rdd.toDF() 在我的情况下,我有StreamingContext.我应该在foreach中创建SparkCo
如何在Spark Streaming中将RDD转换为DataFrame,而不仅仅是Spark?

我看到了这个例子,但它需要SparkContext.

val sqlContext = new SQLContext(sc) 
import sqlContext.implicits._
rdd.toDF()

在我的情况下,我有StreamingContext.我应该在foreach中创建SparkContext吗?它看起来太疯狂……那么,如何处理这个问题呢?我的最终目标(如果它可能有用)是使用rdd.toDF.write.format(“json”).saveAsTextFile(“s3://iiiii /ttttt.json”);来保存Amazon S3中的DataFrame,这是如果没有将RDD转换为DataFrame,则无法实现RDD(据我所知).

myDstream.foreachRDD { rdd =>
    val conf = new SparkConf().setMaster("local").setAppName("My App")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc) 
    import sqlContext.implicits._
    rdd.toDF()
}

解决方法

在foreachRDD之外创建sqlContext,一旦使用sqlContext将rdd转换为DF,就可以写入S3.

例如:

val conf = new SparkConf().setMaster("local").setAppName("My App")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc) 
import sqlContext.implicits._
myDstream.foreachRDD { rdd =>

    val df = rdd.toDF()
    df.write.format("json").saveAsTextFile("s3://iiiii/ttttt.json")
}

更新:

甚至你可以在foreachRDD中创建sqlContext,它将在Driver上执行.

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读