当前位置:   article > 正文

Flink 窗口 概述_flink窗口分类

flink窗口分类

一:窗口简述

        Flink是一种流式计算引擎,主要是来处理无界数据流的,数据源源不断、无穷无尽。想要更加方便高效地处理无界流,一种方式就是将无限数据切割成有限的“数据块”进行处理,这就是所谓的“窗口”(Window)。 【把窗口理解成一个“桶”,Flink则可以把流切割成大小有限的“储存桶”,把数据分发到不同的桶里,每一个窗口都是一个桶。当窗口结束,就对每一个桶的数据进行收集处理】

二: 窗口的分类

        1)按照驱动类型分

              (1) 时间窗口

                            原理:建立一个窗口,在固定的额时间段内不断收集数据,到达结束时间的时候窗口结束收集数据,生成结果,窗口销毁。【就像地铁一样,间隔一段时间发车,无论车上有多少乘客,地铁都会往前开】

              (2)  计数窗口

                               原理:计数窗口基于元素的个数来截取数据,到达固定的个数时就触发计算并关闭窗口。每个窗口截取数据的个数,就是窗口的大小。基本思路是“人齐发车”

        2)按照窗口分配数据的规则分类(以下均为以时间驱动为例)

                        主要概念:窗口的大小,窗口的滑动步长【两个窗口重叠的部分】,会话间隔

                (1) 滚动窗口(Tumbling Windows)

                             滚动窗口有固定的大小,而且窗口之间不会重叠,每个数据都在一个窗口且只属于这个窗口。在一个固定时间内,接受数据的传入。到了截止时间,收集数据,输出结果。应用类型广泛,可以对每个时间做聚合统计。

                   c850e2e36a3f48bca15878796c5b9ec0.png

                (2) 滑动窗口  (Sliding Windows)

                                 当窗口大小大于窗口步长的时候就会出现滑动,滑动窗口会重叠,同时数据也会同时被分到多个窗口,滑动步长就代表了计算频率。适合计算结果更新较快的场景。

                              

                (3) 会话窗口   (Session Windows)

                                原理:基于“会话”来进行数据分组、如果相邻两个数据到来的时间间隔 小于指定大小,那么这两个数据在同一个窗口内。如果 大于则数据到了新的窗口,且前面的窗口关闭。  会话窗口长度,起始结束时间不确定。各个分区之间窗口没有任何关联。在规定的时间内没有数据到来触发一次计算。可以用于保持会话的场景下。

                (4) 全局窗口 (Global Windows)

                                把相同key的数据全部分配到一个窗口之中,窗口没有结束是不会触发计算的。如果希望对数据处理,需要定义一个触发器。

                     

三:窗口API

        1)按键分区(Keyed)和非按键分区(Non-Keyed

                        (1)按键分区窗口(Keyed Windows)

                                经过按键分区keyBy操作后,数据流会按照key被分为多条逻辑流(logical streams),这就是KeyedStream。基于KeyedStream进行窗口操作时,窗口计算会在多个并行子任务上同时执行。相同key的数据会被发送到同一个并行子任务,而窗口操作会基于每个key进行单独的处理。所以可以认为,每个key上都定义了一组窗口,各自独立地进行统计计算。

在代码实现上,我们需要先对DataStream调用.keyBy()进行按键分区,然后再调用.window()定义窗口。

stream.keyBy(...) .window(...)

                        (2)非按键分区(Non-Keyed Windows)

如果没有进行keyBy,那么原始的DataStream就不会分成多条逻辑流。这时窗口逻辑只能在一个任务(task)上执行,就相当于并行度变成了1。setParallelism(1)

在代码中,直接基于DataStream调用.windowAll()定义窗口。

stream.windowAll(...)

注意:对于非按键分区的窗口操作手动调大窗口算子的并行度也是无效的

四:窗口分配器

        1)Flink中的时间语义

                                到底是以哪种时间作为衡量的标准,就被成为“时间语义”

        2)时间窗口分配器

                                                (1) 处理时间的时间语义窗口

特征:

  • 基于处理节点的当前系统时间。
  • 实现简单,延迟低。
  • 适用于数据实时性要求高且乱序不多的场景。
  • 在处理乱序数据时,结果可能不准确。

应用场景:        

  • 监控系统指标(如CPU使用率、内存使用情况)。
  • 实时数据分析,延迟比准确性更重要的场景。

     

使用参数:                 

TumblingProcessingTimeWindows.of(Time.)    滚动窗口
SlidingProcessingTimeWindows.of(Time.)    滑动窗口
ProcessingTimeSessionWindows.withGap()  ProcessingTimeSessionWindows.withDynamicGap()    会话窗口

                                                

                                                (2) 处理事件的时间语义窗口 

特征:      

  • 基于事件的时间戳。
  • 处理乱序数据,通过Watermark机制来处理延迟事件。
  • 更加准确,适用于要求严格时间语义的场景。

实时场景:    

  • 实时日志分析(如用户行为分析、点击流分析)。
  • 需要严格时间顺序和准确性的场景,如金融交易分析。

使用参数:

TumblingEventTimeWindows.of(Time.)    滚动窗口
SlidingEventTimeWindows.of(Time.)    滑动窗口

EventTimeSessionWindows.withGap() 

EventTimeSessionWindows.withDynamicGap()

    会话窗口

       

        3)计数窗口分配器

countWindow(5)计数窗口分配器  【滚动】  满足5条输出一次计算结果
countWindow(5,2)计数窗口分配器  【滑动】  满足5条输出一次计算结果 , 每经过一个步长都有一个窗口-触发输出【第一次输出在第二条数据来的时候】

        4)全局窗口分配器

GlobalWindows.create()    全局窗口

五:窗口函数

窗口函数定义了要对窗口中收集的数据做的计算操作,根据处理的方式可以分为两类:增量聚合函数和全窗口函数。

            1)增量聚合函数(ReduceFunction / AggregateFunction)

                                窗口将数据收集起来,最基本的处理操作当然就是进行聚合。我们可以每来一个数据就在之前结果上聚合一次,这就是“增量聚合”。

                       (1)归约函数(ReduceFunction)

                                

  1. import com.guigu.function.WaterSensorMapFunction;
  2. import org.apache.flink.api.common.functions.ReduceFunction;
  3. import org.apache.flink.streaming.api.datastream.KeyedStream;
  4. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  5. import org.apache.flink.streaming.api.datastream.WindowedStream;
  6. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  7. import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
  8. import org.apache.flink.streaming.api.windowing.time.Time;
  9. import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
  10. public class WindowReduceDemo {
  11. public static void main(String[] args) throws Exception {
  12. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  13. SingleOutputStreamOperator<WaterSensor> map = env.socketTextStream("192.168.88.161", 9999)
  14. .map(new WaterSensorMapFunction());
  15. KeyedStream<WaterSensor, String> senorks = map.keyBy(sensor -> sensor.getId());
  16. // 窗口分配器
  17. WindowedStream<WaterSensor, String, TimeWindow> senorWS = senorks.window(TumblingProcessingTimeWindows.of(Time.seconds(5)));
  18. /*
  19. 1.相同key的第一条数据来的时候,不会调用reduce方法 【来的数据类型必须保持一致】
  20. 2.增量聚合:来一条数据,就会计算一次,但是不会输出
  21. 3.在窗口触发的时候,才会输出窗口的最终结果
  22. */
  23. // 窗口函数,增量聚合reduce
  24. // TODO:返回一个DataStream
  25. SingleOutputStreamOperator<WaterSensor> reduce = senorWS.reduce(new ReduceFunction<WaterSensor>() {
  26. // TODO:只有第二条数据进来了才会调用reduce方法
  27. @Override
  28. public WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception {
  29. System.out.println("调用reduce方法: value1" + value1 + ",value2" + value2);
  30. return new WaterSensor(value1.getId(), value2.getTs(), value1.getVc() + value2.getVc());
  31. }
  32. });
  33. reduce.print();
  34. env.execute();
  35. }
  36. }

                       (2)聚合函数(AggregateFunction)

                                        来一个数据就调用add方法,进行数据聚合。结果保存在状态中。窗口需要输出时调用getresult()方法 得到计算结果。和ReduceFunction作用相同,而由于输入、中间状态、输出的类型可以不同,使得应用更加灵活方便。

  1. import org.apache.flink.api.common.functions.AggregateFunction;
  2. import org.apache.flink.api.common.functions.ReduceFunction;
  3. import org.apache.flink.streaming.api.datastream.KeyedStream;
  4. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  5. import org.apache.flink.streaming.api.datastream.WindowedStream;
  6. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  7. import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
  8. import org.apache.flink.streaming.api.windowing.time.Time;
  9. import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
  10. public class WindowAggregateDemo {
  11. public static void main(String[] args) throws Exception {
  12. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  13. SingleOutputStreamOperator<WaterSensor> map = env.socketTextStream("192.168.88.161", 9999)
  14. .map(new WaterSensorMapFunction());
  15. KeyedStream<WaterSensor, String> senorks = map.keyBy(sensor -> sensor.getId());
  16. // 窗口分配器
  17. WindowedStream<WaterSensor, String, TimeWindow> senorWS = senorks.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
  18. // 窗口函数,增量聚合 Aggregate
  19. /*
  20. 第一个类型:输入数据的类型
  21. 第二个类型: 累加器的类型,存储的中间结果的类型
  22. 第三个类型: 输出的类型
  23. */
  24. /*
  25. *1、属于本窗口的第一条数据来,创建窗口,创建累加器
  26. *2、增量聚合:来一条计算一条,调用-次add方法
  27. *3、窗口输出时调用一次getresult方法
  28. *4、输入、中间累加器、输出 类型可以不一样,非常灵活
  29. */
  30. senorWS.aggregate(
  31. new AggregateFunction<WaterSensor, Integer, String>() {
  32. // TODO:创建累加器初始化累加器
  33. @Override
  34. public Integer createAccumulator() {
  35. System.out.println("创建累加器");
  36. return 0;
  37. }
  38. // TODO: 聚合逻辑
  39. @Override
  40. public Integer add(WaterSensor value, Integer accumulator) {
  41. System.out.println("调用add方法,value="+value);
  42. return accumulator+ value.getVc();
  43. }
  44. // TODO:获得最后的结果,窗口触发时输出
  45. @Override
  46. public String getResult(Integer accumulator) {
  47. System.out.println("调用getresult方法");
  48. return accumulator.toString();
  49. }
  50. @Override
  51. public Integer merge(Integer a, Integer b) {
  52. // 只有会话窗口才会用到
  53. System.out.println("调用merge方法");
  54. return null;
  55. }
  56. }
  57. ).print();
  58. env.execute();
  59. }
  60. }

另外,Flink也为窗口的聚合提供了一系列预定义的简单聚合方法,可以直接基于WindowedStream调用。主要包括.sum()/max()/maxBy()/min()/minBy(),与KeyedStream的简单聚合非常相似。它们的底层,其实都是通过AggregateFunction来实现的。

               2)全窗口函数(窗口函数,处理窗口函数)

                                        全窗口函数需要先收集窗口中的数据,并在内部缓存起来,等到窗口要输出结果的时候再取出数据进行计算。    WindowFunction【apply方法】能提供的上下文信息较少,也没有更高级的功能。事实上,它的作用可以被ProcessWindowFunction全覆盖,所以之后可能会逐渐弃用【老方法】

  1. import org.apache.commons.lang3.time.DateFormatUtils;
  2. import org.apache.flink.streaming.api.datastream.KeyedStream;
  3. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  4. import org.apache.flink.streaming.api.datastream.WindowedStream;
  5. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  6. import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
  7. import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
  8. import org.apache.flink.streaming.api.windowing.time.Time;
  9. import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
  10. import org.apache.flink.util.Collector;
  11. public class WindowProcessDemo {
  12. public static void main(String[] args) throws Exception {
  13. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  14. SingleOutputStreamOperator<WaterSensor> map = env.socketTextStream("192.168.88.161", 9999)
  15. .map(new WaterSensorMapFunction());
  16. KeyedStream<WaterSensor, String> senorks = map.keyBy(sensor -> sensor.getId());
  17. // 窗口分配器 【滚动】
  18. WindowedStream<WaterSensor, String, TimeWindow> senorWS = senorks.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
  19. // TODO:全窗口 不断存储数据到最后 窗口触发时只会输出一次
  20. // TODO:老写法
  21. // senorWS
  22. // .apply(
  23. // new WindowFunction<WaterSensor, String, String, TimeWindow>() {
  24. // /**
  25. // *
  26. // * @param s 分组的key
  27. // * @param window 窗口对象
  28. // * @param input 存储的数据
  29. // * @param out 采集器
  30. // * @throws Exception
  31. // */
  32. // @Override
  33. // public void apply(String s, TimeWindow window, Iterable<WaterSensor> input, Collector<String> out) throws Exception {
  34. //
  35. // }
  36. // }
  37. // )
  38. // TODO:新写法
  39. senorWS
  40. .process(
  41. new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
  42. /**
  43. * @param s 分组的 key
  44. * @param context 上下文
  45. * @param elements 存的数据
  46. * @param out 采集器
  47. * @throws Exception
  48. */
  49. @Override
  50. public void process(String s, ProcessWindowFunction<WaterSensor, String, String, TimeWindow>.Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
  51. // TODO:毫秒
  52. long start = context.window().getStart();
  53. long end = context.window().getEnd();
  54. String start_format = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss.SSS");
  55. String end_format = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss.SSS");
  56. // TODO:多少条数据
  57. long data = elements.spliterator().estimateSize();
  58. out.collect("key="+s+"的窗口【"+start_format+","+end_format+"]包含"+data+"条数据 ====》"+elements.toString());
  59. }
  60. }
  61. ).print();
  62. env.execute();
  63. }
  64. }

               3)全窗口函数 和 增量聚合函数 的结合使用

  1. import org.apache.commons.lang3.time.DateFormatUtils;
  2. import org.apache.flink.api.common.functions.AggregateFunction;
  3. import org.apache.flink.streaming.api.datastream.KeyedStream;
  4. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  5. import org.apache.flink.streaming.api.datastream.WindowedStream;
  6. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  7. import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
  8. import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
  9. import org.apache.flink.streaming.api.windowing.time.Time;
  10. import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
  11. import org.apache.flink.util.Collector;
  12. public class WindowAggregateAndProcessDemo {
  13. public static void main(String[] args) throws Exception {
  14. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  15. SingleOutputStreamOperator<WaterSensor> map = env.socketTextStream("192.168.88.161", 9999)
  16. .map(new WaterSensorMapFunction());
  17. KeyedStream<WaterSensor, String> senorks = map.keyBy(sensor -> sensor.getId());
  18. // 窗口分配器
  19. WindowedStream<WaterSensor, String, TimeWindow> senorWS = senorks.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
  20. // TODO:窗口函数,增量聚合 Aggregate + 全窗口 Process
  21. /*
  22. *增量聚合 Aggregate+全窗日 process
  23. * 1、增量聚合函数处理数据:来一条计算一条
  24. *2、窗口触发时,增量聚合的结果(只有一条)传递给 全窗口函数
  25. *3、经过全窗口函数的处理包装后,输出米指轺调坝党
  26. *结合两者的优点:
  27. *1、增量聚合: 来一条计算一条,存储中间的计算结果,占用的空间少
  28. *2、全窗口函数:可以通过 上下文 实现灵活的功能
  29. * */
  30. senorWS.aggregate(
  31. new MyAgg(),
  32. new MyProcess()
  33. ).print();
  34. //
  35. env.execute();
  36. }
  37. public static class MyAgg implements AggregateFunction<WaterSensor, Integer, String>{
  38. // TODO:创建累加器初始化累加器
  39. @Override
  40. public Integer createAccumulator() {
  41. System.out.println("创建累加器");
  42. return 0;
  43. }
  44. // TODO: 聚合逻辑
  45. @Override
  46. public Integer add(WaterSensor value, Integer accumulator) {
  47. System.out.println("调用add方法,value="+value);
  48. return accumulator+ value.getVc();
  49. }
  50. // TODO:获得最后的结果,窗口触发时输出
  51. @Override
  52. public String getResult(Integer accumulator) {
  53. System.out.println("调用getresult方法");
  54. return accumulator.toString();
  55. }
  56. @Override
  57. public Integer merge(Integer a, Integer b) {
  58. // 只有会话窗口才会用到
  59. System.out.println("调用merge方法");
  60. return null;
  61. }
  62. }
  63. public static class MyProcess extends ProcessWindowFunction<String,String,String,TimeWindow>{
  64. /**
  65. * @param s 分组的 key
  66. * @param context 上下文
  67. * @param elements 存的数据
  68. * @param out 采集器
  69. * @throws Exception
  70. */
  71. @Override
  72. public void process(String s, ProcessWindowFunction<String, String, String, TimeWindow>.Context context, Iterable<String> elements, Collector<String> out) throws Exception {
  73. // TODO:毫秒
  74. long start = context.window().getStart();
  75. long end = context.window().getEnd();
  76. String start_format = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss.SSS");
  77. String end_format = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss.SSS");
  78. // TODO:多少条数据
  79. long data = elements.spliterator().estimateSize();
  80. out.collect("key="+s+"的窗口【"+start_format+","+end_format+"]包含"+data+"条数据 ====》"+elements.toString());
  81. }
  82. }
  83. }

这样调用的处理机制是:基于第一个参数(增量聚合函数)来处理窗口数据,每来一个数据就做一次聚合;等到窗口需要触发计算时,则调用第二个参数(全窗口函数)的处理逻辑输出结果。需要注意的是,这里的全窗口函数就不再缓存所有数据了,而是直接将增量聚合函数的结果拿来当作了Iterable类型的输入。

六:其他API

                1) 触发器(Trigger)

                                        触发器主要是用来控制窗口什么时候触发计算。所谓的“触发计算”,本质上就是执行窗口函数,所以可以认为是计算得到结果并输出的过程。基于WindowedStream调用.trigger()方法,就可以传入一个自定义的窗口触发器(Trigger)。

stream.keyBy(...) .window(...) .trigger(new MyTrigger())

                2)移除器(Evictor)

                                移除器主要用来定义移除某些数据的逻辑。基于WindowedStream调用.evictor()方法,就可以传入一个自定义的移除器(Evictor)。Evictor是一个接口,不同的窗口类型都有各自预实现的移除器。

stream.keyBy(...) .window(...)

七:窗口的原理 

以 时间类型的 滚动窗口 为例,分析原理:

1、窗口什么时候触发 输出?

时间进展>= 窗口的最大时间戳(end-1ms)


2、窗口是怎么划分的?     

start= 向下取整,取窗口长度的整数倍     

end =start+窗长度

窗口左闭右开        ==》        属于本窗口的  最大时间戳 =end - 1ms


3、窗口的生命周期?

创建:属于本窗口的第一条数据来的时候,现new的,放入一个singeton单例的集合中。

销毁(关窗):  时间进展>= 窗口的最大时间戳(end - 1ms) +允许迟到的时间【默认为0】

八:水位线 (Watermark)

        1)什么是水位线?【下图中均为虚线(w)表示】

                        在Flink中水位线被用来标记事件的进展时间。是在数据流里面的一个标记点,具体内容是时间戳。用来指示当前事件的处理时间。

        2) 有序数据的处理

                        为了提高速率,一般会每隔一段时间产生一个水位线

        3)乱序+数据迟到的水位线处理

                                通常在流式计算中,数据的传输收到网络IO等等因素的影响,数据无法准时到达计算的窗口。为了让窗口处理数据变得规整且正确。我们通常使用让水位线等待固定秒数【意思就是在时间戳的基础上增加一些延迟,以保证不丢失数据】

注意:乱序    和    迟到的区别:

乱序: 数据的顺序乱了   时间小的比时间大的晚来

迟到: 数据的时间戳  <   当前的  watermark

        4) 水位线的特征

  1. 水位线是插入到数据流中的一个标记,可以认为是一个特殊的数据

  2. 水位线主要的内容是一个时间戳,用来表示当前事件时间【数据传入的时间】的进展

  3. 水位线是基于数据的时间戳生成的

  4. 水位线的时间戳必须单调递增,以确保任务的事件时间时钟一直向前推进水位线可以通过设置延迟,来保证正确处理乱序数据

  5.  一个水位线Watermank(t),表示在当前流中事件时间已经达到了时间戳T,这代表t之前的所有数据都到齐了,之后流中不会出现   时间戳T<t的数据。

  6.  水位线是Flink流处理中保证结果正确性的核心机制,它往往会跟窗口一起配合,完成对乱序数据的正确处理                                                                   

         

        5)水位线的使用【需要使用事件时间语义窗口才能触发】

                      (1)有序流的使用

                                直接调用WatermarkStrategy.forMonotonousTimestamps()方法,然后使用assignTimestampsAndWatermarks()接受 。就可以实现。

  1. import org.apache.commons.lang3.time.DateFormatUtils;
  2. import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
  3. import org.apache.flink.api.common.eventtime.WatermarkStrategy;
  4. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  5. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  6. import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
  7. import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
  8. import org.apache.flink.streaming.api.windowing.time.Time;
  9. import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
  10. import org.apache.flink.util.Collector;
  11. public class WaterMarkDemo {
  12. public static void main(String[] args) throws Exception {
  13. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  14. env.setParallelism(1);
  15. SingleOutputStreamOperator<WaterSensor> map = env.socketTextStream("192.168.88.130", 9999)
  16. .map(new WaterSensorMapFunction());
  17. /*
  18. 1.定义watermark策略
  19. 2.使用assignTimestampsAndWatermarks 调用
  20. */
  21. // TODO:指定watermark策略
  22. WatermarkStrategy<WaterSensor> waterSensorWatermarkStrategy = WatermarkStrategy
  23. .<WaterSensor>forMonotonousTimestamps() // 指定watermark的生成 单调递增 没有等待时间
  24. // 指定时间戳分配器,从数据中提取
  25. .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
  26. @Override
  27. public long extractTimestamp(WaterSensor element, long recordTimestamp) {
  28. // 返回的时间戳是毫秒
  29. System.out.println("数据=" + element + ",recordTS = " + recordTimestamp);
  30. return element.getTs() * 1000L;
  31. }
  32. });
  33. map.assignTimestampsAndWatermarks(waterSensorWatermarkStrategy)
  34. .keyBy(sensor -> sensor.getId())
  35. // TODO:只能使用 事件时间语义窗口 才能使用水平线
  36. .window(TumblingEventTimeWindows.of(Time.seconds(2)))
  37. .process(
  38. new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
  39. /**
  40. * @param s 分组的 key
  41. * @param context 上下文
  42. * @param elements 存的数据
  43. * @param out 采集器
  44. * @throws Exception
  45. */
  46. @Override
  47. public void process(String s, ProcessWindowFunction<WaterSensor, String, String, TimeWindow>.Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
  48. // TODO:毫秒
  49. long start = context.window().getStart();
  50. long end = context.window().getEnd();
  51. String start_format = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss.SSS");
  52. String end_format = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss.SSS");
  53. // TODO:多少条数据
  54. long data = elements.spliterator().estimateSize();
  55. out.collect("key="+s+"的窗口【"+start_format+","+end_format+"]包含"+data+"条数据 ====》"+elements.toString());
  56. }
  57. }
  58. ).print();
  59. env.execute();
  60. }
  61. }

                      (2) 无序流的使用

                                        将.forMonotonousTimestamps()方法 修改为forBoundedOutOfOrderness() 参数Duration.ofSeconds(等待的时间)                          

  1. // TODO:指定watermark策略
  2. WatermarkStrategy
  3. // TODO: 指定watermark的生成 乱序 有等待时间 3s
  4. .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
  5. // TODO: 指定时间戳分配器,从数据中提取
  6. .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
  7. @Override
  8. public long extractTimestamp(WaterSensor element, long recordTimestamp) {
  9. // 返回的时间戳是毫秒
  10. System.out.println("数据=" + element + ",recordTS = " + recordTimestamp);
  11. return element.getTs() * 1000L;
  12. }
  13. });

                      (3) 自定义水位线的使用 

                                                将.forBoundedOutOfOrderness(方法 修改为 forGenerator(重写类atermarkGeneratorSupplier)    最后 return 自己的方法<>(延迟的时间)                         

                                        

  1. WatermarkStrategy
  2. // TODO: 自定义指定watermark的生成 乱序 有等待时间 3s
  3. .<WaterSensor>forGenerator(new WatermarkGeneratorSupplier<WaterSensor>() {
  4. @Override
  5. public WatermarkGenerator<WaterSensor> createWatermarkGenerator(Context context) {
  6. // TODO:延迟时间3s
  7. return new MyGenerator<>(3000);
  8. }
  9. })

                      (4) 在数据源发送水位线   

                                                我们也可以在自定义的数据源中抽取事件时间,然后发送水位线。这里要注意的是,在自定义数据源中发送了水位线以后,就不能再在程序中使用assignTimestampsAndWatermarks方法来生成水位线了。在自定义数据源中生成水位线和在程序中使用assignTimestampsAndWatermarks方法生成水位线二者只能取其一。

  1. env.fromSource(
  2. kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), "kafkasource"
  3. )

                6)水位线的传递 

                                一个Task通常会设置多个并行度,而而当一个任务接收到多个上游并行任务传递来的水位线时,应该以最小的那个作为当前任务的事件时钟。每个任务都以“处理完之前所有数据”为标准来确定自己的时钟

九:处理乱序+迟到数据  常用三部曲               

                1)水位线空闲等待 

                                        在多个上游并行任务中,如果有其中一个没有数据,由于当前Task是以最小的那个作为当前任务的事件时钟,而没有数据的并行任务一直保持最小。就会导致当前Task的水位线无法推进,就可能导致窗口无法触发。这时候可以设置空闲等待(withIdleness)。

  1. env.socketTextStream("192.168.88.130", 9999)
  2. .partitionCustom(new MyPartioner(), r -> r)
  3. .map(r -> Integer.parseInt(r))
  4. .assignTimestampsAndWatermarks(WatermarkStrategy
  5. .<Integer>forMonotonousTimestamps()
  6. .withTimestampAssigner((r, ts) -> r * 1000L)
  7. // TODO: 空闲等待 5s
  8. .withIdleness(Duration.ofSeconds(5))
  9. );

                2)  允许推迟时间关窗 

                                         Flink的窗口,也允许迟到数据。当触发了窗口计算后,会先计算当前的结果,但是此时并不会关闭窗口。以后每来一条迟到数据,就触发一次这条数据所在窗口计算(增量计算)。直到wartermark 超过了窗口结束时间+推迟时间,此时窗口会真正关闭,迟到来的数据不会被计算。

  1. .keyBy(sensor -> sensor.getId())
  2. // TODO:只能使用 事件时间语义窗口 才能使用水平线
  3. .window(TumblingEventTimeWindows.of(Time.seconds(2)))
  4. // TODO:使用窗口延迟 5s
  5. .allowedLateness(Time.seconds(5))

                3) 侧输出流 

  1. .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
  2. .allowedLateness(Time.seconds(3))
  3. // TODO: 【关窗后】 迟到的数据放入侧输出流
  4. .sideOutputLateData(
  5. // TODO: 参数1:侧输出流的名字 参数2:侧输出数据的类型
  6. new OutputTag<WaterSensor>("late_data", Types.POJO(WaterSensor.class))
  7. )

                4) 设置经验 

