ConCurrent in Practice小记 (4)#

Executors

Callable && Future <T>

Callable:此接口有一个call()方法。在这个方法中,必须实现任务的(处理)逻辑。Callable接口是一个参数化的接口。意味着必须指明call()方法返回的数据类型。

Future:也是一个接口,用来保证Callable对象结果的获取和管理。

Executors一般接受一个Callable对象,返回一个Future对象。可以取消任务,也可以查看任务的状态。使用get()方法得到call()的结果,在得到结果前会一直等待。如果get()被中断则会报InterruptionException,如果是在call()上面中断则报ExecutorsException。

Executors的invokeAny()方法,在多个任务同时执行时,得到第一个完成的任务的结果。输入是一个tasklist,仅仅给出正常完成的结果,如果所有的task线程都出现异常,则抛出异常。也有等待时限的版本。

同样的invokeAll()实现同样的功能,不过是针对所有的Future结果。

使用Future接口的cancel可以取消已经提交的任务,根据参数和任务状态的不同有以下几种可能:

  • 如果这个任务已经完成或之前的已被取消或由于其他原因不能被取消,那么这个方法将会返回false并且这个任务不会被取消。
  • 如果这个任务正在等待执行者获取执行它的线程,那么这个任务将被取消而且不会开始它的执行。如果这个任务已经正在运行,则视方法的参数情况而定。 cancel()方法接收一个Boolean值参数。如果参数为true并且任务正在运行,那么这个任务将被取消。如果参数为false并且任务正在运行,那么这个任务将不会被取消。

但是当使用Future的get方法来控制一个已经取消的任务,则抛出CancellationException异常。

也可以使用实现了Runnable的FutureTask类来得到线程计算的结果,其中get方法依然是在未得到结果前阻塞。call成功后会调用FutureTask中的done()方法。

CompletionService就是在内部维持了一个Future的队列,用来接受完成的任务,再此基础上通过poll和take取元素。这样就把处理结果和运行本身分开了,运行是自己开线程,而结果是在阻塞队列中的。

Fork/Join框架

Fork/Join框架主要是配合分治算法,同Executor框架不同的是,Fork/Join增加了work-steal算法,当执行任务的线程在等待join的时候,其工作线程可以去寻找其他未被执行的任务执行。

但是,由于该算法的特殊性,也使得Fork/Join框架有一定的局限性:

  • 任务只能使用fork()和join()操作,作为同步机制。如果使用其他同步机制,在同步期间,工作线程不能执行其他任务。比如,在Fork/Join框架中,使任务进入睡眠,在睡眠期间正在执行这个任务的工作线程将不会执行其他任务。
  • 任务不应该执行I/O操作,如读或写数据文件。
  • 任务不能抛出检查异常,它必须包括必要的代码来处理它们。

主要有两个类:

ForkJoinPool:实现了ExecutorService接口和work-stealing算法,管理工作线程等。

ForkJoinTask:执行任务的基类,提供在任务中执行fork()和join()的操作等等。通常情况下只要继承并实现该类的两个子类就可以,分别是没有返回值的RecursiveAction和有返回结果的RecursiveTask。

