springBoot整合RocketMq

项目地址:https://github.com/apache/rocketmq-spring

在项目wiki界面你可获取相关示例,或者直接下载示例项目

部分概念在我的上一篇文章中有说明,这里仅仅只演示示例代码,不做概念性说明

引入依赖

写文章时rocketmq-spring-boot-starter最新版本是2.2.2 对应的rocketmq-client版本是4.9.3

这里我服务端安装的是5.0.0版本,不过5.0.0版本服务端按照官方的说法是兼容4.x版本的客户端的

如果一定要使用5.0.0版本sdk,可以排除相关依赖,再自行引入相应版本(似乎不太靠谱,我这边换成2.6.13版本启动报错了,不管了,先不换版本测试,使用其默认的2.5.9版本spring-boot-starter)

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.2.2</version>
            <!-- 如果和当前项目使用的spring版本还有springBoot版本不一致,建议排除-->
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.springframework</groupId>
                    <artifactId>spring-core</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.springframework</groupId>
                    <artifactId>spring-webmvc</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

配置文件

仅做参考,如果未做解释的配置请查看

org.apache.rocketmq.spring.autoconfigure.RocketMQProperties

# rocketmq 配置项,对应 RocketMQProperties 配置类
rocketmq:
  name-server: k8s-master:9876 # RocketMQ Namesrv,多个用;间隔
  # Producer 配置项
  producer:
    group: koala-dev-event-centre-group # 生产者分组
    send-message-timeout: 3000 # 发送消息超时时间,单位:毫秒。默认为 3000 。
    compress-message-body-threshold: 4096 # 消息压缩阀值,当消息体的大小超过该阀值后,进行消息压缩。默认为 4 * 1024B
    max-message-size: 4194304 # 消息体的最大允许大小。。默认为 4 * 1024 * 1024B
    retry-times-when-send-failed: 2 # 同步发送消息时,失败重试次数。默认为 2 次。
    retry-times-when-send-async-failed: 2 # 异步发送消息时,失败重试次数。默认为 2 次。
    retry-next-server: false # 发送消息给 Broker 时,如果发送失败,是否重试另外一台 Broker 。默认为 false
    access-key: # Access Key ,可阅读 https://github.com/apache/rocketmq/blob/master/docs/cn/acl/user_guide.md 文档
    secret-key: # Secret Key
    enable-msg-trace: true # 是否开启消息轨迹功能。默认为 true 开启。可阅读 https://github.com/apache/rocketmq/blob/master/docs/cn/msg_trace/user_guide.md 文档
    customized-trace-topic: RMQ_SYS_TRACE_TOPIC # 自定义消息轨迹的 Topic 。默认为 RMQ_SYS_TRACE_TOPIC 。
  # Consumer 配置项
  consumer:
    listeners: # 配置某个消费分组,是否监听指定 Topic 。结构为 Map<消费者分组, <Topic, Boolean>> 。默认情况下,不配置表示监听。
      erbadagang-consumer-group:
        topic1: false # 关闭 test-consumer-group 对 topic1 的监听消费

但通常一个服务内会有多个consumer,建议在代码中实现。而producer如果只有一个,可以配置。

封装类RocketMQTemplate:

rocketmq-spring-boot-starter提供的用于收发消息的对象,后续演示的代码默认是已经引入了这个对象

@Resource
private RocketMQTemplate rocketMQTemplate;

封装另外的RocketMQTemplate 对象

事务监听器是和rocketMQTemplate一对一绑定的,如果我们需要再次开启一个单独的事务,就需要另外的rocketMQTemplate对象

@ExtRocketMQConsumerConfiguration注解获取的配置项和RocketMQTemplate的配置项相同,可以参照以下示例进行单独指定,似乎可以用来单独连接另外一个mq

@ExtRocketMQConsumerConfiguration(topic = "${demo.rocketmq.topic}", group = "string_consumer", tlsEnable = "${demo.ext.consumer.tlsEnable}")
public class ExtRocketMQTemplate extends RocketMQTemplate {
}
rocketMQTemplate.getProducer().setCallbackExecutor();
rocketMQTemplate.getProducer().setAsyncSenderExecutor();

设置发送消息和消息回查的线程池

可以在配置类中执行,紧记录

        rocketMQTemplate.getProducer().setCallbackExecutor(-);
        rocketMQTemplate.getProducer().setAsyncSenderExecutor(-);

消息发送

普通消息发送

org.apache.rocketmq.spring.autoconfigure.RocketMQProperties.Producer

