赞
踩
独立模式 (Standalone) 是独立运行的,不依赖任何外部的资源管理平台,只需要运行所有 Flink 组件服务
Yarn 模式是指客户端把 Flink 应用提交给 Yarn 的 ResourceManager, Yarn 的 ResourceManager 会在 Yarn 的 NodeManager 上创建容器。在这些容器上,Flink 会部署 JobManager 和 TaskManager 的实例,从而启动集群。Flink 会根据运行在 JobManger 上的作业所需要的 Slot 数量动态分配 TaskManager 资源
flink 下载地址:https://flink.apache.org/downloads/
下载 flink 安装包:flink-1.10.1-bin-scala_2.12.tgz
将安装包上传到虚拟机节点并解压缩
tar -zxvf flink-1.10.1-bin-scala_2.12.tgz -C /opt/module
cd /opt/module
mv flink-1.10.1 flink
进入 flink 安装目录,执行启动命令,并查看进程
cd /opt/module/flink
bin/start-cluster.sh
jps
访问 http://hadoop102:8081 进入 flink 集群和任务监控管理 Web 页面
关闭 flink:bin/stop-cluster.sh
节点服务器 | hadoop102 | hadoop103 | hadoop104 |
---|---|---|---|
角色 | JobManager | TaskManager | TaskManager |
下载 flink 安装包:flink-1.10.1-bin-scala_2.12.tgz
将安装包上传到 hadoop102 并解压缩
tar -zxvf flink-1.10.1-bin-scala_2.12.tgz -C /opt/module
cd /opt/module
mv flink-1.10.1 flink
进入 flink 安装目录下的 conf 目录,修改配置文件 flink-conf.yaml
cd /opt/module/flink/conf
vim flink-conf.yaml
#修改 jobmanager 内部通信主机名
jobmanager.rpc.address: hadoop102
jobmanager.rpc.port: 6123
修改 conf 目录下的 slaves 文件,配置 taskmanager 节点
vim slaves #1.13版本为 workers 文件
#添加内容
hadoop103
hadoop104
flink-conf.yaml
文件中常用配置项:
#对 JobManager 进程可使用到的全部内存进行配置,包括 JVM 元空间和其他开销,默认为 1600M,可以根据集群规模进行适当调整
jobmanager.memory.process.size: 1600
#对 TaskManager 进程可使用到的全部内存进行配置,包括 JVM 元空间和其他开销,默认为 1600M,可以根据集群规模进行适当调整
taskmanager.memory.process.size: 1600
#对每个 TaskManager 能够分配的 TaskSlot 数量进行配置,默认为 1,可根据 TaskManager 所在的机器能够提供给 Flink 的 CPU 数量决定。Slot 就是 TaskManager 中具体运行一个任务所分配的计算资源
taskmanager.numberOfTaskSlots: 1
#Flink 任务执行的默认并行度,优先级低于代码中进行的并行度配置和任务提交时使用参数指定的并行度数量
parallelism.default: 1
分发 flink 安装目录到 hadoop103 和 hadoop104
在 hadoop102 上启动 flink 集群
cd /opt/module/flink
bin/start-cluster.sh
jps
访问 http://hadoop102:8081
查看 flink 监控页面
启动 Flink 集群服务
在 hadoop102 中启动 netcat 网络端口
nc -lk 7777
Web 页面提交
http://hadoop102:8081
进入 flink 监控页面,选择左侧的 Submit New Job
选项菜单+Add New
按钮,然后选择 jar 包进行上传submit
提交任务overview
和 jobs
等菜单选项下查看任务运行情况命令行提交
#提交任务:bin/flink run -m [jobmanager主机和端口] -c [主程序类全类名] -p [并行度] [jar包的绝对路径] [--param1 value1 --param2 value2 ...]
cd flink
bin/flink run -m hadoop102:8081 -c com.app.wc.StreamWordCount2 -p 3 /project/FlinkTutorial/target/FlinkTutorial-1.0-SNAPSHOT.jar --host localhost --port 7777
#查看job:-a 可以查看已经取消的job
bin/flink list [-a]
#取消job
bin/flink cancel [jobId]
不能使用 start-cluster.sh
命令启动集群
将编码好的 Flink maven 工程打成 jar 包,并将 jar 包上传到 flink 安装目录下的 lib 目录
启动 JobManager
cd /opt/module/flink
bin/standalone-job.sh start --job-classname com.app.wc.StreamWordCount2
启动 TaskManager
cd /opt/module/flink
bin/taskmanager.sh start
访问 http://hadoop102:8081
查看 flink 监控页面的作业执行
关闭
cd /opt/module/flink
bin/standalone-job.sh stop
bin/taskmanager.sh stop
节点服务器 | hadoop102 | hadoop103 | hadoop104 |
---|---|---|---|
角色 | JobManager | JobManager / TaskManager | TaskManager |
在 hadoop102 上进入 flink 目录下的 conf 目录,修改 flink-conf.yaml
文件
cd /opt/module/flink/conf
vim flink-conf.yaml
#添加内容
high-availability: zookeeper
high-availability.storageDir: hdfs://hadoop102:9820/flink/standalone/ha
high-availability.zookeeper.quorum: hadoop102:2181,hadoop103:2181,hadoop104:2181
high-availability.zookeeper.path.root: /flink-standalone
high-availability.cluster-id: /cluster_atguigu
修改 masters 文件
vim masters
#添加
hadoop102:8081
hadoop103:8081
分发配置到其他节点
启动
#保证hadoop环境变量配置生效
#启动 hadoop 集群和 Zookeeper 集群
#启动 flink 集群
cd /opt/module/flink
bin/start-cluster.sh
访问:http://hadoop102:8081
和 http://hadoop103:8081
下载安装包:
flink-shaded-hadoop-2-uber-2.7.5-10.0.jar
,并将该组件上传至 Flink 的 lib 目录下flink-shaded-hadoop-*
的 jar 包,而是通过配置环境变量完成与 Yarn 集群的对接将安装包上传到虚拟机并解压缩
tar -zxvf flink-1.10.1-bin-scala_2.12.tgz -C /opt/module
cd /opt/module
mv flink-1.10.1 flink
进入 conf 目录,修改 flink-conf.yaml
文件
cd /opt/module/flink/conf
vim flink-conf.yaml
#修改
jobmanager.memory.process.size: 1600m
taskmanager.memory.process.size: 1728m
taskmanager.numberOfTaskSlots: 8
parallelism.default: 1
确保正确安装 Hadoop 集群和配置 Hadoop 环境变量
sudo vim /etc/profile.d/my_env.sh
export HADOOP_HOME=/opt/module/hadoop
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop
export HADOOP_CLASSPATH=`hadoop classpath`
启动 Hadoop 集群
执行脚本命令向 Yarn 集群申请资源,开启一个 Yarn 会话,启动 Flink 集群
cd /opt/module/flink
bin/yarn-session.sh -nm test
#-n 参数:指定 TaskManager 数量
#-s 参数:指定 slot 数量
#-d:分离模式,如果你不想让 Flink YARN 客户端一直前台运行,可以使用这个参数,即使关掉当前对话窗口,YARN session 也可以后台运行。
#-jm(--jobManagerMemory):配置 JobManager 所需内存,默认单位 MB。
#-nm(--name):配置在 YARN UI 界面上显示的任务名。
#-qu(--queue):指定 YARN 队列名。
#-tm(--taskManager):配置每个 TaskManager 所使用内存
提交作业
Web UI 提交,同独立模式
命令行提交:
#1.将打包好的任务运行 JAR 包上传至集群
#2.执行命令
cd /opt/module/flink
bin/flink run -c com.app.wc.StreamWordCount2 FlinkTutorial-1.0-SNAPSHOT.jar
访问 yarn Web UI 界面或 flink Web UI 界面查看作业执行情况
启动 Hadoop 集群
直接向 Yarn 提交一个单独的作业,从而启动一个 Flink 集群
cd /opt/module/flink
#命令一:
bin/flink run -d -t yarn-per-job -c com.app.wc.StreamWordCount2 FlinkTutorial-1.0-SNAPSHOT.jar
#命令二:
bin/flink run -m yarn-cluster -c com.app.wc.StreamWordCount2 FlinkTutorial-1.0-SNAPSHOT.jar
访问 yarn Web UI 界面或 flink Web UI 界面查看作业执行情况
取消作业:
cd /opt/module/flink
bin/flink list -t yarn-per-job -Dyarn.application.id=application_XXXX_YY
bin/flink cancel -t yarn-per-job -Dyarn.application.id=application_XXXX_YY <jobId>
启动 Hadoop 集群
执行命令提交作业
#上传 jar 包到集群
cd /opt/module/flink
bin/flink run-application -t yarn-application -c com.app.wc.StreamWordCount2 FlinkTutorial-1.0-SNAPSHOT.jar
#上传 jar 包到 hdfs
bin/flink run-application -t yarn-application
-Dyarn.provided.lib.dirs="hdfs://myhdfs/my-remote-flink-dist-dir"
hdfs://myhdfs/jars/my-application.jar
查看或取消作业
/opt/module/flink
bin/flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY
bin/flink cancel -t yarn-application -Dyarn.application.id=application_XXXX_YY <jobId>
YARN 的高可用是只启动一个 Jobmanager, 当这个 Jobmanager 挂了之后, YARN 会再次启动一个, 本质是利用的 YARN 的重试次数来实现的高可用
在 yarn-site.xml
中配置
<property>
<name>yarn.resourcemanager.am.max-attempts</name>
<value>4</value>
<description>
The maximum number of application master execution attempts.
</description>
</property>
分发配置到其他节点
在 flink-conf.yaml
中配置
yarn.application-attempts: 3 #要小于 yarn 的重试次数
high-availability: zookeeper
high-availability.storageDir: hdfs://hadoop102:9820/flink/yarn/ha
high-availability.zookeeper.quorum: hadoop102:2181,hadoop103:2181,hadoop104:2181
high-availability.zookeeper.path.root: /flink-yarn
启动 yarn-session,杀死 JobManager, 查看复活情况
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。