问题描述
我正在玩RxJava 运营商。在互联网上发现它很少,唯一值得一提的是。这也无法探索我想了解的各种用例。我还投入异步执行并使用后退重试以使其更加真实。
I'm playing with the RxJava retryWhen operator. Very little is found about it on the internet, the only one worthy of any mention being this. That too falls short of exploring the various use cases that I'd like to understand. I also threw in asynchronous execution and retry with back-off to make it more realistic.
我的设置很简单:我有一个类 ChuckNorrisJokesRepository
从JSON文件中返回随机数量的Chuck Norris笑话。我正在测试的课程是 ChuckNorrisJokesService
,如下所示。我感兴趣的用例如下:
My setup is simple: I've a class ChuckNorrisJokesRepository
that returns random number of Chuck Norris jokes from a JSON file. My class under test is ChuckNorrisJokesService
which is shown below. The use cases I'm interested in are as follows:
- 第一次尝试成功(无重试)
- 1次重试后失败
- 尝试重试3次但是第2次成功因此没有重试第3次
- 第3次重试成功
- Succeeds on 1st attempt (no retries)
- Fails after 1 retry
- Attempts to retry 3 times but succeeds on 2nd hence doesn't retry 3rd time
- Succeeds on 3rd retry
注意:该项目可在我的。
ChuckNorrisJokesService.java :
@Slf4j
@Builder
public class ChuckNorrisJokesService {
@Getter
private final AtomicReference<Jokes> jokes = new AtomicReference<>(new Jokes());
private final Scheduler scheduler;
private final ChuckNorrisJokesRepository jokesRepository;
private final CountDownLatch latch;
private final int numRetries;
private final Map<String, List<String>> threads;
public static class ChuckNorrisJokesServiceBuilder {
public ChuckNorrisJokesService build() {
if (scheduler == null) {
scheduler = Schedulers.io();
}
if (jokesRepository == null) {
jokesRepository = new ChuckNorrisJokesRepository();
}
if (threads == null) {
threads = new ConcurrentHashMap<>();
}
requireNonNull(latch, "CountDownLatch must not be null.");
return new ChuckNorrisJokesService(scheduler, jokesRepository, latch, numRetries, threads);
}
}
public void setRandomJokes(int numJokes) {
mergeThreadNames("getRandomJokes");
Observable.fromCallable(() -> {
log.debug("fromCallable - before call. Latch: {}.", latch.getCount());
mergeThreadNames("fromCallable");
latch.countDown();
List<Joke> randomJokes = jokesRepository.getRandomJokes(numJokes);
log.debug("fromCallable - after call. Latch: {}.", latch.getCount());
return randomJokes;
}).retryWhen(errors ->
errors.zipWith(Observable.range(1, numRetries), (n, i) -> i).flatMap(retryCount -> {
log.debug("retryWhen. retryCount: {}.", retryCount);
mergeThreadNames("retryWhen");
return Observable.timer(retryCount, TimeUnit.SECONDS);
}))
.subscribeOn(scheduler)
.subscribe(j -> {
log.debug("onNext. Latch: {}.", latch.getCount());
mergeThreadNames("onNext");
jokes.set(new Jokes("success", j));
latch.countDown();
},
ex -> {
log.error("onError. Latch: {}.", latch.getCount(), ex);
mergeThreadNames("onError");
},
() -> {
log.debug("onCompleted. Latch: {}.", latch.getCount());
mergeThreadNames("onCompleted");
latch.countDown();
}
);
}
private void mergeThreadNames(String methodName) {
threads.merge(methodName,
new ArrayList<>(Arrays.asList(Thread.currentThread().getName())),
(value, newValue) -> {
value.addAll(newValue);
return value;
});
}
}
为简洁起见,我只会展示Spock测试第一个用例的案例。请参阅我的用于其他测试用例。
For brevity, I'll only show the Spock test case for the 1st use case. See my GitHub for the other test cases.
def "succeeds on 1st attempt"() {
setup:
CountDownLatch latch = new CountDownLatch(2)
Map<String, List<String>> threads = Mock(Map)
ChuckNorrisJokesService service = ChuckNorrisJokesService.builder()
.latch(latch)
.threads(threads)
.build()
when:
service.setRandomJokes(3)
latch.await(2, TimeUnit.SECONDS)
Jokes jokes = service.jokes.get()
then:
jokes.status == 'success'
jokes.count() == 3
1 * threads.merge('getRandomJokes', *_)
1 * threads.merge('fromCallable', *_)
0 * threads.merge('retryWhen', *_)
1 * threads.merge('onNext', *_)
0 * threads.merge('onError', *_)
1 * threads.merge('onCompleted', *_)
}
这失败了:
Too few invocations for:
1 * threads.merge('fromCallable', *_) (0 invocations)
1 * threads.merge('onNext', *_) (0 invocations)
Wha我期待的是 fromCallable
被调用一次,它成功, onNext
被调用一次,然后是 onCompleted
。我缺少什么?
What I'm expecting is that fromCallable
is called once, it succeeds, onNext
is called once, followed by onCompleted
. What am I missing?
PS:完全披露 - 我也在。
P.S.: Full disclosure - I've also posted this question on RxJava GitHub.
推荐答案
我在几个小时之后解决了这个问题在ReactiveX会员David Karnok的帮助下进行故障排除。
I solved this after several hours of troubleshooting and with help from ReactiveX member David Karnok.
retryWhen
是一个复杂的,甚至是错误的运营商。官方文档和至少一个答案使用 range
运算符,如果没有重试,它将立即完成。请参阅我与David Karnok的。
retryWhen
is a complicated, perhaps even buggy, operator. The official doc and at least one answer here use range
operator, which completes immediately if there are no retries to be made. See my discussion with David Karnok.
该代码可在我的完成以下测试用例:
The code is available on my GitHub complete with the following test cases:
- 第一次尝试成功(无重试)
- 1次重试后失败
- 尝试重试3次但是第2次成功因此没有重试第3次
- 第三次重试成功
- Succeeds on 1st attempt (no retries)
- Fails after 1 retry
- Attempts to retry 3 times but succeeds on 2nd hence doesn't retry 3rd time
- Succeeds on 3rd retry
这篇关于RxJava重试当奇怪的行为的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!