一、迟到的事实简介

数据仓库通常建立于一种理想的假设情况下,这就是数据仓库的度量(事实记录)与度量的环境(维度记录)同时出现在数据仓库中。当同时拥有事实记录和正确的当前维度行时,就能够从容地首先维护维度键,然后在对应的事实表行中使用这些最新的键。然而,各种各样的原因会导致需要ETL系统处理迟到的事实数据。例如,某些线下的业务,数据进入操作型系统的时间会滞后于事务发生的时间。再或者出现某些极端情况,如源数据库系统出现故障,直到恢复后才能补上故障期间产生的数据。
        在销售订单示例中,晚于订单日期进入源数据的销售订单可以看做是一个迟到事实的例子。销售订单数据被装载进其对应的事实表时,装载日期晚于销售订单产生的日期,因此是一个迟到的事实。本例中因为定期装载的是前一天的数据,所以这里的“晚于”指的是事务数据延迟两天及其以上才到达ETL系统。
        必须对标准的ETL过程进行特殊修改以处理迟到的事实。首先,当迟到度量事件出现时,不得不反向搜索维度表历史记录,以确定事务发生时间点的有效的维度代理键,因为当前的维度内容无法匹配输入行的情况。此外,还需要调整后续事实行中的所有半可加度量,例如,由于迟到的事实导致客户当前余额的改变。迟到事实可能还会引起周期快照事实表的数据更新,如果2017年5月的销售订单金额已经计算并存储在month_end_sales_order_fact快照表中,这时一个迟到的5月订单在6月某天被装载,那么2017年5月的快照金额必须因迟到事实而重新计算。
        下面就以销售订单数据仓库为例,说明如何处理迟到的事实。

二、修改数据仓库表结构

在“HAWQ取代传统数仓实践(十三)——事实表技术之周期快照”中建立的月销售周期快照表,其数据源自已经处理过的销售订单事务事实表。因此为了确定事实表中的一条销售订单记录是否是迟到的,需要把源数据中的登记日期列装载进销售订单事实表。为此在要销售订单事实表上添加登记日期代理键列。为了获取登记日期代理键的值,还要使用维度角色扮演技术添加登记日期维度表。
        执行下面的脚本在销售订单事实表里添加名为entry_date_sk的日期代理键列,并且从日期维度表创建一个叫做v_entry_date_dim的数据库视图。

set search_path=tds;

-- 给销售订单事实表增加登记日期代理键
alter table sales_order_fact add column entry_date_sk int default null;
comment on column sales_order_fact.entry_date_sk is '登记日期代理键';

-- 建立登记日期维度视图
create view v_entry_date_dim
(entry_date_sk, entry_date, month_name, month, quarter, year)
as
select date_sk, date, month_name, month, quarter, year
  from date_dim;

三、修改定期数据装载函数

在创建了登记日期维度视图,并给销售订单事实表添加了登记日期代理键列以后,需要修改数据仓库定期装载脚本来装载登记日期。修改后的装载函数如下。注意sales_order源数据表及其对应的过渡表中都已经含有登记日期,只是以前没有将其装载进数据仓库。

create or replace function fn_regular_load ()
returns void as
$$
declare
    -- 设置scd的生效时间
    v_cur_date date := current_date;
    v_pre_date date := current_date - 1;
    v_last_load date;
