我想使用 Spring Integration 的 Java DSL 复现下面这个示例(基于 XML 的配置)。
配置示例:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-amqp="http://www.springframework.org/schema/integration/amqp"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xmlns:int-stream="http://www.springframework.org/schema/integration/stream"
xsi:schemaLocation="http://www.springframework.org/schema/integration/amqp https://www.springframework.org/schema/integration/amqp/spring-integration-amqp.xsd
http://www.springframework.org/schema/integration https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/stream https://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd
http://www.springframework.org/schema/rabbit https://www.springframework.org/schema/rabbit/spring-rabbit.xsd
http://www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans.xsd">
<!-- From STDIN To RabbitMQ -->
<!-- Reads standard input with a poller (fixed delay of 1000 ms, at most one message per poll)
     and sends what it reads to the 'toRabbit' channel. -->
<int-stream:stdin-channel-adapter id="consoleIn"
channel="toRabbit">
<int:poller fixed-delay="1000" max-messages-per-poll="1" />
</int-stream:stdin-channel-adapter>
<int:channel id="toRabbit" />
<!-- Publishes messages from 'toRabbit' through 'amqpTemplate'.
     Exchange and routing key are evaluated per message: a payload that lower-cases to 'nack'
     is sent to 'badExchange', a payload that lower-cases to 'fail' is sent with routing key 'badKey';
     everything else uses 'si.test.exchange' and 'si.test.binding'.
     Confirm acks are sent to 'good', confirm nacks to 'errors', returned messages to 'returns'. -->
<int-amqp:outbound-channel-adapter
channel="toRabbit" amqp-template="amqpTemplate"
exchange-name-expression="payload.toLowerCase() == 'nack' ? 'badExchange' : 'si.test.exchange'"
routing-key-expression="payload.toLowerCase() == 'fail' ? 'badKey' : 'si.test.binding'"
confirm-correlation-expression="payload"
confirm-ack-channel="good"
confirm-nack-channel="errors"
return-channel="returns" />
<!--Confirms are correlated with the entire payload; for rich objects, we might just use 'payload.invoiceId' -->
<!-- The three transformers below append a status text to the payload and forward it
     to the stdout adapter (acks) or the stderr adapter (nacks and returns). -->
<int:transformer input-channel="good" output-channel="stdOut" expression="payload + ' sent ok'"/>
<int:transformer input-channel="errors" output-channel="stdErr" expression="payload + ' send failed (nack)'"/>
<int:transformer input-channel="returns" output-channel="stdErr" expression="payload + ' returned:' + headers['amqp_returnReplyText']"/>
<!-- From RabbitMQ To STDOUT -->
<!-- Consumes queue 'si.test.queue' and sends the messages to 'fromRabbit'. -->
<int-amqp:inbound-channel-adapter channel="fromRabbit"
queue-names="si.test.queue" connection-factory="connectionFactory" />
<!-- 'fromRabbit' carries a wire-tap interceptor that targets 'loggingChannel'. -->
<int:channel id="fromRabbit">
<int:interceptors>
<int:wire-tap channel="loggingChannel" />
</int:interceptors>
</int:channel>
<int:transformer input-channel="fromRabbit" output-channel="stdOut" expression="'Received: ' + payload" />
<!-- Console sinks; both append a newline to what they write. -->
<int-stream:stdout-channel-adapter id="stdOut"
append-newline="true" />
<int-stream:stderr-channel-adapter id="stdErr"
append-newline="true" />
<!-- Logs the full message at INFO level under the logger name 'tapInbound'. -->
<int:logging-channel-adapter id="loggingChannel" log-full-message="true" logger-name="tapInbound"
level="INFO" />
<!-- Infrastructure -->
<!-- Publisher confirms and publisher returns are both switched on for this connection factory;
     the confirm and return channels of the outbound adapter above presumably depend on these flags. -->
