以下是我的代码段。

我知道您不应该这样阻止cachedFlowable,但这只是一个例子。

它卡在blockingGet行中。

如果将singleOrError替换为singleElement,代码仍然会卡住。如果将singleOrError替换为firstElement,代码将不再卡住。

有人可以向我解释为什么会这样吗?

    public static void main(String[] args) {
        final Flowable<Integer> cachedFlowable = Flowable.just(1).cache();
        cachedFlowable
                .doOnNext(i -> {
                    System.out.println("doOnNext " + i);
                    final Integer j = cachedFlowable.singleOrError().blockingGet();
                    System.out.println("after blockingGet " + j);
                })
                .blockingSubscribe();
    }

最佳答案

之所以用singleX运算符造成死锁,是因为此类运算符等待第二项可能的发出,但是由于您正在阻止它们,因此无法执行主源中的任何第二项或补全。使用firstX时,他们只关心第一个项目,因此几乎立即取消阻止,从而可以完成源。

因此,是的,您不应该在这样的流中使用阻塞方法,而应使用flatMapconcatMap进行每个项目的子流:

var cache = Flowable.just(1).cache();

cache
.doOnNext(i -> System.out.println("doOnNext " + i))
.concatMapSingle(item -> cache.firstOrError())
.doOnNext(j -> System.out.println("after " + j))
.blockingSubscribe();

10-06 10:03