赞
踩
今天的任务是完成流量域最后一个需求、用户域的两个需求以及交易域的部分需求;
任务:从 Kafka 页面日志主题读取数据,统计当日的首页和商品详情页独立访客数。
注意:一般我们谈到访客,指的是 mid;而用户才是 uid;
首先创建 ck 表结构,和前面的表一样,主要的字段就是:维度 + 度量值 (这里没有粒度,因为我们统计的是一个宏观的统计结果信息,到 ADS 都不用加工),这里的 stt 和 edt 依然是作为 ck 表的 order by 字段防止数据重复;ts 字段作为 ck 的版本字段;这里 order by 字段取窗口起止时间,因为窗口是基于事件时间的,所以不用担心任务挂了之后重复消费造成数据重复的问题,ck 会自动根据 order by 字段进行去重;
- create table if not exists dws_traffic_page_view_window
- (
- stt DateTime,
- edt DateTime,
- home_uv_ct UInt64,
- good_detail_uv_ct UInt64,
- ts UInt64
- ) engine = ReplacingMergeTree(ts)
- partition by toYYYYMMDD(stt)
- order by (stt, edt);
创建 ck 表对应的 JavaBean:
- @Data
- @AllArgsConstructor
- public class TrafficHomeDetailPageViewBean {
- // 窗口起始时间
- String stt;
-
- // 窗口结束时间
- String edt;
-
- // 首页独立访客数
- Long homeUvCt;
-
- // 商品详情页独立访客数
- Long goodDetailUvCt;
-
- // 时间戳
- Long ts;
- }

这里我们不仅要过滤还希望尽量顺便把数据转换为 JSONObject 格式,所以选用 flatMap 最为合适:
- // TODO 3. 读取 dwd_traffic_page_log 的数据
- String groupId = "dws_traffic_page_view_window";
- DataStreamSource<String> pageLog = env.addSource(MyKafkaUtil.getFlinkKafkaConsumer("dwd_traffic_page_log", groupId));
-
- // TODO 4. 转为 json 并过滤出首页和商品详情页
- SingleOutputStreamOperator<JSONObject> filterDS = pageLog.flatMap(new FlatMapFunction<String, JSONObject>() {
- @Override
- public void flatMap(String value, Collector<JSONObject> out) throws Exception {
- JSONObject jsonObject = JSON.parseObject(value);
- String page_id = jsonObject.getJSONObject("page").getString("page_id");
- if (page_id != null) {
- if (page_id.equals("home") || page_id.equals("good_detail")) {
- out.collect(jsonObject);
- }
- }
- }
- });

- // TODO 5. 提取事件时间生成水位线
- filterDS.assignTimestampsAndWatermarks(WatermarkStrategy.<JSONObject>forBoundedOutOfOrderness(Duration.ofSeconds(2))
- .withTimestampAssigner(new SerializableTimestampAssigner<JSONObject>() {
- @Override
- public long extractTimestamp(JSONObject element, long recordTimestamp) {
- return element.getLong("ts");
- }
- })
- );
这里使用富函数的 flatMap,因为富函数中才有 open(在 open 方法中初始化状态)、close等方法,以及获取上下文对象(通过上下文对象给状态描述器设置ttl并初始化)等高级操作;
这里 flatMap 的输出类型我们设置为之前写好的 ck 表对应的 JavaBean ,方便直接插入到 ck中;
这里我们同样可以给状态设置一个 TTL 防止长时间访客未访问状态存储浪费;这里两个状态任意一个不为 null 即可输出:
- // TODO 6. 状态编程(按照mid分组)过滤出独立访客
- KeyedStream<JSONObject, String> keyedStream = filterDS.keyBy(json -> json.getJSONObject("common").getString("mid"));
- SingleOutputStreamOperator<TrafficHomeDetailPageViewBean> trafficHomeDetailDS = keyedStream.flatMap(new RichFlatMapFunction<JSONObject, TrafficHomeDetailPageViewBean>() {
-
- private ValueState<String> homeLastVisit;
- private ValueState<String> detailLastVisit;
-
- @Override
- public void open(Configuration parameters) throws Exception {
-
- StateTtlConfig ttlConfig = new StateTtlConfig.Builder(Time.days(1))
- .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
- .build();
-
- ValueStateDescriptor<String> homeStateDescriptor = new ValueStateDescriptor<>("home-state", String.class);
- ValueStateDescriptor<String> detailStateDescriptor = new ValueStateDescriptor<>("detail-state", String.class);
-
- // 设置 TTL
- homeStateDescriptor.enableTimeToLive(ttlConfig);
- detailStateDescriptor.enableTimeToLive(ttlConfig);
-
- homeLastVisit = getRuntimeContext().getState(homeStateDescriptor);
- detailLastVisit = getRuntimeContext().getState(detailStateDescriptor);
- }
-
- @Override
- public void flatMap(JSONObject value, Collector<TrafficHomeDetailPageViewBean> out) throws Exception {
-
- // 获取状态数据以及当前数据中的日期
- String curDt = DateFormatUtil.toDate(value.getLong("ts"));
- String homeLastDt = homeLastVisit.value();
- String detailLastDt = detailLastVisit.value();
-
- long homeUvCt = 0;
- long goodDetailUvCt = 0;
-
- if (homeLastDt == null || !homeLastDt.equals(curDt)) {
- homeUvCt = 1;
- homeLastVisit.update(curDt);
- }
- if (detailLastDt == null || !detailLastDt.equals(curDt)) {
- goodDetailUvCt = 1;
- detailLastVisit.update(curDt);
- }
-
- if (homeUvCt == 1 || goodDetailUvCt == 1) {
- out.collect(new TrafficHomeDetailPageViewBean("", "",
- homeUvCt,
- goodDetailUvCt,
- value.getLong("ts")));
- }
- }
- });

