加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 大数据 > 正文

文本挖掘实例

发布时间:2020-12-14 02:47:23 所属栏目:大数据 来源:网络整理
导读:一、开发环境: 1、系统:WIN7 2、IDE:Eclipse 3、Java:jdk1.6 二、所需jar包 1、lucene-core-3.1.0.jar 2、paoding-analysis.jar 3、数据词典 dic 三、集群环境 1、节点:Master(1),Slave(2) 2、系统:RedHat 6.2 3、JDK:jdk1.6 4、Hadoop: Hadoop1.1.2

一、开发环境:

1、系统:WIN7

2、IDE:Eclipse

3、Java:jdk1.6


二、所需jar包

1、lucene-core-3.1.0.jar

2、paoding-analysis.jar

3、数据词典 dic


三、集群环境

1、节点:Master(1),Slave(2)

2、系统:RedHat 6.2

3、JDK:jdk1.6

4、Hadoop: Hadoop1.1.2

5、Mahout: Mahout0.6

6、pig: pig0.11


四、数据准备

1、18.7M,8000+个模型文件

2、19.2M,9000+个测试文件


五、开发步骤

(一)、购建cbayes模型

1、模型文件由8000多个小文件组成,若用MapReduce默认的FileInputFormat读取时,将产生至少8000+个map任务,这样效率将非常低,为了处理小文件的问题,需要自定义FileInputFormat并extends CombineFileInputFormat (将多个小文件组合生成切片).

自定义的CombineFileInputFormat 和 RecordReader 代码如下:

1)、自定义的CombineFileInputFormat?

package fileInputFormat;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;


public class MyFileInputFormat extends CombineFileInputFormat<Text,Text>{
@Override
public RecordReader<Text,Text> createRecordReader(InputSplit split,TaskAttemptContext context) throws IOException {
CombineFileRecordReader<Text,Text> recordReader = new CombineFileRecordReader<Text,Text>((CombineFileSplit)split,context,MyFileRecordReader.class);

//返回自定义的RecordReader ?
return recordReader;
}
//要求一个文件必须在一个切片中,一个切片可以包含多个文件
@Override
protected boolean isSplitable(JobContext context,Path file) {
return false;
}
}

2)、自定义的RecordReader

package fileInputFormat;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader;
import org.apache.hadoop.util.ReflectionUtils;


public class MyFileRecordReader extends RecordReader<Text,Text>{
private Text currentKey = new Text(); ? ? ?// 当前的Key
private Text currentValue = new Text(); ? ?// 当前的Value
private Configuration conf; ? ? ? ? ? ? ? ?// 任务信息
private boolean processed; ? ? ? ? ? ? ?// 记录当前文件是否已经读取
private CombineFileSplit split; ? ? ? ? //待处理的任务切片
private int totalLength; ? ? ? ?//切片包含的文件数量
private int index; ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? //当前文件在split中的索引
private float currentProgress = 0; ? ?//当前的处理进度

public MyFileRecordReader(CombineFileSplit split,TaskAttemptContext context,Integer index) throws IOException {
? ? ? ?super();
? ? ? ?this.split = split;
? ? ? ?this.index = index; // 当前要处理的小文件Block在CombineFileSplit中的索引
? ? ? ?this.conf = context.getConfiguration();
? ? ? ?this.totalLength = split.getPaths().length;
? ? ? ?this.processed = false;
}
@Override
public void close() throws IOException {
}
@Override
public Text getCurrentKey() throws IOException,InterruptedException {
// TODO Auto-generated method stub
return currentKey;
}
@Override
public Text ?getCurrentValue() throws IOException,InterruptedException {
// TODO Auto-generated method stub
return currentValue;
}
@Override
public float getProgress() throws IOException,InterruptedException {
if (index >= 0 && index < totalLength) {
currentProgress = (float) index / totalLength;
? ? ? ? ?return currentProgress;
?}
return currentProgress;
}
@Override
public void initialize(InputSplit split,TaskAttemptContext context)
throws IOException,InterruptedException {
this.split = (CombineFileSplit) split;
? // 处理CombineFileSplit中的一个小文件Block,因为使用LineRecordReader,需要构造一个FileSplit对象,然后才能够读取数据
// ? ?FileSplit fileSplit = new FileSplit(this.split.getPath(index),this.split.getOffset(index),this.split.getLength(index),this.split.getLocations());
// ? ?lineRecordReader.initialize(fileSplit,context);
// ? ? ? ? ? ?this.paths = this.split.getPaths();
// ? ?totalLength = paths.length;
// ? ?context.getConfiguration().set("map.input.file.name",this.split.getPath(index).getName()); ?
}


//一次读取一个文件的所有内容生成一行
@Override
public boolean nextKeyValue() throws IOException,InterruptedException {
if (!processed) { ? ?// 如果文件未处理则读取文件并设置key-value
// set key
Path file = split.getPath(index);
currentKey.set(file.getParent().getName());
// set value
FSDataInputStream in = null;
byte[] contents = new byte[(int)(split.getLength(index))];
try{
FileSystem fs = file.getFileSystem(conf);
in = fs.open(file);
in.readFully(contents);
currentValue.set(contents);
} catch (Exception e) {
} finally {
in.close();
}
processed = true;
return true;
}
return false; ? ? ? ?//如果文件已经处理,必须返回false
}
}


