我正在使用一些correlationId(每个消息唯一)向ibm mq发送消息。然后,我想从输出队列中读取具有特定correlationId的具体消息,并且希望它在Java webflux控制器中不受阻碍地使用。

我想知道是否有一种方法可以减轻痛苦?诸如jmsTemplate.receiveSelected(...)之类的选项正在阻止,而创建实现接口MessageBean的bean时,并没有提供通过动态选择器选择消息的方法(即,每条消息的RELATIONID都是唯一的)。

最佳答案

您可以使用spring MessageListener检索所有消息,并通过Mono.create(...)和您自己的事件监听器将其与控制器连接,从而触发结果Mono

// Consumes message and trigger result Mono
public interface MyEventListener extends Consumer<MyOutputMessage> {}


用于将传入消息路由到正确的MyEventListener的类

public class MyMessageProcessor {
    // You could use in-memory cache here if you need ttl etc.
    private static final ConcurrentHashMap<String, MyEventListener> REGISTRY
            = new ConcurrentHashMap<>();

    public void register(String correlationId, MyEventListener listener) {
        MyEventListener oldListeer = REGISTRY.putIfAbsent(correlationId, listener);
        if (oldListeer != null)
            throw new IllegalStateException("Correlation ID collision!");
    }

    public void unregister(String correlationId) {
        REGISTRY.remove(correlationId);
    }

    public void accept(String correlationId, MyOutputMessage myOutputMessage) {
        Optional.ofNullable(REGISTRY.get(correlationId))
                .ifPresent(listener -> listener.accept(myOutputMessage));
    }
}


Webflux控制器

private final MyMessageProcessor messageProcessor;

....

@PostMapping("/process")
Mono<MyOutputMessage> process(Mono<MyInputMessage> inputMessage) {
    String correlationId = ...; //generate correlationId

    // then send message asynchronously

    return Mono.<MyOutputMessage>create(sink ->
            // create and save MyEventListener which call MonoSink.success
            messageProcessor.register(correlationId, sink::success))
            // define timeout if you don't want to wait forever
            .timeout(...)
            // cleanup MyEventListener after success, error or cancel
            .doFinally(ignored -> messageProcessor.unregister(correlationId));
}


并进入JMS onMessage实现的MessageListener中,您可以调用

messageProcessor.accept(correlationId, myOutputMessage);


您可以在reactor 3 reference guide中找到关于Flux的类似示例

09-27 18:47