这里的窗口函数依旧是先用增量聚合函数,再用全量聚合函数(获得窗口信息);
注意:这里的 ts 字段是 clickhouse 表数据的版本字段,取系统时间即可;
- // TODO 7. 开窗(windowAll聚合)聚合
- SingleOutputStreamOperator<TrafficHomeDetailPageViewBean> resultDS = trafficHomeDetailDS.windowAll(
- TumblingEventTimeWindows.of(org.apache.flink.streaming.api.windowing.time.Time.seconds(10))
- ).reduce(new ReduceFunction<TrafficHomeDetailPageViewBean>() {
- @Override
- public TrafficHomeDetailPageViewBean reduce(TrafficHomeDetailPageViewBean value1, TrafficHomeDetailPageViewBean value2) throws Exception {
- value1.setHomeUvCt(value1.getHomeUvCt() + value2.getHomeUvCt());
- value1.setGoodDetailUvCt(value1.getGoodDetailUvCt() + value2.getGoodDetailUvCt());
- return value1;
- }
- }, new AllWindowFunction<TrafficHomeDetailPageViewBean, TrafficHomeDetailPageViewBean, TimeWindow>() {
- @Override
- public void apply(TimeWindow window, Iterable<TrafficHomeDetailPageViewBean> values, Collector<TrafficHomeDetailPageViewBean> out) throws Exception {
- TrafficHomeDetailPageViewBean next = values.iterator().next();
- next.setTs(System.currentTimeMillis());
- next.setStt(DateFormatUtil.toYmdHms(window.getStart()));
- next.setEdt(DateFormatUtil.toYmdHms(window.getEnd()));
- out.collect(next);
- }
- });
-
- // TODO 8. 写入到 clickhouse
- resultDS.addSink(ClickHouseUtil.getSinkFunction("insert into dws_traffic_page_view_window values(?,?,?,?,?)"));
-
- // TODO 9. 启动任务
- env.execute("DwsTrafficPageViewWindow");

任务:从 Kafka 页面日志主题读取数据,统计七日回流用户和当日独立用户数。
当日独立用户数很好求,和上面差不多,也是使用状态编程对 uid 保存状态去重即可。接下来我们主要分析七日回流用户怎么求:
回流用户定义:之前的活跃用户,一段时间未活跃(流失),今日又活跃了。这里要求统计回流用户总数,规定当日登陆,且自上次登陆之后至少 7 日未登录的用户为回流用户。
1、消费页面浏览主题(dwd_traffic_page_log)登录用户过滤:
2、设置水位线、uid 分组之后进行状态编程
这张表依然没有粒度,直接就是统计结果;我们去重的字段依然是窗口的起止时间:
- create table if not exists dws_user_user_login_window
- (
- stt DateTime,
- edt DateTime,
- back_ct UInt64,
- uu_ct UInt64,
- ts UInt64
- ) engine = ReplacingMergeTree(ts)
- partition by toYYYYMMDD(stt)
- order by (stt, edt);
- import lombok.AllArgsConstructor;
- import lombok.Data;
-
- @Data
- @AllArgsConstructor
- public class UserLoginBean {
- // 窗口起始时间
- String stt;
-
- // 窗口终止时间
- String edt;
-
- // 回流用户数
- Long backCt;
-
- // 独立用户数
- Long uuCt;
-
- // 时间戳
- Long ts;
- }

