当前位置:   article > 正文

Flink 实时数仓(十一)【ADS 层搭建】

Flink 实时数仓(十一)【ADS 层搭建】

前言

        ADS 层也就是这个实时数仓的最后一层了,意味着今天这个项目就要结束了;那么接下来就是结合着离线和实时数仓,把这两个数仓项目再次复习,反复理解。同时好好看看《阿里巴巴大数据之路》中关于数仓建模的内容;

        今天的秋招估计比去年更加严峻,本科学历也在贬值,不管怎么样,尽自己努力就好。

1、ADS 层搭建

        这里难度就不大了,无非就是写写 SQL (查询 DWS 层),把要展示的数据拼接成一张表,通过接口传给报表;

        DWS 层把轻度聚合的结果保存到 ClickHouse 中,主要的目的就是提供即时的数据查询、统计、分析服务。这些统计服务一般会以两种形式呈现,一种是面向专业数据分析人员准备的 BI 工具,一种是面向非专业人员的更加直观的数据大屏。

        这里用 Sugar 来作为 BI 可视化工具进行展示,做离线数仓的时候,我们是使用 Superset 主动读取 MySQL 数据库来实现数据可视化的,这里,我们要保证实时性不可能再去实时把 clickhouse 的数据写入到 MySQL,而是直接在 clickhouse 基础上使用 SpringBoot 进行 jdbc 查询,查询到的数据封装成 json 格式,让 Sugar 来周期性地读取这些 json 格式的数据;

        这个 json 格式的 k-v 当然是由 Sugar 来决定的,所以我们首先需要在 Sugar 上确定图表,然后再去根据 Sugar 要求的 json 格式去封装;

        此外,外网(Sugar)想要访问我们本地的数据需要做内网穿透(借助花生壳),这里不多废话,直接跳过;下面只讲核心的 ADS 层代码:

1.1、流量主题

1.1.1、各渠道流量统计

需求如下:

统计周期

统计粒度

指标

说明

当日

渠道

独立访客数

统计访问人数

当日

渠道

会话总数

统计会话总数

当日

渠道

会话平均浏览页面数

统计每个会话平均浏览页面数

当日

渠道

会话平均停留时长

统计每个会话平均停留时长

当日

渠道

跳出率

只有一个页面的会话的比例

我们在 DWS 层的时候,已经创建过 dws_traffic_vc_ch_ar_is_new_page_view_window 这张表了,所以这里我们直接查询 ck 即可;当时我们创建的这张表的建表语句如下:

  1. create table if not exists dws_traffic_vc_ch_ar_is_new_page_view_window
  2. (
  3. stt DateTime, -- 窗口开始时间(年月日)
  4. edt DateTime, -- 窗口结束时间(年月日)
  5. vc String, -- 版本号
  6. ch String, -- 渠道
  7. ar String, -- 地区
  8. is_new String, -- 新用户
  9. uv_ct UInt64, -- 独立访客数
  10. sv_ct UInt64, -- 会话数
  11. pv_ct UInt64, -- 页面浏览数
  12. dur_sum UInt64, -- 浏览时长
  13. uj_ct UInt64, -- 跳出会话数
  14. ts UInt64
  15. ) engine = ReplacingMergeTree(ts)
  16. partition by toYYYYMMDD(stt)
  17. order by (stt, edt, vc, ch, ar, is_new);

所以这里我们只需要根据需求写查询语句即可,我们将来会把不同渠道的不同指标做成柱状图,比如各渠道独立访客数

此外,还有各渠道会话总数、会话平均浏览页面数、会话平均停留时长、跳出率等,意味着我们需要写 5 条 SQL 去 clickhouse 中查询;

各渠道独立访客数

  1. SELECT
  2. ch,
  3. sum(uv_ct) uv_ct
  4. FROM
  5. dws_traffic_vc_ch_ar_is_new_page_view_window
  6. WHERE
  7. toYYYYMMDD(stt) = #{date}
  8. GROUP BY
  9. toYYYYMMDD(stt), ch
  10. ORDER BY
  11. uv_ct desc;

各渠道总会话数

  1. SELECT
  2. ch,
  3. sum(sv_ct) sv_ct
  4. FROM
  5. dws_traffic_vc_ch_ar_is_new_page_view_window
  6. WHERE
  7. toYYYYMMDD(stt) = #{date}
  8. GROUP BY
  9. toYYYYMMDD(stt), ch
  10. ORDER BY
  11. sv_ct desc;

各渠道会话平均页面浏览数

平均浏览页面数(平均每个会话浏览的页面数) = 页面浏览总数 / 会话总数

  1. SELECT
  2. ch,
  3. sum(pv_ct) / sum(sv_ct) pv_per_session
  4. FROM
  5. dws_traffic_vc_ch_ar_is_new_page_view_window
  6. WHERE
  7. toYYYYMMDD(stt) = #{date}
  8. GROUP BY
  9. toYYYYMMDD(stt), ch
  10. ORDER BY
  11. pv_per_session desc;

