Dynamically decoding Avro records in Beam using the Schema Registry

This article covers how to dynamically decode Avro records in an Apache Beam pipeline using the Confluent Schema Registry. Hopefully it's a useful reference for anyone facing the same problem.

Problem Description


I've been trying to write a Beam pipeline that reads from a Kafka topic, where the topic consists of Avro records. The schema for these records can change rapidly, so I want to use the Confluent Schema Registry to fetch the schema and decode the events before extracting the relevant common fields. Either I'm doing something wrong or the documentation is outdated. I've followed the example here: https://github.com/apache/beam/blob/dfa1e475194ac6f65c42da7b8cb8d5055dd1952c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L176-L198

 * <p>If you want to deserialize the keys and/or values based on a schema available in Confluent
 * Schema Registry, KafkaIO can fetch this schema from a specified Schema Registry URL and use it
 * for deserialization. A {@link Coder} will be inferred automatically based on the respective
 * {@link Deserializer}.
 *
 * <p>For an Avro schema it will return a {@link PCollection} of {@link KafkaRecord}s where key
 * and/or value will be typed as {@link org.apache.avro.generic.GenericRecord}. In this case, users
 * don't need to specify key or/and value deserializers and coders since they will be set to {@link
 * KafkaAvroDeserializer} and {@link AvroCoder} by default accordingly.
 *
 * <p>For example, below topic values are serialized with Avro schema stored in Schema Registry,
 * keys are typed as {@link Long}:
 *
 * <pre>{@code
 * PCollection<KafkaRecord<Long, GenericRecord>> input = pipeline
 *   .apply(KafkaIO.<Long, GenericRecord>read()
 *      .withBootstrapServers("broker_1:9092,broker_2:9092")
 *      .withTopic("my_topic")
 *      .withKeyDeserializer(LongDeserializer.class)
 *      // Use Confluent Schema Registry, specify schema registry URL and value subject
 *      .withValueDeserializer(
 *          ConfluentSchemaRegistryDeserializerProvider.of("http://localhost:8081", "my_topic-value"))
 *    ...

My code looks like this:

    p.apply("ReadFromKafka",
        KafkaIO.<Long, GenericRecord>read()
        .withBootstrapServers(options.getBootstrapServers())
        .withTopic(options.getInputTopic())
        .withKeyDeserializer(LongDeserializer.class)
        .withValueDeserializer(ConfluentSchemaRegistryDeserializerProvider.of(
                "http://public-kafka-registry.mydomain.com:8081",
                "my_topic-value"))
        .withNumSplits(1)
        .withoutMetadata()
    );

However, I'm getting the following error:

incompatible types: no instance(s) of type variable(s) T exist so that org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider<T> conforms to java.lang.Class<? extends org.apache.kafka.common.serialization.Deserializer<org.apache.avro.generic.GenericRecord>>

Any help appreciated, as I'm not a Java wiz.

Solution

Try this, it works:

    KafkaIO.<String, GenericRecord>read()
        .withBootstrapServers(options.getBootstrap())
        .withTopic(options.getInputTopic())
        // Extra consumer config passed through to the Confluent Avro deserializer
        .withConsumerConfigUpdates(ImmutableMap.of("specific.avro.reader", "true"))
        .withKeyDeserializer(StringDeserializer.class)
        // Pass a DeserializerProvider (not a Deserializer class); the schema is
        // fetched from the Schema Registry for the subject "My_TOPIC-value"
        .withValueDeserializer(ConfluentSchemaRegistryDeserializerProvider
            .of(options.getSchemaRegistryURL(), "My_TOPIC-value"));
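
For what it's worth, the compile error in the question also suggests why this version works: ConfluentSchemaRegistryDeserializerProvider is a DeserializerProvider, not a Deserializer class, so it only fits the withValueDeserializer(DeserializerProvider) overload of KafkaIO.Read. The error message shows the compiler matching it against the overload that takes a Class<? extends Deserializer>, which points at a Beam version that predates the provider overload.

Once the read compiles, the values arrive as GenericRecords decoded against the schema fetched from the registry, so the common fields the question mentions can be extracted by name. Below is a minimal sketch of that follow-up step; the pipeline variable p, the options getters, and the field name "user_id" are illustrative assumptions, not part of the original question or answer:

    import org.apache.avro.generic.GenericRecord;
    import org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider;
    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.beam.sdk.io.kafka.KafkaRecord;
    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.TypeDescriptors;
    import org.apache.kafka.common.serialization.StringDeserializer;

    // Read Kafka values as GenericRecord, decoded with the schema fetched
    // from the Confluent Schema Registry for the given subject.
    PCollection<KafkaRecord<String, GenericRecord>> records = p.apply("ReadFromKafka",
        KafkaIO.<String, GenericRecord>read()
            .withBootstrapServers(options.getBootstrap())
            .withTopic(options.getInputTopic())
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(ConfluentSchemaRegistryDeserializerProvider.of(
                options.getSchemaRegistryURL(), "My_TOPIC-value")));

    // Pull a shared field out of each record by name. GenericRecord lookups
    // are by field name, so this keeps working as the schema evolves, as long
    // as the field itself remains. "user_id" is a placeholder field name.
    PCollection<String> userIds = records.apply("ExtractUserId",
        MapElements.into(TypeDescriptors.strings())
            .via((KafkaRecord<String, GenericRecord> rec) ->
                String.valueOf(rec.getKV().getValue().get("user_id"))));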

This concludes the article on dynamically decoding Avro records in Beam using the Schema Registry. We hope the answer above helps.
