问题描述
我需要找到一种在java中并行执行任务(依赖和独立)的方法。
I need to find a way to execute tasks (dependent and independent) in parallel in java.
- 任务A和任务C可以独立运行。
- 任务B取决于任务A的输出。
我查了一下java.util.concurrent Future和Fork / Join,但看起来我们无法向任务添加依赖。
I checked java.util.concurrent Future and Fork/Join, but looks like we cannot add dependency to a Task.
任何人都可以指出我更正Java API。
Can anyone point me to correct Java API.
推荐答案
在Scala中,这很容易做到,我认为你最好使用Scala。以下是我从这里获取的一个示例(新手指南Scala第16部分:从何处开始)这家伙有一个很棒的博客(我不是那个人)
In Scala this is very easy to do, and I think you are better off using Scala. Here is an example I pulled from here http://danielwestheide.com/ (The Neophyte’s Guide to Scala Part 16: Where to Go From Here) this guy has a great blog (I am not that guy)
让我们去喝咖啡吧。要做的任务是:
Lets take a barrista making coffee. The tasks to do are:
- 研磨所需的咖啡豆(没有前面的任务)
- 热一些水(没有前面的任务)
- 用磨碎的咖啡和加热的水冲泡浓缩咖啡(取决于1& 2)
- 泡沫一些牛奶(没有先前的任务)
- 将泡沫牛奶和浓缩咖啡混合(取决于3,4)
- Grind the required coffee beans (no preceding tasks)
- Heat some water (no preceding tasks)
- Brew an espresso using the ground coffee and the heated water (depends on 1 & 2)
- Froth some milk (no preceding tasks)
- Combine the froth milk and the espresso (depends on 3,4)
或作为树:
Grind _
Coffe \
\
Heat ___\_Brew____
Water \_____Combine
/
Foam ____________/
Milk
在使用并发api的java中,这将是:
In java using the concurrency api this would be:
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class Barrista {
static class HeatWater implements Callable<String> {
@Override
public String call() throws Exception {
System.out.println("Heating Water");
Thread.sleep(1000);
return "hot water";
}
}
static class GrindBeans implements Callable<String> {
@Override
public String call() throws Exception {
System.out.println("Grinding Beans");
Thread.sleep(2000);
return "grinded beans";
}
}
static class Brew implements Callable<String> {
final Future<String> grindedBeans;
final Future<String> hotWater;
public Brew(Future<String> grindedBeans, Future<String> hotWater) {
this.grindedBeans = grindedBeans;
this.hotWater = hotWater;
}
@Override
public String call() throws Exception
{
System.out.println("brewing coffee with " + grindedBeans.get()
+ " and " + hotWater.get());
Thread.sleep(1000);
return "brewed coffee";
}
}
static class FrothMilk implements Callable<String> {
@Override
public String call() throws Exception {
Thread.sleep(1000);
return "some milk";
}
}
static class Combine implements Callable<String> {
public Combine(Future<String> frothedMilk, Future<String> brewedCoffee) {
super();
this.frothedMilk = frothedMilk;
this.brewedCoffee = brewedCoffee;
}
final Future<String> frothedMilk;
final Future<String> brewedCoffee;
@Override
public String call() throws Exception {
Thread.sleep(1000);
System.out.println("Combining " + frothedMilk.get() + " "
+ brewedCoffee.get());
return "Final Coffee";
}
}
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(2);
FutureTask<String> heatWaterFuture = new FutureTask<String>(new HeatWater());
FutureTask<String> grindBeans = new FutureTask<String>(new GrindBeans());
FutureTask<String> brewCoffee = new FutureTask<String>(new Brew(grindBeans, heatWaterFuture));
FutureTask<String> frothMilk = new FutureTask<String>(new FrothMilk());
FutureTask<String> combineCoffee = new FutureTask<String>(new Combine(frothMilk, brewCoffee));
executor.execute(heatWaterFuture);
executor.execute(grindBeans);
executor.execute(brewCoffee);
executor.execute(frothMilk);
executor.execute(combineCoffee);
try {
/**
* Warning this code is blocking !!!!!!!
*/
System.out.println(combineCoffee.get(20, TimeUnit.SECONDS));
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} catch (TimeoutException e) {
System.out.println("20 SECONDS FOR A COFFEE !!!! I am !@#! leaving!!");
e.printStackTrace();
} finally{
executor.shutdown();
}
}
}
确保增加时间但是要确保你的代码不会永远等待完成某些事情,这可以通过使用Future.get(long,TimeUnit)来完成,然后相应地处理失败。
Make sure that you add time outs though to ensure that your code will not wait forever on something to complete, that is done by using the Future.get(long, TimeUnit) and then handle failure accordingly.
它在scala中更好,但是,它就像在博客上一样:
准备一些咖啡的代码看起来像这样:
It is much nicer in scala however, here it is like it's on the blog:The code to prepare some coffee would look something like this:
def prepareCappuccino(): Try[Cappuccino] = for {
ground <- Try(grind("arabica beans"))
water <- Try(heatWater(Water(25)))
espresso <- Try(brew(ground, water))
foam <- Try(frothMilk("milk"))
} yield combine(espresso, foam)
其中所有方法都返回未来(打字的未来),例如grind就像这个:
where all the methods return a future (typed future), for instance grind would be something like this:
def grind(beans: CoffeeBeans): Future[GroundCoffee] = Future {
// grinding function contents
}
对于所有实施,请查看博客,但这就是它的全部内容。您也可以轻松集成Scala和Java。我真的建议在Scala而不是Java中做这种事情。 Scala需要更少的代码,更清洁和事件驱动。
For all the implementations check out the blog but that's all there is to it. You can integrate Scala and Java easily as well. I really recommend doing this sort of thing in Scala instead of Java. Scala requires much less code, much cleaner and event driven.
这篇关于在Java中并行执行从属任务的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!