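A zipper table suits scenarios with a large data volume where only a small share of the fields change, and change infrequently, yet historical snapshots still have to be queried.
Take a customer table with tens of millions of records and several hundred columns. If a full copy is kept every day, then even with ORC compression a single day's copy takes more than 50 GB. With HDFS three-way replication the space actually consumed is three times that.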
So how should such a table be designed? There are several possible approaches:
First, create a test source table of customer information:
- CREATE TABLE IF NOT EXISTS datadev.zipper_table_test_cust_src (
- `cust_id` STRING COMMENT 'customer ID',
- `phone` STRING COMMENT 'phone number'
- )PARTITIONED BY (
- dt STRING COMMENT 'etldate'
- )STORED AS ORC
- TBLPROPERTIES ("orc.compress"="SNAPPY")
- ;
Then insert some test data:
cust_id | phone | dt |
001 | 1111 | 20210601 |
002 | 2222 | 20210601 |
003 | 3333 | 20210601 |
004 | 4444 | 20210601 |
001 | 1111 | 20210602 |
002 | 2222-1 | 20210602 |
003 | 3333 | 20210602 |
004 | 4444-1 | 20210602 |
005 | 5555 | 20210602 |
001 | 1111-1 | 20210603 |
002 | 2222-2 | 20210603 |
003 | 3333 | 20210603 |
004 | 4444-1 | 20210603 |
005 | 5555-1 | 20210603 |
006 | 6666 | 20210603 |
002 | 2222-3 | 20210604 |
003 | 3333 | 20210604 |
004 | 4444-1 | 20210604 |
005 | 5555-1 | 20210604 |
006 | 6666 | 20210604 |
007 | 7777 | 20210604 |
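A brief description of the data:
- On 20210602, the phone numbers of 002 and 004 changed and customer 005 was added.
- On 20210603, 001, 002 and 005 changed and 006 was added.
- On 20210604, 002 changed, 007 was added and 001 was deleted.
Now back to the main question: how is a zipper table designed?
First, a zipper table has two key audit columns: the effective date and the expiry date. As the names suggest, the effective date records when a version of a record became valid, and the expiry date records when it stopped being valid (9999-12-31 means it is still valid today). Operations on the data therefore fall into the following categories:
- new records: insert an open version.
- unchanged records: keep the current version.
- changed records: close the old version and open a new one.
- deleted records: close the current version.
The HQL implementation of the zipper table is therefore: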
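To make the four categories concrete, the sketch below compares consecutive daily snapshots of the test data in plain Python. It only illustrates the classification and is not part of the Hive job. Each snapshot is modeled as a dict from cust_id to phone, and the names `SRC` and `classify` are made up for this example.

```python
# Test data from datadev.zipper_table_test_cust_src: {dt: {cust_id: phone}}
SRC = {
    "20210601": {"001": "1111", "002": "2222", "003": "3333", "004": "4444"},
    "20210602": {"001": "1111", "002": "2222-1", "003": "3333", "004": "4444-1", "005": "5555"},
    "20210603": {"001": "1111-1", "002": "2222-2", "003": "3333", "004": "4444-1",
                 "005": "5555-1", "006": "6666"},
    "20210604": {"002": "2222-3", "003": "3333", "004": "4444-1", "005": "5555-1",
                 "006": "6666", "007": "7777"},
}


def classify(prev, curr):
    """Put every cust_id into one of the four operation categories between two snapshots."""
    ops = {"new": [], "unchanged": [], "changed": [], "deleted": []}
    for cust_id in sorted(prev.keys() | curr.keys()):
        if cust_id not in prev:
            ops["new"].append(cust_id)          # only in today's snapshot
        elif cust_id not in curr:
            ops["deleted"].append(cust_id)      # only in yesterday's snapshot
        elif prev[cust_id] == curr[cust_id]:
            ops["unchanged"].append(cust_id)
        else:
            ops["changed"].append(cust_id)      # same customer, different phone
    return ops


days = sorted(SRC)
for prev_day, curr_day in zip(days, days[1:]):
    print(curr_day, classify(SRC[prev_day], SRC[curr_day]))
```

For 20210604 this reports 007 as new, 002 as changed and 001 as deleted. That matches the description of the data above.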
- -- DDL of the zipper table
- CREATE TABLE IF NOT EXISTS datadev.zipper_table_test_cust_dst (
- `cust_id` STRING COMMENT 'customer ID',
- `phone` STRING COMMENT 'phone number',
- `s_date` DATE COMMENT 'effective date',
- `e_date` DATE COMMENT 'expiry date'
- )STORED AS ORC
- TBLPROPERTIES ("orc.compress"="SNAPPY")
- ;
- -- Zipper table load (also supports rolling back and re-running an earlier ${etldate})
- INSERT OVERWRITE TABLE datadev.zipper_table_test_cust_dst
- -- part1: new records, unchanged records, and the new version of changed records
- select curr.cust_id as cust_id,
- curr.phone as phone,
- -- unchanged record (latest version still open, phone identical): keep its original s_date;
- -- new or changed record (or a customer deleted earlier who came back): s_date = ${etldate}
- case when prev.e_date = DATE('9999-12-31') and NVL(curr.phone, '') = NVL(prev.phone, '') then prev.s_date
- else curr.s_date
- end as s_date,
- curr.e_date as e_date
- from (
- select cust_id, phone, DATE(from_unixtime(unix_timestamp(dt, 'yyyyMMdd'), 'yyyy-MM-dd')) as s_date, DATE('9999-12-31') as e_date
- from datadev.zipper_table_test_cust_src
- where dt = '${etldate}'
- ) as curr
-
- left join (
- select cust_id, phone, s_date, if(e_date > from_unixtime(unix_timestamp('${etldate}', 'yyyyMMdd'), 'yyyy-MM-dd'), DATE('9999-12-31'), e_date) as e_date,
- row_number() over(partition by cust_id order by e_date desc) as r_num -- latest version of each customer
- from datadev.zipper_table_test_cust_dst
- where regexp_replace(s_date, '-', '') <= '${etldate}' -- rollback: ignore versions that start after ${etldate}
- ) as prev
- on curr.cust_id = prev.cust_id
- and prev.r_num = 1
-
- union all
-
- -- part2: already-closed history, deleted records, and the old version of changed records
- select prev_cust.cust_id, prev_cust.phone, prev_cust.s_date,
- case when prev_cust.e_date <> DATE('9999-12-31') then prev_cust.e_date
- else DATE(from_unixtime(unix_timestamp('${etldate}', 'yyyyMMdd'), 'yyyy-MM-dd'))
- end as e_date
- from (
- select cust_id, phone, s_date, if(e_date > from_unixtime(unix_timestamp('${etldate}', 'yyyyMMdd'), 'yyyy-MM-dd'), DATE('9999-12-31'), e_date) as e_date
- from datadev.zipper_table_test_cust_dst
- where regexp_replace(s_date, '-', '') <= '${etldate}' -- rollback: ignore versions that start after ${etldate}
- ) as prev_cust
-
- left join (
- select cust_id, phone
- from datadev.zipper_table_test_cust_src
- where dt = '${etldate}'
- ) as curr_cust
- on curr_cust.cust_id = prev_cust.cust_id
- -- keep closed history as-is (filtering it on phone would drop an old version whose phone
- -- equals today's phone), and close the open version of deleted or changed customers
- where prev_cust.e_date <> DATE('9999-12-31')
- or curr_cust.cust_id is null
- or NVL(prev_cust.phone, '') <> NVL(curr_cust.phone, '')
- ;
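The two halves of the INSERT can be mirrored in a few lines of Python to check the expected results for each day. This is a hypothetical in-memory model, not Hive code, and it rests on a few assumptions:
- The table is a sorted list of (cust_id, phone, s_date, e_date) tuples, with dates as 'yyyy-MM-dd' strings, which compare correctly as strings.
- NULL phones are not modeled.
- Already-closed versions are kept unconditionally.
- `SRC`, `OPEN`, `iso` and `load` are names invented for this sketch.

```python
# Test data from datadev.zipper_table_test_cust_src: {dt: {cust_id: phone}}
SRC = {
    "20210601": {"001": "1111", "002": "2222", "003": "3333", "004": "4444"},
    "20210602": {"001": "1111", "002": "2222-1", "003": "3333", "004": "4444-1", "005": "5555"},
    "20210603": {"001": "1111-1", "002": "2222-2", "003": "3333", "004": "4444-1",
                 "005": "5555-1", "006": "6666"},
    "20210604": {"002": "2222-3", "003": "3333", "004": "4444-1", "005": "5555-1",
                 "006": "6666", "007": "7777"},
}
OPEN = "9999-12-31"  # e_date of a version that is still valid


def iso(etldate):
    """'20210602' -> '2021-06-02', like from_unixtime(unix_timestamp(x, 'yyyyMMdd'), 'yyyy-MM-dd')."""
    return f"{etldate[:4]}-{etldate[4:6]}-{etldate[6:]}"


def load(dst, src, etldate):
    """One run of the INSERT OVERWRITE statement; returns the new table contents."""
    today = iso(etldate)
    # prev / prev_cust: drop versions starting after etldate, re-open versions closed after it
    prev = [(c, p, s, OPEN if e > today else e) for (c, p, s, e) in dst if s <= today]
    # prev.r_num = 1: the latest version of each customer (largest e_date)
    latest = {}
    for row in prev:
        if row[0] not in latest or row[3] > latest[row[0]][3]:
            latest[row[0]] = row
    # part1: every customer in today's source gets an open version; it keeps its old
    # s_date only if its latest version is still open and the phone is unchanged
    part1 = []
    for c, p in src.items():
        old = latest.get(c)
        unchanged = old is not None and old[3] == OPEN and old[1] == p
        part1.append((c, p, old[2] if unchanged else today, OPEN))
    # part2: closed history is kept as-is; an open version is closed on etldate
    # when the customer was deleted (missing from src) or its phone changed
    part2 = []
    for c, p, s, e in prev:
        if e != OPEN:
            part2.append((c, p, s, e))
        elif src.get(c) != p:
            part2.append((c, p, s, today))
    return sorted(part1 + part2)


dst = []
for etldate in sorted(SRC):  # run 20210601 .. 20210604 in order
    dst = load(dst, SRC[etldate], etldate)

for row in dst:
    print(" | ".join(row))
```

Running the four days in order prints the same 13 rows as the day-4 result in 4.4. One design point: the model keeps closed versions outside the phone comparison, so a customer whose phone reverts to an earlier value does not lose that earlier version.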

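4.1 Day 1 (20210601): replace ${etldate} with 20210601 and run the SQL. This is the initial state and nothing has changed yet, so every record has effective date 2021-06-01 and expiry date 9999-12-31 (currently valid).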
zipper_table_test_cust_dst.cust_id | zipper_table_test_cust_dst.phone | zipper_table_test_cust_dst.s_date | zipper_table_test_cust_dst.e_date |
001 | 1111 | 2021-06-01 | 9999-12-31 |
002 | 2222 | 2021-06-01 | 9999-12-31 |
003 | 3333 | 2021-06-01 | 9999-12-31 |
004 | 4444 | 2021-06-01 | 9999-12-31 |
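4.2 Day 2 (20210602): replace ${etldate} with 20210602 and run the SQL. The source changed the phone numbers of 002 and 004, so each of them now has two records: one holds the historical state and the other holds the current state. The source also added customer 005. The new versions of 002 and 004, as well as the new customer 005, are effective from 2021-06-02 and expire on 9999-12-31.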
zipper_table_test_cust_dst.cust_id | zipper_table_test_cust_dst.phone | zipper_table_test_cust_dst.s_date | zipper_table_test_cust_dst.e_date |
001 | 1111 | 2021-06-01 | 9999-12-31 |
002 | 2222 | 2021-06-01 | 2021-06-02 |
002 | 2222-1 | 2021-06-02 | 9999-12-31 |
003 | 3333 | 2021-06-01 | 9999-12-31 |
004 | 4444 | 2021-06-01 | 2021-06-02 |
004 | 4444-1 | 2021-06-02 | 9999-12-31 |
005 | 5555 | 2021-06-02 | 9999-12-31 |
4.3 Day 3 (20210603): replace ${etldate} with 20210603 and run the SQL. The source modified 001, 002 and 005 and added 006.
zipper_table_test_cust_dst.cust_id | zipper_table_test_cust_dst.phone | zipper_table_test_cust_dst.s_date | zipper_table_test_cust_dst.e_date |
001 | 1111 | 2021-06-01 | 2021-06-03 |
001 | 1111-1 | 2021-06-03 | 9999-12-31 |
002 | 2222 | 2021-06-01 | 2021-06-02 |
002 | 2222-1 | 2021-06-02 | 2021-06-03 |
002 | 2222-2 | 2021-06-03 | 9999-12-31 |
003 | 3333 | 2021-06-01 | 9999-12-31 |
004 | 4444 | 2021-06-01 | 2021-06-02 |
004 | 4444-1 | 2021-06-02 | 9999-12-31 |
005 | 5555 | 2021-06-02 | 2021-06-03 |
005 | 5555-1 | 2021-06-03 | 9999-12-31 |
006 | 6666 | 2021-06-03 | 9999-12-31 |
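4.4 Day 4 (20210604): replace ${etldate} with 20210604 and run the SQL. The source updated 002, added 007 and deleted 001. Note that when a record is deleted, the expiry date of its current version is set to the day of the deletion.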
zipper_table_test_cust_dst.cust_id | zipper_table_test_cust_dst.phone | zipper_table_test_cust_dst.s_date | zipper_table_test_cust_dst.e_date |
001 | 1111 | 2021-06-01 | 2021-06-03 |
001 | 1111-1 | 2021-06-03 | 2021-06-04 |
002 | 2222 | 2021-06-01 | 2021-06-02 |
002 | 2222-1 | 2021-06-02 | 2021-06-03 |
002 | 2222-2 | 2021-06-03 | 2021-06-04 |
002 | 2222-3 | 2021-06-04 | 9999-12-31 |
003 | 3333 | 2021-06-01 | 9999-12-31 |
004 | 4444 | 2021-06-01 | 2021-06-02 |
004 | 4444-1 | 2021-06-02 | 9999-12-31 |
005 | 5555 | 2021-06-02 | 2021-06-03 |
005 | 5555-1 | 2021-06-03 | 9999-12-31 |
006 | 6666 | 2021-06-03 | 9999-12-31 |
007 | 7777 | 2021-06-04 | 9999-12-31 |
The latest state of the zipper table can be queried with:
select * from datadev.zipper_table_test_cust_dst where e_date = '9999-12-31';
Its historical state (snapshot) can be queried with:
- -- the zipper table as it was after the 20210602 run
- select cust_id, phone, s_date, if(e_date > '2021-06-02', DATE('9999-12-31'), e_date) as e_date
- from datadev.zipper_table_test_cust_dst
- where s_date <= '2021-06-02';
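The two queries above can be sketched over the same tuple model, together with a third one for the versions in effect on a given day. `OPEN`, `DST`, `latest`, `as_of` and `valid_on` are hypothetical names used only here. `valid_on` is an extra query that the text does not define. It relies on the pattern visible in the result tables, where an old version's e_date equals the new version's s_date. Under that pattern, a version is in effect on day D when s_date <= D < e_date.

```python
OPEN = "9999-12-31"

# datadev.zipper_table_test_cust_dst after the 20210604 run (table 4.4):
# (cust_id, phone, s_date, e_date)
DST = [
    ("001", "1111", "2021-06-01", "2021-06-03"),
    ("001", "1111-1", "2021-06-03", "2021-06-04"),
    ("002", "2222", "2021-06-01", "2021-06-02"),
    ("002", "2222-1", "2021-06-02", "2021-06-03"),
    ("002", "2222-2", "2021-06-03", "2021-06-04"),
    ("002", "2222-3", "2021-06-04", OPEN),
    ("003", "3333", "2021-06-01", OPEN),
    ("004", "4444", "2021-06-01", "2021-06-02"),
    ("004", "4444-1", "2021-06-02", OPEN),
    ("005", "5555", "2021-06-02", "2021-06-03"),
    ("005", "5555-1", "2021-06-03", OPEN),
    ("006", "6666", "2021-06-03", OPEN),
    ("007", "7777", "2021-06-04", OPEN),
]


def latest(dst):
    """where e_date = '9999-12-31': the current version of every live customer."""
    return [row for row in dst if row[3] == OPEN]


def as_of(dst, day):
    """The snapshot query: the table as it was right after the run for `day`."""
    return [(c, p, s, OPEN if e > day else e) for (c, p, s, e) in dst if s <= day]


def valid_on(dst, day):
    """Hypothetical extra query: versions in effect on `day`, i.e. s_date <= day < e_date."""
    return [row for row in dst if row[2] <= day < row[3]]


for c, p, s, e in valid_on(DST, "2021-06-02"):
    print(c, p)
```

`as_of(DST, "2021-06-02")` returns exactly the table shown for day 2. `valid_on` returns the phone numbers of the 20210602 source partition. In HQL, the `valid_on` filter would read `s_date <= '2021-06-02' and e_date > '2021-06-02'`.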
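So, to roll back and refresh the zipper table, use the query above to find the table's state as of that day and re-run the load from there. (Note: the zipper-table INSERT statement given above already includes this rollback/refresh logic. Versions that start after ${etldate} are discarded and versions closed after it are re-opened, so re-running an earlier day and then each later day in order rebuilds the table. Readers can test it: replace ${etldate} with the date to roll back to, comment out the INSERT OVERWRITE TABLE line, and run the SELECT on its own to inspect the result.)
This implementation has a drawback: every run reads and rewrites the entire zipper table, so the run time keeps growing as the table grows. One improvement is to combine Hive with Elasticsearch (ES).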
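The rollback can be demonstrated on the tuple model. The sketch starts from the day-4 table and re-runs 20210603, which restores the day-3 result. Running 20210604 again then restores the day-4 result. `load` is a hypothetical Python mirror of the INSERT statement, not Hive code, and the other names are likewise invented for this example.

```python
OPEN = "9999-12-31"

# Source partitions needed for the re-run: {dt: {cust_id: phone}}
SRC = {
    "20210603": {"001": "1111-1", "002": "2222-2", "003": "3333", "004": "4444-1",
                 "005": "5555-1", "006": "6666"},
    "20210604": {"002": "2222-3", "003": "3333", "004": "4444-1", "005": "5555-1",
                 "006": "6666", "007": "7777"},
}

# Zipper table after the 20210604 run (table 4.4): (cust_id, phone, s_date, e_date)
DST_DAY4 = [
    ("001", "1111", "2021-06-01", "2021-06-03"), ("001", "1111-1", "2021-06-03", "2021-06-04"),
    ("002", "2222", "2021-06-01", "2021-06-02"), ("002", "2222-1", "2021-06-02", "2021-06-03"),
    ("002", "2222-2", "2021-06-03", "2021-06-04"), ("002", "2222-3", "2021-06-04", OPEN),
    ("003", "3333", "2021-06-01", OPEN),
    ("004", "4444", "2021-06-01", "2021-06-02"), ("004", "4444-1", "2021-06-02", OPEN),
    ("005", "5555", "2021-06-02", "2021-06-03"), ("005", "5555-1", "2021-06-03", OPEN),
    ("006", "6666", "2021-06-03", OPEN),
    ("007", "7777", "2021-06-04", OPEN),
]


def iso(etldate):
    """'20210603' -> '2021-06-03'."""
    return f"{etldate[:4]}-{etldate[4:6]}-{etldate[6:]}"


def load(dst, src, etldate):
    """One run of the zipper INSERT OVERWRITE; returns the new table contents."""
    today = iso(etldate)
    # rollback: drop versions starting after etldate, re-open versions closed after it
    prev = [(c, p, s, OPEN if e > today else e) for (c, p, s, e) in dst if s <= today]
    latest = {}
    for row in prev:  # r_num = 1: largest e_date per customer
        if row[0] not in latest or row[3] > latest[row[0]][3]:
            latest[row[0]] = row
    part1 = []
    for c, p in src.items():
        old = latest.get(c)
        unchanged = old is not None and old[3] == OPEN and old[1] == p
        part1.append((c, p, old[2] if unchanged else today, OPEN))
    part2 = []
    for c, p, s, e in prev:
        if e != OPEN:
            part2.append((c, p, s, e))        # closed history
        elif src.get(c) != p:
            part2.append((c, p, s, today))    # deleted or changed today
    return sorted(part1 + part2)


# Roll back to 20210603 and re-run that day
rolled_back = load(DST_DAY4, SRC["20210603"], "20210603")
for row in rolled_back:
    print(" | ".join(row))

# Re-running the following day restores the original result
print(load(rolled_back, SRC["20210604"], "20210604") == DST_DAY4)  # True
```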
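- -- Zipper table (Hive keeps only the daily delta; the full history is stored in ES)
-
- -- Working table: the latest version of every live customer plus the versions opened or closed on etldate,
- -- so it stays roughly one snapshot in size instead of growing with the history
- CREATE TABLE IF NOT EXISTS datadev.zipper_table_test_cust_dst_2 (
- `id` STRING COMMENT 'es id',
- `cust_id` STRING COMMENT 'customer ID',
- `phone` STRING COMMENT 'phone number',
- `s_date` DATE COMMENT 'effective date',
- `e_date` DATE COMMENT 'expiry date'
- )STORED AS ORC
- TBLPROPERTIES ("orc.compress"="SNAPPY")
- ;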
-
- -- etldate = 20210603, prev_date = 20210602
- INSERT OVERWRITE TABLE datadev.zipper_table_test_cust_dst_2
- -- part A: a new open version for new customers and for customers whose phone changed
- select concat_ws('-', curr.s_date, curr.cust_id) as id,
- curr.cust_id as cust_id,
- curr.phone as phone,
- DATE(curr.s_date) as s_date,
- DATE('9999-12-31') as e_date
- from (
- select cust_id, phone, from_unixtime(unix_timestamp(dt, 'yyyyMMdd'), 'yyyy-MM-dd') as s_date
- from datadev.zipper_table_test_cust_src
- where dt = '20210603' -- etldate
- ) as curr
-
- left join (
- select *
- from datadev.zipper_table_test_cust_src
- where dt = '20210602' -- prev_date
- ) as prev
- on prev.cust_id = curr.cust_id
- where NVL(curr.phone, '') <> NVL(prev.phone, '')
-
- union all
-
- -- part B: carry the latest open version of every customer forward, closing it on etldate
- -- if the phone changed or the customer no longer exists in the source (deleted)
- select concat_ws('-', STRING(prev.s_date), prev.cust_id) as id,
- prev.cust_id as cust_id,
- prev.phone as phone,
- prev.s_date as s_date,
- case when curr.cust_id is not null and NVL(prev.phone, '') = NVL(curr.phone, '') then prev.e_date
- else DATE(from_unixtime(unix_timestamp('20210603', 'yyyyMMdd'), 'yyyy-MM-dd')) -- etldate
- end as e_date
- from (
- select cust_id, phone, s_date, e_date,
- -- only the latest version of each customer is carried forward
- row_number() over(partition by cust_id order by s_date desc) as r_num
- from datadev.zipper_table_test_cust_dst_2
- ) as prev
-
- -- left join: an inner join would never close customers deleted from the source
- left join (
- select *
- from datadev.zipper_table_test_cust_src
- where dt = '20210603' -- etldate
- ) as curr
- on prev.cust_id = curr.cust_id
- where prev.r_num = 1
- and prev.e_date = DATE('9999-12-31') -- skip customers already closed on an earlier day
- ;
-
-
-
-
- -- mock: load delta data to es (an ordinary Hive table stands in for the ES index here)
- CREATE TABLE IF NOT EXISTS datadev.es_zipper (
- `id` STRING COMMENT 'es id',
- `cust_id` STRING COMMENT 'customer ID',
- `phone` STRING COMMENT 'phone number',
- `s_date` DATE COMMENT 'effective date',
- `e_date` DATE COMMENT 'expiry date'
- )STORED AS ORC
- TBLPROPERTIES ("orc.compress"="SNAPPY")
- ;
-
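- -- upsert by id: each delta row overwrites the document with the same id; all other documents are kept
- INSERT OVERWRITE TABLE datadev.es_zipper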
- SELECT nvl(curr.id, prev.id) as id,
- nvl(curr.cust_id, prev.cust_id) as cust_id,
- nvl(curr.phone, prev.phone) as phone,
- nvl(curr.s_date, prev.s_date) as s_date,
- nvl(curr.e_date, prev.e_date) as e_date
- FROM datadev.es_zipper prev
-
- full join datadev.zipper_table_test_cust_dst_2 curr
- on curr.id = prev.id;
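The delta-plus-upsert flow can be checked the same way in a hypothetical Python model:
- The working table `dst2` is a list of (id, cust_id, phone, s_date, e_date) tuples.
- The ES index is a plain dict keyed by document id. It stands in for the `es_zipper` mock table.
- `build_delta` closes the open version of a customer that is missing from today's source (a left join, in HQL terms). That is what lets deletions reach ES.
- `upsert` plays the role of the full join on id.

None of these names come from Hive or Elasticsearch.

```python
# Test data from datadev.zipper_table_test_cust_src: {dt: {cust_id: phone}}
SRC = {
    "20210601": {"001": "1111", "002": "2222", "003": "3333", "004": "4444"},
    "20210602": {"001": "1111", "002": "2222-1", "003": "3333", "004": "4444-1", "005": "5555"},
    "20210603": {"001": "1111-1", "002": "2222-2", "003": "3333", "004": "4444-1",
                 "005": "5555-1", "006": "6666"},
    "20210604": {"002": "2222-3", "003": "3333", "004": "4444-1", "005": "5555-1",
                 "006": "6666", "007": "7777"},
}
OPEN = "9999-12-31"


def iso(etldate):
    """'20210603' -> '2021-06-03'."""
    return f"{etldate[:4]}-{etldate[4:6]}-{etldate[6:]}"


def build_delta(dst2, src_prev, src_curr, etldate):
    """New contents of the working table for one etldate (rows: id, cust_id, phone, s_date, e_date)."""
    today = iso(etldate)
    # part A: a new open version for new customers and changed phones (vs. yesterday's snapshot)
    part_a = [(f"{today}-{c}", c, p, today, OPEN)
              for c, p in src_curr.items() if src_prev.get(c) != p]
    # r_num = 1: latest version of each customer in the working table (largest s_date)
    latest = {}
    for row in dst2:
        if row[1] not in latest or row[3] > latest[row[1]][3]:
            latest[row[1]] = row
    # part B: carry each open version forward; close it today if the phone changed
    # or the customer is missing from today's source (deleted)
    part_b = []
    for row_id, c, p, s, e in latest.values():
        if e != OPEN:  # closed on an earlier day: already final in ES
            continue
        same = src_curr.get(c) == p
        part_b.append((row_id, c, p, s, OPEN if same else today))
    return part_a + part_b


def upsert(es, delta):
    """Mock of the ES load: delta rows overwrite the documents with the same id."""
    for row in delta:
        es[row[0]] = row


es, dst2, prev_src = {}, [], {}
for etldate in sorted(SRC):
    dst2 = build_delta(dst2, prev_src, SRC[etldate], etldate)
    upsert(es, dst2)
    prev_src = SRC[etldate]

for row in sorted(es.values(), key=lambda r: r[1:]):
    print(" | ".join(row[1:]))
```

After the four days, the dict holds the same 13 versions as table 4.4. That includes 001's second version (id `2021-06-03-001`), closed on 2021-06-04. If part B dropped customers absent from today's source, as an inner join does, that version would stay open in ES.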

Copyright © 2003-2013 www.wpsshop.cn. All rights reserved.