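Is it possible to create a pipeline that reads from Pub/Sub and writes to Datastore? In my code I use PubsubIO as the input and apply windowing to get a bounded PCollection, but it seems that DatastoreIO.writeTo cannot be used when options.setStreaming is set to true, which is required in order to use PubsubIO as the input. Is there a workaround, or is it simply not possible to read from Pub/Sub and write to Datastore?

Here is my code: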

DataflowPipelineOptions options = PipelineOptionsFactory.create()
        .as(DataflowPipelineOptions.class);

options.setRunner(DataflowPipelineRunner.class);
options.setProject(projectName);
options.setStagingLocation("gs://my-staging-bucket/staging");
options.setStreaming(true);

Pipeline p = Pipeline.create(options);

PCollection<String> input = p.apply(
        PubsubIO.Read.topic("projects/" + projectName + "/topics/event-streaming"));
PCollection<String> inputWindow = input.apply(
        Window.<String>into(FixedWindows.of(Duration.standardSeconds(5)))
                .triggering(AfterPane.elementCountAtLeast(1))
                .discardingFiredPanes()
                .withAllowedLateness(Duration.standardHours(1)));
PCollection<String> inputDecode = inputWindow.apply(ParDo.of(new DoFn<String, String>() {
    private static final long serialVersionUID = 1L;
    public void processElement(ProcessContext c) {
        String msg = c.element();
        byte[] decoded = Base64.decodeBase64(msg.getBytes());
        String outmsg = new String(decoded);
        c.output(outmsg);
    }
}));
PCollection<DatastoreV1.Entity> inputEntity =
        inputDecode.apply(ParDo.of(new CreateEntityFn("stream", "events")));

inputEntity.apply(DatastoreIO.writeTo(datasetid));

p.run();

Here is the exception I get:
Exception in thread "main" java.lang.UnsupportedOperationException: The Write transform is not supported by the Dataflow streaming runner.
at com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner$StreamingWrite.apply(DataflowPipelineRunner.java:488)
at com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner$StreamingWrite.apply(DataflowPipelineRunner.java:480)
at com.google.cloud.dataflow.sdk.runners.PipelineRunner.apply(PipelineRunner.java:74)
at com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner.apply(DataflowPipelineRunner.java:314)
at com.google.cloud.dataflow.sdk.Pipeline.applyInternal(Pipeline.java:358)
at com.google.cloud.dataflow.sdk.Pipeline.applyTransform(Pipeline.java:267)
at com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner.apply(DataflowPipelineRunner.java:312)
at com.google.cloud.dataflow.sdk.Pipeline.applyInternal(Pipeline.java:358)
at com.google.cloud.dataflow.sdk.Pipeline.applyTransform(Pipeline.java:267)
at com.google.cloud.dataflow.sdk.values.PCollection.apply(PCollection.java:159)
at my.own.project.google.dataflow.EventStreamingDataflow.main(EventStreamingDataflow.java:104)

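Best answer

The streaming runner does not currently support the DatastoreIO sink. To write to Datastore from a streaming pipeline, you can call the Datastore API directly from a DoFn.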
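One way to structure such a DoFn is to buffer entities and commit them in batches rather than issuing one API call per element. The following is only a minimal sketch of that buffering logic in plain Java, not the SDK's or the answerer's implementation. BatchSink and BatchingWriter are hypothetical names introduced here for illustration; BatchSink stands in for whatever Datastore client call you use to commit a list of entities, and the batch size is an assumption you should set according to the Datastore commit limits.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a Datastore client: commits one batch of entities. */
interface BatchSink<T> {
    void commit(List<T> batch);
}

/**
 * Buffers elements and hands them to the sink in batches.
 * Inside a pipeline, add() would be called from the DoFn's processElement
 * and flush() from its finishBundle, so nothing is left unwritten.
 */
class BatchingWriter<T> {
    private final BatchSink<T> sink;
    private final int maxBatchSize; // assumption: chosen by the caller
    private final List<T> buffer = new ArrayList<>();

    BatchingWriter(BatchSink<T> sink, int maxBatchSize) {
        if (maxBatchSize < 1) {
            throw new IllegalArgumentException("maxBatchSize must be at least 1");
        }
        this.sink = sink;
        this.maxBatchSize = maxBatchSize;
    }

    /** Buffers one element and commits automatically once the batch is full. */
    void add(T element) {
        buffer.add(element);
        if (buffer.size() >= maxBatchSize) {
            flush();
        }
    }

    /** Commits whatever is buffered; does nothing if the buffer is empty. */
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        // Pass a copy so the sink never sees later mutations of the buffer.
        sink.commit(new ArrayList<>(buffer));
        buffer.clear();
    }
}
```

Because a bundle may be retried after a failure, the same entity can reach the sink more than once. Writing with upsert semantics and deterministic keys keeps such retries harmless.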

09-05 01:32