问题描述
我目前无法在KSTREAM APP中反序列化原始PRIMITIVE密钥
以avro模式编码的密钥(已在模式注册表中注册),
当我使用kafka-avro-console-consumer时,我可以看到密钥已正确反序列化
但是不可能使其在KSTREAM应用程序中工作
密钥的avro模式是原始的:
{"type":"string"}
我已经关注了合流的文档
final Serde<V> valueSpecificAvroSerde = new SpecificAvroSerde<>();
final Map<String, String> serdeConfig = Collections.singletonMap(SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
valueSpecificAvroSerde.configure(serdeConfig, false);
final Serdes.StringSerde keySpecificAvroSerde = new Serdes.StringSerde();
keySpecificAvroSerde.configure(serdeConfig, true);
Consumed<String, totoAvro> inputConf = Consumed.with(keySpecificAvroSerde, valueSpecificAvroSerde);
final KStream<String, totoAvro> mystream = builder.stream("name topic", inputConf);
mystream.peek((key, value) -> logger.info("topic KEY :" + key))
对于该值来说效果很好,但是键将是一个字符串,该字符串包含模式注册表中的字节,而不仅仅是"reel"键
https://docs.confluent.io/current/schema-registry/serializer-formatter.html#wire-format
因此字符串键为/§/./11016015201 ,但我希望使用卷轴值: 1016015201
如果我在字符串中打印字节,则为[0x00 0x00 0x00 0x02 0x31 0x14 0x31 0x30 0x31 0x36 0x30 0x31 0x35 0x32 0x30 0x31]
更新
它现在正在工作: https://stackoverflow.com/a/51957801/6227500
原始答案
该功能当前在架构注册表项目中不可用.
但是通过实现自定义SERDE,您可以管理案件,
Thiyaga Rajan提出了可行的实施方案
I'm currently incapable of deserialize an avro PRIMITIVE key in a KSTREAM APP
the key in encoded with an avro schema ( registered in the schema registry ) ,
when i use the kafka-avro-console-consumer, I can see that the key is correctly deserialize
But impossible to make it work in a KSTREAM app
the avro schema of the key is a PRIMITIVE:
{"type":"string"}
I already followed the documentation of confluent
final Serde<V> valueSpecificAvroSerde = new SpecificAvroSerde<>();
final Map<String, String> serdeConfig = Collections.singletonMap(SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
valueSpecificAvroSerde.configure(serdeConfig, false);
final Serdes.StringSerde keySpecificAvroSerde = new Serdes.StringSerde();
keySpecificAvroSerde.configure(serdeConfig, true);
Consumed<String, totoAvro> inputConf = Consumed.with(keySpecificAvroSerde, valueSpecificAvroSerde);
final KStream<String, totoAvro> mystream = builder.stream("name topic", inputConf);
mystream.peek((key, value) -> logger.info("topic KEY :" + key))
it's working well for the value, but the key is going to be a string containing the bytes from the schema registry and not only the "reel" key
https://docs.confluent.io/current/schema-registry/serializer-formatter.html#wire-format
So the string key is /§/./11016015201 , but I would like the reel value : 1016015201
if I print the bytes inside the String it's [ 0x00 0x00 0x00 0x02 0x31 0x14 0x31 0x30 0x31 0x36 0x30 0x31 0x35 0x32 0x30 0x31 ]
Update
it's now working : https://stackoverflow.com/a/51957801/6227500
Original answer
The feature is not available currently in the schema registry project.
But by implementing a custom SERDE you can manage the case ,
Thiyaga Rajan proposed a working implementation
Serde class for AVRO primitive type
这篇关于带有kafka lib的反序列化原始AVRO KEY的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!