嗨,我试图在Spring Integration XML中实现死信交换,因此,如果在某些情况下BBB队列失败,例如lister抛出异常,则该场景是AAA交换绑定的BBB队列,我想将异常导航到Dead Exchange队列以存储以下消息:

创建示例项目

main.java

package com.spring.rabbit.first.deadletter;

import org.springframework.context.support.ClassPathXmlApplicationContext;

public class Main {

    public static void main(String[] args) {
        new ClassPathXmlApplicationContext("/applicationContext.xml");
    }
}


XML文件

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
    xmlns:rabbit="http://www.springframework.org/schema/rabbit"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
       http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.2.xsd
       http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">

    <!-- Spring configuration -->

    <context:component-scan base-package="com.spring.rabbit.first" />
    <context:mbean-export default-domain="com.spring.rabbit.first.deadletter" />

    <!-- RabbitMQ common configuration -->

    <rabbit:connection-factory id="connectionFactory"
        username="guest" password="guest" port="5672" virtual-host="/" host="localhost" />


    <!-- <rabbit:connection-factory id="connectionFactory"/> -->
    <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" />
    <rabbit:admin connection-factory="connectionFactory" />

    <!-- Queues -->

    <rabbit:queue id="springQueue" name="spring.queue"
        auto-delete="true" durable="false" />

    <rabbit:listener-container
        connection-factory="connectionFactory" advice-chain="retryAdvice">
        <rabbit:listener queues="BBBqueue" ref="messageListener" />
    </rabbit:listener-container>

    <bean id="messageListener" class="com.spring.rabbit.first.deadletter.MessageHandler" />



    <bean id="retryAdvice"
        class="org.springframework.amqp.rabbit.config.StatelessRetryOperationsInterceptorFactoryBean">
        <property name="messageRecoverer" ref="rejectAndDontRequeueRecoverer" />
        <property name="retryOperations" ref="retryTemplate" />
    </bean>

    <bean id="rejectAndDontRequeueRecoverer"
        class="org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer" />


    <bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
        <property name="backOffPolicy">
            <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
                <property name="initialInterval" value="2000" />
                <property name="multiplier" value="10.0" />
                <property name="maxInterval" value="30000" />
            </bean>
        </property>
        <property name="retryPolicy">
            <bean class="org.springframework.retry.policy.SimpleRetryPolicy">
                <property name="maxAttempts" value="3" />
            </bean>
        </property>
    </bean>



    <rabbit:topic-exchange name="AAAqueue">
        <rabbit:bindings>
            <rabbit:binding queue="BBBqueue" pattern="" />
        </rabbit:bindings>
    </rabbit:topic-exchange>


    <rabbit:queue name="BBBqueue">
        <rabbit:queue-arguments>
            <entry key="x-dead-letter-exchange" value="XXX.dead.letter"></entry>
            <entry key="x-message-ttl" value="10000" value-type="java.lang.Long" />
        </rabbit:queue-arguments>
    </rabbit:queue>


    <!-- dead letter -->

    <rabbit:topic-exchange name="XXX.dead.letter">
        <rabbit:bindings>
            <rabbit:binding queue="XXX.dead.letter.queue" pattern=""></rabbit:binding>
        </rabbit:bindings>
    </rabbit:topic-exchange>

    <rabbit:queue name="XXX.dead.letter.queue"></rabbit:queue>



</beans>


讯息处理常式

package com.spring.rabbit.first.deadletter;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

public class MessageHandler implements MessageListener {

    @Override
    public void onMessage(Message message) {

        System.out.println("Received message: " + message);
        System.out.println("Text: " + new String(message.getBody()));

        message = null;
        if (message == null) {
            throw new NullPointerException();
        }
    }
}


信息发送者

package com.spring.rabbit.first.deadletter;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jmx.export.annotation.ManagedOperation;
import org.springframework.jmx.export.annotation.ManagedResource;
import org.springframework.stereotype.Service;

@Service
@ManagedResource
public class MessageSender {

    @Autowired
    private AmqpTemplate template;

    @ManagedOperation
    public void send(String text) {
        send("amq.fanout", "NDPAR.SPRING.JAVA", text);
    }

    @ManagedOperation
    public void send(String exchange, String key, String text) {
        template.convertAndSend(exchange, key, text);
    }
}


输出:

23:57:33.753 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG org.springframework.amqp.rabbit.listener.BlockingQueueConsumer - Received message: (Body:'[B@1c18b12a(byte[4])' MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=null, contentEncoding=null, contentLength=0, deliveryMode=null, receivedDeliveryMode=NON_PERSISTENT, expiration=null, priority=null, redelivered=false, receivedExchange=, receivedRoutingKey=BBBqueue, receivedDelay=null, deliveryTag=1, messageCount=0, consumerTag=amq.ctag-yPBzkqcKH2zX2IjWqT20wg, consumerQueue=BBBqueue])
23:57:33.777 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG org.springframework.retry.support.RetryTemplate - Retry: count=0
Received message: (Body:'[B@1c18b12a(byte[4])' MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=null, contentEncoding=null, contentLength=0, deliveryMode=null, receivedDeliveryMode=NON_PERSISTENT, expiration=null, priority=null, redelivered=false, receivedExchange=, receivedRoutingKey=BBBqueue, receivedDelay=null, deliveryTag=1, messageCount=0, consumerTag=amq.ctag-yPBzkqcKH2zX2IjWqT20wg, consumerQueue=BBBqueue])
Text: send
23:58:06.952 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG org.springframework.retry.backoff.ExponentialBackOffPolicy - Sleeping for 2000
23:58:08.953 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG org.springframework.retry.support.RetryTemplate - Checking for rethrow: count=1
23:58:08.953 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG org.springframework.retry.support.RetryTemplate - Retry: count=1
Received message: (Body:'[B@1c18b12a(byte[4])' MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=null, contentEncoding=null, contentLength=0, deliveryMode=null, receivedDeliveryMode=NON_PERSISTENT, expiration=null, priority=null, redelivered=false, receivedExchange=, receivedRoutingKey=BBBqueue, receivedDelay=null, deliveryTag=1, messageCount=0, consumerTag=amq.ctag-yPBzkqcKH2zX2IjWqT20wg, consumerQueue=BBBqueue])
Text: send
23:58:12.888 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG org.springframework.retry.backoff.ExponentialBackOffPolicy - Sleeping for 20000
23:58:39.016 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG org.springframework.retry.support.RetryTemplate - Checking for rethrow: count=2
23:58:39.016 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG org.springframework.retry.support.RetryTemplate - Retry: count=2
Received message: (Body:'[B@1c18b12a(byte[4])' MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=null, contentEncoding=null, contentLength=0, deliveryMode=null, receivedDeliveryMode=NON_PERSISTENT, expiration=null, priority=null, redelivered=false, receivedExchange=, receivedRoutingKey=BBBqueue, receivedDelay=null, deliveryTag=1, messageCount=0, consumerTag=amq.ctag-yPBzkqcKH2zX2IjWqT20wg, consumerQueue=BBBqueue])
Text: send
23:58:42.391 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG org.springframework.retry.support.RetryTemplate - Checking for rethrow: count=3
23:58:42.391 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG org.springframework.retry.support.RetryTemplate - Retry failed last attempt: count=3
23:58:42.393 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] WARN org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer -
Retries exhausted for message (Body:'[B@1c18b12a(byte[4])' MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=null, contentEncoding=null, contentLength=0, deliveryMode=null, receivedDeliveryMode=NON_PERSISTENT, expiration=null, priority=null, redelivered=false, receivedExchange=, receivedRoutingKey=BBBqueue,
 receivedDelay=null, deliveryTag=1, messageCount=0, consumerTag=amq.ctag-yPBzkqcKH2zX2IjWqT20wg, consumerQueue=BBBqueue])
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:870)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:780)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:700)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:95)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:187)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)


但是仍然我在死信队列中没有看到任何消息。是否丢失了任何东西?
对此有帮助

最佳答案

我不确定您的XXX-channel和适配器应该做什么,但是您需要向重试建议工厂bean(在RejectAndDontRequeueRecoverer属性中)添加一个messageRecoverer

默认恢复仅记录重试已用尽并丢弃该消息的日志。

编辑

这是一个自定义MessageRecoverer,它将失败的消息从队列A发布到名为A.dlq的队列-队列和绑定根据需要自动声明。

/*
 * Copyright 2014-2016 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.example;

import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;

public class AutoConfiguringRepublishMessageRecoverer implements MessageRecoverer {

    public static final String X_EXCEPTION_STACKTRACE = "x-exception-stacktrace";

    public static final String X_EXCEPTION_MESSAGE = "x-exception-message";

    public static final String X_ORIGINAL_EXCHANGE = "x-original-exchange";

    public static final String X_ORIGINAL_ROUTING_KEY = "x-original-routingKey";

    private final Log logger = LogFactory.getLog(getClass());

    private final RabbitTemplate errorTemplate;

    private final RabbitAdmin admin;

    private final String deadLetterExchangeName = "DLX";

    private final DirectExchange deadletterExchange = new DirectExchange(this.deadLetterExchangeName);

    private boolean initialized;

    public AutoConfiguringRepublishMessageRecoverer(RabbitTemplate errorTemplate) {
        this.errorTemplate = errorTemplate;
        this.admin = new RabbitAdmin(errorTemplate.getConnectionFactory());
    }

    @Override
    public void recover(Message message, Throwable cause) {
        if (!this.initialized) {
            initialize();
        }
        Map<String, Object> headers = message.getMessageProperties().getHeaders();
        headers.put(X_EXCEPTION_STACKTRACE, getStackTraceAsString(cause));
        headers.put(X_EXCEPTION_MESSAGE, cause.getCause() != null ? cause.getCause().getMessage() : cause.getMessage());
        headers.put(X_ORIGINAL_EXCHANGE, message.getMessageProperties().getReceivedExchange());
        headers.put(X_ORIGINAL_ROUTING_KEY, message.getMessageProperties().getReceivedRoutingKey());

        String dlqName = message.getMessageProperties().getConsumerQueue() + ".dlq";
        if (this.admin.getQueueProperties(dlqName) == null) {
            bindDlq(dlqName);
        }
        this.errorTemplate.send(this.deadLetterExchangeName, dlqName, message);
        if (this.logger.isWarnEnabled()) {
            this.logger.warn("Republishing failed message to " + dlqName);
        }
    }

    private void initialize() {
        this.admin.declareExchange(this.deadletterExchange);
        this.initialized = true;
    }

    private void bindDlq(String dlqName) {
        Queue dlq = new Queue(dlqName);
        this.admin.declareQueue(dlq);
        this.admin.declareBinding(BindingBuilder.bind(dlq).to(this.deadletterExchange).with(dlqName));
    }

    private String getStackTraceAsString(Throwable cause) {
        StringWriter stringWriter = new StringWriter();
        PrintWriter printWriter = new PrintWriter(stringWriter, true);
        cause.printStackTrace(printWriter);
        return stringWriter.getBuffer().toString();
    }

}

09-04 10:17