加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – Spark:写入Avro文件

发布时间:2020-12-16 09:20:59 所属栏目:安全 来源:网络整理
导读:我在Spark,我有一个从Avro文件的RDD.我现在想对该RDD进行一些转换,并将其另存为Avro文件: val job = new Job(new Configuration())AvroJob.setOutputKeySchema(job,getOutputSchema(inputSchema))rdd.map(elem = (new SparkAvroKey(doTransformation(elem._
我在Spark,我有一个从Avro文件的RDD.我现在想对该RDD进行一些转换,并将其另存为Avro文件:

val job = new Job(new Configuration())
AvroJob.setOutputKeySchema(job,getOutputSchema(inputSchema))

rdd.map(elem => (new SparkAvroKey(doTransformation(elem._1)),elem._2))
   .saveAsNewAPIHadoopFile(outputPath,classOf[AvroKey[GenericRecord]],classOf[org.apache.hadoop.io.NullWritable],classOf[AvroKeyOutputFormat[GenericRecord]],job.getConfiguration)

运行这个Spark时,会抱怨Schema $recordSchema是不可序列化的.

如果我取消注释.map调用(并且只有rdd.saveAsNewAPIHadoopFile),则调用成功.

我在这里做错了什么?

任何想法?

解决方法

这里的问题与Job中使用的avro.Schema类的不可串行化有关.当您尝试从map函数中的代码引用架构对象时抛出异常.

例如,如果您尝试执行以下操作,您将获得“任务不可序列化”异常:

val schema = new Schema.Parser().parse(new File(jsonSchema))
...
rdd.map(t => {
  // reference to the schema object declared outside
  val record = new GenericData.Record(schema)
})

您可以通过在功能块中创建一个新的模式实例来使所有工作都可以正常工作:

val schema = new Schema.Parser().parse(new File(jsonSchema))
// The schema above should not be used in closures,it's for other purposes
...
rdd.map(t => {
  // create a new Schema object
  val innserSchema = new Schema.Parser().parse(new File(jsonSchema))
  val record = new GenericData.Record(innserSchema)
  ...
})

由于您不希望为处理的每个记录解析avro架构,所以更好的解决方案是在分区级别解析模式.以下也有效:

val schema = new Schema.Parser().parse(new File(jsonSchema))
// The schema above should not be used in closures,it's for other purposes
...
rdd.mapPartitions(tuples => {
  // create a new Schema object
  val innserSchema = new Schema.Parser().parse(new File(jsonSchema))

  tuples.map(t => {
    val record = new GenericData.Record(innserSchema)
    ...
    // this closure will be bundled together with the outer one 
    // (no serialization issues)
  })
})

上述代码只要您提供对jsonSchema文件的便携式引用,因为映射函数将由多个远程执行程序执行.它可以是对HDFS中的文件的引用,也可以与JAR中的应用程序一起打包(在后一种情况下您将使用类加载器函数获取其内容).

对于那些试图使用Avro与Spark的人,请注意,仍然有一些未解决的编译问题,您必须在Maven POM上使用以下导入:

<dependency>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro-mapred</artifactId>
  <version>1.7.7</version>
  <classifier>hadoop2</classifier>
<dependency>

注意“hadoop2”分类器.您可以在https://issues.apache.org/jira/browse/SPARK-3039跟踪问题.

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读