For the past few days I have been teaching myself Hadoop, and I tried to implement a basic BFS algorithm based on the information provided by this webpage. I had to make some modifications and additions to get the code to compile. At runtime I get the following error, and even after hours of debugging I haven't been able to resolve it. Can anyone help?
Error:
15/05/11 03:04:20 WARN mapred.LocalJobRunner: job_local934121164_0001
java.lang.Exception: java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.Text, received org.apache.hadoop.io.LongWritable
at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:522)
Caused by: java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.Text, received org.apache.hadoop.io.LongWritable
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1072)
at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:715)
at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.write(WrappedMapper.java:112)
at org.apache.hadoop.mapreduce.Mapper.map(Mapper.java:125)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:146)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:787)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:243)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
15/05/11 03:04:21 INFO mapreduce.Job: Job job_local934121164_0001 running in uber mode : false
15/05/11 03:04:21 INFO mapreduce.Job: map 0% reduce 0%
Since I use the same key/value types in the mapper and the reducer, as shown below, this shouldn't happen. The only explanation I can think of is that my mapper class is not being used, and the default mapper class (which emits LongWritable keys) is being used instead. I'm not sure what I'm doing wrong.
SearchMapper.java
import java.io.IOException;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.LongWritable;

public class SearchMapper extends Mapper<Object, Text, Text, Text> {

    // Types of the input key, input value and the Context object through which
    // the Mapper communicates with the Hadoop framework
    public void map(Object key, Text value, Context context, Node inNode)
            throws IOException, InterruptedException {

        // For each GRAY node, emit each of the adjacent nodes as a new node
        // (also GRAY). If the adjacent node is already processed and colored
        // BLACK, the reducer retains the color BLACK.
        // Note that the mapper does not differentiate between BLACK, GRAY and WHITE.
        if (inNode.getColor() == Node.Color.GRAY) {
            for (String neighbor : inNode.getEdges()) {
                Node adjacentNode = new Node();
                // Remember that the current node only has the id of its
                // neighbour, and not the object itself. Therefore at this
                // stage there is no way of knowing and assigning any of its
                // other properties. Also remember that the reducer is doing
                // the 'clean up' task, not the mapper.
                adjacentNode.setId(neighbor);
                adjacentNode.setDistance(inNode.getDistance() + 1);
                adjacentNode.setColor(Node.Color.GRAY);
                adjacentNode.setParent(inNode.getId());
                context.write(new Text(adjacentNode.getId()), adjacentNode.getNodeInfo()); // getNodeInfo() returns a Text object
            }
            inNode.setColor(Node.Color.BLACK);
        }
        // Emit the input node as well, otherwise the BLACK color change
        // (if it happens) won't be persistent.
        context.write(new Text(inNode.getId()), inNode.getNodeInfo());
    }
}
SearchReducer.java
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.io.Text;
import java.io.IOException;

public class SearchReducer extends Reducer<Text, Text, Text, Text> {

    // Types of the input key, the values associated with the key, the Context
    // object for the reducer's communication with the Hadoop framework, and
    // the node whose information has to be output; the return type is a Node.
    public Node reduce(Text key, Iterable<Text> values, Context context, Node outNode)
            throws IOException, InterruptedException {

        // Set the node id as the key
        outNode.setId(key.toString());

        // Iterate through all the values associated with a particular node id
        for (Text value : values) {
            Node inNode = new Node(key.toString() + "\t" + value.toString());
            // Emit one node after combining all the mapper outputs.
            // Only one node (the original) will have a non-null adjacency list.
            if (inNode.getEdges().size() > 0) {
                outNode.setEdges(inNode.getEdges());
            }
            // Save the minimum distance and the parent
            if (inNode.getDistance() < outNode.getDistance()) {
                outNode.setDistance(inNode.getDistance());
                outNode.setParent(inNode.getParent());
            }
            // Save the darkest color
            if (inNode.getColor().ordinal() > outNode.getColor().ordinal()) {
                outNode.setColor(inNode.getColor());
            }
        }
        context.write(key, new Text(outNode.getNodeInfo()));
        return outNode;
    }
}
BaseJob.java (the generic class from the website that does most of the setup work)
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.io.Text;
import java.io.IOException;

public abstract class BaseJob extends Configured implements Tool {

    protected Job setupJob(String jobName, JobInfo jobInfo) throws Exception {
        Job job = new Job(new Configuration(), jobName);
        job.setJarByClass(jobInfo.getJarByClass());
        job.setMapperClass(jobInfo.getMapperClass());
        if (jobInfo.getCombinerClass() != null)
            job.setCombinerClass(jobInfo.getCombinerClass());
        job.setReducerClass(jobInfo.getReducerClass());
        // TODO : set the number of reducers as required
        job.setNumReduceTasks(3);
        job.setOutputKeyClass(jobInfo.getOutputKeyClass());
        job.setOutputValueClass(jobInfo.getOutputValueClass());
        /*
        job.setJarByClass(SSSPJob.class);
        job.setMapperClass(SearchMapper.class);
        job.setReducerClass(SearchReducer.class);
        job.setNumReduceTasks(3);
        job.setOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        */
        return job;
    }

    // Abstract class describing a job's configuration
    protected abstract class JobInfo {
        public abstract Class<?> getJarByClass();
        public abstract Class<? extends Mapper> getMapperClass();
        public abstract Class<? extends Reducer> getCombinerClass();
        public abstract Class<? extends Reducer> getReducerClass();
        public abstract Class<?> getOutputKeyClass();
        public abstract Class<?> getOutputValueClass();
    }
}
SSSPJob.java (the driver)
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.fs.Path;

public class SSSPJob extends BaseJob {

    // Set the configuration for the job, including the mapper and reducer classes
    private Job getJobConf(String[] args) throws Exception {
        JobInfo jobInfo = new JobInfo() {
            @Override
            public Class<? extends Reducer> getCombinerClass() {
                return null;
            }

            @Override
            public Class<?> getJarByClass() {
                return SSSPJob.class;
            }

            @Override
            public Class<? extends Mapper> getMapperClass() {
                return SearchMapper.class;
            }

            @Override
            public Class<?> getOutputKeyClass() {
                return Text.class;
            }

            @Override
            public Class<?> getOutputValueClass() {
                return Text.class;
            }

            @Override
            public Class<? extends Reducer> getReducerClass() {
                return SearchReducer.class;
            }
        };
        return setupJob("ssspjob", jobInfo);
    }

    // The driver: executes the job and invokes the map/reduce functions
    public int run(String[] args) throws Exception {
        int iterationCount = 0;
        Job job;
        // Number of GRAY nodes
        long terminationValue = 1;
        while (terminationValue > 0) {
            job = getJobConf(args);
            String input, output;
            // Set the input and output files for each iteration.
            // For the first iteration the user-specified file is the input;
            // for subsequent iterations the output of the previous iteration
            // is the input. NOTE: be clear about how the input/output files
            // are set before proceeding.
            if (iterationCount == 0)
                input = args[0];
            else
                input = args[1] + iterationCount;
            output = args[1] + (iterationCount + 1);
            FileInputFormat.setInputPaths(job, new Path(input));
            FileOutputFormat.setOutputPath(job, new Path(output));
            job.waitForCompletion(true);
            Counters jobCntrs = job.getCounters();
            terminationValue = jobCntrs.findCounter(MoreIterations.numberOfIterations).getValue();
            // If the counter's value was incremented in the reducer(s), there
            // are more GRAY nodes to process, so the iteration continues.
            iterationCount++;
        }
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new SSSPJob(), args);
        if (args.length != 2) {
            System.err.println("Usage: <in> <output name> ");
            System.exit(1);
            System.out.println("Huh?");
        }
        System.exit(res);
    }
}
Also, I'm not sure how debugging works on Hadoop. None of my debug print statements seem to have any effect; I suspect the Hadoop framework writes log messages to some other location or file.
Thanks :)
Best answer
Keys in an MR job should implement WritableComparable and values should implement Writable. I suspect your mapper code is the problem, starting with its use of the `Object` type for the key.
Just add the @Override annotation to your map and reduce methods, so that the compiler flags them as errors.
Otherwise you won't see any error, but because the signatures don't match, the default identity mapper is invoked instead, which causes the type mismatch.
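Concretely, the extra `Node` parameter in your `map` makes it an overload of `Mapper.map`, not an override, so the framework never calls it. A sketch of the fix (how the `Node` is built from the input line is an assumption here; it presumes your `Node` class has a constructor that parses one line of the graph file):

```
public class SearchMapper extends Mapper<Object, Text, Text, Text> {

    @Override   // now a real override: the compiler verifies the signature
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        // Build the Node inside the method instead of taking it as a
        // parameter (assumes Node can be constructed from the input line).
        Node inNode = new Node(value.toString());
        // ... rest of your original map body, using inNode as before ...
    }
}
```

The same change applies to `reduce`: drop the extra `Node outNode` parameter and the `Node` return type, and construct `outNode` inside the method, so the signature matches `Reducer.reduce(Text, Iterable<Text>, Context)`.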
If you are processing text files, the key of the map method should be a LongWritable; if you want a custom key, it should implement WritableComparable.
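The dispatch pitfall can be shown without Hadoop at all. This toy example (class and method names are made up for illustration) mimics your situation: a subclass method with an extra parameter is a new overload, not an override, so a call through the base type runs the base (default) implementation:

```java
// Toy illustration of why SearchMapper.map is never called: a method
// with an extra parameter does NOT override the base-class method, so
// the framework (which calls through the base type) runs the default one.
class FrameworkMapper {
    public String map(String key) {
        return "default:" + key; // stands in for Hadoop's identity mapper
    }
}

class CustomMapper extends FrameworkMapper {
    // Extra parameter: this is an overload, not an override.
    // An @Override annotation here would be a compile-time error,
    // which is exactly how the bug would have surfaced early.
    public String map(String key, String extra) {
        return "custom:" + key;
    }
}

public class OverrideDemo {
    public static void main(String[] args) {
        FrameworkMapper m = new CustomMapper();
        // Dynamic dispatch only considers map(String); since CustomMapper
        // never overrides it, the default version runs.
        System.out.println(m.map("node1")); // prints "default:node1"
    }
}
```

This is why adding @Override to every intended override is worth the habit: it turns a silent runtime surprise into an immediate compile error.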
Original question: "java - Debugging in Hadoop-MapReduce. Mapper not being called?" on Stack Overflow: https://stackoverflow.com/questions/30157077/