- // TODO 3. 读取 dwd_traffic_page_log 的数据
- String groupId = "dws_user_user_login_window";
- DataStreamSource<String> pageLog = env.addSource(MyKafkaUtil.getFlinkKafkaConsumer("dwd_traffic_page_log", groupId));
- // TODO 4. 转换为 json 格式 & 过滤出独立用户(uid!=null & last_page_id=null 或者 uid!=null & last_page_id=login)
- SingleOutputStreamOperator<JSONObject> filterDS = pageLog.flatMap(new RichFlatMapFunction<String, JSONObject>() {
- @Override
- public void flatMap(String value, Collector<JSONObject> out) throws Exception {
- JSONObject jsonObject = JSONObject.parseObject(value);
- String uid = jsonObject.getJSONObject("common").getString("uid");
- String lastPageId = jsonObject.getJSONObject("page").getString("last_page_id");
-
- if (uid != null) {
- if (lastPageId == null || lastPageId.equals("login")) {
- out.collect(jsonObject);
- }
- }
- }
- });
- // TODO 5. 提取事件时间生成水位线
- filterDS.assignTimestampsAndWatermarks(WatermarkStrategy.<JSONObject>forBoundedOutOfOrderness(Duration.ofSeconds(2))
- .withTimestampAssigner(new SerializableTimestampAssigner<JSONObject>() {
- @Override
- public long extractTimestamp(JSONObject element, long recordTimestamp) {
- return element.getLong("ts");
- }
- })
- );
- // TODO 6. 状态编程过滤出独立用户
- KeyedStream<JSONObject, String> keyedStream = filterDS.keyBy(json -> json.getJSONObject("common").getString("uid"));
- SingleOutputStreamOperator<UserLoginBean> userLoginDS = keyedStream.flatMap(new RichFlatMapFunction<JSONObject, UserLoginBean>() {
-
- private ValueState<String> lastLoginDtState;
-
- @Override
- public void open(Configuration parameters) throws Exception {
- StateTtlConfig ttlConfig = new StateTtlConfig.Builder(Time.days(7))
- .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
- .build();
-
- ValueStateDescriptor<String> lastLoginStateDescriptor = new ValueStateDescriptor<String>("last-login", String.class);
- lastLoginStateDescriptor.enableTimeToLive(ttlConfig);
- lastLoginDtState = getIterationRuntimeContext().getState(lastLoginStateDescriptor);
- }
-
- @Override
- public void flatMap(JSONObject value, Collector<UserLoginBean> out) throws Exception {
- // 本次登录日期
- Long curTs = value.getLong("ts");
- String curDt = DateFormatUtil.toDate(curTs);
- // 上次登录日期
- String lastLoginDt = lastLoginDtState.value();
-
- long uuCt = 0L;
- long backCt = 0L;
-
- if (lastLoginDt == null) {
- uuCt = 1;
- lastLoginDtState.update(curDt);
- } else if (!lastLoginDt.equals(curDt)) {
- uuCt = 1;
- lastLoginDtState.update(curDt);
- // 判断相差是否 >= 8 天
- Long lastTs = DateFormatUtil.toTs(lastLoginDt);
- long days = (curTs - lastTs) / 1000 / 3600 / 24;
- backCt = days >= 8 ? 1 : 0;
- }
-
- if (uuCt != 0) {
- out.collect(new UserLoginBean("", "", backCt, uuCt, curTs));
- }
- }
- });

