我有一个简单的DAG
from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
with DAG(dag_id='my_dags.my_dag') as dag:
start = DummyOperator(task_id='start')
end = DummyOperator(task_id='end')
sql = """
SELECT *
FROM 'another_dataset.another_table'
"""
bq_query = BigQueryOperator(bql=sql,
destination_dataset_table='my_dataset.my_table20180524'),
task_id='bq_query',
bigquery_conn_id='my_bq_connection',
use_legacy_sql=False,
write_disposition='WRITE_TRUNCATE',
create_disposition='CREATE_IF_NEEDED',
query_params={})
start >> bq_query >> end
执行
bq_query
任务时,SQL查询将保存在分片表中。我希望将其保存在每日分区表中。为此,我仅将destination_dataset_table
更改为my_dataset.my_table$20180524
。执行bq_task
时出现以下错误:Partitioning specification must be provided in order to create partitioned table
如何指定BigQuery将查询结果保存到每日分区表中?我的第一个猜测是在
query_params
中使用BigQueryOperator
但我没有找到有关如何使用该参数的任何示例。编辑:
我正在使用
google-cloud==0.27.0
python客户端...这是Prod中使用的一个:( 最佳答案
您首先需要创建一个空的分区目标表。请按照以下说明进行操作:link创建一个空的分区表
然后再次在 Airflow 管道下方运行。
您可以尝试以下代码:
import datetime
from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
today_date = datetime.datetime.now().strftime("%Y%m%d")
table_name = 'my_dataset.my_table' + '$' + today_date
with DAG(dag_id='my_dags.my_dag') as dag:
start = DummyOperator(task_id='start')
end = DummyOperator(task_id='end')
sql = """
SELECT *
FROM 'another_dataset.another_table'
"""
bq_query = BigQueryOperator(bql=sql,
destination_dataset_table={{ params.t_name }}),
task_id='bq_query',
bigquery_conn_id='my_bq_connection',
use_legacy_sql=False,
write_disposition='WRITE_TRUNCATE',
create_disposition='CREATE_IF_NEEDED',
query_params={'t_name': table_name},
dag=dag
)
start >> bq_query >> end
因此,我要做的是创建了一个动态表名变量并将其传递给BQ运算符。
关于google-bigquery - Airflow BigQueryOperator : how to save query result in a partitioned Table?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/50505067/