一.消息推送

/**
 * Sends one hard-coded demo message to RocketMQ and prints the send result.
 *
 * <p>A fresh producer is created, started, used for a single synchronous send and
 * shut down again on every call. Any exception raised along the way is caught and
 * its stack trace printed; nothing propagates to the caller.
 */
public void pushMessage() {
        String message = "推送消息内容!";
        DefaultMQProducer producer = null;
        try {
            producer = new DefaultMQProducer(producerGroup);
            // NameServer address. The literal below is a placeholder ("server address + port")
            // and must be replaced with a real "host:port" value.
            producer.setNamesrvAddr("服务器地址+端口号");
            producer.setInstanceName("producer");
            // The producer has to be started once before anything is sent.
            producer.start();
            // Build the message: topic, tag, body. The body is encoded explicitly as UTF-8 so
            // the bytes on the wire do not depend on the JVM's default charset.
            Message msg = new Message(topic,
                    tag,
                    message.getBytes(java.nio.charset.StandardCharsets.UTF_8)
            );
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        } catch (Exception ex) {
            // NOTE(review): no logger is visible for this class in the listing, so the stack
            // trace is still printed directly; switch to the class logger if one exists.
            ex.printStackTrace();
        } finally {
            // Always release the producer, including when start() or send() failed.
            if (producer != null) {
                producer.shutdown();
            }
        }
}

二.消息消费

    // Listener bean that init() registers with the RocketMQ consumer.
    @Autowired
    private MessageReceiveService messageReceiveService;
    // Server address and port for the rating (good/bad review) messages,
    // read from app.message.address; init() passes it to setNamesrvAddr.
    @Value("${app.message.address}")
    private String address;
    // Topic of the rating (good/bad review) messages, read from app.message.topic;
    // init() subscribes to it.
    @Value("${app.message.topic}")
    private String topic;
    // Consumer group name for the rating (good/bad review) messages,
    // read from app.message.groupName.
    @Value("${app.message.groupName}")
    private String consumerGroup;

    /**
     * Starts consuming RocketMQ messages once this bean has been constructed.
     *
     * <p>Creates a push consumer for {@code consumerGroup}, points it at {@code address},
     * subscribes to every tag ({@code "*"}) of {@code topic}, registers
     * {@code messageReceiveService} as the listener and starts the consumer.
     * A start-up failure is logged and swallowed, so no exception leaves this method.
     */
    @PostConstruct
    public void init() {
        try {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
            consumer.setNamesrvAddr(address);
            consumer.subscribe(topic, "*");
            consumer.registerMessageListener(messageReceiveService);
            consumer.start();
            // NOTE(review): the consumer is only held in this local variable, so nothing in
            // the visible code ever calls shutdown() on it — consider keeping a reference
            // and stopping it when the bean is destroyed.
            logger.info("rocketMQ consumer start");
        } catch (Exception e) {
            // The logger records the stack trace; a separate printStackTrace() would only
            // duplicate it on stderr.
            logger.error("rocketMQ consumer start error!", e);
        }
    }
 @Service
public class MessageReceiveService implements MessageListenerConcurrently {

    // Class-wide logger obtained from LoggerFactory for this service.
    private static Logger logger = LoggerFactory.getLogger(MessageReceiveService.class);

    // Value injected from the accept_system_interface property.
    // It is not referenced in the visible portion of this class.
    @Value("${accept_system_interface}")
    private String acceptSystemInterface;

    /**
     * Listener callback that receives a batch of messages from RocketMQ.
     *
     * <p>The batch is passed unchanged to {@code handleHcpMessage} and the status that
     * method produces is returned as-is.
     *
     * @param msgs    the RocketMQ messages to consume
     * @param context the message consumption context
     * @return the processing status reported by {@code handleHcpMessage}
     */
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        final ConsumeConcurrentlyStatus status = handleHcpMessage(msgs, context);
        return status;
    }

    /**
     * Consumes a batch of rating (good/bad review) messages.
     *
     * <p>Each message body is decoded as UTF-8 and then processed. A message whose body
     * cannot be decoded is logged, recorded through {@code errorLogSave} and skipped; the
     * remaining messages of the batch are still processed.
     *
     * <p>NOTE(review): {@code @Transactional} sits on a private method that is called via
     * {@code this} from {@code consumeMessage}. With Spring's proxy-based transaction
     * support such a call is not intercepted, so the annotation likely has no effect here —
     * confirm and, if a transaction is required, move it to a public entry point.
     *
     * @param msgs    the current message (batch)
     * @param context the message consumption context
     * @return {@code CONSUME_SUCCESS} on every path visible in this listing
     */
    @Transactional(rollbackFor = {RuntimeException.class})
    private ConsumeConcurrentlyStatus handleHcpMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        for (MessageExt msg : msgs) {
            // Validate and decode the message.
            String message = null;
            try {
                // Decode the body as UTF-8 explicitly to avoid garbled text.
                message = new String(msg.getBody(), java.nio.charset.StandardCharsets.UTF_8);
            } catch (Exception e) {
                logger.error("rocketMQ message body decode error!", e);
                // NOTE(review): message is still null here because the assignment above
                // did not complete — confirm what errorLogSave expects as its first argument.
                errorLogSave(message, "当前消息转化utf-8出现异常信息");
                // Skip only this message. Returning here would acknowledge the whole batch
                // and silently drop the messages that follow.
                continue;
            }
            // Perform the business handling for the message (elided in the original listing).
            ...
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
01-31 22:53
查看更多