赞
踩
写这篇文目的是为了加深对窗口和 watermark 的理解。
先感谢这位博主的辛勤劳动。我做的分析就是基于这位大侠做的。
下面上正题。
窗口是用来切割无线流的,它把无线流切分成有限个碎片,通过计算碎片来计算流的某些性质。就像积分计算求球的体积。它将从球新到表面扇柱体是一个正方体,然后使用极限的思路,然后就计算出球体的体积。
从上图中,可以看到窗口的设计思路,也可以说成窗口的模型。
在这个模型里面,input stream 不停的发送数据给 WindowAssigner(窗口分配器) ,它负责将数据分配到个个窗口,根据分配的规则的不同,Flink 里面有下面几种窗口类型。
Trigger 是干啥的呢?它的作用决定是否触发窗口的计算,触发的规则有很多,有预定义的,也可以我们自定义,一般是 eventtime 的场景下需要用心考虑如何设置此值。
Evictor 是抛弃者,当某个窗口被触发后,Evictor 负责按照规则将窗口中的数据抛弃,留下需要的,通常的情况下,Evictor 的实现是不会抛弃任何数据的。
Evaluation Function 是定义计算公式的,它接收窗口中的数据,然后按照预定义的公式计算出结果,例如,求和、求平均、求最大值,求最小值。
弄清楚了 tumbling session count sliding 的功能,下面就从源码的角度弄清楚各窗口的 windown assigner、trigger 的不同。先来看 WindowAssigner 抽象类以及它下面的实现类。
先来理解一下 window assigner(WindowAssigner) 这个类。它负责将报文派发各自的窗口 。一条报文只能到一个窗口中去。
先来看看它有那些方法:
下面来具体的分析一下 WindowAssigner 的实现类。
第一个就是我们熟悉的 TumblingEventWindowAssigner ,然后关键的代码如下所示,
我们得到的结论:
假设,offset 先为 0,公式就变成了 start = timestamp - ( timestamp + winSize )%winSize
那么,公式就变成了:
start = n*winSize + m - ( n*winSize + m + winSize )%winSize = n*winSize + m - ( (n+1)/winSize + m )%winSize
按照 % 取余这种运算逻辑,( (n+1)/winSize + m )%winSize= m
,最后,上面的公式为:
start = n*winSize
,可见当没有 offset 的时候。start 是从整数开始的。举个例子,
1461756862000 这个是例子中的第一个 record 的时间戳。我们来看一下它是几点。我设置的 tumbling 时间是 3 s ,60%3 = 0 ,所以 3 是可以背 60 整除的,所以窗口的切割如下所示:
[ 0 , 3 ) , [ 3 , 6 ) , [ 6 , 9 ) ,[ 9 , 12 ), ... , [ n*3 , 3 + n*3 ]
, 其中,n >= 0 .
timestamp 的整数不是落到那个区间的开始位置,这个元素就在这个区间。
然后,如果 offset 不等于 0 呢?不等于 0 ,不等于 0 的话,根据公式的推导,如下所示
start = timestamp - ( timestamp + winSize )%winSize
= n*winSize + m - ( n*winSize + m - offset + winSize )%winSize
= n*winSize + m - ( (n+1)*winSize + m - offset )%winSize
= n*winSize + m - ( m - offset)
= n*winSize + offset
也就是如果我还是以 3 为窗口的大小,offset = 1 , 则得到的窗口划分如下所示:
[ 0 , 3 ) , [ 3 , 6 ) , [ 6 , 9 ) ,[ 9 , 12 ), ... , [ n*3 , 3 + n*3 ]
, 其中,n >= 0
.
当 offset < winSize 的时候,window 的不同区间为在各个区间的开始和结束哪里加 1 .
[ 1 , 4 ) , [ 4 , 7 ) , [ 7 , 10 ) ,[ 10 , 13 ), ... , [ 1 + n*3 , 4 + n*3 ]
, 其中,n >= 0
.
如果 offset > winSize ,则我们可以将 offset 分解为 offset = k*winSize + i ; i 为余数。则,根据上面的公式,假如 offset 为 4, 则 i = 1 ,k = 1 ,则 window 的不同分区为,
[ 4 , 7 ) , [ 7 , 10 ) , [ 10 , 13 ) ,[ 13 , 16 ), ... , [ 4 + n*3 , 7 + n*3 ]
, 其中,n >= 0 .
ok 先把的窗口的划分讲清楚了,接下来在讲讲 trigger 的时机。这就和 watermark 相关了。
TumblingProcessingTimeWindowAssiger 的规则和 event-time 是相同的,不同的是 timestamp 是服务器时间。也就根据 record 到达窗口的时间来划分,窗口,如果 winSize 还是 3。第一个 record 到达窗口的时间为 currrentTimeStamp , 那么下一个 3 秒钟,不管之后来多少个 record ,0 或者更多把,都会触发的。
指定窗口的代码片段为:
long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide);
for (long start = lastStart;
start > timestamp - size;
start -= slide) {
windows.add(new TimeWindow(start, start + size));
}
return windows;
从代码中可以看到,也是使用了 TimeWindow.getWindowStartWithOffset 这个方法,只是这个返回的就不是一个窗口了,是多个窗口。这也是合理的,滑动窗口吗?每过一个时间长度,就统计前 n 个时间长度。下面我们来讨论一下,一个 record 要经理多少个窗口呢?
假设,窗口的大小是 N ,每次滑动的大小是 M ,那 滑动 N/M 次,窗口就不能覆盖到 record 了。所以在计算 record 的窗口的时候,先使用 TimeWindow.getWindowStartWithOffset 计算最后一次窗口的位置,使用滑动
上面的图最终的效果,下面的图才是,Flink 底层数据结构的划定 slide window 使用的数据结构。和算法。
EventTimeSessionWindows 这个类是继承自 MergingWindowAssigner 。实际上他就一个可以合并的窗口,所以这里我要重点的看看合并的逻辑。
EventTimeSessiongWindow 中的 mergeWindows 这个方法里面使用的是 TimeWindow.mergeWindow 这个方法所以重点看这个方法里面的逻辑。
先按照窗口的开始时间戳为 window 排个序。 Collections.sort(sortedWindows, new Comparator<TimeWindow>() { @Override public int compare(TimeWindow o1, TimeWindow o2) { return Long.compare(o1.getStart(), o2.getStart()); } }); // 这里开始合并窗口。 for (TimeWindow candidate: sortedWindows) { if (currentMerge == null) { currentMerge = new Tuple2<>(); currentMerge.f0 = candidate; currentMerge.f1 = new HashSet<>(); currentMerge.f1.add(candidate); // currentMerge.f0.intersects(candidate) // intersects 这个方法判断 currentMerge 窗口是否包含 // candidate 这个窗口,具体的实现,请看下面的代码 } else if (currentMerge.f0.intersects(candidate)) { currentMerge.f0 = currentMerge.f0.cover(candidate); currentMerge.f1.add(candidate); } else { // 出现一个 gap 后,就讲 currentMerge 放如到 // merged list 里面。然后进入下一轮的合并。 merged.add(currentMerge); currentMerge = new Tuple2<>(); currentMerge.f0 = candidate; currentMerge.f1 = new HashSet<>(); currentMerge.f1.add(candidate); } } // 最后再将最后一个合并好的窗口放如到 merged list 里面。 if (currentMerge != null) { merged.add(currentMerge); } // 最后,将 Tuple 里面 f1 和 f2 合并。 for (Tuple2<TimeWindow, Set<TimeWindow>> m: merged) { if (m.f1.size() > 1) { c.merge(m.f1, m.f0); } }
intersects 方法的具体实现如下所示:
public boolean intersects(TimeWindow other) {
return this.start <= other.end && this.end >= other.start;
}
cover 方法的具体逻辑:
public TimeWindow cover(TimeWindow other) {
return new TimeWindow(Math.min(start, other.start), Math.max(end, other.end));
}
ProcessingTimeSessionAssigner 的逻辑和 event-time 相同。
这个是在 event-time assigner 的基础上,从 record 中取出 gap 值得,请看下面得代码:
public Collection<TimeWindow> assignWindows(T element, long timestamp, WindowAssignerContext context) {
long sessionTimeout = sessionWindowTimeGapExtractor.extract(element);
if (sessionTimeout <= 0) {
throw new IllegalArgumentException("Dynamic session time gap must satisfy 0 < gap");
}
return Collections.singletonList(new TimeWindow(timestamp, timestamp + sessionTimeout));
}
sessionWindowTimeGapExtractor.extract 这个接口啊,和奇怪,我们没有找到它的实现类,接口对 extract 方法的描述如下所示:
Extracts the session time gap.
看这意思是是要,我们程序员来指定在哪里取出 gap 了。
DynamicProcessingTimeSessionWindows assigner ,和 event-time 的逻辑是一样的。
这个逻辑简单,就是所有的 record 都放 GlobalWindow 这个里面来放。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。