
Text Mining: Running Tokenization as a MapReduce Job


Software version

paoding-analysis 3.0




Add the required jars to the project and copy paoding's dic dictionary directory onto the project classpath.




Edit the paoding-dic-home.properties file inside paoding-analysis.jar to point at the dictionary directory:

paoding.dic.home=classpath:dic


A standalone tokenization demo:

import java.io.IOException;
import java.io.StringReader;

import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;

import net.paoding.analysis.analyzer.PaodingAnalyzer;

public class TokenizeWithPaoding {

    public static void main(String[] args) {
        String line = "中华民族共和国";
        PaodingAnalyzer analyzer = new PaodingAnalyzer();
        StringReader sr = new StringReader(line);
        // Token stream over the text; the field-name argument is irrelevant here.
        TokenStream ts = analyzer.tokenStream("", sr);
        // Iterate over the token stream and print one token per line.
        try {
            ts.reset(); // a no-op on older Lucene versions, required on newer ones
            while (ts.incrementToken()) {
                CharTermAttribute ta = ts.getAttribute(CharTermAttribute.class);
                System.out.println(ta.toString());
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
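
Run standalone, this prints one token per line. The exact tokens depend on the dictionary and on paoding's segmentation mode; in the default most-words mode the analyzer emits every dictionary word it finds, including overlapping ones.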



Source corpus for news-text classification:

http://people.csail.mit.edu/jrennie/20Newsgroups/20news-bydate.tar.gz


Each folder represents a category, and each file under a category is one news article, as sketched below.
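
For reference, the unpacked archive looks roughly like this (the numeric file names are message IDs; the names shown here are illustrative):

20news-bydate-train/
    alt.atheism/
        49960
        51060
        ...
    comp.graphics/
        37261
        ...
20news-bydate-test/
    ...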


For Chinese news, classification requires word segmentation (tokenization) first.


For a large number of small files, extend CombineFileInputFormat, an abstract subclass of FileInputFormat, and implement its createRecordReader method.

CombineFileInputFormat overrides getSplits; the splits it returns are of type CombineFileSplit, a subclass of InputSplit that can contain multiple files.

How the RecordReader turns those files into key-value pairs is decided by its nextKeyValue method.


The custom CombineFileInputFormat class:

package org.conan.myhadoop.fengci;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;

/**
 * Custom input format that packs multiple files into one split.
 * @author BOB
 */
public class MyInputFormat extends CombineFileInputFormat<Text, Text> {

    // Forbid splitting individual files: each news file must stay whole.
    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        return false;
    }

    @Override
    public RecordReader<Text, Text> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException {
        return new CombineFileRecordReader<Text, Text>((CombineFileSplit) split, context, MyRecordReader.class);
    }

}
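
Two design points interact here. Because isSplitable returns false, every news file stays whole inside one split; and since CombineFileInputFormat honors mapreduce.input.fileinputformat.split.maxsize (set to 4,000,000 in the driver below) as its maximum split size, each map task processes roughly 4 MB worth of packed small files rather than one file each.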



The custom RecordReader class:

package org.conan.myhadoop.fengci;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;

/**
 * Custom RecordReader that reads the contents of the splits produced by MyInputFormat.
 * @author BOB
 */
public class MyRecordReader extends RecordReader<Text, Text> {

    private CombineFileSplit combineFileSplit;   // the split being processed
    private Configuration conf;                  // job configuration
    private Text currentKey = new Text();        // current key
    private Text currentValue = new Text();      // current value
    private int totalLength;                     // number of files in this split
    private int currentIndex;                    // index of the file being read within the split
    private float currentProgress = 0F;          // current progress
    private boolean processed = false;           // whether the current file has been processed

    // Called once per file in the split (see the note after this class).
    public MyRecordReader(CombineFileSplit combineFileSplit, TaskAttemptContext context, Integer fileIndex) {
        super();
        this.combineFileSplit = combineFileSplit;
        this.currentIndex = fileIndex;
        this.conf = context.getConfiguration();
        this.totalLength = combineFileSplit.getPaths().length;
    }

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        // All initialization happens in the constructor.
    }

    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return currentKey;
    }

    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return currentValue;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        if (currentIndex >= 0 && currentIndex < totalLength) {
            currentProgress = (float) currentIndex / totalLength;
        }
        return currentProgress;
    }

    @Override
    public void close() throws IOException {
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (!processed) {
            // The key is built from the parent directory name, the file name,
            // and separators: "/category/filename".
            Path file = combineFileSplit.getPath(currentIndex);
            StringBuilder sb = new StringBuilder();
            sb.append("/");
            sb.append(file.getParent().getName()).append("/");
            sb.append(file.getName());
            currentKey.set(sb.toString());

            // The value is the entire content of the file.
            byte[] content = new byte[(int) combineFileSplit.getLength(currentIndex)];
            FileSystem fs = file.getFileSystem(conf);
            FSDataInputStream in = fs.open(file);
            in.readFully(content);
            currentValue.set(content);
            in.close();
            processed = true;
            return true;
        }
        return false;
    }

}
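
Note the three-argument constructor: CombineFileRecordReader instantiates this class by reflection once per file in the CombineFileSplit, passing the file's index within the split. Each instance therefore emits exactly one key-value pair, which is why nextKeyValue returns true only once.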



The tokenization driver class:

package org.conan.myhadoop.fengci;

import java.io.IOException;
import java.io.StringReader;

import net.paoding.analysis.analyzer.PaodingAnalyzer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;

/**
 * Driver class that tokenizes the input files.
 * @author BOB
 */
public class TokenizerDriver extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new TokenizerDriver(), args);
        System.exit(res);
    }

    @Override
    public int run(String[] args) throws Exception {
        // Use the configuration prepared by ToolRunner.
        Configuration conf = getConf();
        // Cap the total bytes packed into one combined split (~4 MB).
        conf.setLong("mapreduce.input.fileinputformat.split.maxsize", 4000000);

        Job job = new Job(conf, "Tokenizer");
        job.setJarByClass(TokenizerDriver.class);

        job.setMapperClass(Map.class);

        job.setInputFormatClass(MyInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        Path inpath = new Path(args[0]);
        Path outpath = new Path(args[1]);

        // Add every category directory under the input path.
        FileSystem fs = inpath.getFileSystem(conf);
        FileStatus[] status = fs.listStatus(inpath);
        Path[] paths = FileUtil.stat2Paths(status);
        for (Path path : paths) {
            FileInputFormat.addInputPath(job, path);
        }
        FileOutputFormat.setOutputPath(job, outpath);

        // Delete the output directory if it already exists. Do not close the
        // FileSystem here: instances are cached and the job still needs it.
        FileSystem hdfs = outpath.getFileSystem(conf);
        if (hdfs.exists(outpath)) {
            hdfs.delete(outpath, true);
        }

        // Map-only job: no reduce tasks.
        job.setNumReduceTasks(0);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    /**
     * Mapper that tokenizes each document in parallel.
     * @author BOB
     */
    static class Map extends Mapper<Text, Text, Text, Text> {

        @Override
        protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
            // Create the analyzer and a token stream over the document body.
            Analyzer analyzer = new PaodingAnalyzer();
            String line = value.toString();
            StringReader reader = new StringReader(line);
            TokenStream ts = analyzer.tokenStream("", reader);
            StringBuilder sb = new StringBuilder();

            // Join the tokens with single spaces.
            while (ts.incrementToken()) {
                CharTermAttribute ta = ts.getAttribute(CharTermAttribute.class);
                if (sb.length() != 0) {
                    sb.append(" ");
                }
                sb.append(ta.toString());
            }
            value.set(sb.toString());
            context.write(key, value);
        }

    }
}
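
Assuming the classes and the paoding jars are packaged into a job jar (tokenizer.jar below is a placeholder name, and the paths are illustrative), the job is submitted as usual:

hadoop jar tokenizer.jar org.conan.myhadoop.fengci.TokenizerDriver /user/hadoop/20news /user/hadoop/20news-tokenized

Here args[0] is the directory whose category subdirectories the driver enumerates, and args[1] is the output directory for the SequenceFile.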


This preprocessing collects all the news into a single output: each record is one article, the key carries its category (and file name), and the words in the value are separated by spaces.


The processed data can then be fed to Mahout to train a naive Bayes classifier; see the sketch below.
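
A rough sketch of that downstream step, assuming the Mahout CLI is installed (the command names are real, but the exact flags vary across Mahout versions, so treat them as illustrative):

mahout seq2sparse -i 20news-tokenized -o 20news-vectors -wt tfidf
mahout trainnb -i 20news-vectors/tfidf-vectors -el -li labelindex -o model -ow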


