Problem description
I am using Firebase in my app, along with RxJava. Firebase can notify your app whenever something changes in the backend data (additions, removals, changes, ...). I am trying to combine this feature of Firebase with RxJava.
The data I am listening for is called Leisure, and the Observable emits LeisureUpdate objects, each of which contains a Leisure and the type of update (added, removed, moved, changed).
Here is my method, which allows callers to subscribe to these events.
    private Observable<LeisureUpdate> leisureUpdatesObservable;
    private ChildEventListener leisureUpdatesListener;
    private int leisureUpdatesSubscriptionsCount;

    @NonNull
    public Observable<LeisureUpdate> subscribeToLeisuresUpdates() {
        if (leisureUpdatesObservable == null) {
            leisureUpdatesObservable = Observable.create(new Observable.OnSubscribe<LeisureUpdate>() {
                @Override
                public void call(final Subscriber<? super LeisureUpdate> subscriber) {
                    leisureUpdatesListener = firebase.child(FirebaseStructure.LEISURES).addChildEventListener(new ChildEventListener() {
                        @Override
                        public void onChildAdded(DataSnapshot dataSnapshot, String s) {
                            final Leisure leisure = convertMapToLeisure((Map<String, Object>) dataSnapshot.getValue());
                            subscriber.onNext(new LeisureUpdate(leisure, LeisureUpdate.ADDED));
                        }

                        @Override
                        public void onChildChanged(DataSnapshot dataSnapshot, String s) {
                            final Leisure leisure = convertMapToLeisure((Map<String, Object>) dataSnapshot.getValue());
                            subscriber.onNext(new LeisureUpdate(leisure, LeisureUpdate.CHANGED));
                        }

                        @Override
                        public void onChildRemoved(DataSnapshot dataSnapshot) {
                            final Leisure leisure = convertMapToLeisure((Map<String, Object>) dataSnapshot.getValue());
                            subscriber.onNext(new LeisureUpdate(leisure, LeisureUpdate.REMOVED));
                        }

                        @Override
                        public void onChildMoved(DataSnapshot dataSnapshot, String s) {
                            final Leisure leisure = convertMapToLeisure((Map<String, Object>) dataSnapshot.getValue());
                            subscriber.onNext(new LeisureUpdate(leisure, LeisureUpdate.MOVED));
                        }

                        @Override
                        public void onCancelled(FirebaseError firebaseError) {
                            subscriber.onError(new Error(firebaseError.getMessage()));
                        }
                    });
                }
            });
        }
        leisureUpdatesSubscriptionsCount++;
        return leisureUpdatesObservable;
    }

First off, I would like to use the Observable.fromCallable() method to create the Observable, but I guess that is impossible, since Firebase uses callbacks, right?
I keep a single instance of the Observable so that there is always one Observable to which multiple Subscribers can subscribe.
The problem comes when everyone unsubscribes and I need to stop listening for events in Firebase. I didn't find any way to make the Observable know whether any subscriptions remain. So I count the calls to subscribeToLeisuresUpdates() in leisureUpdatesSubscriptionsCount.
Then every time someone wants to unsubscribe, they have to call
    @Override
    public void unsubscribeFromLeisuresUpdates() {
        if (leisureUpdatesObservable == null) {
            return;
        }
        leisureUpdatesSubscriptionsCount--;
        if (leisureUpdatesSubscriptionsCount == 0) {
            firebase.child(FirebaseStructure.LEISURES).removeEventListener(leisureUpdatesListener);
            leisureUpdatesObservable = null;
        }
    }

This is the only way I found to make the Observable emit items while there is a subscriber, but I feel like there must be an easier way, especially for detecting when no subscribers are listening to the Observable anymore.
Has anyone encountered a similar problem or taken a different approach?
Solution
Also, you can use my library rxFirebase.
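What the question asks for is reference counting: attach one backend listener when the first subscriber arrives and detach it when the last one leaves. In RxJava 1.x this is usually left to the library. Inside Observable.create() the listener removal can be registered with subscriber.add(Subscriptions.create(...)), and share() (shorthand for publish().refCount()) keeps a single upstream subscription alive only while at least one subscriber exists. The following is a minimal plain-Java sketch of that idea, not the rxFirebase implementation. FakeBackend and RefCountedUpdates are hypothetical names invented for illustration, and FakeBackend only stands in for a callback-based source such as a Firebase child listener.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Hypothetical stand-in for a callback-based backend (not the Firebase API). */
class FakeBackend {
    private final List<Consumer<String>> listeners = new CopyOnWriteArrayList<>();

    void addListener(Consumer<String> listener) { listeners.add(listener); }
    void removeListener(Consumer<String> listener) { listeners.remove(listener); }
    int listenerCount() { return listeners.size(); }

    /** Simulates the backend reporting a change to every attached listener. */
    void push(String event) {
        for (Consumer<String> listener : listeners) {
            listener.accept(event);
        }
    }
}

/** Shares one backend listener among many subscribers (hypothetical helper). */
class RefCountedUpdates {
    private final FakeBackend backend;
    private final List<Consumer<String>> subscribers = new CopyOnWriteArrayList<>();
    // The single listener attached to the backend on behalf of all subscribers.
    private final Consumer<String> upstream = this::dispatch;

    RefCountedUpdates(FakeBackend backend) { this.backend = backend; }

    private void dispatch(String event) {
        for (Consumer<String> subscriber : subscribers) {
            subscriber.accept(event);
        }
    }

    /** Registers a subscriber and returns a handle that unsubscribes it. */
    synchronized Runnable subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
        if (subscribers.size() == 1) {
            backend.addListener(upstream);        // first subscriber: attach
        }
        boolean[] done = {false};                 // makes the handle idempotent
        return () -> {
            synchronized (this) {
                if (done[0]) {
                    return;
                }
                done[0] = true;
                subscribers.remove(subscriber);
                if (subscribers.isEmpty()) {
                    backend.removeListener(upstream); // last subscriber: detach
                }
            }
        };
    }
}

class RefCountDemo {
    public static void main(String[] args) {
        FakeBackend backend = new FakeBackend();
        RefCountedUpdates updates = new RefCountedUpdates(backend);
        List<String> seen = new ArrayList<>();

        Runnable first = updates.subscribe(e -> seen.add("A:" + e));
        Runnable second = updates.subscribe(e -> seen.add("B:" + e));
        backend.push("added");
        first.run();                  // A leaves, B keeps the listener alive
        backend.push("changed");
        second.run();                 // last subscriber leaves, listener detached
        backend.push("removed");      // nobody is listening any more

        System.out.println(seen);                    // [A:added, B:added, B:changed]
        System.out.println(backend.listenerCount()); // 0
    }
}
```

The point of the design is that each caller only holds its own unsubscribe handle, so no separate unsubscribeFromLeisuresUpdates() method and no manually maintained counter are needed. Returning a handle from subscribe() plays the role that the Subscription object plays in RxJava.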