<rabbit:connection-factory id="connectionFactory" host="localhost" publisher-confirms="true" publisher-returns="true" />
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" mandatory="true" /> <!-- mandatory="true" is what makes unroutable messages come back on the return channel -->
<rabbit:admin connection-factory="connectionFactory" />
<!-- Declares the queue and a direct exchange bound to it with key 'si.test.binding'. -->
<rabbit:queue name="si.test.queue" />
<rabbit:direct-exchange name="si.test.exchange">
<rabbit:bindings>
<rabbit:binding queue="si.test.queue" key="si.test.binding" />
</rabbit:bindings>
</rabbit:direct-exchange>
</beans>
我写了以下DSL配置:
/**
 * First attempt at porting the XML sample to the Spring Integration Java DSL.
 *
 * <p>Declares one flow that publishes console input to RabbitMQ, one flow that prints
 * messages consumed from RabbitMQ, three channels for acks, nacks and returns, and one
 * listener flow per channel that prints what it receives.
 */
@Configuration
@EnableIntegration
@IntegrationComponentScan
public class Config {
// Template handed to the outbound adapter; injected, it is not defined in this class.
@Autowired
private AmqpTemplate amqpTemplate;
// Connection factory handed to the inbound gateway; injected, it is not defined in this class.
@Autowired
private ConnectionFactory rabbitConnectionFactory;
/**
 * Console to RabbitMQ: polls {@link #consoleSource()} at a fixed rate of 1000 ms
 * (up to 5 messages per poll), routes through "consoleOutputChannel" and publishes
 * with the injected template.
 *
 * <p>A payload equal to "nack" is sent to exchange "bad_exchange"; a payload equal to
 * "fail" is sent with routing key "bad_key". Confirms are correlated with the payload
 * and delivered to the ack, nack and return channels defined in this class.
 *
 * <p>NOTE(review): the XML sample uses fixed-delay, max-messages-per-poll="1" and a
 * lower-cased payload comparison; this port uses fixedRate, 5 and a case-sensitive
 * equals(). Confirm whether the difference is intended.
 */
@Bean
public IntegrationFlow fromConsoleToRabbitFlow() {
return IntegrationFlows.from(consoleSource(), c -> c.id("consoleInput")
.poller(Pollers.fixedRate(1000).maxMessagesPerPoll(5))
.autoStartup(true)
).channel("consoleOutputChannel")
.handle(Amqp.outboundAdapter(amqpTemplate)
.exchangeNameFunction(message -> {
if ("nack".equals(message.getPayload())) {
return "bad_exchange";
} else {
return "my_spring_integration_exchange";
}
}).routingKeyFunction(message -> {
if ("fail".equals(message.getPayload())) {
return "bad_key";
} else {
return "my_spring_integration_key";
}
})
.confirmCorrelationExpression("payload")
.confirmAckChannel(ackChannel())
.confirmNackChannel(nackChannel())
.returnChannel(returnChannel()))
.get();
}
/** Channel passed to the outbound adapter as its confirm-ack channel. */
@Bean
public MessageChannel ackChannel() {
return new DirectChannel();
}
/**
 * Prints "ACK:" followed by the received message.
 *
 * <p>NOTE(review): the flow starts from the method reference {@code this::ackChannel}
 * plus a 10000 ms poller. The output reported for this configuration
 * ("ACK:GenericMessage [payload=ackChannel, ...]" every 10 seconds) suggests the method
 * reference is polled as a message source whose payload is the channel itself, instead
 * of the flow subscribing to the channel. Confirm against the DSL documentation.
 */
@Bean
public IntegrationFlow ackChannelListener() {
return IntegrationFlows.from(this::ackChannel, c -> c.poller(Pollers.fixedRate(10000)))
.handle(m -> {
System.out.println("ACK:" + m);
})
.get();
}
/** Channel passed to the outbound adapter as its confirm-nack channel. */
@Bean
public MessageChannel nackChannel() {
return new DirectChannel();
}
/**
 * Prints "NACK:" followed by the received message.
 *
 * <p>NOTE(review): same polled method-reference form as {@link #ackChannelListener()};
 * the reported output shows payload=nackChannel every 10 seconds.
 */
@Bean
public IntegrationFlow nackChannelListener() {
return IntegrationFlows.from(this::nackChannel, c -> c.poller(Pollers.fixedRate(10000)))
.handle(m -> {
System.out.println("NACK:" + m);
}).get();
}
/** Channel passed to the outbound adapter as its return channel. */
@Bean
public MessageChannel returnChannel() {
return new DirectChannel();
}
/**
 * Prints "RETURN:" followed by the received message.
 *
 * <p>NOTE(review): same polled method-reference form as {@link #ackChannelListener()};
 * the reported output shows payload=returnChannel every 10 seconds.
 */
@Bean
public IntegrationFlow returnChannelListener() {
return IntegrationFlows.from(this::returnChannel, c -> c.poller(Pollers.fixedRate(10000)))
.handle(m -> {
System.out.println("RETURN:" + m);
}).get();
}
/**
 * Creates the stdin-backed message source used by {@link #fromConsoleToRabbitFlow()}.
 * This method carries no {@code @Bean} annotation.
 */
public MessageSource<String> consoleSource() {
return CharacterStreamReadingMessageSource.stdin();
}
/**
 * RabbitMQ to console: consumes queue "foo" through an inbound gateway, logs the
 * message, prefixes the transformed value with " response: " and prints the result.
 *
 * <p>NOTE(review): the XML sample uses an inbound channel adapter, this port uses an
 * inbound gateway. Confirm whether a request/reply endpoint is intended here.
 */
@Bean
public IntegrationFlow fromRabbitToConsoleFlow() {
return IntegrationFlows.from(Amqp.inboundGateway(rabbitConnectionFactory, "foo"))
.log()
.transform(m -> " response: " + m)
.handle(System.out::println)
.get();
}
}
预期结果:
对于有效的消息,我希望收到响应以及确认(ack)
对于“fail”,由于路由键无效,我希望从返回通道(return channel)收到 NO_ROUTE 消息
对于“nack”,我希望从 nack 通道收到消息,因为对应的交换机(bad_exchange)并不存在
实际结果:
当我发送有效的内容(例如
123
)时,我在日志中看到以下内容:2019-08-27 15:41:14.165 INFO 1568 --- [erContainer#0-1] o.s.integration.handler.LoggingHandler : GenericMessage [payload=123, headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=my_spring_integration_exchange, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@c3d79cf, amqp_deliveryTag=1, amqp_consumerQueue=foo, amqp_redelivered=false, amqp_receivedRoutingKey=my_spring_integration_key, replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@c3d79cf, amqp_contentEncoding=UTF-8, amqp_timestamp=Tue Aug 27 15:41:09 MSK 2019, amqp_messageId=f009e667-b1c0-0476-daa9-be1a416be603, id=37a496ed-3224-ad3a-7924-5093a15482ce, amqp_consumerTag=amq.ctag-T2HBnQO6gUpnmU-Y_g0grg, contentType=text/plain, timestamp=1566909674165}]
如果我发送
nack
:2019-08-27 16:03:30.256 ERROR 15712 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'bad_exchange' in vhost '/', class-id=60, method-id=40)
如果我发送
fail
——日志中没有任何反应,也看不到我输入的消息。
NACK:GenericMessage [payload=nackChannel, headers={id=a79d3ce7-3c26-8ed2-f49f-6788f2a2faac, timestamp=1566911103243}]
ACK:GenericMessage [payload=ackChannel, headers={id=5a764889-ed36-881e-d2b5-842443031c2c, timestamp=1566911103243}]
RETURN:GenericMessage [payload=returnChannel, headers={id=2e808a30-859f-f09e-e317-008391092c63, timestamp=1566911103249}]
上面这三条日志每 10 秒就会出现一次。
如何获得预期的结果?
更新
应用Artem Bilan的建议后,我有以下配置:
/**
 * Second revision of the Java DSL configuration: in addition to the flows and channels,
 * it now declares its own connection factory and Rabbit template beans.
 */
