赞
踩
Flink是一种流式计算引擎,主要是来处理无界数据流的,数据源源不断、无穷无尽。想要更加方便高效地处理无界流,一种方式就是将无限数据切割成有限的“数据块”进行处理,这就是所谓的“窗口”(Window)。 【把窗口理解成一个“桶”,Flink则可以把流切割成大小有限的“储存桶”,把数据分发到不同的桶里,每一个窗口都是一个桶。当窗口结束,就对每一个桶的数据进行收集处理】
原理:建立一个窗口,在固定的额时间段内不断收集数据,到达结束时间的时候窗口结束收集数据,生成结果,窗口销毁。【就像地铁一样,间隔一段时间发车,无论车上有多少乘客,地铁都会往前开】
原理:计数窗口基于元素的个数来截取数据,到达固定的个数时就触发计算并关闭窗口。每个窗口截取数据的个数,就是窗口的大小。基本思路是“人齐发车”
主要概念:窗口的大小,窗口的滑动步长【两个窗口重叠的部分】,会话间隔
滚动窗口有固定的大小,而且窗口之间不会重叠,每个数据都在一个窗口且只属于这个窗口。在一个固定时间内,接受数据的传入。到了截止时间,收集数据,输出结果。应用类型广泛,可以对每个时间做聚合统计。
当窗口大小大于窗口步长的时候就会出现滑动,滑动窗口会重叠,同时数据也会同时被分到多个窗口,滑动步长就代表了计算频率。适合计算结果更新较快的场景。
原理:基于“会话”来进行数据分组、如果相邻两个数据到来的时间间隔 小于指定大小,那么这两个数据在同一个窗口内。如果 大于则数据到了新的窗口,且前面的窗口关闭。 会话窗口长度,起始结束时间不确定。各个分区之间窗口没有任何关联。在规定的时间内没有数据到来触发一次计算。可以用于保持会话的场景下。
把相同key的数据全部分配到一个窗口之中,窗口没有结束是不会触发计算的。如果希望对数据处理,需要定义一个触发器。
经过按键分区keyBy操作后,数据流会按照key被分为多条逻辑流(logical streams),这就是KeyedStream。基于KeyedStream进行窗口操作时,窗口计算会在多个并行子任务上同时执行。相同key的数据会被发送到同一个并行子任务,而窗口操作会基于每个key进行单独的处理。所以可以认为,每个key上都定义了一组窗口,各自独立地进行统计计算。
在代码实现上,我们需要先对DataStream调用.keyBy()进行按键分区,然后再调用.window()定义窗口。
stream.keyBy(...) .window(...)
如果没有进行keyBy,那么原始的DataStream就不会分成多条逻辑流。这时窗口逻辑只能在一个任务(task)上执行,就相当于并行度变成了1。setParallelism(1)
在代码中,直接基于DataStream调用.windowAll()定义窗口。
stream.windowAll(...)
注意:对于非按键分区的窗口操作,手动调大窗口算子的并行度也是无效的
到底是以哪种时间作为衡量的标准,就被成为“时间语义”
特征:
- 基于处理节点的当前系统时间。
- 实现简单,延迟低。
- 适用于数据实时性要求高且乱序不多的场景。
- 在处理乱序数据时,结果可能不准确。
应用场景:
- 监控系统指标(如CPU使用率、内存使用情况)。
- 实时数据分析,延迟比准确性更重要的场景。
使用参数:
TumblingProcessingTimeWindows.of(Time.) 滚动窗口 SlidingProcessingTimeWindows.of(Time.) 滑动窗口 ProcessingTimeSessionWindows.withGap() ProcessingTimeSessionWindows.withDynamicGap() 会话窗口
特征:
- 基于事件的时间戳。
- 处理乱序数据,通过Watermark机制来处理延迟事件。
- 更加准确,适用于要求严格时间语义的场景。
实时场景:
- 实时日志分析(如用户行为分析、点击流分析)。
- 需要严格时间顺序和准确性的场景,如金融交易分析。
使用参数:
TumblingEventTimeWindows.of(Time.) 滚动窗口 SlidingEventTimeWindows.of(Time.) 滑动窗口 EventTimeSessionWindows.withGap()
EventTimeSessionWindows.withDynamicGap()
会话窗口
countWindow(5) | 计数窗口分配器 【滚动】 满足5条输出一次计算结果 |
countWindow(5,2) | 计数窗口分配器 【滑动】 满足5条输出一次计算结果 , 每经过一个步长都有一个窗口-触发输出【第一次输出在第二条数据来的时候】 |
GlobalWindows.create() | 全局窗口 |
窗口函数定义了要对窗口中收集的数据做的计算操作,根据处理的方式可以分为两类:增量聚合函数和全窗口函数。
窗口将数据收集起来,最基本的处理操作当然就是进行聚合。我们可以每来一个数据就在之前结果上聚合一次,这就是“增量聚合”。
- import com.guigu.function.WaterSensorMapFunction;
- import org.apache.flink.api.common.functions.ReduceFunction;
- import org.apache.flink.streaming.api.datastream.KeyedStream;
- import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
- import org.apache.flink.streaming.api.datastream.WindowedStream;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
- import org.apache.flink.streaming.api.windowing.time.Time;
- import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-
-
- public class WindowReduceDemo {
- public static void main(String[] args) throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- SingleOutputStreamOperator<WaterSensor> map = env.socketTextStream("192.168.88.161", 9999)
- .map(new WaterSensorMapFunction());
-
- KeyedStream<WaterSensor, String> senorks = map.keyBy(sensor -> sensor.getId());
-
- // 窗口分配器
- WindowedStream<WaterSensor, String, TimeWindow> senorWS = senorks.window(TumblingProcessingTimeWindows.of(Time.seconds(5)));
-
- /*
- 1.相同key的第一条数据来的时候,不会调用reduce方法 【来的数据类型必须保持一致】
- 2.增量聚合:来一条数据,就会计算一次,但是不会输出
- 3.在窗口触发的时候,才会输出窗口的最终结果
- */
- // 窗口函数,增量聚合reduce
- // TODO:返回一个DataStream
- SingleOutputStreamOperator<WaterSensor> reduce = senorWS.reduce(new ReduceFunction<WaterSensor>() {
- // TODO:只有第二条数据进来了才会调用reduce方法
- @Override
- public WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception {
- System.out.println("调用reduce方法: value1" + value1 + ",value2" + value2);
- return new WaterSensor(value1.getId(), value2.getTs(), value1.getVc() + value2.getVc());
- }
- });
-
- reduce.print();
-
-
- env.execute();
- }
- }

来一个数据就调用add方法,进行数据聚合。结果保存在状态中。窗口需要输出时调用getresult()方法 得到计算结果。和ReduceFunction作用相同,而由于输入、中间状态、输出的类型可以不同,使得应用更加灵活方便。
- import org.apache.flink.api.common.functions.AggregateFunction;
- import org.apache.flink.api.common.functions.ReduceFunction;
- import org.apache.flink.streaming.api.datastream.KeyedStream;
- import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
- import org.apache.flink.streaming.api.datastream.WindowedStream;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
- import org.apache.flink.streaming.api.windowing.time.Time;
- import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-
- public class WindowAggregateDemo {
- public static void main(String[] args) throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- SingleOutputStreamOperator<WaterSensor> map = env.socketTextStream("192.168.88.161", 9999)
- .map(new WaterSensorMapFunction());
-
- KeyedStream<WaterSensor, String> senorks = map.keyBy(sensor -> sensor.getId());
-
- // 窗口分配器
- WindowedStream<WaterSensor, String, TimeWindow> senorWS = senorks.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
-
-
- // 窗口函数,增量聚合 Aggregate
- /*
- 第一个类型:输入数据的类型
- 第二个类型: 累加器的类型,存储的中间结果的类型
- 第三个类型: 输出的类型
- */
-
-
- /*
- *1、属于本窗口的第一条数据来,创建窗口,创建累加器
- *2、增量聚合:来一条计算一条,调用-次add方法
- *3、窗口输出时调用一次getresult方法
- *4、输入、中间累加器、输出 类型可以不一样,非常灵活
- */
-
- senorWS.aggregate(
- new AggregateFunction<WaterSensor, Integer, String>() {
- // TODO:创建累加器初始化累加器
- @Override
- public Integer createAccumulator() {
- System.out.println("创建累加器");
- return 0;
- }
- // TODO: 聚合逻辑
- @Override
- public Integer add(WaterSensor value, Integer accumulator) {
- System.out.println("调用add方法,value="+value);
- return accumulator+ value.getVc();
- }
- // TODO:获得最后的结果,窗口触发时输出
- @Override
- public String getResult(Integer accumulator) {
- System.out.println("调用getresult方法");
- return accumulator.toString();
- }
-
- @Override
- public Integer merge(Integer a, Integer b) {
- // 只有会话窗口才会用到
- System.out.println("调用merge方法");
- return null;
- }
- }
- ).print();
-
-
- env.execute();
- }
- }

另外,Flink也为窗口的聚合提供了一系列预定义的简单聚合方法,可以直接基于WindowedStream调用。主要包括.sum()/max()/maxBy()/min()/minBy(),与KeyedStream的简单聚合非常相似。它们的底层,其实都是通过AggregateFunction来实现的。
全窗口函数需要先收集窗口中的数据,并在内部缓存起来,等到窗口要输出结果的时候再取出数据进行计算。 WindowFunction【apply方法】能提供的上下文信息较少,也没有更高级的功能。事实上,它的作用可以被ProcessWindowFunction全覆盖,所以之后可能会逐渐弃用【老方法】。
- import org.apache.commons.lang3.time.DateFormatUtils;
- import org.apache.flink.streaming.api.datastream.KeyedStream;
- import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
- import org.apache.flink.streaming.api.datastream.WindowedStream;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
- import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
- import org.apache.flink.streaming.api.windowing.time.Time;
- import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
- import org.apache.flink.util.Collector;
-
- public class WindowProcessDemo {
- public static void main(String[] args) throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- SingleOutputStreamOperator<WaterSensor> map = env.socketTextStream("192.168.88.161", 9999)
- .map(new WaterSensorMapFunction());
-
- KeyedStream<WaterSensor, String> senorks = map.keyBy(sensor -> sensor.getId());
-
- // 窗口分配器 【滚动】
- WindowedStream<WaterSensor, String, TimeWindow> senorWS = senorks.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
-
-
- // TODO:全窗口 不断存储数据到最后 窗口触发时只会输出一次
-
-
- // TODO:老写法
- // senorWS
- // .apply(
- // new WindowFunction<WaterSensor, String, String, TimeWindow>() {
- // /**
- // *
- // * @param s 分组的key
- // * @param window 窗口对象
- // * @param input 存储的数据
- // * @param out 采集器
- // * @throws Exception
- // */
- // @Override
- // public void apply(String s, TimeWindow window, Iterable<WaterSensor> input, Collector<String> out) throws Exception {
- //
- // }
- // }
- // )
-
- // TODO:新写法
- senorWS
- .process(
- new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
- /**
- * @param s 分组的 key
- * @param context 上下文
- * @param elements 存的数据
- * @param out 采集器
- * @throws Exception
- */
- @Override
- public void process(String s, ProcessWindowFunction<WaterSensor, String, String, TimeWindow>.Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
- // TODO:毫秒
- long start = context.window().getStart();
- long end = context.window().getEnd();
-
- String start_format = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss.SSS");
- String end_format = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss.SSS");
-
- // TODO:多少条数据
- long data = elements.spliterator().estimateSize();
-
- out.collect("key="+s+"的窗口【"+start_format+","+end_format+"]包含"+data+"条数据 ====》"+elements.toString());
-
- }
- }
- ).print();
-
-
- env.execute();
- }
- }

- import org.apache.commons.lang3.time.DateFormatUtils;
- import org.apache.flink.api.common.functions.AggregateFunction;
- import org.apache.flink.streaming.api.datastream.KeyedStream;
- import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
- import org.apache.flink.streaming.api.datastream.WindowedStream;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
- import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
- import org.apache.flink.streaming.api.windowing.time.Time;
- import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
- import org.apache.flink.util.Collector;
-
- public class WindowAggregateAndProcessDemo {
- public static void main(String[] args) throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- SingleOutputStreamOperator<WaterSensor> map = env.socketTextStream("192.168.88.161", 9999)
- .map(new WaterSensorMapFunction());
-
- KeyedStream<WaterSensor, String> senorks = map.keyBy(sensor -> sensor.getId());
-
- // 窗口分配器
- WindowedStream<WaterSensor, String, TimeWindow> senorWS = senorks.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
-
-
- // TODO:窗口函数,增量聚合 Aggregate + 全窗口 Process
- /*
- *增量聚合 Aggregate+全窗日 process
- * 1、增量聚合函数处理数据:来一条计算一条
- *2、窗口触发时,增量聚合的结果(只有一条)传递给 全窗口函数
- *3、经过全窗口函数的处理包装后,输出米指轺调坝党
- *结合两者的优点:
- *1、增量聚合: 来一条计算一条,存储中间的计算结果,占用的空间少
- *2、全窗口函数:可以通过 上下文 实现灵活的功能
- * */
-
- senorWS.aggregate(
- new MyAgg(),
- new MyProcess()
- ).print();
- //
-
-
- env.execute();
- }
-
- public static class MyAgg implements AggregateFunction<WaterSensor, Integer, String>{
- // TODO:创建累加器初始化累加器
- @Override
- public Integer createAccumulator() {
- System.out.println("创建累加器");
- return 0;
- }
- // TODO: 聚合逻辑
- @Override
- public Integer add(WaterSensor value, Integer accumulator) {
- System.out.println("调用add方法,value="+value);
- return accumulator+ value.getVc();
- }
- // TODO:获得最后的结果,窗口触发时输出
- @Override
- public String getResult(Integer accumulator) {
- System.out.println("调用getresult方法");
- return accumulator.toString();
- }
-
- @Override
- public Integer merge(Integer a, Integer b) {
- // 只有会话窗口才会用到
- System.out.println("调用merge方法");
- return null;
- }
-
-
- }
-
-
-
- public static class MyProcess extends ProcessWindowFunction<String,String,String,TimeWindow>{
-
- /**
- * @param s 分组的 key
- * @param context 上下文
- * @param elements 存的数据
- * @param out 采集器
- * @throws Exception
- */
-
-
- @Override
- public void process(String s, ProcessWindowFunction<String, String, String, TimeWindow>.Context context, Iterable<String> elements, Collector<String> out) throws Exception {
- // TODO:毫秒
- long start = context.window().getStart();
- long end = context.window().getEnd();
-
- String start_format = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss.SSS");
- String end_format = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss.SSS");
-
- // TODO:多少条数据
- long data = elements.spliterator().estimateSize();
-
- out.collect("key="+s+"的窗口【"+start_format+","+end_format+"]包含"+data+"条数据 ====》"+elements.toString());
-
- }
- }
- }

这样调用的处理机制是:基于第一个参数(增量聚合函数)来处理窗口数据,每来一个数据就做一次聚合;等到窗口需要触发计算时,则调用第二个参数(全窗口函数)的处理逻辑输出结果。需要注意的是,这里的全窗口函数就不再缓存所有数据了,而是直接将增量聚合函数的结果拿来当作了Iterable类型的输入。
触发器主要是用来控制窗口什么时候触发计算。所谓的“触发计算”,本质上就是执行窗口函数,所以可以认为是计算得到结果并输出的过程。基于WindowedStream调用.trigger()方法,就可以传入一个自定义的窗口触发器(Trigger)。
stream.keyBy(...) .window(...) .trigger(new MyTrigger())
移除器主要用来定义移除某些数据的逻辑。基于WindowedStream调用.evictor()方法,就可以传入一个自定义的移除器(Evictor)。Evictor是一个接口,不同的窗口类型都有各自预实现的移除器。
stream.keyBy(...) .window(...)
以 时间类型的 滚动窗口 为例,分析原理:
1、窗口什么时候触发 输出?
时间进展>= 窗口的最大时间戳(end-1ms)
2、窗口是怎么划分的?start= 向下取整,取窗口长度的整数倍
end =start+窗长度
窗口左闭右开 ==》 属于本窗口的 最大时间戳 =end - 1ms
3、窗口的生命周期?创建:属于本窗口的第一条数据来的时候,现new的,放入一个singeton单例的集合中。
销毁(关窗): 时间进展>= 窗口的最大时间戳(end - 1ms) +允许迟到的时间【默认为0】
在Flink中水位线被用来标记事件的进展时间。是在数据流里面的一个标记点,具体内容是时间戳。用来指示当前事件的处理时间。
为了提高速率,一般会每隔一段时间产生一个水位线
通常在流式计算中,数据的传输收到网络IO等等因素的影响,数据无法准时到达计算的窗口。为了让窗口处理数据变得规整且正确。我们通常使用让水位线等待固定秒数【意思就是在时间戳的基础上增加一些延迟,以保证不丢失数据】
注意:乱序 和 迟到的区别:
乱序: 数据的顺序乱了 时间小的比时间大的晚来
迟到: 数据的时间戳 < 当前的 watermark
水位线是插入到数据流中的一个标记,可以认为是一个特殊的数据
水位线主要的内容是一个时间戳,用来表示当前事件时间【数据传入的时间】的进展
水位线是基于数据的时间戳生成的
水位线的时间戳必须单调递增,以确保任务的事件时间时钟一直向前推进水位线可以通过设置延迟,来保证正确处理乱序数据
一个水位线Watermank(t),表示在当前流中事件时间已经达到了时间戳T,这代表t之前的所有数据都到齐了,之后流中不会出现 时间戳T<t的数据。
水位线是Flink流处理中保证结果正确性的核心机制,它往往会跟窗口一起配合,完成对乱序数据的正确处理
直接调用WatermarkStrategy.forMonotonousTimestamps()方法,然后使用assignTimestampsAndWatermarks()接受 。就可以实现。
- import org.apache.commons.lang3.time.DateFormatUtils;
- import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
- import org.apache.flink.api.common.eventtime.WatermarkStrategy;
- import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
- import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
- import org.apache.flink.streaming.api.windowing.time.Time;
- import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
- import org.apache.flink.util.Collector;
-
- public class WaterMarkDemo {
- public static void main(String[] args) throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- env.setParallelism(1);
-
- SingleOutputStreamOperator<WaterSensor> map = env.socketTextStream("192.168.88.130", 9999)
- .map(new WaterSensorMapFunction());
- /*
- 1.定义watermark策略
- 2.使用assignTimestampsAndWatermarks 调用
- */
-
- // TODO:指定watermark策略
- WatermarkStrategy<WaterSensor> waterSensorWatermarkStrategy = WatermarkStrategy
- .<WaterSensor>forMonotonousTimestamps() // 指定watermark的生成 单调递增 没有等待时间
- // 指定时间戳分配器,从数据中提取
- .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
- @Override
- public long extractTimestamp(WaterSensor element, long recordTimestamp) {
- // 返回的时间戳是毫秒
- System.out.println("数据=" + element + ",recordTS = " + recordTimestamp);
- return element.getTs() * 1000L;
- }
- });
-
- map.assignTimestampsAndWatermarks(waterSensorWatermarkStrategy)
-
- .keyBy(sensor -> sensor.getId())
- // TODO:只能使用 事件时间语义窗口 才能使用水平线
- .window(TumblingEventTimeWindows.of(Time.seconds(2)))
- .process(
- new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
- /**
- * @param s 分组的 key
- * @param context 上下文
- * @param elements 存的数据
- * @param out 采集器
- * @throws Exception
- */
- @Override
- public void process(String s, ProcessWindowFunction<WaterSensor, String, String, TimeWindow>.Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
- // TODO:毫秒
- long start = context.window().getStart();
- long end = context.window().getEnd();
-
- String start_format = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss.SSS");
- String end_format = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss.SSS");
-
- // TODO:多少条数据
- long data = elements.spliterator().estimateSize();
-
- out.collect("key="+s+"的窗口【"+start_format+","+end_format+"]包含"+data+"条数据 ====》"+elements.toString());
-
- }
- }
- ).print();
-
-
- env.execute();
- }
- }

将.forMonotonousTimestamps()方法 修改为forBoundedOutOfOrderness() 参数Duration.ofSeconds(等待的时间)
- // TODO:指定watermark策略
- WatermarkStrategy
- // TODO: 指定watermark的生成 乱序 有等待时间 3s
- .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
- // TODO: 指定时间戳分配器,从数据中提取
- .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
- @Override
- public long extractTimestamp(WaterSensor element, long recordTimestamp) {
- // 返回的时间戳是毫秒
- System.out.println("数据=" + element + ",recordTS = " + recordTimestamp);
- return element.getTs() * 1000L;
- }
- });
将.forBoundedOutOfOrderness()方法 修改为 forGenerator(重写类atermarkGeneratorSupplier) 最后 return 自己的方法<>(延迟的时间)
- WatermarkStrategy
- // TODO: 自定义指定watermark的生成 乱序 有等待时间 3s
- .<WaterSensor>forGenerator(new WatermarkGeneratorSupplier<WaterSensor>() {
- @Override
- public WatermarkGenerator<WaterSensor> createWatermarkGenerator(Context context) {
- // TODO:延迟时间3s
- return new MyGenerator<>(3000);
- }
- })
我们也可以在自定义的数据源中抽取事件时间,然后发送水位线。这里要注意的是,在自定义数据源中发送了水位线以后,就不能再在程序中使用assignTimestampsAndWatermarks方法来生成水位线了。在自定义数据源中生成水位线和在程序中使用assignTimestampsAndWatermarks方法生成水位线二者只能取其一。
- env.fromSource(
- kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), "kafkasource"
- )
一个Task通常会设置多个并行度,而而当一个任务接收到多个上游并行任务传递来的水位线时,应该以最小的那个作为当前任务的事件时钟。每个任务都以“处理完之前所有数据”为标准来确定自己的时钟。
在多个上游并行任务中,如果有其中一个没有数据,由于当前Task是以最小的那个作为当前任务的事件时钟,而没有数据的并行任务一直保持最小。就会导致当前Task的水位线无法推进,就可能导致窗口无法触发。这时候可以设置空闲等待(withIdleness)。
- env.socketTextStream("192.168.88.130", 9999)
- .partitionCustom(new MyPartioner(), r -> r)
- .map(r -> Integer.parseInt(r))
- .assignTimestampsAndWatermarks(WatermarkStrategy
- .<Integer>forMonotonousTimestamps()
- .withTimestampAssigner((r, ts) -> r * 1000L)
- // TODO: 空闲等待 5s
- .withIdleness(Duration.ofSeconds(5))
- );
Flink的窗口,也允许迟到数据。当触发了窗口计算后,会先计算当前的结果,但是此时并不会关闭窗口。以后每来一条迟到数据,就触发一次这条数据所在窗口计算(增量计算)。直到wartermark 超过了窗口结束时间+推迟时间,此时窗口会真正关闭,迟到来的数据不会被计算。
-
- .keyBy(sensor -> sensor.getId())
- // TODO:只能使用 事件时间语义窗口 才能使用水平线
- .window(TumblingEventTimeWindows.of(Time.seconds(2)))
- // TODO:使用窗口延迟 5s
- .allowedLateness(Time.seconds(5))
- .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
- .allowedLateness(Time.seconds(3))
- // TODO: 【关窗后】 迟到的数据放入侧输出流
- .sideOutputLateData(
- // TODO: 参数1:侧输出流的名字 参数2:侧输出数据的类型
- new OutputTag<WaterSensor>("late_data", Types.POJO(WaterSensor.class))
- )
1.watermark等待时间,设置一个不算特别大的,一般是秒级,在乱序和 延迟 取舍
2.设置一定的窗口允许迟到,只考虑大部分的迟到数据,极端小部分迟到很久的数据,不管
3.极端小部分迟到很久的数据,放到侧输出流。获取到之后可以做各种处理
可以发现,根据某个key合并两条流,与关系型数据库中表的join操作非常相近。事实上,Flink中两条流的connect操作,就可以通过keyBy指定键进行分组后合并,实现了类似于SQL中的join操作;另外connect支持处理函数,可以使用自定义实现各种需求,其实已经能够处理双流join的大多数场景。
不过处理函数是底层接口,所以尽管connect能做的事情多,但在一些具体应用场景下还是显得太过抽象了。比如,如果我们希望统计固定时间内两条流数据的匹配情况,那就需要自定义来实现——其实这完全可以用窗口(window)来表示。为了更方便地实现基于时间的合流操作,Flink的DataStrema API提供了内置的join算子。
Flink为基于一段时间的双流合并专门提供了一个窗口联结算子,可以定义时间窗口,并将两条流中共享一个公共键(key)的数据放在窗口中进行配对处理。
- // TODO: window join 窗口联结
-
- /**
- * stream1.join(stream2)
- * .where(<KeySelector>)
- * .equalTo(<KeySelector>)
- * .window(<WindowAssigner>)
- * .apply(<JoinFunction>)
- *
- * 1. 落在同一个时间窗口范围内才能匹配
- * 2. 根据keyby的key,来进行匹配关联
- * 3. 只能拿到匹配上的数据,类似有固定时间范围的inner join
- */
-
-
- data1.join(data2)
- // TODO:data1的keyby
- .where(new KeySelector<Tuple2<String, Integer>, String>() {
- @Override
- public String getKey(Tuple2<String, Integer> value) throws Exception {
- return value.f0;
- }
- })
- // TODO:data2的keyby
- .equalTo(new KeySelector<Tuple3<String, Integer, Integer>, String>() {
- @Override
- public String getKey(Tuple3<String, Integer, Integer> value) throws Exception {
- return value.f0;
- }
- })
-
- .window(TumblingEventTimeWindows.of(Time.seconds(5)))
-
- .apply(new JoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>() {
- /**
- *关联上的数据调用join方法
- * @param first The element from first input. [data1]
- * @param second The element from second input. [data2]
- * @return
- * @throws Exception
- */
- @Override
- public String join(Tuple2<String, Integer> first, Tuple3<String, Integer, Integer> second) throws Exception {
- return first + "<------------>" + second;
- }
- });

这里需要注意,做间隔联结的两条流A和B,也必须基于相同的key,所以要先进行KeyBy分组;下界lowerBound应该小于等于上界upperBound,两者都可正可负;间隔联结目前只支持事件时间语义。
- data_afterKeyBy1.intervalJoin(data_afterKeyBy2)
- // TODO:设置下上界的偏移量 [先下后上的设置]
- .between(Time.seconds(-3), Time.seconds(3))
- .process(new ProcessJoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>() {
- /**
- *两条流的数据匹配上才会调用这个方法
- * @param left ks1 的数据
- * @param right ks2 的数据
- * @param ctx 上下文
- * @param out 采集器
- * @throws Exception
- */
- @Override
- public void processElement(Tuple2<String, Integer> left, Tuple3<String, Integer, Integer> right, ProcessJoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>.Context ctx, Collector<String> out) throws Exception {
- // 关联上的数据才能进入这个方法是
- out.collect(left + "<------->" + right);
- }
- })

- /**
- * TODO Interval join 处理迟到数据
- * 1、只支持事件时间
- * 2、指定上界、下界的偏移,负号代表时间往前,正号代表时间往后
- * 3、process中,只能处理 join上的数据
- * 4、两条流关联后的watermark,以两条流中最小的为准
- * 5、如果 当前数据的事件时间 < 当前的watermark,就是迟到数据, 主流的process不处理
- * => between后,可以指定将 左流 或 右流 的迟到数据 放入侧输出流
- */
-
-
- OutputTag<Tuple2<String, Integer>> ks1Late = new OutputTag<>("ks1_late", Types.TUPLE(Types.STRING, Types.INT));
- OutputTag<Tuple3<String, Integer,Integer>> ks2Late = new OutputTag<>("ks1_late", Types.TUPLE(Types.STRING, Types.INT,Types.INT));
-
- SingleOutputStreamOperator<String> process = key1.intervalJoin(key2)
- // TODO:设置下上界的偏移量 [先下后上的设置]
- .between(Time.seconds(-3), Time.seconds(3))
-
-
- // TODO:处理左侧迟到数据到侧输出流
- .sideOutputLeftLateData(ks1Late)
- // TODO:处理右侧迟到数据到侧输出流
- .sideOutputRightLateData(ks2Late)
-
-
- .process(new ProcessJoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>() {
- /**
- *两条流的数据匹配上才会调用这个方法
- * @param left ks1 的数据
- * @param right ks2 的数据
- * @param ctx 上下文
- * @param out 采集器
- * @throws Exception
- */
- @Override
- public void processElement(Tuple2<String, Integer> left, Tuple3<String, Integer, Integer> right, ProcessJoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>.Context ctx, Collector<String> out) throws Exception {
- // 进入这个方法是关联上的数据
- out.collect(left + "<------->" + right);
- }
- });
-
-
- process.print("主流打印:");
- process.getSideOutput(ks1Late).printToErr("ks1迟到数据以错误日志方式打印:");
- process.getSideOutput(ks2Late).printToErr("ks2迟到数据以错误日志方式打印:");

Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。