我用以下代码在RxJava上看到了奇怪的行为:

package com.hotels.guestreview.infrastructure.repository;

    import java.util.List;
    import java.util.Random;
    import java.util.stream.Collectors;
    import java.util.stream.Stream;

    import rx.Observable;
    import rx.functions.Action1;
    import rx.schedulers.Schedulers;


    import org.apache.commons.lang.RandomStringUtils;
    import rx.Observable;
    import rx.functions.Action1;
    import rx.schedulers.Schedulers;

    import java.util.List;
    import java.util.Random;
    import java.util.stream.Collectors;
    import java.util.stream.Stream;

    public class Main {

        public static void main(String[] args) {
            final Main m = new Main();
            m.run();
        }

        public void run() {
            final List<String> result = Observable.from(new Integer[]{4, 5, 6, 6, 7, 3})
                    .doOnNext(debug("Init"))
                    .flatMap(i -> Observable.defer(() -> toRandomList(i)).subscribeOn(Schedulers.io()))
                    .doOnNext(debug("defer"))
                    .flatMap(this::chooseString)
                    .doOnNext(debug("chooseString"))
                    .toList()
                    .doOnNext(debug("list"))
                    .toBlocking()
                    .single();
            System.out.println("\nresult = " + result);
        }


        public static Observable<List<String>> toRandomList(Integer n) {
            debug("perform IO").call(n);
            try {
                Thread.sleep(new Random().nextInt(3000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            debug("IO done").call(n);
            final List<String> result = Stream.iterate(0, t -> t + 1)
                .map(i -> RandomStringUtils.randomAlphanumeric(n))
                .limit(n)
                .collect(Collectors.toList());
            return Observable.just(result);
        }

        public Observable<String> chooseString(List<String> list) {
            // guilty code
            /*
            try {
                Thread.sleep(new Random().nextInt(3000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            */
            // end guilty code
            if (Math.random() > .3) {
                return Observable.just(list.get(new Random().nextInt(list.size())));
            }
            else {
                return Observable.empty();
            }
        }

        public static <T> Action1<T> debug(String s) {
            return o -> System.out.println(o + " | " + s + " | " + Thread.currentThread().getName());
        }

    }


我正在尝试在io调度程序上执行方法toRandomList,并且在对有罪代码进行注释的情况下,一切工作正常,在单独的线程上具有每个toRandomList的发射和后续流。
如果我在guilty code方法中删除chooseString的注释(添加睡眠),则toRandomList之后的每个步骤都在同一线程上执行。
为什么会这样呢?我做错了什么?

提前致谢

最佳答案

问题在平面图中在这里,应重构为:

Observable.from(new Integer[]{4, 5, 6, 6, 7, 3})
                .doOnNext(debug("Init"))
                .flatMap(i -> Observable.defer(() -> toRandomList(i))
                      .doOnNext(debug("defer"))
                      .flatMap(this::chooseString)
                      .subscribeOn(Schedulers.io())
                )


这样,在flatMap内部定义的所有子流(称为subscribeOn)都在所选的Scheduler线程上执行。

然后,正如@Dmitry在其响应中指出的,更好的方法是使用fromCallable而不是deferjust / empty的组合

08-16 19:06