我正在建立一个读取Avro通用记录的管道。要在阶段之间传递GenericRecord,我需要注册AvroCoder。该文档说,如果我使用通用记录,则模式参数可以是任意的:https://beam.apache.org/releases/javadoc/2.2.0/org/apache/beam/sdk/coders/AvroCoder.html#of-java.lang.Class-org.apache.avro.Schema-

但是,当我将空模式传递给方法AvroCoder.of(Class, Schema)时,它将在运行时引发异常。有没有一种方法可以为GenericRecord创建不需要模式的AvroCoder?就我而言,每个GenericRecord都有一个嵌入式模式。

异常和堆栈跟踪:

Exception in thread "main" java.lang.NullPointerException
at org.apache.beam.sdk.coders.AvroCoder$AvroDeterminismChecker.checkIndexedRecord(AvroCoder.java:562)
at org.apache.beam.sdk.coders.AvroCoder$AvroDeterminismChecker.recurse(AvroCoder.java:430)
at org.apache.beam.sdk.coders.AvroCoder$AvroDeterminismChecker.check(AvroCoder.java:409)
at org.apache.beam.sdk.coders.AvroCoder.<init>(AvroCoder.java:260)
at org.apache.beam.sdk.coders.AvroCoder.of(AvroCoder.java:141)

最佳答案

在查看AvroCoder的代码之后,我认为该文档在此处不正确。您的AvroCoder实例将需要一种方法来找出您的Avro记录的架构-可能唯一的方法是提供一个。

因此,建议您调用AvroCoder.of(GenericRecord.class, schema),其中schema是PCollection中GenericRecord对象的正确架构。

10-08 16:38