不要在streaming里mapper和reducer中使用管道,会出现”java.io.IOException: Broken pipe”错误。
如果使用的是非shell命令或者slave机器中没有的命令,都需要手动添加-file参数来分发程序文件。如果mapper和reducer需要分发两个或者更多个程序文件,则直接使用多个-file参数即可。
关于grep:
hadoop streaming -input /user/hadoop/hadoopfile -output /user/hadoop/result -mapper "grep hello" -jobconf mapre.job.name="grep-test" -jobconf stream.non.zero.exit.is.failure=false -jobconf mapred.reduce.tasks=1
说明:
-input /user/hadoop/hadoopfile : 待处理文件目录
-output /user/hadoop/result :处理结果存放目录
-mapper "grep hello" :map程序
-jobconf mapre.job.name="grep-test" :任务名
-jobconf stream.non.zero.exit.is.failure=false : map-reduce程序返回值不做判断;streaming默认的情况下,mapper和reducer的返回值不是0,被认为异常任务,将被再次执行,默认尝试4次都不是0,整个job都将失败。而grep在没有匹配结果时返回1。
-jobconf mapred.reduce.tasks=1 : reduce任务数。 此处也可配置为0,为0配置表示告诉Map/reduce框架不要创建reducer任务
关于awk:
hadoop jar $HADOOP_STREAMING_JAR \
-input /test/ylb/mock_data/cv-pt-demo.txt \
-output /test/ylb/mock_data/output/cv-pt-demo-10 \
-mapper "awk '\$1~/1/ {printf(\"%s\t%s\n\",\$1,\$2)}'" \
-reducer "awk 'BEGIN{pt[\"test_key\"]=0;} {key=\$2/10;pt[key]+=1;} END{ for(k in pt){printf(\"%d\t%d\n\",k,pt[k]);} }'"
-mapper " awk ' ' " , 这种写法的话, ' ' 里面的脚本中有三种字符(shell特殊字符)需要转义,分别是 " ' $ ,这种写法的好处是可以方便的引用外部shell变量
-mapper ' awk " " ' 这种写法则不需要转义
类似的其他命令中需要嵌套单引号双引号也是这样。
1. -mapper " awk ' ' " , 这种写法的话, ' ' 里面的脚本中有三种字符(shell特殊字符)需要转义,分别是 " ' $
这种写法的好处是可以方便的引用外部shell变量
-mapper ' awk " " ' 这种写法则不需要转义
hadoop jar \
/mnt/cloudera/parcels/CDH-5.7.0-1.cdh5.7.0.p0.45/lib/oozie/oozie-sharelib-yarn/lib/mapreduce-streaming/hadoop-streaming.jar \
-D mapred.output.compress=true \
-D mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec \
-D stream.num.map.output.key.fields=2 \
-D num.key.fields.for.partition=1 \
-files src \
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
-input $input \
-output $output \
-mapper "python src/hdp2mysql.py $hour" \
-reducer cat \
-numReduceTasks 1
hadoop streaming多路输出方法和注意点(附超大数据diff对比源码)
简介
hadoop 支持reduce多路输出的功能,一个reduce可以输出到多个part-xxxxx-X文件中,其中X是A-Z的字母之一,程序在输出对的时候,在value的后面追加"#X"后缀,比如#A,输出的文件就是part-00000-A,不同的后缀可以把key,value输出到不同的文件中,方便做输出类型分类, #X仅仅用做指定输出文件后缀, 不会体现到输出的内容中
使用方法
启动脚本中需要指定-outputformat org.apache.hadoop.mapred.lib.SuffixMultipleTextOutputFormat或者-outputformat org.apache.hadoop.mapred.lib.SuffixMultipleSequenceFileOutputFormat, 输出就会按照多路输出的方式进行分文件输出
所有标准输出的value中都要加上 #X后缀,X代表A-Z, 不然会报invalid suffix错误
$HADOOP_HOME_PATH/bin/hadoop streaming \
-Dhadoop.job.ugi="$HADOOP_JOB_UGI" \
-file ./map.sh \
-file ./red.sh \
-file ./config.sh \
-mapper "sh -x map.sh" \
-reducer "sh -x red.sh" \
-input $NEW_INPUT_PATH \
-input $OLD_INPUT_PATH \
-output $OUTPUT_PATH \
-jobconf stream.num.map.output.key.fields=1 \
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
-outputformat org.apache.hadoop.mapred.lib.SuffixMultipleTextOutputFormat \
-jobconf mapred.job.name="test-shapherd-dist-diff" \
-jobconf mapred.job.priority=HIGH \
-jobconf mapred.job.map.capacity=100 \
-jobconf mapred.job.reduce.capacity=100 \
-jobconf mapred.reduce.tasks=3
map.sh如下:
red.sh如下:
注意事项
- 多路输出最多支持26路, 也就是字母只能是A-Z范围。
- reduce的输入key和value的分隔符默认是\t, 如果输出中没有\t,reduce脚本会把整行当作key, value就是空的,这时如果加了#X,会报invalid suffix错误,因为#X作为了key的一部分,这种问题一种是保证你的key和value是按照\t分隔的, 一种是指定自己想要的分隔符。