


I have a pipeline that successfully outputs an Avro file as follows:

class MyOutput_T_S {
  T foo;
  S bar;
  Boolean baz;
  public MyOutput_T_S() {}
}

class T {
  String id;
  public T() {}
}

class S {
  String id;
  public S() {}
}

PCollection<MyOutput_T_S> output = input.apply(myTransform);


How can I reproduce this exact behavior, except with a parameterized output type MyOutput<T, S> (where T and S can both be Avro-encoded using reflection)?


The main issue is that Avro reflection doesn't work for parameterized types. So based on these responses:

  • Setting Custom Coders & Handling Parameterized types
  • Using Avrocoder for Custom Types with Generics
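To see why reflection cannot help with the generic version, the plain-JDK sketch below (no Avro involved; the generic MyOutput<T, S> class and the ErasureDemo helper are hypothetical stand-ins) inspects the fields the way any class-based reflection scan must. Given only MyOutput.class, the field foo erases to Object and its declared type is just the unresolved type variable T, so nothing in the class itself says what schema foo or bar should have.

```java
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;

// Hypothetical generic version of the question's output class.
class MyOutput<T, S> {
  T foo;
  S bar;
  Boolean baz;

  public MyOutput() {}
}

public class ErasureDemo {
  /** The erased (raw) field type: all that can be read from MyOutput.class alone. */
  static Class<?> erasedType(Class<?> owner, String fieldName) {
    try {
      return owner.getDeclaredField(fieldName).getType();
    } catch (NoSuchFieldException e) {
      throw new IllegalArgumentException(e);
    }
  }

  /** The declared generic field type, e.g. the type variable T. */
  static Type genericType(Class<?> owner, String fieldName) {
    try {
      return owner.getDeclaredField(fieldName).getGenericType();
    } catch (NoSuchFieldException e) {
      throw new IllegalArgumentException(e);
    }
  }

  public static void main(String[] args) {
    // foo is declared as T, which erases to its bound, Object.
    System.out.println(erasedType(MyOutput.class, "foo")); // class java.lang.Object

    // The generic type is only the unresolved variable named "T".
    Type fooType = genericType(MyOutput.class, "foo");
    System.out.println(fooType instanceof TypeVariable); // true
    System.out.println(((TypeVariable<?>) fooType).getName()); // T

    // baz is not generic, so its concrete type is still visible.
    System.out.println(erasedType(MyOutput.class, "baz")); // class java.lang.Boolean
  }
}
```

The concrete type arguments only exist where MyOutput<T, S> is used, which is why a coder for it has to be assembled from coders for T and S rather than derived from MyOutput.class alone.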


1) I think I need to write a custom CoderFactory, but I am having difficulty figuring out exactly how this works (I'm having trouble finding examples). Oddly enough, a completely naive coder factory appears to let me run the pipeline and inspect proper output using DataflowAssert:

cr.registerCoder(MyOutput.class, new CoderFactory() {
  @Override
  public Coder<?> create(List<? extends Coder<?>> componentCoders) {
    // Naive: a record schema with no fields; componentCoders is ignored.
    Schema schema = new Schema.Parser().parse("{\"type\":\"record\","
        + "\"name\":\"MyOutput\","
        + "\"namespace\":\"mypackage\","
        + "\"fields\":[]}");
    return AvroCoder.of(MyOutput.class, schema);
  }

  @Override
  @SuppressWarnings("unchecked")
  public List<Object> getInstanceComponents(Object value) {
    MyOutput<Object, Object> myOutput = (MyOutput<Object, Object>) value;
    // Naive: reports no components at all.
    List<Object> components = new ArrayList<>();
    return components;
  }
});


While I can successfully assert against the output now, I expect this will not be enough for writing to a file. I haven't figured out how I'm supposed to use the provided componentCoders to generate the correct schema, and if I try to just put the schema of T or S into fields, I get:

java.lang.IllegalArgumentException: Unable to get field id from class null


2) Assuming I figure out how to encode MyOutput, what do I pass to AvroIO.Write.withSchema? If I pass either MyOutput.class or the schema, I get type mismatch errors.



I think there are two questions (correct me if I am wrong):

  1. How do I enable the coder registry to provide coders for various parameterizations of MyOutput<T, S>?
  2. How do I write values of MyOutput<T, S> to a file using AvroIO.Write?


The first question is solved by registering a CoderFactory, as in the linked questions you found.


Your naive coder probably lets you run the pipeline without issues because serialization is being optimized away. An Avro schema with no fields would certainly cause all of MyOutput's fields to be dropped in a serialization+deserialization round trip.
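The field-dropping effect can be illustrated with a toy model of schema-driven encoding. This is plain Java maps, not Avro's real wire format, and EmptySchemaRoundTrip is a hypothetical helper: only the fields named in the schema are written, so decoding can only restore those fields, and an empty field list writes nothing at all.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Toy model of schema-driven encoding (not Avro's real format): only the
 * fields named in the schema are written, and decoding can only restore those.
 */
public class EmptySchemaRoundTrip {
  /** Writes the record's values for the schema's fields, in schema order. */
  static List<Object> encode(Map<String, Object> record, List<String> schemaFields) {
    List<Object> out = new ArrayList<>();
    for (String field : schemaFields) {
      out.add(record.get(field));
    }
    return out;
  }

  /** Pairs the written values back up with the schema's field names. */
  static Map<String, Object> decode(List<Object> encoded, List<String> schemaFields) {
    Map<String, Object> record = new HashMap<>();
    for (int i = 0; i < schemaFields.size(); i++) {
      record.put(schemaFields.get(i), encoded.get(i));
    }
    return record;
  }

  static Map<String, Object> roundTrip(Map<String, Object> record, List<String> schemaFields) {
    return decode(encode(record, schemaFields), schemaFields);
  }

  public static void main(String[] args) {
    Map<String, Object> output = new HashMap<>();
    output.put("foo", "t-1");
    output.put("bar", "s-1");
    output.put("baz", true);

    // Like the naive "fields":[] schema: nothing is written, so nothing comes back.
    System.out.println(roundTrip(output, Collections.<String>emptyList())); // {}

    // With every field declared, the record survives the round trip.
    System.out.println(roundTrip(output, Arrays.asList("foo", "bar", "baz")).equals(output)); // true
  }
}
```

If the runner never actually encodes and decodes the elements, this loss never happens, which is consistent with DataflowAssert still seeing the correct output.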


But assuming you fill in the schema with the fields, your approach to CoderFactory#create looks right. I don't know the exact cause of the message java.lang.IllegalArgumentException: Unable to get field id from class null, but the call to AvroCoder.of(MyOutput.class, schema) should work for an appropriately assembled schema. If there is an issue with this, more details (such as the rest of the stack trace) would be helpful.
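Filling in the fields amounts to using the component schemas as the types of foo and bar. The stdlib-only sketch below (MyOutputSchemas and compose are hypothetical names) builds the JSON text of such a record schema from the JSON of T's and S's schemas. Inside create(), assuming each component coder is an AvroCoder, its getSchema() should supply the component schema, Schema#toString() gives its JSON, and the composed string could then go through new Schema.Parser().parse(...) and AvroCoder.of(MyOutput.class, schema) as in the question.

```java
/**
 * Stdlib-only sketch: assembles the JSON text of a record schema for one
 * parameterization of MyOutput<T, S> from the JSON schemas of T and S.
 */
public class MyOutputSchemas {
  static String compose(String recordName, String namespace, String tSchemaJson, String sSchemaJson) {
    return "{\"type\":\"record\","
        + "\"name\":\"" + recordName + "\","
        + "\"namespace\":\"" + namespace + "\","
        + "\"fields\":["
        // foo and bar take the component schemas as their types.
        + "{\"name\":\"foo\",\"type\":" + tSchemaJson + "},"
        + "{\"name\":\"bar\",\"type\":" + sSchemaJson + "},"
        // Assumption: baz is never null; use ["null","boolean"] if it can be.
        + "{\"name\":\"baz\",\"type\":\"boolean\"}"
        + "]}";
  }

  public static void main(String[] args) {
    // Hypothetical component schemas matching the question's T and S (one String id each).
    String t = "{\"type\":\"record\",\"name\":\"T\",\"namespace\":\"mypackage\","
        + "\"fields\":[{\"name\":\"id\",\"type\":\"string\"}]}";
    String s = "{\"type\":\"record\",\"name\":\"S\",\"namespace\":\"mypackage\","
        + "\"fields\":[{\"name\":\"id\",\"type\":\"string\"}]}";
    System.out.println(compose("MyOutput", "mypackage", t, s));
  }
}
```

One caveat: Avro does not allow a named type to be defined twice in one schema, so if T and S are the same record type, the second occurrence would have to refer to it by name instead of embedding the full definition again.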


However, your override of CoderFactory#getInstanceComponents should return a list of values, one per type parameter of MyOutput. Like so:

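public List<Object> getInstanceComponents(Object value) {
  MyOutput<Object, Object> myOutput = (MyOutput<Object, Object>) value;
  // One component per type parameter of MyOutput<T, S>, in declaration order.
  // Note: ImmutableList.of throws NullPointerException if foo or bar is null.
  return ImmutableList.of(myOutput.foo, myOutput.bar);
}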
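With the naive factory's empty list there would be no component coders to hand to create(). The toy below models, in plain Java, roughly how the two methods fit together when a coder is inferred from an example value: ask for one component per type parameter, infer a coder for each component, then pass those coders to create. This is my approximation, not the Dataflow SDK's code; ToyRegistry, LEAF_CODERS and the string "coders" are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical generic output class from the question. */
class MyOutput<T, S> {
  T foo;
  S bar;
  Boolean baz;

  MyOutput(T foo, S bar, Boolean baz) {
    this.foo = foo;
    this.bar = bar;
    this.baz = baz;
  }
}

/** Toy model (not the Dataflow SDK) of inferring a coder from an example value. */
public class ToyRegistry {
  // Hypothetical stand-ins for registered coders: class -> coder name.
  static final Map<Class<?>, String> LEAF_CODERS = new HashMap<>();

  static {
    LEAF_CODERS.put(String.class, "StringCoder");
    LEAF_CODERS.put(Integer.class, "VarIntCoder");
  }

  /** Like the corrected getInstanceComponents: one value per type parameter, in order. */
  static List<Object> getInstanceComponents(Object value) {
    MyOutput<?, ?> myOutput = (MyOutput<?, ?>) value;
    return Arrays.<Object>asList(myOutput.foo, myOutput.bar);
  }

  /** Like CoderFactory#create: builds the composite from the component coders. */
  static String create(List<String> componentCoders) {
    return "MyOutputCoder" + componentCoders;
  }

  /** Recursively infers a coder: composite values go through the factory methods. */
  static String coderFor(Object value) {
    if (value == null) {
      throw new IllegalArgumentException("cannot infer a coder from null");
    }
    if (value instanceof MyOutput) {
      List<String> componentCoders = new ArrayList<>();
      for (Object component : getInstanceComponents(value)) {
        componentCoders.add(coderFor(component));
      }
      return create(componentCoders);
    }
    String coder = LEAF_CODERS.get(value.getClass());
    if (coder == null) {
      throw new IllegalArgumentException("no coder registered for " + value.getClass());
    }
    return coder;
  }

  public static void main(String[] args) {
    System.out.println(coderFor(new MyOutput<>("t", 7, true))); // MyOutputCoder[StringCoder, VarIntCoder]
  }
}
```

The order of the returned components matters: create() receives the component coders in the same order, so foo's coder must come first, matching T's position in MyOutput<T, S>.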


The second question can be answered using some of the same support code as the first, but is otherwise independent. AvroIO.Write.withSchema always explicitly uses the provided schema. It does use AvroCoder under the hood, but that is an implementation detail. Providing a compatible schema is all that is necessary; such a schema has to be composed for each combination of T and S for which you want to output MyOutput<T, S>.


09-11 17:24