Question
I have a pipeline that successfully outputs an Avro file as follows:
@DefaultCoder(AvroCoder.class)
class MyOutput_T_S {
    T foo;
    S bar;
    Boolean baz;
    public MyOutput_T_S() {}
}

@DefaultCoder(AvroCoder.class)
class T {
    String id;
    public T() {}
}

@DefaultCoder(AvroCoder.class)
class S {
    String id;
    public S() {}
}
...
PCollection<MyOutput_T_S> output = input.apply(myTransform);
output.apply(AvroIO.Write.to("/out").withSchema(MyOutput_T_S.class));
How can I reproduce this exact behavior except with a parameterized output MyOutput<T, S> (where T and S are both Avro code-able using reflection)?
The main issue is that Avro reflection doesn't work for parameterized types. So based on these responses:
- Setting Custom Coders & Handling Parameterized types
- Using Avrocoder for Custom Types with Generics
1) I think I need to write a custom CoderFactory, but I am having difficulty figuring out exactly how this works (I'm having trouble finding examples). Oddly enough, a completely naive coder factory appears to let me run the pipeline and inspect proper output using DataflowAssert:
cr.registerCoder(MyOutput.class, new CoderFactory() {
    @Override
    public Coder<?> create(List<? extends Coder<?>> componentCoders) {
        Schema schema = new Schema.Parser().parse("{\"type\":\"record\","
            + "\"name\":\"MyOutput\","
            + "\"namespace\":\"mypackage\","
            + "\"fields\":[]}");
        return AvroCoder.of(MyOutput.class, schema);
    }

    @Override
    public List<Object> getInstanceComponents(Object value) {
        MyOutput<Object, Object> myOutput = (MyOutput<Object, Object>) value;
        List<Object> components = new ArrayList<>();
        return components;
    }
});
While I can successfully assert against the output now, I expect this will not cut it for writing to a file. I haven't figured out how I'm supposed to use the provided componentCoders to generate the correct schema, and if I try to just shove the schema of T or S into fields I get:
java.lang.IllegalArgumentException: Unable to get field id from class null
2) Assuming I figure out how to encode MyOutput, what do I pass to AvroIO.Write.withSchema? If I pass either MyOutput.class or the schema I get type mismatch errors.
Answer
I think there are two questions (correct me if I am wrong):
- How do I enable the coder registry to provide coders for various parameterizations of MyOutput<T, S>?
- How do I write values of MyOutput<T, S> to a file using AvroIO.Write?
The first question is to be solved by registering a CoderFactory as in the linked question you found.
Your naive coder is probably allowing you to run the pipeline without issues because serialization is being optimized away. Certainly an Avro schema with no fields will result in those fields being dropped in a serialization+deserialization round trip.
But assuming you fill in the schema with the fields, your approach to CoderFactory#create looks right. I don't know the exact cause of the message java.lang.IllegalArgumentException: Unable to get field id from class null, but the call to AvroCoder.of(MyOutput.class, schema) should work for an appropriately assembled schema. If there is an issue with this, more details (such as the rest of the stack trace) would be helpful.
However, your override of CoderFactory#getInstanceComponents should return a list of values, one per type parameter of MyOutput. Like so:
@Override
public List<Object> getInstanceComponents(Object value) {
    MyOutput<Object, Object> myOutput = (MyOutput<Object, Object>) value;
    return ImmutableList.of(myOutput.foo, myOutput.bar);
}
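As a stdlib-only illustration of that contract (the MyOutput stand-in below is hypothetical, mirroring the fields from the question), the returned list has one entry per type parameter, in the same order the registry will later hand their coders to create:

```java
import java.util.Arrays;
import java.util.List;

public class ComponentsDemo {
    // Hypothetical stand-in for the question's MyOutput<T, S>.
    static class MyOutput<T, S> {
        T foo;
        S bar;
        Boolean baz;
        MyOutput(T foo, S bar, Boolean baz) { this.foo = foo; this.bar = bar; this.baz = baz; }
    }

    // One value per type parameter, in declaration order: first T, then S.
    // The Boolean field baz is not a type-parameter component, so it is omitted.
    static List<Object> getInstanceComponents(MyOutput<?, ?> value) {
        return Arrays.asList(value.foo, value.bar);
    }

    public static void main(String[] args) {
        MyOutput<String, Integer> out = new MyOutput<>("t-id", 7, true);
        System.out.println(getInstanceComponents(out)); // prints [t-id, 7]
    }
}
```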
The second question can be answered using some of the same support code as the first, but otherwise is independent. AvroIO.Write.withSchema always explicitly uses the provided schema. It does use AvroCoder under the hood, but this is actually an implementation detail. Providing a compatible schema is all that is necessary: such a schema will have to be composed for each value of T and S for which you want to output MyOutput<T, S>.