我正在尝试使一个非常基础的RxJava应用程序正常工作。我定义了以下Observable类,该类从文件读取并返回行:

public Observable<String> getObservable() throws IOException
    {
        return Observable.create(subscribe -> {
            InputStream in = getClass().getResourceAsStream("/trial.txt");
            BufferedReader reader = new BufferedReader(new InputStreamReader(in));
            String line = null;
            try {
                while((line = reader.readLine()) != null)
                {
                    subscribe.onNext(line);
                }
            } catch (IOException e) {
                subscribe.onError(e);
            }
            finally {
                subscribe.onCompleted();
            }
        });
    }


接下来,我定义了subscrober代码:

public static void main(String[] args) throws IOException, InterruptedException {
        Thread thread = new Thread(() ->
        {
            RxObserver observer = new RxObserver();
            try {
                observer.getObservable()
                        .observeOn(Schedulers.io())
                        .subscribe( x ->System.out.println(x),
                                    t -> System.out.println(t),
                                    () -> System.out.println("Completed"));

            } catch (IOException e) {
                e.printStackTrace();
            }
        });

        thread.start();
        thread.join();
    }


该文件有近50000条记录。运行该应用程序时,我收到“ rx.exceptions.MissingBackpressureException”。我已经阅读了一些文档,并且按照建议,我尝试在调用链中添加“ .onBackpressureBuffer()”方法。但是然后我没有收到异常,但是完成的调用也没有被解雇。

处理快速生成可观察值的场景的正确方法是什么?

最佳答案

第一个问题是您的readLine逻辑忽略背压。您可以在onBackpressureBuffer()开始之前应用observeOn,但是最近添加了一个SyncOnSubscribe,可以让您逐个生成值并处理背压:

SyncOnSubscribe.createSingleState(() => {
    try {
        InputStream in = getClass().getResourceAsStream("/trial.txt");
        return new BufferedReader(new InputStreamReader(in));
    } catch (IOException ex) {
        throw new RuntimeException(ex);
    }
},
(s, o) -> {
    try {
        String line = s.readLine();
        if (line == null) {
            o.onCompleted();
        } else {
            o.onNext(line);
        }
    } catch (IOException ex) {
        s.onError(ex);
    }
},
s -> {
    try {
       s.close();
    } catch (IOException ex) {
    }
});


第二个问题是,您的线程将在io线程上的所有元素都已交付之前完成,因此主程序将退出。删除observeOn,添加.toBlocking或使用CountDownLatch

RxObserver observer = new RxObserver();
try {

    CountDownLatch cdl = new CountDownLatch(1);

    observer.getObservable()
           .observeOn(Schedulers.io())
           .subscribe( x ->System.out.println(x),
                       t -> { System.out.println(t); cdl.countDown(); },
                       () -> { System.out.println("Completed"); cdl.countDown(); });

    cdl.await();
 } catch (IOException | InterruptedException e) {
     e.printStackTrace();
 }

09-26 12:14