赞
踩
今天立秋,任务是完成 DWS 剩余的表,不知道今天能不能做完,欲速则不达,学不完就明天继续,尽量搞懂每一个需求;
任务:从 Kafka 订单明细主题读取数据,对数据去重,统计当日下单独立用户数和新增下单用户数,封装为实体类,写入 ClickHouse。
首先,现在 DWS 这张表依赖于下单事务事实表,而下单事务事实表又依赖于订单预处理表,订单预处理表正是我们所讨论的会发生数据迟到的表(因为订单明细活动表和订单明细优惠券表要和主表进行 left join);
这个需求和昨天的交易域支付各窗口汇总表几乎一样,只不过这次我们不再对数据做去重了,因为上游订单明细活动表和订单明细优惠券表 left join 迟到不会影响我们表中的字段数据(表中字段的数据全部来自订单明细),所以下面我们在对 order_detail_id 分组后,我们只需要等到第一条数据来(不管乱序不乱序,因为我们要的字段在主表里,而撤回流写入 kafka 的null我们可以在分组前转 json 流的时候直接过滤掉),后面的数据直接丢弃即可(都是 left join 活动和优惠券的数据,不需要);
-
- create table if not exists dws_trade_order_window
- (
- stt DateTime,
- edt DateTime,
- order_unique_user_count UInt64,
- order_new_user_count UInt64,
- order_activity_reduce_amount Decimal(38, 20),
- order_coupon_reduce_amount Decimal(38, 20),
- order_origin_total_amount Decimal(38, 20),
- ts UInt64
- ) engine = ReplacingMergeTree(ts)
- partition by toYYYYMMDD(stt)
- order by (stt, edt);
-
-

表中的 order_activity_reduce_amount 和 order_coupon_reduce_amount 字段并不需要订单明细活动表和订单明细优惠券表参与,因为 order_detail 和 order_info 表中都有关于优惠券和活动减免金额的字段;所以我们不需要考虑去重,因为对于这个需求,并不需要担心因为 订单明细活动表和订单明细优惠券表 的 left join 数据迟到,它俩迟到对这张表字段并不影响;
- import lombok.AllArgsConstructor;
- import lombok.Builder;
- import lombok.Data;
-
- @Data
- @AllArgsConstructor
- @Builder
- public class TradeOrderBean {
- // 窗口起始时间
- String stt;
-
- // 窗口关闭时间
- String edt;
-
- // 下单独立用户数
- Long orderUniqueUserCount;
-
- // 下单新用户数
- Long orderNewUserCount;
-
- // 下单活动减免金额
- Double orderActivityReduceAmount;
-
- // 下单优惠券减免金额
- Double orderCouponReduceAmount;
-
- // 下单原始金额
- Double orderOriginalTotalAmount;
-
- // 时间戳
- Long ts;
- }

- // TODO 2. 读取 kafka dwd_trade_order_detail
- String groupId = "dws_trade_order_window";
- DataStreamSource<String> orderDetailDS = env.addSource(MyKafkaUtil.getFlinkKafkaConsumer("dwd_trade_order_detail", groupId));
-
- // TODO 3. 转为 JSON 对象
- SingleOutputStreamOperator<JSONObject> jsonDS = orderDetailDS.flatMap(new FlatMapFunction<String, JSONObject>() {
- @Override
- public void flatMap(String value, Collector<JSONObject> out) throws Exception {
- try {
- JSONObject jsonObject = JSONObject.parseObject(value);
- out.collect(jsonObject);
- } catch (Exception e) {
- // 可以选择输出到侧输出流
- e.printStackTrace();
- }
- }
- });

去重一共分为两步,第一步按照最细粒度 order_detail_id 进行,但是我们要求的是用户数,所以其实这里这一步只是为了多练习一下数据有撤回流时数据重复以及完整性的问题;
这里用的富函数版本的 filter 算子,因为涉及状态、ttl(上下午)等
- // TODO 4. 第一次去重(根据 order_detail_id 进行分组)
- KeyedStream<JSONObject, String> keyedStream = jsonDS.keyBy(json -> json.getString("id"));
-
- // TODO 5. 针对 order_detail_id 进行去重(保留第一条数据即可,因为要使用状态编程所以使用Rich)
- SingleOutputStreamOperator<JSONObject> filterDS = keyedStream.filter(new RichFilterFunction<JSONObject>() {
-
- // 用来存储第一条数据,之后的数据来了全部丢弃(上游 left join 迟到的数据)
- private ValueState<String> state;
-
- @Override
- public void open(Configuration parameters) throws Exception {
- StateTtlConfig ttlConfig = new StateTtlConfig.Builder(Time.seconds(5))
- .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
- .build();
-
- ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("first-value", String.class);
- stateDescriptor.enableTimeToLive(ttlConfig);
- state = getRuntimeContext().getState(stateDescriptor);
- }
-
- @Override
- public boolean filter(JSONObject value) throws Exception {
- String data = state.value();
- if (data == null) {
- state.update("1"); // 随便存就行
- return true;
- }
- return false;
- }
- });

