我目前正在使用Python处理数据流模板,我想访问作业ID并将其保存到特定的Firestore文档。
是否可以访问作业ID?
我在文档中找不到与此有关的任何内容。
最佳答案
您可以通过在管道中调用dataflow.projects().locations().jobs().list
来实现(请参见下面的完整代码)。一种可能性是始终使用相同的作业名称调用模板,这很有意义,否则可以将作业前缀作为运行时参数传递。使用正则表达式解析作业列表,以查看该作业是否包含名称前缀,如果包含名称前缀,则返回该作业ID。如果有多个,它将仅返回最新的一个(当前正在运行的一个)。
在定义PROJECT
和BUCKET
变量之后,通过以下步骤暂存该模板:
python script.py \
--runner DataflowRunner \
--project $PROJECT \
--staging_location gs://$BUCKET/staging \
--temp_location gs://$BUCKET/temp \
--template_location gs://$BUCKET/templates/retrieve_job_id
然后,在执行模板作业时指定所需的作业名称(在我的情况下为
myjobprefix
):gcloud dataflow jobs run myjobprefix \
--gcs-location gs://$BUCKET/templates/retrieve_job_id
retrieve_job_id
函数将从作业中返回作业ID,更改job_prefix
以匹配给定的名称。import argparse, logging, re
from googleapiclient.discovery import build
from oauth2client.client import GoogleCredentials
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
def retrieve_job_id(element):
project = 'PROJECT_ID'
job_prefix = "myjobprefix"
location = 'us-central1'
logging.info("Looking for jobs with prefix {} in region {}...".format(job_prefix, location))
try:
credentials = GoogleCredentials.get_application_default()
dataflow = build('dataflow', 'v1b3', credentials=credentials)
result = dataflow.projects().locations().jobs().list(
projectId=project,
location=location,
).execute()
job_id = "none"
for job in result['jobs']:
if re.findall(r'' + re.escape(job_prefix) + '', job['name']):
job_id = job['id']
break
logging.info("Job ID: {}".format(job_id))
return job_id
except Exception as e:
logging.info("Error retrieving Job ID")
raise KeyError(e)
def run(argv=None):
parser = argparse.ArgumentParser()
known_args, pipeline_args = parser.parse_known_args(argv)
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = True
p = beam.Pipeline(options=pipeline_options)
init_data = (p
| 'Start' >> beam.Create(["Init pipeline"])
| 'Retrieve Job ID' >> beam.FlatMap(retrieve_job_id))
p.run()
if __name__ == '__main__':
run()