(Hadoop) MapReduce - Chain jobs - JobControl doesn't stop

Problem description

I need to chain two MapReduce jobs. I used JobControl to set job2 as dependent on job1. It works, and the output files are created, but the program doesn't stop! In the shell it remains in this state:

12/09/11 19:06:24 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
12/09/11 19:06:25 INFO input.FileInputFormat: Total input paths to process : 1
12/09/11 19:06:25 INFO util.NativeCodeLoader: Loaded the native-hadoop library
12/09/11 19:06:25 WARN snappy.LoadSnappy: Snappy native library not loaded
12/09/11 19:07:00 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
12/09/11 19:07:00 INFO input.FileInputFormat: Total input paths to process : 1

How can I stop it? This is my main method.

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Configuration conf2 = new Configuration();

    Job job1 = new Job(conf, "canzoni");
    job1.setJarByClass(CanzoniOrdinate.class);
    job1.setMapperClass(CanzoniMapper.class);
    job1.setReducerClass(CanzoniReducer.class);
    job1.setOutputKeyClass(Text.class);
    job1.setOutputValueClass(IntWritable.class);

    ControlledJob cJob1 = new ControlledJob(conf);
    cJob1.setJob(job1);
    FileInputFormat.addInputPath(job1, new Path(args[0]));
    FileOutputFormat.setOutputPath(job1, new Path("/user/hduser/tmp"));


    Job job2 = new Job(conf2, "songsort");
    job2.setJarByClass(CanzoniOrdinate.class);
    job2.setMapperClass(CanzoniSorterMapper.class);
    job2.setSortComparatorClass(ReverseOrder.class);
    job2.setInputFormatClass(KeyValueTextInputFormat.class);
    job2.setReducerClass(CanzoniSorterReducer.class);
    job2.setMapOutputKeyClass(IntWritable.class);
    job2.setMapOutputValueClass(Text.class);
    job2.setOutputKeyClass(Text.class);
    job2.setOutputValueClass(IntWritable.class);

    ControlledJob cJob2 = new ControlledJob(conf2);
    cJob2.setJob(job2);
    FileInputFormat.addInputPath(job2, new Path("/user/hduser/tmp/part*"));
    FileOutputFormat.setOutputPath(job2, new Path(args[1]));

    JobControl jobctrl = new JobControl("jobctrl");
    jobctrl.addJob(cJob1);
    jobctrl.addJob(cJob2);
    cJob2.addDependingJob(cJob1);
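    // Original call, removed in the update below: JobControl.run() keeps
    // looping until stop() is called, so it never returns on the main thread.
    // jobctrl.run();


    ////////////////
    // NEW CODE ///
    //////////////


    // Replaces jobctrl.run(): run the JobControl on its own thread instead.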
    Thread t = new Thread(jobctrl);
    t.start();
    String oldStatusJ1 = null;
    String oldStatusJ2 = null;
    while (!jobctrl.allFinished()) {
      String status = cJob1.toString();
      String status2 = cJob2.toString();
      if (!status.equals(oldStatusJ1)) {
        System.out.println(status);
        oldStatusJ1 = status;
      }
      if (!status2.equals(oldStatusJ2)) {
        System.out.println(status2);
        oldStatusJ2 = status2;
      }
    }
    System.exit(0);

}
}

Solution

I essentially did what Pietro alluded to above.

public class JobRunner implements Runnable {
  private JobControl control;

  public JobRunner(JobControl _control) {
    this.control = _control;
  }

  public void run() {
    this.control.run();
  }
}

And in my map/reduce class I have:

public void handleRun(JobControl control) throws InterruptedException {
    JobRunner runner = new JobRunner(control);
    Thread t = new Thread(runner);
    t.start();

    while (!control.allFinished()) {
        System.out.println("Still running...");
        Thread.sleep(5000);
    }
}

I just pass the JobControl object to this method.
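
For anyone who wants to try the pattern without a cluster, here is a minimal sketch that runs without Hadoop. FakeJobControl is a hypothetical stand-in, not a Hadoop class: it only imitates the behaviour discussed here, where run() keeps looping until stop() is called. The class name JobControlPatternDemo, the method runAndStop, and all timing values are assumptions made for illustration.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class JobControlPatternDemo {

    /** Hypothetical stand-in for JobControl (NOT the Hadoop class). */
    public static class FakeJobControl implements Runnable {
        private final AtomicInteger remainingTicks;
        private final AtomicBoolean stopRequested = new AtomicBoolean(false);

        public FakeJobControl(int ticksOfWork) {
            this.remainingTicks = new AtomicInteger(ticksOfWork);
        }

        @Override
        public void run() {
            // Keeps looping even after the work is done; only stop() ends it.
            while (!stopRequested.get()) {
                if (remainingTicks.get() > 0) {
                    remainingTicks.decrementAndGet();
                }
                sleepQuietly(10);
            }
        }

        public boolean allFinished() {
            return remainingTicks.get() == 0;
        }

        public void stop() {
            stopRequested.set(true);
        }
    }

    static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Runs the control on a background thread, polls until all work is
     * finished, then asks the control to stop. Returns true if the control
     * thread has terminated afterwards.
     */
    public static boolean runAndStop(FakeJobControl control) {
        Thread t = new Thread(control, "job-control");
        t.setDaemon(true); // a stuck control thread cannot keep the JVM alive
        t.start();

        while (!control.allFinished()) {
            sleepQuietly(20); // poll instead of busy-waiting
        }

        control.stop(); // without this, run() would loop forever
        try {
            t.join(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !t.isAlive();
    }

    public static void main(String[] args) {
        boolean stopped = runAndStop(new FakeJobControl(5));
        System.out.println("control thread stopped: " + stopped);
    }
}
```

With the real JobControl the same shape should apply: start it on a thread, poll allFinished(), then call stop() so the control thread ends by itself rather than relying on System.exit(0). Checking getFailedJobList() before exiting would probably be a reasonable addition.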