这里依然选择提取 create_time 字段,更贴近事件时间;
这里的定时器可以设置 5s 或者稍微久一点,因为我们上游订单预处理表在生成的过程中需要 join,我们当时给了 5s,也就是说我们能保证 5s 数据就能完整,所以这里的状态保存 5 s我们就能保证数据肯定是完整的,订单明细活动表和订单明细优惠券表的 left join 肯定已经完成了,不会有迟到数据了。而事实上,当第一条数据来的时候我们就会输出,因为我们所需要的字段数据是在 order_detail 中的,是订单预处理表的主表(订单明细表),而我们这里的下单事务事实表又是直接依赖于订单预处理表,所以订单明细活动表和订单明细优惠券表 的 left join 即使迟到,也不会受影响。
- // TODO 6. 提取事件时间生成 watermark
- SingleOutputStreamOperator<JSONObject> jsonWithWmDS = filterDS.assignTimestampsAndWatermarks(WatermarkStrategy.<JSONObject>forBoundedOutOfOrderness(Duration.ofSeconds(2))
- .withTimestampAssigner(new SerializableTimestampAssigner<JSONObject>() {
- @Override
- public long extractTimestamp(JSONObject element, long recordTimestamp) {
- return DateFormatUtil.toTs(element.getString("create_time"));
- }
- }));
- // TODO 7. 按照 user_id 分组
- KeyedStream<JSONObject, String> keyedByUidDS = jsonWithWmDS.keyBy(json -> json.getString("user_id"));
-
- // TODO 8. 提取独立下单用户
- SingleOutputStreamOperator<TradeOrderBean> tradeOrderDS = keyedByUidDS.map(new RichMapFunction<JSONObject, TradeOrderBean>() {
-
- private ValueState<String> lastOrderDtState;
-
- @Override
- public void open(Configuration parameters) throws Exception {
- lastOrderDtState = getRuntimeContext().getState(new ValueStateDescriptor<String>("last-order-dt", String.class));
- }
-
- @Override
- public TradeOrderBean map(JSONObject value) throws Exception {
- String lastOrderDt = lastOrderDtState.value();
- String curDt = value.getString("create_time").split(" ")[0];
-
- // 下单独立用户数
- long orderUniqueUserCount = 0;
- // 下单新用户数
- long orderNewUserCount = 0;
- if (lastOrderDt == null) {
- orderUniqueUserCount = 1L;
- orderNewUserCount = 1L;
- lastOrderDtState.update(curDt);
- } else if (!lastOrderDt.equals(curDt)) {
- orderUniqueUserCount = 1L;
- lastOrderDtState.update(curDt);
- }
-
- // 取出下单件数和单价
- Integer sku_num = value.getInteger("sku_num");
- Double order_price = value.getDouble("order_price");
-
- return new TradeOrderBean("", "",
- orderUniqueUserCount,
- orderNewUserCount,
- value.getDouble("split_activity_amount"),
- value.getDouble("split_coupon_amount"),
- sku_num * order_price,
- null); // ts 后面开窗的时候都会给当前时间,到时候再补充
- }
- });

