加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 编程开发 > Java > 正文

hadoop2-MapReduce详解

发布时间:2020-12-14 06:32:22 所属栏目:Java 来源:网络整理
导读:本文是对Hadoop2.2.0版本的MapReduce进行详细讲解。请大家要注意版本,因为Hadoop的不同版本,源码可能是不同的。 以下是本文的大纲: 若有不正之处,还请多多谅解,并希望批评指正。 请尊重作者劳动成果,转发请标明blog地址 1.获取源码 大家可以下载Hbase

本文是对Hadoop2.2.0版本的MapReduce进行详细讲解。请大家要注意版本,因为Hadoop的不同版本,源码可能是不同的。

以下是本文的大纲:

若有不正之处,还请多多谅解,并希望批评指正。

请尊重作者劳动成果,转发请标明blog地址

1.获取源码

大家可以下载Hbase

Hbase:?

在里面就包含了Hadoop2.2.0版本的jar文件和源码。

2.WordCount案例分析

在做详解之前,我们先来看一个例子,就是在一个文件中有一下的内容

hello hongten 12345

文件中每一行包含一个hello,一个hongten,然后在每一行最后有一个数字,这个数字是递增的。

我们要统计这个文件里面的单词出现的次数(这个可以在网上找到很多相同的例子)

首先,我们要产生这个文件,大家可以使用以下的java代码生成这个文件