begin
    -- 分析外部表
    analyze ext.customer;
    analyze ext.product;
    analyze ext.sales_order;

    -- 将外部表数据装载到原始数据表
    truncate table rds.customer;
    truncate table rds.product;

    insert into rds.customer select * from ext.customer;
    insert into rds.product select * from ext.product;
    insert into rds.sales_order
    select order_number,
           customer_number,
           product_code,
           status_date,
           entry_date,
           order_amount,
           quantity,
           request_delivery_date,
           verification_ind,
           credit_check_flag,
           new_customer_ind,
           web_order_flag,
           order_status
      from ext.sales_order;

    -- 分析rds模式的表
    analyze rds.customer;
    analyze rds.product;
    analyze rds.sales_order;

    -- 设置cdc的上限时间
    select last_load into v_last_load from rds.cdc_time;
    truncate table rds.cdc_time;
    insert into rds.cdc_time select v_last_load, v_cur_date;

    -- 装载客户维度
    insert into tds.customer_dim
    (customer_number,
     customer_name,
     customer_street_address,
     shipping_address,
     isdelete,
     version,
     effective_date)
    select case flag
                when 'D' then a_customer_number
                else b_customer_number
            end customer_number,
           case flag
                when 'D' then a_customer_name
                else b_customer_name
            end customer_name,
           case flag
                when 'D' then a_customer_street_address
                else b_customer_street_address
            end customer_street_address,
           case flag
                when 'D' then a_shipping_address
                else b_shipping_address
            end shipping_address,
           case flag
                when 'D' then true
                else false
            end isdelete,
           case flag
                when 'D' then a_version
                when 'I' then 1
                else a_version + 1
            end v,
           v_pre_date
      from (select a.customer_number a_customer_number,
                   a.customer_name a_customer_name,
                   a.customer_street_address a_customer_street_address,
                   a.shipping_address a_shipping_address,
                   a.version a_version,
                   b.customer_number b_customer_number,
                   b.customer_name b_customer_name,
                   b.customer_street_address b_customer_street_address,
                   b.shipping_address b_shipping_address,
                   case when a.customer_number is null then 'I'
                        when b.customer_number is null then 'D'
                        else 'U'
                    end flag
              from v_customer_dim_latest a
              full join rds.customer b on a.customer_number = b.customer_number
             where a.customer_number is null -- 新增
                or b.customer_number is null -- 删除
                or (a.customer_number = b.customer_number
                    and not
                           (coalesce(a.customer_name,'') = coalesce(b.customer_name,'')
                        and coalesce(a.customer_street_address,'') = coalesce(b.customer_street_address,'')
                        and coalesce(a.shipping_address,'') = coalesce(b.shipping_address,'')
                        ))) t
             order by coalesce(a_customer_number, 999999999999), b_customer_number limit 999999999999;

    -- 装载产品维度
    insert into tds.product_dim
    (product_code,
     product_name,
     product_category,
     isdelete,
     version,
     effective_date)
    select case flag
                when 'D' then a_product_code
                else b_product_code
            end product_code,
           case flag
                when 'D' then a_product_name
                else b_product_name
            end product_name,
           case flag
                when 'D' then a_product_category
                else b_product_category
            end product_category,
           case flag
                when 'D' then true
                else false
            end isdelete,
           case flag
                when 'D' then a_version
                when 'I' then 1
                else a_version + 1
            end v,
           v_pre_date
      from (select a.product_code a_product_code,
                   a.product_name a_product_name,
                   a.product_category a_product_category,
                   a.version a_version,
                   b.product_code b_product_code,
                   b.product_name b_product_name,
                   b.product_category b_product_category,
                   case when a.product_code is null then 'I'
                        when b.product_code is null then 'D'
                        else 'U'
                    end flag
              from v_product_dim_latest a
              full join rds.product b on a.product_code = b.product_code
             where a.product_code is null -- 新增
                or b.product_code is null -- 删除
                or (a.product_code = b.product_code
                    and not
                           (a.product_name = b.product_name
                        and a.product_category = b.product_category))) t
             order by coalesce(a_product_code, 999999999999), b_product_code limit 999999999999;

    -- 装载新增产品数量无事实事实表
    insert into tds.product_count_fact
    select a.product_sk, b.date_sk
      from tds.product_dim a, tds.date_dim b
     where a.version = 1
       and a.effective_date = v_pre_date
       and a.effective_date = b.date;

    -- 装载销售订单事实表
    insert into sales_order_fact
    select a.order_number,
           customer_sk,
           product_sk,
           e.date_sk,
           e.year * 100 + e.month,
           order_amount,
           quantity,
           f.date_sk,
           g.sales_order_attribute_sk,
           h.customer_zip_code_sk,
           i.shipping_zip_code_sk,
           a.order_status,
           l.entry_date_sk
      from rds.sales_order a,
           v_customer_dim_his c,
           v_product_dim_his d,
           date_dim e,
           date_dim f,
           sales_order_attribute_dim g,
           v_customer_zip_code_dim h,
           v_shipping_zip_code_dim i,
           rds.customer j,
           rds.cdc_time k,
           v_entry_date_dim l
     where a.customer_number = c.customer_number
       and a.status_date >= c.effective_date
       and a.status_date < c.expiry_date
       and a.product_code = d.product_code
       and a.status_date >= d.effective_date
       and a.status_date < d.expiry_date
       and date(a.status_date) = e.date
       and date(a.request_delivery_date) = f.date
       and date(a.entry_date) = l.entry_date
       and a.verification_ind = g.verification_ind
       and a.credit_check_flag = g.credit_check_flag
       and a.new_customer_ind = g.new_customer_ind
       and a.web_order_flag = g.web_order_flag
       and a.customer_number = j.customer_number
       and j.customer_zip_code = h.customer_zip_code
       and j.shipping_zip_code = i.shipping_zip_code
       and a.entry_date >= k.last_load and a.entry_date < k.current_load;

    -- 重载PA客户维度
    truncate table pa_customer_dim;
    insert into pa_customer_dim
    select distinct a.*
      from customer_dim a,
           sales_order_fact b,
           v_customer_zip_code_dim c
     where c.customer_state = 'pa'
       and b.customer_zip_code_sk = c.customer_zip_code_sk
       and a.customer_sk = b.customer_sk;

    -- 分析tds模式的表
    analyze customer_dim;
    analyze product_dim;
    analyze sales_order_fact;
    analyze pa_customer_dim;

    -- 更新时间戳表的last_load字段
    truncate table rds.cdc_time;
    insert into rds.cdc_time select v_cur_date, v_cur_date;