3)、自定义MapReduce和Run接口的相关代码

package mr;
import java.io.IOException;
import java.io.StringReader;
import net.paoding.analysis.analyzer.PaodingAnalyzer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
import fileInputFormat.MyFileInputFormat;

public class CountApp ?extends Configured implements Tool{
public static void main(String[] args) ?throws Exception{
ToolRunner.run(new CountApp(),args);
}
static class MyMapper extends Mapper<Text,Text,Text>
{
Text outValue = new Text();
PaodingAnalyzer analyzer = new PaodingAnalyzer();
@Override
protected void map(Text key,Text value,Mapper<Text,Text>.Context ctx) throws IOException,InterruptedException {
System.out.println(key.toString());
String line = value.toString();
StringReader sr = new StringReader(line);
TokenStream ts = analyzer.tokenStream("",sr);
StringBuilder sb = new StringBuilder();
while(ts.incrementToken())
{
CharTermAttribute ta = ts.getAttribute(CharTermAttribute.class);
sb.append(ta.toString());
sb.append(" ");
}
outValue.set(sb.toString().trim());
ctx.write(key,outValue);
}
}
@Override
public int run(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.setLong("mapreduce.input.fileinputformat.split.maxsize",4000000);
conf.setInt("mapred.min.split.size",1);
conf.setInt("mapred.reduce.tasks",5);

Job job = new Job(conf,CountApp.class.getSimpleName());
job.setJarByClass(CountApp.class);

//使用自定义的FileInputFormat
job.setInputFormatClass(MyFileInputFormat.class);
job.setMapperClass(MyMapper.class);

job.setMapOutputKeyClass(Text.class);
? ? ? ? job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

Path outPath = new Path(args[1]);
Path inPath = new Path(args[0]);

FileSystem ifs = inPath.getFileSystem(conf);
FileStatus[] inPaths = ifs.listStatus(inPath);
for(FileStatus fls:inPaths)
{
FileInputFormat.addInputPath(job,fls.getPath());
}

FileSystem fs = outPath.getFileSystem(conf);
if(fs.exists(outPath))
{
fs.delete(outPath,true);
fs.close();
}
FileOutputFormat.setOutputPath(job,outPath);
job.setOutputFormatClass(TextOutputFormat.class);
job.waitForCompletion(true);
System.exit(0);
return 1;
}
}


4)、生成jar包,并在hadoop上运行

hadoop jar??wordcount.jar ?/yu/news /yu/out/news

原始数据:

http://mp3.zol.com.cn/300/3003150.html
多点推荐 台电平板购机攻略
多点推荐 台电平板购机攻略


  最实惠的,最轻薄的质感,最综合的性能,为的就是让更多人享受数字生活。一直以来台电设计出厂的均能得到市场良好反映,甚至一度达到供不应求的热销情况。

......


生成的数据

