the confluent-kafka-python repo中的AvroProducer示例来看,键/值模式似乎是从文件中加载的。也就是说,从此代码:

from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer

value_schema = avro.load('ValueSchema.avsc')
key_schema = avro.load('KeySchema.avsc')
value = {"name": "Value"}
key = {"name": "Key"}

avroProducer = AvroProducer({'bootstrap.servers': 'mybroker,mybroker2', 'schema.registry.url': 'http://schem_registry_host:port'}, default_key_schema=key_schema, default_value_schema=value_schema)
avroProducer.produce(topic='my_topic', value=value, key=key)

看来文件ValueSchema.avscKeySchema.avsc是独立于Avro Schema Registry加载的。

这是正确的吗?引用Avro架构注册表的URL,然后从磁盘加载架构以获取键/值的意义是什么?

请说清楚。

最佳答案

我遇到了同样的问题,最初不清楚本地文件的意义。如其他答案所述,对于首次写入Avro主题或对该主题的架构进行更新,您需要架构字符串-您可以从Kafka REST文档here中看到它。

一旦在注册表中有了该架构,就可以使用REST读取它(在这种情况下,我使用了请求Python模块)并使用avro.loads()方法来获取它。我发现这很有用,因为Produce()函数要求您为AvroProducer提供一个值架构,并且该代码将在不存在该本地文件的情况下起作用:

get_schema_req_data = requests.get("http://1.2.3.4:8081/subjects/sample_value_schema/versions/latest")
get_schema_req_data.raise_for_status()
schema_string = get_schema_req_data.json()['schema']
value_schema = avro.loads(schema_string)
avroProducer = AvroProducer({'bootstrap.servers': '1.2.3.4:9092', 'schema.registry.url': 'http://1.2.3.4:8081'}, default_value_schema=value_schema)
avroProducer.produce(topic='my_topic', value={"data" : "that matches your schema" })

希望这可以帮助。

09-13 00:31