我用以下代码在RxJava上看到了奇怪的行为:
package com.hotels.guestreview.infrastructure.repository;
import java.util.List;
import java.util.Random;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import rx.Observable;
import rx.functions.Action1;
import rx.schedulers.Schedulers;
import org.apache.commons.lang.RandomStringUtils;
import rx.Observable;
import rx.functions.Action1;
import rx.schedulers.Schedulers;
import java.util.List;
import java.util.Random;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class Main {
public static void main(String[] args) {
final Main m = new Main();
m.run();
}
public void run() {
final List<String> result = Observable.from(new Integer[]{4, 5, 6, 6, 7, 3})
.doOnNext(debug("Init"))
.flatMap(i -> Observable.defer(() -> toRandomList(i)).subscribeOn(Schedulers.io()))
.doOnNext(debug("defer"))
.flatMap(this::chooseString)
.doOnNext(debug("chooseString"))
.toList()
.doOnNext(debug("list"))
.toBlocking()
.single();
System.out.println("\nresult = " + result);
}
public static Observable<List<String>> toRandomList(Integer n) {
debug("perform IO").call(n);
try {
Thread.sleep(new Random().nextInt(3000));
} catch (InterruptedException e) {
e.printStackTrace();
}
debug("IO done").call(n);
final List<String> result = Stream.iterate(0, t -> t + 1)
.map(i -> RandomStringUtils.randomAlphanumeric(n))
.limit(n)
.collect(Collectors.toList());
return Observable.just(result);
}
public Observable<String> chooseString(List<String> list) {
// guilty code
/*
try {
Thread.sleep(new Random().nextInt(3000));
} catch (InterruptedException e) {
e.printStackTrace();
}
*/
// end guilty code
if (Math.random() > .3) {
return Observable.just(list.get(new Random().nextInt(list.size())));
}
else {
return Observable.empty();
}
}
public static <T> Action1<T> debug(String s) {
return o -> System.out.println(o + " | " + s + " | " + Thread.currentThread().getName());
}
}
我正在尝试在io调度程序上执行方法
toRandomList
,并且在对有罪代码进行注释的情况下,一切工作正常,在单独的线程上具有每个toRandomList
的发射和后续流。如果我在
guilty code
方法中删除chooseString
的注释(添加睡眠),则toRandomList
之后的每个步骤都在同一线程上执行。为什么会这样呢?我做错了什么?
提前致谢
最佳答案
问题在平面图中在这里,应重构为:
Observable.from(new Integer[]{4, 5, 6, 6, 7, 3})
.doOnNext(debug("Init"))
.flatMap(i -> Observable.defer(() -> toRandomList(i))
.doOnNext(debug("defer"))
.flatMap(this::chooseString)
.subscribeOn(Schedulers.io())
)
这样,在
flatMap
内部定义的所有子流(称为subscribeOn
)都在所选的Scheduler
线程上执行。然后,正如@Dmitry在其响应中指出的,更好的方法是使用
fromCallable
而不是defer
和just
/ empty
的组合