flink operator v1.10對接華為云對象存儲OBS

1 概述

flink operator及其flink集群,默認不直接支持華為云OBS,需要在這些java程序的插件目錄放一個jar包,以及修改flink配置后,才能支持集成華為云OBS。
相關鏈接參考:

https://support.huaweicloud.com/bestpractice-obs/obs_05_1516.html

2 環境準備

2.1 華為云kubernetes集群

準備一個kubernetes集群,如下圖所示:
在這里插入圖片描述

2.2 flink operator helm包下載地址

https://downloads.apache.org/flink/flink-kubernetes-operator-1.10.0/flink-kubernetes-operator-1.10.0-helm.tgz

2.3 cert-manager yaml文件下載地址

https://github.com/jetstack/cert-manager/releases/download/v1.17.2/cert-manager.yaml

2.4 準備flink應用示例

https://github.com/apache/flink/tree/master/flink-examples

將flink官方示例的代碼編譯成jar包,再上傳到對象存儲OBS,如下圖所示:
在這里插入圖片描述
這些jar包存放在華為云OBS對象存儲上,flink operator和可以通過OBS協議拉取jar包,最終提交給flink集群,并且flink集群的jobmanager、flink taskmanager也能讀寫OBS對象存儲。

3 部署

3.1 安裝cert-manager

此組件是flink operator webhook的一個依賴,因此先安裝它。

cd /tmp
wget https://github.com/jetstack/cert-manager/releases/download/v1.17.1/cert-manager.yaml
kubectl apply -f cert-manager.yaml

在這里插入圖片描述

3.2 安裝helm二進制工具

cd /tmp
wget https://get.helm.sh/helm-v3.16.2-linux-amd64.tar.gz
tar xf helm-v3.16.2-linux-amd64.tar.gz
cd linux-amd64
/bin/cp -f helm /usr/bin/
helm env

3.3 部署flink operator

下載fink operator的helm包,解壓文件,最后通過helm命令將它部署在flink namespace中。

cd /tmp
wget https://downloads.apache.org/flink/flink-kubernetes-operator-1.10.0/flink-kubernetes-operator-1.10.0-helm.tgz
tar xf flink-kubernetes-operator-1.10.0-helm.tgz

修改flink-kubernetes-operator/values.yaml文件,在文件的defaultConfiguration.flink-conf.yaml字段下新增如下內容:

defaultConfiguration:flink-conf.yaml: |+fs.obs.impl: org.apache.hadoop.fs.obs.OBSFileSystemfs.obs.access.key: *********你的ak*********fs.obs.secret.key: *********你的sk*********fs.obs.endpoint: obs.cn-south-1.myhuaweicloud.com     # 這是對象存儲端點,依據實際情況填寫

部署k8s資源,命令如下:

helm upgrade --install flink-operator -n flink --create-namespace \
--set image.repository=swr.cn-south-1.myhuaweicloud.com/migrator/flink-kubernetes-operator \
--set image.tag=1.10.0 \
./flink-kubernetes-operator/

我將flink-obs的jar包放入到鏡像swr.cn-south-1.myhuaweicloud.com/migrator/flink-obs-fs-hadoop:1.12.1-hw-45中,此鏡像是公共鏡像,大家可隨意拉取使用。

接著,更新operator deployment(需要使用initContainer和obs-plugin的volume的掛載),直接kubectl apply如下內容即可:

