我能够设置TestPipeline
和PubsubIO
与Pubsub
模拟器一起使用:
options = TestPipeline.testingPipelineOptions().as(PubsubOptions.class);
options.setPubsubRootUrl(pubsubUrl);
但是问题是在单元测试时如何绑定
PubsubIO.readMessages()
来进行有限测试: final PCollection<PubsubMessage> pCollection =
pipeline.apply(PubsubIO.readMessagesWithAttributes().fromTopic("some-topic"));
PAssert.that(pCollection).satisfies(pubsubMessages -> {
pubsubMessages.forEach(System.out::println);
return null;
});
pipeline.run();
我正在使用
google-cloud-dataflow-java-sdk-all
版本2.1.0
。 最佳答案
据我所知,对于DirectRunner,这目前是不可能的(我很不幸)。
使用DataflowRunner,最好的选择可能是在一段时间后以编程方式在管道上启动Drain。