Problem description
I'm trying to create a simple MapReduce job by changing the wordcount example that ships with Hadoop.
I'm trying to output a list instead of a count of the words. The wordcount example gives the following output:
    hello 2
    world 2
I'm trying to get it to output a list instead, which will form the basis of future work:
    hello 1 1
    world 1 1
I think I'm on the right track, but I'm having trouble writing the list. Instead of the above, I'm getting:
    Hello foo.MyArrayWritable@61250ff2
    World foo.MyArrayWritable@483a0ab1
Here's my MyArrayWritable. I put a System.out call in the write(DataOutput arg0) method, but it never printed anything, so I think that method might not be called, and I don't know why.
    class MyArrayWritable extends ArrayWritable {

        public MyArrayWritable(Class<? extends Writable> valueClass, Writable[] values) {
            super(valueClass, values);
        }

        public MyArrayWritable(Class<? extends Writable> valueClass) {
            super(valueClass);
        }

        @Override
        public IntWritable[] get() {
            return (IntWritable[]) super.get();
        }

        @Override
        public void write(DataOutput arg0) throws IOException {
            for (IntWritable i : get()) {
                i.write(arg0);
            }
        }
    }
EDIT - adding more source code
    public class WordCount {

        public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
            private final static IntWritable one = new IntWritable(1);
            private Text word = new Text();

            public void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                String line = value.toString();
                StringTokenizer tokenizer = new StringTokenizer(line);
                while (tokenizer.hasMoreTokens()) {
                    word.set(tokenizer.nextToken());
                    context.write(word, one);
                }
            }
        }

        public static class Reduce extends Reducer<Text, IntWritable, Text, MyArrayWritable> {

            public void reduce(Text key, Iterable<IntWritable> values, Context context)
                    throws IOException, InterruptedException {
                ArrayList<IntWritable> list = new ArrayList<IntWritable>();
                for (IntWritable val : values) {
                    list.add(val);
                }
                context.write(key, new MyArrayWritable(IntWritable.class,
                        list.toArray(new IntWritable[list.size()])));
            }
        }

        public static void main(String[] args) throws Exception {
            if (args == null || args.length == 0)
                args = new String[]{"./wordcount/input", "./wordcount/output"};
            Path p = new Path(args[1]);
            FileSystem fs = FileSystem.get(new Configuration());
            fs.exists(p);
            fs.delete(p, true);

            Configuration conf = new Configuration();
            Job job = new Job(conf, "wordcount");
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            job.setMapperClass(Map.class);
            job.setReducerClass(Reduce.class);
            job.setJarByClass(WordCount.class);
            job.setInputFormatClass(TextInputFormat.class);
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            job.waitForCompletion(true);
        }
    }
Solution
You have a 'bug' in your reducer - the value iterator re-uses the same IntWritable instance throughout the loop, so you should copy each value into a new IntWritable before adding it to the list, as follows:
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        ArrayList<IntWritable> list = new ArrayList<IntWritable>();
        for (IntWritable val : values) {
            // IntWritable has no copy constructor, so copy the int it holds
            list.add(new IntWritable(val.get()));
        }
        context.write(key, new MyArrayWritable(IntWritable.class,
                list.toArray(new IntWritable[list.size()])));
    }
This isn't actually a problem in your current job, because your mapper only ever outputs a single value (one), so every element in the list is the same anyway. It is something that may trip you up if you ever extend this code so that the values differ.
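To make the object re-use described above concrete, here is a small pure-Java simulation that needs no Hadoop on the classpath. MutableInt and ReusingValues are hypothetical stand-ins for IntWritable and the reducer's value iterable; the sketch only assumes that the iterator hands back one shared object whose contents change on each step.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Pure-Java simulation. MutableInt and ReusingValues are hypothetical
// stand-ins for IntWritable and the reducer's value iterable.
class ReuseDemo {

    // Mutable holder, playing the role of IntWritable.
    static final class MutableInt {
        private int value;
        MutableInt(int value) { this.value = value; }
        int get() { return value; }
        void set(int value) { this.value = value; }
    }

    // Returns the SAME MutableInt from every call to next(), changing only
    // its contents, to mimic the re-use of the value object in a reducer.
    static final class ReusingValues implements Iterable<MutableInt> {
        private final int[] data;
        ReusingValues(int... data) { this.data = data; }

        @Override
        public Iterator<MutableInt> iterator() {
            return new Iterator<MutableInt>() {
                private final MutableInt shared = new MutableInt(0);
                private int index = 0;

                @Override
                public boolean hasNext() { return index < data.length; }

                @Override
                public MutableInt next() {
                    if (!hasNext()) throw new NoSuchElementException();
                    shared.set(data[index++]);
                    return shared;
                }
            };
        }
    }

    // Buggy: stores the shared reference, so every stored element ends up
    // showing the last value the iterator produced.
    static List<Integer> collectReferences(Iterable<MutableInt> values) {
        List<MutableInt> list = new ArrayList<>();
        for (MutableInt v : values) {
            list.add(v);
        }
        return unwrap(list);
    }

    // Fixed: copies the current value into a fresh object before storing it.
    static List<Integer> collectCopies(Iterable<MutableInt> values) {
        List<MutableInt> list = new ArrayList<>();
        for (MutableInt v : values) {
            list.add(new MutableInt(v.get()));
        }
        return unwrap(list);
    }

    private static List<Integer> unwrap(List<MutableInt> list) {
        List<Integer> out = new ArrayList<>();
        for (MutableInt v : list) {
            out.add(v.get());
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(collectReferences(new ReusingValues(1, 2, 3))); // [3, 3, 3]
        System.out.println(collectCopies(new ReusingValues(1, 2, 3)));     // [1, 2, 3]
    }
}
```

With input values that are all 1, both methods return the same list, which is why the problem stays hidden in the wordcount case.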
You also need to define in your job that your map and reducer output types are different:
    // map output types
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);
    // reducer output types
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(MyArrayWritable.class);
You might want to explicitly define the number of reducers (which may be why you never see your sysouts being written to the task logs, especially if your cluster admin has defined the default number to be 0):
    job.setNumReduceTasks(1);
You're using the default text output format (TextOutputFormat), which calls toString() on the output key and value. MyArrayWritable doesn't override toString(), so you should add one to MyArrayWritable:
    // needs: import java.util.Arrays;
    @Override
    public String toString() {
        return Arrays.toString(get());
    }
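The difference between the inherited and the overridden toString() can be seen in plain Java, without Hadoop. In the sketch below, IntBox, PlainArray and PrintableArray are hypothetical stand-ins for IntWritable and the two versions of MyArrayWritable. The toSpaceSeparated() helper is an addition that is not part of the answer; it shows one way to get the "1 1" layout from the question rather than the bracketed form that Arrays.toString produces.

```java
import java.util.Arrays;
import java.util.StringJoiner;

// Pure-Java sketch. IntBox, PlainArray and PrintableArray are hypothetical
// stand-ins for IntWritable and the two versions of MyArrayWritable.
class ToStringDemo {

    static final class IntBox {
        private final int value;
        IntBox(int value) { this.value = value; }

        @Override
        public String toString() { return Integer.toString(value); }
    }

    // No toString override: inherits Object.toString(), which prints the
    // class name, an '@' and a hex hash code.
    static class PlainArray {
        protected final IntBox[] values;
        PlainArray(IntBox... values) { this.values = values; }
    }

    // Overrides toString so a text-based writer has something readable.
    static final class PrintableArray extends PlainArray {
        PrintableArray(IntBox... values) { super(values); }

        // Same idea as the answer: delegate to Arrays.toString.
        @Override
        public String toString() { return Arrays.toString(values); }

        // Variant that yields the space-separated form from the question.
        String toSpaceSeparated() {
            StringJoiner joiner = new StringJoiner(" ");
            for (IntBox v : values) {
                joiner.add(v.toString());
            }
            return joiner.toString();
        }
    }

    public static void main(String[] args) {
        // Prints the class name followed by @ and a hash, like the output in the question
        System.out.println(new PlainArray(new IntBox(1), new IntBox(1)));

        PrintableArray printable = new PrintableArray(new IntBox(1), new IntBox(1));
        System.out.println(printable);                    // [1, 1]
        System.out.println(printable.toSpaceSeparated()); // 1 1
    }
}
```

If the exact "hello 1 1" layout matters for later jobs, the body of toSpaceSeparated() could be used as the toString() implementation instead.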
Finally, remove the overridden write method from MyArrayWritable - it is not a valid implementation, because it is not compatible with the complementary readFields method. You don't need to override this method, but if you do (say you want to see a sysout to verify it's being called), then do something like this instead:
    @Override
    public void write(DataOutput arg0) throws IOException {
        System.out.println("write method called");
        super.write(arg0);
    }
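Why write and readFields have to agree can be sketched with plain java.io streams. The method names below are hypothetical, and the length-prefixed layout is an assumption about the kind of format a matching reader needs (ArrayWritable's own write, as far as I recall, also writes the element count first, but check the source of your Hadoop version). The sketch shows that a writer which emits only the elements, like the write method in the question, is misread by a reader that expects a count.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;

// Pure-Java sketch of a symmetric write/read pair. All method names are
// hypothetical, and the length-prefixed layout is an assumption.
class SymmetricIoDemo {

    // Elements only, like the write() in the question: the reader has no
    // way to know how many ints follow.
    static byte[] writeElementsOnly(int[] values) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            for (int v : values) {
                out.writeInt(v);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Element count first, then the elements.
    static byte[] writeWithLength(int[] values) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(values.length);
            for (int v : values) {
                out.writeInt(v);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Reader that expects the count written by writeWithLength.
    static int[] readWithLength(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int count = in.readInt();
            int[] values = new int[count];
            for (int i = 0; i < count; i++) {
                values[i] = in.readInt();
            }
            return values;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] ones = {1, 1};
        // Matching pair: the round trip preserves the data.
        System.out.println(Arrays.toString(readWithLength(writeWithLength(ones))));   // [1, 1]
        // Mismatched pair: the first element is misread as the count.
        System.out.println(Arrays.toString(readWithLength(writeElementsOnly(ones)))); // [1]
    }
}
```

With other inputs the mismatched pair can fail outright instead of silently dropping data, for example by hitting the end of the stream while trying to read more elements than were written.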