和上一个需求一样,增量聚合函数和全量聚合函数配合着使用;
- // TODO 6. 窗口聚合
- SingleOutputStreamOperator<UserLoginBean> resultDS = userLoginDS.windowAll(
- TumblingEventTimeWindows.of(org.apache.flink.streaming.api.windowing.time.Time.seconds(10))
- ).reduce((record1, record2) -> {
- record1.setUuCt(record1.getUuCt() + record2.getUuCt());
- record2.setBackCt(record1.getBackCt() + record2.getBackCt());
- return record1;
- }, new AllWindowFunction<UserLoginBean, UserLoginBean, TimeWindow>() {
- @Override
- public void apply(TimeWindow window, Iterable<UserLoginBean> values, Collector<UserLoginBean> out) throws Exception {
- UserLoginBean next = values.iterator().next();
- next.setStt(DateFormatUtil.toYmdHms(window.getStart()));
- next.setEdt(DateFormatUtil.toYmdHms(window.getEnd()));
- next.setTs(System.currentTimeMillis());
- out.collect(next);
- }
- });

-
- // TODO 7. 写入到 clickhouse
- resultDS.addSink(ClickHouseUtil.getSinkFunction("insert into dws_user_user_login_window values(?,?,?,?,?)"));
-
- // TODO 8. 启动任务
- env.execute("DwsUserUserLoginWindow");
任务:从 DWD 层用户注册表中读取数据,统计各窗口注册用户数,写入 ClickHouse。
这个需求比较简单,因为我们之前在 DWD 层已经创建了用户注册事务事实表(包含字段:user_id,date_id,create_time,ts)
这里教程中用的是 DataStream API ,但是我这里想用 Flink SQL 实现:
注意:当原表中有更贴近事件时间的字段时,我们就尽量少用 Maxwell 的 ts 字段!
- // TODO 3. 消费 Kafka dwd_user_register 主题(生成水位线)
- String groupId = "dws_user_user_register_window";
- tableEnv.executeSql("CREATE TABLE dwd_user_register " +
- "`user_id` string," +
- "`date_id` string," +
- "`create_time` string," +
- "`ts` string" +
- "time_ltz AS TO_TIMESTAMP(FROM_UNIXTIME(create_time/1000)), " +
- "WATERMARK FOR time_ltz AS time_ltz - INTERVAL '2' SECOND " +
- ")" + MyKafkaUtil.getKafkaDDL("dwd_user_register",groupId)
- );
用 Flink SQL 实现就简单多了,这里的聚合逻辑更简单,直接 count(*):
- // TODO 4. 分组,开窗,聚合
- Table resultTable = tableEnv.sqlQuery("SELECT " +
- " date_format(tumble_start(time_ltz,interval '10' second),'yyyy-MM-dd HH:mm:ss') stt," +
- " date_format(tumble_end(time_ltz,interval '10' second),'yyyy-MM-dd HH:mm:ss') edt," +
- " count(*) register_ct," +
- " unix_timestamp() ts" +
- "FROM dwd_user_register " +
- "GROUP BY tumble(time_ltz,interval '10' second)");
- tableEnv.createTemporaryView("result_table",resultTable);
- create table if not exists dws_user_user_register_window
- (
- stt DateTime,
- edt DateTime,
- register_ct UInt64,
- ts UInt64
- ) engine = ReplacingMergeTree(ts)
- partition by toYYYYMMDD(stt)
- order by (stt, edt);
这里需要把动态表转为流,所以我们需要创建一个 Java Bean,对应上 ck 表的每个字段:
- @Data
- @AllArgsConstructor
- public class UserRegisterBean {
- // 窗口起始时间
- String stt;
- // 窗口终止时间
- String edt;
- // 注册用户数
- Long registerCt;
- // 时间戳
- Long ts;
- }
- // TODO 5. 将动态表转为流并写入到 clickhouse
- DataStream<UserRegisterBean> dataStream = tableEnv.toAppendStream(resultTable, UserRegisterBean.class);
- dataStream.addSink(ClickHouseUtil.getSinkFunction("insert into dws_user_user_register_window values (?,?,?,?)"));
-
- // TODO 6. 启动任务
- env.execute("DwsUserUserRegisterWindow");
任务:从 Kafka 读取用户加购明细数据,统计每日各窗口加购独立用户数,写入 ClickHouse。
思路很简单,还是根据 uid 进行 keyby,然后使用状态编程维护一个 lastCartAddDate,对数据进行判断:
这里不多介绍,和前面的逻辑都是一样的,只说明部分点:
- // TODO 3. 读取 dwd_traffic_card_add 的数据
- String groupId = "dws_trade_cart_add_uu_window";
- DataStreamSource<String> cartAddLog = env.addSource(MyKafkaUtil.getFlinkKafkaConsumer("dwd_trade_cart_add", groupId));
-
- //TODO 4. 转为 json 格式并
- SingleOutputStreamOperator<JSONObject> jsonDS = cartAddLog.map(JSONObject::parseObject);
-
- // TODO 5. 提取事件时间生成水位线
- jsonDS.assignTimestampsAndWatermarks(WatermarkStrategy.<JSONObject>forBoundedOutOfOrderness(Duration.ofSeconds(2))
- .withTimestampAssigner(new SerializableTimestampAssigner<JSONObject>() {
- @Override
- public long extractTimestamp(JSONObject element, long recordTimestamp) {
- String operate_time = element.getString("operate_time");
- if (operate_time != null){
- return DateFormatUtil.toTs(operate_time,true);
- }
- return DateFormatUtil.toTs(element.getString("create_time"));
- }
- })
- );
-
- // TODO 6. 按照用户id进行分组 & 过滤出独立用户
- KeyedStream<JSONObject, String> keyedStream = jsonDS.keyBy(json -> json.getJSONObject("common").getString("uid"));
- SingleOutputStreamOperator<CartAddUuBean> filterDS = keyedStream.flatMap(new RichFlatMapFunction<JSONObject, CartAddUuBean>() {
-
- private ValueState<String> lastCartAddDateState;
-
- @Override
- public void open(Configuration parameters) throws Exception {
- StateTtlConfig ttlConfig = new StateTtlConfig.Builder(Time.days(1))
- .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
- .build();
-
- ValueStateDescriptor<String> lastCartAddStateDescriptor = new ValueStateDescriptor<String>("last-cart-add", String.class);
- lastCartAddStateDescriptor.enableTimeToLive(ttlConfig);
- lastCartAddDateState = getRuntimeContext().getState(lastCartAddStateDescriptor);
- }
-
- @Override
- public void flatMap(JSONObject value, Collector<CartAddUuBean> out) throws Exception {
- // 当前的时间戳
- Long curTs = value.getLong("ts");
- String curDate = DateFormatUtil.toDate(curTs);
- String lastCartAddDate = lastCartAddDateState.value();
-
- if (lastCartAddDate == null || !lastCartAddDate.equals(curDate)) {
- lastCartAddDateState.update(curDate);
- out.collect(new CartAddUuBean("","",1L,curTs));
- }
- }
- });
-
- // TODO 7. 开窗聚合(补充字段)
- SingleOutputStreamOperator<CartAddUuBean> resultDS = filterDS.windowAll(TumblingEventTimeWindows.of(org.apache.flink.streaming.api.windowing.time.Time.seconds(10)))
- .reduce(new ReduceFunction<CartAddUuBean>() {
- @Override
- public CartAddUuBean reduce(CartAddUuBean value1, CartAddUuBean value2) throws Exception {
- value1.setCartAddUuCt(value1.getCartAddUuCt() + value2.getCartAddUuCt());
- return value1;
- }
- }, new AllWindowFunction<CartAddUuBean, CartAddUuBean, TimeWindow>() {
- @Override
- public void apply(TimeWindow window, Iterable<CartAddUuBean> values, Collector<CartAddUuBean> out) throws Exception {
- CartAddUuBean next = values.iterator().next();
- next.setStt(DateFormatUtil.toYmdHms(window.getStart()));
- next.setEdt(DateFormatUtil.toYmdHms(window.getEnd()));
- next.setTs(System.currentTimeMillis());
- out.collect(next);
- }
- });
-
- // TODO 8. 写出到 clickhouse
- resultDS.addSink(ClickHouseUtil.getSinkFunction("insert into dws_trade_cart_add_uu_window values (?,?,?,?)"));
-
- // TODO 9. 启动任务
- env.execute("DwsTradeCartAddUuWindow");