apiVersion: apps/v1
kind: Deployment
metadata:annotations:meta.helm.sh/release-name: flink-operatormeta.helm.sh/release-namespace: flinkgeneration: 4labels:app.kubernetes.io/managed-by: Helmapp.kubernetes.io/name: flink-kubernetes-operatorapp.kubernetes.io/version: 1.10.0helm.sh/chart: flink-kubernetes-operator-1.10.0name: flink-kubernetes-operatornamespace: flink
spec:replicas: 1selector:matchLabels:app.kubernetes.io/name: flink-kubernetes-operatorstrategy:type: Recreatetemplate:metadata:annotations:kubectl.kubernetes.io/default-container: flink-kubernetes-operatorcreationTimestamp: nulllabels:app.kubernetes.io/name: flink-kubernetes-operatorspec:initContainers:- image: swr.cn-south-1.myhuaweicloud.com/migrator/flink-obs-fs-hadoop:1.12.1-hw-45name: sidecarcommand: ["sh"]args: ["-c","mkdir -p /opt/flink/plugins/obs-fs-hadoop && cp -f /opt/*.jar /opt/flink/plugins/obs-fs-hadoop/"]volumeMounts:- name: obs-pluginmountPath: /opt/flink/plugins/obs-fs-hadoopcontainers:- command:- /docker-entrypoint.sh- operatorenv:- name: OPERATOR_NAMESPACEvalueFrom:fieldRef:apiVersion: v1fieldPath: metadata.namespace- name: HOST_IPvalueFrom:fieldRef:apiVersion: v1fieldPath: status.hostIP- name: POD_IPvalueFrom:fieldRef:apiVersion: v1fieldPath: status.podIP- name: POD_NAMEvalueFrom:fieldRef:apiVersion: v1fieldPath: metadata.name- name: OPERATOR_NAMEvalue: flink-kubernetes-operator- name: FLINK_CONF_DIRvalue: /opt/flink/conf- name: FLINK_PLUGINS_DIRvalue: /opt/flink/plugins- name: LOG_CONFIGvalue: -Dlog4j.configurationFile=/opt/flink/conf/log4j-operator.properties- name: JVM_ARGSimage: swr.cn-south-1.myhuaweicloud.com/migrator/flink-kubernetes-operator:1.10.0imagePullPolicy: IfNotPresentlivenessProbe:failureThreshold: 3httpGet:path: /port: health-portscheme: HTTPinitialDelaySeconds: 30periodSeconds: 10successThreshold: 1timeoutSeconds: 1name: flink-kubernetes-operatorports:- containerPort: 8085name: health-portprotocol: TCPresources: {}securityContext: {}startupProbe:failureThreshold: 30httpGet:path: /port: health-portscheme: HTTPperiodSeconds: 10successThreshold: 1timeoutSeconds: 1terminationMessagePath: /dev/termination-logterminationMessagePolicy: FilevolumeMounts:- mountPath: /opt/flink/confname: flink-operator-config-volume- mountPath: /opt/flink/artifactsname: flink-artifacts-volume- name: obs-pluginmountPath: /opt/flink/plugins/obs-fs-hadoop- command:- /docker-entrypoint.sh- webhookenv:- name: WEBHOOK_KEYSTORE_PASSWORDvalueFrom:secretKeyRef:key: passwordname: flink-operator-webhook-secret- name: WEBHOOK_KEYSTORE_FILEvalue: /certs/keystore.p12- name: WEBHOOK_KEYSTORE_TYPEvalue: pkcs12- name: WEBHOOK_SERVER_PORTvalue: "9443"- name: LOG_CONFIGvalue: -Dlog4j.configurationFile=/opt/flink/conf/log4j-operator.properties- name: JVM_ARGS- name: FLINK_CONF_DIRvalue: /opt/flink/conf- name: FLINK_PLUGINS_DIRvalue: /opt/flink/plugins- name: OPERATOR_NAMESPACEvalueFrom:fieldRef:apiVersion: v1fieldPath: metadata.namespaceimage: swr.cn-south-1.myhuaweicloud.com/migrator/flink-kubernetes-operator:1.10.0imagePullPolicy: IfNotPresentname: flink-webhookresources: {}securityContext: {}terminationMessagePath: /dev/termination-logterminationMessagePolicy: FilevolumeMounts:- mountPath: /certsname: keystorereadOnly: true- mountPath: /opt/flink/confname: flink-operator-config-volumednsPolicy: ClusterFirstrestartPolicy: AlwaysschedulerName: default-schedulersecurityContext:runAsGroup: 9999runAsUser: 9999serviceAccount: flink-operatorserviceAccountName: flink-operatorterminationGracePeriodSeconds: 30volumes:- configMap:defaultMode: 420items:- key: flink-conf.yamlpath: flink-conf.yaml- key: log4j-operator.propertiespath: log4j-operator.properties- key: log4j-console.propertiespath: log4j-console.propertiesname: flink-operator-configname: flink-operator-config-volume- emptyDir: {}name: flink-artifacts-volume- name: keystoresecret:defaultMode: 420items:- key: keystore.p12path: keystore.p12secretName: webhook-server-cert- name: obs-pluginemptyDir: {}

3.4 部署flink session cluster

