本文介绍了Hadoop:Reducer将Mapper输出写入输出文件的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我遇到了一个非常奇怪的问题。减速器可以工作,但如果我检查输出文件,我只能找到映射器的输出。
当我尝试调试时,在将映射器的输出值类型从Longwritable更改为Text时,我发现了与单词count样本相同的问题

  package org.myorg; 

import java.io.IOException;
import java.util。*;

导入org.apache.hadoop.fs.Path;
导入org.apache.hadoop.conf。*;
import org.apache.hadoop.io。*;
import org.apache.hadoop.mapreduce。*;
import org.apache.hadoop.mapreduce.lib.input。*;
import org.apache.hadoop.mapreduce.lib.output。*;
import org.apache.hadoop.util。*;

公共类WordCount扩展Configured implements工具{

public static class Map
扩展Mapper< LongWritable,Text,Text,Text> {
private static static IntWritable one = new IntWritable(1);
私人文字=新文字();

public void map(LongWritable key,Text wtf,Context context)
throws IOException,InterruptedException {
String line = wtf.toString();
StringTokenizer tokenizer = new StringTokenizer(line);
while(tokenizer.hasMoreTokens()){
word.set(tokenizer.nextToken());
context.write(word,new Text(frommapper));




public static class Reduce
extends Reducer< Text,Text,Text,Text> {
public void reduce(Text key,Text wtfs,
Context context)throws IOException,InterruptedException {
/ *
int sum = 0; (IntWritable val:wtfs){
sum + = val.get();

}
context.write(key,new IntWritable(sum)); * /
context.write(key,new Text(can not output));



public int run(String [] args)throws Exception {
Job job = new Job(getConf());
job.setJarByClass(WordCount.class);
job.setJobName(wordcount);


job.setOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputValueClass(Text.class);
job.setMapperClass(Map.class);
//job.setCombinerClass(Reduce.class);
job.setReducerClass(Reduce.class);

job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);

FileInputFormat.setInputPaths(job,new Path(args [0]));
FileOutputFormat.setOutputPath(job,new Path(args [1]));

布尔成功= job.waitForCompletion(true);
返回成功? 0:1;
}

public static void main(String [] args)throws Exception {
int ret = ToolRunner.run(new WordCount(),args);
System.exit(ret);


$ / code>

这里是结果

  JobClient:合并输出记录= 0 
12/06/13 17:37:46信息mapred.JobClient:映射输入记录= 7
12/06/13 17:37:46信息mapred.JobClient:减少随机字节= 116
12/06/13 17:37:46信息mapred.JobClient:减少输出记录= 7
12/06/13 17:37:46信息mapred.JobClient:Spilled Records = 14
12/06/13 17:37:46信息mapred.JobClient:地图输出字节= 96
12/06 / 13 17:37:46信息mapred.JobClient:合并输入记录= 0
12/06/13 17:37:46信息mapred.JobClient:地图输出记录= 7
12/06/13 17:37:46信息mapred.JobClient:减少输入记录= 7

然后我发现了奇怪的结果在outfile中。无论是否更改减少输出值的类型,我都将映射的输出值类型和减速器的输入键类型更改为文本后发生此问题。我也被迫更改job.setOutputValue(Text.class)

  a frommapper 
a frommapper
a frommapper
gg frommapper
h frommapper
sss frommapper
sss frommapper

帮助!

解决方案

您的reduce函数参数应如下所示:

  public void reduce(Text key,Iterable< Text> wtfs,
Context context)throws IOException,InterruptedException {

通过您定义参数的方式,减少操作不会获取值列表,因此它仅输出从地图函数获得的任何输入因为

  sum + = val.get()

每次只会从0到1,因为每个< key,value> 对的格式为< word,one> 将单独传递给reducer。



另外,映射函数通常不会写入输出文件(我从来没有听说过它,但我不知道这是否可能)。在通常情况下,它总是写入输出文件的减速器。映射器输出是Hadoop透明处理的中间数据。所以如果你在输出文件中看到某些东西,那必须是reducer输出,而不是mapper输出。如果你想验证这一点,你可以去日志中查看你所运行的工作,并查看每个映射器和reducer分别发生了什么。



希望这可以清除一些给你的东西。

I met a very very strange problem. The reducers do work but if I check the output files, I only found the output from the mappers.When I was trying to debug, I found the same problem with the word count sample after I changed the mappers' output value type from Longwritable to Text

    package org.myorg;

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.util.*;

public class WordCount extends Configured implements Tool {

   public static class Map
       extends Mapper<LongWritable, Text, Text, Text> {
     private final static IntWritable one = new IntWritable(1);
     private Text word = new Text();

     public void map(LongWritable key, Text wtf, Context context)
         throws IOException, InterruptedException {
       String line = wtf.toString();
       StringTokenizer tokenizer = new StringTokenizer(line);
       while (tokenizer.hasMoreTokens()) {
         word.set(tokenizer.nextToken());
         context.write(word, new Text("frommapper"));
       }
     }
   }

   public static class Reduce
       extends Reducer<Text, Text, Text, Text> {
     public void reduce(Text key, Text wtfs,
         Context context) throws IOException, InterruptedException {
/*
       int sum = 0;
       for (IntWritable val : wtfs) {
         sum += val.get();
       }
       context.write(key, new IntWritable(sum));*/
    context.write(key,new Text("can't output"));
     }
   }

   public int run(String [] args) throws Exception {
     Job job = new Job(getConf());
     job.setJarByClass(WordCount.class);
     job.setJobName("wordcount");


     job.setOutputKeyClass(Text.class);
     job.setMapOutputValueClass(Text.class);
       job.setOutputValueClass(Text.class);
     job.setMapperClass(Map.class);
     //job.setCombinerClass(Reduce.class);
     job.setReducerClass(Reduce.class);

     job.setInputFormatClass(TextInputFormat.class);
     job.setOutputFormatClass(TextOutputFormat.class);

     FileInputFormat.setInputPaths(job, new Path(args[0]));
     FileOutputFormat.setOutputPath(job, new Path(args[1]));

     boolean success = job.waitForCompletion(true);
     return success ? 0 : 1;
         }

   public static void main(String[] args) throws Exception {
     int ret = ToolRunner.run(new WordCount(), args);
     System.exit(ret);
   }
}

here are the results

JobClient:     Combine output records=0
12/06/13 17:37:46 INFO mapred.JobClient:     Map input records=7
12/06/13 17:37:46 INFO mapred.JobClient:     Reduce shuffle bytes=116
12/06/13 17:37:46 INFO mapred.JobClient:     Reduce output records=7
12/06/13 17:37:46 INFO mapred.JobClient:     Spilled Records=14
12/06/13 17:37:46 INFO mapred.JobClient:     Map output bytes=96
12/06/13 17:37:46 INFO mapred.JobClient:     Combine input records=0
12/06/13 17:37:46 INFO mapred.JobClient:     Map output records=7
12/06/13 17:37:46 INFO mapred.JobClient:     Reduce input records=7

then I found the strange results in the outfile. This problem happened after I changed the output value type of map and input key type of reducer to Text no matter I changed the type of reduce output value or not. I was also forced to change job.setOutputValue(Text.class)

a   frommapper
a   frommapper
a   frommapper
gg  frommapper
h   frommapper
sss frommapper
sss frommapper

Help!

解决方案

Your reduce function arguments should be as follows:

public void reduce(Text key, Iterable <Text> wtfs,
     Context context) throws IOException, InterruptedException {

With the way you've defined the arguments, reduce operation is not getting a list of values, and therefore it just outputs whatever input it gets from the map function because

sum+ = val.get()

is just going from 0 to 1 every time because each <key, value> pair in the form <word, one> is coming separately to the reducer.

Also, the mapper function doesn't normally write to the output file ( i have never heard of it, but I don't know if that's possible). In the usual case, it is always the reducer that writes to output file. Mapper output is intermediate data that is handled transparently by Hadoop. So if you see something in the output file, that has to be the reducer output, not the mapper output. If you want to verify this, you can go to the logs for the job you ran, and check out what's happening in each mapper and reducer individually.

Hope this clears some things for you.

这篇关于Hadoop:Reducer将Mapper输出写入输出文件的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

05-29 03:51
查看更多