1、同步消息

String springTopic="springboot_topic";
// Send string
SendResult sendResult = rocketMQTemplate.syncSend(springTopic, "sync message");
System.out.printf("syncSend1 to topic %s sendResult=%s %n", springTopic, sendResult);

// 支持发送对象
sendResult = rocketMQTemplate.syncSend(springTopic, new User().setUserAge((byte) 18).setUserName("Kitty"));
System.out.printf("syncSend1 to topic %s sendResult=%s %n", springTopic, sendResult);

MessageBuilder 构建消息发送对象
// MessageBuilder 构建消息发送对象
sendResult = rocketMQTemplate.syncSend(springTopic, MessageBuilder.withPayload(
new User().setUserAge((byte) 21).setUserName("Lester")).setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON_VALUE).build());

sendResult = rocketMQTemplate.syncSend(springTopic, MessageBuilder.withPayload("Hello, World! I'm from spring message").build());

2、异步消息

rocketMQTemplate.asyncSend(orderPaidTopic, new OrderPaidEvent("T_001", new BigDecimal("88.00")), new SendCallback() {
    @Override
    public void onSuccess(SendResult var1) {
    System.out.printf("async onSucess SendResult=%s %n", var1);
    }

    @Override
    public void onException(Throwable var1) {
    System.out.printf("async onException Throwable=%s %n", var1);
    }

});

3、单向消息

rocketMQTemplate.sendOneWay(msgExtTopic, "I'm from tag0");

4、给消息添加tag

convertAndSend类似于单向发送消息,这里用于延时怎么给消息添加tag,sendOneWay,asyncSend,syncSend等方法都是通过如下方式给消息添加tag,他们的第一个入参都是destination,是topic和tag的结合

rocketMQTemplate.convertAndSend(msgExtTopic + ":tag0", "I'm from tag0");  // tag0 will not be consumer-selected
System.out.printf("syncSend topic %s tag %s %n", msgExtTopic, "tag0");
rocketMQTemplate.convertAndSend(msgExtTopic + ":tag1", "I'm from tag1");
System.out.printf("syncSend topic %s tag %s %n", msgExtTopic, "tag1");

批量消息

        List<Message> msgs = new ArrayList<Message>();
        for (int i = 0; i < 10; i++) {
            msgs.add(MessageBuilder.withPayload("Hello RocketMQ Batch Msg#" + i).
                setHeader(RocketMQHeaders.KEYS, "KEY_" + i).build());
        }

        SendResult sr = rocketMQTemplate.syncSend(springTopic, msgs, 60000);

        System.out.printf("--- Batch messages send result :" + sr);

顺序消息

应该是会以hashKey进行取模,让需要保证顺序的消息坐落在同一个messagequeue上,

#核心代码,只演示同步发送消息的实例,异步发送消息代码请自行寻找,
SendResult sr = rocketMQTemplate.syncSendOrderly(springTopic, msgs, "hashKey", 60000);

事务消息

事务消息发送关键代码

SendResult sendResult = rocketMQTemplate.sendMessageInTransaction(
springTransTopic + ":" + tags[i % tags.length], msg, null);

事务监听器和rocketMQTemplate绑定(一对一关系,所以这里如果你需要发送多个事务消息那就需要额外的rocketMQTemplate,rocketMQTemplateBeanName属性用来完成关系的绑定)

事务监听器作用不再赘述,虽然和roketmq自带的监听器类不是一个,但是模式是一样的

/**
 * 关于@RocketMQTransactionListener 这个注解,有点奇怪。2.0.4版本中,是需要指定txProducerGroup指向一个消息发送者组。不同的组可以有不同的事务消息逻辑。
 * 但是到了2.1.1版本,只能指定rocketMQTemplateBeanMame,也就是说如果你有多个发送者组需要有不同的事务消息逻辑,那就需要定义多个RocketMQTemplate。
 * 而且这个版本中,虽然重现了我们在原生API中的事务消息逻辑,但是测试过程中还是发现一些奇怪的特性,用的时候要注意点。
 **/
@RocketMQTransactionListener(rocketMQTemplateBeanName = "rocketMQTemplate")
public class MyTransactionImpl implements RocketMQLocalTransactionListener {