各渠道会话平均页面停留时长

  1. SELECT
  2. ch,
  3. sum(dur_sum) / sum(sv_ct) dur_per_session
  4. FROM
  5. dws_traffic_vc_ch_ar_is_new_page_view_window
  6. WHERE
  7. toYYYYMMDD(stt) = #{date}
  8. GROUP BY
  9. toYYYYMMDD(stt), ch
  10. ORDER BY
  11. dur_per_session desc;

各渠道会话跳出率

  1. SELECT
  2. ch,
  3. sum(uj_ct) / sum(sv_ct) uj_rate
  4. FROM
  5. dws_traffic_vc_ch_ar_is_new_page_view_window
  6. WHERE
  7. toYYYYMMDD(stt) = #{date}
  8. GROUP BY
  9. toYYYYMMDD(stt), ch
  10. ORDER BY
  11. uj_rate desc;
代码实现
1、编写实体类

创建上面 5 个 SQL 各自返回的表的实体类,很简单,毕竟基本上都返回两个字段(维度 + 度量值):

  1. @Data
  2. @AllArgsConstructor
  3. public class TrafficUvCt {
  4. String ch; // 渠道
  5. Integer uvCt; // 独立访客数
  6. }
  1. @Data
  2. @AllArgsConstructor
  3. public class TrafficSvCt {
  4. // 渠道
  5. String ch;
  6. // 会话数
  7. Integer svCt;
  8. }
  1. @Data
  2. @AllArgsConstructor
  3. public class TrafficUjRate {
  4. // 渠道
  5. String ch;
  6. // 跳出率
  7. Double ujRate;
  8. }
  1. @Data
  2. @AllArgsConstructor
  3. public class TrafficDurPerSession {
  4. // 渠道
  5. String ch;
  6. // 各会话页面访问时长
  7. Double durPerSession;
  8. }
  1. @Data
  2. @AllArgsConstructor
  3. public class TrafficPvPerSession {
  4. // 渠道
  5. String ch;
  6. // 各会话页面浏览数
  7. Double pvPerSession;
  8. }
2、编写Mapper 接口

通过 Mybatis 自动实现接口来获取返回结果:

  1. @Mapper
  2. public interface TrafficChannelStatusMapper {
  3. // 1. 各渠道独立访客数(uv)
  4. @Select("select ch,sum(uv_ct) uv_ct from" +
  5. " dws_traffic_vc_ch_ar_is_new_page_view_window" +
  6. " where toYYYYMMDD(stt) = #{date} group by toYYYYMMDD(stt),ch" +
  7. " order by uv_ct desc;")
  8. List<TrafficUvCt> selectUvCt(@Param("date")Integer date);
  9. // 2. 各渠道会话数(sv)
  10. @Select("select ch,sum(sv_ct) sv_ct from" +
  11. " dws_traffic_vc_ch_ar_is_new_page_view_window" +
  12. " where toYYYYMMDD(stt) = #{date} group by toYYYYMMDD(stt),ch" +
  13. " order by sv_ct desc;")
  14. List<TrafficSvCt> selectSvCt(@Param("date")Integer date);
  15. // 3. 各渠道会话平均页面浏览数
  16. @Select("select ch,sum(pv_ct) / sum(sv_ct) pv_per_session from" +
  17. " dws_traffic_vc_ch_ar_is_new_page_view_window" +
  18. " where toYYYYMMDD(stt) = #{date} group by toYYYYMMDD(stt),ch" +
  19. " order by pv_per_session desc;")
  20. List<TrafficPvPerSession> selectPvPerSession(@Param("date")Integer date);
  21. // 4. 各渠道会话平均页面访问时长
  22. @Select("select ch,sum(dur_sum) / sum(sv_ct) dur_per_session from" +
  23. " dws_traffic_vc_ch_ar_is_new_page_view_window" +
  24. " where toYYYYMMDD(stt) = #{date} group by toYYYYMMDD(stt),ch" +
  25. " order by dur_per_session desc;")
  26. List<TrafficDurPerSession> selectDurPerSession(@Param("date")Integer date);
  27. // 5. 各渠道跳出率(跳出会话数/会话总数)
  28. @Select("select ch,sum(uj_ct)/sum(sv_ct) uj_rate from" +
  29. " dws_traffic_vc_ch_ar_is_new_page_view_window" +
  30. " where toYYYYMMDD(stt) = #{date} group by toYYYYMMDD(stt),ch" +
  31. " order by uj_rate desc;")
  32. List<TrafficUjRate> selectUjRate(@Param("date")Integer date);
  33. }