@Configuration
@EnableIntegration
@IntegrationComponentScan
public class Config {
// Injected template; this is the reference handed to the outbound adapter below.
@Autowired
private AmqpTemplate amqpTemplate;
// Injected connection factory; used to build the template and the inbound gateway.
@Autowired
private ConnectionFactory rabbitConnectionFactory;
/**
 * Connection factory for host "localhost" with user "guest" / password "guest",
 * with publisher confirms and publisher returns both switched on.
 */
@Bean
public CachingConnectionFactory rabbitConnectionFactory() {
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
cachingConnectionFactory.setHost("localhost");
cachingConnectionFactory.setUsername("guest");
cachingConnectionFactory.setPassword("guest");
cachingConnectionFactory.setPublisherConfirms(true);
cachingConnectionFactory.setPublisherReturns(true);
return cachingConnectionFactory;
}
/**
 * Rabbit template built on the injected connection factory field, with the
 * mandatory flag set to true.
 */
@Bean
public AmqpTemplate amqpTemplate() {
final RabbitTemplate rabbitTemplate = new RabbitTemplate(rabbitConnectionFactory);
rabbitTemplate.setMandatory(true);
return rabbitTemplate;
}
/**
 * Console to RabbitMQ: polls {@link #consoleSource()} at a fixed rate of 1000 ms
 * (up to 5 messages per poll), routes through "consoleOutputChannel" and publishes
 * to "console_exchange" with routing key "console_queue".
 *
 * <p>A payload equal to "nack" is sent to exchange "bad_exchange"; a payload equal to
 * "fail" is sent with routing key "bad_key". Confirms are correlated with the payload
 * and delivered to the ack, nack and return channels defined in this class.
 *
 * <p>NOTE(review): the adapter is built from the injected {@code amqpTemplate} field.
 * The accepted answer in this post recommends calling {@code amqpTemplate()} here
 * instead, and reports that returns for "fail" arrive once that is done.
 */
@Bean
public IntegrationFlow fromConsoleToRabbitFlow() {
return IntegrationFlows.from(consoleSource(), c -> c.id("consoleInput")
.poller(Pollers.fixedRate(1000).maxMessagesPerPoll(5))
.autoStartup(true)
).channel("consoleOutputChannel")
.handle(Amqp.outboundAdapter(amqpTemplate)
.exchangeNameFunction(message -> {
if ("nack".equals(message.getPayload())) {
return "bad_exchange";
} else {
return "console_exchange";
}
}).routingKeyFunction(message -> {
if ("fail".equals(message.getPayload())) {
return "bad_key";
} else {
return "console_queue";
}
})
.confirmCorrelationExpression("payload")
.confirmAckChannel(ackChannel())
.confirmNackChannel(nackChannel())
.returnChannel(returnChannel()))
.get();
}
/** Channel passed to the outbound adapter as its confirm-ack channel. */
@Bean
public MessageChannel ackChannel() {
return new DirectChannel();
}
/**
 * Prints "ACK:" followed by the received message.
 *
 * <p>NOTE(review): the flow still starts from the method reference
 * {@code this::ackChannel} plus a 10000 ms poller. The error reported for this
 * configuration ("Dispatcher has no subscribers for channel 'application.ackChannel'")
 * is consistent with this flow not subscribing to the channel. Confirm against the
 * DSL documentation.
 */
@Bean
public IntegrationFlow ackChannelListener() {
return IntegrationFlows.from(this::ackChannel, c -> c.poller(Pollers.fixedRate(10000)))
.handle(m -> {
System.out.println("ACK:" + m);
})
.get();
}
/** Channel passed to the outbound adapter as its confirm-nack channel. */
@Bean
public MessageChannel nackChannel() {
return new DirectChannel();
}
/**
 * Prints "NACK:" followed by the received message.
 *
 * <p>NOTE(review): same polled method-reference form as {@link #ackChannelListener()}.
 */
@Bean
public IntegrationFlow nackChannelListener() {
return IntegrationFlows.from(this::nackChannel, c -> c.poller(Pollers.fixedRate(10000)))
.handle(m -> {
System.out.println("NACK:" + m);
}).get();
}
/** Channel passed to the outbound adapter as its return channel. */
@Bean
public MessageChannel returnChannel() {
return new DirectChannel();
}
/**
 * Prints "RETURN:" followed by the received message.
 *
 * <p>NOTE(review): same polled method-reference form as {@link #ackChannelListener()}.
 */
@Bean
public IntegrationFlow returnChannelListener() {
return IntegrationFlows.from(this::returnChannel, c -> c.poller(Pollers.fixedRate(10000)))
.handle(m -> {
System.out.println("RETURN:" + m);
}).get();
}
/**
 * Creates the stdin-backed message source used by {@link #fromConsoleToRabbitFlow()}.
 * This method carries no {@code @Bean} annotation.
 */
public MessageSource<String> consoleSource() {
return CharacterStreamReadingMessageSource.stdin();
}
/**
 * RabbitMQ to console: consumes queue "console_queue" through an inbound gateway,
 * logs the message, prefixes the transformed value with " response: " and prints
 * the result.
 *
 * <p>NOTE(review): the XML sample uses an inbound channel adapter, this port uses an
 * inbound gateway. Confirm whether a request/reply endpoint is intended here.
 */
@Bean
public IntegrationFlow fromRabbitToConsoleFlow() {
return IntegrationFlows.from(Amqp.inboundGateway(rabbitConnectionFactory, "console_queue"))
.log()
.transform(m -> " response: " + m)
.handle(System.out::println)
.get();
}
}
但我看到以下错误:
<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<!!!!!!!!!!!!!_________________!!!!!!!!!!!!!!!!!!!!!!>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
2019-08-27 17:19:47.386 INFO 8260 --- [erContainer#0-1] o.s.integration.handler.LoggingHandler : GenericMessage [payload=<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<!!!!!!!!!!!!!_________________!!!!!!!!!!!!!!!!!!!!!!>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>, headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=console_exchange, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@118db7e0, amqp_deliveryTag=29, amqp_consumerQueue=console_queue, amqp_redelivered=false, amqp_receivedRoutingKey=console_queue, replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@118db7e0, amqp_contentEncoding=UTF-8, spring_returned_message_correlation=7005b4d7-c9aa-8adc-fc5b-9500b9fd0961, amqp_timestamp=Tue Aug 27 17:19:47 MSK 2019, amqp_messageId=7005b4d7-c9aa-8adc-fc5b-9500b9fd0961, id=2774bee5-41aa-c35d-f89c-295215ac888e, amqp_consumerTag=amq.ctag-PSJwpYRV24p5PXeCqdiIXQ, contentType=text/plain, timestamp=1566915587386}]
GenericMessage [payload= response: <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<!!!!!!!!!!!!!_________________!!!!!!!!!!!!!!!!!!!!!!>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>, headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=console_exchange, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@118db7e0, amqp_deliveryTag=29, amqp_consumerQueue=console_queue, amqp_redelivered=false, amqp_receivedRoutingKey=console_queue, replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@118db7e0, amqp_contentEncoding=UTF-8, spring_returned_message_correlation=7005b4d7-c9aa-8adc-fc5b-9500b9fd0961, amqp_timestamp=Tue Aug 27 17:19:47 MSK 2019, amqp_messageId=7005b4d7-c9aa-8adc-fc5b-9500b9fd0961, id=e98a892b-c635-4050-05e5-9b3334e353b9, amqp_consumerTag=amq.ctag-PSJwpYRV24p5PXeCqdiIXQ, contentType=text/plain, timestamp=1566915587387}]
2019-08-27 17:19:47.389 ERROR 8260 --- [ 127.0.0.1:5672] o.s.a.r.c.PublisherCallbackChannelImpl : Exception delivering confirm
org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application.ackChannel'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<!!!!!!!!!!!!!_________________!!!!!!!!!!!!!!!!!!!!!!>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>, headers={amqp_publishConfirm=true, id=f2f6187f-251f-43ed-d243-c2bc1796138e, timestamp=1566915587388}]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77) ~[spring-integration-core-5.1.6.RELEASE.jar:5.1.6.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453) ~[spring-integration-core-5.1.6.RELEASE.jar:5.1.6.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401) ~[spring-integration-core-5.1.6.RELEASE.jar:5.1.6.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187) ~[spring-messaging-5.1.8.RELEASE.jar:5.1.8.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166) ~[spring-messaging-5.1.8.RELEASE.jar:5.1.8.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.1.8.RELEASE.jar:5.1.8.RELEASE]
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109) ~[spring-messaging-5.1.8.RELEASE.jar:5.1.8.RELEASE]
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:431) ~[spring-integration-core-5.1.6.RELEASE.jar:5.1.6.RELEASE]
at org.springframework.integration.amqp.outbound.AbstractAmqpOutboundEndpoint.handleConfirm(AbstractAmqpOutboundEndpoint.java:580) ~[spring-integration-amqp-5.1.6.RELEASE.jar:5.1.6.RELEASE]
at org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint.confirm(AmqpOutboundEndpoint.java:139) ~[spring-integration-amqp-5.1.6.RELEASE.jar:5.1.6.RELEASE]
at org.springframework.amqp.rabbit.core.RabbitTemplate.handleConfirm(RabbitTemplate.java:2447) ~[spring-rabbit-2.1.7.RELEASE.jar:2.1.7.RELEASE]
at org.springframework.amqp.rabbit.connection.PublisherCallbackChannelImpl.doHandleConfirm(PublisherCallbackChannelImpl.java:1033) ~[spring-rabbit-2.1.7.RELEASE.jar:2.1.7.RELEASE]
at org.springframework.amqp.rabbit.connection.PublisherCallbackChannelImpl.doProcessAck(PublisherCallbackChannelImpl.java:980) ~[spring-rabbit-2.1.7.RELEASE.jar:2.1.7.RELEASE]
at org.springframework.amqp.rabbit.connection.PublisherCallbackChannelImpl.processAck(PublisherCallbackChannelImpl.java:937) ~[spring-rabbit-2.1.7.RELEASE.jar:2.1.7.RELEASE]
at org.springframework.amqp.rabbit.connection.PublisherCallbackChannelImpl.handleAck(PublisherCallbackChannelImpl.java:923) ~[spring-rabbit-2.1.7.RELEASE.jar:2.1.7.RELEASE]
at com.rabbitmq.client.impl.ChannelN.callConfirmListeners(ChannelN.java:492) ~[amqp-client-5.4.3.jar:5.4.3]
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:369) ~[amqp-client-5.4.3.jar:5.4.3]
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:178) ~[amqp-client-5.4.3.jar:5.4.3]
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:111) ~[amqp-client-5.4.3.jar:5.4.3]
at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:670) ~[amqp-client-5.4.3.jar:5.4.3]
at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:48) ~[amqp-client-5.4.3.jar:5.4.3]
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:597) ~[amqp-client-5.4.3.jar:5.4.3]
at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:138) ~[spring-integration-core-5.1.6.RELEASE.jar:5.1.6.RELEASE]
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105) ~[spring-integration-core-5.1.6.RELEASE.jar:5.1.6.RELEASE]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73) ~[spring-integration-core-5.1.6.RELEASE.jar:5.1.6.RELEASE]
... 22 common frames omitted
2019-08-27 17:19:48.388 ERROR 8260 --- [ 127.0.0.1:5672] o.s.a.r.c.PublisherCallbackChannelImpl : Exception delivering confirm
org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application.ackChannel'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=, headers={amqp_publishConfirm=true, id=ce829fbe-8d5d-f8b4-34a5-3401803a397a, timestamp=1566915588388}]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77) ~[spring-integration-core-5.1.6.RELEASE.jar:5.1.6.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453) ~[spring-integration-core-5.1.6.RELEASE.jar:5.1.6.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401) ~[spring-integration-core-5.1.6.RELEASE.jar:5.1.6.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187) ~[spring-messaging-5.1.8.RELEASE.jar:5.1.8.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166) ~[spring-messaging-5.1.8.RELEASE.jar:5.1.8.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.1.8.RELEASE.jar:5.1.8.RELEASE]
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109) ~[spring-messaging-5.1.8.RELEASE.jar:5.1.8.RELEASE]
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:431) ~[spring-integration-core-5.1.6.RELEASE.jar:5.1.6.RELEASE]
at org.springframework.integration.amqp.outbound.AbstractAmqpOutboundEndpoint.handleConfirm(AbstractAmqpOutboundEndpoint.java:580) ~[spring-integration-amqp-5.1.6.RELEASE.jar:5.1.6.RELEASE]
at org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint.confirm(AmqpOutboundEndpoint.java:139) ~[spring-integration-amqp-5.1.6.RELEASE.jar:5.1.6.RELEASE]
at org.springframework.amqp.rabbit.core.RabbitTemplate.handleConfirm(RabbitTemplate.java:2447) ~[spring-rabbit-2.1.7.RELEASE.jar:2.1.7.RELEASE]
at org.springframework.amqp.rabbit.connection.PublisherCallbackChannelImpl.doHandleConfirm(PublisherCallbackChannelImpl.java:1033) ~[spring-rabbit-2.1.7.RELEASE.jar:2.1.7.RELEASE]
at org.springframework.amqp.rabbit.connection.PublisherCallbackChannelImpl.doProcessAck(PublisherCallbackChannelImpl.java:980) ~[spring-rabbit-2.1.7.RELEASE.jar:2.1.7.RELEASE]
at org.springframework.amqp.rabbit.connection.PublisherCallbackChannelImpl.processAck(PublisherCallbackChannelImpl.java:937) ~[spring-rabbit-2.1.7.RELEASE.jar:2.1.7.RELEASE]
at org.springframework.amqp.rabbit.connection.PublisherCallbackChannelImpl.handleAck(PublisherCallbackChannelImpl.java:923) ~[spring-rabbit-2.1.7.RELEASE.jar:2.1.7.RELEASE]
at com.rabbitmq.client.impl.ChannelN.callConfirmListeners(ChannelN.java:492) ~[amqp-client-5.4.3.jar:5.4.3]
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:369) ~[amqp-client-5.4.3.jar:5.4.3]
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:178) ~[amqp-client-5.4.3.jar:5.4.3]
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:111) ~[amqp-client-5.4.3.jar:5.4.3]
at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:670) ~[amqp-client-5.4.3.jar:5.4.3]
at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:48) ~[amqp-client-5.4.3.jar:5.4.3]
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:597) ~[amqp-client-5.4.3.jar:5.4.3]
at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:138) ~[spring-integration-core-5.1.6.RELEASE.jar:5.1.6.RELEASE]
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105) ~[spring-integration-core-5.1.6.RELEASE.jar:5.1.6.RELEASE]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73) ~[spring-integration-core-5.1.6.RELEASE.jar:5.1.6.RELEASE]
... 22 common frames omitted
更新2
采纳了下一条建议之后,我的配置如下:
/** Direct channel whose messages are printed by {@link #ackChannelListener()}. */
@Bean
public MessageChannel ackChannel() {
    return new DirectChannel();
}

