创建springboot项目省略

项目依赖

注意:当前客户端版本是 5.1.3 ,安装的rocketmq服务的版本要与其对应

	<properties>
        <java.version>11</java.version>
        <rocketmq-client-java-version>5.1.3</rocketmq-client-java-version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>${rocketmq-client-java-version}</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

创建 JmsConfig


public class JmsConfig {
	//roketmq 服务地址
    public static String nameServerAddr = "192.168.2.109:9876";
	//主题
    public static String TOPIC = "test_topic";
}

创建生产者 Producer

package com.example.springbootrocketmq.jms;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.springframework.stereotype.Component;

import javax.annotation.PreDestroy;

@Component
public class PayProducer {
	//生产组
    private String producerGroup = "test_group";

    private DefaultMQProducer producer;

    public PayProducer() {
        producer = new DefaultMQProducer(producerGroup);

        //多个NameServer地址 多个地址 ; 号隔开
        producer.setNamesrvAddr(JmsConfig.nameServerAddr);
        start();
    }

    /**
     * 开始
     */
    public void start(){
        try {
            this.producer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }

    public DefaultMQProducer getProducer(){
        return this.producer;
    }

    /**
     * 一般关闭上下文是关闭
     */
    @PreDestroy
    public void shutdown(){
        System.out.println("关闭....");
        this.producer.shutdown();
    }
}

创建消费者 Consumer


import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.springframework.stereotype.Component;

import java.io.UnsupportedEncodingException;

@Component
public class Consumer {

    private DefaultMQPushConsumer consumer;

    private String consumerGroup = "test_consumer_group";

    public PayConsumer() throws Exception{
        consumer = new DefaultMQPushConsumer(consumerGroup);
        consumer.setNamesrvAddr(JmsConfig.nameServerAddr);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        consumer.subscribe(JmsConfig.TOPIC, "*");
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            try {
                Message msg = msgs.get(0);
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msgs.get(0).getBody()));
                String topic = msg.getTopic();
                String body = new String(msg.getBody(), "utf-8");
                String tags = msg.getTags();
                String keys = msg.getKeys();
                System.out.println("topic=" + topic + ", tags=" + tags + ", keys=" + keys + ", msg=" + body);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        });

        consumer.start();
    }
}

配置 TestController


import com.example.springbootrocketmq.jms.JmsConfig;
import com.example.springbootrocketmq.jms.Producer;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
@RestController
public class TestController{

    @Autowired
    private Producer producer;

    @RequestMapping("/api/v1/test_cb")
    public Object callback(String text) throws Exception {

        Message message = new Message(JmsConfig.TOPIC,"taga",("hello rocketmq = "+ text).getBytes());
        SendResult sendResult = producer.getProducer().send(message);
        log.info(sendResult.toString());
        return null;
    }
}

测试结果

springboot 使用RocketMQ客户端生产消费消息DEMO-LMLPHP
springboot 使用RocketMQ客户端生产消费消息DEMO-LMLPHP

10-15 21:35