我正在寻找在PCollection中合并数据
输入的是CSV文件
customer id,customer name,transction amount,transaction type
cust123,ravi,100,D
cust123,ravi,200,D
cust234,Srini,200,C
cust444,shaker,500,D
cust123,ravi,100,C
cust123,ravi,300,C
O / p应该是
在将Textfile读取到对象的集合中之后,我想要合并为所示的输出。
cust123,ravi,300,D
cust123,ravi,400,C
cust234,Srini,200,C
cust444,shaker,500,D
Pipeline pipeline = Pipeline.create(
PipelineOptionsFactory.fromArgs(args).withValidation().create());
PCollection< Customer> pCollection =
pipeline.apply("Read", TextIO.read().from("MyFile.csv"))
.apply("splitData and store",
ParDo.of(new TextTransform.SplitValues()))
最佳答案
如果我理解正确,则需要对按customerid + transaction类型分组的交易金额求和。在这种情况下,您需要从高级角度进行:
将键分配给记录:
您可以为此使用WithKeys
PTransform
,see the doc;
密钥取决于您,例如,您可以将客户ID与交易类型结合起来,例如:csvField[0] + "," + csvField[3]
使用GroupByKey
PTransform
,see this doc按新键对记录进行分组;
GBK的输出将是具有相同键的记录的集合,因此您将需要应用ParDo
接受该集合(所有记录属于同一客户和交易类型),对金额进行汇总,输出总和的记录;
最后两个步骤(GBK + ParDo)可以使用Combine.perKey()
PTransform
代替,它的作用相同,但可以在运行时进行优化。有关更多信息,请参见this和this。
您还可以研究Beam SQL,该语言将允许您在SQL中表达相同的逻辑。有关Beam SQL概述,请参见this doc。在这种情况下,您将需要添加一个ParDo
,以便在应用SqlTransform
之前将CSV记录转换为光束行。
关于java - 如何在PCollection中合并数据-Apache Beam,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/56598661/