Problem description
I'm having some issues converting from the deprecated BigQueryOperator to BigQueryInsertJobOperator. I have the below task:
bq_extract = BigQueryInsertJobOperator(
    dag="big_query_task",
    task_id='bq_query',
    gcp_conn_id='google_cloud_default',
    params={'data': Utils().querycontext},
    configuration={
        "query": {
            "query": "{% include 'sql/bigquery.sql' %}",
            "useLegacySql": False,
            "writeDisposition": "WRITE_TRUNCATE",
            "destinationTable": {"datasetId": bq_dataset}
        }
    })
This line in my bigquery_extract.sql query is throwing the error:
{% for field in data.bq_fields %}
I want to use 'data' from params, which calls a method that reads from a .json file:
import json
from os import path

from airflow.configuration import conf
from airflow.models import Variable


class Utils():
    bucket = Variable.get('s3_bucket')
    _qcontext = None

    @property
    def querycontext(self):
        # lazily load and cache the query context from the JSON file
        if self._qcontext is None:
            self.load_querycontext()
        return self._qcontext

    def load_querycontext(self):
        with open(path.join(conf.get("core", "dags"), 'traffic/bq_query.json')) as f:
            self._qcontext = json.load(f)
The bq_query.json is in this format, and I need to use the nested bq_fields list values:
{
  "bq_fields": [
    { "name": "CONCAT(ID, '-', CAST(ID AS STRING))", "alias": "new_id" },
    { "name": "TIMESTAMP(CAST(visitStartTime * 1000 AS INT64))", "alias": "new_timestamp" },
    { "name": "TO_JSON_STRING(hits.experiment)", "alias": "hit_experiment" }
  ]
}
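(For context, a hypothetical sketch of how a template such as sql/bigquery.sql could iterate over this list; the SELECT wrapper and table name below are assumptions, not from the original question:)

-- hypothetical template body; the table name is a placeholder
SELECT
{% for field in data.bq_fields %}
    {{ field.name }} AS {{ field.alias }}{% if not loop.last %},{% endif %}
{% endfor %}
FROM `my_project.my_dataset.my_table`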
This file has a list which I want to use in the above-mentioned query line, but it's throwing this error:
jinja2.exceptions.UndefinedError: 'data' is undefined
Recommended answer
There are two issues with your code.

First, "params" is not a supported field in BigQueryInsertJobOperator. See this post, where I describe how to pass parameters to a SQL file when using BigQueryInsertJobOperator: How do you pass variables with BigQueryInsertJobOperator in Airflow
Second, if you happen to get an error that your file cannot be found, make sure you set the full path of your file. I had to do this when migrating from local testing to the cloud, even though the file is in the same directory. You can set the path in the DAG config with the example below (replace the path with your own):
with DAG(
    ...
    template_searchpath='/opt/airflow/dags',
    ...
) as dag:
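For what it's worth, template_searchpath adds the given directory to the Jinja search path, so a reference like {% include 'sql/bigquery.sql' %} is resolved against /opt/airflow/dags rather than only the folder containing the DAG file.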