Problem description
I am trying to use StreamBridge to send messages dynamically to different topics. This works when my output is a Message<String>, but not when it is a Message<GenericRecord>.
Code example:
@StreamListener(Sink.INPUT)
public void process(@Payload GenericRecord messageValue,
                    @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) GenericRecord messageKey,
                    @Header("Type") String type) {
    log.info("Processing Event --> " + messageValue);
    // Code...
    // convert to Message<GenericRecord>
    Message<GenericRecord> message = ...
    streamBridge.send(type, message);
    log.info("Processed Event --> " + messageValue);
}
The error I get is:
Caused by: org.springframework.messaging.converter.MessageConversionException: Could not write JSON: Not a map:
I am guessing this is because the streamBridge function is looked up with acceptedOutputTypes = application/json, as the following log line suggests:
2020-06-28 04:42:55.670 INFO 54347 --- [container-0-C-1] o.s.c.f.c.c.SimpleFunctionRegistry : Looking up function 'streamBridge' with acceptedOutputTypes: [application/json]
I tried to change the accepted output type to Avro by setting the following properties, which did not work:
spring.cloud.stream.function.definition=streamBridge
spring.kafka.producer.key-serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
spring.kafka.producer.value-serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
spring.cloud.stream.bindings.streamBridge-out-0.content-type=application/*+avro
spring.cloud.stream.bindings.streamBridge-out-0.producer.use-native-encoding=true
Any ideas on how to configure StreamBridge to produce Avro?
Edit: I also tried streamBridge.send(type, message, MimeType.valueOf("application/*+avro")), but ...
Recommended answer
I could not get StreamBridge to work dynamically, so I switched to using a Function:
@Bean
public Function<Message<GenericRecord>, Message<GenericRecord>> process() {
    return message -> {
        // Code...
        // Read the target topic from the incoming "type" header.
        String topic = message.getHeaders().get("type", String.class);
        // convert to Message<GenericRecord>
        // (named outgoingMessage so it does not clash with the lambda parameter)
        Message<GenericRecord> outgoingMessage = MessageBuilder...
                .setHeader("spring.cloud.stream.sendto.destination", topic)
                .build();
        return outgoingMessage;
    };
}
The properties file is:
spring.cloud.function.definition=process
spring.cloud.stream.bindings.process-in-0.destination=${consumer_topic}
spring.cloud.stream.bindings.process-in-0.group=${spring.application.name}
spring.cloud.stream.bindings.process-out-0.content-type=application/*+avro
spring.cloud.stream.bindings.process-out-0.producer.use-native-encoding=true
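To make the routing idea in this answer easier to follow, here is a minimal plain-Java sketch of header-based dynamic routing. It does not use Spring at all: ToyMessage, ToyRouter and SendToDemo are hypothetical names invented for illustration, and the fallback to a default topic when the header is missing is an assumption of this toy model, not a statement about how the framework is implemented. Only the header name spring.cloud.stream.sendto.destination and the "type" header come from the answer.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for a message: a payload plus a map of headers.
final class ToyMessage<T> {
    final T payload;
    final Map<String, Object> headers;

    ToyMessage(T payload, Map<String, Object> headers) {
        this.payload = payload;
        this.headers = new HashMap<>(headers); // defensive copy
    }
}

// Hypothetical stand-in for the framework side: it applies the function and
// delivers the result to the topic named in the "sendto" header.
final class ToyRouter {
    static final String SEND_TO = "spring.cloud.stream.sendto.destination";

    final Map<String, List<Object>> topics = new HashMap<>();
    final String defaultTopic;

    ToyRouter(String defaultTopic) {
        this.defaultTopic = defaultTopic;
    }

    <I, O> void handle(Function<ToyMessage<I>, ToyMessage<O>> fn, ToyMessage<I> in) {
        ToyMessage<O> out = fn.apply(in);
        Object dest = out.headers.get(SEND_TO);
        // Assumption of this sketch: fall back to a default topic without the header.
        String topic = (dest instanceof String) ? (String) dest : defaultTopic;
        topics.computeIfAbsent(topic, k -> new ArrayList<>()).add(out.payload);
    }
}

final class SendToDemo {
    // Same shape as process() in the answer: copy the incoming "type" header
    // into the sendto header of the outgoing message.
    static ToyMessage<String> process(ToyMessage<String> in) {
        Map<String, Object> headers = new HashMap<>();
        Object type = in.headers.get("type");
        if (type != null) {
            headers.put(ToyRouter.SEND_TO, type);
        }
        return new ToyMessage<>(in.payload, headers);
    }

    public static void main(String[] args) {
        ToyRouter router = new ToyRouter("default-topic");
        Map<String, Object> headers = new HashMap<>();
        headers.put("type", "orders");
        router.handle(SendToDemo::process, new ToyMessage<String>("event-1", headers));
        System.out.println(router.topics);
    }
}
```

In this sketch the function never talks to a producer directly; it only tags the outgoing message, and the surrounding code decides where the payload goes. That is the same division of labour the answer relies on when it sets the sendto header instead of calling streamBridge.send.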
Edit: StreamBridge has since been fixed to support this: https://github.com/spring-cloud/spring-cloud-stream/issues/2007