我可以使用Retrofit + RxJava收听无休止的流吗?例如Twitter流。我所拥有的是:
public interface MeetupAPI {
@GET("http://stream.meetup.com/2/rsvps/")
Observable<RSVP> getRSVPs();
}
MeetupAPI api = new Retrofit.Builder()
.baseUrl(MeetupAPI.RSVP_API)
.addCallAdapterFactory(RxJavaCallAdapterFactory.create())
.addConverterFactory(GsonConverterFactory.create())
.build()
.create(MeetupAPI.class);
api.getRSVPs()
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(rsvp -> Log.d(TAG, "got rsvp"),
error -> Log.d(TAG, "error: " + error),
() -> Log.d(TAG, "onComplete"));
但在解析第一个对象之后将调用“onComplete”。有没有办法告诉Retrofit保持开放直到另行通知?
最佳答案
这是我的解决方案:
您可以使用@Streaming批注:
public interface ITwitterAPI {
@GET("/2/rsvps")
@Streaming
Observable<ResponseBody> twitterStream();
}
ITwitterAPI api = new Retrofit.Builder()
.baseUrl("http://stream.meetup.com")
.addCallAdapterFactory(RxJavaCallAdapterFactory.create())
.build().create(ITwitterAPI.class);
使用
@Streaming
我们可以从ResponseBody
获取原始输入。这是我的函数,用于将主体用事件行分隔:
public static Observable<String> events(BufferedSource source) {
return Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
try {
while (!source.exhausted()) {
subscriber.onNext(source.readUtf8Line());
}
subscriber.onCompleted();
} catch (IOException e) {
e.printStackTrace();
subscriber.onError(e);
}
}
});
}
和结果用法:
api.twitterStream()
.flatMap(responseBody -> events(responseBody.source()))
.subscribe(System.out::println);
更新了有关正常停止的信息
当我们退订时,改造会关闭输入流。但是无法检测到输入流本身是否关闭或关闭,因此只有一种方法-尝试从流中读取-我们得到
Socket closed
消息的异常。我们可以将此异常解释为关闭:
@Override
public void call(Subscriber<? super String> subscriber) {
boolean isCompleted = false;
try {
while (!source.exhausted()) {
subscriber.onNext(source.readUtf8Line());
}
} catch (IOException e) {
if (e.getMessage().equals("Socket closed")) {
isCompleted = true;
subscriber.onCompleted();
} else {
throw new UncheckedIOException(e);
}
}
//if response end we get here
if (!isCompleted) {
subscriber.onCompleted();
}
}
如果由于响应结束而关闭了连接,那么我们就没有任何异常(exception)。在这里
isCompleted
检查。让我知道我是否错了:)关于android - Android Retrofit 2 + RxJava : listen to endless stream,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/36603368/