任务:从 Kafka 读取交易域支付成功主题数据,统计支付成功独立用户数和首次支付成功用户数(第一次在平台消费)。
如果一个用户是首次支付成功用户(既然是历史第一次下单操作,必然也是今天的第一次下单),那么他必然是今天的支付成功独立用户;所以我们只需要通过状态过滤出 lastPayDate = null 或者 lastPayDate != curDt 的用户(注意:这里的 lastPayDate 不能设置 TTL ,因为我们需要知道这个用户历史上有没有支付过,所以就不允许状态失效)
left join 实现过程:
假设 A 表作为主表与 B 表做等值左外联。当 A 表数据进入算子,而 B 表数据未至时会先生成一条 B 表字段均为 null 的关联数据ab1,其标记为 +I。其后,B 表数据到来,会先将之前的数据撤回,即生成一条与 ab1 内容相同,但标记为 -D 的数据,再生成一条关联后的数据,标记为 +I。这样生成的动态表对应的流称之为回撤流。
在 DWD 层的订单预处理表(dwd_trade_order_pre_process)生成过程中会形成回撤流,因为它需要对订单明细活动表和订单明细优惠券表进行 left join。而我们这里的支付成功依赖于 DWD 层支付成功事务事实表(dwd_trade_pay_detail_suc),该表又依赖于 DWD 层的下单事务事实表(dwd_trade_order_detail),所以这里我们需要考虑回撤流的问题:
回撤数据在 Kafka 中以 null 值的形式存在,只需要简单判断即可过滤。我们需要考虑的是如何对其余数据去重:
- order_id = 1001
- order_detail_id = 1001-a
- order_detail_activity_id: a1
-
-
- SELECT ...
- FROM
- order_detail od
- join
- order_info oi
- on
- od.order_id = oi.id
- left join
- order_detail_activity oa
- on
- od.id = oa.order_detail_id