3、编写 Service 接口
  1. public interface TrafficChannelStatsService {
  2. // 1. 获取各渠道独立访客数
  3. List<TrafficUvCt> getUvCt(Integer date);
  4. // 2. 获取各渠道会话数
  5. List<TrafficSvCt> getSvCt(Integer date);
  6. // 3. 获取各渠道会话平均页面浏览数
  7. List<TrafficPvPerSession> getPvPerSession(Integer date);
  8. // 4. 获取各渠道会话平均页面访问时长
  9. List<TrafficDurPerSession> getDurPerSession(Integer date);
  10. // 5. 获取各渠道跳出率
  11. List<TrafficUjRate> getUjRate(Integer date);
  12. }
4、编写 Service 实现类
  1. @Service
  2. public class TrafficChannelStatsServiceImpl implements TrafficChannelStatsService {
  3. // 自动装载 Mapper 接口实现类
  4. @Autowired
  5. TrafficChannelStatusMapper TrafficChannelStatusMapper;
  6. // 1. 获取各渠道独立访客数
  7. @Override
  8. public List<TrafficUvCt> getUvCt(Integer date) {
  9. return TrafficChannelStatusMapper.selectUvCt(date);
  10. }
  11. // 2. 获取各渠道会话数
  12. @Override
  13. public List<TrafficSvCt> getSvCt(Integer date) {
  14. return TrafficChannelStatusMapper.selectSvCt(date);
  15. }
  16. // 3. 获取各渠道会话平均页面浏览数
  17. @Override
  18. public List<TrafficPvPerSession> getPvPerSession(Integer date) {
  19. return TrafficChannelStatusMapper.selectPvPerSession(date);
  20. }
  21. // 4. 获取各渠道会话平均页面访问时长
  22. @Override
  23. public List<TrafficDurPerSession> getDurPerSession(Integer date) {
  24. return TrafficChannelStatusMapper.selectDurPerSession(date);
  25. }
  26. // 5. 获取各渠道跳出率
  27. @Override
  28. public List<TrafficUjRate> getUjRate(Integer date) {
  29. return TrafficChannelStatusMapper.selectUjRate(date);
  30. }
  31. }
