我能够设置TestPipelinePubsubIOPubsub模拟器一起使用:

  options = TestPipeline.testingPipelineOptions().as(PubsubOptions.class);
  options.setPubsubRootUrl(pubsubUrl);


但是问题是在单元测试时如何绑定PubsubIO.readMessages()来进行有限测试:

  final PCollection<PubsubMessage> pCollection =
      pipeline.apply(PubsubIO.readMessagesWithAttributes().fromTopic("some-topic"));

  PAssert.that(pCollection).satisfies(pubsubMessages -> {
    pubsubMessages.forEach(System.out::println);
    return null;
  });

  pipeline.run();


我正在使用google-cloud-dataflow-java-sdk-all版本2.1.0

最佳答案

据我所知,对于DirectRunner,这目前是不可能的(我很不幸)。

使用DataflowRunner,最好的选择可能是在一段时间后以编程方式在管道上启动Drain。

10-07 23:03