Streaming computation is a data processing model designed for unbounded datasets, that is, datasets that keep growing and are essentially infinite. A window is the mechanism that slices such an unbounded stream into finite chunks for processing.
- CountWindow: fires based on the number of elements sharing the same key in the window; when a key's element count reaches the window size, the result is computed for that key only.
- TimeWindow: groups all data within a given time range into one window and computes over all of that window's data at once.
Tumbling window: slices the stream into fixed-length, non-overlapping windows; each element is counted exactly once and falls into exactly one window. Typical use: BI-style statistics, i.e. aggregations per time period.
Sliding window: a fixed window length combined with a slide interval; an element can be counted several times because it is assigned to multiple windows (window length divided by slide interval of them, e.g. three for a 15-second window sliding every 5 seconds). Typical use: statistics over the most recent period, such as computing an API's failure rate over the last 5 minutes to decide whether to raise an alert.
Session window: has no overlap and no fixed start or end time; when no element arrives within a fixed gap (the inactivity interval), the window closes. A sketch follows this list.
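The demos later in this article only cover tumbling and sliding windows, so here is a minimal session-window sketch in the same style. It is not from the original article: it assumes Flink's built-in ProcessingTimeSessionWindows assigner with a 10-second gap and reuses the same hypothetical socket host and port as the demos below.

package chapter3
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows
import org.apache.flink.streaming.api.windowing.time.Time
object Window_Session_Demo {
  def main(args: Array[String]): Unit = {
    // Create the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Receive data from a socket (hypothetical host/port, matching the demos below)
    val data: DataStream[String] = env.socketTextStream("hadoop001", 9999)
    // Split lines into (word, 1) pairs
    val wordAndOne: DataStream[(String, Int)] = data.flatMap(_.split(" ")).map((_, 1))
    // A session window closes once no element arrives for the 10-second gap
    val wordAndCount: DataStream[(String, Int)] = wordAndOne
      .keyBy(_._1)
      .window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
      .sum(1)
    wordAndCount.print()
    env.execute()
  }
}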
Tumbling count window: the default CountWindow is a tumbling window; only the window size needs to be specified, and the window fires once the number of elements with the same key reaches that size.
Sliding count window: uses exactly the same method name as the tumbling variant, but takes two parameters, a window_size and a sliding_size. In the code below, sliding_size is set to 2, which means a computation is triggered every time two more elements with the same key arrive, and each computation covers a window of 5 elements.
Defining the computation over the data collected in a window
Incremental aggregation functions
Compute as each element arrives, keeping only a small piece of state. Typical incremental aggregation functions are ReduceFunction and AggregateFunction.
Full window functions
First collect all of the window's data, then iterate over all of it when the computation fires. ProcessWindowFunction is a full window function.
package chapter3

import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment, WindowedStream}
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
object Window_Tumbling_Demo {
  def main(args: Array[String]): Unit = {
    // Create the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Receive data from a socket
    val data: DataStream[String] = env.socketTextStream("hadoop001", 9999)
    // Split lines into words
    val splitFile: DataStream[String] = data.flatMap(_.split(" "))
    // Map each word to a count of 1
    val wordAndOne: DataStream[(String, Int)] = splitFile.map((_, 1))
    // Partition by key
    val keyed: KeyedStream[(String, Int), Tuple] = wordAndOne.keyBy(0)
    // Assign a 5-second tumbling time window
    val window: WindowedStream[(String, Int), Tuple, TimeWindow] = keyed.timeWindow(Time.seconds(5))
    // Aggregate the data inside each window
    val wordAndCount: DataStream[(String, Int)] = window.sum(1)
    // Print the result
    wordAndCount.print()
    // Launch the job
    env.execute()
  }
}
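To try this demo, the socket must exist before the job starts: for example, run nc -lk 9999 on the source host, submit the job, and type space-separated words; every 5 seconds the current window fires and prints the per-word counts it collected. The same pattern applies to all socket-based demos below.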
package chapter3
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment, WindowedStream}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
object Window_Sliding_Demo {
  def main(args: Array[String]): Unit = {
    // Create the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Receive data from a socket
    val data: DataStream[String] = env.socketTextStream("hadoop001", 9999)
    // Split lines into words
    val splitFile: DataStream[String] = data.flatMap(_.split(" "))
    // Map each word to a count of 1
    val wordAndOne: DataStream[(String, Int)] = splitFile.map((_, 1))
    // Partition by key
    val keyed: KeyedStream[(String, Int), Tuple] = wordAndOne.keyBy(0)
    // Assign a 15-second window that slides every 5 seconds
    val window: WindowedStream[(String, Int), Tuple, TimeWindow] = keyed.timeWindow(Time.seconds(15), Time.seconds(5))
    // Aggregate the data inside each window
    val wordAndCount: DataStream[(String, Int)] = window.sum(1)
    // Print the result
    wordAndCount.print()
    // Launch the job
    env.execute()
  }
}
package chapter3
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment, WindowedStream}
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
object CountWindow_Tumbling_Demo {
  def main(args: Array[String]): Unit = {
    // Create the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Receive data from a socket
    val data: DataStream[String] = env.socketTextStream("hadoop001", 9999)
    // Split lines into words
    val splitFile: DataStream[String] = data.flatMap(_.split(" "))
    // Map each word to a count of 1
    val wordAndOne: DataStream[(String, Int)] = splitFile.map((_, 1))
    // Partition by key
    val keyed: KeyedStream[(String, Int), Tuple] = wordAndOne.keyBy(0)
    // Tumbling count window: fires once a key has 3 elements
    val window: WindowedStream[(String, Int), Tuple, GlobalWindow] = keyed.countWindow(3)
    // Aggregate the data inside each window
    val wordAndCount: DataStream[(String, Int)] = window.sum(1)
    // Print the result
    wordAndCount.print()
    // Launch the job
    env.execute()
  }
}
package chapter3
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment, WindowedStream}
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
object CountWindow_Sliding_Demo {
  def main(args: Array[String]): Unit = {
    // Create the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Receive data from a socket
    val data: DataStream[String] = env.socketTextStream("node01", 9999)
    // Split lines into words
    val splitFile: DataStream[String] = data.flatMap(_.split(" "))
    // Map each word to a count of 1
    val wordAndOne: DataStream[(String, Int)] = splitFile.map((_, 1))
    // Partition by key
    val keyed: KeyedStream[(String, Int), Tuple] = wordAndOne.keyBy(0)
    // Sliding count window: a window of 5 elements that fires every 2 elements per key
    val window: WindowedStream[(String, Int), Tuple, GlobalWindow] = keyed.countWindow(5, 2)
    // Aggregate the data inside each window
    val wordAndCount: DataStream[(String, Int)] = window.sum(1)
    // Print the result
    wordAndCount.print()
    // Launch the job
    env.execute()
  }
}
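The GlobalWindow type in the signatures above hints at how count windows work internally: countWindow(size) is shorthand for a GlobalWindows assigner with a purging CountTrigger, and countWindow(size, slide) instead combines a CountTrigger that fires every slide elements with a CountEvictor that keeps the last size elements.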
The computation runs as each element arrives; value1 and value2 are the two tuples being combined.
package chapter3
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment, WindowedStream}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
object Reduce_Demo {
  def main(args: Array[String]): Unit = {
    // Create the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Receive data from a socket
    val data: DataStream[String] = env.socketTextStream("hadoop001", 8888)
    // Split lines into words
    val splitFile: DataStream[String] = data.flatMap(_.split(" "))
    // Map each word to a count of 1
    val wordAndOne: DataStream[(String, Int)] = splitFile.map((_, 1))
    // Partition by key
    val keyed: KeyedStream[(String, Int), Tuple] = wordAndOne.keyBy(0)
    // Assign a 5-second tumbling time window
    val window: WindowedStream[(String, Int), Tuple, TimeWindow] = keyed.timeWindow(Time.seconds(5))
    // Aggregate incrementally: two tuples are combined into one as each element arrives
    val wordAndCount: DataStream[(String, Int)] = window.reduce(new ReduceFunction[(String, Int)] {
      override def reduce(value1: (String, Int), value2: (String, Int)): (String, Int) = {
        (value1._1, value1._2 + value2._2)
      }
    })
    // Print the result
    wordAndCount.print()
    // Launch the job
    env.execute()
  }
}
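A ReduceFunction requires its input and output types to match. AggregateFunction, the other incremental aggregation function named above, decouples them through an intermediate accumulator type. The sketch below is not from the original article: it reuses the same socket word setup but computes the average of the Int values per key and window, so that the input (String, Int), the accumulator (Long, Long) and the output Double all differ.

package chapter3
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.time.Time
object Aggregate_Demo {
  def main(args: Array[String]): Unit = {
    // Create the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Receive data from a socket (hypothetical host/port, matching the demo above)
    val data: DataStream[String] = env.socketTextStream("hadoop001", 8888)
    // Split lines into (word, 1) pairs
    val wordAndOne: DataStream[(String, Int)] = data.flatMap(_.split(" ")).map((_, 1))
    // Per key and 5-second window: the average of the Int values.
    // IN = (String, Int), ACC = (sum, count), OUT = Double; all three differ,
    // which ReduceFunction cannot express.
    val avg: DataStream[Double] = wordAndOne
      .keyBy(_._1)
      .timeWindow(Time.seconds(5))
      .aggregate(new AggregateFunction[(String, Int), (Long, Long), Double] {
        override def createAccumulator(): (Long, Long) = (0L, 0L)
        override def add(value: (String, Int), acc: (Long, Long)): (Long, Long) =
          (acc._1 + value._2, acc._2 + 1)
        override def getResult(acc: (Long, Long)): Double =
          acc._1.toDouble / acc._2
        override def merge(a: (Long, Long), b: (Long, Long)): (Long, Long) =
          (a._1 + b._1, a._2 + b._2)
      })
    avg.print()
    env.execute()
  }
}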
package chapter3
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
object ProcessWindow_Demo {
  def main(args: Array[String]): Unit = {
    // Create the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Receive data from a socket
    val data: DataStream[String] = env.socketTextStream("hadoop001", 8888)
    // Split lines into words
    val splitFile: DataStream[String] = data.flatMap(_.split(" "))
    // Map each word to a count of 1
    val wordAndOne: DataStream[(String, Int)] = splitFile.map((_, 1))
    // Partition by key; keying by a lambda keeps the key type as String
    val keyed = wordAndOne.keyBy(_._1)
    // Assign a 5-second tumbling time window
    val window = keyed.timeWindow(Time.seconds(5))
    // Apply the full window function; type parameters are input, output, key and window type
    val wordAndCount: DataStream[(String, Int)] = window.process(new ProcessWindowFunction[(String, Int), (String, Int), String, TimeWindow] {
      override def process(key: String, context: Context, elements: Iterable[(String, Int)], out: Collector[(String, Int)]): Unit = {
        var sum = 0
        for (i <- elements) {
          sum += i._2
        }
        // Emit the result collected over the whole window
        out.collect((key, sum))
      }
    })
    // Print the result
    wordAndCount.print()
    // Launch the job
    env.execute()
  }
}
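The two styles can also be combined: WindowedStream.reduce accepts a pre-aggregating reduce function together with a ProcessWindowFunction, so elements are folded incrementally and the process function receives only the single aggregated value plus window metadata. Below is a sketch of this combination, an assumption rather than code from the article; it reuses the keyed stream and imports from the demo above and attaches the window's end timestamp to each count.

// Pre-aggregate incrementally, then enrich the single result with window metadata.
val enriched: DataStream[(String, Int, Long)] = keyed
  .timeWindow(Time.seconds(5))
  .reduce(
    // Incremental part: fold tuples pairwise as elements arrive
    (a: (String, Int), b: (String, Int)) => (a._1, a._2 + b._2),
    // Full-window part: sees exactly one pre-aggregated element per window
    new ProcessWindowFunction[(String, Int), (String, Int, Long), String, TimeWindow] {
      override def process(key: String, context: Context,
                           elements: Iterable[(String, Int)],
                           out: Collector[(String, Int, Long)]): Unit = {
        val (word, count) = elements.iterator.next()
        out.collect((word, count, context.window.getEnd)) // window end timestamp
      }
    })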