I am trying to run a MapReduce program that uses the output of one job as the input of a second job. This is my current code:
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(BookAnalyzer.class);
job.setJobName("N-Gram Extraction");
FileSystem fs = FileSystem.get(conf);
FileStatus[] status_list = fs.listStatus(new Path(args[0]));
if (status_list != null) {
for (FileStatus status : status_list) {
FileInputFormat.addInputPath(job, status.getPath());
}
}
Path nGramOutput = new Path(args[1]);
FileOutputFormat.setOutputPath(job, nGramOutput);
job.setMapperClass(BookNGramMapper.class);
job.setReducerClass(BookNGramReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
if(job.waitForCompletion(true)) {
Configuration conf2 = new Configuration();
Job job2 = Job.getInstance(conf2);
job2.setJarByClass(BookAnalyzer.class);
job2.setJobName("Term-frequency");
FileSystem fs2 = FileSystem.get(conf2);
FileStatus[] status_list2 = fs2.listStatus(nGramOutput);
if (status_list2 != null) {
for (FileStatus status : status_list2) {
FileInputFormat.addInputPath(job2, status.getPath());
}
}
FileOutputFormat.setOutputPath(job2, new Path(args[2]));
job2.setMapperClass(TermFreqMapper.class);
job2.setReducerClass(TermFreqReducer.class);
job2.setOutputKeyClass(Text.class);
job2.setOutputValueClass(IntWritable.class);
System.exit(job2.waitForCompletion(true) ? 0 : 1);
}
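I get an error saying that the input path (nGramOutput) does not exist. But if my first job ran successfully (and it did), that output should have been created at args[1].
Any suggestions would be great!
Thanks!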
Best answer
This is one way to chain jobs.
Try this:
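import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class BookAnalyzer {
    // Intermediate directory: written by job 1, read by job 2
    // ("intermediate_output" is just an example; any unused path works)
    private static final String OUTPUT_PATH = "intermediate_output";

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(BookAnalyzer.class);
        job.setJobName("N-Gram Extraction");
        Path nGramOutput = new Path(OUTPUT_PATH);
        // addInputPath takes a Path, not a String; passing the directory is enough
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, nGramOutput);
        job.setMapperClass(BookNGramMapper.class);
        job.setReducerClass(BookNGramReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Stop here if job 1 fails; job 2 would otherwise read a missing directory
        if (!job.waitForCompletion(true)) {
            System.exit(1);
        }

        // getConf() is only available when the class extends Configured / implements Tool
        Configuration conf2 = new Configuration();
        Job job2 = Job.getInstance(conf2);
        job2.setJarByClass(BookAnalyzer.class);
        job2.setJobName("Term-frequency");
        // Job 1's output directory becomes job 2's input
        FileInputFormat.addInputPath(job2, nGramOutput);
        // The output path must be set on job2, not on job
        FileOutputFormat.setOutputPath(job2, new Path(args[1]));
        job2.setMapperClass(TermFreqMapper.class);
        job2.setReducerClass(TermFreqReducer.class);
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(IntWritable.class);
        System.exit(job2.waitForCompletion(true) ? 0 : 1);
    }
}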
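Note that job2 is given the nGramOutput directory itself rather than a hand-built list of the files inside it. This works because FileInputFormat, by default, ignores files whose names start with "_" or "." (such as the _SUCCESS marker that a finished job writes). The stdlib-only sketch below mimics that filtering rule on a hypothetical directory listing; it is an illustration, not Hadoop's own code, and the class name HiddenFileFilterSketch is made up.

```java
import java.util.ArrayList;
import java.util.List;

public class HiddenFileFilterSketch {
    // Mirrors FileInputFormat's default rule: names starting with "_" or "."
    // (e.g. _SUCCESS, local .crc checksum files) are not treated as input.
    static boolean isVisibleInput(String fileName) {
        return !fileName.startsWith("_") && !fileName.startsWith(".");
    }

    // Returns the files a job would actually read from an output directory listing.
    static List<String> inputFiles(List<String> listing) {
        List<String> result = new ArrayList<>();
        for (String name : listing) {
            if (isVisibleInput(name)) {
                result.add(name);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical listing of job 1's output directory
        List<String> listing = List.of("_SUCCESS", "part-r-00000", ".part-r-00000.crc", "part-r-00001");
        System.out.println(inputFiles(listing)); // [part-r-00000, part-r-00001]
    }
}
```

So there is no need to call listStatus and add each file by hand; adding the directory lets Hadoop pick up the part files and skip the marker files.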
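The paths used are:
args[0]: input path
nGramOutput: intermediate output from job1, which acts as the input to job2
args[1]: final output path
So the command to run the jobs would be:
hadoop jar myjar.jar args[0] args[1]
You don't need to pass three arguments (args[0], args[1], args[2]), because the intermediate path is fixed in the code through OUTPUT_PATH.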
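The way the two command-line arguments map onto the three paths can be sketched in plain Java. This is only an illustration: the class name ChainArgsSketch, the intermediate_output value, and the sample arguments are hypothetical, and the argument-count check is an addition that is not in the answer's code.

```java
public class ChainArgsSketch {
    // Hypothetical intermediate directory, standing in for OUTPUT_PATH in the answer
    static final String OUTPUT_PATH = "intermediate_output";

    // Returns {job1 input, job1 output (= job2 input), job2 output}.
    static String[] chainPaths(String[] args) {
        if (args.length != 2) {
            throw new IllegalArgumentException("Usage: <input path> <final output path>");
        }
        return new String[] {args[0], OUTPUT_PATH, args[1]};
    }

    public static void main(String[] args) {
        // Hypothetical arguments, as if passed after "hadoop jar myjar.jar"
        String[] paths = chainPaths(new String[] {"books", "tf_out"});
        System.out.println("job1: " + paths[0] + " -> " + paths[1]);
        System.out.println("job2: " + paths[1] + " -> " + paths[2]);
    }
}
```

Keep in mind that Hadoop refuses to start a job whose output directory already exists, so the intermediate directory and args[1] have to be removed before the jobs are run again.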