上面我们有一个订单(id=1001),这个订单内只有一个商品并且参与了活动,那么由于 order_detail_activity 来得肯定要晚一些,所以可能会出现下面这种情况:
- +/- order_id order_detail_id order_detail_activity_id
-
- + 1001 1001-a null
- - null null null
- + 1001 1001-a a1
我们过滤 null 值指的是过滤上面操作是 '-' 的数据,因为回撤数据在 Kafka 中以 null 值的形式存在。而除了 null 值之外,我们还应该过滤掉旧的错误数据,由于 order_detail_activity 数据来得晚一些,导致flink 直接给字段 order_detail_activity_id 一个 null,所以我们应该把这个字段值删除;
但是,对于这个需求(求支付成功的用户数),其实我们也可以不做去重,放到最后再做去重,为什么呢?设想如果一个用户下了多个订单,而我们的支付成功表的粒度是商品,所以数据即使在 left join 之后对相同 order_detail_id 的数据做了去重,但是多个订单的话最终还有重复。
考虑到之后还可能遇到需要去重的需求(尤其是设计到金额的),这里我们还是练习一下如何实现去重:
- create table if not exists dws_trade_payment_suc_window
- (
- stt DateTime,
- edt DateTime,
- payment_suc_unique_user_count UInt64,
- payment_new_user_count UInt64,
- ts UInt64
- ) engine = ReplacingMergeTree(ts)
- partition by toYYYYMMDD(stt)
- order by (stt, edt);
- import lombok.AllArgsConstructor;
- import lombok.Data;
-
- @Data
- @AllArgsConstructor
- public class TradePaymentWindowBean {
- // 窗口起始时间
- String stt;
-
- // 窗口终止时间
- String edt;
-
- // 支付成功独立用户数
- Long paymentSucUniqueUserCount;
-
- // 支付成功新用户数
- Long paymentSucNewUserCount;
-
- // 时间戳
- Long ts;
- }