开窗为了时效性,聚合是因为我们的指标是一天的统计量,所以对一天的数据需要进行聚合;同样使用增量聚合函数 + 全量聚合函数来补全字段;
- // TODO 9. 开窗聚合
- SingleOutputStreamOperator<TradeOrderBean> resultDS = tradeOrderDS.windowAll(TumblingEventTimeWindows.of(org.apache.flink.streaming.api.windowing.time.Time.seconds(10)))
- .reduce(new ReduceFunction<TradeOrderBean>() {
- @Override
- public TradeOrderBean reduce(TradeOrderBean value1, TradeOrderBean value2) throws Exception {
- value1.setOrderUniqueUserCount(value1.getOrderUniqueUserCount() + value2.getOrderUniqueUserCount());
- value1.setOrderNewUserCount(value1.getOrderNewUserCount() + value2.getOrderNewUserCount());
- value1.setOrderOriginalTotalAmount(value1.getOrderOriginalTotalAmount() + value2.getOrderOriginalTotalAmount());
- value1.setOrderNewUserCount(value1.getOrderNewUserCount() + value2.getOrderNewUserCount());
- value1.setOrderCouponReduceAmount(value1.getOrderCouponReduceAmount() + value2.getOrderCouponReduceAmount());
- value1.setOrderActivityReduceAmount(value1.getOrderActivityReduceAmount() + value2.getOrderActivityReduceAmount());
- return value1;
- }
- }, new AllWindowFunction<TradeOrderBean, TradeOrderBean, TimeWindow>() {
- @Override
- public void apply(TimeWindow window, Iterable<TradeOrderBean> values, Collector<TradeOrderBean> out) throws Exception {
- TradeOrderBean next = values.iterator().next();
- next.setStt(DateFormatUtil.toYmdHms(window.getStart()));
- next.setEdt(DateFormatUtil.toYmdHms(window.getEnd()));
- next.setTs(System.currentTimeMillis());
- out.collect(next);
- }
- });
-
- // TODO 10. 写入 clickhouse
- resultDS.addSink(ClickHouseUtil.getSinkFunction("insert into dws_trade_order_window values(?,?,?,?,?,?,?,?)"));
-
- // TODO 11. 启动任务
- env.execute("DwsTradeOrderWindow");

这应该是目前为止最难的一个需求了,从 Kafka 订单明细主题读取数据(dwd_trade_order_detail),过滤 null 数据(因为下单事务事实表依赖于的订单预处理表,而这张表的生成需要 left join 活动和优惠券表,所以会形成回撤流)并按照唯一键对数据去重,关联维度信息(因为 order_info 和 order_detail 中都没有 spu 信息),按照维度(user,spu,trademark,category)分组,统计各维度各窗口的订单数和订单金额,将数据写入 ClickHouse 交易域品牌-品类-用户-SPU粒度下单各窗口汇总表;
可以看到,我们的建表语句中不仅保留了我们需求中要求的用户和sku_id,我们还同时保留了品牌、品类等字段,为的是之后可以扩展更多的需求而不用再去创建;其实我们设置可以将这个需求的粒度再做小一点,做到用户-sku粒度,同时保留 spu信息,这样我们就可以实现更多细粒度的需求了;
我们需要关联 6 张维表:首先根据 sku_id 去读取 sku_info ,得到该 sku_id 的 spu_id、3个category_id以及trademark_id ,然后再去分别关联这 5 张表(通过 category3 去关联 category2,再用category2去获取category1);
- create table if not exists dws_trade_user_spu_order_window
- (
- stt DateTime,
- edt DateTime,
- trademark_id String,
- trademark_name String,
- category1_id String,
- category1_name String,
- category2_id String,
- category2_name String,
- category3_id String,
- category3_name String,
- user_id String,
- spu_id String,
- spu_name String,
- order_count UInt64,
- order_amount Decimal(38, 20),
- ts UInt64
- ) engine = ReplacingMergeTree(ts)
- partition by toYYYYMMDD(stt)
- order by (stt, edt, spu_id, spu_name, user_id);

我们在创建该表的 Java Bean 的时候需要额外补充一些字段——sku_id 和 一个 Set 类型的 orderset,因为没有 sku_id 字段就无法关联任一维表,而 order_set 是我们将来做去重的重要依据;
- import lombok.AllArgsConstructor;
- import lombok.Builder;
- import lombok.Data;
-
- import java.util.Set;
-
- @Data
- @AllArgsConstructor
- @Builder
- public class TradeUserSpuOrderBean {
- // 窗口起始时间
- String stt;
- // 窗口结束时间
- String edt;
- // 品牌 ID
- String trademarkId;
- // 品牌名称
- String trademarkName;
- // 一级品类 ID
- String category1Id;
- // 一级品类名称
- String category1Name;
- // 二级品类 ID
- String category2Id;
- // 二级品类名称
- String category2Name;
- // 三级品类 ID
- String category3Id;
- // 三级品类名称
- String category3Name;
-
- // 订单 ID
- @TransientSink
- Set<String> orderIdSet;
-
- // sku_id
- @TransientSink
- String skuId;
-
- // 用户 ID
- String userId;
- // spu_id
- String spuId;
- // spu 名称
- String spuName;
- // 下单次数
- Long orderCount;
- // 下单金额
- Double orderAmount;
- // 时间戳
- Long ts;
- }

