本文介绍了RxJava:如何用依赖关系组合多个Observable并在最后收集所有结果?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在学习RxJava,并且作为我的第一个实验,试图在(引用是RxJava可以帮助解决的问题),以改善使用RxJava的异步性,即它不会等待第一个Future的结果( f1.get()



f3 取决于,然后继续执行其他代码。 F1 。我看到了如何处理这个问题, flatMap 似乎可以做到这一点:

 可观察<字符串> f3Observable = Observable.from(executor.submit(new CallToRemoteServiceA()))
.flatMap(new Func1< String,Observable< String>>(){
@Override
public Observable< String> call(String s){
return Observable.from(executor.submit(new CallToRemoteServiceC(s)));
}
});

接下来, f4 和 f5 取决于 f2 。我有这个:

  final Observable< Integer> f4And5Observable = Observable.from(executor.submit(new CallToRemoteServiceB()))
.flatMap(new Func1< Integer,Observable< Integer>>(){
@Override
public Observable< Integer> call(Integer i){
Observable< Integer> f4Observable = Observable.from(executor.submit(new CallToRemoteServiceD(i)));
Observable< Integer> f5Observable = Observable.from(executor。 submit(new CallToRemoteServiceE(i)));
return Observable.merge(f4Observable,f5Observable);
}
});

开始变得奇怪( merge ing他们可能不是我想要的......),但允许我在最后做到这一点,不是我想要的:

  f3Observable.subscribe(new Action1< String>(){
@Override
public void call(String s){
System.out.println(Observed from f3:+ s );
f4And5Observable.subscribe(new Action1< Integer>(){
@Override
public void call(Integer i){
System.out.println(Observed from f4和f5:+ i);
}
});
}
});

这给了我:

 从f3观察到:responseB_responseA 
从f4和f5观察到:140
从f4和f5观察到:5100

这就是所有的数字,但不幸的是我在单独的调用中得到了结果,所以我不能完全替换原始代码中的最终println:

  System.out.println(f3.get()+=>+(f4.get()* f5.get())) ; 

我不明白如何在同一行上访问这两个返回值。我认为这里可能有一些函数式编程fu。我怎样才能做到这一点?感谢。

解决方案

看起来您真正需要的是对RX的使用方式提供更多的鼓励和观点。我建议你阅读更多的文档以及大理石图(我知道它们并不总是有用的)。我还建议看看 lift()函数和运算符。


  • 可观察对象的整个观点是将数据流和数据操作连接成单个对象
  • 调用 map , flatMap 和过滤器将操纵数据流中的数据
  • 合并的要点是合并数据流

  • 运算符的要点是允许你可以破坏稳定的可观察流,并在数据流上定义你自己的操作。例如,我编写了一个移动平均值算子。在一个可观察的双精度数据中,总结 n double s来返回移动平均值流。代码看起来像这样


    $ b

    Observable movingAverage = Observable.from(mDoublesArray).lift(new MovingAverageOperator(frameSize))

    您可以放心,您认为理所当然的很多过滤方法都具有 lift()在引擎盖下。

    有了这个说法,所有它需要合并多个依赖项是:使用 map 将所有传入数据更改为标准数据类型

      code>或 flatMap
    • 将标准数据类型合并到流中
    • 如果一个对象需要在另一个对象上等待,或者需要在流中订购数据,则使用自定义运算符。警告:这种方法会减慢流量

    • 使用列表或订阅收集所有数据


    I'm learning RxJava and, as my first experiment, trying to rewrite the code in the first run() method in this code (cited on Netflix's blog as a problem RxJava can help solve) to improve its asynchronicity using RxJava, i.e. so it doesn't wait for the result of the first Future (f1.get()) before proceeding on to the rest of the code.

    f3 depends on f1. I see how to handle this, flatMap seems to do the trick:

    Observable<String> f3Observable = Observable.from(executor.submit(new CallToRemoteServiceA()))
        .flatMap(new Func1<String, Observable<String>>() {
            @Override
            public Observable<String> call(String s) {
                return Observable.from(executor.submit(new CallToRemoteServiceC(s)));
            }
        });
    

    Next, f4 and f5 depend on f2. I have this:

    final Observable<Integer> f4And5Observable = Observable.from(executor.submit(new CallToRemoteServiceB()))
        .flatMap(new Func1<Integer, Observable<Integer>>() {
            @Override
            public Observable<Integer> call(Integer i) {
                Observable<Integer> f4Observable = Observable.from(executor.submit(new CallToRemoteServiceD(i)));
                Observable<Integer> f5Observable = Observable.from(executor.submit(new CallToRemoteServiceE(i)));
                return Observable.merge(f4Observable, f5Observable);
            }
        });
    

    Which starts to get weird (mergeing them probably isn't what I want...) but allows me to do this at the end, not quite what I want:

    f3Observable.subscribe(new Action1<String>() {
        @Override
        public void call(String s) {
            System.out.println("Observed from f3: " + s);
            f4And5Observable.subscribe(new Action1<Integer>() {
                @Override
                public void call(Integer i) {
                    System.out.println("Observed from f4 and f5: " + i);
                }
            });
        }
    });
    

    That gives me:

    Observed from f3: responseB_responseA
    Observed from f4 and f5: 140
    Observed from f4 and f5: 5100
    

    which is all the numbers, but unfortunately I get the results in separate invocations, so I can't quite replace the final println in the original code:

    System.out.println(f3.get() + " => " + (f4.get() * f5.get()));
    

    I don't understand how to get access to both those return values on the same line. I think there's probably some functional programming fu I'm missing here. How can I do this? Thanks.

    解决方案

    It looks like all you really need is a bit more encouragement and perspective on how RX is used. I'd suggest you read more into the documentation as well as marble diagrams (I know they're not always useful). I also suggest looking into the lift() function and operators.

    • The entire point of an observable is to concatenate data flow and data manipulation into a single object
    • The point of calls to map, flatMap and filter are to manipulate the data in your data flow
    • The point of merges are to combine data flows
    • The point of operators are to allow you to disrupt a steady stream of observables and define your own operations on a data flow. For example, I coded a moving average operator. That sums up n doubles in an Observable of doubles to return a stream of moving averages. The code literally looked like this

      Observable movingAverage = Observable.from(mDoublesArray).lift(new MovingAverageOperator(frameSize))

    You'll be a relieved that a lot of the filtering methods that you take for granted all have lift() under the hood.

    With that said; all it takes to merge multiple dependencies is:

    • changing all incoming data to a standard data type using map or flatMap
    • merging standard data-types to a stream
    • using custom operators if one object needs to wait on another, or if you need to order data in the stream. Caution: this approach will slow the stream down
    • using to list or subscribe to collect all of that data

    这篇关于RxJava:如何用依赖关系组合多个Observable并在最后收集所有结果?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

11-02 14:56