加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – 如何在foreachPartition中使用SQLContext和SparkContex

发布时间:2020-12-16 09:25:07 所属栏目:安全 来源:网络整理
导读:我想在foreachPartition中使用SparkContext和SQLContext,但由于序列化错误而无法执行此操作.我知道这两个对象都不是可序列化的,但我认为foreachPartition是在master上执行的,其中Spark Context和SQLContext都可用. 符号: `msg - Map[String,String]``result
我想在foreachPartition中使用SparkContext和SQLContext,但由于序列化错误而无法执行此操作.我知道这两个对象都不是可序列化的,但我认为foreachPartition是在master上执行的,其中Spark Context和SQLContext都可用.

符号:

`msg -> Map[String,String]`
`result -> Iterable[Seq[Row]]`

这是我当前的代码(UtilsDM是一个扩展Serializable的对象).失败的代码部分从val schema = …开始,我想将结果写入DataFrame,然后将其保存到Parquet.也许我组织代码的方式效率低下,那么我想在此提出您的建议.谢谢.

// Here I am creating df from parquet file on S3
val exists = FileSystem.get(new URI("s3n://" + bucketNameCode),sc.hadoopConfiguration).exists(new Path("s3n://" + bucketNameCode + "/" + pathToSentMessages))
var df: DataFrame = null
if (exists) {
  df = sqlContext
    .read.parquet("s3n://bucket/pathToParquetFile")
}
UtilsDM.setDF(df)

// Here I process myDStream
myDStream.foreachRDD(rdd => {
  rdd.foreachPartition{iter =>
    val r = new RedisClient(UtilsDM.getHost,UtilsDM.getPort)
    val producer = UtilsDM.createProducer
    var df = UtilsDM.getDF
    val result = iter.map{ msg =>
        // ... 
        Seq(msg("key"),msg("value"))
    }

    // HERE I WANT TO WRITE result TO S3,BUT IT FAILS
    val schema = StructType(
                    StructField("key",StringType,true) ::
                    StructField("value",true)

    result.foreach { row =>
       val rdd = sc.makeRDD(row)
       val df2 = sqlContext.createDataFrame(rdd,schema)

       // If the parquet file is not created,then create it
       var df_final: DataFrame = null
       if (df != null) {
          df_final = df.unionAll(df2)
       } else {
          df_final = df2
       }
       df_final.write.parquet("s3n://bucket/pathToSentMessages)
}
  }
})

编辑:

我使用的是Spark 1.6.2和Scala 2.10.6.

解决方法

这不可能. SparkContext,SQLContext和SparkSession只能在驱动程序上使用.您可以在foreachRDD的顶级使用sqlContext:

myDStream.foreachRDD(rdd => {
     val df = sqlContext.createDataFrame(rdd,schema)
     ... 
 })

您无法在转换/操作中使用它:

myDStream.foreachRDD(rdd => {
     rdd.foreach { 
        val df = sqlContext.createDataFrame(...)
        ... 
     }
 })

你可能想要相当于:

myDStream.foreachRDD(rdd => {
   val foo = rdd.mapPartitions(iter => doSomethingWithRedisClient(iter))
   val df = sqlContext.createDataFrame(foo,schema)
   df.write.parquet("s3n://bucket/pathToSentMessages)
})

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读