end;
$$
language plpgsql;   

在装载脚本中使用销售订单过渡表的状态日期字段限定当时的维度代理键。例如,为了获取事务发生时的客户代理键,筛选条件为:

status_date >= v_customer_dim_his.effective_date and status_date < v_customer_dim_his.expiry_date

之所以可以这样做,原因在于本示例满足以下两个前提条件:在最初源数据库的销售订单表中,status_date存储的是状态发生时的时间;维度的生效时间与过期时间构成一条连续且不重叠的时间轴,任意status_date日期只能落到唯一的生效时间、过期时间区间内。

四、修改装载周期快照事实表的函数

HAWQ取代传统数仓实践(十三)——事实表技术之周期快照”中创建的fn_month_sum函数用于装载月销售周期快照事实表。迟到的事实记录会对周期快照中已经生成的月销售汇总数据产生影响,因此必须做适当的修改。
        月销售周期快照表存储的是某月某产品汇总的销售数量和销售金额,表中有年月、产品代理键、销售金额、销售数量四个字段。由于迟到事实的出现,需要将事务事实表中的数据划分为两类:上月的周期快照和更早的周期快照。

fn_month_sum函数先删除在生成上个月的汇总数据再重新生成,此时上月的迟到数据可以正确汇总。对于上上个月或更早的迟到数据,需要将迟到的数据累加到已有的周期快照上。下面修改fn_month_sum函数,使之能够自动处理任意时间的迟到事实数据。HAWQ不能行级更新或删除数据,因此为了实现所谓的幂等操作,需要标识出迟到事实记录对应的事实表逻辑主键,在重复执行周期快照装载函数时过滤掉已经装载过的迟到数据。

1. 给周期快照事实表增加事务事实表的逻辑主键

alter table month_end_sales_order_fact add order_number bigint default null;

正常数据(非迟到)对应的order_number字段值为空。

2. 修改周期快照事实表装载函数

create or replace function tds.fn_month_sum(p_year_month int)
returns void as
$$
declare
    sqlstring varchar(1000);
begin
    -- 幂等操作,先删除上月数据
    sqlstring := 'truncate table month_end_sales_order_fact_1_prt_p' || cast(p_year_month as varchar);
    execute sqlstring;

    -- 插入上月销售汇总数据
    insert into month_end_sales_order_fact
    select t1.year_month,
           t2.product_sk,
           coalesce(t2.month_order_amount,0),
           coalesce(t2.month_order_quantity,0),
           null
      from (select p_year_month year_month) t1
      left join (select year_month, product_sk, sum(order_amount) month_order_amount, sum(quantity) month_order_quantity
                   from sales_order_fact
                  where year_month = p_year_month and coalesce(order_status,'N') = 'N'
                  group by year_month,product_sk) t2
           on t1.year_month = t2.year_month;

    -- 装载迟到的数据
    insert into month_end_sales_order_fact
    select year_month, product_sk, order_amount, quantity, order_number
      from (select t1.year_month,
                   t1.product_sk,
                   t1.order_amount,
                   t1.quantity,
                   t1.order_number
              from sales_order_fact t1, v_entry_date_dim t2
             where coalesce(t1.entry_date_sk, t1.status_date_sk) = t2.entry_date_sk
               and t2.year*100 + t2.month = p_year_month
               and t1.year_month < p_year_month
               and coalesce(t1.order_status,'N') = 'N'
               and not exists (select 1 from month_end_sales_order_fact t3
                                where t1.order_number = t3.order_number)
             ) t1;

end;
$$
language plpgsql;

说明:

  • t2.year*100 + t2.month = p_year_month and t1.year_month < p_year_month 处理上个月之前的迟到数据;
  • not exists (select 1 from month_end_sales_order_fact t3 where t1.order_number = t3.order_number) 处理尚未装载的迟到数据,用于实现幂等操作。

3. 建立视图进行二次汇总

create view v_month_end_sales_order_fact as
select year_month, product_sk, sum(month_order_amount) month_order_amount, sum(month_order_quantity) month_order_quantity
  from month_end_sales_order_fact
 group by year_month, product_sk;

五、测试

在执行定期装载前使用下面的语句查询month_end_sales_order_fact表。之后可以对比‘前’(不包含迟到事实)‘后’(包含了迟到事实)的数据,以确认装载的正确性。