kubectl apply以下資源即可部署一個flink session集群,文件內容如下:

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:name: flink-session-clusternamespace: flink
spec:image: swr.cn-south-1.myhuaweicloud.com/migrator/flink:1.19flinkVersion: v1_19flinkConfiguration:fs.obs.impl: org.apache.hadoop.fs.obs.OBSFileSystemfs.obs.access.key: *********你的ak*********fs.obs.secret.key: *********你的sk*********fs.obs.endpoint: obs.cn-south-1.myhuaweicloud.com   # 這是對象存儲端點,依據實際情況填寫jobManager:resource:memory: "2048m"cpu: 2taskManager:resource:memory: "2048m"cpu: 2serviceAccount: flinkpodTemplate:spec:volumes:- name: obs-pluginemptyDir: {}containers:# Do not change the main container name- name: flink-main-containervolumeMounts:- name: obs-pluginmountPath: /opt/flink/plugins/obs-fs-hadoopinitContainers:- image: swr.cn-south-1.myhuaweicloud.com/migrator/flink-obs-fs-hadoop:1.12.1-hw-45name: sidecarcommand: ["sh"]args: ["-c","mkdir -p /opt/flink/plugins/obs-fs-hadoop && cp -f /opt/*.jar /opt/flink/plugins/obs-fs-hadoop/"]volumeMounts:- name: obs-pluginmountPath: /opt/flink/plugins/obs-fs-hadoop

在這里插入圖片描述

4 提交flink作業

kubectl apply以下資源即可:

apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:name: basic-session-job-examplenamespace: flink
spec:deploymentName: flink-session-clusterjob:jarURI: obs://你的桶/StateMachineExample.jar    # jar包的位置,按實際情況填寫parallelism: 1

在這里插入圖片描述
可見flink作業是running狀態,說明jar包被flink operator從華為云對象存儲OBS拉取下來并提交到flink集群中。
繼續查看flink operator日志,可以看見obs相關的信息:
在這里插入圖片描述

小結

本文介紹flink operator及其管理的flink集群是如何對接到華為云對象存儲OBS,對接完成后,不僅可以將作業的jar包存儲在對象存儲,也可以將flink作業的狀態、輸入輸出等存儲在對象存儲。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/71834.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/71834.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/71834.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

免費PDF工具

Smallpdf.com - A Free Solution to all your PDF Problems Smallpdf - the platform that makes it super easy to convert and edit all your PDF files. Solving all your PDF problems in one place - and yes, free. https://smallpdf.com/#rappSmallpdf.com-解決您所有PD…

去中心化技術P2P框架

中心化網絡與去中心化網絡 1. 中心化網絡 在傳統的中心化網絡中,所有客戶端都通過一個中心服務器進行通信。這種網絡拓撲結構通常是一個星型結構,其中服務器作為中心節點,每個客戶端只能與服務器通信。如果客戶端之間需要通信,必須…

muduo源碼閱讀:linux timefd定時器

?timerfd timerfd 是Linux一個定時器接口,它基于文件描述符工作,并通過該文件描述符的可讀事件進行超時通知。可以方便地與select、poll和epoll等I/O多路復用機制集成,從而在沒有處理事件時阻塞程序執行,實現高效的零輪詢編程模…

Pinia 3.0 正式發布:全面擁抱 Vue 3 生態,升級指南與實戰教程

一、重大版本更新解析 2024年2月11日,Vue 官方推薦的狀態管理庫 Pinia 迎來 3.0 正式版發布,本次更新標志著其全面轉向 Vue 3 技術生態。以下是開發者需要重點關注的升級要點: 1.1 核心變更說明 特性3.0 版本要求兼容性說明Vue 支持Vue 3.…

【圖像處理 --- Sobel 邊緣檢測的詳解】

Sobel 邊緣檢測的詳解 目錄 Sobel 邊緣檢測的詳解1. 梯度計算2. 梯度大小3. 梯度方向4. 非極大值抑制5. 雙閾值處理6. 在 MATLAB 中實現 Sobel 邊緣檢測7.運行結果展示8.關鍵參數解釋9.實驗與驗證 Sobel 邊緣檢測是一種經典的圖像處理算法,用于檢測圖像中的邊緣。它…

LeetCode 熱題100 15. 三數之和

LeetCode 熱題100 | 15. 三數之和 大家好,今天我們來解決一道經典的算法題——三數之和。這道題在 LeetCode 上被標記為中等難度,要求我們從一個整數數組中找到所有不重復的三元組,使得三元組的和為 0。下面我將詳細講解解題思路&#xff0c…

基因組組裝中的術語1——from HGP

Initial sequencing and analysis of the human genome | Nature 1,分層鳥槍法測序hierarchical shotgun sequencing

安全開發-環境選擇

文章目錄 個人心得虛擬機選擇ubuntu 22.04python環境選擇conda下載使用: 個人心得 在做開發時配置一個專門的環境可以使我們在開發中的效率顯著提升,可以避免掉很多環境沖突的報錯。尤其是python各種版本沖突,還有做滲透工具不要選擇windows…

