赞
踩
在Flink中,处理时间序列数据时,通常需要考虑事件时间和水印(watermarks)的处理。以下是修改前后的代码对比分析:
- val systemDS = unitDS.map(dp => {
- dp.setDeviceCode(DeviceCodeEnum.fromPidToSystem(dp.getDeviceCode))
- dp
- }).keyBy(_.getDeviceCode)
- .window(TumblingEventTimeWindows.of(Time.seconds(60)))
- .process(new MySystemWinF)
unitDS
经过一个 map
操作,将每个元素的 deviceCode
转换为系统设备码。keyBy(_.getDeviceCode)
对转换后的设备码进行分组。process
操作应用自定义的窗口函数 HPageSystemWinF
来处理每个窗口中的数据。注意:修改前的代码没有显示地处理水印(watermarks),这可能导致在处理乱序数据或延迟数据时出现问题。
- val systemDS = unitDS.map(dp => {
- dp.setDeviceCode(DeviceCodeEnum.fromPidToSystem(dp.getDeviceCode))
- dp
- }).keyBy(_.getDeviceCode)
- .assignTimestampsAndWatermarks(
- WatermarkStrategy
- .<boundedOutOfOrdernessDaysPower>forBoundedOutOfOrderness(Duration.ofSeconds(5)) // 假设这里应该是.forBoundedOutOfOrderness而不是.forBoundedOutOfOrdernessDaysPower
- .withIdleness(Duration.ofSeconds(5))
- .withTimestampAssigner(new SerializableTimestampAssigner[DaysPower] {
- override def extractTimestamp(element: DaysPower, recordTimestamp: Long): Long = {
- Math.max(element.getEventTime, recordTimestamp)
- }
- })
- ).keyBy(_.getDeviceCode)
- .window(TumblingEventTimeWindows.of(Time.seconds(60)))
- .process(new MySystemWinF)
map
, keyBy
, 和 window
操作。assignTimestampsAndWatermarks
方法来处理事件时间和水印:
WatermarkStrategy.forBoundedOutOfOrderness
允许一定程度的乱序数据(这里是5秒)。.withIdleness(Duration.ofSeconds(5))
设置了空闲超时时间为5秒,用于处理不活跃的键。withTimestampAssigner
自定义了时间戳分配器,确保使用的事件时间是元素中的 eventTime
和记录的 recordTimestamp
中的较大值。赞
踩
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。