select year_month,
       product_name,
       month_order_amount amt,
       month_order_quantity qty
  from month_end_sales_order_fact a,
       product_dim b
 where a.product_sk = b.product_sk
   and year_month = cast(extract(year from current_date - interval '1 month') * 100
                  + extract(month from current_date - interval '1 month') as int)
order by year_month, product_name;

查询结果如图1所示。

HAWQ取代传统数仓实践(十六)——事实表技术之迟到的事实-LMLPHP
图1

下一步执行下面的脚本准备销售订单测试数据。此脚本将三个销售订单装载进销售订单源数据,一个是迟到的在month_end_sales_order_fact中已存在的产品,一个是迟到的在month_end_sales_order_fact中不存在的产品,另一个是非迟到的正常产品。这里需要注意,产品维度是SCD2处理的,所以在添加销售订单时,新增订单时间一定要在产品维度的生效与过期时间区间内。

use source;

-- 迟到已存在
set @order_date := from_unixtime(unix_timestamp('2017-05-10') + rand() * (unix_timestamp('2017-05-11') - unix_timestamp('2017-05-10')));
set @request_delivery_date := from_unixtime(unix_timestamp(date_add(@order_date, interval 5 day)) + rand() * 86400);
set @entry_date := from_unixtime(unix_timestamp('2017-06-07') + rand() * (unix_timestamp('2017-06-08') - unix_timestamp('2017-06-07')));
set @amount := floor(1000 + rand() * 9000);
set @quantity := floor(10 + rand() * 90);

insert into source.sales_order values
  (null, 143, 6, 2, 'y', 'y', 'y', 'y',  @order_date, 'N', @request_delivery_date,
        @entry_date, @amount, @quantity);

-- 迟到不存在
set @order_date := from_unixtime(unix_timestamp('2017-05-10') + rand() * (unix_timestamp('2017-05-11') - unix_timestamp('2017-05-10')));
set @request_delivery_date := from_unixtime(unix_timestamp(date_add(@order_date, interval 5 day)) + rand() * 86400);
set @entry_date := from_unixtime(unix_timestamp('2017-06-07') + rand() * (unix_timestamp('2017-06-08') - unix_timestamp('2017-06-07')));
set @amount := floor(1000 + rand() * 9000);
set @quantity := floor(10 + rand() * 90);

insert into source.sales_order values
  (null, 144, 6, 3, 'y', 'y', 'y', 'y',  @order_date, 'N', @request_delivery_date,
        @entry_date, @amount, @quantity);

-- 非迟到
set @entry_date := from_unixtime(unix_timestamp('2017-06-07') + rand() * (unix_timestamp('2017-06-08') - unix_timestamp('2017-06-07')));
set @request_delivery_date := from_unixtime(unix_timestamp(date_add(@entry_date, interval 5 day)) + rand() * 86400);
set @amount := floor(1000 + rand() * 9000);
set @quantity := floor(10 + rand() * 90);

insert into source.sales_order values
  (null, 145, 12, 4, 'y', 'y', 'y', 'y',  @entry_date, 'N', @request_delivery_date,
        @entry_date, @amount, @quantity);

commit;

新增订单数据如图2所示。

HAWQ取代传统数仓实践(十六)——事实表技术之迟到的事实-LMLPHP
图2

执行定期装载脚本。

~/regular_etl.sh

现在已经准备好运行修改后的月底快照装载。手工执行下面的命令执行月底销售订单事实表装载函数导入2017年5月的快照。

su - gpadmin -c 'export PGPASSWORD=123456;psql -U dwtest -d dw -h hdp3 -c "set search_path=tds;select fn_month_sum(cast(extract(year from current_date - interval '\''1 month'\'') * 100 + extract(month from current_date - interval '\''1 month'\'') as int))"'

执行相同的查询获取包含了迟到事实月底销售订单数据。查询结果如图3所示。

select year_month,
       product_name,
       month_order_amount amt,
       month_order_quantity qty
  from v_month_end_sales_order_fact a,
       product_dim b
 where a.product_sk = b.product_sk
   and year_month = cast(extract(year from current_date - interval '1 month') * 100
                  + extract(month from current_date - interval '1 month') as int)
order by year_month, product_name;
HAWQ取代传统数仓实践(十六)——事实表技术之迟到的事实-LMLPHP
图3

对比‘前’‘后’查询的结果可以看到:

  • 2017年5月Floppy Drive的销售金额已经从52083变为57707,这是由于迟到的产品销售订单增加了5624的销售金额。销售数量也相应的增加了。
  • 2017年5月的LCD Panel(也是迟到的产品)被添加。
05-11 15:47