package com.flink.Java;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

/**
 * Created by Shi shuai RollerQing on 2019/12/16 15:44
 * WordCount implemented with a window operation.
 *
 * Requirement: every 1 second, aggregate the data of the last 2 seconds.
 */
public class WordCount {
    public static void main(String[] args) throws Exception {
        // Get the port of the data source
        int port;
        try {
            ParameterTool parameterTool = ParameterTool.fromArgs(args);
            port = parameterTool.getInt("port");
        } catch (Exception e) {
            System.err.println("No port set. Using default port 6666");
            port = 6666;
        }
        String hostname = "hadoop01";

        // Initialize the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Read the data
        DataStreamSource<String> data = env.socketTextStream(hostname, port);

        // Start the computation: emit one (word, 1) pair per word
        SingleOutputStreamOperator<WordWithCount> pairWords = data.flatMap(
                new FlatMapFunction<String, WordWithCount>() {
                    @Override
                    public void flatMap(String s, Collector<WordWithCount> out) throws Exception {
                        String[] splits = s.split(" ");
                        for (String word : splits) {
                            out.collect(new WordWithCount(word, 1L));
                        }
                    }
                }
        );

        // Group the pairs by key
        KeyedStream<WordWithCount, Tuple> grouped = pairWords.keyBy("word");

        // Apply the window operation; the two important parameters are
        // the window size and the slide interval
        WindowedStream<WordWithCount, Tuple, TimeWindow> window =
                grouped.timeWindow(Time.seconds(2), Time.seconds(1));

        SingleOutputStreamOperator<WordWithCount> counts = window.sum("count");
        // Equivalent aggregation with reduce:
        // window.reduce(new ReduceFunction<WordWithCount>() {
        //     @Override
        //     public WordWithCount reduce(WordWithCount value1, WordWithCount value2) throws Exception {
        //         return new WordWithCount(value1.word, value1.count + value2.count);
        //     }
        // });

        // Print the result
        counts.print().setParallelism(1);

        env.execute("WordCount");
    }

    public static class WordWithCount {
        public String word;
        public long count;

        public WordWithCount() {
        }

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return "WordWithCount{" +
                    "word='" + word + '\'' +
                    ", count=" + count +
                    '}';
        }
    }
}
Start hadoop01 and run the command: nc -lk 6666
When running from IDEA no program arguments are passed, so the catch block falls back to port 6666.
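The fallback fires because ParameterTool.getInt(key) throws when the key is missing; if you do pass --port as a program argument, or use the overload that takes a default value, no try/catch is needed. A minimal Scala sketch, where the argument arrays only stand in for whatever IDEA would pass:

import org.apache.flink.api.java.utils.ParameterTool

object ParameterToolSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical arguments, standing in for IDEA "Program arguments": --port 9000
    val withPort = ParameterTool.fromArgs(Array("--port", "9000"))
    println(withPort.getInt("port"))          // 9000

    // With no arguments, getInt("port") throws, which is what triggers the catch above;
    // the overload with a default value avoids the try/catch entirely.
    val noArgs = ParameterTool.fromArgs(Array.empty[String])
    println(noArgs.getInt("port", 6666))      // 6666
  }
}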
package com.flink.demo01

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time

/**
 * Created by Shi shuai RollerQing on 2019/12/16 19:03
 * Streaming WordCount, Scala implementation,
 * using a window operation.
 */
object WordCount_Scala {
  def main(args: Array[String]): Unit = {
    // Get the NetCat port
    val port: Int = try {
      ParameterTool.fromArgs(args).getInt("port")
    } catch {
      case e: Exception =>
        System.err.println("No port set, use default port 6666")
        6666
    }

    // Get the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Read the data
    val data = env.socketTextStream("hadoop01", port)

    // This import is required; it brings in the implicits used by the Scala API
    import org.apache.flink.api.scala._

    // Parse the data and compute the result
    val words = data.flatMap(_.split("\\s+"))                // split each line into words
    val tups = words.map(w => WordWithCount(w, 1))           // turn each word into a (word, 1) pair
    val grouped = tups.keyBy("word")                         // group by the word field
    // val grouped = tups.keyBy(0)                           // grouping by position also works
    val window = grouped.timeWindow(Time.seconds(2), Time.seconds(2)) // apply the window operation
    // val res = window.sum("count")                         // aggregation with sum
    val res = window.reduce((a, b) => WordWithCount(a.word, a.count + b.count))

    // Print the result
    res.print.setParallelism(1)

    // Start the job
    env.execute("scala wordCount")
  }

  case class WordWithCount(word: String, count: Int)
}
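For reference, timeWindow(size, slide) is shorthand for an explicit sliding window assigner over the environment's time characteristic, which is processing time by default. A minimal sketch of the explicit form, assuming Flink 1.7.x and using a hypothetical in-memory source only to keep it self-contained:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object WindowAssignerSketch {
  case class WordWithCount(word: String, count: Int)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Hypothetical bounded source; in the post this would be the socket stream.
    // (With such a tiny bounded source the job may finish before the window fires,
    // so use the socket source for live output.)
    val tups = env.fromElements(WordWithCount("flink", 1), WordWithCount("flink", 1))

    // Shorthand used above:
    val shorthand = tups.keyBy("word").timeWindow(Time.seconds(2), Time.seconds(1))
    // Explicit equivalent under the default processing-time characteristic:
    val explicit = tups.keyBy("word")
      .window(SlidingProcessingTimeWindows.of(Time.seconds(2), Time.seconds(1)))

    explicit.sum("count").print().setParallelism(1)
    env.execute("window assigner sketch")
  }
}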
package com.flink.demo01

import org.apache.flink.api.java.operators.DataSink
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}

/**
 * Created by Shi shuai RollerQing on 2019/12/16 19:29
 */
object WordCountB_Scala {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Get the input data
    val text: DataSet[String] = env.readTextFile("C:\\Users\\HP\\IdeaProjects\\sparkCore\\data\\test.txt")

    import org.apache.flink.api.scala._

    val counts = text.flatMap(_.toLowerCase.split(" ").filter(_.nonEmpty))
      .map((_, 1))
      .groupBy(0)
      .sum(1)

    counts.collect().foreach(println)
    // counts.setParallelism(1).writeAsCsv("C:\\Users\\HP\\IdeaProjects\\sparkCore\\data\\csvwc", "\n", "\t")
    // val write: DataSink[(String, Int)] = counts.setParallelism(1).writeAsCsv("C:\\Users\\HP\\IdeaProjects\\sparkCore\\data\\csvwc", "\n", "\t")
  }
}
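A note on the commented-out CSV sink: in the DataSet API, writeAsCsv only declares a sink, so unlike collect() it needs an explicit env.execute() before anything is written. A minimal sketch with a hypothetical output path and an in-memory source:

import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem.WriteMode

object CsvSinkSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val counts = env.fromElements("to be or not to be")
      .flatMap(_.toLowerCase.split(" ").filter(_.nonEmpty))
      .map((_, 1))
      .groupBy(0)
      .sum(1)

    // writeAsCsv is lazy; WriteMode.OVERWRITE lets reruns replace the old output.
    counts
      .writeAsCsv("data/csvwc", "\n", "\t", WriteMode.OVERWRITE) // hypothetical path
      .setParallelism(1)

    // Without this call nothing is written (collect() would trigger a job on its own).
    env.execute("csv sink sketch")
  }
}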
<properties>
    <scala.version>2.11.8</scala.version>
    <flink.version>1.7.2</flink.version>
</properties>

<!-- Java dependencies -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>${flink.version}</version>
    <!-- <scope>provided</scope> -->
</dependency>

<!-- Scala dependencies -->
<!-- Flink bounded (batch) data processing -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- Flink unbounded (streaming) data processing -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>