
Windows in Flink

1 Overview of Windows in Flink

Streaming computation is a data-processing model designed for unbounded datasets, i.e., datasets that keep growing and are essentially infinite. A window is a mechanism for cutting such an unbounded stream into finite chunks that can be processed.

 

2 Windows fall into two categories:

CountWindow: triggered by the number of elements with the same key in the window; when it fires, it computes results only for keys whose element count has reached the window size.

TimeWindow: groups all data within a specified time range into one window and computes over all of that window's data at once.

3 TimeWindow comes in three flavors:

Tumbling windows: slice the data by a fixed window length; windows do not overlap, so each element is counted exactly once and falls into exactly one window. Typical use: BI-style statistics (aggregations per time period).

Sliding windows: defined by a fixed window length plus a slide interval; windows overlap, so an element can be assigned to multiple windows and counted multiple times. Typical use: statistics over the most recent period (e.g., computing an API's failure rate over the last 5 minutes to decide whether to raise an alert).

Session windows: have no overlap and no fixed start or end time; a window closes once no elements arrive within a fixed gap, i.e., after an interval of inactivity. A minimal sketch follows.
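The code section below does not include a session-window demo, so here is a minimal sketch in the same style. The 10-second processing-time gap and the object name Window_Session_Demo are illustrative choices, not from the original:

package chapter3

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows
import org.apache.flink.streaming.api.windowing.time.Time

object Window_Session_Demo {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val data: DataStream[String] = env.socketTextStream("hadoop001", 9999)
    val wordAndOne: DataStream[(String, Int)] = data.flatMap(_.split(" ")).map((_, 1))
    // The window for a key closes once no element arrives for 10 seconds (illustrative gap)
    val wordAndCount: DataStream[(String, Int)] = wordAndOne
      .keyBy(_._1)
      .window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
      .sum(1)
    wordAndCount.print()
    env.execute()
  }
}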

CountWindow

Tumbling count window: the default CountWindow is a tumbling window; you only specify the window size, and the window fires when the number of elements with the same key reaches that size.

Sliding count window: uses exactly the same method name as the tumbling version but takes two parameters, a window_size and a sliding_size.

In the code further below, sliding_size is set to 2, so a computation fires every time two elements with the same key arrive, and each computation covers a window of 5 elements.

4 Window functions

A window function defines the computation to apply to the data collected in a window.

Incremental aggregation functions

Each element is processed as it arrives, and only a small piece of state is kept. Typical incremental aggregation functions are ReduceFunction and AggregateFunction.

Full window functions

All of the window's data is collected first and iterated over when the computation fires. ProcessWindowFunction is a full window function.

 

5 Code

Tumbling time window

package chapter3

import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment, WindowedStream}
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

object Window_Tumbling_Demo {
  def main(args: Array[String]): Unit = {
    // Create the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Read lines from a socket
    val data: DataStream[String] = env.socketTextStream("hadoop001", 9999)
    // Split each line into words
    val spliFile: DataStream[String] = data.flatMap(_.split(" "))
    // Count each word as 1
    val wordAndOne: DataStream[(String, Int)] = spliFile.map((_, 1))
    // Key the stream by tuple position 0 (the word)
    val keyed: KeyedStream[(String, Int), Tuple] = wordAndOne.keyBy(0)
    // Assign a 5-second tumbling time window
    val window: WindowedStream[(String, Int), Tuple, TimeWindow] = keyed.timeWindow(Time.seconds(5))
    // Sum tuple field 1 (the count) within each window
    val wordAndCount: DataStream[(String, Int)] = window.sum(1)
    // Print the result
    wordAndCount.print()
    // Trigger execution
    env.execute()
  }
}
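The demo reads from a plain TCP socket, so one way to drive it is to start a listener first, e.g. run nc -lk 9999 on hadoop001, then type space-separated words; every 5 seconds the current window fires and the per-word counts for that window are printed.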

 

Sliding time window (elements are consumed repeatedly)

package chapter3

import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment, WindowedStream}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

object Window_Sliding_Demo {
  def main(args: Array[String]): Unit = {
    // Create the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Read lines from a socket
    val data: DataStream[String] = env.socketTextStream("hadoop001", 9999)
    // Split each line into words
    val spliFile: DataStream[String] = data.flatMap(_.split(" "))
    // Count each word as 1
    val wordAndOne: DataStream[(String, Int)] = spliFile.map((_, 1))
    // Key the stream by tuple position 0 (the word)
    val keyed: KeyedStream[(String, Int), Tuple] = wordAndOne.keyBy(0)
    // Assign a sliding time window: 15-second size, 5-second slide
    val window: WindowedStream[(String, Int), Tuple, TimeWindow] = keyed.timeWindow(Time.seconds(15), Time.seconds(5))
    // Sum tuple field 1 (the count) within each window
    val wordAndCount: DataStream[(String, Int)] = window.sum(1)
    // Print the result
    wordAndCount.print()
    // Trigger execution
    env.execute()
  }
}

 

Tumbling count window

package chapter3

import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment, WindowedStream}
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow

object CountWindow_Tumbling_Demo {
  def main(args: Array[String]): Unit = {
    // Create the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Read lines from a socket
    val data: DataStream[String] = env.socketTextStream("hadoop001", 9999)
    // Split each line into words
    val spliFile: DataStream[String] = data.flatMap(_.split(" "))
    // Count each word as 1
    val wordAndOne: DataStream[(String, Int)] = spliFile.map((_, 1))
    // Key the stream by tuple position 0 (the word)
    val keyed: KeyedStream[(String, Int), Tuple] = wordAndOne.keyBy(0)
    // Fire once a key has accumulated 3 elements
    val window: WindowedStream[(String, Int), Tuple, GlobalWindow] = keyed.countWindow(3)
    // Sum tuple field 1 (the count) within each window
    val wordAndCount: DataStream[(String, Int)] = window.sum(1)
    // Print the result
    wordAndCount.print()
    // Trigger execution
    env.execute()
  }
}

Sliding count window: size = 5 means each computation covers at most the last 5 elements; slide = 2 means a result is emitted every 2 elements with the same key.

package chapter3

import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment, WindowedStream}
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow

object CountWindow_Sliding_Demo {
  def main(args: Array[String]): Unit = {
    // Create the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Read lines from a socket
    val data: DataStream[String] = env.socketTextStream("node01",9999)
    // Split each line into words
    val spliFile: DataStream[String] = data.flatMap(_.split(" "))
    // Count each word as 1
    val wordAndOne: DataStream[(String, Int)] = spliFile.map((_,1))
    // Key the stream by tuple position 0 (the word)
    val keyed: KeyedStream[(String, Int), Tuple] = wordAndOne.keyBy(0)
    // Sliding count window: size 5, slide 2 - fire every 2 elements over the last 5
    val window: WindowedStream[(String, Int), Tuple, GlobalWindow] = keyed.countWindow(5, 2)
    // Sum tuple field 1 (the count) within each window
    val wordAndCount: DataStream[(String, Int)] = window.sum(1)
    // Print the result
    wordAndCount.print()
    // Trigger execution
    env.execute()
  }
}

Incremental aggregation function

Each element is computed as soon as it arrives; value1 and value2 are the two tuples being combined.

package chapter3

import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment, WindowedStream}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

object Reduce_Demo {
  def main(args: Array[String]): Unit = {
    // Create the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Read lines from a socket
    val data: DataStream[String] = env.socketTextStream("hadoop001", 8888)
    // Split each line into words
    val spliFile: DataStream[String] = data.flatMap(_.split(" "))
    // Count each word as 1
    val wordAndOne: DataStream[(String, Int)] = spliFile.map((_, 1))
    // Key the stream by tuple position 0 (the word)
    val keyed: KeyedStream[(String, Int), Tuple] = wordAndOne.keyBy(0)
    // Assign a 5-second tumbling time window
    val window: WindowedStream[(String, Int), Tuple, TimeWindow] = keyed.timeWindow(Time.seconds(5))
    // Aggregate incrementally: combine two tuples into one as each element arrives
    val wordAndCount: DataStream[(String, Int)] = window.reduce(new ReduceFunction[(String, Int)] {
      override def reduce(value1: (String, Int), value2: (String, Int)): (String, Int) = {
        (value1._1, value1._2 + value2._2)
      }
    })
    // Print the result
    wordAndCount.print()
    // Trigger execution
    env.execute()
  }
}
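Section 4 above also lists AggregateFunction as an incremental aggregate, but the article only demos ReduceFunction. Below is a minimal AggregateFunction sketch under the same setup (host, port, and window mirror the reduce demo; the object name Aggregate_Demo is illustrative). Unlike ReduceFunction, the accumulator type can differ from the input type, and merge supports merging windows such as session windows:

package chapter3

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.time.Time

object Aggregate_Demo {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val data: DataStream[String] = env.socketTextStream("hadoop001", 8888)
    val wordAndOne: DataStream[(String, Int)] = data.flatMap(_.split(" ")).map((_, 1))
    // Incremental aggregation with AggregateFunction:
    // IN = (String, Int), ACC = running (word, count), OUT = (String, Int)
    val wordAndCount: DataStream[(String, Int)] = wordAndOne
      .keyBy(_._1)
      .timeWindow(Time.seconds(5))
      .aggregate(new AggregateFunction[(String, Int), (String, Int), (String, Int)] {
        override def createAccumulator(): (String, Int) = ("", 0)
        override def add(value: (String, Int), acc: (String, Int)): (String, Int) =
          (value._1, acc._2 + value._2)
        override def getResult(acc: (String, Int)): (String, Int) = acc
        override def merge(a: (String, Int), b: (String, Int)): (String, Int) =
          (if (a._1.nonEmpty) a._1 else b._1, a._2 + b._2)
      })
    wordAndCount.print()
    env.execute()
  }
}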

 

Full window function

package chapter3

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object ProcessWindow_Demo {
  def main(args: Array[String]): Unit = {
    // Create the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Read lines from a socket
    val data: DataStream[String] = env.socketTextStream("hadoop001", 8888)
    // Split each line into words
    val spliFile: DataStream[String] = data.flatMap(_.split(" "))
    // Count each word as 1
    val wordAndOne: DataStream[(String, Int)] = spliFile.map((_, 1))
    // Key by word; keying with a lambda gives the key a concrete type (String),
    // which ProcessWindowFunction needs
    val keyed = wordAndOne.keyBy(_._1)
    // Assign a 5-second tumbling time window
    val window = keyed.timeWindow(Time.seconds(5))
    // Apply the full window function; type parameters are [IN, OUT, KEY, WINDOW]
    val wordAndCount: DataStream[(String, Int)] = window.process(new ProcessWindowFunction[(String, Int), (String, Int), String, TimeWindow] {
      override def process(key: String, context: Context, elements: Iterable[(String, Int)], out: Collector[(String, Int)]): Unit = {
        // All of the window's elements are available here; sum their counts
        var sum = 0
        for (i <- elements) {
          sum += i._2
        }
        // Emit the result for this key and window
        out.collect((key, sum))
      }
    })
    // Print the result
    wordAndCount.print()
    // Trigger execution
    env.execute()
  }
}
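Note the trade-off: a full window function buffers every element of the window in state until the window fires, whereas an incremental aggregate keeps only one accumulated value per key and window. Flink also lets you combine the two, e.g. window.reduce(reduceFunction, processWindowFunction), so the process function receives only the pre-aggregated result plus the window metadata.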

 