    private ConcurrentHashMap<Object, Message> localTrans = new ConcurrentHashMap<>();
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        Object transId = msg.getHeaders().get(RocketMQHeaders.PREFIX+RocketMQHeaders.TRANSACTION_ID);
        String destination = arg.toString();
        localTrans.put(transId,msg);
        //这个msg的实现类是GenericMessage,里面实现了toString方法
        //在Header中自定义的RocketMQHeaders.TAGS属性,到这里就没了。但是RocketMQHeaders.TRANSACTION_ID这个属性就还在。
        //而message的Header里面会默认保存RocketMQHeaders里的属性,但是都会加上一个RocketMQHeaders.PREFIX前缀
        System.out.println("executeLocalTransaction msg = "+msg);
        //转成RocketMQ的Message对象
        org.apache.rocketmq.common.message.Message message = RocketMQUtil.convertToRocketMessage(new StringMessageConverter(),"UTF-8",destination, msg);
        String tags = message.getTags();
        if(StringUtils.contains(tags,"TagA")){
            return RocketMQLocalTransactionState.COMMIT;
        }else if(StringUtils.contains(tags,"TagB")){
            return RocketMQLocalTransactionState.ROLLBACK;
        }else{
            return RocketMQLocalTransactionState.UNKNOWN;
        }
    }
    //延迟检查的时间间隔要有点奇怪。
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        String transId = msg.getHeaders().get(RocketMQHeaders.PREFIX+RocketMQHeaders.TRANSACTION_ID).toString();
        Message originalMessage = localTrans.get(transId);
        //这里能够获取到自定义的transaction_id属性
        System.out.println("checkLocalTransaction msg = "+originalMessage);
        //获取标签时,自定义的RocketMQHeaders.TAGS拿不到,但是框架会封装成一个带RocketMQHeaders.PREFIX的属性
//        String tags = msg.getHeaders().get(RocketMQHeaders.TAGS).toString();
        String tags = msg.getHeaders().get(RocketMQHeaders.PREFIX+RocketMQHeaders.TAGS).toString();
        if(StringUtils.contains(tags,"TagC")){
            return RocketMQLocalTransactionState.COMMIT;
        }else if(StringUtils.contains(tags,"TagD")){
            return RocketMQLocalTransactionState.ROLLBACK;
        }else{
            return RocketMQLocalTransactionState.UNKNOWN;
        }
    }
}

需要注意的是,我们拿到的不再是rocketmq的messageExt对象,不过我们一样可以获取到消息id和事务id

事务id是RMQ_SYS_TRANS_HALF_TOPIC这个主题下的消息id,在ui界面,你可以直接根据主题(发消息的主题)和消息id搜索到对应的事务消息

springBoot整合RocketMq-LMLPHP

有回执的消息

实在没看出有啥用,我只想说既然这样,直接接口调用不就行了吗

生产者发送消息后,等待消费者进行消费,并给回执,回执可以是多种类型,并且支持嵌套的复杂类型,具体可以到示例项目中找

生产者

String replyString = rocketMQTemplate.sendAndReceive(stringRequestTopic, "request string", String.class);
System.out.printf("send %s and receive %s %n", "request string", replyString);

消费者是一个给回执的监听器

@Service
@RocketMQMessageListener(topic = "${demo.rocketmq.stringRequestTopic}", consumerGroup = "${demo.rocketmq.stringRequestConsumer}",
    selectorExpression = "${demo.rocketmq.tag}", replyTimeout = 10000)
public class StringConsumerWithReplyString implements RocketMQReplyListener<String, String> {

    @Override
    public String onMessage(String message) {
        System.out.printf("------- StringConsumerWithReplyString received: %s \n", message);
        return "reply string";
    }
}

异步接收回执信息

    rocketMQTemplate.sendAndReceive(stringRequestTopic, "request string", new RocketMQLocalRequestCallback<String>() {
        @Override public void onSuccess(String message) {
            System.out.printf("send %s and receive %s %n", "request string", message);
        }

        @Override public void onException(Throwable e) {
            e.printStackTrace();
        }
    });

延时消息

同步异步单向等消息发送的方法中支持传入delayLevel参数,

5.0.0版本已经支持延时到具体是时刻,而不是指定延时级别,这里可能需要等rocketmq-spring-boot-starter的下一个版本了

ACL权限控制

Producer 端要想使用 ACL 功能,需要多配置两个配置项:

## application.properties
rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=my-group

rocketmq.producer.access-key=AK
rocketmq.producer.secret-key=SK
Consumer 端 ACL 功能需要在 @RocketMQMessageListener 中进行配置

@Service
@RocketMQMessageListener(
    topic = "test-topic-1", 
    consumerGroup = "my-consumer_test-topic-1",
    accessKey = "AK",
    secretKey = "SK"
)
public class MyConsumer implements RocketMQListener<String> {
    ...
}