为了去重,我们需要对每一条数据都设置一个时间,因为对于重复数据,它们在原始表中的时间字段值都是一样的。
FlinkSQL 提供了几个可以获取当前时间戳的函数
这里,我们使用current_row_timestamp 来作为时间,我们需要给订单预处理表中添加:
- current_row_timestamp() as row_op_ts
-
- -- 在建表语句中添加
- row_op_ts TIMESTAMP_LTZ(3)
那么,下单事务事实表来源于订单预处理表,支付成功事务事实表依赖于下单事务事实表,搜易当然也应该添加该字段。
- import java.util.Comparator;
-
- public class TimestampLtz3CompareUtil {
-
- public static int compare(String timestamp1, String timestamp2) {
- // 数据格式 2022-04-01 10:20:47.302Z
- // 1. 去除末尾的时区标志,'Z' 表示 0 时区
- String cleanedTime1 = timestamp1.substring(0, timestamp1.length() - 1);
- String cleanedTime2 = timestamp2.substring(0, timestamp2.length() - 1);
- // 2. 提取小于 1秒的部分
- String[] timeArr1 = cleanedTime1.split("\\.");
- String[] timeArr2 = cleanedTime2.split("\\.");
- String microseconds1 = new StringBuilder(timeArr1[timeArr1.length - 1])
- .append("000").toString().substring(0, 3);
- String microseconds2 = new StringBuilder(timeArr2[timeArr2.length - 1])
- .append("000").toString().substring(0, 3);
- int micro1 = Integer.parseInt(microseconds1);
- int micro2 = Integer.parseInt(microseconds2);
- // 3. 提取 yyyy-MM-dd HH:mm:ss 的部分
- String date1 = timeArr1[0];
- String date2 = timeArr2[0];
- Long ts1 = DateFormatUtil.toTs(date1, true);
- Long ts2 = DateFormatUtil.toTs(date2, true);
- // 4. 获得精确到毫秒的时间戳
- long microTs1 = ts1 * 1000 + micro1;
- long microTs2 = ts2 * 1000 + micro2;
-
- long divTs = microTs1 - microTs2;
-
- return divTs < 0 ? -1 : divTs == 0 ? 0 : 1;
- }
-
- public static void main(String[] args) {
- System.out.println(compare("2022-04-01 11:10:55.040Z",
- "2022-04-01 11:10:55.04Z"));
- }
- }

读取DWD支付成功事务事实表并转为 JSON 格式,然后按照订单明细id进行分组(为了对回撤流的数据进行去重,根据相同明细id的时间进行判断)
- // TODO 3. 读取 dwd_trade_pay_detail_suc 的数据
- String groupId = "dws_trade_payment_suc_window";
- DataStreamSource<String> paymentSucDS = env.addSource(MyKafkaUtil.getFlinkKafkaConsumer("dwd_trade_pay_detail_suc", groupId));
-
- // TODO 4. 将数据转为JSON格式
- SingleOutputStreamOperator<JSONObject> jsonDS = paymentSucDS.flatMap(new RichFlatMapFunction<String, JSONObject>() {
- @Override
- public void flatMap(String value, Collector<JSONObject> out) throws Exception {
- try {
- JSONObject jsonObject = JSONObject.parseObject(value);
- out.collect(jsonObject);
- } catch (Exception e) {
- // 可以选择输出到侧输出流
- e.printStackTrace();
- }
- }
- });
-
- // TODO 5. 按照订单明细id分组
- KeyedStream<JSONObject, String> keyedStream = jsonDS.keyBy(json -> json.getString("order_detail_id"));

这里的回撤流是因为支付成功事务事实表需要用 订单明细 innner join 订单表 left join 订单明细活动 left join 订单明细活动造成的;
这里的定时器我们设置为 5 s,因为上游订单预处理表在生成的时候需要 join,我们当时设置的就是 5s,所以理论上 5s 之后,订单明细活动表和订单明细优惠券表的 left join 也应该完成了,这个状态就没必要保留了。
- // TODO 6. 使用状态编程过滤最新数据输出(需要使用状态和定时器所以使用 process)
- SingleOutputStreamOperator<JSONObject> filterDS = keyedStream.process(new KeyedProcessFunction<String, JSONObject, JSONObject>() {
-
- private ValueState<JSONObject> lastPaySucDateState;
-
- @Override
- public void open(Configuration parameters) throws Exception {
- lastPaySucDateState = getRuntimeContext().getState(new ValueStateDescriptor<>("last-pay-suc", JSONObject.class));
- }
-
- @Override
- public void processElement(JSONObject value, Context ctx, Collector<JSONObject> out) throws Exception {
- JSONObject state = lastPaySucDateState.value();
- if (state == null) {
- lastPaySucDateState.update(value);
- // 注册定时器
- ctx.timerService().registerEventTimeTimer(ctx.timerService().currentProcessingTime() + 5000L);
- } else {
- String stateRt = state.getString("row_op_ts");
- String curRt = value.getString("row_op_ts");
- int compare = TimestampLtz3CompareUtil.compare(stateRt, curRt);
-
- if (compare != 1) { // 状态里的时间小
- lastPaySucDateState.update(value);
- }
- }
- }
-
- @Override
- public void onTimer(long timestamp, OnTimerContext ctx, Collector<JSONObject> out) throws Exception {
- super.onTimer(timestamp, ctx, out);
- // 输出并清空状态数据
- JSONObject value = lastPaySucDateState.value();
- out.collect(value);
-
- lastPaySucDateState.clear();
- }
- });

