最佳實踐:如何優雅地提交一個 Amazon EMR Serverless 作業?

請添加圖片描述《大數據平臺架構與原型實現:數據中臺建設實戰》一書由博主歷時三年精心創作,現已通過知名IT圖書品牌電子工業出版社博文視點出版發行,點擊《重磅推薦:建大數據平臺太難了!給我發個工程原型吧!》了解圖書詳情,京東購書鏈接:https://item.jd.com/12677623.html,掃描左側二維碼進入京東手機購書頁面。

自Amazon EMR推出Serverlesss形態以來,得益于開箱即用和零運維的優質特性,越來越多的EMR用戶開始嘗試EMR Serverless。在使用過程中,一個常被提及的問題是:我們應該如何在EMR Serverless上提交Spark/Hive作業?本文我們將分享一些這方面的最佳實踐,幫助大家以一種更優雅的方式使用這項服務。

一份通俗易懂的講解最好配一個形象生動的例子,本文選擇《CDC一鍵入湖:當 Apache Hudi DeltaStreamer 遇見 Serverless Spark》一文介紹的DeltaStreamer作業作為講解示例,因為這個作業既有一定的通用性又足夠復雜,可以涵蓋大多數EMR Serverless作業遇到的場景,更重要的是,該作業的提交方式遵循了本文要介紹的各項最佳實踐(本文是其姊妹篇)。不了解Apache Hudi的讀者不必擔心,本文的關注點在于如何提交EMR Serverless作業本身,而非DeltaStreamer的技術細節,所以不會影響到您閱讀此文。

參考范本

首先,我們整理一下提交DeltaStreamer CDC作業的幾項關鍵操作,下文會以這些腳本為例,介紹蘊含其中的各項最佳實踐。

1. 導出環境相關變量

export APP_NAME='apache-hudi-delta-streamer'
export APP_S3_HOME='s3://apache-hudi-delta-streamer'
export APP_LOCAL_HOME='/home/ec2-user/apache-hudi-delta-streamer'
export EMR_SERVERLESS_APP_ID='00fbfel40ee59k09'
export EMR_SERVERLESS_EXECUTION_ROLE_ARN='arn:aws:iam::123456789000:role/EMR_SERVERLESS_ADMIN'

2. 創建作業專屬工作目錄和S3存儲桶

mkdir -p $APP_LOCAL_HOME
aws s3 mb $APP_S3_HOME

3. 準備作業描述文件

cat << EOF > $APP_LOCAL_HOME/start-job-run.json
{"name":"apache-hudi-delta-streamer","applicationId":"$EMR_SERVERLESS_APP_ID","executionRoleArn":"$EMR_SERVERLESS_EXECUTION_ROLE_ARN","jobDriver":{"sparkSubmit":{"entryPoint":"/usr/lib/hudi/hudi-utilities-bundle.jar","entryPointArguments":["--continuous","--enable-sync","--table-type", "COPY_ON_WRITE","--op", "UPSERT","--target-base-path", "$APP_S3_HOME/data/mysql-server-3/inventory/orders","--target-table", "orders","--min-sync-interval-seconds", "60","--source-class", "org.apache.hudi.utilities.sources.debezium.MysqlDebeziumSource","--source-ordering-field", "_event_origin_ts_ms","--payload-class", "org.apache.hudi.common.model.debezium.MySqlDebeziumAvroPayload","--hoodie-conf", "bootstrap.servers=$KAFKA_BOOTSTRAP_SERVERS","--hoodie-conf", "schema.registry.url=$SCHEMA_REGISTRY_URL","--hoodie-conf", "hoodie.deltastreamer.schemaprovider.registry.url=${SCHEMA_REGISTRY_URL}/subjects/osci.mysql-server-3.inventory.orders-value/versions/latest","--hoodie-conf", "hoodie.deltastreamer.source.kafka.value.deserializer.class=io.confluent.kafka.serializers.KafkaAvroDeserializer","--hoodie-conf", "hoodie.deltastreamer.source.kafka.topic=osci.mysql-server-3.inventory.orders","--hoodie-conf", "auto.offset.reset=earliest","--hoodie-conf", "hoodie.datasource.write.recordkey.field=order_number","--hoodie-conf", "hoodie.datasource.write.partitionpath.field=order_date","--hoodie-conf", "hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor","--hoodie-conf", "hoodie.datasource.write.hive_style_partitioning=true","--hoodie-conf", "hoodie.datasource.hive_sync.database=inventory","--hoodie-conf", "hoodie.datasource.hive_sync.table=orders","--hoodie-conf", "hoodie.datasource.hive_sync.partition_fields=order_date"],"sparkSubmitParameters":"--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory --conf spark.jars=$(aws s3 ls $APP_S3_HOME/jars/ | grep -o '\S*\.jar$'| awk '{print "'"$APP_S3_HOME/jars/"'"$1","}' | tr -d '\n' | sed 's/,$//')"}},"configurationOverrides":{"monitoringConfiguration":{"s3MonitoringConfiguration":{"logUri":"$APP_S3_HOME/logs"}}}
}
EOF
jq . $APP_LOCAL_HOME/start-job-run.json

