我正在使用以下方法获取订阅的ExecutorService:

public static ExecutorService getThreadPoolExecutorService(int threads)
{
    int NUMBER_OF_CORES = Runtime.getRuntime().availableProcessors();
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            NUMBER_OF_CORES * 2,
            NUMBER_OF_CORES * 2,
            60L,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>()
    );
    return Executors.newFixedThreadPool(threads, threadPoolExecutor.getThreadFactory());
}

并按如下所示使用它:
public static Scheduler getBackgroundSchedular()
{
    if (mBackgroundExecutorService == null)
        mBackgroundExecutorService = ThreadUtil.getThreadPoolExecutorService(4);
    return Schedulers.from(mBackgroundExecutorService);
}

我将此时间表用于应在后台运行的可观察对象。

问题

如何为RXJava使用PriorityBlockingQueue?通常,我会使用一些特殊的可运行对象来实现一个比较功能,并将相应的比较功能用于PriorityBlockingQueue并将上面示例中的LinkedBlockingQueue替换为PriorityBlockingQueue,但是如何使用RXJava observables来做到这一点呢?

最佳答案

RxJava中的调度程序与数据流正交,根本不了解它们。他们所做的只是执行一个Runnable实例,并且运算符创建的所有Runnable都是平等的。因此,在调度程序中使用优先级队列是没有意义的。

另外,数据流是顺序的,除非有诸如线程交换之类的边界,否则它们将保持其顺序。除非您收集事件并手动进行排序,否则具有优先级队列的运算符(operator)很可能会按预期对数据进行重新排序。

编辑

有点不合常规,但是您也可以在主题和某些运算符之间放置PriorityBlockingQueue,以便您可以为序列提供任务:

PriorityBlockingQueue<Task> q = ...

Subject<Integer, Integer> subject = PublishSubject.<Integer>create().toSerialized();

subject
.map(v -> q.poll())
.doOnNext(v -> v.execute())
.subscribe();

q.offer(new Task(...));
subject.onNext(1);

10-08 09:13