Hadoop MultipleOutputs输出到多个文件中的实现方法
Hadoop MultipleOutputs输出到多个文件中的实现方法 1.输出到多个文件或多个文件夹: 驱动中不需要额外改变,只需要在MapClass或Reduce类中加入如下代码 private MultipleOutputs<Text,IntWritable> mos; public void setup(Context context) throws IOException,InterruptedException { mos = new MultipleOutputs(context); } public void cleanup(Context context) throws IOException,InterruptedException { mos.close(); } 然后就可以用mos.write(Key key,Value value,String baSEOutputPath)代替context.write(key,value); 在MapClass或Reduce中使用,输出时也会有默认的文件part-m-00*或part-r-00*,不过这些文件是无内容的,大小为0. 而且只有part-m-00*会传给Reduce。 注意:multipleOutputs.write(key,value,baSEOutputPath)方法的第三个函数表明了该输出所在的目录(相对于用户指定的输出目录)。 如果baSEOutputPath不包含文件分隔符“/”,那么输出的文件格式为baSEOutputPath-r-nnnnn(name-r-nnnnn); 2.案例-需求 需求,下面是有些测试数据,要对这些数据按类目输出到output中: 1512,iphone5s,4英寸,指纹识别,A7处理器,64位,M7协处理器,低功耗 1512,iphone5,A6处理器,IOS7 1512,iphone4s,3.5英寸,A5处理器,双核,经典 50019780,ipad,9.7英寸,retina屏幕,丰富的应用 50019780,yoga,联想,待机18小时,外形独特 50019780,nexus 7,华硕&google,7英寸 50019780,ipad mini 2,retina显示屏,苹果,7.9英寸 1101,macbook air,苹果超薄,OS X mavericks 1101,macbook pro,OS X lion 1101,thinkpad yoga,windows 8,超级本 3.Mapper程序: package cn.edu.bjut.multioutput; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class MultiOutPutMapper extends Mapper<LongWritable,Text,IntWritable,Text> { @Override protected void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException { String line = value.toString().trim(); if(null != line && 0 != line.length()) { String[] arr = line.split(","); context.write(new IntWritable(Integer.parseInt(arr[0])),value); } } } 4.Reducer程序: package cn.edu.bjut.multioutput; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; public class MultiOutPutReducer extends Reducer<IntWritable,NullWritable,Text> { private MultipleOutputs<NullWritable,Text> multipleOutputs = null; @Override protected void reduce(IntWritable key,Iterable<Text> values,InterruptedException { for(Text text : values) { multipleOutputs.write("KeySpilt",NullWritable.get(),text,key.toString()+"/"); multipleOutputs.write("AllPart",text); } } @Override protected void setup(Context context) throws IOException,InterruptedException { multipleOutputs = new MultipleOutputs<NullWritable,Text>(context); } @Override protected void cleanup(Context context) throws IOException,InterruptedException { if(null != multipleOutputs) { multipleOutputs.close(); multipleOutputs = null; } } } 5.主程序: package cn.edu.bjut.multioutput; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; public class MainJob { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = new Job(conf,"aaa"); job.setJarByClass(MainJob.class); job.setMapperClass(MultiOutPutMapper.class); job.setMapOutputKeyClass(IntWritable.class); job.setMapOutputValueClass(Text.class); job.setReducerClass(MultiOutPutReducer.class); job.setOutputKeyClass(NullWritable.class); job.setOutputValueClass(Text.class); FileInputFormat.addInputPath(job,new Path(args[0])); MultipleOutputs.addNamedOutput(job,"KeySpilt",TextOutputFormat.class,NullWritable.class,Text.class); MultipleOutputs.addNamedOutput(job,"AllPart",Text.class); Path outPath = new Path(args[1]); FileSystem fs = FileSystem.get(conf); if(fs.exists(outPath)) { fs.delete(outPath,true); } FileOutputFormat.setOutputPath(job,outPath); job.waitForCompletion(true); } } 如有疑问请留言或者到本站社区交流讨论,感谢阅读,希望能帮助到大家,谢谢大家对本站的支持! (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |
- java – 如何从命令行打开IntelliJ IDEA中的Play应用程序?
- java – SOAP webservice和RESTFUL webservice之间的区别
- java web中图片验证码功能的简单实现方法
- java – 为什么使用int参数的方法考虑数值?
- Java基于Scanner对象的简单输入计算功能示例
- 可以将对象数组视为Java中的对象集合吗?
- 初始化程序块和变量定义等执行什么顺序? (在java中)
- java GUI 快速入门
- java – jaxb2-maven-plugin将classpath添加到生成的源
- Spring3.1.1+MyBatis3.1.1的增、删、查、改以及分页和事务管