我正在编写一个Dataflow流管道。在一种转换中,DoFn我要访问外部服务-在这种情况下,它是数据存储。

这种初始化步骤是否有最佳实践?我不想为每个processElement方法调用创建数据存储连接对象。

最佳答案

在Dataflow SDK中,最简单的操作是在第一个元素中添加检查以初始化外部服务:

class DatastoreCallingDoFn extends DoFn {

    private ExtServiceHandle handle = null;

    private ExtServiceHandle initializeConnection() {
      // ...
    }

    public void processElement(ProcessContext c) {
      // ... process each element -- setup will have been called
      if (handle == null) {
        handle = initializeConnection();
      }
      // Process elements
    }
}




如果使用的是Beam,则可以使用@Setup装饰器来装饰DoFn中的函数以进行DoFn的设置,例如初始化数据存储连接。

class DatastoreCallingDoFn extends DoFn {
    @Setup
    public void initializeDatastoreConnection() {
      // ...
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
        // ... process each element -- setup will have been called
    }
}


这与answer in this question相似。

10-02 07:53