我正在使用以下方法获取订阅的ExecutorService
:
public static ExecutorService getThreadPoolExecutorService(int threads)
{
int NUMBER_OF_CORES = Runtime.getRuntime().availableProcessors();
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
NUMBER_OF_CORES * 2,
NUMBER_OF_CORES * 2,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>()
);
return Executors.newFixedThreadPool(threads, threadPoolExecutor.getThreadFactory());
}
并按如下所示使用它:
public static Scheduler getBackgroundSchedular()
{
if (mBackgroundExecutorService == null)
mBackgroundExecutorService = ThreadUtil.getThreadPoolExecutorService(4);
return Schedulers.from(mBackgroundExecutorService);
}
我将此时间表用于应在后台运行的可观察对象。
问题
如何为RXJava使用
PriorityBlockingQueue
?通常,我会使用一些特殊的可运行对象来实现一个比较功能,并将相应的比较功能用于PriorityBlockingQueue
并将上面示例中的LinkedBlockingQueue
替换为PriorityBlockingQueue
,但是如何使用RXJava observables来做到这一点呢? 最佳答案
RxJava中的调度程序与数据流正交,根本不了解它们。他们所做的只是执行一个Runnable实例,并且运算符创建的所有Runnable都是平等的。因此,在调度程序中使用优先级队列是没有意义的。
另外,数据流是顺序的,除非有诸如线程交换之类的边界,否则它们将保持其顺序。除非您收集事件并手动进行排序,否则具有优先级队列的运算符(operator)很可能会按预期对数据进行重新排序。
编辑
有点不合常规,但是您也可以在主题和某些运算符之间放置PriorityBlockingQueue,以便您可以为序列提供任务:
PriorityBlockingQueue<Task> q = ...
Subject<Integer, Integer> subject = PublishSubject.<Integer>create().toSerialized();
subject
.map(v -> q.poll())
.doOnNext(v -> v.execute())
.subscribe();
q.offer(new Task(...));
subject.onNext(1);