我正在做一些关于Java 8中并发性的实验
在ScheduledThreadPoolExecutor API中
我可以看到以下两个签名:
schedule(Callable<V> callable, long delay, TimeUnit unit)
schedule(Runnable command, long delay, TimeUnit unit)
一种用于
Callable
,一种用于Runnable
我也可以在API中看到以下两个信息:
scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
我的问题是,为什么不存在
Callable
的两个等效项scheduleAtFixedRate(Callable<V> callable, long initialDelay, long period, TimeUnit unit)
scheduleWithFixedDelay(Callable<V> callable, long initialDelay, long delay, TimeUnit unit)
我需要为操作检索 boolean 结果。
谢谢你。
最佳答案
您期望scheduleAtFixedRate(Callable<V>)
的返回类型是什么? schedule(Callable<V>)
的返回类型为Future<V>
,指示在将来的某个时刻,可调用对象返回的V
类型的值将可用。您可以通过在Future上调用get()
来等待此值可用。scheduleAtFixedRate(Callable<V>)
的返回类型不能类似于Future<List<V>>
,这意味着在将来的某个时刻,重复调用callable所返回的所有值都是可用的。但是,将始终有更多的可调用计划执行,因此此列表将永远不存在。
对于这样的事情,您需要的是异步结果流的概念,您可以通过这种方式订阅该结果流,以便在每个结果到达时对其进行处理。据我所知,这在标准库中不存在。我知道的一个包含此类内容的第三方库是Netflix的RxJava。例如,使用该库中的 ReplaySubject
,您可以创建结果流并在返回每个结果时对其进行处理:
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import rx.subjects.ReplaySubject;
public class Callables {
public static void main(String[] args) {
// Example Callable that returns a boolean result
Random random = new Random();
Callable<Boolean> callable = () -> random.nextBoolean();
// Turn the Callable into a Runnable that adds the last result to the stream of results
ReplaySubject<Boolean> results = ReplaySubject.create();
Runnable runnable = () -> {
try {
boolean result = callable.call();
results.onNext(result);
} catch (Exception e) {
// Needed since Callable can throw an exception, but Runnable cannot
}
};
// Periodically run the Runnable
ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);
executor.scheduleAtFixedRate(runnable, 1, 1, TimeUnit.SECONDS);
// Handling the results as they arrive
results.forEach(result -> System.out.println("Result: " + result));
System.out.println("Waiting for results...");
}
}
如果您决定使用RxJava,可能值得使用更多的API,而不是直接使用Executor。您可以使用
Observable.interval
生成一个定期发出数字的流,然后将其映射以调用您的可调用对象。这样,您可以以更简洁的方式获得相同的结果流:import java.io.IOException;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import rx.Observable;
public class MoreCallables {
public static void main(String[] args) throws IOException {
Observable<Long> periodic = Observable.interval(1, TimeUnit.SECONDS);
Random random = new Random();
Observable<Boolean> results = periodic.map(i -> random.nextBoolean());
results.forEach(result -> System.out.println("Result: " + result));
System.out.println("Waiting for results...");
System.in.read();
}
}
关于java - 为什么scheduleAtFixedRate-scheduleWithFixedDelay方法不使用Callable <V>,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/25022238/