MapReduce-Based Word Segmentation for Text Mining
Software version: paoding-analysis 3.0

Setup: put the project jar on the classpath and copy Paoding's dic directory into the project's classpath. Then edit the paoding-dic-home.properties file inside paoding-analysis.jar to set the dictionary path:

paoding.dic.home=classpath:dic

Word-segmentation demo:

import java.io.IOException;
import java.io.StringReader;

import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;

import net.paoding.analysis.analyzer.PaodingAnalyzer;

public class TokenizeWithPaoding {

    public static void main(String[] args) {
        String line = "中华民族共和国";
        PaodingAnalyzer analyzer = new PaodingAnalyzer();
        StringReader sr = new StringReader(line);
        // Obtain the token stream; the first argument (the field name) is irrelevant here
        TokenStream ts = analyzer.tokenStream("", sr);
        // Iterate over the token stream and print each token
        try {
            while (ts.incrementToken()) {
                CharTermAttribute ta = ts.getAttribute(CharTermAttribute.class);
                System.out.println(ta.toString());
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

Source corpus for news text classification:

http://people.csail.mit.edu/jrennie/20Newsgroups/20news-bydate.tar.gz

Each folder represents one category, and each file under a category is one news article. For Chinese news, classification requires word segmentation first.

For a large number of small files, you can use CombineFileInputFormat, another abstract subclass of FileInputFormat, and implement its createRecordReader method. CombineFileInputFormat overrides getSplits so that the returned split type is CombineFileSplit, a subclass of InputSplit that can contain multiple files. How the RecordReader turns each file into key-value pairs is determined by its nextKeyValue method.

The custom CombineFileInputFormat class:

package org.conan.myhadoop.fengci;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;

/**
 * Custom MyInputFormat class: lets a single split contain multiple files.
 * @author BOB
 */
public class MyInputFormat extends CombineFileInputFormat<Text, Text> {

    // Forbid splitting individual files
    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        return false;
    }

    @Override
    public RecordReader<Text, Text> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException {
        return new CombineFileRecordReader<Text, Text>((CombineFileSplit) split, context, MyRecordReader.class);
    }
}
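CombineFileRecordReader creates one per-file reader for each path in the combined split, looking the class up reflectively. That is why the MyRecordReader class shown next must expose a constructor with exactly the signature (CombineFileSplit, TaskAttemptContext, Integer). A simplified sketch of what the framework does for each file index, for illustration only (not the actual Hadoop source, which also forwards initialize(), progress, and cleanup calls):

// Illustrative only: how CombineFileRecordReader obtains the reader
// for one file of the combined split, via the reflective constructor.
static RecordReader<Text, Text> readerForFile(CombineFileSplit split,
        TaskAttemptContext context, int fileIndex) throws Exception {
    return MyRecordReader.class
            .getConstructor(CombineFileSplit.class, TaskAttemptContext.class, Integer.class)
            .newInstance(split, context, fileIndex);
}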
The custom RecordReader class:

package org.conan.myhadoop.fengci;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;

/**
 * Custom MyRecordReader class: reads the contents of the splits
 * produced by MyInputFormat.
 * @author BOB
 */
public class MyRecordReader extends RecordReader<Text, Text> {

    private CombineFileSplit combineFileSplit;   // the split being processed
    private Configuration conf;                  // job configuration
    private Text currentKey = new Text();        // current key
    private Text currentValue = new Text();      // current value
    private int totalLength;                     // number of files in the split
    private int currentIndex;                    // index of the file being read within the split
    private float currentProgress = 0F;          // current progress
    private boolean processed = false;           // whether this reader's file has been processed

    // Constructor signature required by CombineFileRecordReader
    public MyRecordReader(CombineFileSplit combineFileSplit, TaskAttemptContext context, Integer fileIndex) {
        super();
        this.combineFileSplit = combineFileSplit;
        this.currentIndex = fileIndex;
        this.conf = context.getConfiguration();
        this.totalLength = combineFileSplit.getPaths().length;
    }

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
    }

    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return currentKey;
    }

    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return currentValue;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        if (currentIndex >= 0 && currentIndex < totalLength) {
            currentProgress = (float) currentIndex / totalLength;
        }
        return currentProgress;
    }

    @Override
    public void close() throws IOException {
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (!processed) {
            // The key is the parent directory name plus the file name,
            // joined with directory separators
            Path file = combineFileSplit.getPath(currentIndex);
            StringBuilder sb = new StringBuilder();
            sb.append("/");
            sb.append(file.getParent().getName()).append("/");
            sb.append(file.getName());
            currentKey.set(sb.toString());

            // The entire file content becomes the value
            byte[] content = new byte[(int) combineFileSplit.getLength(currentIndex)];
            FileSystem fs = file.getFileSystem(conf);
            FSDataInputStream in = fs.open(file);
            in.readFully(content);
            currentValue.set(content);
            in.close();

            processed = true;
            return true;
        }
        return false;
    }
}
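For instance, given a hypothetical corpus file whose path ends in .../rec.sport.baseball/102642, nextKeyValue emits the key /rec.sport.baseball/102642 with the entire file body as the value; each MyRecordReader instance is responsible for exactly one file of the combined split.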
The tokenizer driver class:

package org.conan.myhadoop.fengci;

import java.io.IOException;
import java.io.StringReader;

import net.paoding.analysis.analyzer.PaodingAnalyzer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;

/**
 * Driver class: runs the word-segmentation job over the input files.
 * @author BOB
 */
public class TokenizerDriver extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new TokenizerDriver(), args);
        System.exit(res);
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        // Cap the combined split size at roughly 4 MB
        conf.setLong("mapreduce.input.fileinputformat.split.maxsize", 4000000);

        // Job name
        Job job = new Job(conf, "Tokenizer");
        job.setJarByClass(TokenizerDriver.class);

        job.setMapperClass(Map.class);

        job.setInputFormatClass(MyInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        Path inpath = new Path(args[0]);
        Path outpath = new Path(args[1]);

        // Add every category directory under the input path
        FileSystem fs = inpath.getFileSystem(conf);
        FileStatus[] status = fs.listStatus(inpath);
        Path[] paths = FileUtil.stat2Paths(status);
        for (Path path : paths) {
            FileInputFormat.addInputPath(job, path);
        }
        FileOutputFormat.setOutputPath(job, outpath);

        // Delete the output directory if it already exists
        // (do not close the cached FileSystem here: the job still needs it)
        FileSystem hdfs = outpath.getFileSystem(conf);
        if (hdfs.exists(outpath)) {
            hdfs.delete(outpath, true);
        }

        // Map-only job: no reduce tasks
        job.setNumReduceTasks(0);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    /**
     * Map class for the Hadoop framework: segments the text of
     * each news article in parallel.
     * @author BOB
     */
    static class Map extends Mapper<Text, Text, Text, Text> {

        @Override
        protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
            // Create the analyzer
            Analyzer analyzer = new PaodingAnalyzer();
            String line = value.toString();
            StringReader reader = new StringReader(line);
            // Obtain the token stream
            TokenStream ts = analyzer.tokenStream("", reader);
            StringBuilder sb = new StringBuilder();

            // Collect the tokens, separated by single spaces
            while (ts.incrementToken()) {
                CharTermAttribute ta = ts.getAttribute(CharTermAttribute.class);
                if (sb.length() != 0) {
                    sb.append(" ").append(ta.toString());
                } else {
                    sb.append(ta.toString());
                }
            }
            value.set(sb.toString());
            context.write(key, value);
        }
    }
}

The preprocessing result gathers all news articles into a single data set: the key is the category, each record represents one article, and its words are separated by spaces. The processed data can then be fed to Mahout to train a naive Bayes classifier.
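To sanity-check the job's SequenceFile output, and to see the exact "category key, space-separated tokens" records a downstream Mahout pipeline would consume, the file can be dumped as text. A minimal sketch using Hadoop's SequenceFile.Reader; the class name DumpTokenizedOutput and the part-file path are made up for illustration:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

/**
 * Minimal sketch: dump the tokenizer job's SequenceFile output as
 * "key <TAB> tokens" text lines, one news article per line.
 */
public class DumpTokenizedOutput {

    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        Path path = new Path(args[0]);   // e.g. a part-m-00000 file under the job's output dir
        FileSystem fs = path.getFileSystem(conf);
        SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
        try {
            Text key = new Text();       // "/category/filename"
            Text value = new Text();     // space-separated tokens of one article
            while (reader.next(key, value)) {
                System.out.println(key + "\t" + value);
            }
        } finally {
            reader.close();
        }
    }
}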
References:

http://f.dataguru.cn/thread-244375-1-1.html
http://www.cnblogs.com/panweishadow/p/4320720.html