數字體驗驅動用戶參與增效路徑

內容概要 在數字化轉型深化的當下,數字內容體驗已成為企業與用戶建立深度連接的核心切入點。通過個性化推薦引擎與智能數據分析系統的協同運作,企業能夠實時捕捉用戶行為軌跡,構建精準的用戶行為深度洞察模型。這一模型不僅支撐內容分發的動…

Python 字符串(str)全方位剖析:從基礎入門、方法詳解到跨語言對比與知識拓展

Python 字符串(str)全方位剖析:從基礎入門、方法詳解到跨語言對比與知識拓展 本文將深入探討 Python 中字符串(str)的相關知識,涵蓋字符串的定義、創建、基本操作、格式化等內容。同時,會將 Py…

使用C++實現簡單的TCP服務器和客戶端

使用C實現簡單的TCP服務器和客戶端 介紹準備工作1. TCP服務器實現代碼結構解釋 2. TCP客戶端實現代碼結構解釋 3. 測試1.編譯:2.運行 結語 介紹 本文將通過一個簡單的例子,介紹如何使用C實現一個基本的TCP服務器和客戶端。這個例子展示了如何創建服務器…

Java Web開發實戰與項目——Spring Boot與Spring Cloud微服務項目實戰

企業級應用中,微服務架構已經成為一種常見的開發模式。Spring Boot與Spring Cloud提供了豐富的工具和組件,幫助開發者快速構建、管理和擴展微服務應用。本文將通過一個實際的微服務項目,展示如何使用Spring Boot與Spring Cloud構建微服務架構…

VMware建立linux虛擬機

本文適用于初學者,幫助初學者學習如何創建虛擬機,了解在創建過程中各個選項的含義。 環境如下: CentOS版本: CentOS 7.9(2009) 軟件: VMware Workstation 17 Pro 17.5.0 build-22583795 1.配…

Linux8-互斥鎖、信號量

一、前情回顧 void perror(const char *s);功能:參數: 二、資源競爭 1.多線程訪問臨界資源時存在資源競爭(存在資源競爭、造成數據錯亂) 臨界資源:多個線程可以同時操作的資源空間(全局變量、共享內存&a…

LD_PRELOAD 繞過 disable_function 學習

借助這位師傅的文章來學習通過LD_PRELOAD來繞過disable_function的原理 【PHP繞過】LD_PRELOAD bypass disable_functions_phpid繞過-CSDN博客 感謝這位師傅的貢獻 介紹 靜態鏈接: (1)舉個情景來幫助理解: 假設你要搬家&#x…

【無人集群系列---無人機集群編隊算法】

【無人集群系列---無人機集群編隊算法】 一、核心目標二、主流編隊控制方法1. 領航-跟隨法(Leader-Follower)2. 虛擬結構法(Virtual Structure)3. 行為法(Behavior-Based)4. 人工勢場法(Artific…

Oracle Fusion Middleware更改weblogic密碼

前言 當用戶忘記weblogic密碼時,且無法登錄到web界面中,需要使用服務器命令更改密碼 更改方式 1、備份 首先進入 weblogic 安裝目錄,備份三個文件:boot.properties,DefaultAuthenticatorInit.ldift,Def…

MongoDB 復制(副本集)

MongoDB 復制(副本集) 引言 MongoDB是一個高性能、可擴展、易于使用的文檔存儲系統。它以JSON-like的文檔存儲結構,支持靈活的數據模型。在分布式系統中,為了提高數據可用性和系統穩定性,常常需要實現數據的備份和冗余。MongoDB提供了副本集…

【Erdas實驗教程】009:非監督分類及分類后評價

文章目錄 一、分類過程二、分類評價ERDAS 的 ISODATA 算法是基于最小光譜距離來進行的非監督分類,聚類過程始于任意聚類平均值或一個已有分類模板的平均值;聚類每重復一次,聚類的平均值就更新一次,新聚類的均值再用于下次聚類循環。這個過程不斷重復,直到最大的循環次數已…

一周學會Flask3 Python Web開發-Jinja2模板訪問對象

鋒哥原創的Flask3 Python Web開發 Flask3視頻教程: 2025版 Flask3 Python web開發 視頻教程(無廢話版) 玩命更新中~_嗶哩嗶哩_bilibili 如果渲染模板傳的是對象,如果如何來訪問呢? 我們看下下面示例: 定義一個Student類 cla…