在上面的 Java Bean 中我们用到了构造者模式,因为我们在把数据转为 JavaBean 类型的数据流的时候,很多字段需要关联维表才能得到,而初始化构造太烦人了,所以我们通过构造者模式来不断丰富对象的属性值;
表中的 order_count 字段是指对某一品类商品下单的次数,也就是指用户在不同 order_id 中下单相同的 spu 数;而用户在一个 order_id 中下单的多个相同 spu 是只能计数为 1 的;
所以考虑到一个 order_id 中的 spu_id 可能重复:
而且 sku_id 也可能发生重复(可能一个订单中的一个商品买了多件,虽然 order_id 和 sku_id 肯定相同,但是每件用了不同的券,有的平台会视为一条(sku_id相同,sku_num=2),但是有的平台可能会视为两条sku_id相同,sku_num=1的数据)。
所以我们必须做去重,那应该怎么去重?
为什么我们要在开窗聚合前后去关联两次维表呢?
这部分和上面的需求一样,直接拿来:
- // TODO 2. 读取 kafka dwd_trade_order_detail
- String groupId = "dws_trade_trademark_category_user_order_window";
- DataStreamSource<String> orderDetailDS = env.addSource(MyKafkaUtil.getFlinkKafkaConsumer("dwd_trade_order_detail", groupId));
-
- // TODO 3. 转为 JSON 格式
- SingleOutputStreamOperator<JSONObject> jsonDS = orderDetailDS.flatMap(new RichFlatMapFunction<String, JSONObject>() {
- @Override
- public void flatMap(String value, Collector<JSONObject> out) throws Exception {
- try {
- JSONObject jsonObject = JSONObject.parseObject(value);
- out.collect(jsonObject);
- } catch (Exception e) {
- // 可以选择输出到侧输出流
- e.printStackTrace();
- }
- }
- });
-
- // TODO 4. 第一次去重(根据 order_detail_id 进行分组)
- KeyedStream<JSONObject, String> keyedStream = jsonDS.keyBy(json -> json.getString("id"));
-
- // TODO 5. 针对 order_detail_id 去重上游 left join 导致的重复数据(保留第一条数据即可,因为要使用状态编程所以使用Rich)
- SingleOutputStreamOperator<JSONObject> filterDS = keyedStream.filter(new RichFilterFunction<JSONObject>() {
-
- // 用来存储第一条数据,之后的数据来了全部丢弃(上游 left join 迟到的数据)
- private ValueState<String> state;
-
- @Override
- public void open(Configuration parameters) throws Exception {
- StateTtlConfig ttlConfig = new StateTtlConfig.Builder(Time.seconds(5))
- .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
- .build();
-
- ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("first-value", String.class);
- stateDescriptor.enableTimeToLive(ttlConfig);
- state = getRuntimeContext().getState(stateDescriptor);
- }
-
- @Override
- public boolean filter(JSONObject value) throws Exception {
- String data = state.value();
- if (data == null) {
- state.update("1"); // 随便存就行
- return true;
- }
- return false;
- }
- });

