我需要对具有相同模式的多个数据库(10K)执行SQL查询,并使用airflow将结果插入到单独的主机中。
你知道我应该如何为这类项目以最有效的方式设计DAG吗?
任何帮助都将不胜感激!
最佳答案
气流中每个数据库一个连接。
然后定义这些连接id字符串的列表。
然后在每个连接字符串上为同一任务重复一个任务定义。
例如,使用MysqlOperator(另请参见MssqlOperator或PostgresOperator)
conns = ('db1','db2','db3')
tasks = [MysqlOperator("""
show tables;
""",
task_id="update_" + conn,
mysql_conn_id=conn,
) for conn in conns]