准备工作:
1、hadoop集群;
2、mapper和reducer划分;
初步把提取url的操作划分为mapper中执行,随机挑选5条由reducer去执行。
首先看下MR的工作原理:
![使用Python和Hadoop Streaming编写MapReduce-LMLPHP 使用Python和Hadoop Streaming编写MapReduce-LMLPHP](https://c1.lmlphp.com/user/master/2020/09/25/son_1/e116bd91bb9d77486774641760fcd0ae.jpg)
上图是MR的workflow,在介绍Hadoop Streaming的时候,可以拿来做参照。
Hadoop 和 MapReduce已经如日中天。Hadoop 不仅可以使用Java进行MapReduce的编写,也通过Hadoop Streaming的方式提供了其他语言编写MR的接口。更重要的是,使用python来编写MR,比使用亲儿子Java编写MR要更简单和方便……所以在一些不非常复杂的任务中使用python来编写MR比起使用Java,是更加划算的。
Hadoop Streaming:
Hadoop Streaming提供了一个便于进行MapReduce编程的工具包,使用它可以基于一些可执行命令、脚本语言或其他编程语言来实现Mapper和 Reducer,从而充分利用Hadoop并行计算框架的优势和能力。Hadoop Streaming比较独特的一点是利用的UNIX标准输入输出stdin和stdout,所以只要能处理stdin和stdout的编程语言都能够使用Hadoop Streaming来进行MR的编写。甚至,wc、awk这些linux自带的能处理标准输入输出的程序,也能被用来编写Hadoop Streaming。
怎么工作的呢:
Hadoop Streaming 提供了一个hadoop-streaming.jar,默认处于$HADOOP_HOME目录下。如果不在可以自己搜索下,然后使用Hadoop 执行该jar,传入MR job 的参数们开始MapReduce。一个最基本的使用Hadoop Streaming来执行MR的命令行如下:
/home/xitong/software/hadoop-0.20.2.1U29/bin/hadoop jar \
/home/xitong/software/hadoop-0.20.2.1U29/contrib/streaming/hadoop-streaming.jar \
-input /user/eng-test/vidio -output /user/eng-test/vidiooutnew \
-mapper /home/eng-test/donhiyue/mapper.py \
-reducer /home/eng-test/donhiyue/reducer.py
这时候我们执行下上面的命令,这个时候会报错:
1、sh: /home/eng-test/donhiyue/mapper.py: 没有那个文件或目录
这里因为map是需要分发给下面的各个slave去执行的,所以有个文件拷贝的动作,这里加上-file就可以了,于是命令变为如下:
/home/xitong/software/hadoop-0.20.2.1U29/bin/hadoop jar /home/xitong/software/hadoop-0.20.2.1U29/contrib/streaming/hadoop-streaming.jar
-input /user/eng-test/vidio -output /user/eng-test/vidiooutnew3
-file /home/eng-test/dongshiyue/mapper.py
-file /home/eng-test/dongiyue/reducer.py
-mapper /home/eng-test/dongiyue/mapper.py
-reducer /home/eng-test/dongshiyue/reducer.py
2、报各种命令错误如import失败等,是因为-mapper 应该用Python执行
/home/xitong/software/hadoop-0.20.2.1U29/bin/hadoop jar /home/xitong/software/hadoop-0.20.2.1U29/contrib/streaming/hadoop-streaming.jar
-input /user/eng-test/vidio -output /user/eng-test/vidiooutnew3
-file /home/eng-test/dongshiyue/mapper.py
-file /home/eng-test/dongiyue/reducer.py
-mapper “python /home/eng-test/dongiyue/mapper.py"
-reducer "python /home/eng-test/dongshiyue/reducer.py
总结下流程:
Hadoop Streaming的工作流程大概如下:
- hadoop-streaming.jar向Hadoop集群注册一个新的job,传入input path和output path等
- 开始mapper时,Hadoop Streaming会将输入文件按行传入stdin
- 我们自己编写的mapper需要读取stdin每一行,对其进行处理
- mapper处理完毕的中间结果也要写入stdout,在Python中print语句默认即输出到stdout,当然若是不放心,也可以手动指定输出流。对于输出到stdout中的每一行,hadoop将默认以’\t’作为分隔符切割成k-v的格式。
- mapper处理结束后,Hadoop 自动进行partition、sort、group,准备进行reduce任务
- Hadoop Streaming将中间结果按行传给reducer
- 我们编写的reducer需要读取stdin的每一行,对其进行处理
- 处理结束之后仍然输出到stdout中
- Hadoop 转存到output path中
- 结束
我们一般在本地调试的话,用这条命令就可以了:
cat part-00001 |python mapper.py |sort|python reducer.py
part-00001为要执行的文件。调试通过后再去集群上执行;
map代码示例:
点击(此处)折叠或打开
- import json
- import sys
- import pattern
- def map():
- patterns = {}
- urls = []
- for line in sys.stdin:
- line = line.strip()
- #print line
- json_obj = json.loads(line)
- play_url = json_obj['play_url'].strip()
- if play_url != "":
- urlpatter = pattern.hand_url(play_url)
- if urlpatter not in patterns.keys():
- patterns[urlpatter] = [play_url]
- else:
- patterns[urlpatter].append(play_url)
- for i in patterns.keys():
- print "%s\t%s"%(i,patterns[i])
- if __name__ == '__main__':
- map()
点击(此处)折叠或打开
- import sys
- def reducer():
- for line in sys.stdin:
- #print line
- key,value = line.rstrip('\n').split('\t')
- print key,value
- if __name__ == "__main__":
- reducer()