The previous post covered how to deploy a highly available standalone Flink cluster; this post covers deploying a highly available Flink cluster with native Kubernetes. The cluster here is built on AWS; compared with building it on your own servers, the main difference lies in the choice and use of the file system. I chose the S3 service.
Launch three EC2 instances on AWS with CentOS 7 as the operating system. Make sure each instance is associated with an Elastic IP address so that it is reachable from the public internet.
See https://wiki.centos.org/Cloud/AWS for reference.
// Install and enable Docker on all three nodes
$ sudo yum-config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo
$ sudo yum install docker -y
$ sudo systemctl start docker.service
$ sudo systemctl enable docker.service
$ sudo systemctl status docker
// Add the Kubernetes yum repository on all nodes
$ sudo vi /etc/yum.repos.d/kubernetes.repo
[kubernetes]
name=Kubernetes
baseurl=https://packages.cloud.google.com/yum/repos/kubernetes-el7-x86_64
enabled=1
gpgcheck=1
repo_gpgcheck=0
gpgkey=https://packages.cloud.google.com/yum/doc/yum-key.gpg https://packages.cloud.google.com/yum/doc/rpm-package-key.gpg
// Install kubelet, kubeadm, and kubectl on all nodes
$ sudo yum install -y kubelet
$ sudo yum install -y kubeadm
$ sudo yum install -y kubectl
$ sudo systemctl enable kubelet
// On your **Master** node, update the hostname:
// Master: 192.168.100.63 (3.101.77.138)
$ sudo hostnamectl set-hostname master-node
// worker-node1: 192.168.100.36 (3.101.77.139)
$ sudo hostnamectl set-hostname worker-node1
// worker-node2: 192.168.100.125 (3.101.77.140)
$ sudo hostnamectl set-hostname worker-node2

// Make a host entry or DNS record to resolve the hostnames on all nodes:
$ sudo vi /etc/hosts
// With the entries:
192.168.100.63 master-node
192.168.100.36 node1 worker-node1
192.168.100.125 node2 worker-node2
// Disable SELinux on all nodes
$ sudo setenforce 0
$ sudo sed -i --follow-symlinks 's/SELINUX=enforcing/SELINUX=disabled/g' /etc/sysconfig/selinux
$ sudo reboot
$ sestatus
SELinux status: disabled
// Let iptables see bridged traffic on all nodes
$ sudo vi /etc/sysctl.d/k8s.conf
net.bridge.bridge-nf-call-ip6tables = 1
net.bridge.bridge-nf-call-iptables = 1
$ sudo sysctl --system
// Disable swap on all nodes (kubelet refuses to run with swap enabled)
$ sudo sed -i '/swap/d' /etc/fstab
$ sudo swapoff -a
// On the master node only
$ sudo kubeadm init
// The init output prints the join command for the worker nodes:
kubeadm join 192.168.100.63:6443 --token snmpyy.b4y506h6hr9u7fxh \
    --discovery-token-ca-cert-hash sha256:87a099765ce369c519bc02af84a6d4732b1cb987d3e95277b334e3cfc3aa0960

// Create required directories and start managing the Kubernetes cluster
$ mkdir -p $HOME/.kube
$ sudo cp -i /etc/kubernetes/admin.conf $HOME/.kube/config
$ sudo chown $(id -u):$(id -g) $HOME/.kube/config

[ec2-user@master-node kubernetes]$ kubectl get nodes
NAME          STATUS     ROLES           AGE   VERSION
master-node   NotReady   control-plane   84m   v1.24.2

// Set up the Pod network for the cluster
$ export kubever=$(kubectl version | base64 | tr -d '\n')
$ kubectl apply -f "https://cloud.weave.works/k8s/net?k8s-version=$kubever"

// Add nodes to your cluster: run the join command on both worker nodes
$ kubeadm join 192.168.100.63:6443 --token snmpyy.b4y506h6hr9u7fxh \
    --discovery-token-ca-cert-hash sha256:87a099765ce369c519bc02af84a6d4732b1cb987d3e95277b334e3cfc3aa0960

// On the master node, label the workers
$ kubectl label node worker-node1 node-role.kubernetes.io/worker=worker
$ kubectl label node worker-node2 node-role.kubernetes.io/worker=worker
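Before deploying Flink, a quick sanity check that both workers joined and that the Pod network brought every node to Ready:

$ kubectl get nodes -o wide
// All three nodes should report STATUS Ready; rerun after a minute if the workers are still NotReady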
// Install OpenJDK 8 for the Flink client
$ sudo yum search java | grep jdk
$ sudo yum install -y java-1.8.0-openjdk
$ sudo yum install java-1.8.0-openjdk-devel -y
$ java -version
$ cd /opt
$ wget https://archive.apache.org/dist/flink/flink-1.14.5/flink-1.14.5-bin-scala_2.12.tgz --no-check-certificate
$ tar -xzf flink-*.tgz
With native Kubernetes, the Kubernetes integration is built into the Flink distribution itself: the Flink client talks to the Kubernetes API server directly, so a single Flink command is enough to start the pods for the Flink components in the cluster.
// Create a service account with RBAC permissions to create and delete pods
$ kubectl create namespace flink-cluster
$ kubectl create serviceaccount flink -n flink-cluster
$ kubectl create clusterrolebinding flink-role-binding-flink \
    --clusterrole=edit \
    --serviceaccount=flink-cluster:flink

// Start session cluster 1 (the configuration read here is the master node's
// config file; the taskmanager nodes' local config files are irrelevant)
$ ./bin/kubernetes-session.sh \
    -Dkubernetes.namespace=flink-cluster \
    -Dkubernetes.jobmanager.service-account=flink \
    -Dkubernetes.cluster-id=my-session \
    -Dtaskmanager.numberOfTaskSlots=6 \
    -Dkubernetes.rest-service.exposed.type=NodePort
2022-07-01 08:46:49,621 INFO  org.apache.flink.kubernetes.KubernetesClusterDescriptor [] - Create flink session cluster my-session successfully, JobManager Web Interface: http://192.168.100.63:32172

// Dashboard: http://3.101.77.141:32172/ ; no resources are allocated yet.
// Only the jobmanager is deployed (on worker-node1), together with two services:
// one for internal traffic, one for external access.
[root@master-node ec2-user]# kubectl get pods,svc,ep -n flink-cluster -o wide
NAME                      TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)             AGE    SELECTOR
service/my-session        ClusterIP   None            <none>        6123/TCP,6124/TCP   124m   app=my-session,component=jobmanager,type=flink-native-kubernetes
service/my-session-rest   NodePort    10.109.225.42   <none>        8081:31595/TCP      124m   app=my-session,component=jobmanager,type=flink-native-kubernetes

NAME                              READY   STATUS    RESTARTS   AGE    IP          NODE           NOMINATED NODE   READINESS GATES
pod/my-session-556f44f44b-94gfk   1/1     Running   0          124m   10.44.0.2   worker-node1   <none>           <none>

NAME                        ENDPOINTS                       AGE
endpoints/my-session        10.44.0.2:6124,10.44.0.2:6123   124m
endpoints/my-session-rest   10.44.0.2:8081                  124m

[root@master-node ec2-user]# kubectl get deployment -o wide -n flink-cluster
NAME         READY   UP-TO-DATE   AVAILABLE   AGE    CONTAINERS             IMAGES                           SELECTOR
my-session   1/1     1            1           125m   flink-main-container   apache/flink:1.14.5-scala_2.12   app=my-session,component=jobmanager,type=flink-native-kubernetes
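Since the REST service is exposed as a NodePort, the dashboard is reachable on that port at any node's public IP. A quick way to look up the assigned port (assuming the EC2 security group allows inbound traffic on it):

// Print the NodePort of the REST service (31595 in the listing above)
$ kubectl get svc my-session-rest -n flink-cluster -o jsonpath='{.spec.ports[0].nodePort}'
// Then open http://<any-node-elastic-ip>:<nodeport> in a browser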
Jobs can be submitted either from the command line or through the dashboard.
// Resources only show up in the dashboard after a job is run
$ ./bin/flink run \
    --target kubernetes-session \
    -Dkubernetes.namespace=flink-cluster \
    -Dkubernetes.cluster-id=my-session \
    ./examples/streaming/TopSpeedWindowing.jar

$ ./bin/flink run --target kubernetes-session \
    -Dkubernetes.namespace=flink-cluster \
    -Dkubernetes.cluster-id=my-session \
    -Dparallelism.default=2 \
    ./examples/streaming/TopSpeedWindowing.jar

// Testing automatic scale-up
// Until the first taskmanager's task slots are exhausted, the cluster has only one taskmanager
NAME                          READY   STATUS    RESTARTS   AGE   IP          NODE           NOMINATED NODE   READINESS GATES
my-session-556f44f44b-zdvrf   1/1     Running   0          21m   10.36.0.1   worker-node2   <none>           <none>
my-session-taskmanager-1-1    1/1     Running   0          20m   10.44.0.1   worker-node1   <none>           <none>

// Submit more jobs and the cluster scales up automatically
NAME                          READY   STATUS    RESTARTS   AGE   IP          NODE           NOMINATED NODE   READINESS GATES
my-session-556f44f44b-zdvrf   1/1     Running   0          21m   10.36.0.1   worker-node2   <none>           <none>
my-session-taskmanager-1-1    1/1     Running   0          20m   10.44.0.1   worker-node1   <none>           <none>
my-session-taskmanager-1-2    1/1     Running   0          34s   10.36.0.2   worker-node2   <none>           <none>

// When both taskmanagers' resources are exhausted, the next taskmanager stays Pending
[root@master-node flink-1.14.5]# kubectl get pods -n flink-cluster -o wide
NAME                         READY   STATUS    RESTARTS   AGE     IP          NODE           NOMINATED NODE   READINESS GATES
my-session-58bb97cdc-85ssq   1/1     Running   0          17m     10.36.0.1   worker-node2   <none>           <none>
my-session-taskmanager-1-1   1/1     Running   0          8m26s   10.44.0.2   worker-node1   <none>           <none>
my-session-taskmanager-1-2   1/1     Running   0          3m11s   10.36.0.3   worker-node2   <none>           <none>
my-session-taskmanager-1-3   0/1     Pending   0          33s     <none>      <none>         <none>           <none>

// Clean up resources
$ kubectl delete deployment/my-session -n flink-cluster
$ kubectl delete clusterrolebinding flink-role-binding-flink
$ kubectl delete serviceaccount flink -n flink-cluster
$ kubectl delete namespace flink-cluster
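Alternatively, a running session can be stopped from an attached Flink client instead of deleting the deployment:

$ echo 'stop' | ./bin/kubernetes-session.sh \
    -Dkubernetes.cluster-id=my-session \
    -Dkubernetes.namespace=flink-cluster \
    -Dexecution.attached=true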
Compared with a standalone deployment, a native Kubernetes deployment has several points worth noting. For example, the jobmanager can be pinned to a specific node with a node selector:
// Start session cluster 2 with a modified configuration: pin the jobmanager to a labeled node
$ kubectl label nodes worker-node2 node=master
[root@master-node flink-1.14.5]# kubectl get nodes -l "node=master"
NAME           STATUS   ROLES    AGE   VERSION
worker-node2   Ready    worker   10d   v1.24.2

$ ./bin/kubernetes-session.sh \
    -Dkubernetes.namespace=flink-cluster \
    -Dkubernetes.jobmanager.service-account=flink \
    -Dkubernetes.cluster-id=my-session \
    -Dtaskmanager.numberOfTaskSlots=8 \
    -Dkubernetes.rest-service.exposed.type=NodePort \
    -Dkubernetes.jobmanager.node-selector=node:master

[root@master-node flink-1.14.5]# kubectl get pods,svc,ep -n flink-cluster -o wide
NAME                             READY   STATUS    RESTARTS   AGE   IP          NODE           NOMINATED NODE   READINESS GATES
pod/my-session-58bb97cdc-85ssq   1/1     Running   0          57s   10.36.0.1   worker-node2   <none>           <none>

NAME                      TYPE        CLUSTER-IP    EXTERNAL-IP   PORT(S)             AGE   SELECTOR
service/my-session        ClusterIP   None          <none>        6123/TCP,6124/TCP   57s   app=my-session,component=jobmanager,type=flink-native-kubernetes
service/my-session-rest   NodePort    10.108.58.5   <none>        8081:31181/TCP      57s   app=my-session,component=jobmanager,type=flink-native-kubernetes

NAME                        ENDPOINTS                       AGE
endpoints/my-session        10.36.0.1:6124,10.36.0.1:6123   57s
endpoints/my-session-rest   10.36.0.1:8081                  57s
The high-availability services need a shared, persistent storage directory. Since the cluster runs on AWS, I chose S3, so the first problem to solve is giving the cluster access to S3.
// Enable the bundled S3 filesystem plugin in both the jobmanager and taskmanager containers
$ ./bin/kubernetes-session.sh \
    -Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-1.14.5.jar \
    -Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-1.14.5.jar
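The plugin only provides the S3 filesystem implementation; the pods still need AWS credentials. A minimal sketch with placeholder keys passed as dynamic properties via Flink's s3.access-key / s3.secret-key options (attaching an IAM instance profile to the EC2 nodes is the cleaner choice and needs no keys at all):

// Sketch only: YOUR_ACCESS_KEY / YOUR_SECRET_KEY are placeholders
$ ./bin/kubernetes-session.sh \
    -Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-1.14.5.jar \
    -Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-1.14.5.jar \
    -Ds3.access-key=YOUR_ACCESS_KEY \
    -Ds3.secret-key=YOUR_SECRET_KEY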
Using S3 with a native Kubernetes deployment involves quite a few pitfalls; errors of all kinds come up, mainly of two varieties. The invocation that eventually worked, combining the S3 plugin with Kubernetes HA:
$ ./bin/kubernetes-session.sh \
    -Dkubernetes.namespace=flink-cluster \
    -Dkubernetes.jobmanager.service-account=flink \
    -Dkubernetes.cluster-id=my-session \
    -Dtaskmanager.numberOfTaskSlots=6 \
    -Dkubernetes.rest-service.exposed.type=NodePort \
    -Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-1.14.5.jar \
    -Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-1.14.5.jar \
    -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \
    -Dhigh-availability.storageDir=s3a://yunzpeng-bucket/flink-ha
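A simple failover check for this HA session: delete the jobmanager pod and watch the deployment recreate it, recovering the running jobs from the HA metadata in S3 (the pod name below is a placeholder; use the one from the listing):

$ kubectl get pods -n flink-cluster
$ kubectl delete pod <jobmanager-pod> -n flink-cluster
// Watch a new jobmanager pod come up and the jobs resume
$ kubectl get pods -n flink-cluster -w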
// Run as root
// 1. Build a docker image containing the Flink job
// Dockerfile:
FROM flink:1.14.5
RUN mkdir -p $FLINK_HOME/usrlib
COPY ./examples/streaming/TopSpeedWindowing.jar $FLINK_HOME/usrlib/my-flink-job.jar

$ docker build -t pandafish1996/flink-demo .

// 2. Push the image to an image registry
$ docker login --username=pandafish1996
// Convention: docker push <registry-username>/<image-name>
//$ docker tag yunzpeng/flink-word-count pandafish1996/flink-word-count
$ docker push pandafish1996/flink-demo

// 3. Start a Flink application cluster
$ kubectl create namespace flink-cluster
$ kubectl create serviceaccount flink -n flink-cluster
$ kubectl create clusterrolebinding flink-role-binding-flink \
    --clusterrole=edit \
    --serviceaccount=flink-cluster:flink

// Without HA
$ ./bin/flink run-application \
    --target kubernetes-application \
    -Dkubernetes.cluster-id=my-first-application-cluster \
    -Dkubernetes.container.image=pandafish1996/flink-demo \
    -Dkubernetes.namespace=flink-cluster \
    -Dkubernetes.service-account=flink \
    -Dparallelism.default=2 \
    -Dtaskmanager.numberOfTaskSlots=6 \
    -Dkubernetes.rest-service.exposed.type=NodePort \
    local:///opt/flink/usrlib/my-flink-job.jar

$ kubectl delete deployment/my-first-application-cluster -n flink-cluster
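While the application cluster is running (i.e. before the delete above), its jobs can be managed from the Flink client by cluster id; <jobId> below is a placeholder:

// List the jobs running on the application cluster
$ ./bin/flink list \
    --target kubernetes-application \
    -Dkubernetes.namespace=flink-cluster \
    -Dkubernetes.cluster-id=my-first-application-cluster
// Cancel a job by id; once its only job stops, the application cluster shuts itself down
$ ./bin/flink cancel \
    --target kubernetes-application \
    -Dkubernetes.namespace=flink-cluster \
    -Dkubernetes.cluster-id=my-first-application-cluster <jobId>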
// 1. Build an image with the job and the S3 plugin, and push it to a remote registry
// Dockerfile:
FROM flink:1.14.5
RUN mkdir -p $FLINK_HOME/usrlib
COPY ./examples/streaming/TopSpeedWindowing.jar $FLINK_HOME/usrlib/my-flink-job.jar
RUN mkdir -p $FLINK_HOME/plugins/flink-s3-fs-hadoop
COPY ./opt/flink-s3-fs-hadoop-1.14.5.jar $FLINK_HOME/plugins/flink-s3-fs-hadoop/

$ docker build -t pandafish1996/flink-hademo .
$ docker login --username=pandafish1996
// Convention: docker push <registry-username>/<image-name>
//$ docker tag yunzpeng/flink-word-count pandafish1996/flink-word-count
$ docker push pandafish1996/flink-hademo

// 2. Start the cluster
$ ./bin/flink run-application \
    --target kubernetes-application \
    -Dkubernetes.cluster-id=ha-cluster1 \
    -Dkubernetes.container.image=pandafish1996/flink-hademo \
    -Dkubernetes.namespace=flink-cluster \
    -Dkubernetes.service-account=flink \
    -Dparallelism.default=2 \
    -Dtaskmanager.numberOfTaskSlots=6 \
    -Dkubernetes.rest-service.exposed.type=NodePort \
    -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \
    -Dhigh-availability.storageDir=s3://yunzpeng-bucket/flink-ha \
    -Drestart-strategy=fixed-delay \
    -Drestart-strategy.fixed-delay.attempts=10 \
    -Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-1.14.5.jar \
    -Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-1.14.5.jar \
    local:///opt/flink/usrlib/my-flink-job.jar
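One caveat on cleanup: Flink's Kubernetes HA services keep leader and job metadata in ConfigMaps that deliberately survive a deployment delete, so a restarted cluster can recover its jobs. For a complete cleanup they have to be removed explicitly; the label selector below follows the Flink Kubernetes HA documentation:

// HA metadata survives kubectl delete deployment/ha-cluster1
$ kubectl get configmaps -n flink-cluster | grep ha-cluster1
// Remove the retained HA ConfigMaps for a full cleanup
$ kubectl delete configmaps -n flink-cluster \
    --selector='app=ha-cluster1,configmap-type=high-availability'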