我们正在考虑使用气流来代替当前基于自定义rq的工作流程,但我不确定设计它的最佳方法。或者使用气流是否合理。
用例是:
例如
数据上传后,我们将一个项目放在队列中:
上载:
user: 'a'
data:
- type: datatype1
start: 1
end: 3
- type: datatype2
start: 2
end: 3
这将触发:
然后也许job1将有一些清理作业在其后运行。
(如果能够将作业限制为仅在没有其他作业正在为同一用户运行时运行,那将是很好的。)
我考虑过的方法:
1。
当数据上传到达消息队列时触发DAG。
然后,此DAG确定要运行哪些从属作业,并将用户和时间范围作为参数(或xcom)传递。
2。
当数据上传到达消息队列时触发DAG。
然后,此DAG根据用户和时间范围内的数据类型和模板为作业动态创建DAGS。
因此,您可以获得每个用户,工作,时间范围组合的动态DAG。
我什至不知道如何从消息队列中触发DAG ...而且很难找到类似于此用例的示例。也许是因为气流不合适?
任何帮助/想法/建议将不胜感激。
谢谢。
最佳答案
气流是围绕基于时间的时间表建立的。它不是为触发基于数据着陆的运行而构建的。也有其他系统设计用来执行此操作。我听到了类似pachyderm.io或dvs.org的声音。甚至重新配置CI工具或自定义Jenkins设置也可能基于文件更改事件或消息队列触发。
但是,您可以通过让外部队列侦听器使用rest API calls to Airflow触发DAG来尝试使用Airflow。例如,如果队列是AWS SNS队列,则可以使用简单的Python使用AWS Lambda侦听器来执行此操作。
我建议每种作业类型(或者是用户,以较少者为准)一个DAG,触发逻辑根据队列确定它是正确的。如果存在常见的清理任务等,则DAG可能会使用TriggerDagRunOperator启动这些清理任务,或者您可能只是拥有每个DAG包含的清理任务的 public 库。我认为后者更干净。
DAG可以将其任务限制为某些池。您可以为每个用户创建一个池,以限制每个用户的作业运行。或者,如果每个用户都有DAG,则可以将该DAG的最大并发DAG运行设置为合理。
关于queue - Airflow DAG设计,以用户为中心的工作流程,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/53744711/