加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – 如何使用Avro文件上的架构加载Spark中的Avros?

发布时间:2020-12-16 19:07:15 所属栏目:安全 来源:网络整理
导读:我正在从Cloudera包裹运行带有Spark 0.9.0的CDH 4.4. 我有一堆通过猪的AvroStorage UDF创建的Avro文件.我想在Spark中加载这些文件,使用通用记录或Avro文件上的架构.到目前为止我已经尝试过: import org.apache.avro.mapred.AvroKeyimport org.apache.avro.m
我正在从Cloudera包裹运行带有Spark 0.9.0的CDH 4.4.

我有一堆通过猪的AvroStorage UDF创建的Avro文件.我想在Spark中加载这些文件,使用通用记录或Avro文件上的架构.到目前为止我已经尝试过:

import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.hadoop.io.NullWritable
import org.apache.commons.lang.StringEscapeUtils.escapeCsv

import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.conf.Configuration
import java.net.URI
import java.io.BufferedInputStream
import java.io.File
import org.apache.avro.generic.{GenericDatumReader,GenericRecord}
import org.apache.avro.specific.SpecificDatumReader
import org.apache.avro.file.DataFileStream
import org.apache.avro.io.DatumReader
import org.apache.avro.file.DataFileReader
import org.apache.avro.mapred.FsInput

val input = "hdfs://hivecluster2/securityx/web_proxy_mef/2014/05/29/22/part-m-00016.avro"
val inURI = new URI(input)
val inPath = new Path(inURI)

val fsInput = new FsInput(inPath,sc.hadoopConfiguration)
val reader =  new GenericDatumReader[GenericRecord]
val dataFileReader = DataFileReader.openReader(fsInput,reader)
val schemaString = dataFileReader.getSchema

val buf = scala.collection.mutable.ListBuffer.empty[GenericRecord]
while(dataFileReader.hasNext)  {
  buf += dataFileReader.next
}
sc.parallelize(buf)

这适用于一个文件,但它不能扩展 – 我正在将所有数据加载到本地RAM中,然后将其分布在火花节点上.

解决方法

回答我自己的问题:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapred.AvroInputFormat
import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.hadoop.io.NullWritable
import org.apache.commons.lang.StringEscapeUtils.escapeCsv

import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
import org.apache.hadoop.conf.Configuration
import java.io.BufferedInputStream
import org.apache.avro.file.DataFileStream
import org.apache.avro.io.DatumReader
import org.apache.avro.file.DataFileReader
import org.apache.avro.file.DataFileReader
import org.apache.avro.generic.{GenericDatumReader,GenericRecord}
import org.apache.avro.mapred.FsInput
import org.apache.avro.Schema
import org.apache.avro.Schema.Parser
import org.apache.hadoop.mapred.JobConf
import java.io.File
import java.net.URI

// spark-shell -usejavacp -classpath "*.jar"

val input = "hdfs://hivecluster2/securityx/web_proxy_mef/2014/05/29/22/part-m-00016.avro"

val jobConf= new JobConf(sc.hadoopConfiguration)
val rdd = sc.hadoopFile(
  input,classOf[org.apache.avro.mapred.AvroInputFormat[GenericRecord]],classOf[org.apache.avro.mapred.AvroWrapper[GenericRecord]],classOf[org.apache.hadoop.io.NullWritable],10)
val f1 = rdd.first
val a = f1._1.datum
a.get("rawLog") // Access avro fields

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读