我试图编写一个非常简单的作业,仅使用1个映射器,而没有reducer将一些数据写入hbase。在映射器中,我试图简单地使用hbase打开连接,将几行数据写入表中,然后关闭连接。在作业驱动程序中,我正在使用JobConf.setNumMapTasks(1);和JobConf.setNumReduceTasks(0);指定将仅执行1个映射器,而不执行reducer。我还在jobConf中将reducer类设置为IdentityReducer。我观察到的奇怪行为是该作业已成功将数据写入hbase表,但是之后,我在日志中看到它不断尝试打开与hbase的连接,然后关闭连接,该连接持续20-30分钟,在该作业之后宣布已成功完成100%。最后,当我检查由放置在OutputCollector.collect(...)中的虚拟数据创建的_success文件时,我看到数百行的虚拟数据应该只有1行。
以下是作业驱动程序的代码
public int run(String[] arg0) throws Exception {
Configuration config = HBaseConfiguration.create(getConf());
ensureRequiredParametersExist(config);
ensureOptionalParametersExist(config);
JobConf jobConf = new JobConf(config, getClass());
jobConf.setJobName(config.get(ETLJobConstants.ETL_JOB_NAME));
//set map specific configuration
jobConf.setNumMapTasks(1);
jobConf.setMaxMapAttempts(1);
jobConf.setInputFormat(TextInputFormat.class);
jobConf.setMapperClass(SingletonMapper.class);
jobConf.setMapOutputKeyClass(LongWritable.class);
jobConf.setMapOutputValueClass(Text.class);
//set reducer specific configuration
jobConf.setReducerClass(IdentityReducer.class);
jobConf.setOutputKeyClass(LongWritable.class);
jobConf.setOutputValueClass(Text.class);
jobConf.setOutputFormat(TextOutputFormat.class);
jobConf.setNumReduceTasks(0);
//set job specific configuration details like input file name etc
FileInputFormat.setInputPaths(jobConf, jobConf.get(ETLJobConstants.ETL_JOB_FILE_INPUT_PATH));
System.out.println("setting output path to : " + jobConf.get(ETLJobConstants.ETL_JOB_FILE_OUTPUT_PATH));
FileOutputFormat.setOutputPath(jobConf,
new Path(jobConf.get(ETLJobConstants.ETL_JOB_FILE_OUTPUT_PATH)));
JobClient.runJob(jobConf);
return 0;
}
驱动程序类扩展了配置并实现了工具(我使用了权威指南中的示例)以下是我的映射器类中的代码。
以下是我在Mapper的map方法中的代码,其中我只是打开与Hbase的连接,进行一些初步检查以确保表存在,然后写入行并关闭表。
public void map(LongWritable arg0, Text arg1,
OutputCollector<LongWritable, Text> arg2, Reporter arg3)
throws IOException {
HTable aTable = null;
HBaseAdmin admin = null;
try {
arg3.setStatus("started");
/*
* set-up hbase config
*/
admin = new HBaseAdmin(conf);
/*
* open connection to table
*/
String tableName = conf.get(ETLJobConstants.ETL_JOB_TABLE_NAME);
HTableDescriptor htd = new HTableDescriptor(toBytes(tableName));
String colFamilyName = conf.get(ETLJobConstants.ETL_JOB_TABLE_COLUMN_FAMILY_NAME);
byte[] tablename = htd.getName();
/* call function to ensure table with 'tablename' exists */
/*
* loop and put the file data into the table
*/
aTable = new HTable(conf, tableName);
DataRow row = /* logic to generate data */
while (row != null) {
byte[] rowKey = toBytes(row.getRowKey());
Put put = new Put(rowKey);
for (DataNode node : row.getRowData()) {
put.add(toBytes(colFamilyName), toBytes(node.getNodeName()),
toBytes(node.getNodeValue()));
}
aTable.put(put);
arg3.setStatus("xoxoxoxoxoxoxoxoxoxoxoxo added another data row to hbase");
row = fileParser.getNextRow();
}
aTable.flushCommits();
arg3.setStatus("xoxoxoxoxoxoxoxoxoxoxoxo Finished adding data to hbase");
} finally {
if (aTable != null) {
aTable.close();
}
if (admin != null) {
admin.close();
}
}
arg2.collect(new LongWritable(10), new Text("something"));
arg3.setStatus("xoxoxoxoxoxoxoxoxoxoxoxoadded some dummy data to the collector");
}
如您所见,到最后,我正在将一些虚拟数据写到集合中(10,“某物”),并且在作业终止后,我在_success文件中看到了数百行该数据。
我无法确定为什么映射器代码会一遍又一遍地重启,而不是一次运行。任何帮助将不胜感激。
最佳答案
与JobConf.setNumMapTasks(1)
(实际上定义您指定的编号)不同,使用setNumReduceTasks
只是想告诉hadoop您希望使用1个映射器(如果可能)。
这就是为什么要运行更多映射器并观察所有这些数字的原因。
有关更多详细信息,请阅读this post。
关于java - 计划运行一次的映射器意外地多次执行,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/31174997/