我可以使用Retrofit + RxJava收听无休止的流吗?例如Twitter流。我所拥有的是:

public interface MeetupAPI {
    @GET("http://stream.meetup.com/2/rsvps/")
    Observable<RSVP> getRSVPs();
}

MeetupAPI api = new Retrofit.Builder()
            .baseUrl(MeetupAPI.RSVP_API)
            .addCallAdapterFactory(RxJavaCallAdapterFactory.create())
            .addConverterFactory(GsonConverterFactory.create())
            .build()
            .create(MeetupAPI.class);

api.getRSVPs()
        .subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(rsvp -> Log.d(TAG, "got rsvp"),
                error -> Log.d(TAG, "error: " + error),
                () -> Log.d(TAG, "onComplete"));

但在解析第一个对象之后将调用“onComplete”。有没有办法告诉Retrofit保持开放直到另行通知?

最佳答案

这是我的解决方案:

您可以使用@Streaming批注:

public interface ITwitterAPI {

    @GET("/2/rsvps")
    @Streaming
    Observable<ResponseBody> twitterStream();
}

ITwitterAPI api = new Retrofit.Builder()
          .baseUrl("http://stream.meetup.com")
          .addCallAdapterFactory(RxJavaCallAdapterFactory.create())
          .build().create(ITwitterAPI.class);

使用@Streaming我们可以从ResponseBody获取原始输入。

这是我的函数,用于将主体用事件行分隔:
public static Observable<String> events(BufferedSource source) {
    return Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            try {
                while (!source.exhausted()) {
                    subscriber.onNext(source.readUtf8Line());
                }
                subscriber.onCompleted();
            } catch (IOException e) {
                e.printStackTrace();
                subscriber.onError(e);
            }
        }
    });
}

和结果用法:
api.twitterStream()
  .flatMap(responseBody -> events(responseBody.source()))
  .subscribe(System.out::println);

更新了有关正常停止的信息

当我们退订时,改造会关闭输入流。但是无法检测到输入流本身是否关闭或关闭,因此只有一种方法-尝试从流中读取-我们得到Socket closed消息的异常。
我们可以将此异常解释为关闭:
        @Override
        public void call(Subscriber<? super String> subscriber) {
            boolean isCompleted = false;
            try {
                while (!source.exhausted()) {
                    subscriber.onNext(source.readUtf8Line());
                }
            } catch (IOException e) {
                if (e.getMessage().equals("Socket closed")) {
                    isCompleted = true;
                    subscriber.onCompleted();
                } else {
                    throw new UncheckedIOException(e);
                }
            }
            //if response end we get here
            if (!isCompleted) {
                subscriber.onCompleted();
            }
        }

如果由于响应结束而关闭了连接,那么我们就没有任何异常(exception)。在这里isCompleted检查。让我知道我是否错了:)

关于android - Android Retrofit 2 + RxJava : listen to endless stream,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/36603368/

10-12 03:12