我需要介于可观察到的冷热之间。当客户订阅时,它应该发射项目,而当客户退订时,它应该停止发射。但是,当客户订阅相同的Observable时,其余项目应交付。最后一件事是项目之间的时间。

Observable<String> hotishObservable = createHotishObservable("a", "b", "c");
Disposable sub = hotishObservable.subscribe();
// emit "a"
// 1 second passed
// emit "b"
sub.dispose()
Disposable sub = hotishObservable.subscribe();
// emit "c"


显而易见的解决方案是扩展ObservableOnSubscribe并处理ObservableEmitter

class HotishSub implements ObservableOnSubscribe<String> {

    public HotishSub(String... items) {
        this.items = items;
    }

    @Override
    public void subscribe(ObservableEmitter<String> emitter) {
        if(isNotEmpty())
            emitter.onNext(nextItem);
            executor.schedule(this::handleNext, 1000, TimeUnit.MILLISECONDS);
        else
            emitter.onComplete();
    }

    private void handleNext(){
        //if emitter is not disposed and there're still items then emit it
    }
}

Observable<String> createHotishObservable(String... items){
  return Observable.create(new HotishSub(items));
}


有什么更好的选择吗?

来自简化的聊天机器人的消息流需要它。 UI客户端代码使用相同的Observable来获取来自漫游器和真实用户的消息流。

最佳答案

我用Flowable.generate来做。

  public static <T> Flowable<T> create(T... ts) {
    List<T> list = new ArrayList<>(Arrays.asList(ts));
    return Flowable.generate(() -> list, (l, e) -> {
      if (l.isEmpty()) {
        e.onComplete();
      } else {
        e.onNext(l.remove(0));
        if (!l.isEmpty()) {
          try {
            Thread.sleep(1000);
          } catch (InterruptedException ex) {
            // you can use other way to delay it
          }
        }
      }
    });
  }


然后测试

  public static void main(String[] args) throws Exception {
    Flowable<String> ob = create("a", "b", "c", "d", "e");
    Disposable d = ob.subscribeOn(Schedulers.computation())
        .subscribe(i -> System.out.println(System.currentTimeMillis() + "\t" + i));
    Thread.sleep(2500);
    d.dispose();
    ob.subscribeOn(Schedulers.computation())
        .blockingSubscribe(i -> System.err.println(System.currentTimeMillis() + "\t" + i));
  }


并输出:

1520304164412   a // sys.out
1520304165413   b // sys.out
1520304166413   c // sys.out
1520304166928   d // sys.err
1520304167927   e // sys.err

08-03 23:49