我有一个简单的DAG

from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator

with DAG(dag_id='my_dags.my_dag') as dag:

    start = DummyOperator(task_id='start')

    end = DummyOperator(task_id='end')
    sql = """
             SELECT *
             FROM 'another_dataset.another_table'
          """
    bq_query = BigQueryOperator(bql=sql,
                            destination_dataset_table='my_dataset.my_table20180524'),
                            task_id='bq_query',
                            bigquery_conn_id='my_bq_connection',
                            use_legacy_sql=False,
                            write_disposition='WRITE_TRUNCATE',
                            create_disposition='CREATE_IF_NEEDED',
                            query_params={})
    start >> bq_query >> end

执行bq_query任务时,SQL查询将保存在分片表中。我希望将其保存在每日分区表中。为此,我仅将destination_dataset_table更改为my_dataset.my_table$20180524。执行bq_task时出现以下错误:
Partitioning specification must be provided in order to create partitioned table

如何指定BigQuery将查询结果保存到每日分区表中?我的第一个猜测是在query_params中使用BigQueryOperator但我没有找到有关如何使用该参数的任何示例。

编辑:

我正在使用google-cloud==0.27.0 python客户端...这是Prod中使用的一个:(

最佳答案

您首先需要创建一个空的分区目标表。请按照以下说明进行操作:link创建一个空的分区表

然后再次在 Airflow 管道下方运行。
您可以尝试以下代码:

import datetime
from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
today_date = datetime.datetime.now().strftime("%Y%m%d")
table_name = 'my_dataset.my_table' + '$' + today_date
with DAG(dag_id='my_dags.my_dag') as dag:
    start = DummyOperator(task_id='start')
    end = DummyOperator(task_id='end')
    sql = """
         SELECT *
         FROM 'another_dataset.another_table'
          """
    bq_query = BigQueryOperator(bql=sql,
                        destination_dataset_table={{ params.t_name }}),
                        task_id='bq_query',
                        bigquery_conn_id='my_bq_connection',
                        use_legacy_sql=False,
                        write_disposition='WRITE_TRUNCATE',
                        create_disposition='CREATE_IF_NEEDED',
                        query_params={'t_name': table_name},
                        dag=dag
                        )
start >> bq_query >> end

因此,我要做的是创建了一个动态表名变量并将其传递给BQ运算符。

关于google-bigquery - Airflow BigQueryOperator : how to save query result in a partitioned Table?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/50505067/

10-12 16:55