我需要介于可观察到的冷热之间。当客户订阅时,它应该发射项目,而当客户退订时,它应该停止发射。但是,当客户订阅相同的Observable
时,其余项目应交付。最后一件事是项目之间的时间。
Observable<String> hotishObservable = createHotishObservable("a", "b", "c");
Disposable sub = hotishObservable.subscribe();
// emit "a"
// 1 second passed
// emit "b"
sub.dispose()
Disposable sub = hotishObservable.subscribe();
// emit "c"
显而易见的解决方案是扩展
ObservableOnSubscribe
并处理ObservableEmitter
:class HotishSub implements ObservableOnSubscribe<String> {
public HotishSub(String... items) {
this.items = items;
}
@Override
public void subscribe(ObservableEmitter<String> emitter) {
if(isNotEmpty())
emitter.onNext(nextItem);
executor.schedule(this::handleNext, 1000, TimeUnit.MILLISECONDS);
else
emitter.onComplete();
}
private void handleNext(){
//if emitter is not disposed and there're still items then emit it
}
}
Observable<String> createHotishObservable(String... items){
return Observable.create(new HotishSub(items));
}
有什么更好的选择吗?
来自简化的聊天机器人的消息流需要它。 UI客户端代码使用相同的
Observable
来获取来自漫游器和真实用户的消息流。 最佳答案
我用Flowable.generate
来做。
public static <T> Flowable<T> create(T... ts) {
List<T> list = new ArrayList<>(Arrays.asList(ts));
return Flowable.generate(() -> list, (l, e) -> {
if (l.isEmpty()) {
e.onComplete();
} else {
e.onNext(l.remove(0));
if (!l.isEmpty()) {
try {
Thread.sleep(1000);
} catch (InterruptedException ex) {
// you can use other way to delay it
}
}
}
});
}
然后测试
public static void main(String[] args) throws Exception {
Flowable<String> ob = create("a", "b", "c", "d", "e");
Disposable d = ob.subscribeOn(Schedulers.computation())
.subscribe(i -> System.out.println(System.currentTimeMillis() + "\t" + i));
Thread.sleep(2500);
d.dispose();
ob.subscribeOn(Schedulers.computation())
.blockingSubscribe(i -> System.err.println(System.currentTimeMillis() + "\t" + i));
}
并输出:
1520304164412 a // sys.out
1520304165413 b // sys.out
1520304166413 c // sys.out
1520304166928 d // sys.err
1520304167927 e // sys.err