当前位置:   article > 正文

数据仓库之拉链表的设计以及实现_拉链表的原理和简单实现

拉链表的原理和简单实现

一、简介

  • 增量表: 有日期分区,存放增量数据,即新增量和变化量。
  • 全量表: 无日期分区(每天覆盖更新),存放截止至当前,数据的最新的状态,所以无法记录数据的历史变化
  • 快照表: 有日期分区,每天的数据都是全量的(无论有无变化),缺点是每个分区存储了许多重复的数据,浪费存储空间
  • 拉链表: 拉链表是用来维护历史状态,以及最新状态数据的一种表,拉链表根据拉链粒度的不同,实际上相当于快照,只不过做了优化,去除了一部分不变的记录

二、应用场景

拉链表适用于那些大数据量,并且在字段变化的比例和频率不大的情况下需要查看历史快照信息的场景。

比如说有一张客户表,大约有几千万条记录,几百个字段。那么对于这种表,即使采用ORC压缩,单张表每天的数据存储空间也会超过50GB,在HDFS中使用三备份情况下,存储空间的占用会更大。

那么对于这种表我该如何设计呢?下面有几种方案可选:

  1. 方案一(全量表):每天抽取最新数据覆盖前一天的数据,优点是实现简单,节省空间,但是缺点同样明显,没有历史状态
  2. 方案二(快照表):做成每天全量的话,我们就可以查看历史数据了,但是缺点是存储空间占用太大了,特别是在客户信息不会频繁变化的情况下,字段的重复存储率太高了
  3. 方案三(拉链表):采用拉链表设计的话,不仅能查看历史状态,而且存储空间的占用也是极低的(毕竟对于没有变化的数据不重复存储)

三、Hive SQL实践

首先创建一张测试用的客户信息原表

  1. CREATE TABLE IF NOT EXISTS datadev.zipper_table_test_cust_src (
  2. `cust_id` STRING COMMENT '客户编号',
  3. `phone` STRING COMMENT '手机号码'
  4. )PARTITIONED BY (
  5. dt STRING COMMENT 'etldate'
  6. )STORED AS ORC
  7. TBLPROPERTIES ("orc.compress"="SNAPPY")
  8. ;

然后insert一些测试数据

cust_idphonedt
001111120210601
002222220210601
003333320210601
004444420210601
001111120210602
0022222-120210602
003333320210602
0044444-120210602
005555520210602
0011111-120210603
0022222-220210603
003333320210603
0044444-120210603
0055555-120210603
006666620210603
0022222-320210604
003333320210604
0044444-120210604
0055555-120210604
006666620210604
007777720210604

数据简单说明如下:

  • 20210601为起始日期,总共有4个客户
  • 20210602更新了002和004客户的信息,并新增了005客户
  • 20210603更新了001、002、005客户的信息,并新增了006客户
  • 20210604更新了002客户的信息,并新增了007客户,删除了001客户

现在回到正题,拉链表如何设计?

首先,拉链表有两个重要的审计字段:数据生效日期数据失效日期。顾名思义,数据生效日期记录了这条记录是何时生效的,而数据失效日期则是记录了该条记录的失效时间(9999-12-31表示截至当前一直有效)。那么对数据的操作总共可分为以下几类:

  1. 新增的记录:数据生效日期为当天, 失效日期为9999-12-31
  2. 没有变化的记录:数据生效日期需要使用之前的, 失效日期不变
  3. 有变化的记录:==》对于旧记录:保留,并更改失效日期为当天; ==》对于新记录:新增,生效日期为当天,失效日期为9999-12-31
  4. 删除的记录:需要闭环,失效日期变为当天

