本文介绍了输出一个mapreduce程序作为另一个mapreduce程序的输入的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试一个简单的例子,其中一个MapReduce作业的输出应该是另一个MapReduce作业的输入。

流程应如下所示: Mapper1 - > Reducer1 - > Mapper2 - > Reducer2 (Mapper1的输出必须是Reducer1的输入,Reducer1的输出必须是Mapper2的输入,Mapper2的输出必须是Reducer2的输入,Reducer2的输出必须存储在输出文件)。

如何将多个Mappers and Reducers添加到我的程序中,以便像上面一样维护流程?

我需要使用链式映射器还是链式缩减器?如果是这样,我该如何使用它们?

解决方案

我猜你在找什么是ControlledJob和JobControl。它适合你的目的。在一个Driver类中,您可以构建多个彼此依赖的作业。以下代码可能会帮助您理解。

  Job jobOne = Job(jobOneConf,Job-1); 
FileInputFormat.addInputPath(jobOne,jobOneInput);
FileOutputFormat.setOutputPath(jobOne,jobOneOutput);
ControlledJob jobOneControl =新的ControlledJob(jobOneConf);
jobOneControl.setJob(jobOne);

工作jobTwo =工作(jobTwoConf,Job-2);
FileInputFormat.addInputPath(jobTwo,jobOneOutput); //这里我们将job-1的输出设置为job-2的输入
FileOutputFormat.setOutputPath(jobTwo,jobTwoOutput); //最终输出
ControlledJob jobTwoControl = new ControlledJob(jobTwoConf);
jobTwoControl.setJob(jobTwo);

JobControl jobControl = new JobControl(job-control);
jobControl.add(jobOneControl);
jobControl.add(jobTwoControl);
jobTwoControl.addDependingJob(jobOneControl); //这个条件使job-2等到job-1完成

线程jobControlThread = new Thread(jobControl);
jobControlThread.start();
jobControlThread.join();

/ * jobControl.allFinished()也可用于等待所有作业完成* /


I am trying a simple example, in which the output of one MapReduce job should be the input of another MapReduce job.

The flow should be like this: Mapper1 --> Reducer1 --> Mapper2 --> Reducer2 (The output of Mapper1 must be the input of Reducer1. The output of Reducer1 must be the input of Mapper2. The output of Mapper2 must be the input of Reducer2. The output of Reducer2 must be stored in output file).

How can I add multiple Mappers and Reducers to my program such that the flow is maintained like above?

Do I need to use Chain Mappers or Chain Reducers? If so how can I use them?

解决方案

I guess what you are looking for is ControlledJob and JobControl. It aptly fits your purpose. In a single Driver class you can build multiple jobs which have dependencies on each other. Following code might help you understand.

    Job jobOne = Job(jobOneConf, "Job-1");
    FileInputFormat.addInputPath(jobOne, jobOneInput);
    FileOutputFormat.setOutputPath(jobOne, jobOneOutput);
    ControlledJob jobOneControl = new ControlledJob(jobOneConf);
    jobOneControl.setJob(jobOne);

    Job jobTwo = Job(jobTwoConf, "Job-2");
    FileInputFormat.addInputPath(jobTwo, jobOneOutput); // here we set the job-1's output as job-2's input
    FileOutputFormat.setOutputPath(jobTwo, jobTwoOutput); // final output
    ControlledJob jobTwoControl = new ControlledJob(jobTwoConf);
    jobTwoControl.setJob(jobTwo);

    JobControl jobControl = new JobControl("job-control");
    jobControl.add(jobOneControl);
    jobControl.add(jobTwoControl);
    jobTwoControl.addDependingJob(jobOneControl); // this condition makes the job-2 wait until job-1 is done

    Thread jobControlThread = new Thread(jobControl);
    jobControlThread.start();
    jobControlThread.join();

    /* The jobControl.allFinished() can also be used to wait until all jobs are done */

这篇关于输出一个mapreduce程序作为另一个mapreduce程序的输入的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

06-28 01:13