- // TODO 6. 将流转为 JavaBean 类型
- SingleOutputStreamOperator<TradeUserSpuOrderBean> tradeUserSpuDS = filterDS.map(line -> {
- HashSet<String> orderIds = new HashSet<>();
- orderIds.add(line.getString("order_id"));
- return TradeUserSpuOrderBean.builder()
- .skuId(line.getString("sku_id"))
- .userId(line.getString("user_id"))
- .orderAmount(line.getDouble("split_total_amount"))
- .orderIdSet(orderIds)
- .ts(DateFormatUtil.toTs(line.getString("create_time"), true))
- .build();
- }
- );
之后我们要将数据流和维表进行关联,那我们就必须去 Phoenix 查询数据,所以我们这里封装一个工具类:
- /**
- * 适用于任何 JDBC 方式访问的数据库中的任何查询语句
- */
- public class JdbcUtil {
-
- /**
- * @param connection 连接对象
- * @param sql sql语句
- * @param clz 返回类型对象
- * @param underScoreToCamel 是否下划线转为驼峰
- * @param <T> 返回类型
- * @return 结果集列表
- */
- public static <T> List<T> queryList(Connection connection, String sql, Class<T> clz, boolean underScoreToCamel) throws SQLException, InstantiationException, IllegalAccessException, InvocationTargetException {
- // 创建集合用于存放结果
- ArrayList<T> result = new ArrayList<>();
-
- // 预编译 SQL
- PreparedStatement preparedStatement = connection.prepareStatement(sql);
-
- // 执行查询
- ResultSet resultSet = preparedStatement.executeQuery();
-
- // 获取查询的元数据信息
- ResultSetMetaData metaData = resultSet.getMetaData();
- int columnCount = metaData.getColumnCount();
-
- // 遍历结果集
- while (resultSet.next()) {
- T t = clz.newInstance();
-
- for (int i = 0; i < columnCount; i++) {
- // 获取列名和列值
- String columnName = metaData.getColumnName(i);
- Object value = resultSet.getObject(columnName);
-
- // 判断是否需要进行下划线与驼峰命名的转换
- if (underScoreToCamel) {
- columnName = CaseFormat.LOWER_UNDERSCORE.to(CaseFormat.LOWER_CAMEL, columnName.toLowerCase());
- }
-
- // 赋值
- BeanUtils.setProperty(t, columnName, value);
- }
-
- resultSet.close();
- preparedStatement.close();
-
- result.add(t);
- }
- return result;
- }
- }

- public class DimUtil {
-
- public static JSONObject getDimInfo(Connection connection,String tableName,String key) throws SQLException, InvocationTargetException, InstantiationException, IllegalAccessException {
- // 拼接 SQL
- String querySql = "select * from " + GmallConfig.HBASE_SCHEMA + "." + tableName + " where id = '" + key + "'";
- // 查询数据
- List<JSONObject> queryList = JdbcUtil.queryList(connection, querySql, JSONObject.class, false);
- // 返回结果
- return queryList.get(0);
- }
-
- }
现在有个问题是:Phoenix 查询数据的速度每条平均 7 ms 左右,这显然不能满足大数据的场景,关于 HBase 的查询数据流程我们很熟悉:
所以我们说 HBase 的连接是一个重量级的连接,过程很长很繁琐,那我们就得考虑怎么优化这个查询速度了;
为了提高查询效率,我们需要对数据做一个缓存,但是并不是把所有维表缓存起来,那样就没有必要把海量数据存到 HBase 了,我们只缓存热点数据;
旁路缓存注意事项:
缓存要设过期时间,不然冷数据会常驻缓存,浪费资源
要考虑维度数据是否会发生变化,如果发生变化要主动清除缓存(在写入 Phoenix 前判断 Maxwell 传过来的数据中 type 字段是否为 update,如果是删除缓存中对应的数据)
关于 Redis 的表设计:
- public class JedisUtil {
- private static JedisPool jedisPool;
-
- private static void initJedisPool() {
- JedisPoolConfig poolConfig = new JedisPoolConfig();
- poolConfig.setMaxTotal(100);
- poolConfig.setMaxIdle(5);
- poolConfig.setMinIdle(5);
- poolConfig.setBlockWhenExhausted(true);
- poolConfig.setMaxWaitMillis(2000);
- poolConfig.setTestOnBorrow(true);
- jedisPool = new JedisPool(poolConfig, "hadoop102", 6379, 10000);
- }
-
- public static Jedis getJedis() {
- if (jedisPool == null) {
- initJedisPool();
- }
- // 获取Jedis客户端
- Jedis jedis = jedisPool.getResource();
- return jedis;
- }
-
- public static void main(String[] args) {
- Jedis jedis = getJedis();
- String pong = jedis.ping();
- System.out.println(pong);
- }
-
- }

