我正在寻找在PCollection中合并数据

输入的是CSV文件

customer id,customer name,transction amount,transaction type
cust123,ravi,100,D
cust123,ravi,200,D
cust234,Srini,200,C
cust444,shaker,500,D
cust123,ravi,100,C
cust123,ravi,300,C


O / p应该是

在将Textfile读取到对象的集合中之后,我想要合并为所示的输出。

cust123,ravi,300,D
cust123,ravi,400,C
cust234,Srini,200,C
cust444,shaker,500,D


Pipeline pipeline = Pipeline.create(
   PipelineOptionsFactory.fromArgs(args).withValidation().create());

PCollection< Customer> pCollection =
   pipeline.apply("Read", TextIO.read().from("MyFile.csv"))
           .apply("splitData and store",
               ParDo.of(new TextTransform.SplitValues()))

最佳答案

如果我理解正确,则需要对按customerid + transaction类型分组的交易金额求和。在这种情况下,您需要从高级角度进行:


将键分配给记录:


您可以为此使用WithKeys PTransformsee the doc
密钥取决于您,例如,您可以将客户ID与交易类型结合起来,例如:csvField[0] + "," + csvField[3]

使用GroupByKey PTransformsee this doc按新键对记录进行分组;
GBK的输出将是具有相同键的记录的集合,因此您将需要应用ParDo接受该集合(所有记录属于同一客户和交易类型),对金额进行汇总,输出总和的记录;


最后两个步骤(GBK + ParDo)可以使用Combine.perKey() PTransform代替,它的作用相同,但可以在运行时进行优化。有关更多信息,请参见thisthis

您还可以研究Beam SQL,该语言将允许您在SQL中表达相同的逻辑。有关Beam SQL概述,请参见this doc。在这种情况下,您将需要添加一个ParDo,以便在应用SqlTransform之前将CSV记录转换为光束行。

关于java - 如何在PCollection中合并数据-Apache Beam,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/56598661/

10-09 03:33