
Custom window aggregation in Flink (incremental and full)


Window aggregation (incremental and full)


timeWindow and countWindow are not covered here; look them up if you are unfamiliar with them.

Custom window:

[Figure: custom window]

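Numeric aggregation over windows
For numeric statistics within a window, there are two approaches: incremental aggregation and full aggregation.
Incremental aggregation
The aggregate is updated once for every element that enters the window, so only a running result is kept instead of all the elements.
reduce(reduceFunction)
aggregate(aggregateFunction)
sum(), min(), max()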

```scala
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.streaming.api.scala._ // DataStream, StreamExecutionEnvironment, implicit TypeInformation
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object FlinkTimeCount {
  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // One integer per line from a socket server on node01:8000
    val socketStream: DataStream[String] = environment.socketTextStream("node01", 8000)
    socketStream
      .map(x => (1, x.trim.toInt)) // constant key 1: every number goes to the same keyed window
      .keyBy(_._1)
      // 5-second tumbling window in processing time. timeWindow(...) is deprecated since
      // Flink 1.12, which also made event time the default, so the window type is explicit here
      .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
      // Incremental: reduce runs once per incoming element and keeps only the running sum
      .reduce(new ReduceFunction[(Int, Int)] {
        override def reduce(t: (Int, Int), t1: (Int, Int)): (Int, Int) = (t._1, t._2 + t1._2)
      })
      .print()
    environment.execute("startRunning")
  }
}
```
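To make "incremental" concrete, here is a minimal plain-Scala model (no Flink dependency; the object and method names are hypothetical) of what a 5-second tumbling window followed by reduce or aggregate does: each element updates its window's accumulator as soon as it arrives, so only one value per window is held. It assumes non-negative millisecond timestamps and a zero window offset. In Flink itself, the average variant would be written as an AggregateFunction with createAccumulator/add/getResult/merge.

```scala
// Plain-Scala model (not Flink code) of incremental aggregation over 5 s tumbling windows.
object IncrementalWindowSketch {
  val windowSizeMs = 5000L

  // Start of the tumbling window containing ts (assumes ts >= 0 and zero offset)
  def windowStart(ts: Long): Long = ts - ts % windowSizeMs

  // reduce-style: accumulator has the same type as the input value (a running sum)
  def sumPerWindow(events: Seq[(Long, Int)]): Map[Long, Int] =
    events.foldLeft(Map.empty[Long, Int]) { case (acc, (ts, v)) =>
      val w = windowStart(ts)
      acc.updated(w, acc.getOrElse(w, 0) + v) // one update per element, nothing buffered
    }

  // aggregate-style: accumulator (sum, count) differs from the Double result
  def avgPerWindow(events: Seq[(Long, Int)]): Map[Long, Double] =
    events.foldLeft(Map.empty[Long, (Long, Long)]) { case (acc, (ts, v)) =>
      val w = windowStart(ts)
      val (s, c) = acc.getOrElse(w, (0L, 0L))
      acc.updated(w, (s + v, c + 1))
    }.map { case (w, (s, c)) => w -> s.toDouble / c } // final result when the window fires
}
```

For example, the events (1000 ms, 3), (4000 ms, 4) and (6000 ms, 10) give a sum of 7 for the window starting at 0 and 10 for the window starting at 5000.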

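Full aggregation
The computation waits until the window fires, that is, until all of its data has arrived, and then runs once over every element buffered in the window. It can be used to compute the maximum, minimum, average, etc. of the window's data.
apply(windowFunction)
process(processWindowFunction)
ProcessWindowFunction provides more context than WindowFunction: besides the window itself, its Context gives access to the current processing time and watermark, per-window and global keyed state, and side outputs.
The following example uses full aggregation to compute the average of every 3 elements: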

```scala
import org.apache.flink.streaming.api.scala._ // DataStream, StreamExecutionEnvironment, implicit TypeInformation
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
import org.apache.flink.util.Collector

object FlinkCountWindowAvg {
  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val socketStream: DataStream[String] = environment.socketTextStream("node01", 9000)
    // Average of the values in each window
    socketStream
      .map(x => (1, x.trim.toInt))
      .keyBy(_._1)
      // Time-based alternative: a 10-second window instead of a count window
      .countWindow(3) // fires (and purges) every 3 elements per key
      // Compute the window average in process()
      .process(new MyProcessWindowFunction)
      .print()
    // execute() must be called, otherwise the job is only built and never runs
    environment.execute("count avg")
  }
}

/** ProcessWindowFunction takes four type parameters:
  * input type, output type, key type, and window type
  * (GlobalWindow, because countWindow uses global windows).
  */
class MyProcessWindowFunction extends ProcessWindowFunction[(Int, Int), Double, Int, GlobalWindow] {
  override def process(key: Int, context: Context, elements: Iterable[(Int, Int)], out: Collector[Double]): Unit = {
    var count = 0 // number of elements in the window
    var sum = 0L  // sum of their values
    for (data <- elements) {
      count += 1
      sum += data._2
    }
    // toDouble avoids integer division, which would truncate the average
    out.collect(sum.toDouble / count)
  }
}
```
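As a hedged illustration of what "full" means here, below is a plain-Scala model (no Flink dependency; names are hypothetical) of a keyed count window of size 3: elements are buffered per key until three have arrived, then the whole buffer is averaged at once and cleared, mirroring the fire-and-purge behaviour of countWindow(3). Memory grows with the window size, because every element is kept until the window fires.

```scala
// Plain-Scala model (not Flink code) of full aggregation over a keyed count window of 3.
import scala.collection.mutable

object FullCountWindowSketch {
  val windowSize = 3

  def averages(input: Seq[(Int, Int)]): Seq[(Int, Double)] = {
    val buffers = mutable.Map.empty[Int, mutable.ArrayBuffer[Int]] // per-key window contents
    val out = mutable.ArrayBuffer.empty[(Int, Double)]
    for ((key, value) <- input) {
      val buf = buffers.getOrElseUpdate(key, mutable.ArrayBuffer.empty[Int])
      buf += value
      if (buf.size == windowSize) {
        // Window complete: process all buffered elements at once, then purge them
        out += key -> buf.sum.toDouble / buf.size
        buf.clear()
      }
    }
    out.toSeq // incomplete windows never fire, so their elements produce no output
  }
}
```

With the values 1, 2, 4, 5, 6, 7, 100 under one key, this yields the averages 7/3 (about 2.33) and 6.0; the trailing 100 stays buffered because its window never reaches three elements.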


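Disclaimer: this article was contributed by a user and does not represent the views of the wpsshop blog; copyright belongs to the original author. Please credit the source when reposting: https://www.wpsshop.cn/w/爱喝兽奶帝天荒/article/detail/916429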