重写 DimUtil:
- public static JSONObject getDimInfo(Connection connection,String tableName,String key) throws SQLException, InvocationTargetException, InstantiationException, IllegalAccessException {
- // 先查询 Redis
- Jedis jedis = JedisUtil.getJedis();
- String redisKey = "DIM:"+tableName+":"+key;
- String dimJsonStr = jedis.get(redisKey);
- if (dimJsonStr != null){
- // 重置过期时间
- jedis.expire(redisKey,24 * 60 * 60);
- // 归还连接
- jedis.close();
- // 返回维表数据
- return JSONObject.parseObject(dimJsonStr);
- }
-
- // 拼接 SQL
- String querySql = "select * from " + GmallConfig.HBASE_SCHEMA + "." + tableName + " where id = '" + key + "'";
- // 查询数据
- List<JSONObject> queryList = JdbcUtil.queryList(connection, querySql, JSONObject.class, false);
-
- // 写入到 Redis
- JSONObject dimInfo = queryList.get(0);
- jedis.set(redisKey,dimInfo.toJSONString());
- // 设置过期时间
- jedis.expire(redisKey,24 * 60 * 60);
- // 归还连接
- jedis.close();
-
- // 返回结果
- return dimInfo;
- }
-
- public static void deleteDimInfo(String tableName,String key){
- // 获取连接
- Jedis jedis = JedisUtil.getJedis();
- // 删除数据
- jedis.del("DIM"+tableName+":"+key);
- // 归还连接
- jedis.close();
- }

修改 DimSinkFunction 的 invoke 方法(如果是 update 数据就去删除旧的缓存) :
- @Override
- public void invoke(JSONObject value, Context context) throws Exception {
- // 获取连接
- DruidPooledConnection connection = druidDataSource.getConnection();
- // 写出数据(需要知道写出的表名、字段)
- String sinkTable = value.getString("sinkTable");
- JSONObject data = value.getJSONObject("data");
-
- // 获取数据类型
- String type = value.getString("type");
- if (type!=null && type.equals("update")) {
- // Phoenix 都是大写
- DimUtil.deleteDimInfo(sinkTable.toUpperCase(),data.getString("id"));
- }
-
- // 如果插入数据失败 invoke 方法抛出的 Exception 会导致程序停止
- PhoenixUtil.upsertValues(connection,sinkTable,data);
- // 归还连接
- connection.close();
- }

在Flink 流处理过程中,经常需要和外部系统进行交互,如通过维度表补全事实表中的维度字段。
默认情况下,在Flink 算子中,单个并行子任务只能以同步方式与外部系统交互:将请求发送到外部存储,IO阻塞,等待请求返回,然后继续发送下一个请求。这种方式将大量时间耗费在了等待结果上。
为了提高处理效率,可以有两种思路。
(1)增加算子的并行度,但需要耗费更多的资源。
(2)异步 IO。
Flink 在1.2中引入了Async I/O,将IO操作异步化。在异步模式下,单个并行子任务可以连续发送多个请求,按照返回的先后顺序对请求进行处理,发送请求后不需要阻塞式等待,省去了大量的等待时间,大幅提高了流处理效率,解决了与外部系统交互时网络延迟成为系统瓶颈的问题。
异步查询实际上是把维表的查询操作托管给单独的线程池完成,这样不会因为某一个查询造成阻塞,因此单个并行子任务可以连续发送多个请求,从而提高并发效率。对于涉及网络IO的操作,可以显著减少因为请求等待带来的性能损耗。
构建一个线程池工具类:
- public class ThreadPoolUtil {
-
- private static ThreadPoolExecutor threadPoolExecutor;
-
- private ThreadPoolUtil(){
-
- }
-
- // 单例模式
- public ThreadPoolExecutor getThreadPoolExecutor() {
- if (threadPoolExecutor == null){
- synchronized (ThreadPoolUtil.class){
- if (threadPoolExecutor == null){
- threadPoolExecutor = new ThreadPoolExecutor(4,
- 20,
- 100,
- TimeUnit.SECONDS,
- new LinkedBlockingDeque<>());
- }
- }
- }
- return threadPoolExecutor;
- }
- }

这个异步函数类我们使用了泛型,为了方便扩展之后的其它需求(不同流的数据类型不一样),并且添加了 2 个抽象方法:
在异步函数中我们来创建 Phoenix 客户端线程去读取数据;
- public abstract class DimAsyncFunction<T> extends RichAsyncFunction<T, T> {
-
- private DruidDataSource dataSource;
- private ThreadPoolExecutor threadPoolExecutor;
-
- private String tableName;
-
- public DimAsyncFunction(){
-
- }
-
- public DimAsyncFunction(String tableName) {
- this.tableName = tableName;
- }
-
- @Override
- public void open(Configuration parameters) throws Exception {
- // 创建 phoenix 连接池
- dataSource = DruidDSUtil.createDataSource();
- threadPoolExecutor = ThreadPoolUtil.getThreadPoolExecutor();
- }
-
- @Override
- public void asyncInvoke(T input, ResultFuture<T> resultFuture) throws Exception {
-
- threadPoolExecutor.execute(new Runnable() {
- @Override
- public void run() {
- // 获取连接
- DruidPooledConnection connection = null;
- try {
- connection = dataSource.getConnection();
- // 查询维表 获取维度信息
- String key = getKey(input);
- JSONObject dimInfo = DimUtil.getDimInfo(connection, tableName, key);
- // 将维度信息补充至当前数据
- if (dimInfo != null){
- addAttribute(input,dimInfo);
- }
- // 归还连接
- connection.close();
- // 将结果写出
- resultFuture.complete(Collections.singletonList(input));
- } catch (Exception e) {
- System.out.println("关联维表失败: "+input+","+tableName);
- e.printStackTrace();
- }
- }
- });
-
- }
-
- @Override
- public void timeout(T input, ResultFuture<T> resultFuture) throws Exception {
- System.out.println("timeout"+input);
- }
-
- public abstract String getKey(T input);
- public abstract void addAttribute(T pojo,JSONObject dimInfo);
- }

