赞
踩
<!-- Flink Java API (ParameterTool, tuples, ...). -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.9.0</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
<!-- Flink DataStream API (Scala 2.11 build). Scope "provided": available at compile time but
     not packaged into the job jar. NOTE(review): when launching main() directly from an IDE,
     the run configuration may need to include provided-scope dependencies. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.9.0</version>
    <scope>provided</scope>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<!-- Flink client libraries used to run / submit the job (Scala 2.11 build). -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>1.9.0</version>
</dependency>
package com.leneovo.test; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; /** * Desc: 使用flink对指定窗口内的数据进行实时统计,最终把结果打印出来(wordcount) * 先在192.168.19.10机器上执行nc -l 9000 */ public class SocketStreamingWordCount { public static void main(String[] args) throws Exception { //定义socket的端口号 int port; try{ ParameterTool parameterTool = ParameterTool.fromArgs(args); port = parameterTool.getInt("port"); }catch (Exception e){ System.err.println("没有指定port参数,使用默认值9000"); port = 9000; } //获取运行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //连接socket获取输入的数据 DataStreamSource<String> text = env.socketTextStream("192.168.19.10", port, "\n"); //计算数据 DataStream<Tuple2<String, Integer>> windowCount = text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception { //使用空格分隔数据 String[] splits = value.split("\\s"); for (String word:splits) { if (word.length()>0){ out.collect(new Tuple2<String, Integer>(word,1)); } } } }) //针对相同的word数据进行分组 .keyBy(0) //指定一个计数跳跃窗口(计数达到多少个之后进行跳跃) .countWindow(5) /* //指定窗口大小 .timeWindow(Time.seconds(2))*/ //求和 .sum(1); //把数据打印到控制台,使用一个并行度 windowCount.print().setParallelism(1); //注意:因为flink是懒加载的,所以必须调用execute方法,上面的代码才会执行 env.execute("streaming word count"); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。