scala – 创建像TextLine一样的Scalding Source,它将多个文件组
发布时间:2020-12-16 09:53:50 所属栏目:安全 来源:网络整理
导读:我们有许多需要组合的小文件.在Scalding中,您可以使用TextLine将文件作为文本行读取.问题是我们每个文件得到1个映射器,但我们想要组合多个文件,以便它们由1个映射器处理. 我知道我们需要将输入格式更改为CombineFileInputFormat的实现,这可能涉及使用Cascadi
我们有许多需要组合的小文件.在Scalding中,您可以使用TextLine将文件作为文本行读取.问题是我们每个文件得到1个映射器,但我们想要组合多个文件,以便它们由1个映射器处理.
我知道我们需要将输入格式更改为CombineFileInputFormat的实现,这可能涉及使用Cascadings CombinedHfs.我们无法弄清楚如何做到这一点,但它应该只是少数几行代码来定义我们自己的Scalding源,比如CombineTextLine. 非常感谢能够提供代码的任何人. 作为一个附带问题,我们有一些在s3中的数据,如果给出的解决方案适用于s3文件会很好 – 我想这取决于CombineFileInputFormat或CombinedHfs是否适用于s3. 解决方法
你在问题中得到了想法,所以这里有可能是你的解决方案.
创建自己的输入格式,扩展CombineFileInputFormat并使用您自己的自定义RecordReader.我正在向您展示Java代码,但如果您愿意,可以轻松地将其转换为Scala. import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.LineRecordReader; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.lib.CombineFileInputFormat; import org.apache.hadoop.mapred.lib.CombineFileRecordReader; import org.apache.hadoop.mapred.lib.CombineFileSplit; public class CombinedInputFormat<K,V> extends CombineFileInputFormat<K,V> { public static class MyKeyValueLineRecordReader implements RecordReader<LongWritable,Text> { private final RecordReader<LongWritable,Text> delegate; public MyKeyValueLineRecordReader(CombineFileSplit split,Configuration conf,Reporter reporter,Integer idx) throws IOException { FileSplit fileSplit = new FileSplit(split.getPath(idx),split.getOffset(idx),split.getLength(idx),split.getLocations()); delegate = new LineRecordReader(conf,fileSplit); } @Override public boolean next(LongWritable key,Text value) throws IOException { return delegate.next(key,value); } @Override public LongWritable createKey() { return delegate.createKey(); } @Override public Text createValue() { return delegate.createValue(); } @Override public long getPos() throws IOException { return delegate.getPos(); } @Override public void close() throws IOException { delegate.close(); } @Override public float getProgress() throws IOException { return delegate.getProgress(); } } @Override public RecordReader getRecordReader(InputSplit split,JobConf job,Reporter reporter) throws IOException { return new CombineFileRecordReader(job,(CombineFileSplit) split,reporter,(Class) MyKeyValueLineRecordReader.class); } } 然后,您需要扩展TextLine类并使其使用您刚刚定义的自己的输入格式(从现在开始的Scala代码). import cascading.scheme.hadoop.TextLine import cascading.flow.FlowProcess import org.apache.hadoop.mapred.{OutputCollector,RecordReader,JobConf} import cascading.tap.Tap import com.twitter.scalding.{FixedPathSource,TextLineScheme} import cascading.scheme.Scheme class CombineFileTextLine extends TextLine{ override def sourceConfInit(flowProcess: FlowProcess[JobConf],tap: Tap[JobConf,RecordReader[_,_],OutputCollector[_,_]],conf: JobConf) { super.sourceConfInit(flowProcess,tap,conf) conf.setInputFormat(classOf[CombinedInputFormat[String,String]]) } } 为您的组合输入创建一个方案. trait CombineFileTextLineScheme extends TextLineScheme{ override def hdfsScheme = new CombineFileTextLine().asInstanceOf[Scheme[JobConf,_,_]] } 最后,创建您的源类: case class CombineFileMultipleTextLine(p : String*) extends FixedPathSource(p :_*) with CombineFileTextLineScheme 如果要使用单个路径而不是多个路径,则对源类的更改是微不足道的. 我希望有所帮助. (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |
相关内容
- scala – 将IndexToString应用于Spark中的特征向量
- angular – 如何在汇总中导入stompjs
- bootstrap、jQuery、Ajax数据分页
- AngularJS中的单向,双向,3向,……或更多数据绑定?
- twitter-bootstrap – Twitter Bootstrap:如何使顶部固定导
- Scala的Vector如何工作?
- angularjs – 使用ngResource时的空视图
- 比WebService更简单的远程调用方案框架:Hessian
- bash – “<<(command-here)”shell成语导致“重定向意外
- 浅谈安装ORACLE时在Linux上设置内核参数的含义