当前位置:   article > 正文

Flink学习之旅:(二)构建Flink demo工程并提交到集群执行

flink demo

1.创建Maven工程

        在idea中创建一个 名为 MyFlinkFirst 工程

2.配置pom.xml

  1. <properties>
  2. <flink.version>1.13.0</flink.version>
  3. <java.version>1.8</java.version>
  4. <scala.binary.version>2.12</scala.binary.version>
  5. <slf4j.version>1.7.30</slf4j.version>
  6. </properties>
  7. <dependencies>
  8. <!-- 引入 Flink 相关依赖-->
  9. <dependency>
  10. <groupId>org.apache.flink</groupId>
  11. <artifactId>flink-java</artifactId>
  12. <version>${flink.version}</version>
  13. </dependency>
  14. <dependency>
  15. <groupId>org.apache.flink</groupId>
  16. <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
  17. <version>${flink.version}</version>
  18. </dependency>
  19. <dependency>
  20. <groupId>org.apache.flink</groupId>
  21. <artifactId>flink-clients_${scala.binary.version}</artifactId>
  22. <version>${flink.version}</version>
  23. </dependency>
  24. <!-- 引入日志管理相关依赖-->
  25. <dependency>
  26. <groupId>org.slf4j</groupId>
  27. <artifactId>slf4j-api</artifactId>
  28. <version>${slf4j.version}</version>
  29. </dependency>
  30. <dependency>
  31. <groupId>org.slf4j</groupId>
  32. <artifactId>slf4j-log4j12</artifactId>
  33. <version>${slf4j.version}</version>
  34. </dependency>
  35. <dependency>
  36. <groupId>org.apache.logging.log4j</groupId>
  37. <artifactId>log4j-to-slf4j</artifactId>
  38. <version>2.14.0</version>
  39. </dependency>
  40. </dependencies>

3.配置日志管理

        在目录 src/main/resources 下添加文件:log4j.properties,内容配置如下:

  1. log4j.rootLogger=error, stdout
  2. log4j.appender.stdout=org.apache.log4j.ConsoleAppender
  3. log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
  4. log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

4.编写代码

        编写 StreamWordCount 类,单词汇总

  1. package com.qiyu;
  2. import org.apache.flink.api.common.typeinfo.Types;
  3. import org.apache.flink.api.java.tuple.Tuple2;
  4. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  5. import org.apache.flink.streaming.api.datastream.KeyedStream;
  6. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  7. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  8. import org.apache.flink.util.Collector;
  9. import java.util.Arrays;
  10. /**
  11. * @author MR.Liu
  12. * @version 1.0
  13. * @data 2023-10-18 14:45
  14. */
  15. public class StreamWordCount {
  16. public static void main(String[] args) throws Exception {
  17. // 1. 创建流式执行环境
  18. StreamExecutionEnvironment env =
  19. StreamExecutionEnvironment.getExecutionEnvironment();
  20. // 2. 读取文本流
  21. DataStreamSource<String> lineDSS = env.socketTextStream("192.168.220.130",
  22. 7777);
  23. // 3. 转换数据格式
  24. SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOne = lineDSS
  25. .flatMap((String line, Collector<String> words) -> {
  26. Arrays.stream(line.split(" ")).forEach(words::collect);
  27. })
  28. .returns(Types.STRING)
  29. .map(word -> Tuple2.of(word, 1L))
  30. .returns(Types.TUPLE(Types.STRING, Types.LONG));
  31. // 4. 分组
  32. KeyedStream<Tuple2<String, Long>, String> wordAndOneKS = wordAndOne
  33. .keyBy(t -> t.f0);
  34. // 5. 求和
  35. SingleOutputStreamOperator<Tuple2<String, Long>> result = wordAndOneKS
  36. .sum(1);
  37. // 6. 打印
  38. result.print();
  39. // 7. 执行
  40. env.execute();
  41. }
  42. }

5.测试

在hadoop102 服务器中 执行:

nc -lk 7777

再运行 StreamWordCount java类

在命令行随意疯狂输出

idea 控制台 打印结果:

测试代码正常

6. 打包程序提交到集群中运行

        在pom.xml添加打包插件

  1. <build>
  2. <plugins>
  3. <plugin>
  4. <groupId>org.apache.maven.plugins</groupId>
  5. <artifactId>maven-assembly-plugin</artifactId>
  6. <version>3.0.0</version>
  7. <configuration>
  8. <descriptorRefs>
  9. <descriptorRef>jar-with-dependencies</descriptorRef>
  10. </descriptorRefs>
  11. </configuration>
  12. <executions>
  13. <execution>
  14. <id>make-assembly</id>
  15. <phase>package</phase>
  16. <goals>
  17. <goal>single</goal>
  18. </goals>
  19. </execution>
  20. </executions>
  21. </plugin>
  22. </plugins>
  23. </build>

直接使用 maven 中的 package命令,控制台显示 BUILD SUCCESS 就是打包成功!

选择 MyFlinkFirst-1.0-SNAPSHOT.jar 提交到 web ui 上

上传 jar 后,点击 jar 包名称 ,填写 主要配置程序入口主类的全类名,任务运行的并行度。完成后 点击 submit 

查看 任务运行列表 

点击任务

点击“Task Managers”,打开 Stdout,并且在 hadoop102 命令行 疯狂输出 

Stdout 就会显示 结果

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/751952
推荐阅读
相关标签
  

闽ICP备14008679号