Problem Description
Here is my code:
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class SecondarySort extends Configured implements Tool {

    public static void main(String[] args) {
        try {
            ToolRunner.run(new Configuration(), new SecondarySort(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    static class KeyPartitioner implements Partitioner<StockKey, DoubleWritable> {
        @Override
        public int getPartition(StockKey arg0, DoubleWritable arg1, int arg2) {
            int partition = arg0.name.hashCode() % arg2;
            return partition;
        }

        @Override
        public void configure(JobConf job) {
        }
    }

    static class StockKey implements WritableComparable<StockKey> {
        String name;
        Long timestamp;

        public StockKey() {
        }

        StockKey(String name, Long timestamp) {
            this.name = name;
            this.timestamp = timestamp;
        }

        @Override
        public void readFields(DataInput arg0) throws IOException {
            name = WritableUtils.readString(arg0);
            timestamp = arg0.readLong();
        }

        @Override
        public void write(DataOutput arg0) throws IOException {
            WritableUtils.writeString(arg0, name);
            arg0.writeLong(timestamp);
        }

        @Override
        public int compareTo(StockKey arg0) {
            int result = 0;
            result = name.compareToIgnoreCase(arg0.name);
            if (result == 0)
                result = timestamp.compareTo(arg0.timestamp);
            return result;
        }

        public String toString() {
            String outputString = name + "," + timestamp;
            return outputString;
        }
    }

    static class StockReducer implements Reducer<StockKey, DoubleWritable, Text, Text> {
        public void reduce(StockKey key, Iterator<DoubleWritable> value,
                OutputCollector<Text, Text> context, Reporter reporter)
                throws IOException {
            Text k = new Text(key.toString());
            while (value.hasNext()) {
                Double v = value.next().get();
                Text t = new Text(v.toString());
                context.collect(k, t);
            }
        }

        @Override
        public void configure(JobConf job) {
            // TODO Auto-generated method stub
        }

        @Override
        public void close() throws IOException {
            // TODO Auto-generated method stub
        }
    }

    static class StockMapper implements Mapper<LongWritable, Text, StockKey, DoubleWritable> {
        public void map(LongWritable offset, Text value,
                OutputCollector<StockKey, DoubleWritable> context, Reporter reporter)
                throws IOException {
            String[] values = value.toString().split(",");
            StockKey key = new StockKey(values[0].trim(), Long.parseLong(values[1].trim()));
            DoubleWritable val = new DoubleWritable(Double.parseDouble(values[2].trim()));
            context.collect(key, val);
        }

        @Override
        public void configure(JobConf job) {
            // TODO Auto-generated method stub
        }

        @Override
        public void close() throws IOException {
            // TODO Auto-generated method stub
        }
    }

    @SuppressWarnings("unchecked")
    @Override
    public int run(String[] arg) throws Exception {
        JobConf conf = new JobConf(getConf(), SecondarySort.class);
        conf.setJobName(SecondarySort.class.getName());
        conf.setJarByClass(SecondarySort.class);
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);
        conf.setMapOutputKeyClass(StockKey.class);
        conf.setMapOutputValueClass(Text.class);
        conf.setPartitionerClass((Class<? extends Partitioner<StockKey, DoubleWritable>>) KeyPartitioner.class);
        conf.setMapperClass((Class<? extends Mapper<LongWritable, Text, StockKey, DoubleWritable>>) StockMapper.class);
        conf.setReducerClass((Class<? extends Reducer<StockKey, DoubleWritable, Text, Text>>) StockReducer.class);
        FileInputFormat.addInputPath(conf, new Path(arg[0]));
        FileOutputFormat.setOutputPath(conf, new Path(arg[1]));
        JobClient.runJob(conf);
        return 0;
    }
}
And here is the exception:
java.io.IOException: Type mismatch in value from map: expected
org.apache.hadoop.io.Text, recieved org.apache.hadoop.io.DoubleWritable
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:876)
at org.apache.hadoop.mapred.MapTask$OldOutputCollector.collect(MapTask.java:499)
at SecondarySort$StockMapper.map(SecondarySort.java:135)
at SecondarySort$StockMapper.map(SecondarySort.java:1)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:391)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:325)
at org.apache.hadoop.mapred.Child$4.run(Child.java:270)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1127)
at org.apache.hadoop.mapred.Child.main(Child.java:264)
12/07/13 03:22:32 INFO mapred.JobClient: Task Id : attempt_201207130314_0002_m_000001_2, Status : FAILED
java.io.IOException: Type mismatch in value from map: expected
org.apache.hadoop.io.Text, recieved org.apache.hadoop.io.DoubleWritable
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:876)
at org.apache.hadoop.mapred.MapTask$OldOutputCollector.collect(MapTask.java:499)
at SecondarySort$StockMapper.map(SecondarySort.java:135)
at SecondarySort$StockMapper.map(SecondarySort.java:1)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:391)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:325)
at org.apache.hadoop.mapred.Child$4.run(Child.java:270)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1127)
at org.apache.hadoop.mapred.Child.main(Child.java:264)
Recommended Answer
There are a number of potential problems with this code that could be causing it:
- StockKey - you should override the default hashCode() method. At the moment, two StockKeys with the same contents will have different hashCode values (if you don't override it, the JVM default returns a number which is, to all intents and purposes, the object's address in memory). I know that in your partitioner you only use the name field (which is a String and has a valid implementation of hashCode()), but this is good practice in case you ever use the entire StockKey object's hashCode() in the future and wonder why two identical keys end up at different reducers.
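For illustration, a minimal sketch of such an override for the StockKey above (this equals()/hashCode() pair is an addition for illustration, not code from the original question; note that it normalizes name so it stays consistent with the case-insensitive compareTo()):

// Hypothetical additions to StockKey: hash and compare both fields, so two
// keys with the same contents hash identically and are equal.
@Override
public int hashCode() {
    // compareTo() above ignores case for name, so hash a normalized form
    // of it to stay consistent with equals()/compareTo()
    int result = name != null ? name.toLowerCase().hashCode() : 0;
    result = 31 * result + (timestamp != null ? timestamp.hashCode() : 0);
    return result;
}

@Override
public boolean equals(Object o) {
    if (this == o) return true;
    if (!(o instanceof StockKey)) return false;
    StockKey other = (StockKey) o;
    return name.equalsIgnoreCase(other.name) && timestamp.equals(other.timestamp);
}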
- KeyPartitioner - you need to Math.abs(..) the result of arg0.name.hashCode(). At the moment, this value could come back negative, which, when you modulo it with the number of reducers, will return a negative number. The knock-on effect is that the MR framework will throw an exception, because it expects a number between 0 (inclusive) and the number of reducers (exclusive). This is probably where your problem lies, as I'll explain in the next point.
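Applied to the KeyPartitioner above, the fix would look roughly like this (a sketch of that suggestion, not code from the original answer):

// Sketch of the corrected partitioner, following the point above.
static class KeyPartitioner implements Partitioner<StockKey, DoubleWritable> {
    @Override
    public int getPartition(StockKey key, DoubleWritable value, int numPartitions) {
        // Math.abs(..) keeps the result non-negative, as suggested above.
        // (Edge case: Math.abs(Integer.MIN_VALUE) is still negative; masking
        // with Integer.MAX_VALUE, as Hadoop's HashPartitioner does, avoids
        // that: (key.name.hashCode() & Integer.MAX_VALUE) % numPartitions.)
        return Math.abs(key.name.hashCode()) % numPartitions;
    }

    @Override
    public void configure(JobConf job) {
    }
}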
- Mapper.map method - you are swallowing any potential output exceptions when you call context.collect. Continuing from my previous point about the partitioner: if it returns a negative number, an exception will be thrown, and you need to deal with it. In some cases catching and swallowing exceptions may be OK (data validation of input records, for example), but any exception that occurs when outputting should be thrown up to the MR framework, to flag that something went wrong and that the output of this mapper is wrong / incomplete:
try {
    context.collect(key, val);
} catch (IOException e) {
    // the exception is swallowed here, so the framework never learns the task failed
    e.printStackTrace();
}
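A sketch of the map method with that try/catch removed (assuming the goal is simply to let the failure surface; map already declares throws IOException, so nothing else needs to change):

public void map(LongWritable offset, Text value,
        OutputCollector<StockKey, DoubleWritable> context, Reporter reporter)
        throws IOException {
    String[] values = value.toString().split(",");
    StockKey key = new StockKey(values[0].trim(), Long.parseLong(values[1].trim()));
    DoubleWritable val = new DoubleWritable(Double.parseDouble(values[2].trim()));
    // no try/catch here: an IOException from collect() now propagates to the
    // MR framework, which will fail (and retry) the task instead of silently
    // producing incomplete output
    context.collect(key, val);
}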
Finally, you need to explicitly declare your map and reduce output types (this is what is causing the exception: you currently declare the map output value type as Text, when in fact the mapper is outputting a DoubleWritable):
conf.setMapOutputKeyClass(StockKey.class);
conf.setMapOutputValueClass(DoubleWritable.class);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(Text.class);
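As a side note: in the old mapred API, the setMapOutput*Class calls describe the intermediate types the mapper emits, while the setOutput*Class calls describe what the reducer finally writes; if the map output classes are never set, they default to the final output classes.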
I suggest that you remove the try/catch block around the context.collect call and rerun your job (or just check the logs for the map tasks and see if you see a stack trace).