加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – 创建像TextLine一样的Scalding Source,它将多个文件组

发布时间:2020-12-16 09:53:50 所属栏目:安全 来源:网络整理
导读:我们有许多需要组合的小文件.在Scalding中,您可以使用TextLine将文件作为文本行读取.问题是我们每个文件得到1个映射器,但我们想要组合多个文件,以便它们由1个映射器处理. 我知道我们需要将输入格式更改为CombineFileInputFormat的实现,这可能涉及使用Cascadi
我们有许多需要组合的小文件.在Scalding中,您可以使用TextLine将文件作为文本行读取.问题是我们每个文件得到1个映射器,但我们想要组合多个文件,以便它们由1个映射器处理.

我知道我们需要将输入格式更改为CombineFileInputFormat的实现,这可能涉及使用Cascadings CombinedHfs.我们无法弄清楚如何做到这一点,但它应该只是少数几行代码来定义我们自己的Scalding源,比如CombineTextLine.

非常感谢能够提供代码的任何人.

作为一个附带问题,我们有一些在s3中的数据,如果给出的解决方案适用于s3文件会很好 – 我想这取决于CombineFileInputFormat或CombinedHfs是否适用于s3.

解决方法

你在问题中得到了想法,所以这里有可能是你的解决方案.

创建自己的输入格式,扩展CombineFileInputFormat并使用您自己的自定义RecordReader.我正在向您展示Java代码,但如果您愿意,可以轻松地将其转换为Scala.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.CombineFileInputFormat;
import org.apache.hadoop.mapred.lib.CombineFileRecordReader;
import org.apache.hadoop.mapred.lib.CombineFileSplit;

public class CombinedInputFormat<K,V> extends CombineFileInputFormat<K,V> {

    public static class MyKeyValueLineRecordReader implements RecordReader<LongWritable,Text> {
        private final RecordReader<LongWritable,Text> delegate;

        public MyKeyValueLineRecordReader(CombineFileSplit split,Configuration conf,Reporter reporter,Integer idx) throws IOException {
            FileSplit fileSplit = new FileSplit(split.getPath(idx),split.getOffset(idx),split.getLength(idx),split.getLocations());
            delegate = new LineRecordReader(conf,fileSplit);
        }

        @Override
        public boolean next(LongWritable key,Text value) throws IOException {
            return delegate.next(key,value);
        }

        @Override
        public LongWritable createKey() {
            return delegate.createKey();
        }

        @Override
        public Text createValue() {
            return delegate.createValue();
        }

        @Override
        public long getPos() throws IOException {
            return delegate.getPos();
        }

        @Override
        public void close() throws IOException {
            delegate.close();
        }

        @Override
        public float getProgress() throws IOException {
            return delegate.getProgress();
        }
    }

    @Override
    public RecordReader getRecordReader(InputSplit split,JobConf job,Reporter reporter) throws IOException {
        return new CombineFileRecordReader(job,(CombineFileSplit) split,reporter,(Class) MyKeyValueLineRecordReader.class);
    }

}

然后,您需要扩展TextLine类并使其使用您刚刚定义的自己的输入格式(从现在开始的Scala代码).

import cascading.scheme.hadoop.TextLine
import cascading.flow.FlowProcess
import org.apache.hadoop.mapred.{OutputCollector,RecordReader,JobConf}
import cascading.tap.Tap
import com.twitter.scalding.{FixedPathSource,TextLineScheme}
import cascading.scheme.Scheme

class CombineFileTextLine extends TextLine{

  override def sourceConfInit(flowProcess: FlowProcess[JobConf],tap: Tap[JobConf,RecordReader[_,_],OutputCollector[_,_]],conf: JobConf) {
    super.sourceConfInit(flowProcess,tap,conf)
    conf.setInputFormat(classOf[CombinedInputFormat[String,String]])
  }
}

为您的组合输入创建一个方案.

trait CombineFileTextLineScheme extends TextLineScheme{

  override def hdfsScheme = new CombineFileTextLine().asInstanceOf[Scheme[JobConf,_,_]]
}

最后,创建您的源类:

case class CombineFileMultipleTextLine(p : String*) extends  FixedPathSource(p :_*) with CombineFileTextLineScheme

如果要使用单个路径而不是多个路径,则对源类的更改是微不足道的.

我希望有所帮助.

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读