1.watermark等待时间,设置一个不算特别大的,一般是秒级,在乱序和 延迟 取舍

2.设置一定的窗口允许迟到,只考虑大部分的迟到数据,极端小部分迟到很久的数据,不管

3.极端小部分迟到很久的数据,放到侧输出流。获取到之后可以做各种处理 

十:基于时间的合流-----双流联结(Join)

可以发现,根据某个key合并两条流,与关系型数据库中表的join操作非常相近。事实上,Flink中两条流的connect操作,就可以通过keyBy指定键进行分组后合并,实现了类似于SQL中的join操作;另外connect支持处理函数,可以使用自定义实现各种需求,其实已经能够处理双流join的大多数场景。

不过处理函数是底层接口,所以尽管connect能做的事情多,但在一些具体应用场景下还是显得太过抽象了。比如,如果我们希望统计固定时间内两条流数据的匹配情况,那就需要自定义来实现——其实这完全可以用窗口(window)来表示。为了更方便地实现基于时间的合流操作,Flink的DataStrema API提供了内置的join算子。

                1)窗口联结  (Window  Join)

                                Flink为基于一段时间的双流合并专门提供了一个窗口联结算子,可以定义时间窗口,并将两条流中共享一个公共键(key)的数据放在窗口中进行配对处理。

  1. // TODO: window join 窗口联结
  2. /**
  3. * stream1.join(stream2)
  4. * .where(<KeySelector>)
  5. * .equalTo(<KeySelector>)
  6. * .window(<WindowAssigner>)
  7. * .apply(<JoinFunction>)
  8. *
  9. * 1. 落在同一个时间窗口范围内才能匹配
  10. * 2. 根据keyby的key,来进行匹配关联
  11. * 3. 只能拿到匹配上的数据,类似有固定时间范围的inner join
  12. */
  13. data1.join(data2)
  14. // TODO:data1的keyby
  15. .where(new KeySelector<Tuple2<String, Integer>, String>() {
  16. @Override
  17. public String getKey(Tuple2<String, Integer> value) throws Exception {
  18. return value.f0;
  19. }
  20. })
  21. // TODO:data2的keyby
  22. .equalTo(new KeySelector<Tuple3<String, Integer, Integer>, String>() {
  23. @Override
  24. public String getKey(Tuple3<String, Integer, Integer> value) throws Exception {
  25. return value.f0;
  26. }
  27. })
  28. .window(TumblingEventTimeWindows.of(Time.seconds(5)))
  29. .apply(new JoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>() {
  30. /**
  31. *关联上的数据调用join方法
  32. * @param first The element from first input. [data1]
  33. * @param second The element from second input. [data2]
  34. * @return
  35. * @throws Exception
  36. */
  37. @Override
  38. public String join(Tuple2<String, Integer> first, Tuple3<String, Integer, Integer> second) throws Exception {
  39. return first + "<------------>" + second;
  40. }
  41. });

                2)间隔联结 (Interval Join)

                                这里需要注意,做间隔联结的两条流A和B,也必须基于相同的key,所以要先进行KeyBy分组;下界lowerBound应该小于等于上界upperBound,两者都可正可负;间隔联结目前只支持事件时间语义

                     (1)正常使用                   

  1. data_afterKeyBy1.intervalJoin(data_afterKeyBy2)
  2. // TODO:设置下上界的偏移量 [先下后上的设置]
  3. .between(Time.seconds(-3), Time.seconds(3))
  4. .process(new ProcessJoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>() {
  5. /**
  6. *两条流的数据匹配上才会调用这个方法
  7. * @param left ks1 的数据
  8. * @param right ks2 的数据
  9. * @param ctx 上下文
  10. * @param out 采集器
  11. * @throws Exception
  12. */
  13. @Override
  14. public void processElement(Tuple2<String, Integer> left, Tuple3<String, Integer, Integer> right, ProcessJoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>.Context ctx, Collector<String> out) throws Exception {
  15. // 关联上的数据才能进入这个方法是
  16. out.collect(left + "<------->" + right);
  17. }
  18. })

                     (2) 处理迟到数据

  1. /**
  2. * TODO Interval join 处理迟到数据
  3. * 1、只支持事件时间
  4. * 2、指定上界、下界的偏移,负号代表时间往前,正号代表时间往后
  5. * 3、process中,只能处理 join上的数据
  6. * 4、两条流关联后的watermark,以两条流中最小的为准
  7. * 5、如果 当前数据的事件时间 < 当前的watermark,就是迟到数据, 主流的process不处理
  8. * => between后,可以指定将 左流 或 右流 的迟到数据 放入侧输出流
  9. */
  10. OutputTag<Tuple2<String, Integer>> ks1Late = new OutputTag<>("ks1_late", Types.TUPLE(Types.STRING, Types.INT));
  11. OutputTag<Tuple3<String, Integer,Integer>> ks2Late = new OutputTag<>("ks1_late", Types.TUPLE(Types.STRING, Types.INT,Types.INT));
  12. SingleOutputStreamOperator<String> process = key1.intervalJoin(key2)
  13. // TODO:设置下上界的偏移量 [先下后上的设置]
  14. .between(Time.seconds(-3), Time.seconds(3))
  15. // TODO:处理左侧迟到数据到侧输出流
  16. .sideOutputLeftLateData(ks1Late)
  17. // TODO:处理右侧迟到数据到侧输出流
  18. .sideOutputRightLateData(ks2Late)
  19. .process(new ProcessJoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>() {
  20. /**
  21. *两条流的数据匹配上才会调用这个方法
  22. * @param left ks1 的数据
  23. * @param right ks2 的数据
  24. * @param ctx 上下文
  25. * @param out 采集器
  26. * @throws Exception
  27. */
  28. @Override
  29. public void processElement(Tuple2<String, Integer> left, Tuple3<String, Integer, Integer> right, ProcessJoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>.Context ctx, Collector<String> out) throws Exception {
  30. // 进入这个方法是关联上的数据
  31. out.collect(left + "<------->" + right);
  32. }
  33. });
  34. process.print("主流打印:");
  35. process.getSideOutput(ks1Late).printToErr("ks1迟到数据以错误日志方式打印:");
  36. process.getSideOutput(ks2Late).printToErr("ks2迟到数据以错误日志方式打印:");

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/一键难忘520/article/detail/992274
推荐阅读
相关标签
  

闽ICP备14008679号