本文介绍了Kafka流从JSON到Avro的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
我尝试使用Kafka Stream将包含String/JSON消息的主题转换为Avro消息.
I try to use Kafka Stream to convert a topic with String/JSON messages to another topic as Avro messages.
流主要方法:
streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
final KStreamBuilder builder = new KStreamBuilder();
final Serde<String> stringSerde = Serdes.String();
builder.stream(stringSerde, stringSerde, "testin")
.mapValues(value -> AvroUtil.transform(value))
.to("testout");
final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
streams.start();
转化:
public static GenericRecord transform(Object value) {
// ... parse string/json and generate Avro object
String userSchema = "{\"type\":\"record\"," +
"\"name\":\"myrecord\"," +
"\"fields\":[{\"name\":\"f1\",\"type\":\"string\"}]}";
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(userSchema);
GenericRecord avroRecord = new GenericData.Record(schema);
avroRecord.put("f1", "value1");
return avroRecord;
}
并获得这样的异常:
Exception in thread "StreamThread-1" java.lang.NoSuchMethodError: com.fasterxml.jackson.annotation.JsonProperty.access()Lcom/fasterxml/jackson/annotation/JsonProperty$Access;
at com.fasterxml.jackson.databind.introspect.JacksonAnnotationIntrospector.findPropertyAccess(JacksonAnnotationIntrospector.java:229)
at com.fasterxml.jackson.databind.introspect.POJOPropertyBuilder$9.withMember(POJOPropertyBuilder.java:545)
at com.fasterxml.jackson.databind.introspect.POJOPropertyBuilder$9.withMember(POJOPropertyBuilder.java:542)
at com.fasterxml.jackson.databind.introspect.POJOPropertyBuilder.fromMemberAnnotationsExcept(POJOPropertyBuilder.java:996)
at com.fasterxml.jackson.databind.introspect.POJOPropertyBuilder.findAccess(POJOPropertyBuilder.java:542)
at com.fasterxml.jackson.databind.introspect.POJOPropertyBuilder.removeNonVisible(POJOPropertyBuilder.java:623)
at com.fasterxml.jackson.databind.introspect.POJOPropertiesCollector._removeUnwantedAccessor(POJOPropertiesCollector.java:697)
at com.fasterxml.jackson.databind.introspect.POJOPropertiesCollector.collectAll(POJOPropertiesCollector.java:298)
at com.fasterxml.jackson.databind.introspect.POJOPropertiesCollector.getJsonValueMethod(POJOPropertiesCollector.java:169)
at com.fasterxml.jackson.databind.introspect.BasicBeanDescription.findJsonValueMethod(BasicBeanDescription.java:222)
at com.fasterxml.jackson.databind.ser.BasicSerializerFactory.findSerializerByAnnotations(BasicSerializerFactory.java:355)
at com.fasterxml.jackson.databind.ser.BeanSerializerFactory._createSerializer2(BeanSerializerFactory.java:210)
at com.fasterxml.jackson.databind.ser.BeanSerializerFactory.createSerializer(BeanSerializerFactory.java:153)
at com.fasterxml.jackson.databind.SerializerProvider._createUntypedSerializer(SerializerProvider.java:1203)
at com.fasterxml.jackson.databind.SerializerProvider._createAndCacheUntypedSerializer(SerializerProvider.java:1157)
at com.fasterxml.jackson.databind.SerializerProvider.findValueSerializer(SerializerProvider.java:481)
at com.fasterxml.jackson.databind.SerializerProvider.findTypedValueSerializer(SerializerProvider.java:679)
at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:107)
at com.fasterxml.jackson.databind.ObjectMapper._configAndWriteValue(ObjectMapper.java:3559)
at com.fasterxml.jackson.databind.ObjectMapper.writeValueAsString(ObjectMapper.java:2927)
at io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaRequest.toJson(RegisterSchemaRequest.java:76)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:232)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:224)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:219)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:58)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:90)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:72)
at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:54)
这是正确的方法吗?我是Kafka Streams和Avro的新手
Is this the right approach? I'm new to Kafka Streams and Avro
推荐答案
只是缺少了杰克逊的依赖关系:
Just the jackson dependencies were missing:
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.7.8</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>2.7.8</version>
</dependency>
现在可以正常使用
这篇关于Kafka流从JSON到Avro的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!