我们的关联分为两步:
- // TODO 7. 关联 sku_info 维表补充 spu_id、tm_id、category3_id
- SingleOutputStreamOperator<TradeUserSpuOrderBean> tradeUserSpuWithSkuDS = AsyncDataStream.unorderedWait(tradeUserSpuDS,
- new DimAsyncFunction<TradeUserSpuOrderBean>("DIM_SKU_INFO") {
- @Override
- public String getKey(TradeUserSpuOrderBean input) {
- return input.getSkuId();
- }
-
- @Override
- public void addAttribute(TradeUserSpuOrderBean pojo, JSONObject dimInfo) {
- pojo.setSpuId(dimInfo.getString("SPU_ID"));
- pojo.setTrademarkId(dimInfo.getString("TM_ID"));
- pojo.setCategory3Id(dimInfo.getString("CATEGORY3_ID"));
- }
- },
- 100,
- TimeUnit.SECONDS);

- // TODO 8. 提取事件时间并生成 watermark
- SingleOutputStreamOperator<TradeUserSpuOrderBean> tradeUserSpuWithWmDS = tradeUserSpuWithSkuDS.assignTimestampsAndWatermarks(WatermarkStrategy.<TradeUserSpuOrderBean>forBoundedOutOfOrderness(Duration.ofSeconds(2))
- .withTimestampAssigner(new SerializableTimestampAssigner<TradeUserSpuOrderBean>() {
- @Override
- public long extractTimestamp(TradeUserSpuOrderBean element, long recordTimestamp) {
- return element.getTs();
- }
- }));
按照粒度分组,之后增量聚合把相同粒度的数据的订单总数和订单金额进行累加,在全量聚合中再补充窗口起止时间、ck版本时间戳、订单总数等字段;
- // TODO 9. 分组开窗聚合
- KeyedStream<TradeUserSpuOrderBean, Tuple4<String, String, String, String>> keyStream = tradeUserSpuWithWmDS.keyBy(line -> Tuple4.of(line.getSpuId(), line.getCategory3Id(), line.getTrademarkId(), line.getUserId()));
- SingleOutputStreamOperator<TradeUserSpuOrderBean> reduceDS = keyStream.window(TumblingEventTimeWindows.of(org.apache.flink.streaming.api.windowing.time.Time.seconds(10)))
- .reduce(new ReduceFunction<TradeUserSpuOrderBean>() {
- @Override
- public TradeUserSpuOrderBean reduce(TradeUserSpuOrderBean value1, TradeUserSpuOrderBean value2) throws Exception {
- value1.getOrderIdSet().addAll(value2.getOrderIdSet());
- value1.setOrderAmount(value1.getOrderAmount() + value2.getOrderAmount());
- return value1;
- }
- }, new WindowFunction<TradeUserSpuOrderBean, TradeUserSpuOrderBean, Tuple4<String, String, String, String>, TimeWindow>() {
- @Override
- public void apply(Tuple4<String, String, String, String> stringStringStringStringTuple4, TimeWindow window, Iterable<TradeUserSpuOrderBean> input, Collector<TradeUserSpuOrderBean> out) throws Exception {
- TradeUserSpuOrderBean next = input.iterator().next();
-
- next.setTs(System.currentTimeMillis());
- next.setStt(DateFormatUtil.toYmdHms(window.getStart()));
- next.setEdt(DateFormatUtil.toYmdHms(window.getEnd()));
- next.setOrderCount((long) next.getOrderIdSet().size());
-
- out.collect(next);
- }
- });

