1 Overview
The Flink Operator and the Flink clusters it manages do not support Huawei Cloud OBS out of the box. To integrate with OBS, a jar package must be placed in the plugin directory of these Java programs, and the Flink configuration must be adjusted.
Reference:
https://support.huaweicloud.com/bestpractice-obs/obs_05_1516.html
2 Environment preparation
2.1 Huawei Cloud Kubernetes cluster
Prepare a Kubernetes cluster, as shown in the figure below:
2.2 Flink Operator Helm chart download URL
https://downloads.apache.org/flink/flink-kubernetes-operator-1.10.0/flink-kubernetes-operator-1.10.0-helm.tgz
2.3 cert-manager YAML download URL
https://github.com/jetstack/cert-manager/releases/download/v1.17.2/cert-manager.yaml
2.4 Prepare a Flink application example
https://github.com/apache/flink/tree/master/flink-examples
Compile the official Flink example code into jar packages and upload them to OBS, as shown in the figure below:
These jar packages are stored in Huawei Cloud OBS. The Flink Operator can pull them over the OBS protocol and submit them to the Flink cluster, and the Flink JobManager and TaskManager can also read from and write to OBS.
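For example, a compiled jar can be uploaded with Huawei's obsutil command-line tool (a minimal sketch; the bucket name is illustrative, and obsutil is assumed to be already configured with your AK/SK and endpoint):
# Upload the compiled example jar to an OBS bucket (bucket name is illustrative)
obsutil cp /tmp/StateMachineExample.jar obs://my-flink-bucket/StateMachineExample.jar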
3 Deployment
3.1 Install cert-manager
This component is a dependency of the Flink Operator webhook, so install it first.
cd /tmp
wget https://github.com/jetstack/cert-manager/releases/download/v1.17.2/cert-manager.yaml
kubectl apply -f cert-manager.yaml
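Before moving on, confirm that the cert-manager pods have reached the Running state:
# The cert-manager controller, cainjector, and webhook pods should all be Running
kubectl get pods -n cert-manager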
3.2 Install the Helm binary
cd /tmp
wget https://get.helm.sh/helm-v3.16.2-linux-amd64.tar.gz
tar xf helm-v3.16.2-linux-amd64.tar.gz
cd linux-amd64
/bin/cp -f helm /usr/bin/
helm env
3.3 Deploy the Flink Operator
Download the Flink Operator Helm chart, extract it, and deploy it into the flink namespace with the helm command.
cd /tmp
wget https://downloads.apache.org/flink/flink-kubernetes-operator-1.10.0/flink-kubernetes-operator-1.10.0-helm.tgz
tar xf flink-kubernetes-operator-1.10.0-helm.tgz
Edit the flink-kubernetes-operator/values.yaml file and add the following under the defaultConfiguration.flink-conf.yaml field:
defaultConfiguration:
  flink-conf.yaml: |+
    fs.obs.impl: org.apache.hadoop.fs.obs.OBSFileSystem
    fs.obs.access.key: *********your AK*********
    fs.obs.secret.key: *********your SK*********
    fs.obs.endpoint: obs.cn-south-1.myhuaweicloud.com # OBS endpoint; set it according to your environment
Deploy the Kubernetes resources with the following command:
helm upgrade --install flink-operator -n flink --create-namespace \
--set image.repository=swr.cn-south-1.myhuaweicloud.com/migrator/flink-kubernetes-operator \
--set image.tag=1.10.0 \
./flink-kubernetes-operator/
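After the release is installed, a quick check that the operator came up (names follow the helm command above):
# The operator pod should be Running and the release should show as deployed
kubectl get pods -n flink
helm list -n flink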
I have put the flink-obs jar packages into the image swr.cn-south-1.myhuaweicloud.com/migrator/flink-obs-fs-hadoop:1.12.1-hw-45. This is a public image that anyone can pull and use.
Next, update the operator Deployment (an initContainer and a mount of the obs-plugin volume are required); simply kubectl apply the following:
apiVersion: apps/v1
kind: Deployment
metadata:
  annotations:
    meta.helm.sh/release-name: flink-operator
    meta.helm.sh/release-namespace: flink
  generation: 4
  labels:
    app.kubernetes.io/managed-by: Helm
    app.kubernetes.io/name: flink-kubernetes-operator
    app.kubernetes.io/version: 1.10.0
    helm.sh/chart: flink-kubernetes-operator-1.10.0
  name: flink-kubernetes-operator
  namespace: flink
spec:
  replicas: 1
  selector:
    matchLabels:
      app.kubernetes.io/name: flink-kubernetes-operator
  strategy:
    type: Recreate
  template:
    metadata:
      annotations:
        kubectl.kubernetes.io/default-container: flink-kubernetes-operator
      creationTimestamp: null
      labels:
        app.kubernetes.io/name: flink-kubernetes-operator
    spec:
      initContainers:
      - image: swr.cn-south-1.myhuaweicloud.com/migrator/flink-obs-fs-hadoop:1.12.1-hw-45
        name: sidecar
        command: ["sh"]
        args: ["-c", "mkdir -p /opt/flink/plugins/obs-fs-hadoop && cp -f /opt/*.jar /opt/flink/plugins/obs-fs-hadoop/"]
        volumeMounts:
        - name: obs-plugin
          mountPath: /opt/flink/plugins/obs-fs-hadoop
      containers:
      - command:
        - /docker-entrypoint.sh
        - operator
        env:
        - name: OPERATOR_NAMESPACE
          valueFrom:
            fieldRef:
              apiVersion: v1
              fieldPath: metadata.namespace
        - name: HOST_IP
          valueFrom:
            fieldRef:
              apiVersion: v1
              fieldPath: status.hostIP
        - name: POD_IP
          valueFrom:
            fieldRef:
              apiVersion: v1
              fieldPath: status.podIP
        - name: POD_NAME
          valueFrom:
            fieldRef:
              apiVersion: v1
              fieldPath: metadata.name
        - name: OPERATOR_NAME
          value: flink-kubernetes-operator
        - name: FLINK_CONF_DIR
          value: /opt/flink/conf
        - name: FLINK_PLUGINS_DIR
          value: /opt/flink/plugins
        - name: LOG_CONFIG
          value: -Dlog4j.configurationFile=/opt/flink/conf/log4j-operator.properties
        - name: JVM_ARGS
        image: swr.cn-south-1.myhuaweicloud.com/migrator/flink-kubernetes-operator:1.10.0
        imagePullPolicy: IfNotPresent
        livenessProbe:
          failureThreshold: 3
          httpGet:
            path: /
            port: health-port
            scheme: HTTP
          initialDelaySeconds: 30
          periodSeconds: 10
          successThreshold: 1
          timeoutSeconds: 1
        name: flink-kubernetes-operator
        ports:
        - containerPort: 8085
          name: health-port
          protocol: TCP
        resources: {}
        securityContext: {}
        startupProbe:
          failureThreshold: 30
          httpGet:
            path: /
            port: health-port
            scheme: HTTP
          periodSeconds: 10
          successThreshold: 1
          timeoutSeconds: 1
        terminationMessagePath: /dev/termination-log
        terminationMessagePolicy: File
        volumeMounts:
        - mountPath: /opt/flink/conf
          name: flink-operator-config-volume
        - mountPath: /opt/flink/artifacts
          name: flink-artifacts-volume
        - name: obs-plugin
          mountPath: /opt/flink/plugins/obs-fs-hadoop
      - command:
        - /docker-entrypoint.sh
        - webhook
        env:
        - name: WEBHOOK_KEYSTORE_PASSWORD
          valueFrom:
            secretKeyRef:
              key: password
              name: flink-operator-webhook-secret
        - name: WEBHOOK_KEYSTORE_FILE
          value: /certs/keystore.p12
        - name: WEBHOOK_KEYSTORE_TYPE
          value: pkcs12
        - name: WEBHOOK_SERVER_PORT
          value: "9443"
        - name: LOG_CONFIG
          value: -Dlog4j.configurationFile=/opt/flink/conf/log4j-operator.properties
        - name: JVM_ARGS
        - name: FLINK_CONF_DIR
          value: /opt/flink/conf
        - name: FLINK_PLUGINS_DIR
          value: /opt/flink/plugins
        - name: OPERATOR_NAMESPACE
          valueFrom:
            fieldRef:
              apiVersion: v1
              fieldPath: metadata.namespace
        image: swr.cn-south-1.myhuaweicloud.com/migrator/flink-kubernetes-operator:1.10.0
        imagePullPolicy: IfNotPresent
        name: flink-webhook
        resources: {}
        securityContext: {}
        terminationMessagePath: /dev/termination-log
        terminationMessagePolicy: File
        volumeMounts:
        - mountPath: /certs
          name: keystore
          readOnly: true
        - mountPath: /opt/flink/conf
          name: flink-operator-config-volume
      dnsPolicy: ClusterFirst
      restartPolicy: Always
      schedulerName: default-scheduler
      securityContext:
        runAsGroup: 9999
        runAsUser: 9999
      serviceAccount: flink-operator
      serviceAccountName: flink-operator
      terminationGracePeriodSeconds: 30
      volumes:
      - configMap:
          defaultMode: 420
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-operator.properties
            path: log4j-operator.properties
          - key: log4j-console.properties
            path: log4j-console.properties
          name: flink-operator-config
        name: flink-operator-config-volume
      - emptyDir: {}
        name: flink-artifacts-volume
      - name: keystore
        secret:
          defaultMode: 420
          items:
          - key: keystore.p12
            path: keystore.p12
          secretName: webhook-server-cert
      - name: obs-plugin
        emptyDir: {}
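To verify that the initContainer copied the plugin into place, you can list the plugin directory inside the operator container (a sketch; names follow the manifest above):
# The flink-obs-fs-hadoop jars should appear in the plugin directory
kubectl -n flink exec deploy/flink-kubernetes-operator -c flink-kubernetes-operator -- ls /opt/flink/plugins/obs-fs-hadoop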
3.4 Deploy a Flink session cluster
kubectl apply the following resource to deploy a Flink session cluster:
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: flink-session-cluster
  namespace: flink
spec:
  image: swr.cn-south-1.myhuaweicloud.com/migrator/flink:1.19
  flinkVersion: v1_19
  flinkConfiguration:
    fs.obs.impl: org.apache.hadoop.fs.obs.OBSFileSystem
    fs.obs.access.key: *********your AK*********
    fs.obs.secret.key: *********your SK*********
    fs.obs.endpoint: obs.cn-south-1.myhuaweicloud.com # OBS endpoint; set it according to your environment
  jobManager:
    resource:
      memory: "2048m"
      cpu: 2
  taskManager:
    resource:
      memory: "2048m"
      cpu: 2
  serviceAccount: flink
  podTemplate:
    spec:
      volumes:
      - name: obs-plugin
        emptyDir: {}
      containers:
      # Do not change the main container name
      - name: flink-main-container
        volumeMounts:
        - name: obs-plugin
          mountPath: /opt/flink/plugins/obs-fs-hadoop
      initContainers:
      - image: swr.cn-south-1.myhuaweicloud.com/migrator/flink-obs-fs-hadoop:1.12.1-hw-45
        name: sidecar
        command: ["sh"]
        args: ["-c", "mkdir -p /opt/flink/plugins/obs-fs-hadoop && cp -f /opt/*.jar /opt/flink/plugins/obs-fs-hadoop/"]
        volumeMounts:
        - name: obs-plugin
          mountPath: /opt/flink/plugins/obs-fs-hadoop
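Once applied, the session cluster can be checked like this (names follow the manifest above):
# The jobmanager and taskmanager pods should reach the Running state
kubectl -n flink get flinkdeployment flink-session-cluster
kubectl -n flink get pods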
4 Submit a Flink job
kubectl apply the following resource:
apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
  name: basic-session-job-example
  namespace: flink
spec:
  deploymentName: flink-session-cluster
  job:
    jarURI: obs://your-bucket/StateMachineExample.jar # location of the jar package; set it according to your environment
    parallelism: 1
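The job can then be watched through the CRD (names follow the manifest above):
# The job status should eventually show RUNNING
kubectl -n flink get flinksessionjob basic-session-job-example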
The Flink job is in RUNNING state, which shows that the Flink Operator pulled the jar package from Huawei Cloud OBS and submitted it to the Flink cluster.
Looking further at the Flink Operator logs, you can see OBS-related entries:
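For example (a sketch; the deployment name follows the manifest above):
# Grep the operator logs for OBS-related entries
kubectl -n flink logs deploy/flink-kubernetes-operator -c flink-kubernetes-operator | grep -i obs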
5 Summary
This article described how to integrate the Flink Operator and the Flink clusters it manages with Huawei Cloud OBS. Once the integration is in place, not only can job jar packages be stored in OBS, but Flink job state, inputs, and outputs can be kept there as well.