
Hadoop-Based Log Collection Framework: Source Code Analysis of Chukwa (Data Processing)

Published: 2020-12-14 03:38:27 | Category: Big Data | Source: compiled from the web

Refer to: http://savagegarden.iteye.com/blog/1441842

1. Overview of the utility classes and interfaces

(1)


    // Classifies (demultiplexes) the data
    org.apache.hadoop.chukwa.extraction.demux.DemuxManager

    // Map-side processing interface of the MapReduce job
    org.apache.hadoop.chukwa.extraction.demux.processor.mapper.MapProcessor
    // Factory for MapProcessor; a HashMap stores each data type together with its MapProcessor
    org.apache.hadoop.chukwa.extraction.demux.processor.mapper.MapProcessorFactory
    // Implementation of the map processing interface: processes data chunks (ChunkImpl), reads their
    // content line by line and creates ChukwaRecord objects, but leaves writing to the
    // OutputCollector to the implementations of the abstract method parse()
    org.apache.hadoop.chukwa.extraction.demux.processor.mapper.AbstractProcessor

    // Implementation of parse() from AbstractProcessor
    org.apache.hadoop.chukwa.extraction.demux.processor.mapper.DefaultProcessor
    // Reduce-side processing interface of the MapReduce job
    org.apache.hadoop.chukwa.extraction.demux.processor.reducer.ReduceProcessor
    // Implementation of the reduce processing interface: writes the key and value to the OutputCollector
    org.apache.hadoop.chukwa.extraction.demux.processor.reducer.IdentityReducer
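
To make the factory idea concrete, here is a minimal, self-contained Java sketch of a per-data-type registry with a default fallback. It is not Chukwa's code: `LineHandler`, `HandlerRegistry`, and the in-memory `configure` map are hypothetical stand-ins for MapProcessor, MapProcessorFactory, and the lookup in chukwa-demux-conf.xml.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical stand-in for a map processor: turns one raw line into an output value.
interface LineHandler {
    String process(String line);
}

// Sketch of a factory that caches one handler per data type and falls back to a default.
class HandlerRegistry {
    // Cache: data type -> handler instance (the role the HashMap plays in MapProcessorFactory)
    private final Map<String, LineHandler> cache = new HashMap<>();
    // Hypothetical configuration: data type -> how to create its handler
    private final Map<String, Supplier<LineHandler>> configured = new HashMap<>();
    // Used when nothing is configured for a data type (the role of DefaultProcessor)
    private final Supplier<LineHandler> fallback = () -> line -> "default:" + line;

    void configure(String dataType, Supplier<LineHandler> factory) {
        configured.put(dataType, factory);
    }

    // Returns the cached handler for dataType, creating it on first use.
    LineHandler get(String dataType) {
        return cache.computeIfAbsent(dataType, t -> configured.getOrDefault(t, fallback).get());
    }

    public static void main(String[] args) {
        HandlerRegistry registry = new HandlerRegistry();
        registry.configure("SysLog", () -> line -> "syslog:" + line.trim());
        for (String type : List.of("SysLog", "Unknown")) {
            System.out.println(type + " -> " + registry.get(type).process(" hello "));
        }
    }
}
```

Caching the instance per data type means a processor is created once and reused for every chunk of that type, which is presumably the point of keeping the HashMap.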


(2)

    // Archives the data
    org.apache.hadoop.chukwa.extraction.archive.ChukwaArchiveManager

(3)

    // Moves the classified data into backup storage
    org.apache.hadoop.chukwa.extraction.demux.PostProcessorManager

(4)

    // Merges the backed-up data by hour
    org.apache.hadoop.chukwa.extraction.demux.HourlyChukwaRecordRolling

(5)

    // Merges the backed-up data by day
    org.apache.hadoop.chukwa.extraction.demux.DailyChukwaRecordRolling
2. Processing flow

(1) Data classification

org.apache.hadoop.chukwa.extraction.demux.DemuxManager

    /**
     * Creates the daemon "DemuxManager" and writes its process ID to the corresponding pid file,
     * so that the stop command can kill the process using that pid file.
     * Calls start() to begin the processing loop.
     */
    public static void main(String[] args)

    /**
     * 1. Initialize the directories used during processing:
     *    chukwaRootDir     /chukwa/
     *    demuxRootDir      /chukwa/demuxProcessing/
     *    demuxErrorDir     /chukwa/demuxProcessing/inError/
     *    demuxInputDir     /chukwa/demuxProcessing/mrInput/
     *    demuxOutputDir    /chukwa/demuxProcessing/mrOutput/
     *    dataSinkDir       /chukwa/logs/
     *    postProcessDir    /chukwa/postProcess/
     *    archiveRootDir    /chukwa/dataSinkArchives/
     * 2. Read the Nagios settings from the system configuration file.
     * 3. Loop:
     *    Count exceptions; exit once there have been more than five.
     *    If demuxOutputDir exists, delete it.
     *    If demuxInputDir exists, mark the data as ready for processing and increment the
     *    reprocess counter; if it has already been tried 3 times, move demuxInputDir to demuxErrorDir.
     *    If neither directory exists:
     *       reset reprocess to 0;
     *       look for .done files in dataSinkDir and move them to demuxInputDir (at most 500),
     *       then set demuxReady to true.
     *    If demuxReady is true, process the data (processData).
     *    Send the demux status to Nagios.
     */
    void start()

    /**
     * 1. Run Demux with demuxReducerCount reducers, using demuxInputDir as the input directory
     *    and demuxOutputDir as the output directory.
     * 2. If demuxOutputDir exists, move it to postProcessDir.
     * 3. Move demuxInputDir to archiveDir (archiveRootDir).
     */
    protected boolean processData(String dataSinkDir, String demuxInputDir, String demuxOutputDir,
                                  String postProcessDir, String archiveDir)
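
The bookkeeping in start() boils down to two small decisions: which `.done` files to pick up (at most 500 per run) and what to do with an input directory left over from an earlier pass. This is a simplified model, not the real method: `DemuxLoopSketch`, `Action`, `pickDoneFiles`, and `decide` are hypothetical names, directory listings are plain strings instead of HDFS paths, and `attemptsSoFar` is assumed to be the number of attempts already made on the leftover input.

```java
import java.util.ArrayList;
import java.util.List;

class DemuxLoopSketch {
    static final int MAX_FILES_PER_RUN = 500; // at most 500 .done files are moved per run
    static final int MAX_ATTEMPTS = 3;        // a leftover mrInput directory is retried this many times

    enum Action { PROCESS_LEFTOVER_INPUT, MOVE_INPUT_TO_ERROR, PROCESS_NEW_BATCH, IDLE }

    // Picks at most MAX_FILES_PER_RUN ".done" files from a listing of dataSinkDir.
    static List<String> pickDoneFiles(List<String> sinkFiles) {
        List<String> picked = new ArrayList<>();
        for (String name : sinkFiles) {
            if (picked.size() >= MAX_FILES_PER_RUN) {
                break;
            }
            if (name.endsWith(".done")) {
                picked.add(name);
            }
        }
        return picked;
    }

    // Decides what one pass of the loop should do.
    // inputDirExists: whether demuxInputDir (mrInput) is still present from an earlier pass;
    // attemptsSoFar: how many times that leftover input has already been tried (assumption).
    static Action decide(boolean inputDirExists, int attemptsSoFar, List<String> sinkFiles) {
        if (inputDirExists) {
            return attemptsSoFar >= MAX_ATTEMPTS ? Action.MOVE_INPUT_TO_ERROR
                                                 : Action.PROCESS_LEFTOVER_INPUT;
        }
        return pickDoneFiles(sinkFiles).isEmpty() ? Action.IDLE : Action.PROCESS_NEW_BATCH;
    }
}
```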
org.apache.hadoop.chukwa.extraction.demux.Demux

Demux implements the Mapper and Reducer classes that classify the data.

    /**
     * The main method launches Demux with ToolRunner, which runs the MapReduce job
     * using the inner classes MapClass and ReduceClass.
     * At run time, the output format class is set to ChukwaRecordOutputFormat.
     */
    void main(String[] args)

org.apache.hadoop.chukwa.extraction.demux.Demux$MapClass

    /**
     * Looks up whether a mapProcessorClass is configured for the ChunkImpl's data type
     * in $CHUKWA_HOME/conf/chukwa-demux-conf.xml.
     * If none is configured, DefaultProcessor is used.
     * The resulting mapProcessorClass is added to the HashMap of the MapProcessor factory.
     * AbstractProcessor implements the process method, which:
     *    gets information about the data chunk;
     *    reads the chunk content line by line;
     *    delegates per-line handling to parse(), implemented in DefaultProcessor, which calls
     *    the parent's buildGenericRecord method to convert
     *       ChukwaArchiveKey  ->  ChukwaRecordKey
     *       ChunkImpl         ->  ChukwaRecord
     *    and writes the result to the OutputCollector.
     */
    void map(ChukwaArchiveKey key, ChunkImpl chunk,
             OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
             Reporter reporter)
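
The split between AbstractProcessor and DefaultProcessor is the template-method pattern: the base class owns the line loop, and the subclass decides what each line becomes. A minimal sketch, assuming newline-separated chunk data and using hypothetical `SimpleRecord`, `ChunkLineProcessor`, and `PassThroughProcessor` classes in place of ChukwaRecord, AbstractProcessor, and DefaultProcessor; a `List` stands in for the OutputCollector.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for ChukwaRecord.
class SimpleRecord {
    final long time;
    final Map<String, String> fields = new LinkedHashMap<>();

    SimpleRecord(long time) {
        this.time = time;
    }
}

// Template method: process() splits a chunk into lines; subclasses decide what parse() emits.
abstract class ChunkLineProcessor {
    final void process(String dataType, long chunkTime, String chunkData, List<SimpleRecord> output) {
        for (String line : chunkData.split("\n")) {
            if (!line.isEmpty()) {
                parse(dataType, chunkTime, line, output);
            }
        }
    }

    // Shared helper in the spirit of buildGenericRecord(): wraps one line in a record.
    protected SimpleRecord buildGenericRecord(String dataType, long time, String body) {
        SimpleRecord record = new SimpleRecord(time);
        record.fields.put("dataType", dataType);
        record.fields.put("body", body);
        return record;
    }

    protected abstract void parse(String dataType, long time, String line, List<SimpleRecord> output);
}

// Counterpart of DefaultProcessor: emits one generic record per line without further parsing.
class PassThroughProcessor extends ChunkLineProcessor {
    @Override
    protected void parse(String dataType, long time, String line, List<SimpleRecord> output) {
        output.add(buildGenericRecord(dataType, time, line));
    }

    public static void main(String[] args) {
        List<SimpleRecord> out = new ArrayList<>();
        new PassThroughProcessor().process("SysLog", 42L, "first line\nsecond line", out);
        for (SimpleRecord r : out) {
            System.out.println(r.time + " " + r.fields);
        }
    }
}
```

A custom processor for a new log format would only override parse(), which is what makes configuring a different class per data type cheap.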
org.apache.hadoop.chukwa.extraction.demux.Demux$ReduceClass

    /**
     * Determines reduceProcessorClass from the reduceType of the ChukwaRecordKey:
     *    org.apache.hadoop.chukwa.extraction.demux.processor.reducer. + reduceType
     * If no such class is found, IdentityReducer is used.
     * IdentityReducer's process method simply writes the key and value to the OutputCollector.
     */
    void reduce(ChukwaRecordKey key, Iterator<ChukwaRecord> values, ...)

org.apache.hadoop.chukwa.extraction.demux.ChukwaRecordOutputFormat

Extends MultipleSequenceFileOutputFormat and is used to format the output.

    /**
     * Sets the output path to clusterName/dataType/dataType + Util.generateTimeOutput(record.getTime())
     */
    protected String generateFileNameForKeyValue(ChukwaRecordKey key,
                                                 ChukwaRecord record, String name)
org.apache.hadoop.chukwa.extraction.demux.processor.Util

    /**
     * Builds the file name from the timestamp.
     * Extracts workingDay, workingHour and minutes from the timestamp.
     * If workingDay differs from the current date, the output file is
     *    [dataType]_[yyyyMMdd].D.evt
     * otherwise, if workingHour differs from the current hour, the output file is
     *    [dataType]_[yyyyMMdd]_[HH].H.evt
     * otherwise the output file is
     *    if the units digit of the minutes is less than 5:
     *       [dataType]_[yyyyMMdd]_[HH]_[minutes/10].0.R.evt
     *    if the units digit of the minutes is 5 or greater:
     *       [dataType]_[yyyyMMdd]_[HH]_[minutes/10].5.R.evt
     */
    static String generateTimeOutput(long timestamp)
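
The naming rule can be checked with a small sketch that follows the file-name patterns exactly as listed above. Two assumptions are not from the original: times are interpreted in UTC, and the reference time ("now") is passed in explicitly instead of being read from the clock, which keeps the function testable. `TimeOutputSketch` is a hypothetical name, and the zero-padded `HH` is also an assumption.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

class TimeOutputSketch {
    private static final DateTimeFormatter DAY = DateTimeFormatter.ofPattern("yyyyMMdd");
    private static final DateTimeFormatter HOUR = DateTimeFormatter.ofPattern("HH");

    // Builds the output file name for a record stamped 'timestamp', relative to 'now' (both epoch ms, UTC).
    static String generateTimeOutput(String dataType, long timestamp, long now) {
        LocalDateTime t = LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneOffset.UTC);
        LocalDateTime n = LocalDateTime.ofInstant(Instant.ofEpochMilli(now), ZoneOffset.UTC);
        String day = t.format(DAY);
        if (!day.equals(n.format(DAY))) {
            return dataType + "_" + day + ".D.evt";                // earlier day: daily file
        }
        String hour = t.format(HOUR);
        if (t.getHour() != n.getHour()) {
            return dataType + "_" + day + "_" + hour + ".H.evt";   // same day, other hour: hourly file
        }
        int minutes = t.getMinute();
        String half = (minutes % 10) < 5 ? "0" : "5";              // which half of the 10-minute slot
        return dataType + "_" + day + "_" + hour + "_" + (minutes / 10) + "." + half + ".R.evt";
    }
}
```

For example, with "now" at 2020-12-14 03:38 UTC, a SysLog record from 03:37 that day maps to `SysLog_20201214_03_3.5.R.evt`, one from 01:05 maps to `SysLog_20201214_01.H.evt`, and one from the previous day maps to `SysLog_20201213.D.evt`.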
(2) Data archiving

org.apache.hadoop.chukwa.extraction.archive.ChukwaArchiveManager

    /**
     * 1. Initialize the directories used during processing:
     *    chukwaRootDir                 /chukwa/
     *    archiveRootDir                /chukwa/dataSinkArchives/
     *    archivesRootProcessingDir     /chukwa/archivesProcessing/
     *    archivesMRInputDir            /chukwa/archivesProcessing/mrInput/
     *    archivesMROutputDir           /chukwa/archivesProcessing/mrOutput/
     *    finalArchiveOutput            /chukwa/finalArchives/
     * 2. Start the processing loop:
     *      errorCount records the number of errors; exit once it exceeds four.
     *      If archivesMRInputDir exists (an input directory for an archive job is already present),
     *      archive that data with runArchive.
     *      List the files under archiveRootDir:
     *      if there are none, sleep for one minute;
     *      if there is only one date directory, check whether it is today's and whether two hours
     *      have passed since the last run; if not, sleep for half an hour;
     *      if there are several date directories, process each one with processDay.
     */
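
The sleep logic of this loop reduces to one small decision. A minimal sketch, assuming the caller already knows how many date directories exist, whether the only one is today's, and when the last run happened; `ArchiveLoopSketch` and `sleepMillis` are hypothetical names.

```java
class ArchiveLoopSketch {
    static final long ONE_MINUTE_MS = 60_000L;
    static final long HALF_HOUR_MS = 30 * ONE_MINUTE_MS;
    static final long TWO_HOURS_MS = 120 * ONE_MINUTE_MS;

    // Returns how long one loop pass should sleep (ms) before looking again,
    // or 0 when the date directories should be processed now.
    static long sleepMillis(int dateDirCount, boolean onlyDirIsToday, long now, long lastRun) {
        if (dateDirCount == 0) {
            return ONE_MINUTE_MS;                       // nothing to archive yet
        }
        if (dateDirCount == 1 && onlyDirIsToday && now - lastRun < TWO_HOURS_MS) {
            return HALF_HOUR_MS;                        // only today's data, and the last run was too recent
        }
        return 0L;                                      // process each date directory with processDay
    }
}
```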
    /**
     * Processes the data of one date:
     *     now            current time
     *     currentDay     current date
     *     workingDay     date directory to process
     *     oneHourAgo     the time one hour before now
     * If the date directory contains no files and its date is before today, delete the directory and return.
     * Use fileCount to track the number of files while iterating over the date directory:
     *     if the file's modification time is earlier than oneHourAgo:
     *         increment fileCount;
     *         move the file to archivesMRInputDir;
     *         if fileCount has reached the maximum number of files (500), call runArchive
     *         to archive them and reset fileCount to 0.
     */
    void processDay(FileStatus fsDay, String archivesMRInputDir,
                    String archivesMROutputDir, String finalArchiveOutput)

    /**
     * 1. Build the job arguments and load the setting "archive.grouper" (archive grouping type)
     *    from the configuration file.
     * 2. If the mrOutput directory exists, delete it.
     * 3. Run ChukwaArchiveBuilder with ToolRunner.
     * 4. Create a directory in finalArchiveOutput named after the date found in mrInput.
     * 5. Move the mrOutput directory into the directory created in the previous step,
     *    naming it "archive_" + current time.
     * 6. Delete the mrInput directory.
     */
    void runArchive(String archivesMRInputDir, String finalArchiveOutput)
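
The batching in processDay (move settled files into mrInput, run an archive job every 500 files) can be sketched without HDFS as a function that groups file names into batches. Assumptions: `ArchiveBatchSketch` and its `FileInfo` class are hypothetical stand-ins (FileInfo for FileStatus), each returned batch corresponds to one runArchive call, and the final partial batch models files left in mrInput that the main loop archives on its next pass.

```java
import java.util.ArrayList;
import java.util.List;

class ArchiveBatchSketch {
    static final int MAX_FILES = 500;                   // per-job limit from the description
    static final long ONE_HOUR_MS = 60L * 60 * 1000;

    // Hypothetical stand-in for FileStatus: a file name and its last-modification time (ms).
    static final class FileInfo {
        final String name;
        final long modificationTime;

        FileInfo(String name, long modificationTime) {
            this.name = name;
            this.modificationTime = modificationTime;
        }
    }

    // Groups the settled files of one date directory into archive batches of at most MAX_FILES.
    static List<List<String>> planBatches(List<FileInfo> dayFiles, long now) {
        long oneHourAgo = now - ONE_HOUR_MS;
        List<List<String>> batches = new ArrayList<>();
        List<String> current = new ArrayList<>();       // files moved to mrInput, not yet archived
        for (FileInfo f : dayFiles) {
            if (f.modificationTime < oneHourAgo) {      // skip files modified within the last hour
                current.add(f.name);
                if (current.size() >= MAX_FILES) {      // corresponds to runArchive + fileCount = 0
                    batches.add(current);
                    current = new ArrayList<>();
                }
            }
        }
        if (!current.isEmpty()) {
            batches.add(current);                       // leftovers: archived on the loop's next pass
        }
        return batches;
    }
}
```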
org.apache.hadoop.chukwa.extraction.archive.ChukwaArchiveBuilder

org.apache.hadoop.chukwa.extraction.archive.ChukwaArchiveBuilder$UniqueKeyReduce

    /**
     * Counts how many values each key has.
     */
    void reduce(ChukwaArchiveKey key, Iterator<ChunkImpl> vals,
                OutputCollector<ChukwaArchiveKey, ChunkImpl> out, Reporter r)

    /**
     * The main method launches ChukwaArchiveBuilder with ToolRunner;
     * the run method performs the MapReduce job with IdentityMapper and the inner class UniqueKeyReduce,
     * setting different partitionerClass, outputFormat and jobName values depending on the arguments.
     */
    void main(String[] args)
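
The description of UniqueKeyReduce says it counts the values for each key. The sketch below does that and, as the class name suggests, passes on only one value per key; that deduplication is an assumption drawn from the name, not something stated above. A `BiConsumer` stands in for the OutputCollector, and `UniqueKeyReduceSketch` is a hypothetical name.

```java
import java.util.Iterator;
import java.util.function.BiConsumer;

class UniqueKeyReduceSketch {
    // Consumes all values for one key, passes only the first one to 'out',
    // and returns how many values the key had (so duplicates = result - 1).
    static <K, V> int reduce(K key, Iterator<V> values, BiConsumer<K, V> out) {
        int count = 0;
        while (values.hasNext()) {
            V v = values.next();
            if (count == 0) {
                out.accept(key, v);   // keep a single value per key
            }
            count++;
        }
        return count;
    }
}
```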
(3) Data transfer

org.apache.hadoop.chukwa.extraction.demux.PostProcessorManager

    /**
     * Creates the daemon "PostProcessorManager" and initializes the directories used during processing:
     *    chukwaRootDir                    /chukwa/
     *    postProcessDir                   /chukwa/postProcess/
     *    chukwaRootReposDir               /chukwa/repos/
     *    chukwaPostProcessInErrorDir      /chukwa/postProcessInError/
     * Loop:
     *      errorCount records the number of errors; exit once it exceeds four.
     *      Iterate over demuxOutputDir:
     *      if the directory contains no files, sleep for 10 seconds;
     *      otherwise add the files to directories (an ArrayList<String>) and sort it.
     *      Iterate over directories:
     *          process each entry with processDemuxPigOutput, then call movetoMainRepository
     *          to move it and delete it afterwards; if an exception occurs,
     *          move it to the error directory instead.
     */

    /**
     * Loads the setting chukwa.post.demux.data.loader from $CHUKWA_HOME/conf/chukwa-demux-conf.xml,
     * which yields two implementation classes:
     *    org.apache.hadoop.chukwa.dataloader.MetricDataLoaderPool
     *    org.apache.hadoop.chukwa.dataloader.FSMDataLoader
     * For each configured implementation class:
     *    get an instance and build the search pattern
     *    (/chukwa/postProcess/cluster/dataType/*.evt);
     *    hand the matching files to the data loader.
     */
    boolean processDemuxPigOutput(String directory)

    /**
     * The move-to-repository logic is implemented in
     * org.apache.hadoop.chukwa.extraction.demux.MoveToRepository.
     */
    boolean movetoMainRepository(String sourceDirectory,
                                 String repoRootDirectory)
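
The main loop of PostProcessorManager amounts to "sort, process, move, and route failures to the error directory". A minimal sketch with the HDFS operations replaced by callbacks: `PostProcessSketch`, `load` (standing in for processDemuxPigOutput) and `move` (standing in for movetoMainRepository) are hypothetical, and treating a `false` return as a failure, in addition to an exception, is an assumption.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Predicate;

class PostProcessSketch {
    // Handles output directories in sorted order. Directories that succeed are appended
    // to 'done'; the returned list holds the ones that failed, which Chukwa would move
    // to the error directory (postProcessInError).
    static List<String> run(List<String> directories, Predicate<String> load,
                            Predicate<String> move, List<String> done) {
        List<String> sorted = new ArrayList<>(directories);
        Collections.sort(sorted);
        List<String> failed = new ArrayList<>();
        for (String dir : sorted) {
            boolean ok;
            try {
                ok = load.test(dir) && move.test(dir);
            } catch (RuntimeException e) {
                ok = false;                 // an exception routes the directory to the error list
            }
            if (ok) {
                done.add(dir);
            } else {
                failed.add(dir);
            }
        }
        return failed;
    }
}
```

Sorting first means directories are handled in name order, which for time-stamped names is roughly oldest first.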
org.apache.hadoop.chukwa.extraction.demux.MoveToRepository

    /**
     * Iterate over the directories to be moved:
     *    get the cluster name, skipping "_logs" and "_temporary";
     *    process the files under each cluster with processClusterDirectory;
     *    delete each cluster directory once its contents have been moved.
     */

    /**
     * Iterate over a cluster directory:
     *    get the dataType name;
     *    process the files under each dataType with processDatasourceDirectory.
     */
    void processClusterDirectory(Path srcDir, String destDir)

    /**
     * Iterate over the .evt files under one dataType:
     *    for files ending in ".D.evt":
     *        get the date;
     *        call writeRecordFile to write the file, where
     *        the output directory is /chukwa/repos/<cluster>/<dataType>/<yyyyMMdd>/
     *        and the output file is <dataType>_<yyyyMMdd>
     *    for files ending in ".H.evt":
     *        get the date and hour;
     *        the output directory is /chukwa/repos/<cluster>/<dataType>/<yyyyMMdd>/<HH>/
     *        and the output file is <dataType>_<yyyyMMdd>_<HH>;
     *        call addDirectory4Rolling to mark the date and hour for merging
     *    for files ending in ".R.evt":
     *        get the date, hour and minute;
     *        the output directory is /chukwa/repos/<cluster>/<dataType>/<yyyyMMdd>/<HH>/<mm>/
     *        and the output file is <dataType>_<yyyyMMdd>_<HH>_<mm>
     */
    void processDatasourceDirectory(String cluster, Path srcDir, ...)

    /**
     * Moves the file and renames it.
     */
    void writeRecordFile(String destDir, Path recordFile, String fileName)

    /**
     * Creates directories for the data type under /chukwa/rolling/ so that the data can be merged later.
     * If the data is not from the current day, creates under /chukwa/rolling/:
     *    daily/<yyyyMMdd>/<cluster>/<dataType>
     * otherwise creates under /chukwa/rolling/:
     *    hourly/<yyyyMMdd>/<HH>/<cluster>/<dataType>
     */
    void addDirectory4Rolling(boolean isDailyOnly, int day, int hour,
                              String cluster, String dataSource)
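
The suffix rules of processDatasourceDirectory map a file name to a repository directory and file name. The sketch covers the `.D.evt` and `.H.evt` cases only, assuming file names of the form `<dataType>_<yyyyMMdd>.D.evt` and `<dataType>_<yyyyMMdd>_<HH>.H.evt` as produced by the naming rule in Util; `RepoPathSketch` and `destination` are hypothetical, and `.R.evt` files are left out (the method returns `null` for them).

```java
class RepoPathSketch {
    // Returns {destinationDirectory, destinationFileName}, or null for suffixes not handled here.
    static String[] destination(String repoRoot, String cluster, String dataType, String fileName) {
        String base = repoRoot + "/" + cluster + "/" + dataType + "/";
        if (fileName.endsWith(".D.evt")) {
            // <dataType>_<yyyyMMdd>
            String stem = fileName.substring(0, fileName.length() - ".D.evt".length());
            String day = stem.substring(stem.lastIndexOf('_') + 1);
            return new String[] { base + day + "/", dataType + "_" + day };
        }
        if (fileName.endsWith(".H.evt")) {
            // <dataType>_<yyyyMMdd>_<HH>
            String stem = fileName.substring(0, fileName.length() - ".H.evt".length());
            int lastSep = stem.lastIndexOf('_');
            String hour = stem.substring(lastSep + 1);
            String rest = stem.substring(0, lastSep);
            String day = rest.substring(rest.lastIndexOf('_') + 1);
            return new String[] { base + day + "/" + hour + "/", dataType + "_" + day + "_" + hour };
        }
        return null;
    }
}
```

Parsing from the right with `lastIndexOf` keeps the sketch working even if a data type name itself contains underscores.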
(4) Data merging

org.apache.hadoop.chukwa.extraction.demux.HourlyChukwaRecordRolling

    /**
     * 1. Create the daemon "PostProcessorManager".
     * 2. Initialize the directories used during processing:
     *    rollingFolder             /chukwa/rolling/
     *    chukwaMainRepository      /chukwa/repos/
     *    tempDir                   /chukwa/temp/hourlyRolling/
     * 3. Read rollInSequence (whether the merge threads run one after another) and
     *    deleteRawdata (whether to delete the raw data) from the run arguments.
     * 4. Get the current date and hour.
     * 5. Iterate over /chukwa/rolling/hourly/:
     *       for each date directory under it,
     *          for each hour directory under that date: if the date is earlier than the current date,
     *          or the dates are equal and the hour is earlier than the current hour
     *          (that is, only data from before the current hour is merged),
     *          call buildHourlyFiles.
     */

    /**
     * Gets the working directory /chukwa/rolling/hourly/workingDay/workingHour.
     * For each cluster under it:
     *    for each data type (dataSource) under the cluster directory:
     *       create /chukwa/repos/cluster/dataSource/workingDay/workingHour/rotateDone;
     *       build the merge argument array mergeArgs:
     *       mergeArgs[0] = /chukwa/repos/cluster/dataSource/workingDay/workingHour/[0-5]*/*.evt
     *       mergeArgs[1] = /chukwa/temp/hourlyRolling/cluster/dataSource/workingDay/workingHour_currentTimeMillis
     *       mergeArgs[2] = /chukwa/repos/cluster/dataSource/workingDay/workingHour
     *       mergeArgs[3] = dataSource_HourlyDone_workingDay_workingHour
     *       mergeArgs[4] = /chukwa/rolling/hourly/workingDay/workingHour/cluster/dataSource
     *       create a RecordMerger object, merge;
     *       if rollInSequence is true, run merge in the current thread (one merge after another);
     *       otherwise add merge to a list, start it as a thread, and wait for the thread to finish;
     *       delete the dataSource directory.
     *    Delete the cluster directory.
     * Delete the hour directory.
     */
    void buildHourlyFiles(String chukwaMainRepository,
                          String tempDir, String rollingFolder, int workingDay,
                          int workingHour)

    /**
     * Uses IdentityMapper as the mapperClass and IdentityReducer as the reducerClass,
     * so the job simply merges the data.
     */
    int run(String[] args)
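
Two pieces of this logic are easy to isolate: the check that only hours before the current hour are merged, and the layout of the five mergeArgs. A minimal sketch, assuming dates are `int` values in yyyyMMdd form (so numeric comparison matches date order) and hours are plain `int`s as in the buildHourlyFiles signature; `HourlyRollingSketch` and its methods are hypothetical.

```java
class HourlyRollingSketch {
    // True if (workingDay, workingHour) lies strictly before (currentDay, currentHour).
    static boolean eligible(int workingDay, int workingHour, int currentDay, int currentHour) {
        return workingDay < currentDay
            || (workingDay == currentDay && workingHour < currentHour);
    }

    // Builds the five RecordMerger arguments in the order listed above.
    static String[] mergeArgs(String repo, String tempDir, String rollingFolder,
                              String cluster, String dataSource,
                              int workingDay, int workingHour, long nowMillis) {
        String hourDir = repo + "/" + cluster + "/" + dataSource + "/" + workingDay + "/" + workingHour;
        return new String[] {
            hourDir + "/[0-5]*/*.evt",                                    // [0] input glob
            tempDir + "/" + cluster + "/" + dataSource + "/" + workingDay
                + "/" + workingHour + "_" + nowMillis,                    // [1] temporary MR output
            hourDir,                                                      // [2] final output directory
            dataSource + "_HourlyDone_" + workingDay + "_" + workingHour, // [3] final file name
            rollingFolder + "/hourly/" + workingDay + "/" + workingHour
                + "/" + cluster + "/" + dataSource                        // [4] rolling marker directory
        };
    }
}
```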
org.apache.hadoop.chukwa.extraction.demux.RecordMerger

    /**
     * Calls ToolRunner.run to run the tool, that is, the run method of HourlyChukwaRecordRolling,
     * which merges
     *    /chukwa/repos/cluster/dataSource/workingDay/workingHour/[0-5]*/*.evt
     * into
     *    /chukwa/temp/hourlyRolling/cluster/dataSource/workingDay/workingHour_currentTimeMillis
     * If the job succeeds:
     *    call writeRecordFile();
     *    if deleteRawData is true:
     *       delete the input files
     *       /chukwa/repos/cluster/dataSource/workingDay/workingHour/[0-5]*/*.evt
     *       and, iterating over /chukwa/repos/cluster/dataSource/workingDay/workingHour,
     *       delete the hour or minute directories
     *       /chukwa/repos/cluster/dataSource/workingDay/workingHour/[0-5]*/
     *    delete the merge marker file
     *    dataSource_HourlyDone_workingDay_workingHour;
     *    delete the temporary directory
     *    /chukwa/temp/hourlyRolling/cluster/dataSource/workingDay/workingHour_currentTimeMillis
     */
    void run()

    /**
     * Renames and moves the MapReduce result file, i.e. the file merged by the run method
     * of HourlyChukwaRecordRolling:
     * input       /chukwa/temp/hourlyRolling/cluster/dataSource/workingDay/workingHour_currentTimeMillis/part-00000
     * outputDir   /chukwa/repos/cluster/dataSource/workingDay/workingHour
     * filename    dataSource_HourlyDone_workingDay_workingHour
     * destFile    /chukwa/repos/cluster/dataSource/workingDay/workingHour/dataSource_HourlyDone_workingDay_workingHour.1.evt
     */
    void writeRecordFile(String input, String outputDir, String fileName)
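
Depending on rollInSequence, buildHourlyFiles either runs each RecordMerger in turn or starts the mergers as threads and waits for them. A minimal sketch of that choice, with plain `Runnable`s in place of RecordMerger (an assumption; `MergeRunnerSketch` and `runAll` are hypothetical), joining all started threads before returning:

```java
import java.util.ArrayList;
import java.util.List;

class MergeRunnerSketch {
    // Runs every merge task, either one after another on the calling thread
    // (rollInSequence = true) or concurrently on separate threads that are all
    // joined before returning. Returns the number of tasks handled.
    static int runAll(List<Runnable> merges, boolean rollInSequence) {
        List<Thread> started = new ArrayList<>();
        for (Runnable merge : merges) {
            if (rollInSequence) {
                merge.run();                         // finishes before the next merge starts
            } else {
                Thread t = new Thread(merge);
                started.add(t);
                t.start();                           // merges overlap in time
            }
        }
        for (Thread t : started) {
            try {
                t.join();                            // wait for each parallel merge to finish
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();  // keep the interrupt flag and stop waiting
                break;
            }
        }
        return merges.size();
    }
}
```

Sequential mode keeps the cluster load to one merge job at a time; parallel mode finishes sooner when several data types are waiting to be merged.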
(Editor: 李大同)

