我正在研究通过Google Dataflow/Apache Beam处理来自Web用户 session 的日志,并且需要将用户日志的输入(流式传输)与上个月用户 session 的历史记录结合起来。

我研究了以下方法:

  • 使用30天固定的窗口:最有可能将一个大窗口容纳进内存,并且我不需要更新用户的历史记录,只需引用
  • 使用CoGroupByKey来连接两个数据集,但是两个数据集必须具有相同的窗口大小(https://cloud.google.com/dataflow/model/group-by-key#join),在我的情况下这不是正确的(24h vs 30 days)
  • 使用Side Input检索element中给定processElement(ProcessContext processContext)的用户 session 历史记录

  • 我的理解是,通过.withSideInputs(pCollectionView)加载的数据需要放入内存中。我知道我可以将单个用户的所有 session 历史记录都存储到内存中,但不能将所有 session 历史记录都存储到内存中。

    我的问题是,是否有一种方法可以从仅与当前用户 session 相关的侧面输入中加载/流式传输数据?

    我正在想象一个parDo函数,该函数将通过指定用户的ID从侧面输入中加载用户的历史 session 。但是只有当前用户的历史记录 session 可以容纳在内存中。通过侧面输入加载所有历史记录 session 将太大。

    一些伪代码来说明:
    public static class MetricFn extends DoFn<LogLine, String> {
    
        final PCollectionView<Map<String, Iterable<LogLine>>> pHistoryView;
    
        public MetricFn(PCollectionView<Map<String, Iterable<LogLine>>> historyView) {
            this.pHistoryView = historyView;
        }
    
        @Override
        public void processElement(ProcessContext processContext) throws Exception {
            Map<String, Iterable<LogLine>> historyLogData = processContext.sideInput(pHistoryView);
    
            final LogLine currentLogLine = processContext.element();
            final Iterable<LogLine> userHistory = historyLogData.get(currentLogLine.getUserId());
            final String outputMetric = calculateMetricWithUserHistory(currentLogLine, userHistory);
            processContext.output(outputMetric);
        }
    }
    

    最佳答案

    目前尚没有一种方法可以访问流中的每个键的侧面输入,但是正如您所描述的,它绝对有用,这是我们正在考虑实现的方法。

    一种可能的解决方法是使用侧面输入来分配指向实际 session 历史记录的指针。生成24小时 session 历史记录的代码可以将它们上传到GCS/BigQuery/etc,然后将位置作为侧面输入发送到加入代码。

    10-02 07:07
    查看更多