这正在使用RxJava版本0.19.6。

在groupBy操作之外,可以创建以下代码描述的管道,例如,基于某些条件从Observable中选择一条记录,或选择满足某些替代条件的第一条记录:

Observable<Long> observable = Observable.interval(1, TimeUnit.MILLISECONDS).take(10);
Observable<Long> filter1 = observable.filter(new Func1<Long, Boolean>() {
    @Override
    public Boolean call(Long aLong) {
        return 5 == aLong % 5;
    }
});
Observable<Long> filter2 = observable.filter(new Func1<Long, Boolean>() {
    @Override
    public Boolean call(Long aLong) {
        return 2 == aLong % 5;
    }
});
BlockingObservable.from(Observable.concat(filter1, filter2).first()).forEach(new Action1<Long>() {
    @Override
    public void call(Long aLong) {
        System.out.println(aLong);
    }
});


...不幸的是,由于对GroupedObservable的限制,似乎无法在分组上下文中运行相同类型的过程:

BlockingObservable.from(observable.groupBy(new Func1<Long, Long>() {
    @Override
    public Long call(Long aLong) {
        return aLong % 5;
    }
}).flatMap(new Func1<GroupedObservable<Long, Long>, Observable<Long>>() {
    @Override
    public Observable<Long> call(GroupedObservable<Long, Long> in) {
        Observable<Long> filter1 = in.filter(new Func1<Long, Boolean>() {
            @Override
            public Boolean call(Long aLong) {
                return 5 == aLong % 5;
            }
        });
        Observable<Long> filter2 = in.filter(new Func1<Long, Boolean>() {
            @Override
            public Boolean call(Long aLong) {
                return 2 == aLong % 5;
            }
        });
        return Observable.concat(filter1, filter2).first();
    }
})).forEach(new Action1<Long>() {
    @Override
    public void call(Long aLong) {
        System.out.println(aLong);
    }
});


...导致多个订阅者异常(线程“ main”中的异常java.lang.IllegalStateException:仅允许一个订阅者!)。

我是否缺少解决此问题的明显方法?在这种情况下,我尝试使用ConnectableObservables来显示单个订户的外观,但是这些尝试同样也是失败的(肯定是由于我的无知)。



与此相关的是,groupByUntil似乎也为您提供了对GroupedObservable的引用,如果我实际上试图使用它来确定何时关闭窗口,这也使我感到类似的麻烦,即抱怨多个订阅者。再次重申,我确信自己会忽略一些明显的事情,因为API显然希望人们使用GroupedObservable!

最佳答案

您可能可以在GroupedObservable上使用.cache()。


final Observable<Long> inCached = in.cache();


然后在您的过滤器中使用结果Observable。

Observable<Long> filter1 = inCached.filter(new Func1<Long, Boolean>() {
        @Override
        public Boolean call(Long aLong) {
            return 5 == aLong % 5;
        }
    });


这样,每个订户将看到相同的项目,但是GroupedObservable只有一个订户。

10-08 12:51