我正在编写一个 Airflow DAG 来从 API 中提取数据并将其存储在我拥有的数据库中。遵循 We're All Using Airflow Wrong 中概述的最佳实践,我将 DAG 编写为一系列 KubernetesPodOperator
,这些 execution_date
运行非常简单的 Python 函数作为 Docker 镜像的入口点。
我试图解决的问题是这个 DAG 应该只为 PythonOperator
提取数据。
如果我使用的是 provide_context
( doc ),我可以使用 provide_context
参数使执行日期对函数可用。但是从 the KubernetesPodOperator's documentation 来看,Kubernetes operator 似乎没有参数可以做 arguments
所做的事情。
我最好的猜测是你可以使用 sys.argv
命令传入一个日期范围,因为它是模板化的,你可以像这样引用它:
my_pod_operator = KubernetesPodOperator(
# ... other args here
arguments=['python', 'my_script.py', '{{ ds }}'],
# arguments continue
)
然后,您将获得开始日期,就像使用 ojit_code 将任何其他参数提供给作为脚本运行的 Python 文件一样。
这是正确的做法吗?
谢谢您的帮助。
最佳答案
是的,这是正确的做法。
每个 Operator 都会有 template_fields
。 template_fields
中列出的所有参数都可以渲染 Jinja2 模板和 Airflow 宏。
对于 KubernetesPodOperator,如果您检查 docs ,您会发现:
template_fields = ['cmds', 'arguments', 'env_vars', 'config_file']
这意味着您可以将
'{{ ds }}'
传递给上面列出的四个参数中的任何一个。关于kubernetes - 如何在 `KubernetesPodOperator` 中引用 DAG 的执行日期?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/55796959/