void reduce(ChukwaRecordKey key, Iterator<ChukwaRecord> values,
    org.apache.hadoop.chukwa.extraction.demux.ChukwaRecordOutputFormat
    Extends MultipleSequenceFileOutputFormat and formats the output.
 * The output path is clusterName/dataType/dataType + Util.generateTimeOutput(record.getTime())
protected String generateFileNameForKeyValue(ChukwaRecordKey key,
            ChukwaRecord record, String name)
    org.apache.hadoop.chukwa.extraction.demux.processor.Util
 * Builds the time-based part of the file name from a timestamp
 * Extracts workingDay, workingHour and minutes from the timestamp
 * If the working day differs from the current day, the output file is
 *    [dataType]_[yyyyMMdd].D.evt
 * Otherwise, if the working hour differs from the current hour, the output file is
 *    [dataType]_[yyyyMMdd]_[HH].H.evt
 * Otherwise the output file is
 *    when the ones digit of the minutes is less than 5:
 *       [dataType]_[yyyyMMdd]_[HH]_[minutes/10].0.R.evt
 *    when the ones digit of the minutes is 5 or greater:
 *       [dataType]_[yyyyMMdd]_[HH]_[minutes/10].5.R.evt
static String generateTimeOutput(long timestamp)
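The naming rules listed for generateTimeOutput can be sketched as follows. This is a minimal illustration written from the rules above, not the Chukwa source: the class name TimeOutputSketch and the extra parameters now and zone are assumptions added so that the result does not depend on the wall clock, and the method returns only the suffix that is appended to dataType.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper, not part of Chukwa.
class TimeOutputSketch {
    private static final DateTimeFormatter DAY = DateTimeFormatter.ofPattern("yyyyMMdd");
    private static final DateTimeFormatter HOUR = DateTimeFormatter.ofPattern("HH");

    // timestamp: record time in epoch millis; now: "current" time in epoch millis (assumed parameter).
    static String generateTimeOutput(long timestamp, long now, ZoneId zone) {
        ZonedDateTime record = Instant.ofEpochMilli(timestamp).atZone(zone);
        ZonedDateTime current = Instant.ofEpochMilli(now).atZone(zone);
        String day = record.format(DAY);
        String hour = record.format(HOUR);

        // Different day: one daily file.
        if (!record.toLocalDate().equals(current.toLocalDate())) {
            return "_" + day + ".D.evt";
        }
        // Same day, different hour: one hourly file.
        if (record.getHour() != current.getHour()) {
            return "_" + day + "_" + hour + ".H.evt";
        }
        // Same hour: five-minute bucket chosen by the ones digit of the minutes.
        int minutes = record.getMinute();
        String half = (minutes % 10 < 5) ? "0" : "5";
        return "_" + day + "_" + hour + "_" + (minutes / 10) + "." + half + ".R.evt";
    }
}
```

Passing the current time explicitly makes the three branches easy to exercise with fixed timestamps.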
(2) Data archiving
    org.apache.hadoop.chukwa.extraction.archive.ChukwaArchiveManager
 *    chukwaRootDir                 /chukwa/
 *    archiveRootDir                /chukwa/dataSinkArchives/
 *    archivesRootProcessingDir     /chukwa/archivesProcessing/
 *    archivesMRInputDir            /chukwa/archivesProcessing/mrInput/
 *    archivesMROutputDir           /chukwa/archivesProcessing/mrOutput/
 *    finalArchiveOutput            /chukwa/finalArchives/
 * 2. Start the processing loop
 *      errorCount tracks the number of errors; the loop exits once it exceeds four
 *      Check whether archivesMRInputDir exists (that is, whether input for an archive job is already present); if it does, call runArchive to archive the current day's data
 *      List the files under archiveRootDir
 *      If there are none, sleep for one minute
 *      If there is exactly one day directory, check whether it is the current day and whether two hours have passed since the last run; if fewer than two hours have passed, sleep for half an hour
 *      If there are several day directories, call processDay on the files of each day directory
 * Processes the data of a given day
 *     now              current time
 *     currentDay       current date
 *     workingDay       day directory to process
 *     oneHourAgo       the time one hour ago
 * If the day directory contains no files and its date is before the current day, delete the directory and return
 * Iterate over the day directory, using fileCount to count the files
 *     If the file's modification time is earlier than oneHourAgo
 *         count the file in fileCount
 *         move the file to archivesMRInputDir
 *         if the count has reached the maximum number of files (500), call runArchive and reset fileCount to 0
void processDay(FileStatus fsDay, String archivesMRInputDir,
            String archivesMROutputDir, String finalArchiveOutput)
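 * 1. Build the job arguments and load the "archive.grouper" setting (the archive grouping type) from the configuration file
 * 2. Check whether the mrOutput directory exists; delete it if it does
 * 3. Run ChukwaArchiveBuilder through ToolRunner
 * 4. Create a directory in finalArchiveOutput named after the date found in mrInput
 * 5. Move the mrOutput directory into the directory created in the previous step, as "archive_" + current time
 * 6. Delete the mrInput directory
void runArchive(String archivesMRInputDir, String finalArchiveOutput)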
    org.apache.hadoop.chukwa.extraction.archive.ChukwaArchiveBuilder
    org.apache.hadoop.chukwa.extraction.archive.ChukwaArchiveBuilder$UniqueKeyReduce
 * Counts how many values each key has
 */
void reduce(ChukwaArchiveKey key, Iterator<ChunkImpl> vals,
                OutputCollector<ChukwaArchiveKey, ChunkImpl> out, Reporter r)

 * The main method starts ChukwaArchiveBuilder through ToolRunner
 * The run method uses IdentityMapper and the inner class UniqueKeyReduce for the MapReduce job
 * partitionerClass, outputFormat and jobName are set according to the arguments
void main(String[] args)
(3) Data transfer
    org.apache.hadoop.chukwa.extraction.demux.PostProcessorManager
 * Create the daemon "PostProcessorManager",
 *    chukwaRootDir                     /chukwa/
 *    postProcessDir                    /chukwa/postProcess/
 *    chukwaRootReposDir                /chukwa/repos/
 *    chukwaPostProcessInErrorDir       /chukwa/postProcessInError/
 *      errorCount tracks the number of errors; the loop exits once it exceeds four
 *      Iterate over demuxOutputDir,
 *      If the directory contains no files, sleep for 10 seconds
 *      If it contains files, add them to directories (ArrayList<String>) and sort the list
 *      Iterate over directories
 *          Process each entry with processDemuxPigOutput, then call
 *          movetoMainRepository to move the files and delete the entry; if an
 *          exception occurs, move the entry to the error directory
 * Load the setting chukwa.post.demux.data.loader from
 *  $CHUKWA_HOME/conf/chukwa-demux-conf.xml, which yields two implementation classes:
 *  org.apache.hadoop.chukwa.dataloader.MetricDataLoaderPool,
 *  org.apache.hadoop.chukwa.dataloader.FSMDataLoader
 *  Iterate over the configured implementation classes
 *      create an instance of each and build the lookup pattern
 *      (/chukwa/postProcess/cluster/dataType/*.evt)
 *      hand the matching files to the data loader
boolean processDemuxPigOutput(String directory)
 * Moving files to the repository is consolidated in org.apache.hadoop.chukwa.extraction.demux.MoveToRepository
boolean movetoMainRepository(String sourceDirectory,
            String repoRootDirectory)
    org.apache.hadoop.chukwa.extraction.demux.MoveToRepository
 * Iterate over the directories to transfer
 *    get the cluster name, skipping "_logs" and "_temporary"
 *    process the files under each cluster with processClusterDirectory
 *    delete each cluster directory once it has been transferred
 * Iterate over the cluster directory
 *    get the dataType name
 *    process the files under each dataType with processDatasourceDirectory
void processClusterDirectory(Path srcDir, String destDir)
 * Iterate over the evt files of a dataType
 *    If the file name ends with ".D.evt"
 *        get the date
 *        call writeRecordFile to write the file, where
 *        the output directory is /chukwa/repos/<cluster>/<dataType>/<yyyyMMdd>/
 *        the output file is <dataType>_<yyyyMMdd>
 *    If the file name ends with ".H.evt"
 *        get the date and hour
 *        the output directory is /chukwa/repos/<cluster>/<dataType>/<yyyyMMdd>/<HH>/
 *        the output file is <dataType>_<yyyyMMdd>_<HH>
 *        call addDirectory4Rolling to mark the date and hour
 *    If the file name ends with ".R.evt"
 *        get the date, hour and minute
 *        the output directory is /chukwa/repos/<cluster>/<dataType>/<yyyyMMdd>/<HH>/<mm>/
 *        the output file is <dataType>_<yyyyMMdd>_<HH>_<mm>
void processDatasourceDirectory(String cluster, Path srcDir,
 * Moves the file to its destination and renames it
void writeRecordFile(String destDir, Path recordFile, String fileName)
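The suffix-based routing described for processDatasourceDirectory can be sketched as a pure function. This is only an illustration of the mapping listed above: the class RepoPathSketch and the method target are hypothetical names, and the date, hour and minute are passed in already extracted rather than parsed from the file name, since the parsing itself is not described here.

```java
// Hypothetical helper, not part of Chukwa.
class RepoPathSketch {
    static final String REPO_ROOT = "/chukwa/repos/";

    // Returns {outputDirectory, outputFileName}, or null for an unrecognized suffix.
    static String[] target(String cluster, String dataType, String fileName,
                           String day, String hour, String minute) {
        String dayDir = REPO_ROOT + cluster + "/" + dataType + "/" + day + "/";
        if (fileName.endsWith(".D.evt")) {
            // Daily file: lands directly in the day directory.
            return new String[] { dayDir, dataType + "_" + day };
        }
        if (fileName.endsWith(".H.evt")) {
            // Hourly file: lands in the hour directory.
            return new String[] { dayDir + hour + "/", dataType + "_" + day + "_" + hour };
        }
        if (fileName.endsWith(".R.evt")) {
            // Raw file: lands in the minute directory.
            return new String[] { dayDir + hour + "/" + minute + "/",
                                  dataType + "_" + day + "_" + hour + "_" + minute };
        }
        return null;
    }
}
```

For example, a file ending in ".H.evt" for cluster "demo", data type "SysLog", day "20100301" and hour "12" maps to the directory /chukwa/repos/demo/SysLog/20100301/12/ and the file name SysLog_20100301_12.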
 * Creates a directory for the data type under /chukwa/rolling/ so that the data can be merged later
 * If the data is not from the current day, creates under /chukwa/rolling/
 *    daily/<yyyyMMdd>/<cluster>/<dataType>
 * Otherwise creates under /chukwa/rolling/
 *    hourly/<yyyyMMdd>/<HH>/<cluster>/<dataType>
void addDirectory4Rolling(boolean isDailyOnly, int day, int hour,
            String cluster, String dataSource)
(4) Data merging
    org.apache.hadoop.chukwa.extraction.demux.HourlyChukwaRecordRolling
 * 1. Create the daemon "PostProcessorManager",
 * 2. Initialize the directories used during processing
 *    rollingFolder             /chukwa/rolling/
 *    chukwaMainRepository      /chukwa/repos/
 *    tempDir                   /chukwa/temp/hourlyRolling/
 * 3. Read the values of rollInSequence (whether merge threads run sequentially) and deleteRawdata (whether the raw data is deleted) from the command-line arguments
 * 4. Get the current date and hour
 * 5. Iterate over /chukwa/rolling/hourly/
 *       for each day directory found there
 *          for each hour directory, if the day is before the current day, or it is the same day and the hour is before the current hour (only data older than the current hour is merged),
 *          call buildHourlyFiles
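The condition in step 5 is a simple ordering check on (day, hour) pairs. A minimal sketch, assuming days are encoded as yyyyMMdd integers so that numeric order matches calendar order; the class and method names are hypothetical:

```java
// Hypothetical helper, not part of Chukwa.
class RollingEligibilitySketch {
    // True when (workingDay, workingHour) is strictly before (currentDay, currentHour).
    static boolean shouldRoll(int workingDay, int workingHour,
                              int currentDay, int currentHour) {
        return workingDay < currentDay
                || (workingDay == currentDay && workingHour < currentHour);
    }
}
```

With the current time at 20100301 hour 12, hour 11 of the same day qualifies, while hour 12 itself does not because it may still be receiving data.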
 * Get the working directory /chukwa/rolling/hourly/workingDay/workingHour
 * Iterate over all clusters in that directory
 *    Iterate over all data types (dataSource) in each cluster directory
 *       create the directory /chukwa/repos/cluster/dataSource/workingDay/workingHour/rotateDone
 *       build the merge argument array mergeArgs
 *       mergeArgs[0] = /chukwa/repos/cluster/dataSource/workingDay/workingHour/[0-5]*/*.evt
 *       mergeArgs[1] = /chukwa/temp/hourlyRolling/cluster/dataSource/workingDay/workingHour_currentTimeMillis
 *       mergeArgs[2] = /chukwa/repos/cluster/dataSource/workingDay/workingHour
 *       mergeArgs[3] = dataSource_HourlyDone_workingDay_workingHour
 *       mergeArgs[4] = /chukwa/rolling/hourly/workingDay/workingHour/cluster/dataSource
 *       create a RecordMerger object named merge
 *       if rollInSequence is true, start the merge thread
 *       otherwise add merge to the list, start the thread and wait for it to finish
 *       delete the dataSource directory
 *    delete the cluster directory
 * delete the hour directory
void buildHourlyFiles(String chukwaMainRepository,
            String tempDir, String rollingFolder, int workingDay,
            int workingHour)
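The five mergeArgs entries are plain path concatenations, which the following sketch reproduces. It is written from the list above rather than from the Chukwa source; the class MergeArgsSketch, the method build and the nowMillis parameter (standing in for currentTimeMillis) are assumptions, and the directory arguments are expected to end with a slash as in the directory list.

```java
// Hypothetical helper, not part of Chukwa.
class MergeArgsSketch {
    static String[] build(String chukwaMainRepository, String tempDir, String rollingFolder,
                          String cluster, String dataSource,
                          int workingDay, int workingHour, long nowMillis) {
        // Hour directory of this data source inside the main repository.
        String hourDir = chukwaMainRepository + cluster + "/" + dataSource + "/"
                + workingDay + "/" + workingHour;
        String[] mergeArgs = new String[5];
        mergeArgs[0] = hourDir + "/[0-5]*/*.evt";                       // input glob
        mergeArgs[1] = tempDir + cluster + "/" + dataSource + "/"
                + workingDay + "/" + workingHour + "_" + nowMillis;      // temporary output
        mergeArgs[2] = hourDir;                                          // final output directory
        mergeArgs[3] = dataSource + "_HourlyDone_" + workingDay + "_" + workingHour; // output file name
        mergeArgs[4] = rollingFolder + "hourly/" + workingDay + "/" + workingHour
                + "/" + cluster + "/" + dataSource;                      // rolling marker directory
        return mergeArgs;
    }
}
```

Calling build("/chukwa/repos/", "/chukwa/temp/hourlyRolling/", "/chukwa/rolling/", "demo", "SysLog", 20100301, 12, 1000L) yields, for instance, mergeArgs[3] = SysLog_HourlyDone_20100301_12.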

 * Uses IdentityMapper as the mapperClass and IdentityReducer as the reducerClass, so the job simply merges the data
int run(String[] args)
    org.apache.hadoop.chukwa.extraction.demux.RecordMerger
 * Calls ToolRunner.run to run the tool, that is, the run method of HourlyChukwaRecordRolling, which merges
 * /chukwa/repos/cluster/dataSource/workingDay/workingHour/[0-5]*/*.evt
 * into
 * /chukwa/temp/hourlyRolling/cluster/dataSource/workingDay/workingHour_currentTimeMillis
 * If the job succeeds
 *    call writeRecordFile()
 *    if the deleteRawData parameter is true
 *       delete the input files
 *       /chukwa/repos/cluster/dataSource/workingDay/workingHour/[0-5]*/*.evt
 *       iterate over /chukwa/repos/cluster/dataSource/workingDay/workingHour
 *          delete the hour or minute directories
 *          /chukwa/repos/cluster/dataSource/workingDay/workingHour/[0-5]*/
 *    delete the merge marker file
 *    dataSource_HourlyDone_workingDay_workingHour
 *    delete the temporary directory
 *    /chukwa/temp/hourlyRolling/cluster/dataSource/workingDay/workingHour_currentTimeMillis
void run()
 * Renames and moves the MapReduce output file, that is, the file merged by the run method of HourlyChukwaRecordRolling
 * input        /chukwa/temp/hourlyRolling/cluster/dataSource/workingDay/workingHour_currentTimeMillis/part-00000
 * outputDir    /chukwa/repos/cluster/dataSource/workingDay/workingHour
 * filename     dataSource_HourlyDone_workingDay_workingHour
 * destFile     /chukwa/repos/cluster/dataSource/workingDay/workingHour/dataSource_HourlyDone_workingDay_workingHour.1.evt
void writeRecordFile(String input, String outputDir, String fileName)
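The rename-and-move step can be illustrated on the local file system. The sketch below is an analogy only: it uses java.nio.file instead of the Hadoop FileSystem API that Chukwa works with, the class name WriteRecordFileSketch is hypothetical, and the fixed ".1.evt" suffix is taken from the destFile example above without handling the case where that file already exists.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical local-file-system analogue, not part of Chukwa.
class WriteRecordFileSketch {
    // Moves the merged part file into outputDir under the name <fileName>.1.evt.
    static Path writeRecordFile(Path input, Path outputDir, String fileName) throws IOException {
        Files.createDirectories(outputDir);               // make sure the target directory exists
        Path destFile = outputDir.resolve(fileName + ".1.evt");
        return Files.move(input, destFile);               // fails if destFile already exists
    }
}
```

Here input would correspond to the part-00000 file in the temporary directory and outputDir to the hour directory in the repository.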