因此拉链表的HQL实现代码如下:

  1. -- 拉链表建表语句
  2. CREATE TABLE IF NOT EXISTS datadev.zipper_table_test_cust_dst (
  3. `cust_id` STRING COMMENT '客户编号',
  4. `phone` STRING COMMENT '手机号码',
  5. `s_date` DATE COMMENT '生效时间',
  6. `e_date` DATE COMMENT '失效时间'
  7. )STORED AS ORC
  8. TBLPROPERTIES ("orc.compress"="SNAPPY")
  9. ;
  1. -- 拉链表实现代码(含数据回滚刷新)
  2. INSERT OVERWRITE TABLE datadev.zipper_table_test_cust_dst
  3. -- part1: 处理新增的、没有变化的记录,以及有变化的记录中的新记录
  4. select NVL(curr.cust_id, prev.cust_id) as cust_id,
  5. NVL(curr.phone, prev.phone) as phone,
  6. -- 没有变化的记录: s_date需要使用之前的
  7. case when NVL(curr.phone, '') = NVL(prev.phone, '') then prev.s_date
  8. else NVL(curr.s_date, prev.s_date)
  9. end as s_date,
  10. NVL(curr.e_date, prev.e_date) as e_date
  11. from (
  12. select cust_id, phone, DATE(from_unixtime(unix_timestamp(dt, 'yyyyMMdd'), 'yyyy-MM-dd')) as s_date, DATE('9999-12-31') as e_date
  13. from datadev.zipper_table_test_cust_src
  14. where dt = '${etldate}'
  15. ) as curr
  16. left join (
  17. select cust_id, phone, s_date, if(e_date > from_unixtime(unix_timestamp('${etldate}', 'yyyyMMdd'), 'yyyy-MM-dd'), DATE('9999-12-31'), e_date) as e_date,
  18. row_number() over(partition by cust_id order by e_date desc) as r_num -- 取最新状态
  19. from datadev.zipper_table_test_cust_dst
  20. where regexp_replace(s_date, '-', '') <= '${etldate}' -- 拉链表历史数据回滚
  21. ) as prev
  22. on curr.cust_id = prev.cust_id
  23. and prev.r_num = 1
  24. union all
  25. -- part2: 处理删除的记录,以及有变化的记录中的旧记录
  26. select prev_cust.cust_id, prev_cust.phone, prev_cust.s_date,
  27. case when e_date <> '9999-12-31' then e_date
  28. else DATE(from_unixtime(unix_timestamp('${etldate}', 'yyyyMMdd'), 'yyyy-MM-dd'))
  29. END as e_date
  30. from (
  31. select cust_id, phone, s_date, if(e_date > from_unixtime(unix_timestamp('${etldate}', 'yyyyMMdd'), 'yyyy-MM-dd'), DATE('9999-12-31'), e_date) as e_date
  32. from datadev.zipper_table_test_cust_dst
  33. where regexp_replace(s_date, '-', '') <= '${etldate}' -- 拉链表历史数据回滚
  34. ) as prev_cust
  35. left join (
  36. select cust_id, phone
  37. from datadev.zipper_table_test_cust_src
  38. where dt = '${etldate}'
  39. ) as curr_cust
  40. on curr_cust.cust_id = prev_cust.cust_id
  41. -- 只要变化量
  42. where NVL(prev_cust.phone, '') <> NVL(curr_cust.phone, '')
  43. ;

四、测试

4.1 第一天(20210601):将${etldate}替换成20210601,并执行SQL。此时为初始状态,客户信息没有变化量,因此生效日期为2021-06-01,生效日期为9999-12-31(代表当前有效)

zipper_table_test_cust_dst.cust_idzipper_table_test_cust_dst.phonezipper_table_test_cust_dst.s_datezipper_table_test_cust_dst.e_date
00111112021-06-019999-12-31
00222222021-06-019999-12-31
00333332021-06-019999-12-31
00444442021-06-019999-12-31

4.2 第二天(20210602):将${etldate}替换成20210602,并执行SQL。此时原表修改了002和004的手机号码,因此有将会有两条记录,一条记录记录了数据的历史状态,另外一条记录了数据的当前状态。然后原表还新增了005客户,因此此时的数据生效日期为2021-06-02,失效日期为9999-12-31

zipper_table_test_cust_dst.cust_idzipper_table_test_cust_dst.phonezipper_table_test_cust_dst.s_datezipper_table_test_cust_dst.e_date
00111112021-06-019999-12-31
00222222021-06-012021-06-02
0022222-12021-06-029999-12-31
00333332021-06-019999-12-31
00444442021-06-012021-06-02
0044444-12021-06-029999-12-31
00555552021-06-029999-12-31

4.3 第三天(20210603):将${etldate}替换成20210602,并执行SQL。此时原表修改了001、002、005,并新增006.

zipper_table_test_cust_dst.cust_idzipper_table_test_cust_dst.phonezipper_table_test_cust_dst.s_datezipper_table_test_cust_dst.e_date
00111112021-06-012021-06-03
0011111-12021-06-039999-12-31
00222222021-06-012021-06-02
0022222-12021-06-022021-06-03
0022222-22021-06-039999-12-31
00333332021-06-019999-12-31
00444442021-06-012021-06-02
0044444-12021-06-029999-12-31
00555552021-06-022021-06-03
0055555-12021-06-039999-12-31
00666662021-06-039999-12-31

4.4 第四天(20210604):将${etldate}替换成20210602,并执行SQL。此时原表更新了002,新增007,并删除了001. 需要注意的是,删除操作时,该条数据失效日期应该改为当天。

zipper_table_test_cust_dst.cust_idzipper_table_test_cust_dst.phonezipper_table_test_cust_dst.s_datezipper_table_test_cust_dst.e_date
00111112021-06-012021-06-03
0011111-12021-06-032021-06-04
00222222021-06-012021-06-02
0022222-12021-06-022021-06-03
0022222-22021-06-032021-06-04
0022222-32021-06-049999-12-31
00333332021-06-019999-12-31
00444442021-06-012021-06-02
0044444-12021-06-029999-12-31
00555552021-06-022021-06-03
0055555-12021-06-039999-12-31
00666662021-06-039999-12-31
00777772021-06-049999-12-31

五、拉链表的数据回滚刷新

