引言
在當今大數據與云計算蓬勃發展的時代,容器編排與流處理技術成為企業數據處理架構的關鍵支柱。Kubernetes作為容器編排系統的行業標準,能夠高效自動化地部署、擴展和管理計算機應用程序;Apache Flink則是流處理和批處理領域的佼佼者,以強大的實時處理能力和精準的狀態管理著稱。當Flink與Kubernetes實現深度集成,二者優勢互補,為企業帶來了更加靈活、高效、智能的數據處理解決方案。本文將基于詳實的操作筆記,深入解析Flink與Kubernetes集成的全流程,以及不同部署模式的技術要點與實踐細節。
一、集成環境搭建
1.1 環境要求
Flink與Kubernetes的集成對運行環境有著嚴格且明確的要求,這些條件是確保集成順利進行和集群穩定運行的基礎:
- Kubernetes版本:Kubernetes集群版本必須在1.9及以上,高版本的Kubernetes不僅提供了更豐富的功能特性,還能更好地與Flink進行兼容性適配,保證Flink作業在集群上的穩定運行和高效調度。
- kubectl配置:
kubecconfig
文件是連接本地客戶端與Kubernetes集群的橋梁,通常存儲在~/.kube/config
路徑下 。通過執行kubectl auth can-i <list|create|edit|delete> pods
命令,可以驗證當前用戶是否具備對Pods和服務進行列出、創建、編輯、刪除等操作的權限。若權限不足,在后續部署Flink集群資源時,將會遇到權限拒絕的錯誤,導致部署失敗。 - Kubernetes DNS:Kubernetes DNS服務的開啟至關重要,它承擔著集群內服務發現的核心功能。在Flink與Kubernetes集成環境中,各組件之間需要通過服務名稱進行通信,Kubernetes DNS能夠將服務名稱解析為對應的IP地址,確保Flink JobManager、TaskManager等組件之間的網絡通信暢通無阻。
- RBAC權限:默認的
default
服務賬號需要具備RBAC(基于角色的訪問控制)中創建、刪除Pods的權限。然而,為了實現更精細化的資源管理和權限隔離,建議專門為Flink創建獨立的命名空間和服務賬號。這樣做不僅可以避免因權限混亂導致的部署失敗風險,還能顯著降低后期的運維成本,使得Flink集群的管理更加清晰、安全和高效。
1.2 創建專屬資源
為了給Flink集群打造一個獨立、安全且便于管理的運行環境,需要按照以下步驟創建專屬資源:
- 創建命名空間:使用命令
kubectl create ns flink
,即可創建一個名為flink
的命名空間。這個命名空間就像是一個獨立的“數據處理小世界”,后續所有與Flink相關的資源,如Pods、Services、ConfigMaps等,都將部署在這個空間內,實現與其他應用資源的邏輯隔離。 - 創建ServiceAccount:執行
kubectl create serviceaccount flink-service-account -n flink
命令,創建flink-service-account
服務賬號。該服務賬號將作為Flink集群與Kubernetes集群進行交互的“身份憑證”,用于驗證操作權限和進行安全通信。 - 用戶授權:通過
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account
命令,為flink-service-account
服務賬號賦予edit
權限。這意味著該賬號可以對集群內的資源進行創建、修改和刪除等操作,從而滿足Flink集群部署和運行過程中對資源管理的需求 。
二、Flink Standalone Kubernetes部署模式
2.1 模式概述
Flink Standalone Kubernetes集成模式支持session
和application
兩種部署模式,而per-job
模式目前僅在YARN環境中支持,并且在Flink 1.15版本中已在YARN環境下被棄用(具體可參考FLINK-26000相關內容) 。本次重點研究非HA(高可用)部署模式,雖然在實際生產環境中,HA模式能夠提供更高的可靠性和容錯能力,但非HA模式與HA模式在核心原理和大部分配置上是相通的,HA模式的詳細配置可參考官方文檔進行深入學習。session
和application
模式的主要差異體現在JobManager和TaskManager服務的聲明方式上,不過兩種模式也存在一些通用的集群資源配置。
2.2 通用集群資源配置
在Flink Standalone Kubernetes部署中,以下幾種資源是通用的,它們為Flink集群的正常運行提供了基礎配置和服務支持:
- flink-configuration-configmap.yaml:該文件主要用于配置Flink的核心參數以及日志相關設置,是Flink集群運行的重要配置文件。示例內容如下:
apiVersion: v1
kind: ConfigMap
metadata:name: flink-configlabels:app: flink
data:flink-conf.yaml: |+jobmanager.rpc.address: flink-jobmanagertaskmanager.numberOfTaskSlots: 2blob.server.port: 6124jobmanager.rpc.port: 6123taskmanager.rpc.port: 6122queryable-state.proxy.ports: 6125jobmanager.memory.process.size: 1600mtaskmanager.memory.process.size: 1728mparallelism.default: 2 log4j-console.properties: |+# 如下配置會同時影響用戶代碼和 Flink 的日志行為rootLogger.level = INFOrootLogger.appenderRef.console.ref = ConsoleAppenderrootLogger.appenderRef.rolling.ref = RollingFileAppender# 如果你只想改變 Flink 的日志行為則可以取消如下的注釋部分#logger.flink.name = org.apache.flink#logger.flink.level = INFO# 下面幾行將公共 libraries 或 connectors 的日志級別保持在 INFO 級別。# root logger 的配置不會覆蓋此處配置。# 你必須手動修改這里的日志級別。logger.akka.name = akkalogger.akka.level = INFOlogger.kafka.name= org.apache.kafkalogger.kafka.level = INFOlogger.hadoop.name = org.apache.hadooplogger.hadoop.level = INFOlogger.zookeeper.name = org.apache.zookeeperlogger.zookeeper.level = INFO# 將所有 info 級別的日志輸出到 consoleappender.console.name = ConsoleAppenderappender.console.type = CONSOLEappender.console.layout.type = PatternLayoutappender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n# 將所有 info 級別的日志輸出到指定的 rolling fileappender.rolling.name = RollingFileAppenderappender.rolling.type = RollingFileappender.rolling.append = falseappender.rolling.fileName = ${sys:log.file}appender.rolling.filePattern = ${sys:log.file}.%iappender.rolling.layout.type = PatternLayoutappender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%nappender.rolling.policies.type = Policiesappender.rolling.policies.size.type = SizeBasedTriggeringPolicyappender.rolling.policies.size.size=100MBappender.rolling.strategy.type = DefaultRolloverStrategyappender.rolling.strategy.max = 10# 關閉 Netty channel handler 中不相關的(錯誤)警告logger.netty.name = org.jboss.netty.channel.DefaultChannelPipelinelogger.netty.level = OFF
- flink-reactive-mode-configuration-configmap.yaml:當需要啟用Flink的響應式調度模式時,需要配置此文件。除了包含與
flink-configuration-configmap.yaml
類似的核心參數配置外,還會設置與響應式模式相關的特定參數,如調度模式和檢查點間隔等。示例如下:
apiVersion: v1
kind: ConfigMap
metadata:name: flink-configlabels: app: flink
data:flink-conf.yaml: jobmanager.rpc.address: flink-jobmanagertaskmanager.numberOfTaskSlots: 2 blob.server.port: 6124 jobmanager.rpc.port: 6123 taskmanager.rpc.port: 6122 queryable-state.proxy.ports: 6125 jobmanager.memory.process.size: 1600m taskmanager.memory.process.size: 1728m parallelism.default: 2scheduler-mode: reactiveexecution.checkpointing.interval: 10s log4j-console.properties: |+# 如下配置會同時影響用戶代碼和 Flink 的日志行為rootLogger.level = INFOrootLogger.appenderRef.console.ref = ConsoleAppender rootLogger.appenderRef.rolling.ref = RollingFileAppender# 如果你只想改變 Flink 的日志行為則可以取消如下的注釋部分#logger.flink.name = org.apache.flink #logger.flink.level = INFO # 下面幾行將公共 libraries 或 connectors 的日志級別保持在 INFO 級別。# root logger 的配置不會覆蓋此處配置。# 你必須手動修改這里的日志級別。logger.akka.name = akkalogger.akka.level = INFOlogger.kafka.name= org.apache.kafka logger.kafka.level = INFOlogger.hadoop.name = org.apache.hadooplogger.hadoop.level = INFO logger.zookeeper.name = org.apache.zookeeper logger.zookeeper.level = INFO # 將所有 info 級別的日志輸出到 consoleappender.console.name = ConsoleAppender appender.console.type = CONSOLE appender.console.layout.type = PatternLayout appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n# 將所有 info 級別的日志輸出到指定的 rolling fileappender.rolling.name = RollingFileAppenderappender.rolling.type = RollingFileappender.rolling.append = false appender.rolling.fileName = ${sys:log.file} appender.rolling.filePattern = ${sys:log.file}.%i appender.rolling.layout.type = PatternLayout appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n appender.rolling.policies.type = Policies appender.rolling.policies.size.type = SizeBasedTriggeringPolicy appender.rolling.policies.size.size=100MB appender.rolling.strategy.type = DefaultRolloverStrategy appender.rolling.strategy.max = 10 # 關閉 Netty channel handler 中不相關的(錯誤)警告 logger.netty.name = org.jboss.netty.channel.DefaultChannelPipelinelogger.netty.level = OFF
- jobmanager-service.yaml:這是一個可選的Service資源,僅在非HA模式下需要使用,其主要作用是定義JobManager服務,用于集群內部組件之間的通信。通過該Service,JobManager的RPC端口(用于任務調度通信)、Blob Server端口(用于管理二進制大對象)和Web UI端口(用于用戶監控和管理界面訪問)得以暴露。具體配置如下:
apiVersion: v1
kind: Service
metadata:name: flink-jobmanager
spec:type: ClusterIPports:- name: rpcport: 6123- name: blob-serverport: 6124- name: webuiport: 8081selector:app: flinkcomponent: jobmanager
- jobmanager-rest-service.yaml:同樣是可選的Service,該服務的作用是將JobManager的REST端口暴露為Kubernetes節點端口,使得外部客戶端可以通過節點的IP地址和指定端口訪問Flink的Web UI界面,方便用戶對Flink作業進行監控、管理和操作。配置示例如下:
apiVersion: v1
kind: Service
metadata:name: flink-jobmanager-rest
spec:type: NodePortports:- name: restport: 8081targetPort: 8081nodePort: 30081selector:app: flinkcomponent: jobmanager
- taskmanager-query-state-service.yaml:此Service也是可選的,它的功能是將TaskManager的查詢狀態服務端口暴露為公共Kubernetes node的節點端口,通過該端口可以訪問Flink的Queryable State服務,用于查詢和管理TaskManager中的狀態數據。配置內容如下:
apiVersion: v1
kind: Service
metadata:name: flink-taskmanager-query-state
spec:type: NodePortports:- name: query-stateport: 6125targetPort: 6125nodePort: 30025selector:app: flinkcomponent: taskmanager
當完成上述通用集群資源的配置文件編寫后,可以使用以下命令來創建這些服務資源:
# 在執行以下指令時,優先檢查是否已經定義了通用集群資源聲明
# Configuration and service definition
kubectl create -f flink-configuration-configmap.yaml -n flink# service
kubectl create -f jobmanager-service.yaml -n flink
kubectl create -f jobmanager-rest-service.yaml -n flink
kubectl create -f taskmanager-query-state-service.yaml -n flink
2.3 Application模式部署
Flink Application集群是一種專門為運行單個Application而設計的專用集群,在這種模式下,部署集群時必須確保對應的Application能夠正常運行。因此,在提交Flink作業任務時,首先需要啟動Flink Application集群,保證其處于可用狀態,然后才能進行作業提交操作。
在Kubernetes上部署一個基本的Flink Application集群時,通常需要包含以下三個核心組件:
- 一個運行JobManager的Application:JobManager在Flink集群中扮演著“大腦”的角色,負責作業的調度、任務分配以及與TaskManager之間的協調通信,確保整個作業的順利執行。
- 運行若干個TaskManager的Deployment:TaskManager是實際執行任務的工作節點,它們接收來自JobManager的任務指令,進行數據處理和計算工作。通過Deployment來管理TaskManager,可以方便地實現任務節點的擴縮容,以適應不同作業負載的需求。
- 暴露JobManager上REST和UI端口的Service:該Service的作用是將JobManager的REST API端口和Web UI端口暴露出來,方便用戶通過瀏覽器訪問Flink的Web界面,對作業進行監控、提交、暫停、取消等操作,同時也便于外部系統通過REST API與Flink集群進行交互。
在進行Application集群資源定義時,需要特別注意以下幾點:
- jobmanager-job.yaml:在該文件中,
args
屬性必須指定用戶作業的主類,這是告訴Flink集群從哪個類開始執行用戶提交的作業。此外,還可以參考如何設置JobManager參數的相關文檔,了解如何將額外的參數傳遞給jobmanager-job.yaml
配置中指定的Flink鏡像,以滿足不同作業的個性化需求。 - job artifacts參數:該參數所對應的作業資源(如Jar包等)必須能夠從資源定義示例中的
job-artifacts-volume
處獲取。如果是在minikube集群中創建這些組件,那么可以將定義示例中的job-artifacts-volume
掛載為主機的本地目錄,方便快捷地提供作業資源。若不使用minikube集群,也可以使用Kubernetes集群中任何其它可用類型的volume,如PersistentVolumeClaim等,來提供作業所需的資源。另外,還可以選擇構建一個已經包含job artifacts參數的自定義鏡像,將作業資源直接打包在鏡像中,簡化資源管理流程。
以下是Apllication Mode集群資源定義模板,在實際生產環境中使用時,需要根據具體需求進行相應的調整:
- jobmanager-application-non-ha.yaml:非HA模式下JobManager的配置模板示例:
實在抱歉,給你帶來不好的體驗!我將分多次輸出完整內容,每次保證段落完整連貫。以下從Flink Standalone Kubernetes部署模式的Session模式繼續。
2.4 Session模式部署
Flink Session集群以長期運行的Deployment形式存在,可在其上運行多個作業。部署包含運行JobManager的Deployment、運行TaskManager的Deployment和暴露JobManager端口的Service。
- 資源定義模板: jobmanager-session-deployment-non-ha.yaml:非HA模式下JobManager的Deployment配置。
apiVersion: apps/v1
kind: Deployment
metadata:name: flink-jobmanager
spec:replicas: 1selector:matchLabels:app: flinkcomponent: jobmanagertemplate:metadata:labels:app: flinkcomponent: jobmanagerspec:containers:- name: jobmanagerimage: apache/flink:1.14.4-scala_2.11args: ["jobmanager"]ports:- containerPort: 6123name: rpc- containerPort: 6124name: blob-server- containerPort: 8081name: webuilivenessProbe:tcpSocket:port: 6123initialDelaySeconds: 30periodSeconds: 60volumeMounts:- name: flink-config-volumemountPath: /opt/flink/confsecurityContext:runAsUser: 9999volumes:- name: flink-config-volumeconfigMap:name: flink-configitems:- key: flink-conf.yamlpath: flink-conf.yaml- key: log4j-console.propertiespath: log4j-console.properties
jobmanager-session-deployment-ha.yaml:HA模式下JobManager的Deployment配置。
apiVersion: apps/v1
kind: Deployment
metadata:name: flink-jobmanager
spec:replicas: 1 # 通過設置大于 1 的整型值來開啟 Standby JobManagerselector:matchLabels:app: flinkcomponent: jobmanagertemplate:metadata:labels:app: flinkcomponent: jobmanagerspec:containers:- name: jobmanagerimage: apache/flink:1.14.4-scala_2.11env:- name: POD_IPvalueFrom:fieldRef:apiVersion: v1fieldPath: status.podIP# 下面的 args 參數會使用 POD_IP 對應的值覆蓋 config map 中 jobmanager.rpc.address 的屬性值。args: ["jobmanager", "$(POD_IP)"]ports:- containerPort: 6123name: rpc- containerPort: 6124name: blob-server- containerPort: 8081name: webuilivenessProbe:tcpSocket:port: 6123initialDelaySeconds: 30periodSeconds: 60volumeMounts:- name: flink-config-volumemountPath: /opt/flink/confsecurityContext:runAsUser: 9999 # 參考官方 flink 鏡像中的 _flink_ 用戶,如有必要可以修改serviceAccountName: flink-service-account # 擁有創建、編輯、刪除 ConfigMap 權限的 Service 賬號volumes:- name: flink-config-volumeconfigMap:name: flink-configitems:- key: flink-conf.yamlpath: flink-conf.yaml- key: log4j-console.propertiespath: log4j-console.properties
taskmanager-session-deployment.yaml:TaskManager的Deployment配置。
apiVersion: apps/v1
kind: Deployment
metadata:name: flink-taskmanager
spec:replicas: 2selector:matchLabels:app: flinkcomponent: taskmanagertemplate:metadata:labels:app: flinkcomponent: taskmanagerspec:containers:- name: taskmanagerimage: apache/flink:1.14.4-scala_2.11args: ["taskmanager"]ports:- containerPort: 6122name: rpc- containerPort: 6125name: query-statelivenessProbe:tcpSocket:port: 6122initialDelaySeconds: 30periodSeconds: 60volumeMounts:- name: flink-config-volumemountPath: /opt/flink/conf/securityContext:runAsUser: 9999 # 參考官方 flink 鏡像中的 _flink_ 用戶,如有必要可以修改volumes:- name: flink-config-volumeconfigMap:name: flink-configitems:- key: flink-conf.yamlpath: flink-conf.yaml- key: log4j-console.propertiespath: log4j-console.properties
- 集群操作命令
創建Session集群:
kubectl create -f jobmanager-session-deployment.yaml
kubectl create -f taskmanager-session-deployment.yaml
停止Flink session集群:
kubectl delete -f taskmanager-session-deployment.yaml
kubectl delete -f jobmanager-session-deployment.yaml
三、Flink Native Kubernetes部署模式
Flink Native kubernetes模式默認只啟動jobmanager,之后根據job任務提交情況,動態的申請、啟動taskmanager計算資源,目前該模式支持session、application部署方式,不支持per - job方式。
3.1 Flink Native Kubernetes(Session)
Flink Session Native on kubernetes和Flink 流程大致相同,都需要構建基礎dockerfile,首先需要將獲取的基礎鏡像push到本地倉庫中,其次才是構建鏡像倉庫。
- 編寫dockerfile
cat >> kubernete-native-session <<EOF
# 將以下內容填充到該docker文件
FROM apache/flink:1.14.6-scala_2.12
RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone
RUN export LANG=zh_CN.UTF-8
EOF
- 構建docker鏡像
docker build -t ruijie/bigdata/flink-session:1.14.6-scala_2.12. --no-cache
#推送到私有倉庫中
docker push ruijie/bigdata/flink-session:1.14.6-scala_2.12
- 創建flink native session kubernetes集群
./bin/kubernetes-session.sh \-Dkubernetes.cluster-id=flink-native-cluster \-Dkubernetes.container.image=ruijie/bigdata/flink-session:1.14.6-scala_2.12 \-Dkubernetes.namespace=flink \-Dkubernetes.jobmanager.service-account=flink-service-account \-Dkubernetes.rest-service.exposed.type=NodePort
執行完畢之后,可以得到以下結果,我們指定的是nodePort而非clusterId,后續會對這一部分進行詳細解釋,執行完畢之后我們的kubernetes 的flink native session創建完畢了,可以通過日志打印看出web - ui暴露的地址進行訪問。
通過訪問控制臺打印的日志可以找到web - ui訪問地址;或者通過kubectl get pods -n flink
查看,然后通過kubectl logs -f
查看日志。
4. 提交任務
./bin/flink run \--target kubernetes-session \-Dkubernetes.cluster-id=flink-native-session \-Dkubernetes.namespace=flink \-Dkubernetes.jobmanager.service-account=flink-service-account \./examples/streaming/TopSpeedWindowing.jar \./examples/streaming/WordCount.jar \-Dkubernetes.taskmanager.cpu=2000m \-Dexternal-resource.limits.kubernetes.cpu=4000m \-Dexternal-resource.limits.kubernetes.memory=10Gi \-Dexternal-resource.requests.kubernetes.cpu=2000m \-Dexternal-resource.requests.kubernetes.memory=8Gi \-Dkubernetes.taskmanager.cpu=2000m
- 刪除集群資源
kubectl delete deployment/flink-native-session
- 取消正在運行的任務
echo'stop' | ./bin/kubernetes-session.sh \-Dkubernetes.cluster-id=my-first-flink-cluster \-Dexecution.attached=true
3.2 Flink Application Native Kubernetes
Flink application on native 和以上的相同都需要經過dockerfile 文件的編寫和構建鏡像。
- 編寫dockerfile文件
cat >> flink-application<<EOF
FROM flink:1.14.6-scala_2.12
ENV FLINK_HOME=/opt/flink
RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone
RUN export LANG=zh_CN.UTF-8
RUN mkdir -p $FLINK_HOME/usrlib
COPY TopSpeedWindowing.jar $FLINK_HOME/usrlib/
EOF
- 構建鏡像與操作
# 構建鏡像
docker build -f flink-application -t docker.ruijie.com/bigdata/application-flink:1.14.6. --no-cache
# 推動到本地倉庫中
docker push docker.ruijie.com/bigdata/application-flink:1.14.6
# 刪除本地鏡像
docker rmi docker.ruijie.com/bigdata/application-flink:1.14.6
# 刪除kubernetes鏡像
crictl rmi docker.ruijie.com/bigdata/application-flink:1.14.6
同時也需要構建和下發到其他node節點將鏡像加載進去,避免因找不到鏡像而報錯。命名空間和權限也相同,參考以上步驟。
3. 啟動任務
./bin/flink run-application \--target kubernetes-application \-Dkubernetes.cluster-id=flink-application-cluster \-Dkubernetes.container.image=docker.ruijie.com/bigdata/application/flink:1.14.6 \-Dkubernetes.jobmanager.replicas=1 \-Dkubernetes.namespace=flink \-Dkubernetes.jobmanager.service-account=flink-service-account \-Dexternal-resource.limits.kubernetes.cpu=2000m \-Dkubernetes.websocket.timeout=60000 \-Dexternal-resource.limits.kubernetes.memory=1Gi \-Dexternal-resource.requests.kubernetes.cpu=1000m \-Dexternal-resource.requests.kubernetes.memory=1Gi \-Dkubernetes.rest-service.exposed.type=NodePort \local:///usrlib/TopSpeedWindowing.jar
- 查看任務運行情況
# 查看pods和svc
kubectl get pods,svc -n flink
# 查看日志
kubectl logs -f rj-flink-cluster-f4d9b796-lqg7q -n flink
- native application集群管理
# 列出集群上正在運行的作業
./bin/flink list --target kubernetes-application -Dkubernetes.cluster-id=flink-native-session
# 取消正在運行的作業
./bin/flink cancel --target kubernetes-application -Dkubernetes.cluster-id=flink-native-session <jobId>
以上就是Flink與Kubernetes集成的全流程及各模式詳細操作。如果在實際操作中有任何疑問,或想了解某部分的更多細節,歡迎隨時告訴我。