4. 提交作業

export EMR_SERVERLESS_JOB_RUN_ID=$(aws emr-serverless start-job-run \--no-paginate --no-cli-pager --output text \--name apache-hudi-delta-streamer \--application-id $EMR_SERVERLESS_APP_ID \--execution-role-arn $EMR_SERVERLESS_EXECUTION_ROLE_ARN \--execution-timeout-minutes 0 \--cli-input-json file://$APP_LOCAL_HOME/start-job-run.json \--query jobRunId)

5. 監控作業

now=$(date +%s)sec
while true; dojobStatus=$(aws emr-serverless get-job-run \--no-paginate --no-cli-pager --output text \--application-id $EMR_SERVERLESS_APP_ID \--job-run-id $EMR_SERVERLESS_JOB_RUN_ID \--query jobRun.state)if [ "$jobStatus" = "PENDING" ] || [ "$jobStatus" = "SCHEDULED" ] || [ "$jobStatus" = "RUNNING" ]; thenfor i in {0..5}; doecho -ne "\E[33;5m>>> The job [ $EMR_SERVERLESS_JOB_RUN_ID ] state is [ $jobStatus ], duration [ $(date -u --date now-$now +%H:%M:%S) ] ....\r\E[0m"sleep 1doneelseecho -ne "The job [ $EMR_SERVERLESS_JOB_RUN_ID ] is [ $jobStatus ]\n\n"breakfi
done

6. 檢查錯誤

JOB_LOG_HOME=$APP_LOCAL_HOME/log/$EMR_SERVERLESS_JOB_RUN_ID
rm -rf $JOB_LOG_HOME && mkdir -p $JOB_LOG_HOME
aws s3 cp --recursive $APP_S3_HOME/logs/applications/$EMR_SERVERLESS_APP_ID/jobs/$EMR_SERVERLESS_JOB_RUN_ID/ $JOB_LOG_HOME >& /dev/null
gzip -d -r -f $JOB_LOG_HOME >& /dev/null
grep --color=always -r -i -E 'error|failed|exception' $JOB_LOG_HOME

最佳實踐 (1):提取環境相關信息集中配置,提升腳本可移植性

※ 此項最佳實踐參考《參考范本:1. 導出環境相關變量》

在EMR Serverless的作業腳本中經常會出現與AWS賬號和本地環境有關的信息,例如資源的ARN,各種路徑等,當我們要在不同環境(如開發、測試或生產)中提交作業時,就需要查找和替換這些環境相關的信息。為了讓腳本具備良好的可移植性,推薦的做法是將這些信息抽離出來,以全局變量的形式集中配置,這樣,當在一個新環境(新的AWS賬號或服務器)中提交作業時,只需修改這些變量即可,而不是具體的腳本。

最佳實踐 (2):為作業創建專用的工作目錄和S3存儲桶

※ 此項最佳實踐參考《參考范本:2. 創建作業專屬工作目錄和S3存儲桶》

為一個作業或應用程序創建專用的工作目錄和S3存儲桶是一個良好的規范和習慣。一方面,將本作業/應用的所有“資源”,包括:腳本、配置文件、依賴包、日志以及產生的數據統一存放在有利于集中管理和維護,如果要在Linux和S3上給作業賦予讀寫權限,操作起來了也會簡單一些。

最佳實踐 (3):使用作業描述文件規避字符轉義問題

※ 此項最佳實踐參考《參考范本:3. 準備作業描述文件》

我們通常見到的EMR Serverless作業提交示例是將作業描述以字符串參數形式傳遞給命令行的,就像下面這樣:

aws emr-serverless start-job-run \--application-id $EMR_SERVERLESS_APP_ID \--execution-role-arn $EMR_SERVERLESS_EXECUTION_ROLE_ARN \--job-driver '{"sparkSubmit": {"entryPoint": "s3://us-east-1.elasticmapreduce/emr-containers/samples/wordcount/scripts/wordcount.py","entryPointArguments": ["s3://my-job-bucket/output"]}}'

這種方式只能應對簡單的作業提交,當作業中包含大量參數和變量時,很容易出現單引號、雙引號、美元符等特殊字符的轉義問題,由于這里牽涉shell字符串和json字符串的雙重嵌套和解析,所以會非常麻煩。此時在命令行中給出作業描述是很不明智的,更好的做法是:使用cat命令聯合heredoc來創建作業描述文件,然后在命令行中以--cli-input-json file://xxx.json形式將作業描述傳遞給命令行:

# 生成作業描述文件
cat << EOF > xxx.json... ...... ...... ...   
EOF
# 使用作業描述文件提交作業
aws emr-serverless start-job-run ... --cli-input-json file://xxx.json ...

這是一個非常重要的技巧,使用這種形式提交作業有如下兩個好處:

  • 在cat + heredoc中編輯的文本為原生字符串,不用考慮字符轉義問題

  • 在cat + heredoc中可嵌入shell變量、函數調用和if…else等結構體,實現“動態”構建作業描述文件

最佳實踐 (4):在作業描述文件中嵌入shell變量和腳本片段,實現“動態”構建

※ 此項最佳實踐參考《參考范本:3. 準備作業描述文件》

如上所述,采用cat + heredoc編輯作業描述文件后,可以在編輯文件的過程中嵌入shell變量、函數調用和if...else...等復合結構體,使得我們可以動態構建作業描述文件,這是非常重要的一個能力。在《參考范本:3. 準備作業描述文件》中有一個很好的例證,就是“動態拼接依賴Jar包的路徑”:

--conf spark.jars=$(aws s3 ls $APP_S3_HOME/jars/ | grep -o '\S*\.jar$'| awk '{print "'"$APP_S3_HOME/jars/"'"$1","}' | tr -d '\n' | sed 's/,$//')

這是在構建作業描述文件start-job-run.json的過程中通過$(....)嵌入的一段shell腳本,這段腳本遍歷了指定目錄下的jar文件并拼接成一個字符串輸出出來,而輸出的字符串會在嵌入腳本的地方變成文本的一部分,我們還可以在編輯文本時調用shell函數,嵌入if...else...whilecase等多重復合邏輯結構,讓作業描述文件可以根據不同的參數和條件動態生成期望的內容,這種靈活性足以讓開發者應對任何復雜的情況。

最佳實踐 (5):使用jq校驗并格式化作業描述文件

※ 此項最佳實踐參考《參考范本:3. 準備作業描述文件》

jq是一個處理json文件的命令行工具,對于AWS CLI來說,jq可以說是一個“最佳伴侶”。原因是使用AWS CLI創建資源時,除了傳入常規參數之外,還可以通過--cli-input-json參數傳入一個json文件來描述所要創建的資源。當創建的資源配置過于復雜時,json文件的優勢就會凸顯出來,就像我們參考范本中的這個EMR Serverless Job一樣。所以,使用AWS CLI時經常有編輯和操作json文件的需求,此時jq就成為了一個強有力的輔助工具。在參考范本中,我們僅僅使用jq打印了一下生成的作業描述文件:

jq . $APP_LOCAL_HOME/start-job-run.json

這一步操作有兩個作用:一是利用jq校驗了json文件,這能幫助排查文件中的json格式錯誤,二是jq輸出的json經過了格式化和語法著色,更加易讀。

其實jq在AWS CLI上還有更多高級應用,只是在我們的參考范本中并沒有體現出來。在某些情況下,我們可以通過jq直接檢索和編輯作業描述文件,將jq和使用cat + heredoc的json編輯方式結合起來,可以創建更加復雜和動態化的作業描述文件。

最佳實踐 (6):可復用的依賴Jar包路徑拼接腳本

※ 此項最佳實踐參考《參考范本:3. 準備作業描述文件》

拼接依賴Jar包路徑幾乎是每個作業都要解決的問題,手動拼接雖然可行,但費力且容易出錯。過去在本地環境中,我們可以使用:--jars $(echo /path/*.jar | tr ' ' ',')這種簡潔而優雅的方式拼接Jar包路徑。但是EMR Serverless作業的依賴Jar包是存放在S3上的,這此,我們針對性地編寫了一段可復用的腳本來拼接位于S3指定目錄下的Jar包路徑,供大家參考(請注意替換腳本中出現的兩處文件夾路徑):

aws s3 ls $APP_S3_HOME/jars/ | grep -o '\S*\.jar$'| awk '{print "'"$APP_S3_HOME/jars/"'"$1","}' | tr -d '\n' | sed 's/,$//'

最佳實踐 (7):可復用的作業監控腳本

※ 此項最佳實踐參考《參考范本:5. 監控作業》

使用命令行提交EMR Serverless作業后,用戶可以轉到AWS控制臺上查看作業的狀態,但是對開發者來說,這種切換會分散注意力,最完美的方式莫過于提交作業后繼續在命令行窗口監控作業狀態,直到其失敗或成功運行。為此,《參考范本:5. 監控作業》給出了一種實現,可復用于所有EMR Serverless作業,供大家參考。

最佳實踐 (8):可復用的日志錯誤信息檢索腳本

※ 此項最佳實踐參考《參考范本:6. 檢查錯誤》

在日常開發中,“提交作業報錯 -> 查看日志中的報錯信息 -> 修改代碼重新提交”是一個反復迭代的過程,在EMR Serverless中,用戶需要切換到AWS控制臺查看錯誤日志,并且有時日志量會非常大,在控制臺上查看效率很低。一種更高效的做法是:將存放于S3上的日志文件統一下載到本地并解壓,然后使用grep命令快速檢索日志中含有error, failed, exception等關鍵字的行,然后再打開具體文件仔細查看。將這些動作腳本化后,我們就能得到一段可復用的日志錯誤信息檢索腳本,對于調試和排查錯誤有很大的幫助。為此,《參考范本:6. 檢查錯誤》給出了一種實現,可復用于所有EMR Serverless作業,供大家參考。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/35772.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/35772.shtml
英文地址,請注明出處:http://en.pswp.cn/news/35772.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

章節7:XSS檢測和利用

章節7&#xff1a;XSS檢測和利用 測試payload <script>alert(XSS)</script> <script>alert(document.cookie)</script> ><script>alert(document.cookie)</script> ><script>alert(document.cookie)</script> &qu…

元宇宙之經濟(02)理解NFT

1 NFT是什么&#xff1f; 想象一下&#xff0c;你小時候曾經在操場上集齊過各種不同的貼紙&#xff0c;然后和朋友們交換&#xff0c;這些貼紙有著獨特的圖案和價值。NFT的概念與此類似&#xff0c;但在數字世界中運作。NFT是一種基于區塊鏈技術的數字資產&#xff0c;每個NFT…

golang—面試題大全

目錄標題 sliceslice和array的區別slice擴容機制slice是否線程安全slice分配到棧上還是堆上擴容過程中是否重新寫入go深拷貝發生在什么情況下&#xff1f;切片的深拷貝是怎么做的copy和左值進行初始化區別slice和map的區別 mapmap介紹map的key的類型map對象如何比較map的底層原…

《Java極簡設計模式》第03章:工廠方法模式(FactoryMethod)

作者&#xff1a;冰河 星球&#xff1a;http://m6z.cn/6aeFbs 博客&#xff1a;https://binghe.gitcode.host 文章匯總&#xff1a;https://binghe.gitcode.host/md/all/all.html 源碼地址&#xff1a;https://github.com/binghe001/java-simple-design-patterns/tree/master/j…

無法正確識別車牌(Python、OpenCv、Tesseract)

我正在嘗試識別車牌&#xff0c;但出現了錯誤&#xff0c;例如錯誤/未讀取字符 以下是每個步驟的可視化&#xff1a; 從顏色閾值變形關閉獲得遮罩 以綠色突出顯示的車牌輪廓過濾器 將板輪廓粘貼到空白遮罩上 Tesseract OCR的預期結果 BP 1309 GD 但我得到的結果是 BP 1309…

騰訊云標準型CVM云服務器詳細介紹

騰訊云CVM服務器標準型實例的各項性能參數平衡&#xff0c;標準型云服務器適用于大多數常規業務&#xff0c;例如&#xff1a;web網站及中間件等&#xff0c;常見的標準型云服務器有CVM標準型S5、S6、SA3、SR1、S5se等規格&#xff0c;騰訊云服務器網來詳細說下云服務器CVM標準…

NAS搭建指南一——服務器的選擇與搭建

一、服務器的選擇 有自己的本地的公網 IP 的請跳過此篇文章按需求選擇一個云服務器&#xff0c;目的就是為了進行 frp 的搭建&#xff0c;完成內網穿透我選擇的是騰訊云服務器&#xff0c;我的配置如下&#xff0c;僅供參考&#xff1a; 4. 騰訊云服務器官網地址 二、服務器…

docker 鏡像的導出與導入 save 與 load

一、鏡像導出 docker save 導出 將系統中的鏡像保存為壓縮包&#xff0c;進行文件傳輸。使用 docker save --help 查看命令各參數&#xff0c;或者去docker官網查看.以 hello-world鏡像為例。 A&#xff1a;將鏡像保存為tar包 docker save image > package.tar docker sa…

day9 10-牛客67道劍指offer-JZ66、19、20、75、23、76、8、28、77、78

文章目錄 1. JZ66 構建乘積數組暴力解法雙向遍歷 2. JZ19 正則表達式匹配3. JZ20 表示數值的字符串有限狀態機遍歷 4. JZ75 字符流中第一個不重復的字符5. JZ23 鏈表中環的入口結點快慢指針哈希表 6. JZ76 刪除鏈表中重復的結點快慢指針三指針如果只保留一個重復結點 7. JZ8 二…

gitblit-使用

1.登入GitBlit服務器 默認用戶和密碼: admin/admin 2.創建一個新的版本庫 點擊圖中的“版本庫”&#xff0c;然后點擊圖中“創建版本庫” 填寫名稱和描述&#xff0c;注意名稱最后一定要加 .git選擇限制查看、克隆和推送勾選“加入README”和“加入.gitignore文件”在圖中的1處…

使用IIS服務器部署Flask python Web項目

參考文章 ""D:\Program Files (x86)\Python310\python310.exe"|"D:\Program Files (x86)\Python310\lib\site-packages\wfastcgi.py"" can now be used as a FastCGI script processor參考文章 請求路徑填寫*&#xff0c;模塊選擇FastCgiModule&…

一鍵部署 Umami 統計個人網站訪問數據

談到網站統計&#xff0c;大家第一時間想到的肯定是 Google Analytics。然而&#xff0c;我們都知道 Google Analytics 會收集所有用戶的信息&#xff0c;對數據沒有任何控制和隱私保護。 Google Analytics 收集的指標實在是太多了&#xff0c;有很多都是不必要的&#xff0c;…

Javascript 深入了解map

map() 是 JavaScript 數組提供的一個高階函數&#xff0c;它用于對數組中的每個元素執行指定的函數&#xff0c;并返回一個新的數組&#xff0c;新數組中的元素是原數組中的每個元素經過函數處理后的結果。 map() 函數的語法如下&#xff1a; javascript array.map(callback(…

Multi-object navigation in real environments using hybrid policies 論文閱讀

論文信息 題目&#xff1a;Multi-object navigation in real environments using hybrid policies 作者&#xff1a;Assem Sadek, Guillaume Bono 來源&#xff1a;CVPR 時間&#xff1a;2023 Abstract 機器人技術中的導航問題通常是通過 SLAM 和規劃的結合來解決的。 最近…

優化堆排序(Java 實例代碼)

目錄 優化堆排序 Java 實例代碼 src/runoob/heap/HeapSort.java 文件代碼&#xff1a; 優化堆排序 上一節的堆排序&#xff0c;我們開辟了額外的空間進行構造堆和對堆進行排序。這一小節&#xff0c;我們進行優化&#xff0c;使用原地堆排序。 對于一個最大堆&#xff0c;首…

【設計模式】-策略模式:優雅處理條件邏輯

Java 策略模式之優雅處理條件邏輯 前言 在軟件開發中&#xff0c;我們經常會遇到根據不同的條件執行不同邏輯的情況。這時&#xff0c;策略模式是一種常用的設計模式&#xff0c;能夠使代碼結構清晰、易于擴展和維護。 本文將詳細介紹策略模式的概念及其在Java中的應用&#x…

flume系列之:監控Systemctl托管的flume agent組

flume系列之:監控Systemctl托管的flume agent組 一、需求背景二、相關技術博客三、遠程登陸flume機器四、發送飛書告警五、監控flume agent組狀態一、需求背景 flume接kafka集群,一個kafka集群對應一個flume agent組,會把一組flume agent用systemctl托管每接一個kafka集群會…

pytest 編寫規范

一、pytest 編寫規范 1、介紹 pytest是一個非常成熟的全功能的Python測試框架&#xff0c;主要特點有以下幾點&#xff1a; 1、簡單靈活&#xff0c;容易上手&#xff0c;文檔豐富&#xff1b;2、支持參數化&#xff0c;可以細粒度地控制要測試的測試用例&#xff1b;3、能夠…

差分升級在物聯網水表上的實現與應用(學習)

摘要 當越來越多的物聯網水表加入抄表系統后&#xff0c;實現了水表數據的信息化&#xff0c;并且當水表終端需要技術更新時&#xff0c;通過網絡方式來升級產品可以高效修復設備面臨的問題&#xff0c;減少用戶損失&#xff0c;降低維護成本&#xff0c;但同時也對有限的網絡…

遍歷集合List的五種方法以及如何在遍歷集合過程中安全移除元素

一、遍歷集合List的五種方法 測試數據 List<String> list new ArrayList<>(); list.add("A");list.add("B");list.add("C");1. 普通for循環 普通for循環&#xff0c;通過索引遍歷 for (int i 0; i < list.size(); i) {Syst…