消费者消费消息

普通消费者

@Service
@RocketMQMessageListener(nameServer = "${demo.rocketmq.myNameServer}", topic = "${demo.rocketmq.topic}", consumerGroup = "string_consumer_newns")
public class StringConsumerNewNS implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        System.out.printf("------- StringConsumerNewNS received: %s \n", message);
    }
}

拿到原始messageExt消息对象消费者

@Service
@RocketMQMessageListener(topic = "${demo.rocketmq.msgExtTopic}", selectorExpression = "tag0||tag1", consumerGroup = "${spring.application.name}-message-ext-consumer")
public class MessageExtConsumer implements RocketMQListener<MessageExt>, RocketMQPushConsumerLifecycleListener {
    @Override
    public void onMessage(MessageExt message) {
        System.out.printf("------- MessageExtConsumer received message, msgId: %s, body:%s \n", message.getMsgId(), new String(message.getBody()));
    }

    @Override
    public void prepareStart(DefaultMQPushConsumer consumer) {
        // set consumer consume message from now
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
        consumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(System.currentTimeMillis()));
    }
}

消费者过滤消息

通过注解的属性设置多虑消息的条件

@RocketMQMessageListener(topic = "${demo.rocketmq.msgExtTopic}", selectorExpression = "tag0||tag1", consumerGroup = "${spring.application.name}-message-ext-consumer")
    
# 主要是这两个属性    
    SelectorType selectorType() default SelectorType.TAG; // 还可以指定SQL92

    /**
     * Control which message can be select. Grammar please see {@link SelectorType#TAG} and {@link SelectorType#SQL92}
     */
    String selectorExpression() default "*";

顺序消息消费

@RocketMQMessageListener(consumeMode = ConsumeMode.ORDERLY

同样是指定这个注解的属性

    ConsumeMode consumeMode() default ConsumeMode.CONCURRENTLY;  # 需要指定为  ORDERLY

消息返回消费失败

原生的写法是可以在对应的监听器中返回消息是否消费成功,在springBoot对应的监听器中,抛出异常就算是消费失败,会触发消息重试机制

不使用rocketmq-spring-boot-starter的方式

rocketmq-spring-boot-starter使用的话,依赖冲途是个问题,而且还不支持5.0.0版本

这里直接使用配置类的方法使用原函数呢个

消费者端

//项目中加上 @Configuration 注解,这样服务启动时consumer也启动了
@Configuration
@Slf4j
public class ConsumerClient {

    @Value("${rocketmq.name-server}")
    private String namesrvAddr;

    @Value("${rocketmq.consumer.group}")
    private String consumerGroup;

    @Bean(destroyMethod = "shutdown",value = "rocketMqConsumer")
    public DefaultMQPushConsumer buildConsumer() throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
        consumer.setNamesrvAddr(namesrvAddr);
        consumer.subscribe("TopicTest", "*");

        /*定义消费Client从那个位置消费消息,分别为:
        CONSUME_FROM_LAST_OFFSET 默认策略,从该队列最尾开始消费,即跳过历史消息
        CONSUME_FROM_FIRST_OFFSET 从队列最开始开始消费,即历史消息(还储存在broker的)全部消费一遍
        CONSUME_FROM_TIMESTAMP 从某个时间点开始消费,和setConsumeTimestamp()配合使用,默认是半个小时以前
        */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
//        consumer.setConsumeTimestamp("20181109221800");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                log.debug("{} Receive New Messages: {}", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        return consumer;
    }

}

生产者


//项目中加上 @Configuration 注解,这样服务启动时consumer也启动了
@Configuration
@Slf4j
public class ProducerClient {

    @Value("${rocketmq.name-server}")
    private String namesrvAddr;



    @Value("${rocketmq.producer.group}")
    private String producerGroup;

    public static final String TOPIC = "TopicTest";
    public static final String TAG = "TagA";

    @Bean(destroyMethod = "shutdown",value = "rocketMqProducer")
    public DefaultMQProducer buildProducer() throws MQClientException {

        DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
        producer.setNamesrvAddr(namesrvAddr);
        producer.setSendMsgTimeout(6000);
        producer.start();
        return producer;
    }

}

其他

springcloudalibaba的github页面可以找到相关使用示例

关联信息

  • 关联的主题:
  • 上一篇:
  • 下一篇:
  • image: 20221021/1
  • 转载自:
12-09 13:03