This article shows how to convert incremental update data into a structured table in Snowflake, based on the following question and answer.

Problem description

I have the following incremental transaction data coming from S3 in Avro format.

{
  "after": {
    "COM_PCT": null,
    "DEPT_ID": 30,
    "EMAIL": "AKHOO",
    "EMPLOYEE_ID": 115,
    "FIRST_NAME": "ALEX",
    "LAST_NAME": "TIM",
    "HIRE": "1995-05-18 00:00:00",
    "MANAGER_ID": 114
  },
  "before": {},
  "current_ts": "2018-05-18 00:00:00:00",
  "op_ts": "2018-05-18 00:00:00:00",
  "op_type": "I",
  "pos": "00000000001123",
  "primary_keys": ["EMPLOYEE_ID"],
  "table": "HR.EMPLOYEE"
},
{
    "after": {
      "COM_PCT": null,
      "DEPT_ID": 11,
      "EMAIL": "AKHOO",
      "EMPLOYEE_ID": 115,
      "FIRST_NAME": "ALEX",
      "LAST_NAME": "TIM",
      "HIRE": "1995-05-18 00:00:00",
      "MANAGER_ID": 114
    },
    "before": {},
    "current_ts": "2018-05-19 00:00:00:00",
    "op_ts": "2018-05-19 00:00:00:00",
    "op_type": "U",
    "pos": "00000000001124",
    "primary_keys": ["EMPLOYEE_ID"],
    "table": "HR.EMPLOYEE"
  },
  {
      "after": {
        "COM_PCT": null,
        "DEPT_ID": 30,
        "EMAIL": "AKHOO",
        "EMPLOYEE_ID": 115,
        "FIRST_NAME": "ALEX",
        "LAST_NAME": "TIM",
        "HIRE": "1995-05-18 00:00:00",
        "MANAGER_ID": 114
      },
      "before": {},
      "current_ts": "2018-05-20 00:00:00:00",
      "op_ts": "2018-05-20 00:00:00:00",
      "op_type": "U",
      "pos": "00000000001125",
      "primary_keys": ["EMPLOYEE_ID"],
      "table": "HR.EMPLOYEE"
    }

The first transaction is an insert for a given primary key and the following ones are updates to that same key. I cannot process these incremental updates with a streaming pipeline. Is there a way to convert this into a structured table that shows only the latest insert/update transaction for each primary key?

Recommended answer

I assume the file contains changes for only one table; if it does not, you simply need an additional filter on the particular table. I also assume the data lands in a VARIANT column, although that has no impact on how you extract the data in this solution.

I suggest the following approach:

  • First, use the QUALIFY clause to filter the data down to the latest version of each record.
  • Then run a MERGE to insert or update the records.
  • If your feed also contains delete operations, they should be handled in the same code.
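Assuming the raw changes sit in a single VARIANT column (as in the SAMPLE_RAW table used below), the first step can be sketched on its own like this:

```sql
-- Keep only the newest change record per primary key.
-- Ordering by "pos" (the monotonically increasing change position) is an
-- alternative to op_ts in case several changes share the same timestamp.
SELECT samples
  FROM SAMPLE_RAW
QUALIFY ROW_NUMBER() OVER (
          PARTITION BY samples:after:EMPLOYEE_ID
          ORDER BY samples:pos::string DESC
        ) = 1;
```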

Sample data:

CREATE OR REPLACE TABLE SAMPLE_RAW (samples variant);

INSERT INTO SAMPLE_RAW
SELECT parse_json('{
    "after": {
      "COM_PCT": null,
      "DEPT_ID": 30,
      "EMAIL": "AKHOO",
      "EMPLOYEE_ID": 115,
      "FIRST_NAME": "ALEX",
      "LAST_NAME": "TIM",
      "HIRE": "1995-05-18 00:00:00",
      "MANAGER_ID": 114
    },
    "before": {},
    "current_ts": "2018-05-18 00:00:00:00",
    "op_ts": "2018-05-18 00:00:00:00",
    "op_type": "I",
    "pos": "00000000001123",
    "primary_keys": ["EMPLOYEE_ID"],
    "table": "HR.EMPLOYEE"
  }')
UNION ALL
SELECT parse_json('{
      "after": {
        "COM_PCT": null,
        "DEPT_ID": 11,
        "EMAIL": "AKHOO",
        "EMPLOYEE_ID": 115,
        "FIRST_NAME": "ALEX",
        "LAST_NAME": "TIM",
        "HIRE": "1995-05-18 00:00:00",
        "MANAGER_ID": 114
      },
      "before": {},
      "current_ts": "2018-05-19 00:00:00:00",
      "op_ts": "2018-05-19 00:00:00:00",
      "op_type": "U",
      "pos": "00000000001124",
      "primary_keys": ["EMPLOYEE_ID"],
      "table": "HR.EMPLOYEE"
    }')
UNION ALL
SELECT parse_json('{
        "after": {
          "COM_PCT": null,
          "DEPT_ID": 30,
          "EMAIL": "AKHOO",
          "EMPLOYEE_ID": 115,
          "FIRST_NAME": "ALEX",
          "LAST_NAME": "TIM",
          "HIRE": "1995-05-18 00:00:00",
          "MANAGER_ID": 114
        },
        "before": {},
        "current_ts": "2018-05-20 00:00:00:00",
        "op_ts": "2018-05-20 00:00:00:00",
        "op_type": "U",
        "pos": "00000000001125",
        "primary_keys": ["EMPLOYEE_ID"],
        "table": "HR.EMPLOYEE"
      }');

Solution:

MERGE INTO HR.EMPLOYEE AS trg
USING (
  SELECT TO_TIMESTAMP(s.samples:op_ts::string, 'YYYY-MM-DD HH24:MI:SS:FF') AS op_ts
       , s.samples:op_type::string AS op_type
       , s.samples:after:COM_PCT::string AS COM_PCT
       , s.samples:after:DEPT_ID AS DEPT_ID
       , s.samples:after:EMAIL::string AS EMAIL
       , s.samples:after:EMPLOYEE_ID AS EMPLOYEE_ID
       , s.samples:after:FIRST_NAME::string AS FIRST_NAME
       , s.samples:after:LAST_NAME::string AS LAST_NAME
       , TO_TIMESTAMP(s.samples:after:HIRE::string, 'YYYY-MM-DD HH24:MI:SS') AS HIRE
       , s.samples:after:MANAGER_ID AS MANAGER_ID
    FROM SAMPLE_RAW AS s
 QUALIFY ROW_NUMBER() OVER(PARTITION BY EMPLOYEE_ID ORDER BY op_ts DESC) = 1
) AS src
ON trg.EMPLOYEE_ID = src.EMPLOYEE_ID
WHEN MATCHED AND src.op_type = 'U' THEN UPDATE SET trg.EMAIL = src.EMAIL ...
WHEN MATCHED AND src.op_type = 'D' THEN DELETE
WHEN NOT MATCHED THEN INSERT (EMAIL, FIRST_NAME, LAST_NAME, ...) VALUES (src.EMAIL, src.FIRST_NAME, src.LAST_NAME, ...);
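The column lists in the MERGE are intentionally elided. As a fully written-out sketch — assuming the target HR.EMPLOYEE table has exactly the columns present in the "after" payload, which the question does not confirm — it might look like this:

```sql
-- Sketch only: column names are taken from the sample "after" payload;
-- adjust the lists to the real HR.EMPLOYEE schema.
MERGE INTO HR.EMPLOYEE AS trg
USING (
  SELECT TO_TIMESTAMP(s.samples:op_ts::string, 'YYYY-MM-DD HH24:MI:SS:FF') AS op_ts
       , s.samples:op_type::string AS op_type
       , s.samples:after:COM_PCT::string AS COM_PCT
       , s.samples:after:DEPT_ID AS DEPT_ID
       , s.samples:after:EMAIL::string AS EMAIL
       , s.samples:after:EMPLOYEE_ID AS EMPLOYEE_ID
       , s.samples:after:FIRST_NAME::string AS FIRST_NAME
       , s.samples:after:LAST_NAME::string AS LAST_NAME
       , TO_TIMESTAMP(s.samples:after:HIRE::string, 'YYYY-MM-DD HH24:MI:SS') AS HIRE
       , s.samples:after:MANAGER_ID AS MANAGER_ID
    FROM SAMPLE_RAW AS s
 QUALIFY ROW_NUMBER() OVER(PARTITION BY EMPLOYEE_ID ORDER BY op_ts DESC) = 1
) AS src
ON trg.EMPLOYEE_ID = src.EMPLOYEE_ID
WHEN MATCHED AND src.op_type = 'D' THEN DELETE
WHEN MATCHED AND src.op_type = 'U' THEN UPDATE SET
     trg.COM_PCT    = src.COM_PCT
   , trg.DEPT_ID    = src.DEPT_ID
   , trg.EMAIL      = src.EMAIL
   , trg.FIRST_NAME = src.FIRST_NAME
   , trg.LAST_NAME  = src.LAST_NAME
   , trg.HIRE       = src.HIRE
   , trg.MANAGER_ID = src.MANAGER_ID
WHEN NOT MATCHED THEN INSERT
     (EMPLOYEE_ID, COM_PCT, DEPT_ID, EMAIL, FIRST_NAME, LAST_NAME, HIRE, MANAGER_ID)
VALUES
     (src.EMPLOYEE_ID, src.COM_PCT, src.DEPT_ID, src.EMAIL, src.FIRST_NAME,
      src.LAST_NAME, src.HIRE, src.MANAGER_ID);
```

Note that for delete records the key values typically appear in the "before" image rather than "after", so handling deletes may require its own key extraction.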

References: QUALIFY, MERGE

