Closed. This question is off-topic。它当前不接受答案。
想改善这个问题吗? Update the question,所以它是on-topic,用于堆栈溢出。
2年前关闭。
我是Spring Cloud Stream和Kafka的新手,我正在寻找一个使用kafka主题的json消息的好例子。
谢谢
第二种情况是您的生产者不是SCS,您的使用者是SCS,默认情况下,SCS添加嵌入有效负载中的标头,因此您需要禁用将
第三种情况是SCS生产者而不是SCS使用者,在这种情况下,您需要使用
想改善这个问题吗? Update the question,所以它是on-topic,用于堆栈溢出。
2年前关闭。
我是Spring Cloud Stream和Kafka的新手,我正在寻找一个使用kafka主题的json消息的好例子。
谢谢
最佳答案
您可以参考examples from spring-cloud-stream team(接收器和源项目)
为了使应用程序运行,有三种方案。
如果使用者和生产者是Spring-Cloud-Stream(SCS)应用程序,则只需将content-type
设置为application/json
。
#Specific channel
spring.cloud.stream.bindings.<channelName>.consumer.contentType=application/json
#For all channels
spring.cloud.stream.default.contentType=application/json
第二种情况是您的生产者不是SCS,您的使用者是SCS,默认情况下,SCS添加嵌入有效负载中的标头,因此您需要禁用将
headerMode
作为raw
与contentType
一起放置的行为。#Specific channel
spring.cloud.stream.bindings.<channelName>.consumer.headerMode=raw
#For all channels
spring.cloud.stream.default.consumer.headerMode=raw
第三种情况是SCS生产者而不是SCS使用者,在这种情况下,您需要使用
application/octet-stream
作为contentType
,因为SCS不支持String(there is a issue for that)的原始头,因此您需要发送有效负载以字节为单位#Properties
spring.cloud.stream.default.contentType=application/octet-stream
spring.cloud.stream.default.producer.headerMode=raw
//Java
byte[] payload = jacksonObjectMapper.writeValueAsBytes(entity);
return channel.send(MessageBuilder.withPayload(payload).build());
10-08 15:57