This article describes how to deal with the "Type mismatch in key from map" error that appears when replacing Mapper with MultithreadedMapper in a Hadoop MapReduce job; it should be a useful reference for anyone hitting the same problem.
Problem Description
I'd like to use a MultithreadedMapper for my MapReduce job.
For this I replaced Mapper with MultithreadedMapper in working code.
Here's the exception I'm getting:
java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.IntWritable, recieved org.apache.hadoop.io.LongWritable
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:862)
at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:549)
at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
at org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper$SubMapRecordWriter.write(MultithreadedMapper.java:211)
at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
at org.apache.hadoop.mapreduce.Mapper.map(Mapper.java:124)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
at org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper$MapRunner.run(MultithreadedMapper.java:264)
Here's the code setup:
public static void main(String[] args) {
    try {
        if (args.length != 2) {
            System.err.println("Usage: MapReduceMain <input path> <output path>");
            System.exit(123);
        }
        Job job = new Job();
        job.setJarByClass(MapReduceMain.class);
        job.setInputFormatClass(TextInputFormat.class);
        FileSystem fs = FileSystem.get(URI.create(args[0]), job.getConfiguration());
        FileStatus[] files = fs.listStatus(new Path(args[0]));
        for (FileStatus sfs : files) {
            FileInputFormat.addInputPath(job, sfs.getPath());
        }
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(MyMultithreadMapper.class);
        job.setReducerClass(MyReducer.class);
        MultithreadedMapper.setNumberOfThreads(job, MyMultithreadMapper.nThreads);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(MyPage.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class); // write the result as a sequence file
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    } catch (Exception e) {
        e.printStackTrace();
    }
}
And here's the mapper's code:
public class MyMultithreadMapper extends MultithreadedMapper<LongWritable, Text, IntWritable, MyPage> {
    ConcurrentLinkedQueue<MyScraper> scrapers = new ConcurrentLinkedQueue<MyScraper>();
    public static final int nThreads = 5;

    public MyMultithreadMapper() {
        for (int i = 0; i < nThreads; i++) {
            scrapers.add(new MyScraper());
        }
    }

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        MyScraper scraper = scrapers.poll();
        MyPage result = null;
        for (int i = 0; i < 10; i++) {
            try {
                result = scraper.scrapPage(value.toString(), true);
                break;
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        if (result == null) {
            result = new MyPage();
            result.setUrl(key.toString());
        }
        context.write(new IntWritable(result.getUrl().hashCode()), result);
        scrapers.add(scraper);
    }
}
Why the hell am I getting this?
Solution
Here's what has to be done:
MultithreadedMapper.setMapperClass(job, MyMapper.class);
MyMapper must be an ordinary Mapper subclass that implements the map logic.
The MultithreadedMapper subclass must stay empty: it is only the multithreaded wrapper and must not override map(). In the code above the map logic sits in MyMultithreadMapper itself, so it is never invoked; MultithreadedMapper runs the inner mapper registered via setMapperClass, and because none was registered it falls back to the default identity Mapper, which passes the LongWritable input key straight through and triggers the IntWritable/LongWritable mismatch.
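For illustration, here is a minimal sketch of how the pieces could be rearranged. MyPage, MyScraper and its scrapPage method are the question's own classes and are assumed to exist as shown; only the Mapper and MultithreadedMapper calls are the actual Hadoop API.

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// A plain Mapper subclass that carries the scraping logic.
public class MyMapper extends Mapper<LongWritable, Text, IntWritable, MyPage> {
    private MyScraper scraper;

    @Override
    protected void setup(Context context) {
        // MultithreadedMapper creates a separate inner mapper instance per worker
        // thread, so one scraper per instance replaces the ConcurrentLinkedQueue.
        scraper = new MyScraper();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        MyPage result = null;
        for (int i = 0; i < 10; i++) {
            try {
                result = scraper.scrapPage(value.toString(), true);
                break;
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        if (result == null) {
            result = new MyPage();
            result.setUrl(key.toString());
        }
        context.write(new IntWritable(result.getUrl().hashCode()), result);
    }
}

In the driver, keep MyMultithreadMapper as an empty subclass of MultithreadedMapper (or use MultithreadedMapper.class directly) and register MyMapper as the inner mapper:

job.setMapperClass(MyMultithreadMapper.class);            // empty multithreaded wrapper
MultithreadedMapper.setMapperClass(job, MyMapper.class);  // the class that does the work
MultithreadedMapper.setNumberOfThreads(job, 5);

With this arrangement the output key/value types declared on MyMapper (IntWritable, MyPage) match job.setOutputKeyClass and job.setOutputValueClass, and the type-mismatch exception goes away.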
This concludes the article on the "Type mismatch in key from map" error when replacing Mapper with MultithreadedMapper. We hope the answer above is helpful, and thank you for your support!