main(String[] args) num = 12000000 StringBuilder sb = ( i=1;i sb.append("hello").append(" ").append("hongten").append(" ").append(i).append("n" File writename = File("/root/word.txt" BufferedWriter out = BufferedWriter( System.out.println("done." }

进入Linux系统,编译GenerateWord.java文件

javac GenerateWord.java

编译好了以后,会生成GenerateWord.class文件,然后执行

java GenerateWord

等待一段时间....就会生成这个文件了(大概252MB左右)。

接下来,我们来写统计单词的map,reduce,以及客户端的实现。

项目结构

这里总共有三个java文件

客户端

首先,我们需要定义Configuration和job,然后就是job的set操作,最后到job.waitForCompletion()方法,才触发了动作的提交。

这里可以理解为在客户端,包含了一个配置分布式运行的相关配置信息,最后提交动作。

main(String[] args) Configuration conf = Job job = job.setJarByClass(WordCount. job.setJobName("wordcount" job.setMapperClass(MyMapper. job.setMapOutputKeyClass(Text. job.setMapOutputValueClass(IntWritable. job.setReducerClass(MyReducer. job.setOutputKeyClass(Text. job.setOutputValueClass(IntWritable. FileInputFormat.addInputPath(job, Path("/usr/input/wordcount1" Path output = Path("/usr/output/wordcount" output.getFileSystem(conf).delete(output, job.waitForCompletion( }

自定义的Mapper

MyMapper Mapper IntWritable one = IntWritable(1 Text word = map(Object key,Text value,Context context) StringTokenizer itr = }

自定义的Reduce

MyReducer Reducer IntWritable result = reduce(Text key,Iterable values,InterruptedException { sum = 0 sum += }

运行并查看结果

cd homehadoopbin

<span style="color: #008080;">--<span style="color: #008080;">创建测试文件夹
.<span style="color: #808080;">/hdfs dfs <span style="color: #808080;">-mkdir <span style="color: #808080;">-p <span style="color: #808080;">/usr<span style="color: #808080;">/input<span style="color: #808080;">/<span style="color: #000000;">wordcount1

<span style="color: #008080;">--<span style="color: #008080;">把测试文件放入测试文件夹
.<span style="color: #808080;">/hdfs dfs <span style="color: #808080;">-put <span style="color: #808080;">/root<span style="color: #808080;">/word.txt <span style="color: #808080;">/usr<span style="color: #808080;">/input<span style="color: #808080;">/<span style="color: #000000;">wordcount1

<span style="color: #008080;">--<span style="color: #008080;">运行测试
.<span style="color: #808080;">/hadoop jar <span style="color: #808080;">/root<span style="color: #808080;">/<span style="color: #000000;">wordcount.jar com.b510.hongten.hadoop.WordCount

<span style="color: #008080;">--<span style="color: #008080;">下载hdfs上面的文件
.<span style="color: #808080;">/hdfs dfs <span style="color: #808080;">-get <span style="color: #808080;">/usr<span style="color: #808080;">/output<span style="color: #808080;">/wordcount<span style="color: #008080;">/*<span style="color: #008080;"> ~/

--查看文件最后5行
tail -n5 /root/part-r-00000

运行结果

从yarn客户端可以看到程序运行的时间长度

从11:47:46开始,到11:56:48结束,总共9min2s.(这是在我机器上面的虚拟机里面跑的结果,如果在真正的集群里面跑的话,应该要快很多)

数据条数:12000000-1条

3.客户端源码分析

当我们在客户端进行了分布式作业的配置后,最后执行

job.waitForCompletion();

那么在waiteForCompletion()方法里面都做了些什么事情呢?

waitForCompletion( (state == completionPollIntervalMillis = (!

这个方法里面最重要的就是submit()方法,提交分布式作业。所以,我们需要进入submit()方法。

JobSubmitter submitter == ugi.doAs( PrivilegedExceptionAction JobStatus run() The job submission process involves: 1. Checking the input and output specifications of the job. 2. Computing the InputSplits for the job. 3. Setup the requisite accounting information for the DistributedCache of the job,if necessary. 4. Copying the job's jar and configuration to the map-reduce system directory on the distributed file-system. 5. Submitting the job to the JobTracker and optionally monitoring it's status. </span><span style="color: #008000;"&gt;*/</span> <span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt;在这个方法里面包含5件事情。 </span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt;1.检查输入和输出 </span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt;2.为每个job计算输入切片的数量 </span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt;3.4.提交资源文件 </span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt;5.提交作业,监控状态 </span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt;这里要注意的是,在2.x里面,已经没有JobTracker了。 </span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt;JobTracker is no longer used since M/R 2.x. </span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt;This is a dummy JobTracker class,which is used to be compatible with M/R 1.x applications.</span> <span style="color: #0000ff;"&gt;return</span> submitter.submitJobInternal(Job.<span style="color: #0000ff;"&gt;this</span><span style="color: #000000;"&gt;,cluster); }

});
state =<span style="color: #000000;"> JobState.RUNNING;
LOG.info("The url to track the job: " +<span style="color: #000000;"> getTrackingURL());
}

所以我们需要进入submitter.submitJObInternal()方法去看看里面的实现。

</span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt;validate the jobs output specs </span>

<span style="color: #000000;"> checkSpecs(job);

Path jobStagingArea </span>=<span style="color: #000000;"&gt; JobSubmissionFiles.getStagingDir(cluster,job.getConfiguration());
</span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt;configure the command line options correctly on the submitting dfs</span>
Configuration conf =<span style="color: #000000;"&gt; job.getConfiguration();
InetAddress ip </span>=<span style="color: #000000;"&gt; InetAddress.getLocalHost();
</span><span style="color: #0000ff;"&gt;if</span> (ip != <span style="color: #0000ff;"&gt;null</span><span style="color: #000000;"&gt;) {
  submitHostAddress </span>=<span style="color: #000000;"&gt; ip.getHostAddress();
  submitHostName </span>=<span style="color: #000000;"&gt; ip.getHostName();
  conf.set(MRJobConfig.JOB_SUBMITHOST,submitHostName);
  conf.set(MRJobConfig.JOB_SUBMITHOSTADDR,submitHostAddress);
}
JobID jobId </span>=<span style="color: #000000;"&gt; submitClient.getNewJobID();
</span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt;设置Job的ID</span>

<span style="color: #000000;"> job.setJobID(jobId);
Path submitJobDir = <span style="color: #0000ff;">new<span style="color: #000000;"> Path(jobStagingArea,jobId.toString());
JobStatus status = <span style="color: #0000ff;">null<span style="color: #000000;">;
<span style="color: #0000ff;">try<span style="color: #000000;"> {
conf.set(MRJobConfig.USER_NAME,UserGroupInformation.getCurrentUser().getShortUserName());
conf.set("hadoop.http.filter.initializers"<span style="color: #000000;">,"org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer"<span style="color: #000000;">);
conf.set(MRJobConfig.MAPREDUCE_JOB_DIR,submitJobDir.toString());
LOG.debug("Configuring job " + jobId + " with " +<span style="color: #000000;"> submitJobDir
+ " as the submit dir"<span style="color: #000000;">);
<span style="color: #008000;">//<span style="color: #008000;"> get delegation token for the dir
<span style="color: #000000;"> TokenCache.obtainTokensForNamenodes(job.getCredentials(),<span style="color: #0000ff;">new<span style="color: #000000;"> Path[] { submitJobDir },conf);

  populateTokenCache(conf,job.getCredentials());

  </span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt; generate a secret to authenticate shuffle transfers</span>
  <span style="color: #0000ff;"&gt;if</span> (TokenCache.getShuffleSecretKey(job.getCredentials()) == <span style="color: #0000ff;"&gt;null</span><span style="color: #000000;"&gt;) {
    KeyGenerator keyGen;
    </span><span style="color: #0000ff;"&gt;try</span><span style="color: #000000;"&gt; {
      keyGen </span>=<span style="color: #000000;"&gt; KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
      keyGen.init(SHUFFLE_KEY_LENGTH);
    } </span><span style="color: #0000ff;"&gt;catch</span><span style="color: #000000;"&gt; (NoSuchAlgorithmException e) {
      </span><span style="color: #0000ff;"&gt;throw</span> <span style="color: #0000ff;"&gt;new</span> IOException("Error generating shuffle secret key"<span style="color: #000000;"&gt;,e);
    }
    SecretKey shuffleKey </span>=<span style="color: #000000;"&gt; keyGen.generateKey();
    TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),job.getCredentials());
  }

  copyAndConfigureFiles(job,submitJobDir);
  Path submitJobFile </span>=<span style="color: #000000;"&gt; JobSubmissionFiles.getJobConfPath(submitJobDir);

  </span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt; Create the splits for the job</span>
  LOG.debug("Creating splits at " +<span style="color: #000000;"&gt; jtFs.makeQualified(submitJobDir));
  </span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt;写切片信息,我们主要关系这个方法 :))</span>
  <span style="color: #0000ff;"&gt;int</span> maps =<span style="color: #000000;"&gt; writeSplits(job,submitJobDir);
  conf.setInt(MRJobConfig.NUM_MAPS,maps);
  LOG.info(</span>"number of splits:" +<span style="color: #000000;"&gt; maps);

  </span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt; write "queue admins of the queue to which job is being submitted"
  </span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt; to job file.</span>
  String queue =<span style="color: #000000;"&gt; conf.get(MRJobConfig.QUEUE_NAME,JobConf.DEFAULT_QUEUE_NAME);
  AccessControlList acl </span>=<span style="color: #000000;"&gt; submitClient.getQueueAdmins(queue);
  conf.set(toFullPropertyName(queue,QueueACL.ADMINISTER_JOBS.getAclName()),acl.getAclString());

  </span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt; removing jobtoken referrals before copying the jobconf to HDFS
  </span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt; as the tasks don't need this setting,actually they may break
  </span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt; because of it if present as the referral will point to a
  </span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt; different job.</span>

<span style="color: #000000;"> TokenCache.cleanUpTokenReferral(conf);

  </span><span style="color: #0000ff;"&gt;if</span><span style="color: #000000;"&gt; (conf.getBoolean(
      MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {
    </span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt; Add HDFS tracking ids</span>
    ArrayList<String> trackingIds = <span style="color: #0000ff;"&gt;new</span> ArrayList<String><span style="color: #000000;"&gt;();
    </span><span style="color: #0000ff;"&gt;for</span> (Token<? <span style="color: #0000ff;"&gt;extends</span> TokenIdentifier><span style="color: #000000;"&gt; t :
        job.getCredentials().getAllTokens()) {
      trackingIds.add(t.decodeIdentifier().getTrackingId());
    }
    conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS,trackingIds.toArray(</span><span style="color: #0000ff;"&gt;new</span><span style="color: #000000;"&gt; String[trackingIds.size()]));
  }

  </span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt; Write job file to submit dir</span>

<span style="color: #000000;"> writeConf(conf,submitJobFile);

  </span><span style="color: #008000;"&gt;//</span>
  <span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt; Now,actually submit the job (using the submit name)
  </span><span style="color: #008000;"&gt;//</span>
  <span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt;到这里才真正提交job</span>

<span style="color: #000000;"> printTokens(jobId,job.getCredentials());
status =<span style="color: #000000;"> submitClient.submitJob(
jobId,submitJobDir.toString(),job.getCredentials());
<span style="color: #0000ff;">if (status != <span style="color: #0000ff;">null<span style="color: #000000;">) {
<span style="color: #0000ff;">return<span style="color: #000000;"> status;
} <span style="color: #0000ff;">else<span style="color: #000000;"> {
<span style="color: #0000ff;">throw <span style="color: #0000ff;">new IOException("Could not launch job"<span style="color: #000000;">);
}
} <span style="color: #0000ff;">finally<span style="color: #000000;"> {
<span style="color: #0000ff;">if (status == <span style="color: #0000ff;">null<span style="color: #000000;">) {
LOG.info("Cleaning up the staging area " +<span style="color: #000000;"> submitJobDir);
<span style="color: #0000ff;">if (jtFs != <span style="color: #0000ff;">null && submitJobDir != <span style="color: #0000ff;">null<span style="color: #000000;">)
jtFs.delete(submitJobDir,<span style="color: #0000ff;">true<span style="color: #000000;">);

  }
}

}

在这里我们关心的是

maps = writeSplits(job,submitJobDir);

进入writeSplites()方法

JobConf jConf = maps = maps =

我们使用的版本是2.x,所以,我们使用writeNewSplites()方法。

@SuppressWarnings("unchecked" InputSplit> writeNewSplits(JobContext job,Path jobSubmitDir) Configuration conf = InputFormat input === 1 ==

<span style="color: #008000;">//<span style="color: #008000;">输入格式化进行切片计算
List splits = input.getSplits(job); // == 2 ==<span style="color: #000000;">
T[] array = (T[]) splits.toArray(<span style="color: #0000ff;">new<span style="color: #000000;"> InputSplit[splits.size()]);

<span style="color: #008000;">//<span style="color: #008000;"> sort the splits into order based on size,so that the biggest
<span style="color: #008000;">//<span style="color: #008000;"> go first
Arrays.sort(array,<span style="color: #0000ff;">new<span style="color: #000000;"> SplitComparator());
JobSplitWriter.createSplitFiles(jobSubmitDir,conf,jobSubmitDir.getFileSystem(conf),array);
<span style="color: #0000ff;">return<span style="color: #000000;"> array.length;
}

我们看到‘==? 1 ==’,这里是获取输入格式化,进入job.getInputFormatClass()方法

@SuppressWarnings("unchecked" Class InputFormat> (Class InputFormat>

我们看到,系统默认的输入格式化为TextInputFormat。

我们看到‘==? 2 ==’,这里从输入格式化里面进行切片计算。那么我们进入getSplites()方法

List getSplits(JobContext job) minSize = Math.max(getFormatMinSplitSize(),getMinSplitSize(job)); maxSize = getMaxSplitSize(job); </span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt; generate splits</span> List<InputSplit> splits = <span style="color: #0000ff;"&gt;new</span> ArrayList<InputSplit><span style="color: #000000;"&gt;(); </span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt;获取输入文件列表</span> List<FileStatus> files =<span style="color: #000000;"&gt; listStatus(job); </span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt;遍历文件列表</span> <span style="color: #0000ff;"&gt;for</span><span style="color: #000000;"&gt; (FileStatus file: files) { </span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt;一个文件一个文件的处理 </span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt;然后计算文件的切片</span> Path path =<span style="color: #000000;"&gt; file.getPath(); </span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt;文件大小</span> <span style="color: #0000ff;"&gt;long</span> length =<span style="color: #000000;"&gt; file.getLen(); </span><span style="color: #0000ff;"&gt;if</span> (length != 0<span style="color: #000000;"&gt;) { BlockLocation[] blkLocations; </span><span style="color: #0000ff;"&gt;if</span> (file <span style="color: #0000ff;"&gt;instanceof</span><span style="color: #000000;"&gt; LocatedFileStatus) { blkLocations </span>=<span style="color: #000000;"&gt; ((LocatedFileStatus) file).getBlockLocations(); } </span><span style="color: #0000ff;"&gt;else</span><span style="color: #000000;"&gt; { </span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt;通过路径获取FileSystem</span> FileSystem fs =<span style="color: #000000;"&gt; path.getFileSystem(job.getConfiguration()); </span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt;获取文件所有块信息</span> blkLocations = fs.getFileBlockLocations(file,0<span style="color: #000000;"&gt;,length); } </span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt;判断文件是否可以切片</span> <span style="color: #0000ff;"&gt;if</span><span style="color: #000000;"&gt; (isSplitable(job,path)) { </span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt;可以切片 </span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt;获取文件块大小</span> <span style="color: #0000ff;"&gt;long</span> blockSize =<span style="color: #000000;"&gt; file.getBlockSize(); </span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt;切片大小 splitSize = blockSize </span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt;默认情况下,切片大小等于块的大小</span> <span style="color: #0000ff;"&gt;long</span> splitSize = computeSplitSize(blockSize,minSize,maxSize); <span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt; == C == </span> <span style="color: #0000ff;"&gt;long</span> bytesRemaining =<span style="color: #000000;"&gt; length; </span><span style="color: #0000ff;"&gt;while</span> (((<span style="color: #0000ff;"&gt;double</span>) bytesRemaining)/splitSize ><span style="color: #000000;"&gt; SPLIT_SLOP) { </span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt;块的索引</span> <span style="color: #0000ff;"&gt;int</span> blkIndex = getBlockIndex(blkLocations,length-bytesRemaining); <span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt; == D == </span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt;切片详细信息</span> splits.add(makeSplit(path,length-<span style="color: #000000;"&gt;bytesRemaining,splitSize,blkLocations[blkIndex].getHosts())); bytesRemaining </span>-=<span style="color: #000000;"&gt; splitSize; } </span><span style="color: #0000ff;"&gt;if</span> (bytesRemaining != 0<span style="color: #000000;"&gt;) { </span><span style="color: #0000ff;"&gt;int</span> blkIndex = getBlockIndex(blkLocations,length-<span style="color: #000000;"&gt;bytesRemaining); splits.add(makeSplit(path,length</span>-<span style="color: #000000;"&gt;bytesRemaining,bytesRemaining,blkLocations[blkIndex].getHosts())); } } </span><span style="color: #0000ff;"&gt;else</span> { <span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt; not splitable </span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt;不可切片</span> splits.add(makeSplit(path,length,blkLocations[0<span style="color: #000000;"&gt;].getHosts())); } } </span><span style="color: #0000ff;"&gt;else</span><span style="color: #000000;"&gt; { </span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt;Create empty hosts array for zero length files</span> splits.add(makeSplit(path,<span style="color: #0000ff;"&gt;new</span> String[0<span style="color: #000000;"&gt;])); } } </span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt; Save the number of input files for metrics/loadgen</span>

<span style="color: #000000;"> job.getConfiguration().setLong(NUM_INPUT_FILES,files.size());
LOG.debug("Total # of splits: " +<span style="color: #000000;"> splits.size());
<span style="color: #0000ff;">return<span style="color: #000000;"> splits;
}

我们看‘== A ==’,getFormatMinSplitSize()方法返回1,getMinSplitSize()方法返回1L。

1<span style="color: #0000ff;">public <span style="color: #0000ff;">static <span style="color: #0000ff;">long<span style="color: #000000;"> getMinSplitSize(JobContext job) {
<span style="color: #008000;">//<span style="color: #008000;">如果我们在配置文件中有配置SPLIT_MINSIZE(mapreduce.input.fileinputformat.split.minsize),则取配置文件里面的
<span style="color: #008000;">//<span style="color: #008000;">否则返回默认值1L
<span style="color: #008000;">//<span style="color: #008000;">这里我们,没有配置,所以返回1L
<span style="color: #0000ff;">return job.getConfiguration().getLong(SPLIT_MINSIZE,1L<span style="color: #000000;">);
}

我们看‘== B ==’,getMaxSplitSize()方法返回Long.MAX_VALUE(我们没有进行对SPLIT_MAXSIZE进行配置)

我们看‘== C ==’,在我们没有进行配置的情况下,切片大小等于块大小。

computeSplitSize( blockSize, Math.min(Long.MAX_VALUE,blockSize) -> blockSize Math.max(1,blockSize) -> blockSize

我们看‘== D ==’,通过偏移量获取块的索引信息。

( i = 0 ; i < blkLocations.length; i++ ((blkLocations[i].getOffset() <= offset) &&< blkLocations[i].getOffset() += blkLocations[blkLocations.length -1 fileLength = last.getOffset() + last.getLength() -1 IllegalArgumentException("Offset " + offset + " is outside of file (0.." ++ ")"

4.小结

用通俗的语言来描述上面的事情,可以用下面的图来说明:

系统默认的块大小为128MB,在我们没有进行其他配置的时候,块大小等于切片大小。

Type1:块大小为45MB,小于系统默认大小128MB,

切片信息:path,45,[3,8,10]

切片信息:文件的位置path, 偏移量0, 切片大小45, 块的位置信息[3,10]=该文件(块)存在HDFS文件系统的datanode3,datanode8,datanode10上面。

Type2:块大小为128MB,即等于系统默认大小128MB,不会分成两个快,和Type1一样。

Type3:块大小为414MB,即大于系统默认128MB,那么在我们上传该文件到HDFS的时候,系统就会把该文件分成很多块,每一块128MB,每一块128MB,直到分完为止,最后剩下30MB单独为一块。那么,每一个切片信息由文件位置path,偏移量,切片大小, 块的位置信息构成。我们把这一串信息称为文件的切片清单。

当系统拿到了文件的切片清单了以后,那么就会把这些清单提交给分布式系统,再由分布式系统去处理各个切片。

?

??

5.Mapper详解

5.1.map输入

map从HDFS获取输入流,然后定位到切片的位置,除了第一个切片,其他切片都是从第二行开始读取数据进行处理。

在org.apache.hadoop.mapred.MapTask里面,包含了run()方法

run( JobConf job,.umbilical =</span><span style="color: #0000ff;"&gt;if</span><span style="color: #000000;"&gt; (isMapTask()) { </span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt; If there are no reducers then there won't be any sort. Hence the map </span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt; phase will govern the entire attempt's progress. </span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt;我们在客户端可以设置reduce的个数 </span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt; job.setNumReduceTasks(10); </span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt;如果没有Reduce,只有map阶段,</span> <span style="color: #0000ff;"&gt;if</span> (conf.getNumReduceTasks() == 0<span style="color: #000000;"&gt;) { </span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt;那么就执行这行</span> mapPhase = getProgress().addPhase("map",1.0f<span style="color: #000000;"&gt;); } </span><span style="color: #0000ff;"&gt;else</span><span style="color: #000000;"&gt; { </span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt; If there are reducers then the entire attempt's progress will be </span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt; split between the map phase (67%) and the sort phase (33%). </span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt;只要有Reduce阶段,</span> mapPhase = getProgress().addPhase("map",0.667f<span style="color: #000000;"&gt;); </span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt;就要加入排序</span> sortPhase = getProgress().addPhase("sort",0.333f<span style="color: #000000;"&gt;); } } TaskReporter reporter </span>=<span style="color: #000000;"&gt; startReporter(umbilical); </span><span style="color: #0000ff;"&gt;boolean</span> useNewApi =<span style="color: #000000;"&gt; job.getUseNewMapper(); initialize(job,getJobID(),reporter,useNewApi); </span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt; check if it is a cleanupJobTask</span> <span style="color: #0000ff;"&gt;if</span><span style="color: #000000;"&gt; (jobCleanup) { runJobCleanupTask(umbilical,reporter); </span><span style="color: #0000ff;"&gt;return</span><span style="color: #000000;"&gt;; } </span><span style="color: #0000ff;"&gt;if</span><span style="color: #000000;"&gt; (jobSetup) { runJobSetupTask(umbilical,reporter); </span><span style="color: #0000ff;"&gt;return</span><span style="color: #000000;"&gt;; } </span><span style="color: #0000ff;"&gt;if</span><span style="color: #000000;"&gt; (taskCleanup) { runTaskCleanupTask(umbilical,reporter); </span><span style="color: #0000ff;"&gt;return</span><span style="color: #000000;"&gt;; } </span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt;是否使用新的API</span> <span style="color: #0000ff;"&gt;if</span><span style="color: #000000;"&gt; (useNewApi) { </span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt;我们使用的是new mapper</span>

<span style="color: #000000;"> runNewMapper(job,splitMetaInfo,umbilical,reporter);
} <span style="color: #0000ff;">else<span style="color: #000000;"> {
runOldMapper(job,reporter);
}
done(umbilical,reporter);
}

我们进入到runNewMapper()方法,我们可以看到整个map的宏观动作

1.输入初始化

2.调用org.apache.hadoop.mapreduce.Mapper.run()方法

3.更新状态

4.关闭输入

5.关闭输出

@SuppressWarnings("unchecked" runNewMapper( org.apache.hadoop.mapreduce.TaskAttemptContext taskContext = org.apache.hadoop.mapreduce.Mapper mapper = org.apache.hadoop.mapreduce.InputFormat inputFormat = org.apache.hadoop.mapreduce.InputSplit split = = getSplitDetails("Processing split: " + org.apache.hadoop.mapreduce.RecordReader input = NewTrackingRecordReader = <span style="color: #008000;">//<span style="color: #008000;"> get an output object
<span style="color: #0000ff;">if
(job.getNumReduceTasks() == 0<span style="color: #000000;">) {
output
=
<span style="color: #0000ff;">new
<span style="color: #000000;"> NewDirectOutputCollector(taskContext,job,reporter);
}
<span style="color: #0000ff;">else
<span style="color: #000000;"> {
output
= <span style="color: #0000ff;">new
<span style="color: #000000;"> NewOutputCollector(taskContext,reporter);
}

<span style="color: #008000;">//<span style="color: #008000;">创建一个map上下文对象
<span style="color: #008000;">//<span style="color: #008000;">这里传入input对象
<span style="color: #008000;">//<span style="color: #008000;">这里MapContext,NewTrackingRecordReader,LineRecordReader他们之间的关系是什么呢?
<span style="color: #008000;">//<span style="color: #008000;">在MapContext,NewTrackingRecordReader,LineRecordReader类里面都包含了nextKeyValue(),getCurrentKey(),getCurrentValue()方法
<span style="color: #008000;">//<span style="color: #008000;">当我们调用MapContext里面的nextKeyValue()的时候,会去掉用NewTrackingRecordReader里面的nextKeyValue()方法,这个方法最终会去调用LineRecordReader里面的nextKeyValue()方法
<span style="color: #008000;">//<span style="color: #008000;">即LineRecordReader才是最终做事情的
org.apache.hadoop.mapreduce.MapContext<INKEY,OUTVALUE><span style="color: #000000;">
mapContext =
<span style="color: #0000ff;">new MapContextImpl<INKEY,OUTVALUE><span style="color: #000000;">(job,input,output,committer,split); <span style="color: #008000;">//<span style="color: #008000;"> == EE ==
<span style="color: #000000;">
org.apache.hadoop.mapreduce.Mapper<INKEY,OUTVALUE><span style="color: #000000;">.Context
mapperContext =
<span style="color: #0000ff;">new WrappedMapper<INKEY,OUTVALUE><span style="color: #000000;">().getMapContext(
mapContext);

<span style="color: #0000ff;">try<span style="color: #000000;"> {
<span style="color: #008000;">//<span style="color: #008000;">=============================
<span style="color: #008000;">//<span style="color: #008000;"> 这里列出了mapper的宏观动作
<span style="color: #008000;">//<span style="color: #008000;"> 1. 输入初始化
<span style="color: #008000;">//<span style="color: #008000;"> 2. 调用org.apache.hadoop.mapreduce.Mapper.run()方法
<span style="color: #008000;">//<span style="color: #008000;"> 3. 更新状态
<span style="color: #008000;">//<span style="color: #008000;"> 4. 关闭输入
<span style="color: #008000;">//<span style="color: #008000;"> 5. 关闭输出
<span style="color: #008000;">//<span style="color: #008000;">=============================
<span style="color: #008000;">//<span style="color: #008000;">输入初始化
input.initialize(split,mapperContext); <span style="color: #008000;">//<span style="color: #008000;"> == FF ==
<span style="color: #008000;">//<span style="color: #008000;">然后调用mapper里面的run()方法,即org.apache.hadoop.mapreduce.Mapper里面的run()方法
<span style="color: #000000;"> mapper.run(mapperContext); // == GG ==
<span style="color: #008000;">//<span style="color: #008000;">map结束
<span style="color: #000000;"> mapPhase.complete();
setPhase(TaskStatus.Phase.SORT);
statusUpdate(umbilical);
<span style="color: #008000;">//<span style="color: #008000;">关闭输入
<span style="color: #000000;"> input.close();
input = <span style="color: #0000ff;">null<span style="color: #000000;">;
<span style="color: #008000;">//<span style="color: #008000;">关闭输出
<span style="color: #000000;"> output.close(mapperContext);
output = <span style="color: #0000ff;">null<span style="color: #000000;">;
} <span style="color: #0000ff;">finally<span style="color: #000000;"> {
closeQuietly(input);
closeQuietly(output,mapperContext);
}
}

我们看'== AA ==',由于我们在客户端已经设置了我们自定义的mapper,所以系统会返回我们定义的mapper类

@SuppressWarnings("unchecked" Class Mapper> (Class Mapper>

我们看'== BB ==',在上面我们已经提到,系统默认为TextInputFormat输入格式化

@SuppressWarnings("unchecked" Class InputFormat> (Class InputFormat

我们看'== CC ==',这里返回一个RecordReader对象

.reporter =.inputRecordCounter =.fileInputByteCounter =List matchedStats = <span style="color: #0000ff;">null<span style="color: #000000;">;
<span style="color: #0000ff;">if
(split <span style="color: #0000ff;">instanceof
<span style="color: #000000;"> org.apache.hadoop.mapreduce.lib.input.FileSplit) {
matchedStats
=<span style="color: #000000;"> getFsStatistics(((org.apache.hadoop.mapreduce.lib.input.FileSplit) split)
.getPath(),taskContext.getConfiguration());
}
fsStats
=<span style="color: #000000;"> matchedStats;

<span style="color: #0000ff;">long bytesInPrev =<span style="color: #000000;"> getInputBytes(fsStats);
<span style="color: #008000;">//<span style="color: #008000;">客户端输入格式化计算切片
<span style="color: #008000;">//<span style="color: #008000;">而在map阶段,输入格式化会创建一个
<span style="color: #008000;">//<span style="color: #008000;">org.apache.hadoop.mapreduce.RecordReader<KEYIN,VALUEIN>
<span style="color: #0000ff;">this.real = inputFormat.createRecordReader(split,taskContext); <span style="color: #008000;">//<span style="color: #008000;"> == DD ==
<span style="color: #0000ff;">long bytesInCurr =<span style="color: #000000;"> getInputBytes(fsStats);
fileInputByteCounter.increment(bytesInCurr -<span style="color: #000000;"> bytesInPrev);
}

我们看'== DD ==',这里直接new一个LineRecordReader行读取器。这个在后面还会提到。因为真正做事情的就是这个行读取器。

RecordReader="textinputformat.record.delimiter"[] recordDelimiterBytes = ( !==

我们看'== EE =='创建map上下文

.reader =.split =

看到这里以后,这里MapContext,NewTrackingRecordReader,LineRecordReader他们之间的关系是什么呢?

这要看这三个类里面的一些共同的方法:

nextKeyValue()

getCurrentKey()

getCurrentValue()

当我们调用MapContext里面的nextKeyValue()的时候,会去掉用NewTrackingRecordReader里面的nextKeyValue()方法,这个方法最终会去调用LineRecordReader里面的nextKeyValue()方法。

即LineRecordReader才是最终做事情的

我们看'== FF ==',输入初始化

==.maxLineLength = start = end = start + Path file =</span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt; open the file and seek to the start of the split </span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt;打开HDFS文件</span> <span style="color: #0000ff;"&gt;final</span> FileSystem fs =<span style="color: #000000;"&gt; file.getFileSystem(job); fileIn </span>=<span style="color: #000000;"&gt; fs.open(file); CompressionCodec codec </span>= <span style="color: #0000ff;"&gt;new</span><span style="color: #000000;"&gt; CompressionCodecFactory(job).getCodec(file); </span><span style="color: #0000ff;"&gt;if</span> (<span style="color: #0000ff;"&gt;null</span>!=<span style="color: #000000;"&gt;codec) { isCompressedInput </span>= <span style="color: #0000ff;"&gt;true</span><span style="color: #000000;"&gt;; decompressor </span>=<span style="color: #000000;"&gt; CodecPool.getDecompressor(codec); </span><span style="color: #0000ff;"&gt;if</span> (codec <span style="color: #0000ff;"&gt;instanceof</span><span style="color: #000000;"&gt; SplittableCompressionCodec) { </span><span style="color: #0000ff;"&gt;final</span> SplitCompressionInputStream cIn =<span style="color: #000000;"&gt; ((SplittableCompressionCodec)codec).createInputStream( fileIn,decompressor,start,end,SplittableCompressionCodec.READ_MODE.BYBLOCK); </span><span style="color: #0000ff;"&gt;if</span> (<span style="color: #0000ff;"&gt;null</span> == <span style="color: #0000ff;"&gt;this</span><span style="color: #000000;"&gt;.recordDelimiterBytes){ in </span>= <span style="color: #0000ff;"&gt;new</span><span style="color: #000000;"&gt; LineReader(cIn,job); } </span><span style="color: #0000ff;"&gt;else</span><span style="color: #000000;"&gt; { in </span>= <span style="color: #0000ff;"&gt;new</span> LineReader(cIn,<span style="color: #0000ff;"&gt;this</span><span style="color: #000000;"&gt;.recordDelimiterBytes); } start </span>=<span style="color: #000000;"&gt; cIn.getAdjustedStart(); end </span>=<span style="color: #000000;"&gt; cIn.getAdjustedEnd(); filePosition </span>=<span style="color: #000000;"&gt; cIn; } </span><span style="color: #0000ff;"&gt;else</span><span style="color: #000000;"&gt; { </span><span style="color: #0000ff;"&gt;if</span> (<span style="color: #0000ff;"&gt;null</span> == <span style="color: #0000ff;"&gt;this</span><span style="color: #000000;"&gt;.recordDelimiterBytes) { in </span>= <span style="color: #0000ff;"&gt;new</span><span style="color: #000000;"&gt; LineReader(codec.createInputStream(fileIn,decompressor),job); } </span><span style="color: #0000ff;"&gt;else</span><span style="color: #000000;"&gt; { in </span>= <span style="color: #0000ff;"&gt;new</span><span style="color: #000000;"&gt; LineReader(codec.createInputStream(fileIn,</span><span style="color: #0000ff;"&gt;this</span><span style="color: #000000;"&gt;.recordDelimiterBytes); } filePosition </span>=<span style="color: #000000;"&gt; fileIn; } } </span><span style="color: #0000ff;"&gt;else</span><span style="color: #000000;"&gt; { fileIn.seek(start); </span><span style="color: #0000ff;"&gt;if</span> (<span style="color: #0000ff;"&gt;null</span> == <span style="color: #0000ff;"&gt;this</span><span style="color: #000000;"&gt;.recordDelimiterBytes){ in </span>= <span style="color: #0000ff;"&gt;new</span><span style="color: #000000;"&gt; LineReader(fileIn,job); } </span><span style="color: #0000ff;"&gt;else</span><span style="color: #000000;"&gt; { in </span>= <span style="color: #0000ff;"&gt;new</span> LineReader(fileIn,<span style="color: #0000ff;"&gt;this</span><span style="color: #000000;"&gt;.recordDelimiterBytes); } filePosition </span>=<span style="color: #000000;"&gt; fileIn; } </span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt; If this is not the first split,we always throw away first record </span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt; because we always (except the last split) read one extra line in </span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt; next() method. </span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt;如果不是第一个切片,即从第二个切片开始,通常情况下,不会去读取第一行 </span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt;而是从第二行开始读取</span> <span style="color: #0000ff;"&gt;if</span> (start != 0<span style="color: #000000;"&gt;) { start </span>+= in.readLine(<span style="color: #0000ff;"&gt;new</span> Text(),maxBytesToConsume(start)); } </span><span style="color: #0000ff;"&gt;this</span>.pos =<span style="color: #000000;"&gt; start;

}

怎样理解下面代码呢?

(start != 0+= in.readLine( Text(),maxBytesToConsume(start)); }

我们可以通过下图可以知道

一个文件上传到HDFS后,被分成很多block,然而每个block有一定的size,那么在切分这些文件的时候,就可能产生一个block的最后一行被放在两个block里面

e.g.Block1里面的最后一行,原本应该是'hello hongten 5'

但是由于block的size的大小限制,该文本被分成两部分'hello hong' 和 'ten 5'

现在切片个数大于1,那么Block2在读取内容的时候,从第二行开始读取,即从'hello hongten 6'开始读取。而对于Block1在读取内容的时候,则会读取Block2的第一行,即'ten 5'。

这样就保证了数据的完整性了。

我们看'== GG ==',调用org.apache.hadoop.mapreduce.Mapper.run()方法

run(Context context)

那么LineRecordReader里面的nextKeyValue()做了什么呢?

nextKeyValue() (key == key = (value == value = newSize = 0 (getFilePosition() <= newSize =+= (newSize < </span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt; line too long. try again</span> LOG.info("Skipped line of size " + newSize + " at pos " +<span style="color: #000000;"&gt; (pos </span>-<span style="color: #000000;"&gt; newSize)); } </span><span style="color: #0000ff;"&gt;if</span> (newSize == 0<span style="color: #000000;"&gt;) { key </span>= <span style="color: #0000ff;"&gt;null</span><span style="color: #000000;"&gt;; value </span>= <span style="color: #0000ff;"&gt;null</span><span style="color: #000000;"&gt;; </span><span style="color: #0000ff;"&gt;return</span> <span style="color: #0000ff;"&gt;false</span><span style="color: #000000;"&gt;; } </span><span style="color: #0000ff;"&gt;else</span><span style="color: #000000;"&gt; { </span><span style="color: #0000ff;"&gt;return</span> <span style="color: #0000ff;"&gt;true</span><span style="color: #000000;"&gt;; }

}

@Override
<span style="color: #0000ff;">public<span style="color: #000000;"> LongWritable getCurrentKey() {
<span style="color: #008000;">//<span style="color: #008000;">因为在nextKeyValue()已经赋值,直接返回
<span style="color: #0000ff;">return<span style="color: #000000;"> key;
}

@Override
<span style="color: #0000ff;">public<span style="color: #000000;"> Text getCurrentValue() {
<span style="color: #008000;">//<span style="color: #008000;">因为在nextKeyValue()已经赋值,直接返回
<span style="color: #0000ff;">return<span style="color: #000000;"> value;
}

5.2.map输出?

<div class="cnblogs_code">

@SuppressWarnings("unchecked" 
   runNewMapper(
    org.apache.hadoop.mapreduce.RecordWriter output = </span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt; get an output object
</span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt;如果没有Reduce</span>
<span style="color: #0000ff;"&gt;if</span> (job.getNumReduceTasks() == 0<span style="color: #000000;"&gt;) {
  output </span>= 
    <span style="color: #0000ff;"&gt;new</span><span style="color: #000000;"&gt; NewDirectOutputCollector(taskContext,reporter);
} </span><span style="color: #0000ff;"&gt;else</span><span style="color: #000000;"&gt; {
    </span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt;在我们客户端定义了一个reduce</span>
  output = <span style="color: #0000ff;"&gt;new</span><span style="color: #000000;"&gt; NewOutputCollector(taskContext,reporter);
}
</span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt;.....其他代码省略</span>

}

在NewOutputCollector里面做了什么呢?

@SuppressWarnings("unchecked" collector = createSortingCollector(job,reporter); partitions = (partitions > 1 partitioner = (org.apache.hadoop.mapreduce.Partitioner } partitioner = org.apache.hadoop.mapreduce.Partitioner getPartition(K key,V value, partitions - 1

我们看看'== OO1 ==',调用createSortingCollector()创建一个collector容器

@SuppressWarnings("unchecked" MapOutputCollector= (MapOutputCollector,MapOutputCollector."Map output collector class = " += MapOutputCollector.Context(

调用init()方法,在该方法里面主要做了以下几件事情:

1.设置内存缓冲区

2.设置排序器

3.设置比较器

4.设置合并器

5.设置溢写线程

========</span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt;sanity checks </span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt;map处理数据的时候,需要放入内存缓冲区 </span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt;那么这里的100就是系统默认的缓冲区大小,即100MB。 </span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt;我们可以通过配置IO_SORT_MB(mapreduce.task.io.sort.mb)对缓冲区大小进行调节。 </span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt;0.8的是内存缓冲区阈值的意思,就是当这个缓冲区使用了80%,那么这个时候, </span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt;缓冲区里面的80%的数据就可以溢写到磁盘。 </span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt; 我们可以通过配置MAP_SORT_SPILL_PERCENT(mapreduce.map.sort.spill.percent)对缓冲区阈值进行调节。</span> <span style="color: #0000ff;"&gt;final</span> <span style="color: #0000ff;"&gt;float</span> spillper =<span style="color: #000000;"&gt; job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT,(</span><span style="color: #0000ff;"&gt;float</span>)0.8<span style="color: #000000;"&gt;); </span><span style="color: #0000ff;"&gt;final</span> <span style="color: #0000ff;"&gt;int</span> sortmb = job.getInt(JobContext.IO_SORT_MB,100<span style="color: #000000;"&gt;); indexCacheMemoryLimit </span>=<span style="color: #000000;"&gt; job.getInt(JobContext.INDEX_CACHE_MEMORY_LIMIT,INDEX_CACHE_MEMORY_LIMIT_DEFAULT); </span><span style="color: #0000ff;"&gt;if</span> (spillper > (<span style="color: #0000ff;"&gt;float</span>)1.0 || spillper <= (<span style="color: #0000ff;"&gt;float</span>)0.0<span style="color: #000000;"&gt;) { </span><span style="color: #0000ff;"&gt;throw</span> <span style="color: #0000ff;"&gt;new</span> IOException("Invalid "" + JobContext.MAP_SORT_SPILL_PERCENT + "": " +<span style="color: #000000;"&gt; spillper); } </span><span style="color: #0000ff;"&gt;if</span> ((sortmb &amp; 0x7FF) !=<span style="color: #000000;"&gt; sortmb) { </span><span style="color: #0000ff;"&gt;throw</span> <span style="color: #0000ff;"&gt;new</span><span style="color: #000000;"&gt; IOException( </span>"Invalid "" + JobContext.IO_SORT_MB + "": " +<span style="color: #000000;"&gt; sortmb); } </span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt;排序器,默认为快速排序算法(QuickSort) </span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt;把map里面的乱序的数据,使用快速排序算法进行排序 </span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt;使得内存中乱序的数据进行排序,然后把排序好的数据,溢写到磁盘</span> sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class"<span style="color: #000000;"&gt;,QuickSort.</span><span style="color: #0000ff;"&gt;class</span>,IndexedSorter.<span style="color: #0000ff;"&gt;class</span><span style="color: #000000;"&gt;),job); </span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt; buffers and accounting</span> <span style="color: #0000ff;"&gt;int</span> maxMemUsage = sortmb << 20<span style="color: #000000;"&gt;; maxMemUsage </span>-= maxMemUsage %<span style="color: #000000;"&gt; METASIZE; kvbuffer </span>= <span style="color: #0000ff;"&gt;new</span> <span style="color: #0000ff;"&gt;byte</span><span style="color: #000000;"&gt;[maxMemUsage]; bufvoid </span>=<span style="color: #000000;"&gt; kvbuffer.length; kvmeta </span>=<span style="color: #000000;"&gt; ByteBuffer.wrap(kvbuffer) .order(ByteOrder.nativeOrder()) .asIntBuffer(); setEquator(</span>0<span style="color: #000000;"&gt;); bufstart </span>= bufend = bufindex =<span style="color: #000000;"&gt; equator; kvstart </span>= kvend =<span style="color: #000000;"&gt; kvindex; maxRec </span>= kvmeta.capacity() /<span style="color: #000000;"&gt; NMETA; softLimit </span>= (<span style="color: #0000ff;"&gt;int</span>)(kvbuffer.length *<span style="color: #000000;"&gt; spillper); bufferRemaining </span>=<span style="color: #000000;"&gt; softLimit; </span><span style="color: #0000ff;"&gt;if</span><span style="color: #000000;"&gt; (LOG.isInfoEnabled()) { LOG.info(JobContext.IO_SORT_MB </span>+ ": " +<span style="color: #000000;"&gt; sortmb); LOG.info(</span>"soft limit at " +<span style="color: #000000;"&gt; softLimit); LOG.info(</span>"bufstart = " + bufstart + "; bufvoid = " +<span style="color: #000000;"&gt; bufvoid); LOG.info(</span>"kvstart = " + kvstart + "; length = " +<span style="color: #000000;"&gt; maxRec); } </span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt; k/v serialization </span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt;比较器</span> comparator = job.getOutputKeyComparator(); <span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt; == OO3 ==</span> keyClass = (Class<K><span style="color: #000000;"&gt;)job.getMapOutputKeyClass(); valClass </span>= (Class<V><span style="color: #000000;"&gt;)job.getMapOutputValueClass(); serializationFactory </span>= <span style="color: #0000ff;"&gt;new</span><span style="color: #000000;"&gt; SerializationFactory(job); keySerializer </span>=<span style="color: #000000;"&gt; serializationFactory.getSerializer(keyClass); keySerializer.open(bb); valSerializer </span>=<span style="color: #000000;"&gt; serializationFactory.getSerializer(valClass); valSerializer.open(bb); </span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt; output counters</span> mapOutputByteCounter =<span style="color: #000000;"&gt; reporter.getCounter(TaskCounter.MAP_OUTPUT_BYTES); mapOutputRecordCounter </span>=<span style="color: #000000;"&gt; reporter.getCounter(TaskCounter.MAP_OUTPUT_RECORDS); fileOutputByteCounter </span>=<span style="color: #000000;"&gt; reporter .getCounter(TaskCounter.MAP_OUTPUT_MATERIALIZED_BYTES); </span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt; compression</span> <span style="color: #0000ff;"&gt;if</span><span style="color: #000000;"&gt; (job.getCompressMapOutput()) { Class</span><? <span style="color: #0000ff;"&gt;extends</span> CompressionCodec> codecClass =<span style="color: #000000;"&gt; job.getMapOutputCompressorClass(DefaultCodec.</span><span style="color: #0000ff;"&gt;class</span><span style="color: #000000;"&gt;); codec </span>=<span style="color: #000000;"&gt; ReflectionUtils.newInstance(codecClass,job); } </span><span style="color: #0000ff;"&gt;else</span><span style="color: #000000;"&gt; { codec </span>= <span style="color: #0000ff;"&gt;null</span><span style="color: #000000;"&gt;; } </span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt; combiner </span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt;合并器</span> <span style="color: #0000ff;"&gt;final</span> Counters.Counter combineInputCounter =<span style="color: #000000;"&gt; reporter.getCounter(TaskCounter.COMBINE_INPUT_RECORDS); combinerRunner </span>=<span style="color: #000000;"&gt; CombinerRunner.create(job,combineInputCounter,</span><span style="color: #0000ff;"&gt;null</span><span style="color: #000000;"&gt;); </span><span style="color: #0000ff;"&gt;if</span> (combinerRunner != <span style="color: #0000ff;"&gt;null</span><span style="color: #000000;"&gt;) { </span><span style="color: #0000ff;"&gt;final</span> Counters.Counter combineOutputCounter =<span style="color: #000000;"&gt; reporter.getCounter(TaskCounter.COMBINE_OUTPUT_RECORDS); combineCollector</span>= <span style="color: #0000ff;"&gt;new</span> CombineOutputCollector<K,V><span style="color: #000000;"&gt;(combineOutputCounter,job); } </span><span style="color: #0000ff;"&gt;else</span><span style="color: #000000;"&gt; { combineCollector </span>= <span style="color: #0000ff;"&gt;null</span><span style="color: #000000;"&gt;; } spillInProgress </span>= <span style="color: #0000ff;"&gt;false</span><span style="color: #000000;"&gt;; </span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt;最小溢写值,默认为3 </span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt;即在默认情况下,我们在定义了合并器, </span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt; 1. 在内存溢写到磁盘的过程中,在溢写之前,数据会在内存中进行合并。 </span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt; 2. 在溢写的文件的过程中,文件数量>3,那么此时就会触发合并器进行合并文件。</span> minSpillsForCombine = job.getInt(JobContext.MAP_COMBINE_MIN_SPILLS,3<span style="color: #000000;"&gt;); </span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt;溢写线程</span> spillThread.setDaemon(<span style="color: #0000ff;"&gt;true</span><span style="color: #000000;"&gt;); spillThread.setName(</span>"SpillThread"<span style="color: #000000;"&gt;); spillLock.lock(); </span><span style="color: #0000ff;"&gt;try</span><span style="color: #000000;"&gt; { spillThread.start(); </span><span style="color: #0000ff;"&gt;while</span> (!<span style="color: #000000;"&gt;spillThreadRunning) { spillDone.await(); } } </span><span style="color: #0000ff;"&gt;catch</span><span style="color: #000000;"&gt; (InterruptedException e) { </span><span style="color: #0000ff;"&gt;throw</span> <span style="color: #0000ff;"&gt;new</span> IOException("Spill thread failed to initialize"<span style="color: #000000;"&gt;,e); } </span><span style="color: #0000ff;"&gt;finally</span><span style="color: #000000;"&gt; { spillLock.unlock(); } </span><span style="color: #0000ff;"&gt;if</span> (sortSpillException != <span style="color: #0000ff;"&gt;null</span><span style="color: #000000;"&gt;) { </span><span style="color: #0000ff;"&gt;throw</span> <span style="color: #0000ff;"&gt;new</span> IOException("Spill thread failed to initialize"<span style="color: #000000;"&gt;,sortSpillException); }

}

这里涉及到环形缓冲区:

我们看看'== OO3 ==',获取比较器

Class RawComparator> theClass =,RawComparator. (theClass != ReflectionUtils.newInstance(theClass, WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.

我们看看'== OO2 ==',获取分区器

@SuppressWarnings("unchecked" Class Partitioner> (Class Partitioner><span style="color: #008000;">//<span style="color: #008000;">在HashPartitioner中包含getPartition()方法
<span style="color: #0000ff;">public
<span style="color: #0000ff;">int
<span style="color: #000000;"> getPartition(K key,
<span style="color: #0000ff;">int
<span style="color: #000000;"> numReduceTasks) {
<span style="color: #008000;">//
<span style="color: #008000;">分区

<span style="color: #0000ff;">return
(key.hashCode() & Integer.MAX_VALUE) %<span style="color: #000000;"> numReduceTasks;
}

当我们客户端的map调用write(key,value)方法时,系统会在内部调用collector.collect()方法,获取key,value,partitions,即k,v,p

write(K key,V value)

最后会调用close()方法,关闭输出

IOException("can't find class "

5.3.map小结

?在map输入阶段:每个map处理一个切片的数据量,需要seek(),让出第一行,从第二行开始读取数据(切片数量大于1)。

在map输出阶段:map输出的是Key,value;但是map计算完成以后,会得到key,partition.也就是说,每个数据从map输出只有,就知道归属于哪一个reduce task去处理了,归属于那个分区

之后,在内存中有一个内存缓冲区buffer in memory,这个内存缓冲区是环形缓冲区。内存大小默认是100MB,为了是内存溢写不阻塞,默认的阈值是80%,即只要大于等于80MB的时候,就会触发溢写,溢写会把内存中的数据写入到磁盘。在写入磁盘之前要对数据进行快速排序,这是整个框架当中仅有的一次,把数据从乱序到有序。后面的排序都是把有序的数据进行归并排序了。

在排序的时候,有一个判定。有可能我们定义了combiner,需要压缩一下数据。

现在大数据,最大的瓶颈就是I/O,磁盘I/O,网络I/O,都是慢I/O。

所以在I/O之前,能在内存里面排序就排序,能压缩就尽量压缩。那么在调用I/O的时候,写的数据越少越好,速度就越快。

在溢写的时候(partion,sort and spill to disk),先按分区排序,在分区内再按key排序。这是因为map计算的结果是key,partition.这样的文件才能是内部有序。最后,溢写很多的小文件要归并成一个大文件。那么大文件也是按分区排序,文件里面再按key排序。

  • 如果我们做了combiner,在归并成大文件的时候,框架默认的小文件数量是3个
  • 只要我们设置的值大于等于3(mapreduce.map.combine.minspills)

就会触发combiner压缩数据,这是为了减少在shuffer阶段拉取网络I/O,以及在拉完数据以后,让Reduce处理数据量变少,加快计算速度。所以map的工作的核心目的,就是让reduce跑的越来越快。

6.Reduce详解

Reduce需要从Map那边获取Map的输出,作为Reduce的输入。?

"unchecked" run(JobConf job,</span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt;================= Shuffer阶段从Map端拉取数据 开始 ============</span> <span style="color: #0000ff;"&gt;if</span><span style="color: #000000;"&gt; (isMapOrReduce()) { copyPhase </span>= getProgress().addPhase("copy"<span style="color: #000000;"&gt;); sortPhase </span>= getProgress().addPhase("sort"<span style="color: #000000;"&gt;); reducePhase </span>= getProgress().addPhase("reduce"<span style="color: #000000;"&gt;); } </span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt; start thread that will handle communication with parent</span> TaskReporter reporter =<span style="color: #000000;"&gt; startReporter(umbilical); </span><span style="color: #0000ff;"&gt;boolean</span> useNewApi =<span style="color: #000000;"&gt; job.getUseNewReducer(); initialize(job,reporter); </span><span style="color: #0000ff;"&gt;return</span><span style="color: #000000;"&gt;; } </span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt; Initialize the codec</span> codec =<span style="color: #000000;"&gt; initCodec(); RawKeyValueIterator rIter </span>= <span style="color: #0000ff;"&gt;null</span><span style="color: #000000;"&gt;; ShuffleConsumerPlugin shuffleConsumerPlugin </span>= <span style="color: #0000ff;"&gt;null</span><span style="color: #000000;"&gt;; </span><span style="color: #0000ff;"&gt;boolean</span> isLocal = <span style="color: #0000ff;"&gt;false</span><span style="color: #000000;"&gt;; </span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt; local if </span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt; 1) framework == local or </span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt; 2) framework == null and job tracker address == local</span> String framework =<span style="color: #000000;"&gt; job.get(MRConfig.FRAMEWORK_NAME); String masterAddr </span>= job.get(MRConfig.MASTER_ADDRESS,"local"<span style="color: #000000;"&gt;); </span><span style="color: #0000ff;"&gt;if</span> ((framework == <span style="color: #0000ff;"&gt;null</span> &amp;&amp; masterAddr.equals("local"<span style="color: #000000;"&gt;)) </span>|| (framework != <span style="color: #0000ff;"&gt;null</span> &amp;&amp;<span style="color: #000000;"&gt; framework.equals(MRConfig.LOCAL_FRAMEWORK_NAME))) { isLocal </span>= <span style="color: #0000ff;"&gt;true</span><span style="color: #000000;"&gt;; } </span><span style="color: #0000ff;"&gt;if</span> (!<span style="color: #000000;"&gt;isLocal) { Class combinerClass </span>=<span style="color: #000000;"&gt; conf.getCombinerClass(); CombineOutputCollector combineCollector </span>=<span style="color: #000000;"&gt; (</span><span style="color: #0000ff;"&gt;null</span> != combinerClass) ? <span style="color: #0000ff;"&gt;new</span> CombineOutputCollector(reduceCombineOutputCounter,conf) : <span style="color: #0000ff;"&gt;null</span><span style="color: #000000;"&gt;; Class</span><? <span style="color: #0000ff;"&gt;extends</span> ShuffleConsumerPlugin> clazz =<span style="color: #000000;"&gt; job.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN,Shuffle.</span><span style="color: #0000ff;"&gt;class</span>,ShuffleConsumerPlugin.<span style="color: #0000ff;"&gt;class</span><span style="color: #000000;"&gt;); shuffleConsumerPlugin </span>=<span style="color: #000000;"&gt; ReflectionUtils.newInstance(clazz,job); LOG.info(</span>"Using ShuffleConsumerPlugin: " +<span style="color: #000000;"&gt; shuffleConsumerPlugin); ShuffleConsumerPlugin.Context shuffleContext </span>= <span style="color: #0000ff;"&gt;new</span><span style="color: #000000;"&gt; ShuffleConsumerPlugin.Context(getTaskID(),FileSystem.getLocal(job),</span><span style="color: #0000ff;"&gt;super</span><span style="color: #000000;"&gt;.lDirAlloc,codec,combinerClass,combineCollector,spilledRecordsCounter,reduceCombineInputCounter,shuffledMapsCounter,reduceShuffleBytes,failedShuffleCounter,mergedMapOutputsCounter,taskStatus,copyPhase,sortPhase,</span><span style="color: #0000ff;"&gt;this</span><span style="color: #000000;"&gt;,mapOutputFile); shuffleConsumerPlugin.init(shuffleContext); </span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt;rIter这个迭代器里面的数据就是从Map端拉取的数据集 </span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt;即接下来Reduce的数据输入源</span> rIter =<span style="color: #000000;"&gt; shuffleConsumerPlugin.run(); } </span><span style="color: #0000ff;"&gt;else</span><span style="color: #000000;"&gt; { </span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt; local job runner doesn't have a copy phase</span>

<span style="color: #000000;"> copyPhase.complete();
<span style="color: #0000ff;">final FileSystem rfs =<span style="color: #000000;"> FileSystem.getLocal(job).getRaw();
rIter =<span style="color: #000000;"> Merger.merge(job,rfs,job.getMapOutputKeyClass(),job.getMapOutputValueClass(),getMapFiles(rfs,<span style="color: #0000ff;">true<span style="color: #000000;">),!<span style="color: #000000;">conf.getKeepFailedTaskFiles(),job.getInt(JobContext.IO_SORT_FACTOR,100<span style="color: #000000;">),<span style="color: #0000ff;">new<span style="color: #000000;"> Path(getTaskID().toString()),job.getOutputKeyComparator(),<span style="color: #0000ff;">null<span style="color: #000000;">);
}
<span style="color: #008000;">//<span style="color: #008000;"> free up the data structures
<span style="color: #000000;"> mapOutputFilesOnDisk.clear();

</span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt;=================  Shuffer阶段从Map端拉取数据 结束 ============</span>

<span style="color: #000000;">
sortPhase.complete(); <span style="color: #008000;">//<span style="color: #008000;"> sort is complete
<span style="color: #000000;"> setPhase(TaskStatus.Phase.REDUCE);
statusUpdate(umbilical);
Class keyClass =<span style="color: #000000;"> job.getMapOutputKeyClass();
Class valueClass =<span style="color: #000000;"> job.getMapOutputValueClass();
<span style="color: #008000;">//<span style="color: #008000;">分组比较器
RawComparator comparator = job.getOutputValueGroupingComparator(); <span style="color: #008000;">//<span style="color: #008000;"> === RR0 ==

<span style="color: #0000ff;"&gt;if</span><span style="color: #000000;"&gt; (useNewApi) {
    </span><span style="color: #008000;"&gt;//</span><span style="color: #008000;"&gt;使用新API</span>

<span style="color: #000000;"> runNewReducer(job,rIter,comparator,keyClass,valueClass); <span style="color: #008000;">//<span style="color: #008000;"> === RR1 ==
} <span style="color: #0000ff;">else<span style="color: #000000;"> {
runOldReducer(job,valueClass);
}

</span><span style="color: #0000ff;"&gt;if</span> (shuffleConsumerPlugin != <span style="color: #0000ff;"&gt;null</span><span style="color: #000000;"&gt;) {
  shuffleConsumerPlugin.close();
}
done(umbilical,reporter);

}

我们看'=== RR0 ==',分组比较器

我们通过代码可以看出里面的逻辑:

1.如果用户设置了分组比较器,系统则使用

2.如果用户没有设置分组比较器,系统会查看用户是否设置了排序比较器,如果有设置,则使用

3.如果用户没有设置分组比较器,排序比较器,那么系统会使用自身的key比较器

Class RawComparator> theClass = (theClass == </span><span style="color: #0000ff;"&gt;return</span> ReflectionUtils.newInstance(theClass,<span style="color: #0000ff;"&gt;this</span><span style="color: #000000;"&gt;);

}

<span style="color: #0000ff;">public<span style="color: #000000;"> RawComparator getOutputKeyComparator() {
<span style="color: #008000;">//<span style="color: #008000;">用户是否设置排序比较器KEY_COMPARATOR
<span style="color: #008000;">//<span style="color: #008000;">如果用户都没有设置,则使用自身key比较器
Class<? <span style="color: #0000ff;">extends RawComparator> theClass =<span style="color: #000000;"> getClass(
JobContext.KEY_COMPARATOR,<span style="color: #0000ff;">this<span style="color: #000000;">);
<span style="color: #0000ff;">return WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.<span style="color: #0000ff;">class<span style="color: #000000;">));
}

<span style="color: #008000;">//<span style="color: #008000;">自身key比较器
<span style="color: #0000ff;">public Class<?><span style="color: #000000;"> getMapOutputKeyClass() {
Class<?> retv = getClass(JobContext.MAP_OUTPUT_KEY_CLASS,<span style="color: #0000ff;">null,Object.<span style="color: #0000ff;">class<span style="color: #000000;">);
<span style="color: #0000ff;">if (retv == <span style="color: #0000ff;">null<span style="color: #000000;">) {
retv =<span style="color: #000000;"> getOutputKeyClass();
}
<span style="color: #0000ff;">return<span style="color: #000000;"> retv;
}

我们看看‘=== RR1 ==’,

RawKeyValueIterator rawIter == close() DataInputBuffer getKey() DataInputBuffer getValue() next() ret = org.apache.hadoop.mapreduce.TaskAttemptContext taskContext = org.apache.hadoop.mapreduce.Reducer reducer = trackedRW = NewTrackingRecordWriter("mapred.skip.on" =

进入createReduceContext()方法

keyClass,Class= ReduceContextImplorg.apache.hadoop.mapreduce.Reducer<INKEY,OUTVALUE><span style="color: #000000;">.Context
reducerContext
=
<span style="color: #0000ff;">new
WrappedReducer<INKEY,OUTVALUE><span style="color: #000000;">().getReducerContext(
reduceContext);

<span style="color: #0000ff;">return<span style="color: #000000;"> reducerContext;
}

进入ReduceContextImpl()方法

.input =.inputKeyCounter =.inputValueCounter =.comparator =.serializationFactory = .keyDeserializer =.valueDeserializer ==.keyClass =.valueClass =.conf =.taskid =

最后会把map端的输出,作为Reduce端的输入传递到这里。

run(Context context) Iterator iter =(iter

7.总结

========================================================

More reading,and english is important.

I'm Hongten

大哥哥大姐姐,觉得有用打赏点哦!你的支持是我最大的动力。谢谢。Hongten博客排名在100名以内。粉丝过千。Hongten出品,必是精品。

E | hongtenzone@foxmail.com ?B |?

========================================================

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读