- // TODO 10. 关联 spu、tm、category 维表的 name 字段以及 category2 和 category1 的 id 和 name 字段
- // TODO 10.1 关联 Spu
- SingleOutputStreamOperator<TradeUserSpuOrderBean> reduceWithSpuDS = AsyncDataStream.unorderedWait(reduceDS,
- new DimAsyncFunction<TradeUserSpuOrderBean>("DIM_SPU_INFO") {
- @Override
- public String getKey(TradeUserSpuOrderBean input) {
- return input.getSpuId();
- }
-
- @Override
- public void addAttribute(TradeUserSpuOrderBean pojo, JSONObject dimInfo) {
- pojo.setSpuName(dimInfo.getString("SPU_NAME"));
- }
- },
- 100, TimeUnit.SECONDS);
-
- // TODO 10.2 关联 tm
- SingleOutputStreamOperator<TradeUserSpuOrderBean> reduceWithTmDS = AsyncDataStream.unorderedWait(reduceWithSpuDS,
- new DimAsyncFunction<TradeUserSpuOrderBean>("DIM_BASE_TRADEMARK") {
- @Override
- public String getKey(TradeUserSpuOrderBean input) {
- return input.getTrademarkId();
- }
-
- @Override
- public void addAttribute(TradeUserSpuOrderBean pojo, JSONObject dimInfo) {
- pojo.setTrademarkName(dimInfo.getString("TM_NAME"));
- }
- },
- 100, TimeUnit.SECONDS);
- // TODO 10.3 关联 category3
- SingleOutputStreamOperator<TradeUserSpuOrderBean> reduceWithCate3DS = AsyncDataStream.unorderedWait(reduceWithTmDS,
- new DimAsyncFunction<TradeUserSpuOrderBean>("DIM_BASE_CATEGORY3") {
- @Override
- public String getKey(TradeUserSpuOrderBean input) {
- return input.getCategory3Id();
- }
-
- @Override
- public void addAttribute(TradeUserSpuOrderBean pojo, JSONObject dimInfo) {
- pojo.setCategory3Name(dimInfo.getString("NAME"));
- pojo.setCategory2Id("CATEGORY2_ID");
- }
- },
- 100, TimeUnit.SECONDS);
- // TODO 10.4 关联 category2
- SingleOutputStreamOperator<TradeUserSpuOrderBean> reduceWithCate2DS = AsyncDataStream.unorderedWait(reduceWithCate3DS,
- new DimAsyncFunction<TradeUserSpuOrderBean>("DIM_BASE_CATEGORY2") {
- @Override
- public String getKey(TradeUserSpuOrderBean input) {
- return input.getCategory2Id();
- }
-
- @Override
- public void addAttribute(TradeUserSpuOrderBean pojo, JSONObject dimInfo) {
- pojo.setCategory2Name(dimInfo.getString("NAME"));
- pojo.setCategory1Id("CATEGORY1_ID");
- }
- },
- 100, TimeUnit.SECONDS);
- // TODO 10.5 关联 category1
- SingleOutputStreamOperator<TradeUserSpuOrderBean> reduceWithCate1DS = AsyncDataStream.unorderedWait(reduceWithCate2DS,
- new DimAsyncFunction<TradeUserSpuOrderBean>("DIM_BASE_CATEGORY1") {
- @Override
- public String getKey(TradeUserSpuOrderBean input) {
- return input.getCategory1Id();
- }
-
- @Override
- public void addAttribute(TradeUserSpuOrderBean pojo, JSONObject dimInfo) {
- pojo.setCategory1Name(dimInfo.getString("NAME"));
- }
- },
- 100, TimeUnit.SECONDS);
-
- // TODO 11. 写出到 clickhouse
- reduceWithCate1DS.addSink(ClickHouseUtil.getSinkFunction("insert into dws_trade_trademark_category_user_order_window values " +
- "(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"));
-
- // TODO 12. 启动
- env.execute("DwsTradeUserSpuOrderWindow");

至此,DWS 层这两张表终于完成,耗时一天,第二张表是目前为止最复杂的一张,也是 DWS 层第一次关联维表;
DWS 层的需求只剩两个,明天应该一上午就能完成,毕竟和这个需求差不多;ADS 层就简单了,明天下午应该就能结束;
关于异步IO,我把链接放这,之后复习再来查看:异步 I/O | Apache Flink
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。