本文介绍了如何使用ON_ERROR=CONTINUE在SnowPipe中高效解析JSON的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
我正在设置一个雪花管道,将数据从S3存储桶加载到雪花模式。S3包含NDJOSN格式的文件。一个文件可以包含多个记录,我想处理所有记录。即使有一项记录被打破。
为此,我需要在管道创建中添加on_error='continue'
选项,并使用雪花官方文档here中所述的CSV文件格式。
导致此COPY语句:
copy into MY_TABLE
from (select parse_json($1):id, parse_json($1):name, parse_json($1):status
from @MY_STAGE_CONNECTED_TO_S3)
on_error = 'continue'
此代码需要为每行分析3次json。
我有一个包含约40列的表,因此,此查询的速度比使用文件格式选项解析JSON的更简单的解决方案慢约5倍,但不幸的是它不支持on_error=continue
选项。
copy into HQO_DEVELOPMENT.PUBLIC.DIM_TENANT
from (select $1:id, $1:name, $1:status
from @HQO_DEVELOPMENT.PUBLIC.DIM_TENANT_STAGE_NN)
file_format = (type = 'json')
尝试的内容
- 不支持像这样使用嵌套SELECT::
copy into HQO_DEVELOPMENT.PUBLIC.DIM_TENANT from
(select $1:id, $1:name, $1:status from (
select parse_json($1) from
@HQO_DEVELOPMENT.PUBLIC.DIM_TENANT_STAGE))
on_error = 'continue'
- 在舞台上使用JSON类型,在管道上省略:无济于事
有没有办法利用on_error=continue
而不为每一列分析JSON?
推荐答案
Snowflake文档说明:csv和半结构化文件类型都支持;但是,即使在加载半结构化数据(例如json)时,也应该将csv设置为文件格式类型(默认值)。您可以使用相应的文件格式(例如JSON),但是转换中的任何错误都将停止复制操作,即使您将ON_ERROR选项设置为继续或跳过该文件也是如此。
https://docs.snowflake.net/manuals/sql-reference/sql/copy-into-table.html
另一方面,我看到ON_ERROR选项适用于NDJSON文件,至少当您在Stage级别设置文件类型时是这样。用于测试,我创建了以下NDJSON文件用于测试:
{ "id":1, "name":"Gokhan", "location":"Netherlands" }
{ "id":2, "name":"Hans", location:Germany -- broken json #1 }
{ "id":3, "name":"Joe", "location":"UK" }
{ broken json #2 }
{ "id":4, "name":"Mike", "location":"US" }
我创建了一个文件类型对象,并使用此文件类型创建了Stage(您可以更改现有Stage并设置文件类型):
CREATE FILE FORMAT myformat TYPE = json;
CREATE STAGE mystage FILE_FORMAT = (FORMAT_NAME = myformat);
我将示例NDJSON文件上传到此阶段,创建了一个雪斗来加载它:
CREATE PIPE mypipe AS
COPY INTO mytable
FROM (SELECT $1:id, $1:name, $1:location FROM @mystage)
ON_ERROR = CONTINUE;
当我刷新管道时,它成功地从文件加载了"有效"(3)记录。
这篇关于如何使用ON_ERROR=CONTINUE在SnowPipe中高效解析JSON的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!