This article describes how to resolve the "Pass a Delete or a Put" error thrown by an HBase MapReduce job. It should be a useful reference for anyone who runs into the same problem.

Problem description

I am getting the error below while running a MapReduce job against HBase:

```
java.io.IOException: Pass a Delete or a Put
    at org.apache.hadoop.hbase.mapreduce.TableOutputFormat$TableRecordWriter.write(TableOutputFormat.java:125)
    at org.apache.hadoop.hbase.mapreduce.TableOutputFormat$TableRecordWriter.write(TableOutputFormat.java:84)
    at org.apache.hadoop.mapred.MapTask$NewDirectOutputCollector.write(MapTask.java:639)
    at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
    at HBaseImporter$InnerMap.map(HBaseImporter.java:61)
    at HBaseImporter$InnerMap.map(HBaseImporter.java:1)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:370)
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:212)
12/11/27 16:16:50 INFO mapred.JobClient:  map 0% reduce 0%
12/11/27 16:16:50 INFO mapred.JobClient: Job complete: job_local_0001
12/11/27 16:16:50 INFO mapred.JobClient: Counters: 0
```
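As background (this note is not part of the original question): the exception comes from TableOutputFormat's record writer, which can only apply Put and Delete mutations to the output table; any other value type written to it fails immediately. Roughly paraphrased from the 0.94-era source (illustrative, not verbatim), the write method behaves like this:

```java
// Rough paraphrase of TableOutputFormat$TableRecordWriter.write() (not the exact source).
public void write(KEY key, Writable value) throws IOException {
    if (value instanceof Put) {
        table.put((Put) value);            // a Put is applied to the output table
    } else if (value instanceof Delete) {
        table.delete((Delete) value);      // a Delete is applied to the output table
    } else {
        throw new IOException("Pass a Delete or a Put");  // anything else -> this error
    }
}
```

So whichever task writes to this output format, mapper or reducer, must emit Put or Delete values.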
Code:

```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class HBaseImporter extends Configured implements Tool {

    public static class InnerMap extends TableMapper<Text, IntWritable> {
        IntWritable one = new IntWritable();  // note: defaults to 0; a word count presumably wants new IntWritable(1)

        public void map(ImmutableBytesWritable row, Result value, Context context)
                throws IOException, InterruptedException {
            String val = new String(value.getValue(Bytes.toBytes("cf"), Bytes.toBytes("line")));
            String[] words = val.toString().split(" ");
            try {
                for (String word : words) {
                    context.write(new Text(word), one);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static class MyTableReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int i = 0;
            for (IntWritable val : values) {
                i += val.get();
            }
            Put put = new Put(Bytes.toBytes(key.toString()));
            put.add(Bytes.toBytes("cf"), Bytes.toBytes("count"), Bytes.toBytes(i));
            context.write(null, put);
        }
    }

    public int run(String args[]) throws Exception {
        //Configuration conf = getConf();
        Configuration conf = HBaseConfiguration.create();
        conf.addResource(new Path("/home/trg/hadoop-1.0.4/conf/core-site.xml"));
        conf.addResource(new Path("/home/trg/hadoop-1.0.4/conf/hdfs-site.xml"));

        Job job = new Job(conf, "SM LogAnalyzer MR");
        job.setJarByClass(HBaseImporter.class);
        //FileInputFormat.setInputPaths(job, new Path(args[1]));
        //FileOutputFormat.setOutputPath(job, new Path("outyy"));
        //job.setOutputFormatClass(TextOutputFormat.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        //job.setMapperClass(InnerMap.class);

        Scan scan = new Scan();
        scan.setCaching(500);         // 1 is the default in Scan, which will be bad for MapReduce jobs
        scan.setCacheBlocks(false);

        TableMapReduceUtil.initTableMapperJob(
                "wc_in",              // input table
                scan,                 // Scan instance to control CF and attribute selection
                InnerMap.class,       // mapper class
                Text.class,           // mapper output key
                IntWritable.class,    // mapper output value
                job);
        TableMapReduceUtil.initTableReducerJob(
                "word_count",         // output table
                MyTableReducer.class, // reducer class
                job);
        job.setNumReduceTasks(1);
        job.setNumReduceTasks(0);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        //Configuration conf = new HBaseConfiguration();
        //Job job = configureJob(conf, args);
        //System.exit(job.waitForCompletion(true) ? 0 : 1);
        String[] inArgs = new String[4];
        inArgs[0] = "HBaseImporter";
        inArgs[1] = "/user/trg/wc_in";
        inArgs[2] = "AppLogMRImport";
        inArgs[3] = "MessageDB";
        int res = ToolRunner.run(new Configuration(), new HBaseImporter(), inArgs);
        //int res = ToolRunner.run(new Configuration(), new HBaseImporter(), args);
    }
}
```

I am setting the map output value class to IntWritable.class, but TableOutputFormat.write still gets called in the mapper, and it expects a Put object.
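For comparison (this sketch is not from the original post, and the mapper class name is made up for illustration): when a job really is meant to write to the table straight from the map phase, the mapper itself has to emit Put (or Delete) values, because that is all TableOutputFormat accepts. A minimal map-only variant might look like this, assuming the same "cf:line" column as in the question:

```java
// Hedged sketch of a map-only job whose mapper emits Puts directly.
public static class PutEmittingMapper extends TableMapper<ImmutableBytesWritable, Put> {
    @Override
    public void map(ImmutableBytesWritable row, Result value, Context context)
            throws IOException, InterruptedException {
        byte[] line = value.getValue(Bytes.toBytes("cf"), Bytes.toBytes("line"));
        if (line == null) {
            return;                      // skip rows without the expected cell
        }
        Put put = new Put(row.get());    // reuse the source row key
        put.add(Bytes.toBytes("cf"), Bytes.toBytes("line"), line);  // copy one cell
        context.write(row, put);         // a Put is exactly what TableOutputFormat expects
    }
}

// Matching job wiring for the map-only case:
// TableMapReduceUtil.initTableMapperJob("wc_in", scan, PutEmittingMapper.class,
//         ImmutableBytesWritable.class, Put.class, job);
// TableMapReduceUtil.initTableReducerJob("word_count", null, job); // still needed: it sets TableOutputFormat
// job.setNumReduceTasks(0);  // map-only: mapper output goes straight to the table
```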
Solution

I found the answer to my own question: I had mistakenly set the number of reduce tasks to 0:

```java
job.setNumReduceTasks(0);
```

With zero reduce tasks the reduce phase is skipped, so the mapper's output is handed directly to TableOutputFormat, which expects Put objects to write into the HBase table rather than the Text/IntWritable pairs the mapper emits. Commenting out that line solved the issue.
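For completeness, here is a sketch of what the tail of run() looks like once the fix is applied, keeping the word-count reducer responsible for emitting the Put objects as in the original code. It only restates the fix described above:

```java
// Corrected job wiring (sketch): the reducer writes the Puts, so TableOutputFormat is satisfied.
TableMapReduceUtil.initTableMapperJob(
        "wc_in",               // input table
        scan,                  // Scan instance
        InnerMap.class,        // mapper emits Text / IntWritable
        Text.class,
        IntWritable.class,
        job);
TableMapReduceUtil.initTableReducerJob(
        "word_count",          // output table
        MyTableReducer.class,  // reducer emits Put
        job);
job.setNumReduceTasks(1);
// job.setNumReduceTasks(0);  // <-- this line caused "Pass a Delete or a Put"; remove it
return job.waitForCompletion(true) ? 0 : 1;
```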