我正在编写一个 Airflow DAG 来从 API 中提取数据并将其存储在我拥有的数据库中。遵循 We're All Using Airflow Wrong 中概述的最佳实践,我将 DAG 编写为一系列 KubernetesPodOperator ,这些 execution_date 运行非常简单的 Python 函数作为 Docker 镜像的入口点。

我试图解决的问题是这个 DAG 应该只为 PythonOperator 提取数据。

如果我使用的是 provide_context ( doc ),我可以使用 provide_context 参数使执行日期对函数可用。但是从 the KubernetesPodOperator's documentation 来看,Kubernetes operator 似乎没有参数可以做 arguments 所做的事情。

我最好的猜测是你可以使用 sys.argv 命令传入一个日期范围,因为它是模板化的,你可以像这样引用它:

my_pod_operator = KubernetesPodOperator(
    # ... other args here
    arguments=['python', 'my_script.py', '{{ ds }}'],
    # arguments continue
)

然后,您将获得开始日期,就像使用 ojit_code 将任何其他参数提供给作为脚本运行的 Python 文件一样。

这是正确的做法吗?

谢谢您的帮助。

最佳答案

是的,这是正确的做法。

每个 Operator 都会有 template_fieldstemplate_fields 中列出的所有参数都可以渲染 Jinja2 模板和 Airflow 宏。

对于 KubernetesPodOperator,如果您检查 docs ,您会发现:

template_fields = ['cmds', 'arguments', 'env_vars', 'config_file']

这意味着您可以将 '{{ ds }}' 传递给上面列出的四个参数中的任何一个。

关于kubernetes - 如何在 `KubernetesPodOperator` 中引用 DAG 的执行日期?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/55796959/

10-16 23:44