本文介绍了具有异步订阅者功能的 RxJS Observable的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试做一些感觉应该很简单的事情,但事实证明这是非常困难的.

I'm trying to do something that feels like it should be straightforward, but is proving surprisingly difficult.

我有一个订阅 RabbitMQ 队列的函数.具体来说,这里是 Channel.consume 函数:http://www.squaremobius.net/amqp.node/channel_api.html#channel_consume

I have a function to subscribe to a RabbitMQ queue. Concretely, this is the Channel.consume function here: http://www.squaremobius.net/amqp.node/channel_api.html#channel_consume

它返回一个使用订阅 ID 解析的承诺 - 稍后需要取消订阅 - 并且还有一个回调参数在消息从队列中拉出时调用.

It returns a promise which is resolved with a subscription id - which is needed to unsubscribe later - and also has a callback argument to invoke when messages are pulled off the queue.

当我想取消订阅队列时,我需要在这里使用 Channel.cancel 函数取消消费者:http://www.squaremobius.net/amqp.node/channel_api.html#channel_cancel.这需要之前返回的订阅 ID.

When I want to unsubscribe from the queue, I'd need to cancel the consumer using the Channel.cancel function here: http://www.squaremobius.net/amqp.node/channel_api.html#channel_cancel. This takes the previously returned subscription id.

我想将所有这些东西都包装在一个 Observable 中,该 Observable 在订阅 observable 时订阅队列,并在取消订阅 observable 时取消订阅.然而,由于调用的双重异步"性质,这证明有点困难(我的意思是说它们既有回调又有返回承诺).

I want to wrap all of this stuff in an Observable that subscribes to the queue when the observable is subscribed to, and cancels the subscription when the observable is unsubscribed from. However, this is proving somewhat hard due to the 'double-asynchronous' nature of the calls (I mean to say that they have both a callback AND return a promise).

理想情况下,我希望能够编写的代码是:

Ideally, the code I'd like to be able to write is:

return new Rx.Observable(async (subscriber) => {
  var consumeResult = await channel.consume(queueName, (message) => subscriber.next(message));
  return async () => {
    await channel.cancel(consumeResult.consumerTag);
  };
});

但是,这是不可能的,因为此构造函数不支持异步订阅者功能或拆卸逻辑.

However, this isn't possible as this constructor doesn't support async subscriber functions or teardown logic.

我一直无法弄清楚这一点.我在这里错过了什么吗?为什么这么难?

I've not been able to figure this one out. Am I missing something here? Why is this so hard?

干杯,亚历克斯

推荐答案

创建的 observable 不需要等待 channel.consume 承诺解析,因为观察者(它是一个被传递的观察者,而不是订阅者)只在您提供的函数中调用.

The created observable does not need to wait for the channel.consume promise to resolve, as the observer (it's an observer that's passed, not a subscriber) is only called from within the function you provide.

但是,您返回的取消订阅功能必须等待该承诺解决.它可以在内部做到这一点,就像这样:

However, the unsubscribe function that you return will have to wait for that promise to resolve. And it can do that internally, like this:

return new Rx.Observable((observer) => {
  var consumeResult = channel.consume(queueName, (message) => observer.next(message));
  return () => {
    consumeResult.then(() => channel.cancel(consumeResult.consumerTag));
  };
});

这篇关于具有异步订阅者功能的 RxJS Observable的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

09-02 03:02