Problem description
I am trying to implement HTTP request/reply over separate RabbitMQ request and response queues using the Spring Integration DSL. It is similar to "Spring IntegrationFlow http request to amqp queue", except that I want the response returned to the original HTTP caller. I can see the test HTTP POST message delivered to the request queue, transformed (to upper case), and placed on the response queue. The message is also consumed from the response queue, but it is never returned to the caller (http://localhost:8080/Tunner). Eventually the call times out with a 500 error. I am new to this, so I may have missed something entirely. Could someone offer a suggestion? The code is as follows:
public class TunnelApplication
{
    public static void main(String[] args)
    {
        SpringApplication.run(TunnelApplication.class, args);
    }

    @Value("${outboundQueue}")
    private String outboundQueue;

    @Value("${inboundQueue}")
    private String inboundQueue;

    private ConnectionFactory rabbitConnectionFactory;

    @Autowired
    public TunnelApplication(ConnectionFactory factory) {
        rabbitConnectionFactory = factory;
    }

    @Bean
    public Queue targetQueue()
    {
        return new Queue(outboundQueue, true, false, true);
    }

    @Bean
    public Queue requestQueue()
    {
        return new Queue(inboundQueue, true, false, true);
    }

    @Bean
    public Jackson2JsonMessageConverter jsonMessageConverter()
    {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public AmqpTemplate amqpTemplate()
    {
        RabbitTemplate result = new RabbitTemplate(rabbitConnectionFactory);
        result.setMessageConverter(jsonMessageConverter());
        result.setDefaultReceiveQueue(outboundQueue);
        //result.setReplyAddress(outboundQueue);
        result.setReplyTimeout(60000);
        return result;
    }

    @Bean
    public IntegrationFlow sendReceiveFlow(RabbitTemplate amqpTemplate) {
        return IntegrationFlows
                .from(Http.inboundGateway("/tunnel"))
                .handle(Amqp.outboundGateway(amqpTemplate)
                        .routingKey(inboundQueue)
                        .returnChannel(amqpOutboundChannel()))
                .log()
                .bridge(null)
                .get();
    }

    @Bean
    public IntegrationFlow rabbitToWeb(RabbitTemplate amqpTemplate, ConnectionFactory connectionFactory) {
        return IntegrationFlows.from(Amqp.inboundGateway(connectionFactory, requestQueue()))
                .transform(String.class, String::toUpperCase)
                .log()
                .handle(Amqp.outboundGateway(amqpTemplate).routingKey(outboundQueue))
                .log()
                .bridge(null)
                .get();
    }

    @Bean
    public IntegrationFlow replyBackToHttp(RabbitTemplate amqpTemplate, ConnectionFactory connectionFactory) {
        return IntegrationFlows.from(Amqp.inboundGateway(connectionFactory, targetQueue()))
                .handle(Http.outboundGateway("http://localhost:8080/tunnel")
                        .expectedResponseType(String.class))
                .log()
                .bridge(null)
                .channel(amqpOutboundChannel())
                .get();
    }

    @Bean
    public MessageChannel amqpOutboundChannel() {
        return new DirectChannel();
    }
}
My coworker also tried the following code, and it did not return a response either:
@Configuration
@EnableIntegration
public class FlowConfig {

    @Value("${routingKey}")
    private String routingKey;

    @Value("${rabbitSinkChannel}")
    private String rabbitSinkChannel;

    @Bean
    public MessageChannel rabbitSinkChannel(ConnectionFactory connectionFactory) {
        return Amqp
                .channel(rabbitSinkChannel, connectionFactory)
                .get();
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        return new RabbitTemplate(connectionFactory);
    }

    @Bean
    public IntegrationFlow httpFlow(RabbitTemplate rabbitTemplate, ConnectionFactory connectionFactory) {
        MessageChannel rabbitSinkChannel = rabbitSinkChannel(connectionFactory);
        return IntegrationFlows
                .from(Http.inboundGateway("/sendreceive"))
                .handle(Amqp.outboundGateway(rabbitTemplate)
                        .routingKey(routingKey)
                        .returnChannel(rabbitSinkChannel))
                .channel(rabbitSinkChannel) // or .handle? if so, what?
                .get();
    }
}
Recommended answer
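The following update works (I also removed the replyBackToHttp() method). It sets a fixed reply address on the RabbitTemplate, turns off the direct reply-to container, and registers the template as the listener on the reply queue: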
@Bean
public AmqpTemplate amqpTemplate()
{
    RabbitTemplate rabbitTemplate = new RabbitTemplate(rabbitConnectionFactory);
    rabbitTemplate.setMessageConverter(jsonMessageConverter());
    //rabbitTemplate.setDefaultReceiveQueue(outboundQueue);
    // Use a fixed reply queue rather than direct reply-to.
    rabbitTemplate.setReplyAddress(outboundQueue);
    rabbitTemplate.setReplyTimeout(60000);
    rabbitTemplate.setUseDirectReplyToContainer(false);
    return rabbitTemplate;
}

@Bean
public SimpleMessageListenerContainer replyListenerContainer() {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(rabbitConnectionFactory);
    // replyQueue() is not shown in the question; presumably it is the Queue bean
    // for outboundQueue (named targetQueue() there).
    container.setQueues(replyQueue());
    // The template listens on the reply queue so it can hand each reply to the waiting request.
    container.setMessageListener((MessageListener) amqpTemplate());
    return container;
}
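As background on why registering the template as the reply listener matters: with a fixed reply queue, something has to consume the replies and match each one to the request that is still waiting for it, normally by correlation ID. Below is a plain-JDK sketch of that pattern, with in-memory queues standing in for RabbitMQ. It is not Spring AMQP's implementation, and every name in it (CorrelatedReplyDemo, Msg, sendAndReceive) is hypothetical.

```java
import java.util.Locale;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Illustrative only: in-memory queues stand in for the RabbitMQ request and reply queues.
class CorrelatedReplyDemo {

    // Hypothetical message type: a payload plus the correlation id tying a reply to its request.
    private static final class Msg {
        final String correlationId;
        final String payload;

        Msg(String correlationId, String payload) {
            this.correlationId = correlationId;
            this.payload = payload;
        }
    }

    private interface Loop {
        void run() throws InterruptedException;
    }

    private final BlockingQueue<Msg> requestQueue = new LinkedBlockingQueue<>();
    private final BlockingQueue<Msg> replyQueue = new LinkedBlockingQueue<>();
    // Requests still waiting for a reply, keyed by correlation id.
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    CorrelatedReplyDemo() {
        startDaemon(this::serve, "responder");
        startDaemon(this::listenForReplies, "reply-listener");
    }

    // Responder: consumes a request, upper-cases it, publishes the reply with the same correlation id.
    private void serve() throws InterruptedException {
        while (true) {
            Msg request = requestQueue.take();
            replyQueue.put(new Msg(request.correlationId, request.payload.toUpperCase(Locale.ROOT)));
        }
    }

    // Reply listener: consumes the reply queue and completes the matching waiting request.
    private void listenForReplies() throws InterruptedException {
        while (true) {
            Msg reply = replyQueue.take();
            CompletableFuture<String> waiter = pending.remove(reply.correlationId);
            if (waiter != null) {
                waiter.complete(reply.payload);
            }
        }
    }

    // Sends a request and blocks until the correlated reply arrives or the timeout expires.
    String sendAndReceive(String payload, long timeoutMillis) throws Exception {
        String id = UUID.randomUUID().toString();
        CompletableFuture<String> waiter = new CompletableFuture<>();
        pending.put(id, waiter);
        try {
            requestQueue.put(new Msg(id, payload));
            return waiter.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } finally {
            pending.remove(id);
        }
    }

    private static void startDaemon(Loop loop, String name) {
        Thread thread = new Thread(() -> {
            try {
                loop.run();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, name);
        thread.setDaemon(true);
        thread.start();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(new CorrelatedReplyDemo().sendAndReceive("hello", 1000)); // prints HELLO
    }
}
```

In this sketch, if the reply-listener thread is never started, replies still land on the reply queue but sendAndReceive times out, which resembles the symptom described in the question.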