MP3 http mp 3 mp3 zol com cn 3.zol.com.cn 289 2897244 html 2897244.html 10 小时 玩 13 a13 普耐 momo 9 momo9 加强 加强版 续航 航测 测试 10 小时 玩 13 a13 普耐 momo 9 momo9 加强 加强版 续航 航测 测试 4 月 情人 季 普耐 推出 全新 7 英寸 13 a13 主控 游戏 平板 momo 9 momo9 加强 加强版 版二 二代 简称 momo 9 momo9 加强 加强版 享受 恒久 真挚 爱情 普耐 momo 9 momo9 加强 加强版 内置 7 寸

.......


5)、根据生成的模型数据,生成cbayes模型

启动pig

processed = load'/yu/news' as (category:chararray,doc:chararray);

test = sample processed 0.2;

jnt = join processed by (category,doc) left outer,test by (category,doc);

filt_test = filter jnt by test::category is null;

train = foreach filt_test generate processed::category as category,processed::doc as doc;

store test into '/yu/model/test';

store train into '/yu/model/train';


执行mahout命令,生成训练模型

mahout trainclassifier

-i /yu/out/model/test

-o /yu/out/train/model/cbayes

-type cbayes

-ng 1

-source hdfs;


(二)、根据生成的cbayes模型,对测试数据进行归类,并找出用户最感兴趣的分类

1、对测试数据进行初始化操作

执行命令:

hadoop jar?wordcount.jar ?/yu/user_sport /yu/out/user_sport

生成的数据如下:

12213800 专访 孙凤 凤武 女篮 人才 面临 断层 奥运 运进 前 进前 8 不易 搜狐 体育 裴力 力张 张亮 月 日 北京 京报 报道 第 届 奥运 运会 即将 开幕 今天 中国 女篮 篮队 首都 机场 出发 奔赴 法国 将在 法国 进行 三场 热身 热身赛 日 奔赴 赴英 英国 利兹 中国 国代 代表 代表团 大 大本 本营 出征 前 之前 中国 女篮 主帅 孙凤 凤武 接受 受了 搜狐 体育 独家 家专 专访 表示 本届 奥运 运会 会中 中国 女篮 大 最大 大的 任务 就是 顺利 完成 新老 交替 一批 岁 上下 下的 年轻 运动 动员 非常 常有 潜力 本届 奥运 运会 会对 对于 她们 来说 说是 是很 很好 好的 锻炼 机会 下届 奥运 运会 她们 正值 值


2、借用mahout的API,重定义mapreduce,载入 cbayes模型

1)、编写类,生成main函数调用接口,相关代码如下:

package mahout;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.classifier.bayes.BayesParameters;


public class BayesDriver extends Configured implements Tool{


public static void main(String[] args) throws Exception{
ToolRunner.run(new BayesDriver(),args);
}


@Override
public int run(String[] args) throws Exception {
BayesParameters params = new BayesParameters();
params.set("classifierType",args[3]);
params.set("alpha_i","1.0");
params.set("defaultCat","unknown");
params.setGramSize(1);
params.setBasePath(args[2]);

Configuration conf = new Configuration();
conf.set("bayes.parameters",params.toString());
Job job = new Job(conf,BayesDriver.class.getSimpleName());
job.setJarByClass(BayesDriver.class);
job.setInputFormatClass(KeyValueTextInputFormat.class);
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);

job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Text.class);

Path inPath = new Path(args[0]);
Path outPath = new Path(args[1]);
FileInputFormat.addInputPath(job,inPath);
FileOutputFormat.setOutputPath(job,outPath);

FileSystem fs = outPath.getFileSystem(conf);
if(fs.exists(outPath))
{
fs.delete(outPath,true);
fs.close();
}
job.waitForCompletion(true);
System.exit(1);
return 1;
}
}

2)、编写自定义的MAPPER类

package mahout;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.mahout.classifier.ClassifierResult;
import org.apache.mahout.classifier.bayes.Algorithm;
import org.apache.mahout.classifier.bayes.BayesAlgorithm;
import org.apache.mahout.classifier.bayes.BayesParameters;
import org.apache.mahout.classifier.bayes.CBayesAlgorithm;
import org.apache.mahout.classifier.bayes.ClassifierContext;
import org.apache.mahout.classifier.bayes.Datastore;
import org.apache.mahout.classifier.bayes.InMemoryBayesDatastore;
import org.apache.mahout.classifier.bayes.InvalidDatastoreException;
import org.apache.mahout.common.nlp.NGrams;


