我正在建立一个读取Avro通用记录的管道。要在阶段之间传递GenericRecord,我需要注册AvroCoder。该文档说,如果我使用通用记录,则模式参数可以是任意的:https://beam.apache.org/releases/javadoc/2.2.0/org/apache/beam/sdk/coders/AvroCoder.html#of-java.lang.Class-org.apache.avro.Schema-
但是,当我将空模式传递给方法AvroCoder.of(Class, Schema)
时,它将在运行时引发异常。有没有一种方法可以为GenericRecord创建不需要模式的AvroCoder?就我而言,每个GenericRecord都有一个嵌入式模式。
异常和堆栈跟踪:
Exception in thread "main" java.lang.NullPointerException
at org.apache.beam.sdk.coders.AvroCoder$AvroDeterminismChecker.checkIndexedRecord(AvroCoder.java:562)
at org.apache.beam.sdk.coders.AvroCoder$AvroDeterminismChecker.recurse(AvroCoder.java:430)
at org.apache.beam.sdk.coders.AvroCoder$AvroDeterminismChecker.check(AvroCoder.java:409)
at org.apache.beam.sdk.coders.AvroCoder.<init>(AvroCoder.java:260)
at org.apache.beam.sdk.coders.AvroCoder.of(AvroCoder.java:141)
最佳答案
在查看AvroCoder
的代码之后,我认为该文档在此处不正确。您的AvroCoder
实例将需要一种方法来找出您的Avro记录的架构-可能唯一的方法是提供一个。
因此,建议您调用AvroCoder.of(GenericRecord.class, schema)
,其中schema
是PCollection中GenericRecord
对象的正确架构。