scala – 如何在foreachPartition中使用SQLContext和SparkContex
发布时间:2020-12-16 09:25:07 所属栏目:安全 来源:网络整理
导读:我想在foreachPartition中使用SparkContext和SQLContext,但由于序列化错误而无法执行此操作.我知道这两个对象都不是可序列化的,但我认为foreachPartition是在master上执行的,其中Spark Context和SQLContext都可用. 符号: `msg - Map[String,String]``result
我想在foreachPartition中使用SparkContext和SQLContext,但由于序列化错误而无法执行此操作.我知道这两个对象都不是可序列化的,但我认为foreachPartition是在master上执行的,其中Spark Context和SQLContext都可用.
符号: `msg -> Map[String,String]` `result -> Iterable[Seq[Row]]` 这是我当前的代码(UtilsDM是一个扩展Serializable的对象).失败的代码部分从val schema = …开始,我想将结果写入DataFrame,然后将其保存到Parquet.也许我组织代码的方式效率低下,那么我想在此提出您的建议.谢谢. // Here I am creating df from parquet file on S3 val exists = FileSystem.get(new URI("s3n://" + bucketNameCode),sc.hadoopConfiguration).exists(new Path("s3n://" + bucketNameCode + "/" + pathToSentMessages)) var df: DataFrame = null if (exists) { df = sqlContext .read.parquet("s3n://bucket/pathToParquetFile") } UtilsDM.setDF(df) // Here I process myDStream myDStream.foreachRDD(rdd => { rdd.foreachPartition{iter => val r = new RedisClient(UtilsDM.getHost,UtilsDM.getPort) val producer = UtilsDM.createProducer var df = UtilsDM.getDF val result = iter.map{ msg => // ... Seq(msg("key"),msg("value")) } // HERE I WANT TO WRITE result TO S3,BUT IT FAILS val schema = StructType( StructField("key",StringType,true) :: StructField("value",true) result.foreach { row => val rdd = sc.makeRDD(row) val df2 = sqlContext.createDataFrame(rdd,schema) // If the parquet file is not created,then create it var df_final: DataFrame = null if (df != null) { df_final = df.unionAll(df2) } else { df_final = df2 } df_final.write.parquet("s3n://bucket/pathToSentMessages) } } }) 编辑: 我使用的是Spark 1.6.2和Scala 2.10.6. 解决方法
这不可能. SparkContext,SQLContext和SparkSession只能在驱动程序上使用.您可以在foreachRDD的顶级使用sqlContext:
myDStream.foreachRDD(rdd => { val df = sqlContext.createDataFrame(rdd,schema) ... }) 您无法在转换/操作中使用它: myDStream.foreachRDD(rdd => { rdd.foreach { val df = sqlContext.createDataFrame(...) ... } }) 你可能想要相当于: myDStream.foreachRDD(rdd => { val foo = rdd.mapPartitions(iter => doSomethingWithRedisClient(iter)) val df = sqlContext.createDataFrame(foo,schema) df.write.parquet("s3n://bucket/pathToSentMessages) }) (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |