赞
踩
在Flink的流式处理中,会涉及到时间的不同概念,如下图所示:
Flink是一个事件驱动的计算框架,就意味着每来一条数据,才应该触发Flink中的计算,但是如果此时计算涉及到时间问题,就比较麻烦,比如一小时内每5分钟的水位变 化。在分布式环境中,真实数据发生时间和Flink中数据处理的时间是有延迟的,那么显然,你拿处理时间作为统计的窗口时间范围是不够准确的。所以我们一般在业务处理 时,需要使用事件产生的时间,而不是处理时间。
默认情况下,Flink框架中处理的时间语义为ProcessingTime,如果要使用EventTime, 那么需要引入EventTime的时间属性,引入方式如下所示:
import org.apache.flink.streaming.api.TimeCharacteristic
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
流处理从事件产生,到流经source,再到operator,中间是有一个过程和时间的。虽然大部分情况下,流到operator的数据都是按照事件产生的时间顺序来的,但是也不排除由于网络、背压等原因,导致乱序的产生(out-of-order或者说late element)。
但是对于late element,我们又不能无限期的等下去,必须要有个机制来保证一个特定的时间后,必须触发window去进行计算了。这个特别的机制,就是watermark。
这里的Watermark什么意思呢?很简单,把数据流简单的理解为水流,那么当水流源源不断地流入咱们系统时,什么时候我们才知道要开始对数据计算了呢?总不能一直等吧。所以为了能够对数据计算的时间进行限定,我们的想法就是在水流上添加浮标或标记,当这个标记进入我们的数据窗口时,我们就认为可以开始计算了。这里在水流中增加的标记,我们就称之为Watermark(水位标记)
在实际操作中,Watermark作为特殊标记数据由Flink根据当前数据的EventTime创建出来后自动加入到数据队列中。当Watermark数据进入到窗口范围后,会判断时间窗口是否触发计算,所以Watermark数据中应该包含时间属性。但是这个时间属性值设置为多少合适呢?首先我们需要明确,如果数据是按照顺序采集过来的,那么来一条计算一条的话,是不会出现问题的,也无需加入任何的标记,时间一到,窗口自动触发计算就可以了,但实际情况恰恰是数据采集到Flink中时是乱序的,就意味着当触发窗口计算的时候,是有可能数据不全的,因为数据被打乱了,还没有采集到,基于这样的原因,所以需要在数据采集队列中增加标记,表示指定时间的窗口数据全部到达,可以计算,无需等待了。
根据不同的数据处理场景watermark会有不同的生成方式:
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.setParallelism(1) val dataDS: DataStream[String] = env.readTextFile("input/data.txt") // TODO 将数据进行转换 val mapDS = dataDS.map(data => { val datas = data.split(",") (datas(0), datas(1).toLong, datas(2).toInt) }) val waterDS = mapDS.assignTimestampsAndWatermarks( new BoundedOutOfOrdernessTimestampExtractor[(String, Long, Int)](Time.seconds(0)) { override def extractTimestamp(element: (String, Long, Int)): Long = element._2 * 1000 } ) val resultDS = waterDS.keyBy(0) .window(TumblingEventTimeWindows.of(Time.seconds(5))) .reduce( (t1, t2) => { (t1._1, t1._2, math.max(t1._3, t2._3)) } ) mapDS.print("water") resultDS.print("result") env.execute("sensor")
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.setParallelism(1) val dataDS: DataStream[String] = env.readTextFile("input/data.txt") val mapDS = dataDS.map(data => { val datas = data.split(",") (datas(0), datas(1).toLong, datas(2).toInt) }) val waterDS = mapDS.assignTimestampsAndWatermarks( new BoundedOutOfOrdernessTimestampExtractor[(String, Long, Int)](Time.seconds(0)) { override def extractTimestamp(element: (String, Long, Int)): Long = element._2 * 1000 } ) val resultDS = waterDS.keyBy(0) .window(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(1))) .reduce( (t1, t2) => { (t1._1, t1._2, math.max(t1._3, t2._3)) } ) mapDS.print("water") resultDS.print("result>>>>>") env.execute("sensor")
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.setParallelism(1) val dataDS: DataStream[String] = env.readTextFile("input/data1.txt") val mapDS = dataDS.map(data => { val datas = data.split(",") (datas(0), datas(1).toLong, datas(2).toInt) }) val waterDS = mapDS.assignTimestampsAndWatermarks( new BoundedOutOfOrdernessTimestampExtractor[(String, Long, Int)](Time.seconds(0)) { override def extractTimestamp(element: (String, Long, Int)): Long = element._2 * 1000 } ) val resultDS = waterDS.keyBy(0) .window(EventTimeSessionWindows.withGap(Time.seconds(2))) .reduce( (t1, t2) => { (t1._1, t1._2, math.max(t1._3, t2._3)) } ) mapDS.print("water") resultDS.print("result>>>>>") env.execute("sensor")
时间取整: timestamp - (timestamp – offset + windowSize)% windowSize
如果指定的时间标记进入到Flink时,该标记大于等于窗口的结束时间,那么窗口就会被触发计算;这里的时间标记并不是EventTime,而是waterMark。
如果数据是有序的,那么可以将eventTime看作是watermark。watermark = eventtime
如果读取文件,由于读取数据速度很快,所以会很快就读取到文件末尾。所以文件的最后一条数据是有可能无法计算的。
为了解决这个问题,读取文件时,如果到达末尾,Flink框架会自动在数据流中增加一个Long类型的最大值作为watermark,为了提交所有未计算的窗口进行计算,从而保证计算结果准确
stream.assignTimestampsAndWatermarks(new BoundOutOfOrdernessTimestampExtractor[T](Time.seconds(5){
override def extrectTimestamp(element:T):Long={
element.ts*1000L
}
})
有两个参数列表
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。