当前位置:   article > 正文

Flink 之时间语义与Wartermark_flink1.15设置时间语义

flink1.15设置时间语义

一 时间语义

在Flink的流式处理中,会涉及到时间的不同概念,如下图所示:
在这里插入图片描述

  • Event Time:是事件创建的时间。它通常由事件中的时间戳描述,例如采集的日志数据中,每一条日志都会记录自己的生成时间,Flink通过时间戳分配器访问事件时间戳。
  • Ingestion Time:是数据进入Flink的时间。
  • Processing Time:是每一个执行基于时间操作的算子的本地系统时间,与机器相关,默认的时间属性就是Processing Time。

Flink是一个事件驱动的计算框架,就意味着每来一条数据,才应该触发Flink中的计算,但是如果此时计算涉及到时间问题,就比较麻烦,比如一小时内每5分钟的水位变 化。在分布式环境中,真实数据发生时间和Flink中数据处理的时间是有延迟的,那么显然,你拿处理时间作为统计的窗口时间范围是不够准确的。所以我们一般在业务处理 时,需要使用事件产生的时间,而不是处理时间。

默认情况下,Flink框架中处理的时间语义为ProcessingTime,如果要使用EventTime, 那么需要引入EventTime的时间属性,引入方式如下所示:

import org.apache.flink.streaming.api.TimeCharacteristic

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  • 1
  • 2
  • 3
  • 4

二 Watermark

流处理从事件产生,到流经source,再到operator,中间是有一个过程和时间的。虽然大部分情况下,流到operator的数据都是按照事件产生的时间顺序来的,但是也不排除由于网络、背压等原因,导致乱序的产生(out-of-order或者说late element)。

但是对于late element,我们又不能无限期的等下去,必须要有个机制来保证一个特定的时间后,必须触发window去进行计算了。这个特别的机制,就是watermark。

这里的Watermark什么意思呢?很简单,把数据流简单的理解为水流,那么当水流源源不断地流入咱们系统时,什么时候我们才知道要开始对数据计算了呢?总不能一直等吧。所以为了能够对数据计算的时间进行限定,我们的想法就是在水流上添加浮标或标记,当这个标记进入我们的数据窗口时,我们就认为可以开始计算了。这里在水流中增加的标记,我们就称之为Watermark(水位标记)

在实际操作中,Watermark作为特殊标记数据由Flink根据当前数据的EventTime创建出来后自动加入到数据队列中。当Watermark数据进入到窗口范围后,会判断时间窗口是否触发计算,所以Watermark数据中应该包含时间属性。但是这个时间属性值设置为多少合适呢?首先我们需要明确,如果数据是按照顺序采集过来的,那么来一条计算一条的话,是不会出现问题的,也无需加入任何的标记,时间一到,窗口自动触发计算就可以了,但实际情况恰恰是数据采集到Flink中时是乱序的,就意味着当触发窗口计算的时候,是有可能数据不全的,因为数据被打乱了,还没有采集到,基于这样的原因,所以需要在数据采集队列中增加标记,表示指定时间的窗口数据全部到达,可以计算,无需等待了。
根据不同的数据处理场景watermark会有不同的生成方式:

  1. 有序数据:DataStream.assignAscendingTimestamps
  2. 乱序数据:DataStream.assignTimestampsAndWatermarks
    乱序数据中的watermark处理又分两大类:
  • AssignerWithPeriodicWatermarks
  • AssignerWithPunctuatedWatermarks

三 EventTime在window中的使用

  • 滚动窗口(TumblingEventTimeWindows)
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
val dataDS: DataStream[String] = env.readTextFile("input/data.txt")

// TODO 将数据进行转换
val mapDS = dataDS.map(data => {
    val datas = data.split(",")
    (datas(0), datas(1).toLong, datas(2).toInt)
})

val waterDS = mapDS.assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessTimestampExtractor[(String, Long, Int)](Time.seconds(0)) {
        override def extractTimestamp(element: (String, Long, Int)): Long = element._2 * 1000
    }
)

val resultDS = waterDS.keyBy(0)
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .reduce(
        (t1, t2) => {
            (t1._1, t1._2, math.max(t1._3, t2._3))
        }
    )
