我有MissingBackpressureException问题。
我只为测试添加了几个.onBackpressureDrop(),但仍然出现异常。
我添加了RxJavaHooks.enableAssemblyTracking()以获得更多日志详细信息。
1-3分钟后引发异常。
知道这段代码有什么问题吗?
谢谢。
引发异常的代码:
Subscription notifySubscription = connection.setupNotification(notifyCharacteristic)
.onBackpressureBuffer()
.doOnNext(new Action1<Observable<byte[]>>() {
@Override
public void call(Observable<byte[]> observable) {
Log.d(TAG, "Notifications set, calling bypassConnect()");
// Bypass connect in WB to make it aware of this new device
String wbAddress = addressMap.getOrCreateWbAddress(bleMac);
bleWrapper.bypassConnect(wbAddress);
}
})
.onBackpressureDrop()
.flatMap(new Func1<Observable<byte[]>, Observable<byte[]>>() {
@Override
public Observable<byte[]> call(Observable<byte[]> observable) {
return observable;
}
})
.onBackpressureDrop()
.subscribe(new Action1<byte[]>() {
@Override
public void call(byte[] bytes) {
dataAvailable(bleMac, bytes);
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
Log.e(TAG, "dataAvailable() Error: ", throwable);
}
});
登录:
rx.exceptions.MissingBackpressureException
at rx.internal.util.RxRingBuffer.onNext(RxRingBuffer.java:325)
at rx.internal.operators.OperatorMerge$MergeSubscriber.queueScalar(OperatorMerge.java:379)
at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:361)
at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:846)
at com.jakewharton.rxrelay.RelaySubscriptionManager$RelayObserver.onNext(RelaySubscriptionManager.java:205)
at com.jakewharton.rxrelay.PublishRelay.call(PublishRelay.java:47)
at com.jakewharton.rxrelay.SerializedAction1.call(SerializedAction1.java:84)
at com.jakewharton.rxrelay.SerializedRelay.call(SerializedRelay.java:20)
at com.polidea.rxandroidble.internal.connection.RxBleGattCallback$4.onCharacteristicChanged(RxBleGattCallback.java:139)
at android.bluetooth.BluetoothGatt$1.onNotify(BluetoothGatt.java:438)
at android.bluetooth.IBluetoothGattCallback$Stub.onTransact(IBluetoothGattCallback.java:399)
at android.os.Binder.execTransact(Binder.java:453)
Caused by: rx.exceptions.AssemblyStackTraceException: Assembly trace:
at rx.Observable.unsafeCreate(Observable.java:162)
at rx.Observable.lift(Observable.java:299)
at rx.Observable.merge(Observable.java:2572)
at rx.Observable.merge(Observable.java:2914)
at rx.Observable.merge(Observable.java:2637)
at com.polidea.rxandroidble.internal.connection.RxBleGattCallback.getOnCharacteristicChanged(RxBleGattCallback.java:311)
at com.polidea.rxandroidble.internal.connection.NotificationAndIndicationManager.observeOnCharacteristicChangeCallbacks(NotificationAndIndicationManager.java:186)
at com.polidea.rxandroidble.internal.connection.NotificationAndIndicationManager.access$400(NotificationAndIndicationManager.java:31)
at com.polidea.rxandroidble.internal.connection.NotificationAndIndicationManager$1$1.call(NotificationAndIndicationManager.java:110)
at com.polidea.rxandroidble.internal.connection.NotificationAndIndicationManager$1$1.call(NotificationAndIndicationManager.java:107)
at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:69)
at rx.observers.Subscribers$5.onNext(Subscribers.java:235)
at rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar(OperatorMerge.java:395)
at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:355)
at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:846)
at rx.observers.SerializedObserver.onNext(SerializedObserver.java:91)
at rx.observers.SerializedSubscriber.onNext(SerializedSubscriber.java:94)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.innerNext(OnSubscribeConcatMap.java:182)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapInnerScalarProducer.request(OnSubscribeConcatMap.java:366)
at rx.internal.producers.ProducerArbiter.setProducer(ProducerArbiter.java:126)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.drain(OnSubscribeConcatMap.java:278)
at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.onNext(OnSubscribeConcatMap.java:144)
at rx.internal.operators.OnSubscribeFromArray$FromArrayProducer.slowPath(OnSubscribeFromArray.java:100)
at rx.internal.operators.OnSubscribeFromArray$FromArrayProducer.request(OnSubscribeFromArray.java:63)
at rx.Subscriber.setProducer(Subscriber.java:211)
at rx.Subscriber.setProducer(Subscriber.java:205)
at rx.internal.operators.OnSubscribeFromArray.call(OnSubscribeFromArray.java:32)
at rx.internal.operators.OnSubscribeFromArray.call(OnSubscribeFromArray.java:24)
at rx.Observable.unsafeSubscribe(Observable.java:10256)
at rx.internal.operators.OnSubscribeConcatMap.call(OnSubscribeConcatMap.java:94)
at rx.internal.operators.OnSubscribeConcatMap.call(OnSubscribeConcatMap.java:42)
at rx.Observable.unsafeSubscribe(Observable.java:10256)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:248)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:148)
at rx.in
最佳答案
看起来Observable
返回的rxandroidble
不支持反压,但flatMap
期望它,因此您必须在.onBackpressureDrop
内应用flatMap
:
.flatMap(observable -> observable.onBackpressureDrop())
请注意,在
flatMap
之外应用运算符通常不会影响合并源时内部发生的情况。