Problem description
Context: We have a Dataflow job that transforms PubSub messages into Avro GenericRecords and writes them to GCS as ".avro" files. The transformation from PubSub messages to GenericRecords requires a schema. This schema changes weekly, with field additions only. We want to be able to update the fields without updating the Dataflow job.
What we did: We took the advice from this post and created a Guava Cache that refreshes its content every minute. The refresh function pulls the schema from GCS. FileIO.write then queries the Guava Cache for the latest schema and uses it to transform each element into a GenericRecord. FileIO.write also outputs to an Avro sink, which is likewise created from that schema.
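As an illustration, a minimal sketch of such a refreshing cache, assuming the schema is stored as an .avsc file in GCS (the class name, bucket, and object below are hypothetical):

import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import org.apache.avro.Schema;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;

public class SchemaCache {
    // Hypothetical GCS location of the .avsc schema file.
    private static final String BUCKET = "my-schemas-bucket";
    private static final String OBJECT = "payment_record.avsc";

    private static final LoadingCache<String, Schema> CACHE = CacheBuilder.newBuilder()
            // refreshAfterWrite (unlike expireAfterWrite) keeps serving the
            // stale schema to other callers while one caller reloads it.
            .refreshAfterWrite(1, TimeUnit.MINUTES)
            .build(CacheLoader.from(SchemaCache::loadFromGcs));

    private static Schema loadFromGcs(String ignoredKey) {
        Storage storage = StorageOptions.getDefaultInstance().getService();
        byte[] bytes = storage.readAllBytes(BlobId.of(BUCKET, OBJECT));
        return new Schema.Parser().parse(new String(bytes, StandardCharsets.UTF_8));
    }

    public static Schema latest() {
        return CACHE.getUnchecked("");
    }
}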
The code is as follows:
// fn(...) and requiresSideInputs() are static imports of
// Contextful.fn and Requirements.requiresSideInputs.
genericRecordsAsByteArrays.apply(FileIO.<byte[]>write()
    .via(
        fn((input, c) -> {
            Map<String, Object> schemaInfo = cache.get("");
            Schema schema = (Schema) schemaInfo.get(SCHEMA_KEY);
            // From concrete PaymentRecord bytes to DynamicMessage
            Descriptors.Descriptor paymentRecordFd =
                (Descriptors.Descriptor) schemaInfo.get(DESCRIPTOR_KEY);
            DynamicMessage paymentRecordMsg = DynamicMessage.parseFrom(paymentRecordFd, input);
            try (ByteArrayOutputStream output = new ByteArrayOutputStream()) {
                // From DynamicMessage to Avro-encoded bytes
                BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(output, null);
                ProtobufDatumWriter<DynamicMessage> pbWriter = new ProtobufDatumWriter<>(schema);
                pbWriter.write(paymentRecordMsg, encoder);
                encoder.flush();
                // From Avro-encoded bytes to GenericRecord
                byte[] avroContents = output.toByteArray();
                DatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
                BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(avroContents, null);
                return reader.read(null, decoder);
            }
        }, requiresSideInputs()),
        fn((dest, c) -> {
            Map<String, Object> schemaInfo = cache.get("");
            Schema schema = (Schema) schemaInfo.get(SCHEMA_KEY);
            return AvroIO.sink(schema).withCodec(CodecFactory.snappyCodec());
        }, requiresSideInputs()))
    .withNumShards(5)
    .withNaming(new PerWindowFilenames(baseDir, ".avro"))
    .to(baseDir.toString()));
My questions:
- What happens if we are in the middle of writing an Avro file when a schema update suddenly occurs, so that we are now writing records with the new schema into an Avro file created with the old schema?
- Does Dataflow start a new file when it sees the new schema?
- Does Dataflow ignore the new schema and its additional fields until a new file is created?
Each Avro file has its own schema at the very beginning of the file, so I am not sure what the expected behavior is.
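(For reference, the writer schema embedded in a finished file can be inspected with Avro's DataFileStream; a small sketch, with a hypothetical file name:)

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import java.io.FileInputStream;
import java.io.InputStream;

public class PrintWriterSchema {
    public static void main(String[] args) throws Exception {
        // Hypothetical local copy of one of the .avro output files.
        try (InputStream in = new FileInputStream("part-00000.avro");
             DataFileStream<GenericRecord> stream =
                 new DataFileStream<>(in, new GenericDatumReader<>())) {
            // The writer schema comes from the file header, not from us.
            Schema writerSchema = stream.getSchema();
            System.out.println(writerSchema.toString(true));
        }
    }
}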
Recommended answer
It's not possible. Each Avro file has only one schema. If the schema changes, by definition you would be writing to a new file.
I doubt Dataflow ignores fields.
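Note that since your schema only ever adds fields, Avro's standard schema resolution can still read files written with an older schema, filling in the added fields from their defaults. A minimal sketch (the record, field names, and file name here are hypothetical):

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import java.io.FileInputStream;
import java.io.InputStream;

public class ReadWithNewSchema {
    public static void main(String[] args) throws Exception {
        // Hypothetical newer reader schema: the same record plus an added
        // field "channel" with a default, so old files still resolve.
        Schema readerSchema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"PaymentRecord\",\"fields\":["
            + "{\"name\":\"id\",\"type\":\"string\"},"
            + "{\"name\":\"channel\",\"type\":\"string\",\"default\":\"unknown\"}]}");

        try (InputStream in = new FileInputStream("old-schema-file.avro");
             // The writer schema is taken from the file header; the reader
             // schema passed here drives schema resolution.
             DataFileStream<GenericRecord> stream = new DataFileStream<>(
                 in, new GenericDatumReader<>(readerSchema))) {
            for (GenericRecord record : stream) {
                // "channel" is populated with its default for old records.
                System.out.println(record.get("channel"));
            }
        }
    }
}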