我可以使用PubsubIO读取来自某个主题的PubSub消息,如下所示:

pipeline.apply("read", PubsubIO.readMessages().fromTopic(options.getPubsubReadTopic()))
.apply( /* rest of the pipeline that works on PubSubMessage records */ )


PubSub消息中的数据包装在我们的自定义包装器中,使用起来并不容易。我想创建类CustomPubsubIO并以类似的方式使用它:

pipeline.apply("read", CustomPubsubIO.readTyped<MyType>().fromTopic(options.getPubsubReadTopic()))
.apply( /* rest of the pipeline that works on MyType records */ )


我能够创建自定义CustomCoder<MyType>,但是用它创建PubsubIO.Read<MyType>时遇到了麻烦。 PubsubIO.ReadPubsubIO中是抽象的,并且与@AutoValue一起使用,似乎无法直接扩展它

用自定义编码器创建Read<>的正确方法是什么?

最佳答案

您是否有任何特定原因来创建自定义类型的PubsubIO.Read?否则,您可以只使用PubsubIO.readMessages()并组合DoFn将输出PubsubMessage转换为所需的任何内容。支持自定义编码器和自定义解析函数的API是两年前的removed,因为使用DoFn似乎是生成自定义类型的更清晰且语义等效的方式。

07-24 21:46