用PHP和Shell写Hadoop的MapReduce程序
使得任何支持标准IO (stdin,stdout)的可执行程序都能成为hadoop的mapper或者 reducer。例如: 代码如下: hadoop jar hadoop-streaming.jar -input SOME_INPUT_DIR_OR_FILE -output SOME_OUTPUT_DIR -mapper /bin/cat -reducer /usr/bin/wc
在这个例子里,就使用了Unix/Linux自带的cat和wc工具来作为mapper / reducer,是不是很神奇? 如果你习惯了使用一些动态语言,用动态语言来写mapreduce吧,跟之前的编程没有任何不同,hadoop只是运行它的一个框架,下面我演示一下用PHP来实现Word Counter的mapreduce。 一、找到Streaming jarHadoop根目录下是没有hadoop-streaming.jar的,因为streaming是一个contrib,所以要去contrib下面找,以hadoop-0.20.2为例,它在这里: 代码如下: 二、写Mapper新建一个wc_mapper.php,写入如下代码: 代码如下: #!/usr/bin/php
$in = fopen(“php://stdin”,“r”); $results = array(); while ( $line = fgets($in,4096) ) { $words = preg_split(‘/W/',$line,PREG_SPLIT_NO_EMPTY); foreach ($words as $word) $results[] = $word; } fclose($in); foreach ($results as $key => $value) { print “$valuet1n”; } 这段代码的大致意思是:把输入的每行文本中的单词找出来,并以” 和之前写的PHP基本没有什么不同,对吧,可能稍微让你感到陌生有两个地方: PHP作为可执行程序第一行的“#!/usr/bin/php”告诉linux,要用/usr/bin/php这个程序作为以下代码的解释器。写过linux shell的人应该很熟悉这种写法了,每个shell脚本的第一行都是这样: #!/bin/bash,#!/usr/bin/python 有了这一行,保存好这个文件以后,就可以像这样直接把wc_mapper.php当作cat,grep一样的命令执行了:./wc_mapper.php 使用stdin接收输入PHP支持多种参数传入的方法,大家最熟悉的应该是从$_GET,$_POST超全局变量里面取通过Web传递的参数,次之是从$_SERVER['argv']里取通过命令行传入的参数,这里,采用的是标准输入stdin 它的使用效果是: 在linux控制台输入 ./wc_mapper.php wc_mapper.php运行,控制台进入等候用户键盘输入状态 用户通过键盘输入文本用户按下Ctrl + D终止输入,wc_mapper.php开始执行真正的业务逻辑,并将执行结果输出 那么stdout在哪呢?print本身已经就是stdout啦,跟我们以前写web程序和CLI脚本没有任何不同。 三、写Reducer新建一个wc_reducer.php,写入如下代码: 代码如下: #!/usr/bin/php
$in = fopen(“php://stdin”,4096) ) { list($key,$value) = preg_split(“/t/”,trim($line),2); $results[$key] += $value; } fclose($in); ksort($results); foreach ($results as $key => $value) { print “$keyt$valuen”; } 这段代码的大意是统计每个单词出现了多少次,并以” 四、用Hadoop来运行上传要统计的示例文本 代码如下: hadoop fs -put *.TXT /tmp/input
以Streaming方式执行PHP mapreduce程序 代码如下: 注意: input和output目录是在hdfs上的路径 mapper和reducer是在本地机器的路径,一定要写绝对路径,不要写相对路径,以免到时候hadoop报错说找不到mapreduce程序。 查看结果 代码如下: 五、shell版的Hadoop MapReduce程序 代码如下: #!/bin/bash -
# 加载配置文件 # 处理命令行参数 # 默认处理日期为昨天 # 最终处理日期. 如果日期格式不对,则退出执行 # 待处理文件 # 如果待处理文件数目为零,则退出执行 # 输入文件列表 function map_reduce () { # 循环处理每一个bucket (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |