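I won't cover timeWindow and countWindow here; look them up if you are not familiar with them.
Custom window:
Numeric aggregation over a window
For the values inside a given window, we can compute statistics either incrementally or over the full window contents.
Incremental aggregation
Each time a record is added to the window, the aggregate is updated once.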
reduce(reduceFunction)
aggregate(aggregateFunction)
sum(),min(),max()
    import org.apache.flink.api.common.functions.ReduceFunction
    import org.apache.flink.streaming.api.datastream.DataStreamSink
    import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
    import org.apache.flink.streaming.api.windowing.time.Time

    object FlinkTimeCount {
      def main(args: Array[String]): Unit = {
        val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        // Brings the implicit TypeInformation instances required by the Scala API into scope
        import org.apache.flink.api.scala._

        // Each line received from the socket is expected to be an integer
        val socketStream: DataStream[String] = environment.socketTextStream("node01", 8000)
        val print: DataStreamSink[(Int, Int)] = socketStream
          .map(x => (1, x.toInt))      // constant key 1, so every record lands in the same group
          .keyBy(0)
          .timeWindow(Time.seconds(5)) // 5-second tumbling window
          .reduce(new ReduceFunction[(Int, Int)] {
            // Invoked as records arrive: fold the new record into the running sum
            override def reduce(t: (Int, Int), t1: (Int, Int)): (Int, Int) = {
              (t._1, t._2 + t1._2)
            }
          }).print()

        environment.execute("startRunning")
      }
    }
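
The idea behind incremental aggregation can be sketched without Flink: the window keeps a single accumulator and folds each arriving record into it, so the raw records never need to be stored. The sketch below is plain Scala. `IncrementalSketch`, `SumCount`, `add`, and `states` are hypothetical names chosen for illustration, not part of the Flink API; the shape only loosely mirrors what a reduce or aggregate function does per record.

```scala
object IncrementalSketch {
  // The only state kept for the window (hypothetical type, not a Flink class).
  final case class SumCount(sum: Long, count: Long) {
    def average: Double = if (count == 0) 0.0 else sum.toDouble / count
  }

  // Fold one newly arrived record into the accumulator.
  def add(acc: SumCount, value: Int): SumCount =
    SumCount(acc.sum + value, acc.count + 1)

  // Accumulator state after each record arrives, in arrival order.
  def states(records: Seq[Int]): Seq[SumCount] =
    records.scanLeft(SumCount(0, 0))(add).tail
}
```

For the input `Seq(1, 2, 3)`, `IncrementalSketch.states` returns one accumulator per arriving record, and the last one carries the sum and count of the whole input.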

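Full-window aggregation
Wait until the window closes, or until all of the window's data has arrived, and only then compute the statistic. This can be used to find, for example, the maximum, minimum, or average of the data in the window.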
apply(windowFunction)
process(processWindowFunction)
ProcessWindowFunction exposes more context information than WindowFunction.
Using full-window aggregation to compute the average of every 3 records
    import org.apache.flink.api.java.tuple.Tuple
    import org.apache.flink.streaming.api.datastream.DataStreamSink
    import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
    import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
    import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
    import org.apache.flink.util.Collector

    object FlinkCountWindowAvg {

      def main(args: Array[String]): Unit = {
        val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

        import org.apache.flink.api.scala._
        val socketStream: DataStream[String] = environment.socketTextStream("node01", 9000)
        // Compute the average of the data in one window
        val socketDatas: DataStreamSink[Double] = socketStream.map(x => (1, x.toInt))
          .keyBy(0)
          //.timeWindow(Time.seconds(10))
          .countWindow(3)
          // Use process to compute the window average
          .process(new MyProcessWindowFunctionclass).print()
        // execute must be called, otherwise the job never runs
        environment.execute("count avg")
      }
    }

    /** ProcessWindowFunction takes four type parameters:
      * input type, output type, key type, and window type.
      */
    class MyProcessWindowFunctionclass extends ProcessWindowFunction[(Int, Int), Double, Tuple, GlobalWindow] {
      override def process(key: Tuple, context: Context, elements: Iterable[(Int, Int)], out: Collector[Double]): Unit = {
        var sum = 0   // sum of the values in the window
        var count = 0 // number of records in the window
        for (data <- elements) {
          count += 1
          sum += data._2
        }
        // Convert to Double before dividing; Int / Int would truncate the average
        out.collect(sum.toDouble / count)
      }
    }
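
For comparison, full-window aggregation over count windows of 3 can also be sketched in plain Scala: records are buffered until a window is complete, and only then is the average computed over all buffered elements. `FullWindowSketch` and `averagesPerWindow` are hypothetical names, and dropping a trailing incomplete group is an assumption meant to mimic a count window that has not fired yet.

```scala
object FullWindowSketch {
  // Average of each complete group of `size` records, computed only once the group is full.
  def averagesPerWindow(records: Seq[Int], size: Int): Seq[Double] =
    records
      .grouped(size)                     // buffer records into consecutive groups
      .filter(_.size == size)            // an incomplete trailing group is not emitted
      .map(g => g.sum.toDouble / g.size) // convert before dividing to avoid integer division
      .toList
}
```

With seven input records and a window size of 3, this produces two averages; the seventh record stays in an unfinished window and yields no output.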
