我正在尝试使用Apache Beam创建一个Dataflow管道,但是我无法按照文档进行操作,也找不到任何示例。

管道很简单。

  • 创建管道
  • 从发布/订阅主题中阅读
  • 写入 Spanner 。

  • 目前,我停留在第2步。我找不到任何有关如何从pub/sub中读取和使用它的示例。

    这是我到目前为止想要的代码

    class ExtractFlowInfoFn extends DoFn<PubsubMessage, KV<String, String>> {
        public void processElement(ProcessContext c) {
            KV.of("key", "value");
        }
    }
    
    public static void main(String[] args) {
    
        Pipeline p = Pipeline.create(
        PipelineOptionsFactory.fromArgs(args).withValidation().create());
    
        p.apply("ReadFromPubSub", PubsubIO.readMessages().fromSubscription("test"))
         .apply("ConvertToKeyValuePair", ParDo.of(new ExtractFlowInfoFn()))
         .apply("WriteToLog", ));
    };
    

    通过遵循多个示例,我能够提出代码。老实说,我不知道我在这里做什么。

    请帮助我理解这一点或将我链接到正确的文档。

    最佳答案

    从发布/订阅中提取消息并写入Cloud Spanner的示例:

    import com.google.cloud.spanner.Mutation;
    import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
    import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
    
    class MessageToMutationDoFn extends DoFn<PubsubMessage, Mutation> {
    
        @ProcessElement
        public void processElement(ProcessContext c) {
    
            // TODO: create Mutation object from PubsubMessage
    
            Mutation mutation = Mutation.newInsertBuilder("users_backup2")
                .set("column_1").to("value_1")
                .set("column_2").to("value_2")
                .set("column_3").to("value_3")
                .build();
    
            c.output(mutation);
        }
    }
    
    public static void main(String[] args) {
    
        Pipeline p = Pipeline.create();
    
        p.apply("ReadFromPubSub", PubsubIO.readMessages().fromSubscription("test"))
         .apply("MessageToMutation", ParDo.of(new MessageToMutationDoFn()))
         .apply("WriteToSpanner", SpannerIO.write()
             .withProjectId("projectId")
             .withInstanceId("spannerInstanceId")
             .withDatabaseId("spannerDatabaseId"));
    
        p.run();
    }
    

    引用:Apache Beam SpannerIOApache Beam PubsubIO

    关于google-cloud-dataflow - Google DataFlow Apache Beam,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/48716264/

    10-11 10:55