5、编写 Controller 类
  1. @RestController
  2. @RequestMapping("/gmall/realtime/traffic")
  3. public class TrafficController {
  4. // 自动装载渠道流量统计服务实现类
  5. @Autowired
  6. private TrafficChannelStatsService trafficChannelStatsService;
  7. // 1. 独立访客请求拦截方法
  8. @RequestMapping("/uvCt")
  9. public String getUvCt(
  10. @RequestParam(value = "date", defaultValue = "1") Integer date) {
  11. if (date == 1) {
  12. date = DateUtil.now();
  13. }
  14. List<TrafficUvCt> trafficUvCtList = trafficChannelStatsService.getUvCt(date);
  15. if (trafficUvCtList == null) {
  16. return "";
  17. }
  18. StringBuilder categories = new StringBuilder("[");
  19. StringBuilder uvCtValues = new StringBuilder("[");
  20. for (int i = 0; i < trafficUvCtList.size(); i++) {
  21. TrafficUvCt trafficUvCt = trafficUvCtList.get(i);
  22. String ch = trafficUvCt.getCh();
  23. Integer uvCt = trafficUvCt.getUvCt();
  24. categories.append("\"").append(ch).append("\"");
  25. uvCtValues.append("\"").append(uvCt).append("\"");
  26. if (i < trafficUvCtList.size() - 1) {
  27. categories.append(",");
  28. uvCtValues.append(",");
  29. } else {
  30. categories.append("]");
  31. uvCtValues.append("]");
  32. }
  33. }
  34. return "{\n" +
  35. " \"status\": 0,\n" +
  36. " \"msg\": \"\",\n" +
  37. " \"data\": {\n" +
  38. " \"categories\":" + categories + ",\n" +
  39. " \"series\": [\n" +
  40. " {\n" +
  41. " \"name\": \"独立访客数\",\n" +
  42. " \"data\": " + uvCtValues + "\n" +
  43. " }\n" +
  44. " ]\n" +
  45. " }\n" +
  46. "}";
  47. }
  48. // 2. 会话数请求拦截方法
  49. @RequestMapping("/svCt")
  50. public String getPvCt(
  51. @RequestParam(value = "date", defaultValue = "1") Integer date) {
  52. if (date == 1) {
  53. date = DateUtil.now();
  54. }
  55. List<TrafficSvCt> trafficSvCtList = trafficChannelStatsService.getSvCt(date);
  56. if (trafficSvCtList == null) {
  57. return "";
  58. }
  59. StringBuilder categories = new StringBuilder("[");
  60. StringBuilder svCtValues = new StringBuilder("[");
  61. for (int i = 0; i < trafficSvCtList.size(); i++) {
  62. TrafficSvCt trafficSvCt = trafficSvCtList.get(i);
  63. String ch = trafficSvCt.getCh();
  64. Integer svCt = trafficSvCt.getSvCt();
  65. categories.append("\"").append(ch).append("\"");
  66. svCtValues.append("\"").append(svCt).append("\"");
  67. if (i < trafficSvCtList.size() - 1) {
  68. categories.append(",");
  69. svCtValues.append(",");
  70. } else {
  71. categories.append("]");
  72. svCtValues.append("]");
  73. }
  74. }
  75. return "{\n" +
  76. " \"status\": 0,\n" +
  77. " \"msg\": \"\",\n" +
  78. " \"data\": {\n" +
  79. " \"categories\":" + categories + ",\n" +
  80. " \"series\": [\n" +
  81. " {\n" +
  82. " \"name\": \"会话数\",\n" +
  83. " \"data\": " + svCtValues + "\n" +
  84. " }\n" +
  85. " ]\n" +
  86. " }\n" +
  87. "}";
  88. }
  89. // 3. 各会话浏览页面数请求拦截方法
  90. @RequestMapping("/pvPerSession")
  91. public String getPvPerSession(
  92. @RequestParam(value = "date", defaultValue = "1") Integer date) {
  93. if (date == 1) {
  94. date = DateUtil.now();
  95. }
  96. List<TrafficPvPerSession> trafficPvPerSessionList = trafficChannelStatsService.getPvPerSession(date);
  97. if (trafficPvPerSessionList == null) {
  98. return "";
  99. }
  100. StringBuilder categories = new StringBuilder("[");
  101. StringBuilder pvPerSessionValues = new StringBuilder("[");
  102. for (int i = 0; i < trafficPvPerSessionList.size(); i++) {
  103. TrafficPvPerSession trafficPvPerSession = trafficPvPerSessionList.get(i);
  104. String ch = trafficPvPerSession.getCh();
  105. Double pvPerSession = trafficPvPerSession.getPvPerSession();
  106. categories.append("\"").append(ch).append("\"");
  107. pvPerSessionValues.append("\"").append(pvPerSession).append("\"");
  108. if (i < trafficPvPerSessionList.size() - 1) {
  109. categories.append(",");
  110. pvPerSessionValues.append(",");
  111. } else {
  112. categories.append("]");
  113. pvPerSessionValues.append("]");
  114. }
  115. }
  116. return "{\n" +
  117. " \"status\": 0,\n" +
  118. " \"msg\": \"\",\n" +
  119. " \"data\": {\n" +
  120. " \"categories\":" + categories + ",\n" +
  121. " \"series\": [\n" +
  122. " {\n" +
  123. " \"name\": \"会话平均页面浏览数\",\n" +
  124. " \"data\": " + pvPerSessionValues + "\n" +
  125. " }\n" +
  126. " ]\n" +
  127. " }\n" +
  128. "}";
  129. }
  130. // 4. 各会话累计访问时长请求拦截方法
  131. @RequestMapping("/durPerSession")
  132. public String getDurPerSession(
  133. @RequestParam(value = "date", defaultValue = "1") Integer date) {
  134. if (date == 1) {
  135. date = DateUtil.now();
  136. }
  137. List<TrafficDurPerSession> trafficDurPerSessionList = trafficChannelStatsService.getDurPerSession(date);
  138. if (trafficDurPerSessionList == null) {
  139. return "";
  140. }
  141. StringBuilder categories = new StringBuilder("[");
  142. StringBuilder durPerSessionValues = new StringBuilder("[");
  143. for (int i = 0; i < trafficDurPerSessionList.size(); i++) {
  144. TrafficDurPerSession trafficDurPerSession = trafficDurPerSessionList.get(i);
  145. String ch = trafficDurPerSession.getCh();
  146. Double durPerSession = trafficDurPerSession.getDurPerSession();
  147. categories.append("\"").append(ch).append("\"");
  148. durPerSessionValues.append("\"").append(durPerSession).append("\"");
  149. if (i < trafficDurPerSessionList.size() - 1) {
  150. categories.append(",");
  151. durPerSessionValues.append(",");
  152. } else {
  153. categories.append("]");
  154. durPerSessionValues.append("]");
  155. }
  156. }
  157. return "{\n" +
  158. " \"status\": 0,\n" +
  159. " \"msg\": \"\",\n" +
  160. " \"data\": {\n" +
  161. " \"categories\":" + categories + ",\n" +
  162. " \"series\": [\n" +
  163. " {\n" +
  164. " \"name\": \"会话平均页面访问时长\",\n" +
  165. " \"data\": " + durPerSessionValues + "\n" +
  166. " }\n" +
  167. " ]\n" +
  168. " }\n" +
  169. "}";
  170. }
  171. // 5. 跳出率请求拦截方法
  172. @RequestMapping("/ujRate")
  173. public String getUjRate(
  174. @RequestParam(value = "date", defaultValue = "1") Integer date) {
  175. if (date == 1) {
  176. date = DateUtil.now();
  177. }
  178. List<TrafficUjRate> trafficUjRateList = trafficChannelStatsService.getUjRate(date);
  179. if (trafficUjRateList == null) {
  180. return "";
  181. }
  182. StringBuilder categories = new StringBuilder("[");
  183. StringBuilder ujRateValues = new StringBuilder("[");
  184. for (int i = 0; i < trafficUjRateList.size(); i++) {
  185. TrafficUjRate trafficUjRate = trafficUjRateList.get(i);
  186. String ch = trafficUjRate.getCh();
  187. Double ujRate = trafficUjRate.getUjRate();
  188. categories.append("\"").append(ch).append("\"");
  189. ujRateValues.append("\"").append(ujRate).append("\"");
  190. if (i < trafficUjRateList.size() - 1) {
  191. categories.append(",");
  192. ujRateValues.append(",");
  193. } else {
  194. categories.append("]");
  195. ujRateValues.append("]");
  196. }
  197. }
  198. return "{\n" +
  199. " \"status\": 0,\n" +
  200. " \"msg\": \"\",\n" +
  201. " \"data\": {\n" +
  202. " \"categories\":" + categories + ",\n" +
  203. " \"series\": [\n" +
  204. " {\n" +
  205. " \"name\": \"跳出率\",\n" +
  206. " \"data\": " + ujRateValues + "\n" +
  207. " }\n" +
  208. " ]\n" +
  209. " }\n" +
  210. "}";
  211. }
  212. }
