1、引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2、配置文件
rabbitmq:
addresses: 10.0.0.203
port: 5672
username: root
password: 123456
virtual-host: /
listener:
simple:
concurrency: 10
max-concurrency: 10
prefetch: 1
auto-startup: true
default-requeue-rejected: true
template:
retry:
enabled: true
initial-interval: 1000
max-attempts: 3
max-interval: 10000
multiplier: 1
2、注解配置
package dhht.seal.hn.gsgate.rabbitmq;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @Author: sh
* @Description: RabbitMqConfig
* @Date: 10:33 2019/11/4
*/
@Configuration
public class RabbitMqConfig {
public static final String QUEUE = "hnyz_gs_queue";
@Value("${spring.rabbitmq.addresses}")
private String host;
@Value("${spring.rabbitmq.port}")
private int port;
@Value("${spring.rabbitmq.username}")
private String username;
@Value("${spring.rabbitmq.password}")
private String password;
@Value("${spring.rabbitmq.virtual-host}")
private String virtualHost;
public static final String GS_EXCHANGE ="gs_exchange";
/**
* 处理连接端口
* @return
*/
@Bean
public CachingConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host,port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtualHost);
connectionFactory.setPublisherConfirms(true);
return connectionFactory;
}
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
RabbitAdmin admin = new RabbitAdmin(connectionFactory);
admin.setAutoStartup(true);
return admin;
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template =new RabbitTemplate(connectionFactory);
template.setUseDirectReplyToContainer(false);
template.setReplyTimeout(-1);
return template;
}
@Bean
public Queue cretaeQueue(){
return new Queue(QUEUE,true);
}
@Bean(name="gs_exchange")
public FanoutExchange getGsExchange() {
return new FanoutExchange("gs_exchange", true, false, null);
}
@Bean
public Binding getFauoutBinding() {
return BindingBuilder.bind(cretaeQueue()).to(getGsExchange());
}
}
package dhht.seal.hn.gsgate.rabbitmq;
import com.alibaba.fastjson.JSON;
import dhht.seal.hn.gsgate.model.pojo.CropQueryVO;
import dhht.seal.hn.gsgate.service.CropQueryService;
import dhht.seal.hn.gsgate.service.impl.CropServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.ParameterizedType;
/**
* @Author: sh
* @Description: MqSender
* @Date: 10:34 2019/11/4
*/
@Slf4j
@Service
public class MqSenderService {
@Autowired
AmqpTemplate amqpTemplate;
@Autowired
RabbitTemplate rabbitTemplate;
@Resource
private CropServiceImpl cropImplService;
public String sendMsgToQueue(Object message) {
try {
String msg = beanToString(message);
log.info("【发送的消息-社会信用代码】:" + msg);
Message mm = rabbitTemplate.sendAndReceive(RabbitMqConfig.QUEUE, new Message(msg.getBytes("UTF-8"),new MessageProperties()));
String msgResult = new String(mm.getBody());
log.info("【同步消息返回结果-msgResult】:"+msgResult);
CropQueryVO cropQueryVO = cropImplService.cropQueryVO(msg);
log.info("【消息发送后-sql查询结果-CropQueryVO】:"+JSON.toJSONString(cropQueryVO));
return JSON.toJSONString(cropQueryVO);
//amqpTemplate.convertAndSend(MQConfig.QUEUE, msg);
//ParameterizedTypeReference param = new ParameterizedTypeReference<Object>(){};
//return amqpTemplate.convertSendAndReceiveAsType(RabbitMqConfig.QUEUE, beanToString(message),param).toString();
//return amqpTemplate.convertSendAndReceive(RabbitMqConfig.QUEUE, message).toString();
} catch (UnsupportedEncodingException e) {
log.info(e.getMessage());
return null;
}
}
public void sendMsg(Object message) {
String msg = beanToString(message);
log.info("send message:" + msg);
amqpTemplate.convertAndSend(RabbitMqConfig.QUEUE, msg);
log.info("sendMsg()---消息发送成功!");
}
public static <T> String beanToString(T value) {
if (value == null) {
return null;
}
Class<?> clazz = value.getClass();
if (clazz == int.class || clazz == Integer.class) {
return "" + value;
} else if (clazz == String.class) {
return (String) value;
} else if (clazz == long.class || clazz == Long.class) {
return "" + value;
} else {
return JSON.toJSONString(value);
}
}
}
package dhht.seal.hn.gsgate.rabbitmq;
import dhht.seal.hn.gsgate.service.CropQueryService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.io.UnsupportedEncodingException;
/**
* @Author: sh
* @Description: MqReceiver
* @Date: 10:40 2019/11/4
*/
@Service
public class MqReceiverService {
private static Logger log = LoggerFactory.getLogger(MqReceiverService.class);
@Resource
private CropQueryService cropQueryService;
@RabbitListener(queues = RabbitMqConfig.QUEUE)
@SendTo(RabbitMqConfig.QUEUE)
public String receiveQueueMsg(/*String message*/Message message) {
try {
log.info("接收到队列消息:" + new String(message.getBody()));
// 业务处理代码,工商拉取入库
String resJson = cropQueryService.crropQuery(new String(message.getBody(),"UTF-8"));
return resJson;
} catch (UnsupportedEncodingException e) {
log.error(e.getMessage());
return null;
}
}
}