我正在将Apache Beam 2.14与Java结合使用。

给定一个看起来像这样的数据集:

| countryID | sessionID | pageID    | count    |
| --------- | --------- | --------- | -------- |
| a         | a         | a         | 1        |
| a         | b         | c         | 2        |
| b         | c         | a         | 4        |
| c         | d         | a         | 6        |

我想返回一个数据集,该数据集的行数之和在前N个国家/地区ID中,对于每个countryID,在前N个会话中,对于每个sessionID,在前N个pageID中。

数据集的大小为数十亿行-它不适合内存。
顺便说一句-数据集驻留在BigQuery中,并尝试使用DENSE_RANK()或ROW_NUMBER()函数错误直接在BigQuery中执行此操作,并且由于该大小而出现“超出内存限制”错误,因此尝试使用Dataflow。

我目前的策略是:
  • 组通过countryID,sessionID,pageID的组合键找到每个组的总和。
  • 按国家ID,会话ID对结果进行分组,并找到每个组的总和。
  • 按国家ID将结果分组,然后找到每个组的总和。
  • 使用Top.of可获得排名靠前的国家/地区ID
  • 将结果展平到第二级分组,并使用Top.perKey获得每个国家/地区的热门会话。
  • 将结果展平到第一级分组,并获得每个会话的首页ID。
  • 拼合结果以发出行。

  • 棘手的部分是,行必须保留在每个“group by”级别,以便可以在末尾将其发出。
    我试图创建一个树结构,其中每个节点都包含“分组依据”步骤的结果-这样我就可以只计算一次其子项的总和,以便在后续步骤中进行比较。
    也就是说,在每个“分组依据”步骤中,结果均为KV<String, Iterable<Node>>,并且节点具有以下字段:

        @DefaultCoder(SerializableCoder.class)
        public static class TreeNode implements Node, Serializable {
            private Long total = 0L;
            private KV<String, Iterable<LeafNode>> kv;
        ...
    

    尽管这似乎可以与直接运行器和少量示例数据集一起使用,但是在数据流上运行时,由于Node是输入PCollection的窗口,因此我遇到了与Iterable类相关的序列化错误:

    引起原因:java.io.NotSerializableException:
    org.apache.beam.runners.dataflow.worker.util.BatchGroupAlsoByWindowViaIteratorsFn $ WindowReiterable

    (根据https://beam.apache.org/releases/javadoc/2.15.0/index.html?org/apache/beam/sdk/transforms/GroupByKey.html)

    给定我需要使用的数据集大小,将数据复制到内存中的另一个集合中以进行可序列化将不是一个可行的选择。

    这是到目前为止的管道示例-仅使用2级分组作为示例:

            Pipeline pipeline = Pipeline.create(options);
    
            pipeline.apply("Read from BQ", BigQueryIO.readTableRows().from(inputTable))
                    .apply("Transform to row", ParDo.of(new RowParDo())).setRowSchema(SCHEMA)
                    .apply("Set first level key", WithKeys.of(new GroupKey(key1)))
                    .apply("Group by", GroupByKey.create())
                    .apply("to leaf nodes", ParDo.of(new ToLeafNode()))
                    .apply("Set 2nd level key", WithKeys.of(new GroupKey2()))
                    .apply("Group by 2nd level", GroupByKey.create())
                    .apply("To tree nodes", ParDo.of(new ToTreeNode()))
                    .apply("Top N", Top.of(10, new CompareTreeNode<TreeNode>()))
                    .apply("Flatten", FlatMapElements.via(new FlattenNodes<TreeNode>()))
                    .apply("Expand", ParDo.of(new ExpandTreeNode()))
                    .apply("Top N of first key", Top.perKey(10, new CompareTreeNode<LeafNode>()))
                    .apply("Values", Values.create())
                    .apply("Flatten", FlatMapElements.via(new FlattenNodes<LeafNode>()))
                    .apply("Expand", ParDo.of(new ExpandLeafNode()))
                    .apply("Values", Values.create())
                    .apply("Write to bq",
                            BigQueryIO.<Row>write().to(outputTable).withSchema(BigQueryUtils.toTableSchema(SCHEMA))
                                    .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE)
                                    .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
                                    .withFormatFunction(BigQueryUtils.toTableRow()));
            pipeline.run();
    

    看来这应该是一个共同的目标,所以想知道是否有更简单的方法,或者是否有任何示例使用Beam在Java中实现相同的目的。

    最佳答案

    您可以尝试使用setCoder设置代码,如下所示。

        Pipeline pipeline = Pipeline.create(options);
    
        pipeline.apply("Read from BQ", BigQueryIO.readTableRows().from(inputTable))
                .apply("Transform to row", ParDo.of(new RowParDo())).setRowSchema(SCHEMA)
                .apply("Set first level key", WithKeys.of(new GroupKey(key1)))
                .apply("Group by", GroupByKey.create())
                .apply("to leaf nodes", ParDo.of(new ToLeafNode()))
                .apply("Set 2nd level key", WithKeys.of(new GroupKey2()))
                .apply("Group by 2nd level", GroupByKey.create())
                .apply("To tree nodes", ParDo.of(new ToTreeNode())).setCoder(SerializableCoder.of(TreeNode.class))
                .apply("Top N", Top.of(10, new CompareTreeNode<TreeNode>()))
                .apply("Flatten", FlatMapElements.via(new FlattenNodes<TreeNode>()))
                .apply("Expand", ParDo.of(new ExpandTreeNode()))
                .apply("Top N of first key", Top.perKey(10, new CompareTreeNode<LeafNode>()))
                .apply("Values", Values.create())
                .apply("Flatten", FlatMapElements.via(new FlattenNodes<LeafNode>()))
                .apply("Expand", ParDo.of(new ExpandLeafNode()))
                .apply("Values", Values.create())
                .apply("Write to bq",
                        BigQueryIO.<Row>write().to(outputTable).withSchema(BigQueryUtils.toTableSchema(SCHEMA))
                                .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE)
                                .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
                                .withFormatFunction(BigQueryUtils.toTableRow()));
        pipeline.run();
    

    但是,对于需要确定前N个国家/地区,前N个会话和前N个页面的用例,我建议简化管道,仅将GroupBy设置在正确的字段中,然后按以下方式应用SumTop
        Pipeline pipeline = Pipeline.create(options);
    
        rows = pipeline.apply("Read from BQ", BigQueryIO.readTableRows().from(inputTable))
               .apply("Transform to row", ParDo.of(new RowParDo())).setRowSchema(SCHEMA);
        sumByCountry =rows.apply("Set Country key", MapElements.into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers()))
             .via((Row row) -> KV.of(row.getCountry(), row.getCount()))))
             .apply("Country Scores", Sum.<String>integersPerKey());
             .apply("Top Countries", Top.of(N, new CompareValues()))
         // Do the same for Session and page
         sumBySession = rows....
         sumByPage = rows....
    

    我不确定是否要获取前N个国家/地区的所有行,但是如果要获取行,则可以在PCollection行上使用前N个国家/地区的side input并将结果过滤掉。您可以对会话和页面执行相同的操作。

    数据流应根据该用例的需要进行扩展,因此您无需为此用例手动进行中间分组。

    10-07 16:40