分支/合并框架
分支/合并框架的目的是以递归方式将可以并行的任务拆分成更小的任务,然后将每个子任
务的结果合并起来生成整体结果。它是ExecutorService接口的一个实现,它把子任务分配给
线程池(称为ForkJoinPool)中的工作线程。首先来看看如何定义任务和子任务。
使用RecursiveTask
要把任务提交到这个池,必须创建RecursiveTask的一个子类,其中R是并行化任务(以 及所有子任务)产生的结果类型,或者如果任务不返回结果,则是RecursiveAction类型(当 然它可能会更新其他非局部机构)。要定义RecursiveTask,只需实现它唯一的抽象方法 compute:
protected abstract R compute();
这个方法同时定义了将任务拆分成子任务的逻辑,以及无法再拆分或不方便再拆分时,生成 单个子任务结果的逻辑。
正由于此,这个方法的实现类似于下面的伪代码:
if (任务足够小或不可分) { 顺序计算该任务
} else {
将任务分成两个子任务
递归调用本方法,拆分每个子任务,等待所有子任务完成
合并每个子任务的结果
}
一般来说并没有确切的标准决定一个任务是否应该再拆分,但有几种试探方法可以帮助你做
出这一决定,递归的任务拆分过程如图下图所示:
你可能已经注意到,这只不过是著名的分治算法的并行版本而已。这里举一个用分支/合并
框架的实际例子,让我们试着用这个框架为一个数字范围(这里用一个
long[]数组表示)求和。如前所述,你需要先为RecursiveTask类做一个实现,就是下面代码
ForkJoinSumCalculator。
用分支/合并框架执行并行求和
package java8.java8example;
import java.util.concurrent.RecursiveTask;
/**
* @desctiption:
* @author: yinghuaYang
* @date: 2019/1/7
*/
public class ForkJoinSumCalculator extends RecursiveTask<Long> {
/**
* 要求和的数组
*/
private final long[] numners;
/**
* 子任务处理的数组的起始位置
*/
private final int start;
/**
* 子任务处理的数组的终止位置
*/
private final int end;
/**
* 不再将任务分解为子任务的数组大小
*/
public static final long THRESHOLD = 10000;
/**
* 公共构造方法用于创建主任务
* @param numners
*/
public ForkJoinSumCalculator(long[] numners) {
this(numners,0,numners.length);
}
/**
* 私有构造方法用于以递归方式为主任务创建子任务
* @param numners
* @param start
* @param end
*/
private ForkJoinSumCalculator(long[] numners, int start, int end) {
this.numners = numners;
this.start = start;
this.end = end;
}
/**
* 覆盖RecursiveTask抽象方法
*
* 该任务负责求和的步伐的大小
* @return
*/
@Override
protected Long compute() {
int length = end - start;
/*如果大小小于或等于阈值,顺序执行计算结果*/
if (length <= THRESHOLD) {
return computeSequentially();
}
ForkJoinSumCalculator leftTask = new ForkJoinSumCalculator(numners,start,start+length/2);
leftTask.fork();
/*创建一个任务为数组的后一半求和*/
ForkJoinSumCalculator rightTask = new ForkJoinSumCalculator(numners,start+length/2,end);
/*同步执行第二个子任务,有可能允许进一步递归划分*/
Long rightResult = rightTask.compute();
/*读取第一个子任务的结果,如果尚未完成就等待*/
Long leftResult = leftTask.join();
/*该任务的结果是两个子任务结果的组合*/
return leftResult + rightResult;
}
/**
* 在子任务不再可分时及时结果的简单写法
* @return
*/
private long computeSequentially() {
long sum = 0;
for (int i=0;i<end;i++) {
sum += numners[i];
}
return sum;
}
}
现在编写一个方法来并行对前n个自然数求和就很简单了。你只需把想要的数字数组传给
ForkJoinSumCalculator的构造函数:
package java8.java8example;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.LongStream;
/**
* @desctiption:
* @author: yinghuaYang
* @date: 2019/1/7
*/
public class ForkJoinSumCalculatorMain {
public static long forkJoinSum(long n) {
long[] numbers = LongStream.rangeClosed(1,n).toArray();
ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers);
return new ForkJoinPool().invoke(task);
}
public static void main(String[] args) {
long totalResult = forkJoinSum(10000L);
System.out.println(totalResult);
}
}
注意:
上面用了一个LongStream来生成包含前n个自然数的数组,然后创建一个ForkJoinTask (RecursiveTask的父类),并把数组传递给代码所示ForkJoinSumCalculator的公共 构造函数。最后,你创建了一个新的ForkJoinPool,并把任务传给它的调用方法 。
在 ForkJoinPool中执行时,最后一个方法返回的值就是ForkJoinSumCalculator类定义的任务结果。 请注意在实际应用时,使用多个ForkJoinPool是没有什么意义的。正是出于这个原因,一般来说把它实例化一次,然后把实例保存在静态字段中,使之成为单例,这样就可以在软件中任 何部分方便地重用了。这里创建时用了其默认的无参数构造函数,这意味着想让线程池使用JVM 能够使用的所有处理器。更确切地说,该构造函数将使用Runtime.availableProcessors的 返回值来决定线程池使用的线程数。请注意availableProcessors方法虽然看起来是处理器, 但它实际上返回的是可用内核的数量,包括超线程生成的虚拟内核。
运行ForkJoinSumCalculator,当把ForkJoinSumCalculator任务传给ForkJoinPool时,这个任务就由池中的一个线程 执行,这个线程会调用任务的compute方法。该方法会检查任务是否小到足以顺序执行,如果不 够小则会把要求和的数组分成两半,分给两个新的ForkJoinSumCalculator,而它们也由 ForkJoinPool安排执行。
因此,这一过程可以递归重复,把原任务分为更小的任务,直到满足 不方便或不可能再进一步拆分的条件(本例中是求和的项目数小于等于10 000)。这时会顺序计 算每个任务的结果,然后由分支过程创建的(隐含的)任务二叉树遍历回到它的根。接下来会合 并每个子任务的部分结果,从而得到总任务的结果。
这一过程如图下图所示:
我的新博客地址:https://www.itaofly.com