package com.lyb.Section5;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.TimeUnit; /**
* Created by lyb on 15-8-6.
*/
public class ForkJoinHelloWord {
public static void main(String[] args){
ProductListGenerator productListGenerator = new ProductListGenerator();
List<PenProduct> productList = productListGenerator.generate(10000);
PenTask task = new PenTask(productList,0,productList.size(),0.2);
ForkJoinPool pool = new ForkJoinPool();
pool.execute(task); do {
System.out.printf("Main: Thread count: %d \n",pool.getActiveThreadCount());
System.out.printf("Main: Thread steal: %d \n",pool.getStealCount());
System.out.printf("Main: Thread Parallelism: %d \n",pool.getParallelism());
try {
TimeUnit.MILLISECONDS.sleep(5);
}catch (InterruptedException e){
e.printStackTrace();
}
}while (!task.isDone()); pool.shutdown(); if (task.isCompletedNormally()){
System.out.printf("Main: The process has completed Normally!\n");
} for (int i = 0; i<productList.size(); i++){
PenProduct product = productList.get(i);
if (product.getPrice() != 12){
System.out.printf("PenPaoduct : %s , %d \n",product.getName(),product.getPrice());
}
}
System.out.printf("Main : End of the program. \n");
}
} class PenProduct{
private String name;
private double price; public double getPrice() {
return price;
} public void setName(String name) {
this.name = name;
} public void setPrice(double price) {
this.price = price;
} public String getName() {
return name;
}
} class ProductListGenerator{
public List<PenProduct> generate(int size){
List<PenProduct> ret = new ArrayList<>();
for (int i = 0; i<size; i++ ){
PenProduct product = new PenProduct();
product.setName("PenProduct" + i);
product.setPrice(10);
ret.add(product);
}
return ret;
} } class PenTask extends RecursiveAction{
private static final long serialVersionUID = 1L;
private List<PenProduct> penProducts;
private int first;
private int last;
private double increment; public PenTask(List<PenProduct> penTaskList, int first, int last, double increment){
this.penProducts = penTaskList;
this.first = first;
this.last = last;
this.increment = increment;
} @Override
protected void compute() {
if (last - first < 10){
updatePrice();
}else {
int middle = (first+last)/2;
System.out.printf("PenTask : Pending tasks: %s \n",getQueuedTaskCount());
PenTask task1 = new PenTask(penProducts,first,middle+1,increment);
PenTask task2 = new PenTask(penProducts,middle+1,last,increment);
invokeAll(task1, task2);
}
} private void updatePrice(){
for (int i = first; i < last; i++){
PenProduct product = penProducts.get(i);
product.setPrice(product.getPrice()*(1+increment));
}
}
}

使用ForkJoinPool的无参构造器会构造一个线程数等于计算机处理器数的池。由于这里不需要返回值,因此这里产生ForkJoinTask的对象继承自RecursiveAction类,当任务中处理的项目数大于10,就分解为两个项目,一次执行一个任务。

invokeAll()方法执行每个任务创建的子任务,需要完成的任务在它所有的子任务未完成之前都是等待,而工作线程也是。这和Executor框架有区别,在Executors中仅仅有任务被提交到池中,而在Fork/Join框架中还包括对任务的控制方法。

即使是使用一个ForkJoinPool类也可以提交Runnable或者是Callable进行执行。但是不会调用work-steal算法。

ForkJoinTask类还提供其他的方法来完成一个任务的执行,并返回一个结果,这就是complete()方法。这个方法接收一个RecursiveTask类的参数化类型的对象,并且当join()方法被调用时,将这个对象作为任务的结果返回。 它被推荐使用在:提供异步任务结果。

异步任务的提交

使用join(),fork()方法可以异步的将执行结果回传,每次fork(),都会把任务交给到池中,然后池中根据当前执行任务的线程数进行分配,并且将执行完毕的结果返回。最终在所有任务执行完成后join().

然后再使用get得到结果。

get()和join()有两个主要的区别:

join()方法不能被中断。如果你中断调用join()方法的线程,这个方法将抛出InterruptedException异常。

如果任务抛出任何未受检异常,get()方法将返回一个ExecutionException异常,而join()方法将返回一个RuntimeException异常。

抛出异常:

在ForkJoinTask类的compute方法中并不会抛出异常,但是必须有处理异常的代码,可以通过该类中其他的一些方法得到异常。

isCompletedAbnormally()会检测任务是否正常完成,而且可以设置:

Exception e=new Exception("This task throws an Exception: "+ "Task
from "+start+" to "+end);
completeExceptionally(e);

不抛出异常。

取消一个任务

ForkJoinTask也可以使用cancel(),但是需要注意的是:

  • ForkJoinPool类并没有提供任何方法来取消正在池中运行或等待的所有任务。
  • 当你取消一个任务时,你不能取消一个已经执行的任务。

所以,取消任务只是取消当前任务,并不影响其他线程的任务或者是缓存在池中的任务,但是取消任务的上下文切换的开销很大,基本上不值得,不如先返回在做一次执行。

05-02 15:41