我正在尝试执行一个数据流管道作业,该作业将在数据存储区中的时间对 N个条目执行一个功能。在我的情况下,此功能将一批100个条目作为有效载荷发送到某个REST服务。这意味着我要遍历一个数据存储实体中的所有条目,并一次将100个批处理条目发送到某个外部REST服务。

我当前的解决方案

  • 从数据存储区
  • 读取输入
  • 创建与管道选项中指定的工作程序一样多的键(1个工作程序= 1个键)。
  • 按键分组,以便我们将迭代器作为输出(第4步中的迭代器输入)
  • 以编程方式在临时列表中批处理用户,并将其作为批处理发送到REST端点。

  • 上面用伪代码描述的场景(忽略细节):

    final int BATCH_SIZE = 100;
    
    // 1. Read input from datastore
    pipeline.apply(DatastoreIO.readFrom(datasetId, query))
    
        // 2. create keys to be used in group by so we get iterator in next task
        .apply(ParDo.of(new DoFn<DatastoreV1.Entity, KV<String, EntryPOJO>>() {
            @Override
            public void processElement(ProcessContext c) throws Exception {
                String key = generateKey(c);
                EntryPOJO entry = processEntity(c);
                c.output(KV.of(key, entry));
            }
        }))
    
        // 3. Group by key
        .apply(GroupByKey.create())
    
        // 4. Programatically batch users
        .apply(ParDo.of(new DoFn<KV<String, Iterable<EntryPOJO>>() {
            @Override
            public void processElement(ProcessContext c) throws Exception {
                List<EntryPOJO> batchedEntries = new ArrayList<>();
                for (EntryPOJO entry : c.element().getValue()) {
                    if (batchedEntries.size() >= BATCH_SIZE) {
                        sendToRESTEndpoint(batchedEntries);
                        batchedEntries = new ArrayList<>();
                    }
                    batchedEntries.add(entry);
                }
                sendToRESTEndpoint(batchedEntries);
            }
        }));
    

    我当前解决方案的主要问题

    GroupByKey阻止最后一个ParDo的执行(阻止第4步),直到所有条目都分配给一个键为止。

    解决方案通常可以使用,但是我想并行进行所有操作(从数据存储区加载后,立即向REST端点发送100个条目的批处理),这对于我当前的解决方案是不可能的,因为GroupByKey不输出任何直到从数据库中提取每个条目并将其插入键值对为止。因此,执行实际上分两个步骤:1.从数据存储区中获取所有数据并为其分配一个 key ,2.以批处理方式处理条目

    问题

    所以我想知道的是,是否有一些现有功能可以做到这一点。或者至少在没有GroupByKey步骤的情况下获得Iterable,以便批处理功能任务无需等待数据被转储。

    最佳答案

    您可以在DoFn中批量添加这些元素。例如:

    final int BATCH_SIZE = 100;
    
    pipeline
      // 1. Read input from datastore
      .apply(DatastoreIO.readFrom(datasetId, query))
    
      // 2. Programatically batch users
      .apply(ParDo.of(new DoFn<DatastoreV1.Entity, Iterable<EntryPOJO>>() {
    
        private final List<EntryPOJO> accumulator = new ArrayList<>(BATCH_SIZE);
    
        @Override
        public void processElement(ProcessContext c) throws Exception {
          EntryPOJO entry = processEntity(c);
          accumulator.add(c);
          if (accumulator.size() >= BATCH_SIZE) {
            c.output(accumulator);
            accumulator = new ArrayList<>(BATCH_SIZE);
          }
        }
    
        @Override
        public void finishBundle(Context c) throws Exception {
          if (accumulator.size() > 0) {
            c.output(accumulator);
          }
        }
      });
    
      // 3. Consume those bundles
      .apply(ParDo.of(new DoFn<Iterable<EntryPOJO>, Object>() {
        @Override
        public void processElement(ProcessContext c) throws Exception {
            sendToRESTEndpoint(batchedEntries);
        }
      }));
    

    如果您不想使用单独的“批处理”步骤,也可以将第2步和第3步合并到一个DoFn中。

    10-02 07:55
    查看更多