我正在研究通过Google Dataflow/Apache Beam处理来自Web用户 session 的日志,并且需要将用户日志的输入(流式传输)与上个月用户 session 的历史记录结合起来。
我研究了以下方法:
element
中给定processElement(ProcessContext processContext)
的用户 session 历史记录我的理解是,通过
.withSideInputs(pCollectionView)
加载的数据需要放入内存中。我知道我可以将单个用户的所有 session 历史记录都存储到内存中,但不能将所有 session 历史记录都存储到内存中。我的问题是,是否有一种方法可以从仅与当前用户 session 相关的侧面输入中加载/流式传输数据?
我正在想象一个parDo函数,该函数将通过指定用户的ID从侧面输入中加载用户的历史 session 。但是只有当前用户的历史记录 session 可以容纳在内存中。通过侧面输入加载所有历史记录 session 将太大。
一些伪代码来说明:
public static class MetricFn extends DoFn<LogLine, String> {
final PCollectionView<Map<String, Iterable<LogLine>>> pHistoryView;
public MetricFn(PCollectionView<Map<String, Iterable<LogLine>>> historyView) {
this.pHistoryView = historyView;
}
@Override
public void processElement(ProcessContext processContext) throws Exception {
Map<String, Iterable<LogLine>> historyLogData = processContext.sideInput(pHistoryView);
final LogLine currentLogLine = processContext.element();
final Iterable<LogLine> userHistory = historyLogData.get(currentLogLine.getUserId());
final String outputMetric = calculateMetricWithUserHistory(currentLogLine, userHistory);
processContext.output(outputMetric);
}
}
最佳答案
目前尚没有一种方法可以访问流中的每个键的侧面输入,但是正如您所描述的,它绝对有用,这是我们正在考虑实现的方法。
一种可能的解决方法是使用侧面输入来分配指向实际 session 历史记录的指针。生成24小时 session 历史记录的代码可以将它们上传到GCS/BigQuery/etc,然后将位置作为侧面输入发送到加入代码。