public class MyMapper extends Mapper<Text,LongWritable>{
private ClassifierContext classifier ;
private String defaultCategory;
private int gramSize = 1;
Text outKey = new Text();
LongWritable one = new LongWritable(1);
@Override
protected void setup(Context ctx)
throws IOException,InterruptedException {
Configuration conf = ctx.getConfiguration();
BayesParameters params = new BayesParameters(conf.get("bayes.parameters",""));
Algorithm algorithm;
Datastore datastore;
if("bayes".equalsIgnoreCase(params.get("classifierType"))) ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?
{
algorithm = new BayesAlgorithm();
datastore = new InMemoryBayesDatastore(params);
}
else if("cbayes".equalsIgnoreCase(params.get("classifierType")))
{
algorithm = new CBayesAlgorithm();
datastore = new InMemoryBayesDatastore(params);
}
else
{
throw new IllegalArgumentException("Unrecognized classifier Type: "+params.get("classifierType"));
}

classifier = new ClassifierContext(algorithm,datastore);
try {
classifier.initialize();
} catch (InvalidDatastoreException e) {
e.printStackTrace();
}
defaultCategory = params.get("defaultCat");
gramSize = params.getGramSize();
}

@Override
protected void map(Text key,
Mapper<Text,LongWritable>.Context ctx)
throws IOException,InterruptedException {
String docLabel = "";
String userId = key.toString();
List<String> ngrams = new NGrams(value.toString(),gramSize).generateNGramsWithoutLabel();
ClassifierResult result;
try {
result = classifier.classifyDocument(ngrams.toArray(new String[ngrams.size()]),defaultCategory);
docLabel = result.getLabel();
} catch (InvalidDatastoreException e) {
e.printStackTrace();
}
outKey.set(userId + "|" + docLabel );
ctx.write(outKey,one);
}


}


3)、编写自定义的Reduce类

package mahout;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class MyReducer extends Reducer<Text,LongWritable,NullWritable,Text>{
private Text outValue = new Text();
@Override
protected void reduce(Text key,Iterable<LongWritable> vals,
Reducer<Text,Text>.Context ctx)
throws IOException,InterruptedException {
long sum = 0;
for(LongWritable lw:vals)
{
sum+=lw.get();
}
outValue.set(key.toString()+"|"+sum);
ctx.write(NullWritable.get(),outValue);
}
}


3、将项目打成jar包,执行命令

hadoop jar?cbayesSort.jar ?

?>>?/yu/out/user_sport ?

?>>?/yu/out/user_info

>>?/yu/model/cbayes ? ? ? ? //生成的cbayes模型存放地址

>> cbayes;

生成的数据集如下:?

10511838|camera|7
10511838|household|2
10511838|mobile|53
10564290|camera|4
10564290|household|4
10564290|mobile|80
107879|camera|8
107879|household|1
107879|mobile|83
11516148|camera|12
11516148|household|1

......

4、启动pig,执行命令

u_ct = load'/yu/out/user_info' using PigStorage('|') as (userId:chararray,category:chararray,visitNums:int);

u_stat = foreach u_ct (group u_ct by userId) {

sorted = order u_ct by visitNums desc;

top = limit sorted 1;

generate flatten(top),SUM(u_ct.visitNums);?

}

store u_stat into '/yu/out/user_info_stort';

生成的数据如下(用户ID 分类 该类访问次数 访问总次数)

10511838 mobile 53 62
10564290 mobile 80 88
107879 mobile 83 92
11516148 mobile 80 93
11837625 mobile 91 100
11845829 mobile 161 183
11884229 mobile 117 125
12210378 mobile 106 128
12213800 mobile 140 165
12301189 mobile 146 163

......

依此数据即可以看出每个用户最可能的兴趣归类,如10511838,最感兴趣的话题为数码科技等。

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读