我正在编写一个Dataflow流管道。在一种转换中,DoFn我要访问外部服务-在这种情况下,它是数据存储。
这种初始化步骤是否有最佳实践?我不想为每个processElement方法调用创建数据存储连接对象。
最佳答案
在Dataflow SDK中,最简单的操作是在第一个元素中添加检查以初始化外部服务:
class DatastoreCallingDoFn extends DoFn {
private ExtServiceHandle handle = null;
private ExtServiceHandle initializeConnection() {
// ...
}
public void processElement(ProcessContext c) {
// ... process each element -- setup will have been called
if (handle == null) {
handle = initializeConnection();
}
// Process elements
}
}
如果使用的是Beam,则可以使用
@Setup
装饰器来装饰DoFn
中的函数以进行DoFn的设置,例如初始化数据存储连接。class DatastoreCallingDoFn extends DoFn {
@Setup
public void initializeDatastoreConnection() {
// ...
}
@ProcessElement
public void processElement(ProcessContext c) {
// ... process each element -- setup will have been called
}
}
这与answer in this question相似。