我正在测试将JMS Request / Reply与Camel和ActiveMQ一起使用的示例。当骆驼为您创建侦听器时,我可以使用该示例。即。
from("direct:entryPoint").inOut("jms:queue:A");
from("jms:queue:A").
process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
exchange.getIn().setBody("Hello World.");
}
});
我现在遇到的问题是,我无法使JMS请求/答复与Camel的jvm之外的MessageListener一起使用。连接超时,等待答复。我确保MessageListener将答复发送到replyTo队列中,并且还要设置correlationId。我在这里做错了什么?我已经搜寻了好几天,试图解决这个问题,但没有运气。提前感谢您的帮助。
下面是我正在使用的路由,我还将MessageListener逻辑也放在下面。
from("direct:entryPoint").
inOut("jms:queue:B?concurrentConsumers=4&requestTimeout=240000");
队列B的MessageListener onMessage:
@Override
public void onMessage(Message message) {
String msg = null;
ObjectMapper mapper = new ObjectMapper();
mapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, true);
String jsonOutput = null;
try{
msg = ((TextMessage) message).getText();
//convert message payload to purchase order
PurchaseOrder order = mapper.readValue(msg, PurchaseOrder.class);
//Set the id to see if the request reply worked.
order.setOrderId(BigInteger.valueOf(111111111));
if(message.getJMSReplyTo() != null){
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("JMSCorrelationID", message.getJMSCorrelationID());
headers.put("JMSReplyTo", message.getJMSReplyTo().toString());
jsonOutput = mapper.writeValueAsString(order);
//Camel runs on the external jvm so leverage the producerTemplate.
producerTemplate.
sendBodyAndHeaders("jms:"+ message.getJMSReplyTo().toString(),
jsonOutput, headers);
}
}
catch(Exception e){
logger.fatal(e.getMessage());
try {
if(message.getJMSReplyTo() != null){
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("JMSCorrelationID", message.getJMSCorrelationID());
producerTemplate.
sendBodyAndHeaders("jms:"+ message.getJMSReplyTo().toString(),
e.getMessage(), headers);
}
} catch (JMSException e1) {
logger.fatal(e1.getMessage());
} catch (Exception e1) {
logger.fatal(e1.getMessage());
}
}
}
最佳答案
sendBodyAndHeaders("jms:"+ message.getJMSReplyTo().toString(),
jsonOutput, headers);
通常会解析为
jms:queue://someQueue
,这可能会使事情搞砸。除非您有所注意,否则使用javax.jms.Destination转换为字符串通常不是一个好主意。您可以使用骆驼头
CamelJmsDestination
:headers.put("CamelJmsDestination",message.getJMSReplyTo());
如果这样行不通,我不知道。通常,尝试将ActiveMQ与Web控制台一起使用(或使用JMX连接到ActiveMQ(jconsole))来查看队列,并尝试弄清楚谁正在读取什么队列以及消息在何处结束。真的很有帮助。
关于java - Camel JMS请求回复问题与远程MessageListener,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/15588084/