6、用到的工具类

主要是返回当前日期的 Int 值

  1. public class DateUtil {
  2. public static Integer now(){
  3. return Integer.parseInt(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMdd")));
  4. }
  5. }

下面的需求就不浪费时间了,只写 SQL,Java 代码又臭又长我这里就不展示了! 


1.1.2、流量分时统计

需求:

统计周期

指标

说明

1 小时

独立访客数

统计当日各小时独立访客数

1 小时

页面浏览数

统计当日各小时页面浏览数

1 小时

新访客数

统计当日各小时新访客数

  1. SELECT
  2. toHour(stt) hr, -- 小时(0-23)
  3. sum(uv_ct) uv_ct, -- 独立访客数
  4. sum(pv_ct) pv_ct, -- 页面浏览数
  5. -- 新访客数统计
  6. sum(if(is_new = '1', dws_traffic_vc_ch_ar_is_new_page_view_window.uv_ct, 0)) new_uv_ct
  7. FROM
  8. dws_traffic_vc_ch_ar_is_new_page_view_window
  9. WHERE
  10. toYYYYMMDD(stt) = #{date}\n" +
  11. GROUP BY
  12. hr

1.1.3、新老访客流量统计

需求:

统计周期

统计粒度

指标

说明

当日

访客类型

访客数

分别统计新老访客数

当日

访客类型

页面浏览数

分别统计新老访客页面浏览数

当日

访客类型

跳出率

分别统计新老访客跳出率

当日

访客类型

平均在线时长

分别统计新老访客平均在线时长

当日

访客类型

平均访问页面数

分别统计新老访客平均访问页面数

其实就是计算新老访客这两大群体的指标:

  1. SELECT
  2. is_new, -- 新老访客标记
  3. sum(uv_ct) uv_ct, -- 独立访客数
  4. sum(pv_ct) pv_ct, -- 页面浏览数
  5. sum(sv_ct) sv_ct, -- 会话总数
  6. sum(uj_ct) / sum(sv_ct) uj_rate, -- 跳出率
  7. sum(dur_sum) dur_sum -- 停留时长
  8. from dws_traffic_vc_ch_ar_is_new_page_view_window
  9. where toYYYYMMDD(stt) = #{date}
  10. group by is_new

1.1.4、关键词统计

统计周期

统计粒度

指标

说明

当日

关键词

关键词评分

根据不同来源和频次计算得分

数据源来自我们 DWS 层创建的第一张表:

  1. create table if not exists dws_traffic_source_keyword_page_view_window
  2. (
  3. stt DateTime,
  4. edt DateTime,
  5. source String,
  6. keyword String,
  7. keyword_count UInt64,
  8. ts UInt64
  9. ) engine = ReplacingMergeTree(ts)
  10. partition by toYYYYMMDD(stt)
  11. order by (stt, edt, source, keyword);

分析 SQL :

这里我们不仅分析了搜索的关键词,还分析了购物车、点击和订单中的关键词,只不过权重不同:

  1. SELECT keyword,
  2. sum(keyword_count * multiIf(
  3. source = 'SEARCH', 10,
  4. source = 'ORDER', 5,
  5. source = 'CART', 2,
  6. source = 'CLICK', 1, 0
  7. )) keyword_score
  8. FROM
  9. dws_traffic_source_keyword_page_view_window
  10. WHERE
  11. toYYYYMMDD(stt) = #{date}
  12. GROUP BY
  13. toYYYYMMDD(stt), keyword
  14. ORDER BY
  15. keyword_score desc;

1.2、用户主题

1.2.1、用户变动统计

统计周期

指标

指标说明

当日回流用户数之前的活跃用户,一段时间未活跃(流失),今日又活跃了,就称为回流用户。此处要求统计回流用户总数。

当日

新增用户数

当日

活跃用户数

1、计算回流用户

用到 DWS 层的用户域用户登陆各窗口汇总表(dws_user_user_login_window)

  1. create table if not exists dws_user_user_login_window
  2. (
  3. stt DateTime,
  4. edt DateTime,
  5. back_ct UInt64,
  6. uu_ct UInt64,
  7. ts UInt64
  8. ) engine = ReplacingMergeTree(ts)
  9. partition by toYYYYMMDD(stt)
  10. order by (stt, edt);

指标分析: 

  1. SELECT
  2. 'backCt' type,
  3. sum(back_ct) back_ct
  4. FROM
  5. dws_user_user_login_window
  6. WHERE
  7. toYYYYMMDD(stt) = #{date}
2、计算新增用户

直接读取 DWS 层的用户域用户注册各窗口汇总表(dws_user_user_register_window)即可:

  1. create table if not exists dws_user_user_register_window
  2. (
  3. stt DateTime,
  4. edt DateTime,
  5. register_ct UInt64,
  6. ts UInt64
  7. ) engine = ReplacingMergeTree(ts)
  8. partition by toYYYYMMDD(stt)
  9. order by (stt, edt);

指标分析:

  1. SELECT
  2. 'newUserCt' type,
  3. sum(register_ct) register_ct
  4. FROM
  5. dws_user_user_register_window
  6. WHERE
  7. toYYYYMMDD(stt) = #{date}
3、计算活跃用户

还是用户登录表,直接统计今日的 uu_ct即可:

  1. SELECT
  2. 'activeUserCt' type,
  3. sum(uu_ct) uu_ct
  4. FROM
  5. dws_user_user_login_window
4、汇总结果 

直接 union all 即可;

1.2.2、用户行为漏斗分析

漏斗分析是一个数据分析模型,它能够科学反映一个业务过程从起点到终点各阶段用户转化情况。由于其能将各阶段环节都展示出来,故哪个阶段存在问题,就能一目了然。

统计周期

指标

说明

当日

首页浏览人数

当日

商品详情页浏览人数

当日

加购人数

当日

下单人数

当日

支付人数

支付成功人数

1、计算首页浏览人数、商品详情页浏览人数

需要查询 DWS 层 用户域页面浏览汇总表

  1. create table if not exists dws_traffic_page_view_window
  2. (
  3. stt DateTime,
  4. edt DateTime,
  5. home_uv_ct UInt64,
  6. good_detail_uv_ct UInt64,
  7. ts UInt64
  8. ) engine = ReplacingMergeTree(ts)
  9. partition by toYYYYMMDD(stt)
  10. order by (stt, edt);

 指标计算:

  1. select
  2. 'home' page_id,
  3. sum(home_uv_ct) uvCt
  4. from
  5. dws_traffic_page_view_window
  6. where
  7. toYYYYMMDD(stt) = #{date}
  8. union all
  9. select
  10. 'good_detail' page_id,
  11. sum(good_detail_uv_ct) uvCt
  12. from
  13. dws_traffic_page_view_window
  14. where
  15. toYYYYMMDD(stt) = #{date}
 2、计算加购人数

需要查询自 dws_trade_cart_add_uu_window:

  1. create table if not exists dws_trade_cart_add_uu_window
  2. (
  3. stt DateTime,
  4. edt DateTime,
  5. cart_add_uu_ct UInt64,
  6. ts UInt64
  7. ) engine = ReplacingMergeTree(ts)
  8. partition by toYYYYMMDD(stt)
  9. order by (stt, edt);

指标计算:

  1. select
  2. 'cart' page_id,
  3. sum(cart_add_uu_ct) uvCt
  4. from
  5. dws_trade_cart_add_uu_window
  6. where
  7. toYYYYMMDD(stt) = #{date}
3、计算下单人数
  1. create table if not exists dws_trade_order_window
  2. (
  3. stt DateTime,
  4. edt DateTime,
  5. order_unique_user_count UInt64,
  6. order_new_user_count UInt64,
  7. order_activity_reduce_amount Decimal(38, 20),
  8. order_coupon_reduce_amount Decimal(38, 20),
  9. order_origin_total_amount Decimal(38, 20),
  10. ts UInt64
  11. ) engine = ReplacingMergeTree(ts)
  12. partition by toYYYYMMDD(stt)
  13. order by (stt, edt);
  1. select
  2. 'trade' page_id,
  3. sum(order_unique_user_count) uvCt
  4. from
  5. dws_trade_order_window
  6. where
  7. toYYYYMMDD(stt) = #{date}
4、计算支付人数
  1. create table if not exists dws_trade_payment_suc_window
  2. (
  3. stt DateTime,
  4. edt DateTime,
  5. payment_suc_unique_user_count UInt64,
  6. payment_new_user_count UInt64,
  7. ts UInt64
  8. ) engine = ReplacingMergeTree(ts)
  9. partition by toYYYYMMDD(stt)
  10. order by (stt, edt);
  1. select
  2. 'payment' page_id,
  3. sum(payment_suc_unique_user_count) uvCt
  4. from
  5. dws_trade_payment_suc_window
  6. where
  7. toYYYYMMDD(stt) = #{date}

1.2.3、新增交易用户统计

统计周期

指标

说明

当日

下单新用户人数

当日

支付成功新用户人数

区别于上一个需求中的统计指标,上面是求的独立用户数,这里求的是新用户数:

  1. select
  2. 'order' trade_type,
  3. sum(order_new_user_count) order_new_user_count
  4. from
  5. dws_trade_order_window
  6. where
  7. toYYYYMMDD(stt) = #{date}
  8. union all
  9. select
  10. 'payment' trade_type,
  11. sum(payment_new_user_count) pay_suc_new_user_count
  12. from
  13. dws_trade_payment_suc_window
  14. where
  15. toYYYYMMDD(stt) = #{date};

1.3、商品主题

1.3.1、各品牌商品交易统计

指标:

统计周期

统计粒度

指标

说明

当日

品牌

订单数

当日

品牌

订单人数

当日

品牌

订单金额

当日

品牌

退单数

当日

品牌

退单人数

计算指标:

这里需要对两张表(用户-品牌-品类-SPU-下单表和用户-品牌-品类-SPU-退单表)进行 full outer join:

  1. select trademark_name,
  2. order_count,
  3. uu_count,
  4. sum(order_amount) order_amount,
  5. refund_count,
  6. refund_uu_count
  7. from (
  8. select trademark_id,
  9. trademark_name,
  10. sum(order_count) order_count,
  11. count(distinct user_id) uu_count,
  12. sum(order_amount) order_amount
  13. from
  14. dws_trade_trademark_category_user_spu_order_window
  15. where
  16. toYYYYMMDD(stt) = #{date}
  17. group by
  18. trademark_id, trademark_name) oct
  19. full outer join(
  20. select
  21. trademark_id,
  22. trademark_name,
  23. sum(refund_count) refund_count,
  24. count(distinct user_id) refund_uu_count
  25. from
  26. dws_trade_trademark_category_user_refund_window
  27. where
  28. toYYYYMMDD(stt) = #{date}
  29. group by
  30. trademark_id, trademark_name) rct
  31. on oct.trademark_id = rct.trademark_id;

1.3.2、各品类商品交易统计

统计周期

统计粒度

指标

说明

当日

品类

订单数

当日

品类

订单人数

当日

品类

订单金额

当日

品类

退单数

当日

品类

退单人数

和上面的指标用到的表一样,无非就是 group by 的字段变成了品类: 

  1. select category1_name,
  2. category2_name,
  3. category3_name,
  4. order_count,
  5. uu_count,
  6. order_amount,
  7. refund_count,
  8. refund_uu_count
  9. from (
  10. select category1_id,
  11. category1_name,
  12. category2_id,
  13. category2_name,
  14. category3_id,
  15. category3_name,
  16. sum(order_count) order_count,
  17. count(distinct user_id) uu_count,
  18. sum(order_amount) order_amount
  19. from
  20. dws_trade_trademark_category_user_spu_order_window
  21. where
  22. toYYYYMMDD(stt) = #{date}
  23. group by
  24. category1_id,
  25. category1_name,
  26. category2_id,
  27. category2_name,
  28. category3_id,
  29. category3_name) oct
  30. full outer join(
  31. select category1_id,
  32. category1_name,
  33. category2_id,
  34. category2_name,
  35. category3_id,
  36. category3_name,
  37. sum(refund_count) refund_count,
  38. count(distinct user_id) refund_uu_count
  39. from
  40. dws_trade_trademark_category_user_refund_window
  41. where
  42. toYYYYMMDD(stt) = #{date}
  43. group by
  44. category1_id,
  45. category1_name,
  46. category2_id,
  47. category2_name,
  48. category3_id,
  49. category3_name) rct
  50. on oct.category1_id = rct.category1_id
  51. and oct.category2_id = rct.category2_id
  52. and oct.category3_id = rct.category3_id;

1.3.3、各 SPU 商品交易统计

统计周期

统计粒度

指标

说明

当日

SPU

订单数

当日

SPU

订单人数

当日

SPU

订单金额

SPU 没有退单指标: 

  1. select spu_name,
  2. sum(order_count) order_count,
  3. count(distinct user_id) uu_count,
  4. sum(order_amount) order_amount
  5. from
  6. dws_trade_trademark_category_user_spu_order_window
  7. where
  8. toYYYYMMDD(stt) = #{date}
  9. group by
  10. spu_id, spu_name

1.4、交易主题

1.4.1、交易综合统计

统计周期

指标

说明

当日

订单总额

订单最终金额

当日

订单数

当日

订单人数

当日

退单数

当日

退单人数

1、计算订单总额 

直接查询所有省份的订单总额即为总金额:

  1. select
  2. sum(order_amount) order_total_amount
  3. from
  4. dws_trade_province_order_window
  5. where
  6. toYYYYMMDD(stt) = #{date}
  7. group by
  8. toYYYYMMDD(stt)
2、计算订单数、订单人数、退单数、退单人数
  1. select '下单数' type,
  2. sum(order_count) value
  3. from
  4. dws_trade_trademark_category_user_spu_order_window
  5. where
  6. toYYYYMMDD(stt) = #{date}
  7. union all
  8. select '下单人数' type,
  9. count(distinct user_id) value
  10. from
  11. dws_trade_trademark_category_user_spu_order_window
  12. where
  13. toYYYYMMDD(stt) = #{date}
  14. union all
  15. select '退单数' type,
  16. sum(refund_count) value
  17. from
  18. dws_trade_trademark_category_user_refund_window
  19. where
  20. toYYYYMMDD(stt) = #{date}
  21. union all
  22. select '退单人数' type,
  23. count(distinct user_id) value
  24. from
  25. dws_trade_trademark_category_user_refund_window
  26. where
  27. toYYYYMMDD(stt) = #{date}

 1.4.2、各省份交易统计

统计周期

统计粒度

指标

当日

省份

订单数

当日

省份

订单金额

1、计算各省份订单数 
  1. select province_name,
  2. sum(order_count) order_count
  3. from
  4. dws_trade_province_order_window
  5. where
  6. toYYYYMMDD(stt) = #{date}
  7. group by
  8. province_id, province_name
2、计算各省份订单总金额
  1. select province_name,
  2. sum(order_amount) order_amount
  3. from
  4. dws_trade_province_order_window
  5. where
  6. toYYYYMMDD(stt) = #{date}
  7. group by
  8. province_id, province_name

这俩 SQL 完全可以写成一条 SQL ,分开写是因为它俩对应的是 Sugar 上不同的图表; 

1.5、优惠券主题

1.5.1、当日优惠券补贴率

统计周期

统计粒度

指标

说明

当日

优惠券

补贴率

用券的订单明细优惠券减免金额总和/原始金额总和

  1. select
  2. sum(order_coupon_reduce_amount) coupon_reduce_amount,
  3. sum(order_origin_total_amount) origin_total_amount,
  4. round(round(toFloat64(coupon_reduce_amount), 5) /
  5. round(toFloat64(origin_total_amount), 5), 20) coupon_subsidy_rate
  6. from
  7. dws_trade_order_window
  8. where
  9. toYYYYMMDD(stt) = #{date}
  10. group by
  11. toYYYYMMDD(stt)

1.6、活动主题

1.6.1、当日活动补贴率

统计周期

统计粒度

指标

说明

当日

活动

补贴率

参与促销活动的订单明细活动减免金额总和/原始金额总和

  1. select
  2. sum(order_activity_reduce_amount) activity_reduce_amount,
  3. sum(order_origin_total_amount) origin_total_amount,
  4. round(round(toFloat64(activity_reduce_amount), 5) /
  5. round(toFloat64(origin_total_amount), 5), 20) subsidyRate
  6. from
  7. dws_trade_order_window
  8. where
  9. toYYYYMMDD(stt) = #{date}
  10. group by
  11. toYYYYMMDD(stt)

总结

        至此,ADS 层搭建完毕,相比离线数仓,这里所有指标都是直接从 DWS 层读取过来的,而 clickhouse 最大的优点也正是单表查询超级快!这 ADS 比离线数仓要简单多了,离线数仓可以分析更加深层的指标,毕竟它有大量的历史数据做支撑;而实时数仓则更多的是分析当下近几秒钟、几小时、远至一天的一些指标;

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/972224
推荐阅读
相关标签
  

闽ICP备14008679号