我正在使用一些correlationId(每个消息唯一)向ibm mq发送消息。然后,我想从输出队列中读取具有特定correlationId的具体消息,并且希望它在Java webflux控制器中不受阻碍地使用。
我想知道是否有一种方法可以减轻痛苦?诸如jmsTemplate.receiveSelected(...)之类的选项正在阻止,而创建实现接口MessageBean的bean时,并没有提供通过动态选择器选择消息的方法(即,每条消息的RELATIONID都是唯一的)。
最佳答案
您可以使用spring MessageListener
检索所有消息,并通过Mono.create(...)
和您自己的事件监听器将其与控制器连接,从而触发结果Mono
// Consumes message and trigger result Mono
public interface MyEventListener extends Consumer<MyOutputMessage> {}
用于将传入消息路由到正确的MyEventListener的类
public class MyMessageProcessor {
// You could use in-memory cache here if you need ttl etc.
private static final ConcurrentHashMap<String, MyEventListener> REGISTRY
= new ConcurrentHashMap<>();
public void register(String correlationId, MyEventListener listener) {
MyEventListener oldListeer = REGISTRY.putIfAbsent(correlationId, listener);
if (oldListeer != null)
throw new IllegalStateException("Correlation ID collision!");
}
public void unregister(String correlationId) {
REGISTRY.remove(correlationId);
}
public void accept(String correlationId, MyOutputMessage myOutputMessage) {
Optional.ofNullable(REGISTRY.get(correlationId))
.ifPresent(listener -> listener.accept(myOutputMessage));
}
}
Webflux控制器
private final MyMessageProcessor messageProcessor;
....
@PostMapping("/process")
Mono<MyOutputMessage> process(Mono<MyInputMessage> inputMessage) {
String correlationId = ...; //generate correlationId
// then send message asynchronously
return Mono.<MyOutputMessage>create(sink ->
// create and save MyEventListener which call MonoSink.success
messageProcessor.register(correlationId, sink::success))
// define timeout if you don't want to wait forever
.timeout(...)
// cleanup MyEventListener after success, error or cancel
.doFinally(ignored -> messageProcessor.unregister(correlationId));
}
并进入JMS
onMessage
实现的MessageListener
中,您可以调用messageProcessor.accept(correlationId, myOutputMessage);
您可以在reactor 3 reference guide中找到关于Flux的类似示例