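When we send a message, the message body is normally transmitted as raw binary data. If we want the framework to convert it for us, or want to plug in a custom converter, we need a MessageConverter.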

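  • Custom converters: MessageConverter is the common interface, and a custom converter generally needs to implement it
  • Override its two methods: toMessage converts a Java object into a Message; fromMessage converts a Message into a Java object
  • JSON converter: Jackson2JsonMessageConverter converts between Java objects and JSON
  • DefaultJackson2JavaTypeMapper: the mapper that maps type identifiers to Java classes
  • Custom binary converters: for example images, PDF, PPT, and streaming media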

Add the dependencies to pom.xml:

    <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-core -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-core</artifactId>
        <version>2.10.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.10.0</version>
    </dependency>

1. Converting a byte array to a String

The adapter from the previous section stays the same; we only add a converter, TextMessageConverter, to it.

    // 1. Adapter approach: the adapter's default listener method name is handleMessage
    MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
    // Specify our own default method name
    adapter.setDefaultListenerMethod("consumeMessage");
    // A converter can also be added: here, from byte array to String
    adapter.setMessageConverter(new TextMessageConverter());
    container.setMessageListener(adapter);

TextMessageConverter code:

    package com.dwz.spring.converter;

    import java.nio.charset.StandardCharsets;

    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageProperties;
    import org.springframework.amqp.support.converter.MessageConversionException;
    import org.springframework.amqp.support.converter.MessageConverter;

    public class TextMessageConverter implements MessageConverter {

        // Convert any other object into a Message
        @Override
        public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
            // An explicit charset avoids depending on the platform default
            return new Message(object.toString().getBytes(StandardCharsets.UTF_8), messageProperties);
        }

        // Convert a Message into a String when its content type is textual
        @Override
        public Object fromMessage(Message message) throws MessageConversionException {
            String contentType = message.getMessageProperties().getContentType();
            if (null != contentType && contentType.contains("text")) {
                System.err.println("contentType:--String--" + contentType);
                return new String(message.getBody(), StandardCharsets.UTF_8);
            }
            // Any other content type is passed through as the raw byte[]
            return message.getBody();
        }
    }
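
The content-type check in fromMessage() can be exercised without a broker. Below is a minimal sketch that uses only the JDK; BodyDecoder and decode are hypothetical names introduced for illustration (they are not part of Spring AMQP), and the body is assumed to be UTF-8 text.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper (not a Spring AMQP class): mirrors the content-type check in fromMessage().
class BodyDecoder {

    // Returns a String for textual content types, otherwise the untouched byte[].
    static Object decode(byte[] body, String contentType) {
        if (contentType != null && contentType.contains("text")) {
            // Assumption: the producer encoded the text as UTF-8.
            return new String(body, StandardCharsets.UTF_8);
        }
        return body;
    }

    public static void main(String[] args) {
        byte[] body = "hello".getBytes(StandardCharsets.UTF_8);
        System.out.println(decode(body, "text/plain"));                        // prints hello
        System.out.println(decode(body, "application/pdf") instanceof byte[]); // prints true
    }
}
```

Because the return type depends on the content type, the delegate method that receives the result has to accept String for textual messages and byte[] for everything else.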

The consumeMessage() method of our custom delegate object, MessageDelegate, must now accept a parameter whose type matches the type returned by fromMessage().

The MessageDelegate class:

    public class MessageDelegate {
        public void consumeMessage(String messageBody) {
            System.err.println("consumeMessage default method, message content: String--" + messageBody);
        }
    }

Test code:

    @Test
    public void testMessage02() {
        // Create the message
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setContentType("text/plain");
        Message message = new Message("spring consumeMessage message".getBytes(), messageProperties);
        rabbitTemplate.send("topic001", "spring.abc", message);

        rabbitTemplate.convertAndSend("topic001", "spring.amqp", "hello object message send!");
        rabbitTemplate.convertAndSend("topic002", "rabbit.amqp", "hello object message send!");
    }

2. A converter that supports JSON

Add two classes, Order and Packaged.

The Order class:

    package com.dwz.spring.entity;

    import java.io.Serializable;

    public class Order implements Serializable {

        private static final long serialVersionUID = 1L;

        private String id;
        private String name;
        private String content;

        public Order() {
        }

        public Order(String id, String name, String content) {
            this.id = id;
            this.name = name;
            this.content = content;
        }

        public String getId() {
            return id;
        }

        public void setId(String id) {
            this.id = id;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public String getContent() {
            return content;
        }

        public void setContent(String content) {
            this.content = content;
        }
    }

The Packaged class:

    package com.dwz.spring.entity;

    import java.io.Serializable;

    public class Packaged implements Serializable {

        private static final long serialVersionUID = 1L;

        private String id;
        private String name;
        private String description;

        public Packaged() {
        }

        public Packaged(String id, String name, String description) {
            this.id = id;
            this.name = name;
            this.description = description;
        }

        public String getId() {
            return id;
        }

        public void setId(String id) {
            this.id = id;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public String getDescription() {
            return description;
        }

        public void setDescription(String description) {
            this.description = description;
        }
    }

Pass a Jackson2JsonMessageConverter to the adapter's setMessageConverter():

    MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
    adapter.setDefaultListenerMethod("consumeMessage");
    Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
    adapter.setMessageConverter(jackson2JsonMessageConverter);
    container.setMessageListener(adapter);

Change the parameter of consumeMessage() in MessageDelegate to Map<String, Object>:

    public void consumeMessage(Map<String, Object> messageBody) {
        System.err.println("consumeMessage map method, message content: " + messageBody);
    }

Test code:

    /**
     * Converter that supports JSON
     * @throws JsonProcessingException
     */
    @Test
    public void testSendJsonMessage() throws JsonProcessingException {
        Order order = new Order();
        order.setId("001");
        order.setName("message order");
        order.setContent("description");

        ObjectMapper mapper = new ObjectMapper();
        String json = mapper.writeValueAsString(order);
        System.err.println("order 4 json:" + json);

        MessageProperties messageProperties = new MessageProperties();
        // The contentType must be changed to application/json here
        messageProperties.setContentType("application/json");

        Message message = new Message(json.getBytes(), messageProperties);
        rabbitTemplate.send("topic001", "spring.order", message);
    }

3. Converting to Java objects

Change the adapter code to set a DefaultJackson2JavaTypeMapper on the Jackson2JsonMessageConverter:

    // 1.2 DefaultJackson2JavaTypeMapper with Jackson2JsonMessageConverter: conversion to Java objects
    MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
    adapter.setDefaultListenerMethod("consumeMessage");
    Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
    DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();
    // With RabbitMQ's default conversion you will not run into the trusted-package problem discussed here.
    // If you customize message conversion and use DefaultClassMapper for the mapping, you will run into it,
    // so remember to set trustedPackages whenever you customize the conversion.
    javaTypeMapper.addTrustedPackages("com.dwz.spring.entity");
    jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);
    adapter.setMessageConverter(jackson2JsonMessageConverter);
    container.setMessageListener(adapter);
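
To make the idea behind trusted packages concrete, here is a minimal JDK-only sketch of such a check. TrustedPackages and isTrusted are hypothetical names; this is a simplified model, not the Spring AMQP implementation. It assumes that "*" trusts everything and that package names otherwise have to match exactly.

```java
import java.util.Collections;
import java.util.Set;

// Hypothetical model of a trusted-package check; not the Spring AMQP implementation.
class TrustedPackages {

    private final Set<String> trusted;

    TrustedPackages(Set<String> trusted) {
        this.trusted = trusted;
    }

    // True when the class named in the type header lives in one of the trusted packages.
    // Assumption: "*" trusts everything; otherwise the package name must match exactly.
    boolean isTrusted(String className) {
        if (trusted.contains("*")) {
            return true;
        }
        int lastDot = className.lastIndexOf('.');
        String packageName = lastDot < 0 ? "" : className.substring(0, lastDot);
        return trusted.contains(packageName);
    }

    public static void main(String[] args) {
        TrustedPackages check = new TrustedPackages(Collections.singleton("com.dwz.spring.entity"));
        System.out.println(check.isTrusted("com.dwz.spring.entity.Order")); // prints true
        System.out.println(check.isTrusted("com.example.other.Payload"));   // prints false
    }
}
```

The point of such a check is that the class name arrives in a message header, so the consumer should only instantiate types from packages it has explicitly allowed.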

Change the parameter of consumeMessage() in MessageDelegate to an Order object:

    public void consumeMessage(Order order) {
        System.err.println("order object, message content, id:" + order.getId()
                + ", name:" + order.getName()
                + ", content:" + order.getContent());
    }

Test code:

    /**
     * Conversion between JSON and Java objects
     * @throws JsonProcessingException
     */
    @Test
    public void testSendJavaMessage() throws JsonProcessingException {
        Order order = new Order();
        order.setId("001");
        order.setName("message order");
        order.setContent("description");

        ObjectMapper mapper = new ObjectMapper();
        String json = mapper.writeValueAsString(order);
        System.err.println("order 4 json:" + json);

        MessageProperties messageProperties = new MessageProperties();
        // The contentType must be changed to application/json here
        messageProperties.setContentType("application/json");
        // The type header names the class the converter should deserialize into
        messageProperties.getHeaders().put("__TypeId__", "com.dwz.spring.entity.Order");

        Message message = new Message(json.getBytes(), messageProperties);
        rabbitTemplate.send("topic001", "spring.order", message);
    }

4. Mapping multiple Java types

Change the adapter code to register several mappings from a type id to a class:

    // 1.3 DefaultJackson2JavaTypeMapper with Jackson2JsonMessageConverter: mapping multiple Java types
    MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
    adapter.setDefaultListenerMethod("consumeMessage");
    Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
    DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();
    Map<String, Class<?>> idClassMapping = new HashMap<String, Class<?>>();
    idClassMapping.put("order", Order.class);
    idClassMapping.put("packaged", Packaged.class);
    javaTypeMapper.setIdClassMapping(idClassMapping);
    jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);
    adapter.setMessageConverter(jackson2JsonMessageConverter);
    container.setMessageListener(adapter);
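
The mapping above lets a producer send a short id such as "order" instead of a fully qualified class name. A minimal JDK-only sketch of that lookup follows. TypeIdResolver is a hypothetical class, not the Spring AMQP implementation, and StringBuilder and Integer stand in for Order and Packaged so that the example is self-contained.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of type-id resolution; not the Spring AMQP implementation.
class TypeIdResolver {

    private final Map<String, Class<?>> idClassMapping = new HashMap<>();

    void register(String typeId, Class<?> type) {
        idClassMapping.put(typeId, type);
    }

    // Looks up the short id first, then falls back to treating the id as a class name.
    Class<?> resolve(String typeId) {
        Class<?> mapped = idClassMapping.get(typeId);
        if (mapped != null) {
            return mapped;
        }
        try {
            return Class.forName(typeId);
        } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException("Unknown type id: " + typeId, e);
        }
    }

    public static void main(String[] args) {
        TypeIdResolver resolver = new TypeIdResolver();
        // Stand-ins for Order.class and Packaged.class
        resolver.register("order", StringBuilder.class);
        resolver.register("packaged", Integer.class);
        System.out.println(resolver.resolve("order").getName());          // prints java.lang.StringBuilder
        System.out.println(resolver.resolve("java.lang.Long").getName()); // prints java.lang.Long
    }
}
```

Short ids also decouple the producer from the consumer's package layout: the consumer can move or rename its classes as long as the id stays the same.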

Add two overloads of consumeMessage() to MessageDelegate, one taking an Order and one taking a Packaged:

    public void consumeMessage(Order order) {
        System.err.println("order object, message content, id:" + order.getId()
                + ", name:" + order.getName()
                + ", content:" + order.getContent());
    }

    public void consumeMessage(Packaged pack) {
        System.err.println("Packaged object, message content, id:" + pack.getId()
                + ", name:" + pack.getName()
                + ", description:" + pack.getDescription());
    }
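
With several overloads in place, the overload that runs is determined by the type of the converted object. The JDK-only sketch below models that selection with reflection. OverloadDispatch and dispatch are hypothetical names, String and Integer stand in for Order and Packaged, and this is a simplified model rather than the way MessageListenerAdapter is actually implemented.

```java
import java.lang.reflect.Method;

// Hypothetical model of overload dispatch by payload type; not the MessageListenerAdapter code.
class OverloadDispatch {

    // Stand-in delegate: String and Integer take the place of Order and Packaged.
    public static class Delegate {
        public String consumeMessage(String text) {
            return "text:" + text;
        }

        public String consumeMessage(Integer number) {
            return "number:" + number;
        }
    }

    // Invokes the public method with the given name whose single parameter accepts the payload.
    static Object dispatch(Object delegate, String methodName, Object payload) {
        for (Method method : delegate.getClass().getMethods()) {
            if (method.getName().equals(methodName)
                    && method.getParameterCount() == 1
                    && method.getParameterTypes()[0].isInstance(payload)) {
                try {
                    return method.invoke(delegate, payload);
                } catch (ReflectiveOperationException e) {
                    throw new IllegalStateException("Listener method failed", e);
                }
            }
        }
        throw new IllegalArgumentException("No " + methodName + " overload accepts " + payload.getClass());
    }

    public static void main(String[] args) {
        Delegate delegate = new Delegate();
        System.out.println(dispatch(delegate, "consumeMessage", "hello")); // prints text:hello
        System.out.println(dispatch(delegate, "consumeMessage", 42));      // prints number:42
    }
}
```

This is why the type header matters in the test that follows: it decides which class the JSON is deserialized into, and therefore which overload receives the message.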

Test code:

The type id must be added to the headers, for example:

    messageProperties2.getHeaders().put("__TypeId__", "packaged");

    @Test
    public void testSendMappingMessage() throws JsonProcessingException {
        Order order = new Order();
        order.setId("001");
        order.setName("order message");
        order.setContent("order description");

        ObjectMapper mapper = new ObjectMapper();
        String json1 = mapper.writeValueAsString(order);
        System.err.println("order 4 json:" + json1);

        MessageProperties messageProperties = new MessageProperties();
        // The contentType must be changed to application/json here
        messageProperties.setContentType("application/json");
        messageProperties.getHeaders().put("__TypeId__", "order");

        Message message = new Message(json1.getBytes(), messageProperties);
        rabbitTemplate.send("topic001", "spring.order", message);

        Packaged packaged = new Packaged();
        packaged.setId("002");
        packaged.setName("package message");
        packaged.setDescription("package description");

        String json2 = mapper.writeValueAsString(packaged);
        System.err.println("packaged 4 json:" + json2);

        MessageProperties messageProperties2 = new MessageProperties();
        // The contentType must be changed to application/json here
        messageProperties2.setContentType("application/json");
        messageProperties2.getHeaders().put("__TypeId__", "packaged");

        Message message2 = new Message(json2.getBytes(), messageProperties2);
        rabbitTemplate.send("topic001", "spring.packaged", message2);
    }

5. Global converter: convert

Change the adapter code to add a global converter:

    // 1.4 Global converter: convert
    MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
    adapter.setDefaultListenerMethod("consumeMessage");

    // Global converter
    ContentTypeDelegatingMessageConverter convert = new ContentTypeDelegatingMessageConverter();

    TextMessageConverter textConvert = new TextMessageConverter();
    convert.addDelegate("text", textConvert);
    convert.addDelegate("html/text", textConvert);
    convert.addDelegate("xml/text", textConvert);
    convert.addDelegate("text/plain", textConvert);

    Jackson2JsonMessageConverter jsonConverter = new Jackson2JsonMessageConverter();
    convert.addDelegate("json", jsonConverter);
    convert.addDelegate("application/json", jsonConverter);

    ImageMessageConverter imageConverter = new ImageMessageConverter();
    convert.addDelegate("image/png", imageConverter);
    convert.addDelegate("image", imageConverter);

    PDFMessageConverter pdfConverter = new PDFMessageConverter();
    convert.addDelegate("application/pdf", pdfConverter);

    adapter.setMessageConverter(convert);
    container.setMessageListener(adapter);
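
The global converter only chooses which delegate handles a message, based on the message's content type. A minimal JDK-only sketch of that routing is shown below. ContentTypeDispatcher is a hypothetical class, not the Spring AMQP implementation, and it assumes an exact-match lookup on the content type with a fallback for unregistered types.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical model of content-type delegation; not the Spring AMQP implementation.
class ContentTypeDispatcher {

    private final Map<String, Function<byte[], Object>> delegates = new HashMap<>();
    private final Function<byte[], Object> fallback;

    ContentTypeDispatcher(Function<byte[], Object> fallback) {
        this.fallback = fallback;
    }

    void addDelegate(String contentType, Function<byte[], Object> delegate) {
        delegates.put(contentType, delegate);
    }

    // Assumption: the lookup is an exact match on the content type string.
    Object convert(String contentType, byte[] body) {
        return delegates.getOrDefault(contentType, fallback).apply(body);
    }

    public static void main(String[] args) {
        // Unregistered content types fall back to returning the raw bytes.
        ContentTypeDispatcher dispatcher = new ContentTypeDispatcher(body -> body);
        dispatcher.addDelegate("text/plain", body -> new String(body, StandardCharsets.UTF_8));
        dispatcher.addDelegate("application/pdf", body -> "pdf of " + body.length + " bytes");

        byte[] body = "hi".getBytes(StandardCharsets.UTF_8);
        System.out.println(dispatcher.convert("text/plain", body));      // prints hi
        System.out.println(dispatcher.convert("application/pdf", body)); // prints pdf of 2 bytes
    }
}
```

Under the exact-match assumption, a short key such as "text" or "image" would only be selected for messages whose content type is literally that string, so the full types ("text/plain", "image/png") are the entries that do the work in the tests of this section.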

Add another consumeMessage() method to MessageDelegate:

    public void consumeMessage(File file) {
        System.err.println("file object, message content, " + file.getName());
    }

Add the custom converters ImageMessageConverter and PDFMessageConverter:

    package com.dwz.spring.converter;

    import java.io.ByteArrayInputStream;
    import java.io.File;
    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Paths;
    import java.nio.file.StandardCopyOption;
    import java.util.UUID;

    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageProperties;
    import org.springframework.amqp.support.converter.MessageConversionException;
    import org.springframework.amqp.support.converter.MessageConverter;

    public class ImageMessageConverter implements MessageConverter {

        // Only the consuming direction is supported
        @Override
        public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
            throw new MessageConversionException(" convert error! ");
        }

        @Override
        public Object fromMessage(Message message) throws MessageConversionException {
            System.err.println("---------------Image MessageConverter-------------------");
            // The file extension comes from the extName header and defaults to png
            Object _extName = message.getMessageProperties().getHeaders().get("extName");
            String extName = _extName == null ? "png" : _extName.toString();

            byte[] body = message.getBody();
            String fileName = UUID.randomUUID().toString();
            String path = "D:\\dwz_temp\\" + fileName + "." + extName;
            File f = new File(path);
            System.out.println(path);

            try {
                Files.copy(new ByteArrayInputStream(body), Paths.get(path), StandardCopyOption.REPLACE_EXISTING);
            } catch (IOException e) {
                // Fail the conversion instead of returning a File that was never written
                throw new MessageConversionException("Failed to write " + path, e);
            }
            return f;
        }
    }

    package com.dwz.spring.converter;

    import java.io.ByteArrayInputStream;
    import java.io.File;
    import java.io.IOException;
    import java.nio.file.Files;
    import java.util.UUID;

    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageProperties;
    import org.springframework.amqp.support.converter.MessageConversionException;
    import org.springframework.amqp.support.converter.MessageConverter;

    public class PDFMessageConverter implements MessageConverter {

        // Only the consuming direction is supported
        @Override
        public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
            throw new MessageConversionException(" convert error! ");
        }

        @Override
        public Object fromMessage(Message message) throws MessageConversionException {
            System.err.println("---------------PDF MessageConverter-------------------");

            byte[] body = message.getBody();
            String fileName = UUID.randomUUID().toString();
            String path = "D:/dwz_temp/" + fileName + ".pdf";
            File f = new File(path);
            try {
                Files.copy(new ByteArrayInputStream(body), f.toPath());
            } catch (IOException e) {
                // Fail the conversion instead of returning a File that was never written
                throw new MessageConversionException("Failed to write " + path, e);
            }
            return f;
        }
    }
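
Both converters write to a hard-coded D: drive directory, which only works on a machine where that directory exists. As one possible alternative, the JDK-only sketch below stores the body in the system temp directory. BodyFiles and writeToTempFile are hypothetical names and are not part of the converters above.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper (not part of the converters above): stores a message body in a temp file.
class BodyFiles {

    // Writes the body to a new file in the system temp directory and returns its path.
    static Path writeToTempFile(byte[] body, String extension) {
        try {
            Path file = Files.createTempFile("amqp-body-", "." + extension);
            Files.write(file, body);
            return file;
        } catch (IOException e) {
            // Surface the failure instead of returning a path to a file that was never written.
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        Path file = writeToTempFile(new byte[] {1, 2, 3}, "pdf");
        System.out.println(file + " holds " + Files.size(file) + " bytes");
        Files.delete(file);
    }
}
```

Inside a converter, the returned Path could be turned into the File that the delegate expects with toFile().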

Test code:

Image conversion test:

    @Test
    public void testSendImgMessage1() throws IOException {
        byte[] body = Files.readAllBytes(Paths.get("C:/Users/Administrator/Pictures/Saved Pictures/img02/dwz.png"));
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setContentType("image/png");
        messageProperties.getHeaders().put("extName", "png");

        Message message = new Message(body, messageProperties);
        rabbitTemplate.send("", "image_queue", message);
    }

PDF conversion test:

    @Test
    public void testSendPDFMessage2() throws IOException {
        byte[] body = Files.readAllBytes(Paths.get("F:\\dwz\\my\\2019全新Java学习路线图.pdf"));
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setContentType("application/pdf");

        Message message = new Message(body, messageProperties);
        rabbitTemplate.send("", "pdf_queue", message);
    }