Create a project named MyFlinkFirst in IDEA and add the following configuration to pom.xml:
```xml
<properties>
    <flink.version>1.13.0</flink.version>
    <java.version>1.8</java.version>
    <scala.binary.version>2.12</scala.binary.version>
    <slf4j.version>1.7.30</slf4j.version>
</properties>

<dependencies>
    <!-- Flink dependencies -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Logging dependencies -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-to-slf4j</artifactId>
        <version>2.14.0</version>
    </dependency>
</dependencies>
```
Add a file named log4j.properties under src/main/resources with the following content:
```properties
log4j.rootLogger=error, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
```
Write a StreamWordCount class that counts words from a socket stream:
```java
package com.qiyu;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.Arrays;

/**
 * @author MR.Liu
 * @version 1.0
 * @date 2023-10-18 14:45
 */
public class StreamWordCount {
    public static void main(String[] args) throws Exception {
        // 1. Create the streaming execution environment
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. Read the text stream from a socket
        DataStreamSource<String> lineDSS = env.socketTextStream("192.168.220.130",
                7777);
        // 3. Transform the data: split each line into (word, 1L) pairs.
        //    returns(...) is required because Java erases the lambda's generic types.
        SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOne = lineDSS
                .flatMap((String line, Collector<String> words) -> {
                    Arrays.stream(line.split(" ")).forEach(words::collect);
                })
                .returns(Types.STRING)
                .map(word -> Tuple2.of(word, 1L))
                .returns(Types.TUPLE(Types.STRING, Types.LONG));
        // 4. Group by word
        KeyedStream<Tuple2<String, Long>, String> wordAndOneKS = wordAndOne
                .keyBy(t -> t.f0);
        // 5. Sum the counts
        SingleOutputStreamOperator<Tuple2<String, Long>> result = wordAndOneKS
                .sum(1);
        // 6. Print
        result.print();
        // 7. Execute
        env.execute();
    }
}
```
On the hadoop102 server, start a socket source:

```shell
nc -lk 7777
```

Then run the StreamWordCount class and type arbitrary words into the nc session. The IDEA console prints a running count for each word, which confirms the code works.
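The final counts the job converges to can be sanity-checked locally with a plain shell pipeline, no Flink or nc needed. The input lines here are made-up examples; any words typed into the nc session behave the same way:

```shell
# Count words the same way steps 3-5 of the job do:
# split lines on spaces, group identical words, sum one per occurrence.
printf 'hello flink\nhello world\n' | tr ' ' '\n' | sort | uniq -c | sort -rn
```

Typing these two lines into nc would leave the Flink job printing (hello,2), (flink,1), and (world,1) as its latest counts.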
Add the packaging plugin to pom.xml:
```xml
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.0.0</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
```
Run Maven's package command; when the console shows BUILD SUCCESS, the jar has been built.
Select MyFlinkFirst-1.0-SNAPSHOT.jar and upload it through the Flink web UI. After the upload, click the jar's name, fill in the fully qualified name of the entry class and the parallelism for the job, then click Submit.
Check the running-job list and click the job. Open "Task Managers", switch to the Stdout tab, and keep typing words into the nc session on hadoop102; the word counts appear in Stdout.
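As an alternative to the web UI, the same jar can be submitted with the flink command-line client. This sketch only assembles the submit command; the install path, jar location, and parallelism are assumptions to adjust for your cluster:

```shell
# Hypothetical Flink install path and jar location -- adjust for your setup.
FLINK_HOME=/opt/module/flink-1.13.0
JAR=MyFlinkFirst-1.0-SNAPSHOT.jar
# -c names the entry class and -p the parallelism,
# the same two fields the web UI form asks for.
SUBMIT="$FLINK_HOME/bin/flink run -c com.qiyu.StreamWordCount -p 2 $JAR"
echo "$SUBMIT"
```

Running the echoed command on a machine with a reachable cluster submits the job without touching the web UI.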