注意转义

注意mapper 和 reducer 不能用head ,more 等中断后续数据处理的程序,直接的命令行 不支持管道
关于管道:
不要在streaming里mapper和reducer中使用管道,会出现”java.io.IOException: Broken pipe”错误。
关于程序文件:
如果使用的是非shell命令或者slave机器中没有的命令,都需要手动添加-file参数来分发程序文件。如果mapper和reducer需要分发两个或者更多个程序文件,则直接使用多个-file参数即可。

关于grep:
hadoop streaming -input /user/hadoop/hadoopfile -output /user/hadoop/result -mapper "grep hello" -jobconf mapre.job.name="grep-test" -jobconf stream.non.zero.exit.is.failure=false -jobconf mapred.reduce.tasks=1
说明:
-input  /user/hadoop/hadoopfile : 待处理文件目录
-output /user/hadoop/result   :处理结果存放目录
-mapper "grep hello" :map程序
-jobconf mapre.job.name="grep-test" :任务名
-jobconf stream.non.zero.exit.is.failure=false  : map-reduce程序返回值不做判断;streaming默认的情况下,mapper和reducer的返回值不是0,被认为异常任务,将被再次执行,默认尝试4次都不是0,整个job都将失败。而grep在没有匹配结果时返回1。

-jobconf mapred.reduce.tasks=1 : reduce任务数。 此处也可配置为0,为0配置表示告诉Map/reduce框架不要创建reducer任务


关于awk:
hadoop jar $HADOOP_STREAMING_JAR \
-input /test/ylb/mock_data/cv-pt-demo.txt \
-output /test/ylb/mock_data/output/cv-pt-demo-10 \
-mapper "awk '\$1~/1/ {printf(\"%s\t%s\n\",\$1,\$2)}'" \
-reducer "awk 'BEGIN{pt[\"test_key\"]=0;} {key=\$2/10;pt[key]+=1;} END{ for(k in pt){printf(\"%d\t%d\n\",k,pt[k]);} }'"
-mapper " awk '  '  " , 这种写法的话, ' ' 里面的脚本中有三种字符(shell特殊字符)需要转义,分别是  " ' $ ,这种写法的好处是可以方便的引用外部shell变量
-mapper '  awk " "  '    这种写法则不需要转义

类似的其他命令中需要嵌套单引号双引号也是这样。


1. -mapper " awk '  '  "    , 这种写法的话, ' ' 里面的脚本中有三种字符(shell特殊字符)需要转义,分别是  " ' $ 

  这种写法的好处是可以方便的引用外部shell变量


    -mapper '  awk " "  '    这种写法则不需要转义


hadoop jar \
    /mnt/cloudera/parcels/CDH-5.7.0-1.cdh5.7.0.p0.45/lib/oozie/oozie-sharelib-yarn/lib/mapreduce-streaming/hadoop-streaming.jar  \
    -input  /user/data/text-spam/log/dt=${dt}/bj_log_center*text-spam_${zone}*.gz \
    -numReduceTasks 1 \
    -mapper "awk '/${T}.*finished at be/{++T[\$1\"-\"\$11] ; if(\$13>20000000){ ++C[\$1\"-\"\$11]} } END { for(i in C ) { print i,C[i],T[i] } }'"  \
    -reducer "awk '{C[\$1]+=\$2; T[\$1]+=\$3} END{ printf(\"BE\t\t\t\tgt20msCount\tBETotal\t1%%%%\n\"); ;for(i in C) { printf(\"%-28s\t%d\t%d\t%.2f\n\",i,C[i],T[i],C[i]/T[i]*10000) } }' " \
    -output /tmp/lqy/alone_${zone}_gt20ms_${T}_be.$NUM/  

hadoop jar \
               /mnt/cloudera/parcels/CDH-5.7.0-1.cdh5.7.0.p0.45/lib/oozie/oozie-sharelib-yarn/lib/mapreduce-streaming/hadoop-streaming.jar \
               -D mapred.output.compress=true \
               -D mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec \
               -D stream.num.map.output.key.fields=2 \
               -D num.key.fields.for.partition=1 \
               -files src \
               -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
               -input $input \
               -output $output \
               -mapper "python src/hdp2mysql.py $hour" \
               -reducer cat \
               -numReduceTasks 1


hadoop streaming多路输出方法和注意点(附超大数据diff对比源码)

简介

hadoop 支持reduce多路输出的功能,一个reduce可以输出到多个part-xxxxx-X文件中,其中X是A-Z的字母之一,程序在输出对的时候,在value的后面追加"#X"后缀,比如#A,输出的文件就是part-00000-A,不同的后缀可以把key,value输出到不同的文件中,方便做输出类型分类, #X仅仅用做指定输出文件后缀, 不会体现到输出的内容中

使用方法

启动脚本中需要指定-outputformat org.apache.hadoop.mapred.lib.SuffixMultipleTextOutputFormat或者-outputformat org.apache.hadoop.mapred.lib.SuffixMultipleSequenceFileOutputFormat, 输出就会按照多路输出的方式进行分文件输出

所有标准输出的value中都要加上 #X后缀,X代表A-Z, 不然会报invalid suffix错误 


$HADOOP_HOME_PATH/bin/hadoop streaming \
      -Dhadoop.job.ugi="$HADOOP_JOB_UGI" \
      -file ./map.sh \
      -file ./red.sh \
      -file ./config.sh \
      -mapper "sh -x map.sh" \
      -reducer "sh -x red.sh" \
      -input $NEW_INPUT_PATH \
      -input $OLD_INPUT_PATH \
      -output  $OUTPUT_PATH \
      -jobconf stream.num.map.output.key.fields=1 \
      -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
      -outputformat org.apache.hadoop.mapred.lib.SuffixMultipleTextOutputFormat \
      -jobconf mapred.job.name="test-shapherd-dist-diff" \
      -jobconf mapred.job.priority=HIGH \
      -jobconf mapred.job.map.capacity=100 \
      -jobconf mapred.job.reduce.capacity=100 \
      -jobconf mapred.reduce.tasks=3
map.sh如下:
source ./config.sh
 
awk 'BEGIN{
}
{
    if(match("'${map_input_file}'","'$OLD_INPUT_PATH'"))
    {
        print $0"\t"0
    next
    }
    if(match("'${map_input_file}'","'$NEW_INPUT_PATH'"))
    print $0"\t"1
}'
 
exit 0

red.sh如下:
awk  -F"\t" 'BEGIN{
    key=""
        flag=0
        num=0
        old_num=0
        new_num=0
        diff_num=0
}
{
    if($NF == "0")
        old_num++
    else
        new_num++
    if($1 != key)
    {
        if(key != "")
        {
            if(num <= 1)
            {
                diff_num++
                if(flag == "0")
                    print $0"#A"
                else
                    print $0"#B"
            }
        }
        key=$1
        flag=$NF
        num=1
        next
    }
 
    if(key == $1)
    {
        num++
        next   
    }
 
}
END{
        if(num  == 1)
        {
            if(flag == "0")
                print $0"#A"
            else
                print $0"#B"
        }
 
        print old_num"\tshapherd#C"
        print new_num"\tshapherd#D"
        print diff_num"\tshapherd#E"
}'
 
 
exit 0


注意事项

  • 多路输出最多支持26路, 也就是字母只能是A-Z范围。
  • reduce的输入key和value的分隔符默认是\t, 如果输出中没有\t,reduce脚本会把整行当作key, value就是空的,这时如果加了#X,会报invalid suffix错误,因为#X作为了key的一部分,这种问题一种是保证你的key和value是按照\t分隔的, 一种是指定自己想要的分隔符。









09-04 13:59