/** Flow that starts from {@link #ackChannel()} and prints each message with an "ACK:" prefix. */
@Bean
public IntegrationFlow ackChannelListener() {
    final MessageChannel source = ackChannel();
    return IntegrationFlows.from(source)
            .handle(message -> System.out.println("ACK:" + message))
            .get();
}

/** Direct channel whose messages are printed by {@link #nackChannelListener()}. */
@Bean
public MessageChannel nackChannel() {
    return new DirectChannel();
}

/** Flow that starts from {@link #nackChannel()} and prints each message with a "NACK:" prefix. */
@Bean
public IntegrationFlow nackChannelListener() {
    final MessageChannel source = nackChannel();
    return IntegrationFlows.from(source)
            .handle(message -> System.out.println("NACK:" + message))
            .get();
}

/** Direct channel whose messages are printed by {@link #returnChannelListener()}. */
@Bean
public MessageChannel returnChannel() {
    return new DirectChannel();
}

/** Flow that starts from {@link #returnChannel()} and prints each message with a "RETURN:" prefix. */
@Bean
public IntegrationFlow returnChannelListener() {
    final MessageChannel source = returnChannel();
    return IntegrationFlows.from(source)
            .handle(message -> System.out.println("RETURN:" + message))
            .get();
}
控制台输出:
correct_message
2019-08-27 17:35:43.025 INFO 2684 --- [erContainer#0-1] o.s.integration.handler.LoggingHandler : GenericMessage [payload=correct_message, headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=console_exchange, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@65c2b5a5, amqp_deliveryTag=8, amqp_consumerQueue=console_queue, amqp_redelivered=false, amqp_receivedRoutingKey=console_queue, replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@65c2b5a5, amqp_contentEncoding=UTF-8, spring_returned_message_correlation=6c44fb68-f7f8-c911-8dda-517171a2b022, amqp_timestamp=Tue Aug 27 17:35:43 MSK 2019, amqp_messageId=6c44fb68-f7f8-c911-8dda-517171a2b022, id=2092b749-325b-7151-17d1-aa8f7ce998d4, amqp_consumerTag=amq.ctag-or-vX8P61nezVZOFiWjScQ, contentType=text/plain, timestamp=1566916543025}]
GenericMessage [payload= response: correct_message, headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=console_exchange, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@65c2b5a5, amqp_deliveryTag=8, amqp_consumerQueue=console_queue, amqp_redelivered=false, amqp_receivedRoutingKey=console_queue, replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@65c2b5a5, amqp_contentEncoding=UTF-8, spring_returned_message_correlation=6c44fb68-f7f8-c911-8dda-517171a2b022, amqp_timestamp=Tue Aug 27 17:35:43 MSK 2019, amqp_messageId=6c44fb68-f7f8-c911-8dda-517171a2b022, id=bb9ef444-54e2-8c6d-29b8-0fee3dd56712, amqp_consumerTag=amq.ctag-or-vX8P61nezVZOFiWjScQ, contentType=text/plain, timestamp=1566916543025}]
ACK:GenericMessage [payload=correct_message, headers={amqp_publishConfirm=true, id=349e7e7d-32f1-6a84-c9f9-31f5b4f744a4, timestamp=1566916543026}]
fail
ACK:GenericMessage [payload=fail, headers={amqp_publishConfirm=true, id=8d38a9cc-449a-48bc-e08c-df5696763e7f, timestamp=1566916547025}]
nack
2019-08-27 17:35:51.025 ERROR 2684 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'bad_exchange' in vhost '/', class-id=60, method-id=40)
NACK:ErrorMessage [payload=org.springframework.integration.amqp.support.NackedAmqpMessageException, failedMessage=GenericMessage [payload=nack, headers={id=96a90c78-50f4-3704-ab47-6df39ee4cdfc, timestamp=1566916551024}] [correlationData=nack, nackReason=channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'bad_exchange' in vhost '/', class-id=60, method-id=40)], headers={id=941cc78e-ec14-47f2-b983-e56df9a35c48, timestamp=1566916551026}]
如您所见,正确的消息和“nack”都按预期工作,但是如果我发送“fail”,返回通道(return channel)没有任何响应。
最佳答案
您需要在流程中调用 amqpTemplate()
这个 bean 定义方法,而不是使用注入(@Autowired)的那个字段。
因此,像这样:
.handle(Amqp.outboundAdapter(amqpTemplate())
这样,当我发送
fail
时,我会在日志中得到它:fail
RETURN:ErrorMessage [payload=org.springframework.integration.amqp.support.ReturnedAmqpMessageException, failedMessage=GenericMessage [payload=fail, headers={id=bf68813e-51a8-5ef4-7f3e-224e54003e17, timestamp=1566920849918}] [amqpMessage=(Body:'fail' MessageProperties [headers={spring_returned_message_correlation=10df000f-5a92-6794-078a-35b0f3450a75}, timestamp=Tue Aug 27 11:47:29 EDT 2019, messageId=10df000f-5a92-6794-078a-35b0f3450a75, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]), replyCode=312, replyText=NO_ROUTE, exchange=console_exchange, routingKey=bad_key], headers={id=4f976428-9c10-9448-c556-94c3e962b8ce, timestamp=1566920849918}]
ACK:GenericMessage [payload=fail, headers={amqp_publishConfirm=true, id=a56cc7be-bdd9-9b81-940c-7cc8faef2512, timestamp=1566920849918}]
我的意思是,
mandatory
标志对于返回(return)功能确实非常重要。我不知道为什么同时还会触发
ack
,但那已经是另一个话题了……关于java - 如何使用DSL为ack/nack和返回队列配置amqp出站适配器?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/57675504/