可通过以下代码查看拉链表的最新状态

select * from datadev.zipper_table_test_cust_dst where e_date = '9999-12-31';

通过以下代码查看拉链表的历史状态/快照

  1. -- 查看拉链表的20210602的快照
  2. select cust_id, phone, s_date, if(e_date > '2021-06-02', DATE('9999-12-31'), e_date) as e_date
  3. from datadev.zipper_table_test_cust_dst
  4. where s_date <= '2021-06-02';

因此,对于拉链表的数据回滚刷新,我们只要根据上诉代码找到那一天的历史快照,然后进行重刷即可。(注:我上面贴的拉链表insert语句,已经包含了数据回滚刷新功能,读者可自行进行测试——将${etldate}替换成要回滚的日期,然后INSERT OVERWRITE TABLE那行可以注释掉,单跑select查看结果即可)

六、另一种实现

上一种实现方式有一个缺点,随着拉链表数据量的增多,每次执行的时间也会随之增多。因此,需要改进:可采用hive结合ES的方式。

  1. -- 拉链表(hive只存储新增/更新量,全量存储于ES)实现代码
  2. -- 临时表,只存放T-1天的新增以及变化的记录
  3. CREATE TABLE IF NOT EXISTS datadev.zipper_table_test_cust_dst_2 (
  4. `id` STRING COMMENT 'es id',
  5. `cust_id` STRING COMMENT '客户编号',
  6. `phone` STRING COMMENT '手机号码',
  7. `s_date` DATE COMMENT '生效时间',
  8. `e_date` DATE COMMENT '失效时间'
  9. )STORED AS ORC
  10. TBLPROPERTIES ("orc.compress"="SNAPPY")
  11. ;
  12. drop table datadev.zipper_table_test_cust_dst_2;
  13. select * from datadev.zipper_table_test_cust_dst_2 a;
  14. INSERT OVERWRITE TABLE datadev.zipper_table_test_cust_dst_2
  15. select concat_ws('-', curr.s_date, curr.cust_id) as id,
  16. curr.cust_id as cust_id,
  17. curr.phone as phone,
  18. DATE(curr.s_date) as s_date,
  19. DATE('9999-12-31') as e_date
  20. from (
  21. select cust_id, phone, from_unixtime(unix_timestamp(dt, 'yyyyMMdd'), 'yyyy-MM-dd') as s_date
  22. from datadev.zipper_table_test_cust_src
  23. where dt = '20210603' -- etldate
  24. ) as curr
  25. left join (
  26. select *
  27. from datadev.zipper_table_test_cust_src
  28. where dt = '20210602' -- prev_date
  29. ) as prev
  30. on prev.cust_id = curr.cust_id
  31. where NVL(curr.phone, '') <> NVL(prev.phone, '')
  32. union all
  33. select concat_ws('-', STRING(prev.s_date), prev.cust_id) as id,
  34. prev.cust_id as cust_id,
  35. prev.phone as phone,
  36. prev.s_date as s_date,
  37. case when NVL(prev.phone, '') = NVL(curr.phone, '') then prev.e_date
  38. else DATE(from_unixtime(unix_timestamp(dt, 'yyyyMMdd'), 'yyyy-MM-dd'))
  39. end as e_date
  40. from (
  41. select cust_id, phone, s_date, e_date,
  42. -- 只更新最新的一条
  43. row_number() over(partition by cust_id order by s_date desc) as r_num
  44. from datadev.zipper_table_test_cust_dst_2
  45. ) as prev
  46. inner join (
  47. select *
  48. from datadev.zipper_table_test_cust_src
  49. where dt = '20210603' -- etldate
  50. ) as curr
  51. on prev.cust_id = curr.cust_id
  52. where prev.r_num = 1
  53. ;
  54. -- mock: load delta data to es
  55. CREATE TABLE IF NOT EXISTS datadev.es_zipper (
  56. `id` STRING COMMENT 'es id',
  57. `cust_id` STRING COMMENT '客户编号',
  58. `phone` STRING COMMENT '手机号码',
  59. `s_date` DATE COMMENT '生效时间',
  60. `e_date` DATE COMMENT '失效时间'
  61. )STORED AS ORC
  62. TBLPROPERTIES ("orc.compress"="SNAPPY")
  63. ;
  64. drop table datadev.es_zipper;
  65. select * from datadev.es_zipper;
  66. INSERT OVERWRITE TABLE datadev.es_zipper
  67. SELECT nvl(curr.id, prev.id) as id,
  68. nvl(curr.cust_id, prev.cust_id) as cust_id,
  69. nvl(curr.phone, prev.phone) as phone,
  70. nvl(curr.s_date, prev.s_date) as s_date,
  71. nvl(curr.e_date, prev.e_date) as e_date
  72. FROM datadev.es_zipper prev
  73. full join datadev.zipper_table_test_cust_dst_2 curr
  74. on curr.id = prev.id;

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/255318?site
推荐阅读
相关标签
  

闽ICP备14008679号