mapDS.print("water")
resultDS.print("result")
env.execute("sensor")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 滑动窗口(SlidingEventTimeWindows)
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)

val dataDS: DataStream[String] = env.readTextFile("input/data.txt")

val mapDS = dataDS.map(data => {
    val datas = data.split(",")
    (datas(0), datas(1).toLong, datas(2).toInt)
})

val waterDS = mapDS.assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessTimestampExtractor[(String, Long, Int)](Time.seconds(0)) {
        override def extractTimestamp(element: (String, Long, Int)): Long = element._2 * 1000
    }
)

val resultDS = waterDS.keyBy(0)
    .window(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(1)))
    .reduce(
        (t1, t2) => {
            (t1._1, t1._2, math.max(t1._3, t2._3))
        }
    )
mapDS.print("water")
resultDS.print("result>>>>>")
env.execute("sensor")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 会话窗口(EventTimeSessionWindows)
    相邻两次数据的EventTime的时间差超过指定的时间间隔就会触发执行。如果加入 Watermark,会在符合窗口触发的情况下进行延迟。到达延迟水位再进行窗口触发。
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)

val dataDS: DataStream[String] = env.readTextFile("input/data1.txt")

val mapDS = dataDS.map(data => {
    val datas = data.split(",")
    (datas(0), datas(1).toLong, datas(2).toInt)
})

val waterDS = mapDS.assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessTimestampExtractor[(String, Long, Int)](Time.seconds(0)) {
        override def extractTimestamp(element: (String, Long, Int)): Long = element._2 * 1000
    }
)

val resultDS = waterDS.keyBy(0)
    .window(EventTimeSessionWindows.withGap(Time.seconds(2)))
    .reduce(
        (t1, t2) => {
            (t1._1, t1._2, math.max(t1._3, t2._3))
        }
    )
mapDS.print("water")
resultDS.print("result>>>>>")
env.execute("sensor")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

四 窗口是如何划分的?

时间取整: timestamp - (timestamp – offset + windowSize)% windowSize

  • timestamp:从数据中抽取的EventTime
  • offset:默认偏移量为0
  • windowSize:窗口的范围大小(左闭右开)

五 窗口是何时触发计算的?

  1. 如果指定的时间标记进入到Flink时,该标记大于等于窗口的结束时间,那么窗口就会被触发计算;这里的时间标记并不是EventTime,而是waterMark。

  2. 如果数据是有序的,那么可以将eventTime看作是watermark。watermark = eventtime

  3. 如果读取文件,由于读取数据速度很快,所以会很快就读取到文件末尾。所以文件的最后一条数据是有可能无法计算的。
    为了解决这个问题,读取文件时,如果到达末尾,Flink框架会自动在数据流中增加一个Long类型的最大值作为watermark,为了提交所有未计算的窗口进行计算,从而保证计算结果准确

六 乱序数据需要动态生成watermark

stream.assignTimestampsAndWatermarks(new BoundOutOfOrdernessTimestampExtractor[T](Time.seconds(5){
	override def extrectTimestamp(element:T):Long={
	element.ts*1000L
}
})
  • 1
  • 2
  • 3
  • 4
  • 5

有两个参数列表

  • 第一个参数列表表示延迟时间,为了计算watermark
  • 第二个参数表示抽取事件时间 watermark => 延迟时间
    watermark = 事件时间 – 延迟时间

七 watermark总结

  1. 如果想要对数据的窗口进行计算,一般采用事件时间。
  2. 需要从数据中抽取事件时间eventtime,并且生成watermark
    - 有序:watermark = eventtime
    - 无序延迟:watermark = 事件时间 – 延迟时间
    - 无序延迟迟到:watermark = 事件时间 – 延迟时间 – 迟到时间
  3. 根据watermark触发数据窗口的计算
  4. 使用allowedLateness方法增加延迟时间
  5. 使用sideOutputLateData方法保存迟到数据
  6. 使用getSideOutput方法获取迟到数据
  7. watermark 只增不减
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小舞很执着/article/detail/861860
推荐阅读
相关标签
  

闽ICP备14008679号