这里选择 callback_time ,它是支付成功后的回调时间;
- // TODO 7. 提取事件时间生成水位线
- SingleOutputStreamOperator<JSONObject> jsonWithWmDS = filterDS.assignTimestampsAndWatermarks(WatermarkStrategy.<JSONObject>forBoundedOutOfOrderness(Duration.ofSeconds(2))
- .withTimestampAssigner(new SerializableTimestampAssigner<JSONObject>() {
- @Override
- public long extractTimestamp(JSONObject element, long recordTimestamp) {
- return DateFormatUtil.toTs(element.getString("callback_time"), true);
- }
- }));
- // TODO 8. 按照 user_id 分组
- KeyedStream<JSONObject, String> keyedByUidDS = jsonWithWmDS.keyBy(json -> json.getString("user_id"));
-
- // TODO 9. 提取独立支付成功用户数和首次支付成功用户数
- SingleOutputStreamOperator<TradePaymentWindowBean> tradePaymentDS = keyedByUidDS.flatMap(new RichFlatMapFunction<JSONObject, TradePaymentWindowBean>() {
-
- private ValueState<String> lastDtState;
-
- @Override
- public void open(Configuration parameters) throws Exception {
- lastDtState = getRuntimeContext().getState(new ValueStateDescriptor<String>("lastDt", String.class));
- }
-
- @Override
- public void flatMap(JSONObject value, Collector<TradePaymentWindowBean> out) throws Exception {
- String lastDt = lastDtState.value();
- String curDt = value.getString("callback_time").split(" ")[0];
-
- // 当日支付人数
- long pay = 0L;
- // 首次支付人数
- long newPay = 0L;
-
- // 判断状态是否为null
- if (lastDt == null) {
- pay = 1;
- newPay = 1;
- lastDtState.update(curDt);
- } else if (!lastDt.equals(curDt)) {
- pay = 1;
- lastDtState.update(curDt);
- }
-
- // 写出
- if (pay == 1) {
- out.collect(new TradePaymentWindowBean("", "", newPay, pay, DateFormatUtil.toTs(curDt)));
- }
- }
- });

开窗是为了实时刷新到报表,聚合依然是那两个函数:增量聚合(聚合结果),全量聚合(补充窗口起止字段);
- // TODO 10. 开窗,聚合
- SingleOutputStreamOperator<TradePaymentWindowBean> resultDS = tradePaymentDS.windowAll(TumblingEventTimeWindows.of(Time.seconds(10)))
- .reduce(new ReduceFunction<TradePaymentWindowBean>() {
- @Override
- public TradePaymentWindowBean reduce(TradePaymentWindowBean value1, TradePaymentWindowBean value2) throws Exception {
- value1.setPaymentSucNewUserCount(value1.getPaymentSucNewUserCount() + value2.getPaymentSucNewUserCount());
- value1.setPaymentSucUniqueUserCount(value1.getPaymentSucUniqueUserCount() + value2.getPaymentSucUniqueUserCount());
- return value1;
- }
- }, new AllWindowFunction<TradePaymentWindowBean, TradePaymentWindowBean, TimeWindow>() {
- @Override
- public void apply(TimeWindow window, Iterable<TradePaymentWindowBean> values, Collector<TradePaymentWindowBean> out) throws Exception {
- TradePaymentWindowBean next = values.iterator().next();
- next.setTs(System.currentTimeMillis());
- next.setStt(DateFormatUtil.toYmdHms(window.getStart()));
- next.setEdt(DateFormatUtil.toYmdHms(window.getEnd()));
- out.collect(next);
- }
- });
-
- // TODO 11. 写出到 clickhouse
- resultDS.addSink(ClickHouseUtil.getSinkFunction("insert into dws_trade_payment_suc_window values(?,?,?,?,?)"));
-
- // TODO 12. 启动任务
- env.execute("DwsTradePaymentSucWindow");

今天的 DWS 层到此为止,剩下了还有几个需求估计还得 1~2 天完成,这一块要比之前都难一些,争取这周日前把实时数仓完结;然后下周开始把离线和实时再好好复习一遍;
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。