今天在看jdk1.7的forkjoin框架时候,里面有个例子如下:
product类:
public class Product { private String name; private double price; public String getName() {
return name;
} public void setName(String name) {
this.name = name;
} public double getPrice() {
return price;
} public void setPrice(double price) {
this.price = price;
}
}
ProductListGenerator类:
public class ProductListGenerator { public List<Product> generate(int size){
List<Product> ret= new ArrayList<>();
for (int i = 0; i<size;i++){
Product product = new Product();
product.setName("product" + i);
product.setPrice(10);
ret.add(product);
}
return ret;
}
}
Task类:
public class Task extends RecursiveAction { private int first; private int last; private double increment; private List<Product> productList; public Task(int first, int last, double increment, List<Product> productList) {
this.first = first;
this.last = last;
this.increment = increment;
this.productList = productList;
} @Override
protected void compute() { if (last - first <9){
updatePrices(); }else {
int middle = (last+first)/2; //System.out.printf("Task:Pending tasks:%s\n",getQueuedTaskCount()); Task t1 = new Task(first,middle+1,increment,productList); Task t2 = new Task(middle+1,last,increment,productList);
invokeAll(t1,t2);
}
} private void updatePrices(){
for(int i = first;i<last;i++){
Product product = productList.get(i);
product.setPrice(product.getPrice()*(1+increment));
}
} }
main方法:
public class Main { public static void main(String[] args) {
ProductListGenerator generator = new ProductListGenerator();
Long startTime = System.currentTimeMillis();
List<Product> products = generator.generate(10000000);
Task task = new Task(0,products.size(),0.20,products); ForkJoinPool forkJoinPool = new ForkJoinPool();
forkJoinPool.execute(task); do {
System.out.printf("Main:Thread count:%d\n",forkJoinPool.getActiveThreadCount());
System.out.printf("Main:Thread steal:%d\n",forkJoinPool.getStealCount());
System.out.printf("Main:Parallelism:%d\n",forkJoinPool.getParallelism());
try {
// TimeUnit.MILLISECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
}while (!task.isDone()); System.out.println("========");
forkJoinPool.shutdown(); long endTime = System.currentTimeMillis();
long time = endTime-startTime;
System.out.println("在内存中运算时间:" + time + "毫秒"); if (task.isCompletedNormally()){
System.out.printf("Main:The proccess has completed normally.\n");
} for (int i =0;i<products.size();i++){
Product product = products.get(i);
if(product.getPrice() != 12){
System.out.printf("Product %s : %f\n",product.getName(),product.getPrice());
}
} System.out.printf("Main:End of the program. \n");
} }
这样是没问题的,1千万条数据运行大概需要15000多毫秒.然而自己实现,不用实现RecursiveAction的话只要5000毫秒左右:代码
long starT1 = System.currentTimeMillis(); List<Product> list = new ArrayList<>();
for(int i =0;i<10000000;i++){
Product product = new Product();
product.setName("product" + i);
product.setPrice(10);
list.add(product);
} for(int i = 0;i<list.size();i++){
Product product = list.get(i);
product.setPrice(product.getPrice()*(1+0.2));
} long endTimeT2 =System.currentTimeMillis();
long t = endTimeT2 -starT1;
System.out.println("单线程1000万数据时间:" + t + "毫秒");
就有点不明白了,就算是因为在task里面有构造方法以及因为判断影响,但是这样多线程是为了什么那? 还有 我把ArrayList修改为Actor,这样也是差不多的结果。。。 没明白fork/join框架的devide方法 究竟有什么好处?
后来明白了:demo里面的 属于计算密集型,线程数目应该适当小些。因为有线程的来回切换,导致时间比单线程要慢些,如果在单线程加上休眠1毫秒,会发现那个速度特别慢了;而如果是IO密集型,比如读取文件、数据库连接、网络通讯,线程数适当大些。