Problem description
I would like to dynamically get the list of CSV files in a GCS bucket and then load each one into a corresponding BQ table.
I am using the GoogleCloudStorageListOperator and GoogleCloudStorageToBigQueryOperator operators:
GCS_Files = GoogleCloudStorageListOperator(
    task_id='GCS_Files',
    bucket=cf.storage.import_bucket_name,
    prefix='20190701/',
    delimiter='.csv',
    dag=dag
)

for idx, elem in enumerate(["{{ task_instance.xcom_pull(task_ids='GCS_Files') }}"]):
    storage_to_bigquery = GoogleCloudStorageToBigQueryOperator(
        task_id='storage_to_bigquery',
        bucket=cf.storage.import_bucket_name,
        create_disposition='CREATE_IF_NEEDED',
        autodetect=True,
        destination_project_dataset_table=f"{cf.project}.{cf.bigquery.core_dataset_name}.{idx}",
        skip_leading_rows=1,
        source_format='CSV',
        source_objects=[f'{elem}'],
        write_disposition='WRITE_TRUNCATE',
        dag=dag
    )
    storage_to_bigquery.set_upstream(GCS_Files)
However, the list fails to iterate one file at a time and throws the error below.
googleapiclient.errors.HttpError: <HttpError 400 when requesting https://bigquery.googleapis.com/bigquery/v2/projects/my-project/jobs?alt=json returned "Source URI must not contain the ',' character: gs://mybucket/['20190701/file0.csv', '20190701/file1.csv', '20190701/file2.csv']">
Any suggestions?
Recommended answer
You cannot call a macro from just anywhere in your code. This is seen as a plain string in your code: "{{ task_instance.xcom_pull(task_ids='GCS_Files') }}". It is only evaluated later by Jinja2 when passed to the GCP operator, because you are using a templated field: https://github.com/apache/airflow/blob/21a7e7ec67ac7a391d837aa7c13c0825683f1349/airflow/contrib/operators/gcs_to_bq.py#L140
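To make this concrete, here is a minimal illustration (plain Python, names taken from the question) of what the scheduler actually sees while parsing the DAG file, and why the loop runs exactly once:

# At parse time the expression is just a Python string, not the XCom value,
# so the for-loop in the question iterates exactly once, with elem bound to
# the raw template string.
elem = "{{ task_instance.xcom_pull(task_ids='GCS_Files') }}"
print(type(elem))  # <class 'str'>

# Only at run time does Jinja2 render the templated source_objects field;
# the whole XCom list is then stringified into a single URI such as
# gs://mybucket/['20190701/file0.csv', '20190701/file1.csv', ...]
# which is exactly the "Source URI must not contain the ',' character" error.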
To be able to call task_instance.xcom_pull, you need a context, which only exists during a DAG run. When Airflow lazily evaluates the DAG file, this context is not available.
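As a minimal sketch of where the call is legal (assuming the GCS_Files task from the question and Airflow 1.10-style imports; the task id show_files and the callable name are illustrative), the same xcom_pull works inside a python_callable because it executes during the DAG run and receives the context:

from airflow.operators.python_operator import PythonOperator

def print_file_list(**context):
    # Executed at run time, so the XCom pushed by GCS_Files already exists
    # and comes back as a real Python list, not a string.
    files = context['task_instance'].xcom_pull(task_ids='GCS_Files')
    print(files)

show_files = PythonOperator(
    task_id='show_files',
    python_callable=print_file_list,
    provide_context=True,
    dag=dag
)
show_files.set_upstream(GCS_Files)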
In your case, the best approach would be to use a SubDAG to loop over your operator, generating the list of files to loop over inside it: https://airflow.apache.org/concepts.html#subdags
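A rough sketch of that SubDAG pattern follows. Note that XCom is not available while the SubDAG itself is being built either, so this sketch assumes the file list is fetched directly from GCS inside the factory via GoogleCloudStorageHook; the factory name load_csvs_subdag, the task id load_all_csvs, and the table naming are illustrative, and cf is the configuration object from the question:

from airflow import DAG
from airflow.operators.subdag_operator import SubDagOperator
from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator

def load_csvs_subdag(parent_dag, child_dag_id):
    # The SubDAG must reuse the parent's schedule so the SubDagOperator
    # can run it for the same execution dates.
    subdag = DAG(
        dag_id=f'{parent_dag.dag_id}.{child_dag_id}',
        default_args=parent_dag.default_args,
        schedule_interval=parent_dag.schedule_interval
    )
    # Listing happens at parse time, so one load task is created per CSV file.
    hook = GoogleCloudStorageHook()
    objects = hook.list(cf.storage.import_bucket_name,
                        prefix='20190701/', delimiter='.csv')
    for idx, obj in enumerate(objects):
        GoogleCloudStorageToBigQueryOperator(
            task_id=f'storage_to_bigquery_{idx}',
            bucket=cf.storage.import_bucket_name,
            source_objects=[obj],
            destination_project_dataset_table=(
                f"{cf.project}.{cf.bigquery.core_dataset_name}.table_{idx}"),
            autodetect=True,
            skip_leading_rows=1,
            source_format='CSV',
            create_disposition='CREATE_IF_NEEDED',
            write_disposition='WRITE_TRUNCATE',
            dag=subdag
        )
    return subdag

load_all_csvs = SubDagOperator(
    task_id='load_all_csvs',
    subdag=load_csvs_subdag(dag, 'load_all_csvs'),
    dag=dag
)

The trade-off of this pattern is that the bucket listing runs every time the DAG file is parsed, not only when the DAG actually executes.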