![]() | 《大數據平臺架構與原型實現:數據中臺建設實戰》一書由博主歷時三年精心創作,現已通過知名IT圖書品牌電子工業出版社博文視點出版發行,點擊《重磅推薦:建大數據平臺太難了!給我發個工程原型吧!》了解圖書詳情,京東購書鏈接:https://item.jd.com/12677623.html,掃描左側二維碼進入京東手機購書頁面。 |
自Amazon EMR推出Serverlesss形態以來,得益于開箱即用和零運維的優質特性,越來越多的EMR用戶開始嘗試EMR Serverless。在使用過程中,一個常被提及的問題是:我們應該如何在EMR Serverless上提交Spark/Hive作業?本文我們將分享一些這方面的最佳實踐,幫助大家以一種更優雅的方式使用這項服務。
一份通俗易懂的講解最好配一個形象生動的例子,本文選擇《CDC一鍵入湖:當 Apache Hudi DeltaStreamer 遇見 Serverless Spark》一文介紹的DeltaStreamer作業作為講解示例,因為這個作業既有一定的通用性又足夠復雜,可以涵蓋大多數EMR Serverless作業遇到的場景,更重要的是,該作業的提交方式遵循了本文要介紹的各項最佳實踐(本文是其姊妹篇)。不了解Apache Hudi的讀者不必擔心,本文的關注點在于如何提交EMR Serverless作業本身,而非DeltaStreamer的技術細節,所以不會影響到您閱讀此文。
參考范本
首先,我們整理一下提交DeltaStreamer CDC作業的幾項關鍵操作,下文會以這些腳本為例,介紹蘊含其中的各項最佳實踐。
1. 導出環境相關變量
export APP_NAME='apache-hudi-delta-streamer'
export APP_S3_HOME='s3://apache-hudi-delta-streamer'
export APP_LOCAL_HOME='/home/ec2-user/apache-hudi-delta-streamer'
export EMR_SERVERLESS_APP_ID='00fbfel40ee59k09'
export EMR_SERVERLESS_EXECUTION_ROLE_ARN='arn:aws:iam::123456789000:role/EMR_SERVERLESS_ADMIN'
2. 創建作業專屬工作目錄和S3存儲桶
mkdir -p $APP_LOCAL_HOME
aws s3 mb $APP_S3_HOME
3. 準備作業描述文件
cat << EOF > $APP_LOCAL_HOME/start-job-run.json
{"name":"apache-hudi-delta-streamer","applicationId":"$EMR_SERVERLESS_APP_ID","executionRoleArn":"$EMR_SERVERLESS_EXECUTION_ROLE_ARN","jobDriver":{"sparkSubmit":{"entryPoint":"/usr/lib/hudi/hudi-utilities-bundle.jar","entryPointArguments":["--continuous","--enable-sync","--table-type", "COPY_ON_WRITE","--op", "UPSERT","--target-base-path", "$APP_S3_HOME/data/mysql-server-3/inventory/orders","--target-table", "orders","--min-sync-interval-seconds", "60","--source-class", "org.apache.hudi.utilities.sources.debezium.MysqlDebeziumSource","--source-ordering-field", "_event_origin_ts_ms","--payload-class", "org.apache.hudi.common.model.debezium.MySqlDebeziumAvroPayload","--hoodie-conf", "bootstrap.servers=$KAFKA_BOOTSTRAP_SERVERS","--hoodie-conf", "schema.registry.url=$SCHEMA_REGISTRY_URL","--hoodie-conf", "hoodie.deltastreamer.schemaprovider.registry.url=${SCHEMA_REGISTRY_URL}/subjects/osci.mysql-server-3.inventory.orders-value/versions/latest","--hoodie-conf", "hoodie.deltastreamer.source.kafka.value.deserializer.class=io.confluent.kafka.serializers.KafkaAvroDeserializer","--hoodie-conf", "hoodie.deltastreamer.source.kafka.topic=osci.mysql-server-3.inventory.orders","--hoodie-conf", "auto.offset.reset=earliest","--hoodie-conf", "hoodie.datasource.write.recordkey.field=order_number","--hoodie-conf", "hoodie.datasource.write.partitionpath.field=order_date","--hoodie-conf", "hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor","--hoodie-conf", "hoodie.datasource.write.hive_style_partitioning=true","--hoodie-conf", "hoodie.datasource.hive_sync.database=inventory","--hoodie-conf", "hoodie.datasource.hive_sync.table=orders","--hoodie-conf", "hoodie.datasource.hive_sync.partition_fields=order_date"],"sparkSubmitParameters":"--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory --conf spark.jars=$(aws s3 ls $APP_S3_HOME/jars/ | grep -o '\S*\.jar$'| awk '{print "'"$APP_S3_HOME/jars/"'"$1","}' | tr -d '\n' | sed 's/,$//')"}},"configurationOverrides":{"monitoringConfiguration":{"s3MonitoringConfiguration":{"logUri":"$APP_S3_HOME/logs"}}}
}
EOF
jq . $APP_LOCAL_HOME/start-job-run.json
4. 提交作業
export EMR_SERVERLESS_JOB_RUN_ID=$(aws emr-serverless start-job-run \--no-paginate --no-cli-pager --output text \--name apache-hudi-delta-streamer \--application-id $EMR_SERVERLESS_APP_ID \--execution-role-arn $EMR_SERVERLESS_EXECUTION_ROLE_ARN \--execution-timeout-minutes 0 \--cli-input-json file://$APP_LOCAL_HOME/start-job-run.json \--query jobRunId)
5. 監控作業
now=$(date +%s)sec
while true; dojobStatus=$(aws emr-serverless get-job-run \--no-paginate --no-cli-pager --output text \--application-id $EMR_SERVERLESS_APP_ID \--job-run-id $EMR_SERVERLESS_JOB_RUN_ID \--query jobRun.state)if [ "$jobStatus" = "PENDING" ] || [ "$jobStatus" = "SCHEDULED" ] || [ "$jobStatus" = "RUNNING" ]; thenfor i in {0..5}; doecho -ne "\E[33;5m>>> The job [ $EMR_SERVERLESS_JOB_RUN_ID ] state is [ $jobStatus ], duration [ $(date -u --date now-$now +%H:%M:%S) ] ....\r\E[0m"sleep 1doneelseecho -ne "The job [ $EMR_SERVERLESS_JOB_RUN_ID ] is [ $jobStatus ]\n\n"breakfi
done
6. 檢查錯誤
JOB_LOG_HOME=$APP_LOCAL_HOME/log/$EMR_SERVERLESS_JOB_RUN_ID
rm -rf $JOB_LOG_HOME && mkdir -p $JOB_LOG_HOME
aws s3 cp --recursive $APP_S3_HOME/logs/applications/$EMR_SERVERLESS_APP_ID/jobs/$EMR_SERVERLESS_JOB_RUN_ID/ $JOB_LOG_HOME >& /dev/null
gzip -d -r -f $JOB_LOG_HOME >& /dev/null
grep --color=always -r -i -E 'error|failed|exception' $JOB_LOG_HOME
最佳實踐 (1):提取環境相關信息集中配置,提升腳本可移植性
※ 此項最佳實踐參考《參考范本:1. 導出環境相關變量》
在EMR Serverless的作業腳本中經常會出現與AWS賬號和本地環境有關的信息,例如資源的ARN,各種路徑等,當我們要在不同環境(如開發、測試或生產)中提交作業時,就需要查找和替換這些環境相關的信息。為了讓腳本具備良好的可移植性,推薦的做法是將這些信息抽離出來,以全局變量的形式集中配置,這樣,當在一個新環境(新的AWS賬號或服務器)中提交作業時,只需修改這些變量即可,而不是具體的腳本。
最佳實踐 (2):為作業創建專用的工作目錄和S3存儲桶
※ 此項最佳實踐參考《參考范本:2. 創建作業專屬工作目錄和S3存儲桶》
為一個作業或應用程序創建專用的工作目錄和S3存儲桶是一個良好的規范和習慣。一方面,將本作業/應用的所有“資源”,包括:腳本、配置文件、依賴包、日志以及產生的數據統一存放在有利于集中管理和維護,如果要在Linux和S3上給作業賦予讀寫權限,操作起來了也會簡單一些。
最佳實踐 (3):使用作業描述文件規避字符轉義問題
※ 此項最佳實踐參考《參考范本:3. 準備作業描述文件》
我們通常見到的EMR Serverless作業提交示例是將作業描述以字符串參數形式傳遞給命令行的,就像下面這樣:
aws emr-serverless start-job-run \--application-id $EMR_SERVERLESS_APP_ID \--execution-role-arn $EMR_SERVERLESS_EXECUTION_ROLE_ARN \--job-driver '{"sparkSubmit": {"entryPoint": "s3://us-east-1.elasticmapreduce/emr-containers/samples/wordcount/scripts/wordcount.py","entryPointArguments": ["s3://my-job-bucket/output"]}}'
這種方式只能應對簡單的作業提交,當作業中包含大量參數和變量時,很容易出現單引號、雙引號、美元符等特殊字符的轉義問題,由于這里牽涉shell字符串和json字符串的雙重嵌套和解析,所以會非常麻煩。此時在命令行中給出作業描述是很不明智的,更好的做法是:使用cat命令聯合heredoc來創建作業描述文件,然后在命令行中以--cli-input-json file://xxx.json
形式將作業描述傳遞給命令行:
# 生成作業描述文件
cat << EOF > xxx.json... ...... ...... ...
EOF
# 使用作業描述文件提交作業
aws emr-serverless start-job-run ... --cli-input-json file://xxx.json ...
這是一個非常重要的技巧,使用這種形式提交作業有如下兩個好處:
-
在cat + heredoc中編輯的文本為原生字符串,不用考慮字符轉義問題
-
在cat + heredoc中可嵌入shell變量、函數調用和if…else等結構體,實現“動態”構建作業描述文件
最佳實踐 (4):在作業描述文件中嵌入shell變量和腳本片段,實現“動態”構建
※ 此項最佳實踐參考《參考范本:3. 準備作業描述文件》
如上所述,采用cat + heredoc編輯作業描述文件后,可以在編輯文件的過程中嵌入shell變量、函數調用和if...else...
等復合結構體,使得我們可以動態構建作業描述文件,這是非常重要的一個能力。在《參考范本:3. 準備作業描述文件》中有一個很好的例證,就是“動態拼接依賴Jar包的路徑”:
--conf spark.jars=$(aws s3 ls $APP_S3_HOME/jars/ | grep -o '\S*\.jar$'| awk '{print "'"$APP_S3_HOME/jars/"'"$1","}' | tr -d '\n' | sed 's/,$//')
這是在構建作業描述文件start-job-run.json
的過程中通過$(....)
嵌入的一段shell腳本,這段腳本遍歷了指定目錄下的jar文件并拼接成一個字符串輸出出來,而輸出的字符串會在嵌入腳本的地方變成文本的一部分,我們還可以在編輯文本時調用shell函數,嵌入if...else...
,while
,case
等多重復合邏輯結構,讓作業描述文件可以根據不同的參數和條件動態生成期望的內容,這種靈活性足以讓開發者應對任何復雜的情況。
最佳實踐 (5):使用jq校驗并格式化作業描述文件
※ 此項最佳實踐參考《參考范本:3. 準備作業描述文件》
jq是一個處理json文件的命令行工具,對于AWS CLI來說,jq可以說是一個“最佳伴侶”。原因是使用AWS CLI創建資源時,除了傳入常規參數之外,還可以通過--cli-input-json
參數傳入一個json文件來描述所要創建的資源。當創建的資源配置過于復雜時,json文件的優勢就會凸顯出來,就像我們參考范本中的這個EMR Serverless Job一樣。所以,使用AWS CLI時經常有編輯和操作json文件的需求,此時jq就成為了一個強有力的輔助工具。在參考范本中,我們僅僅使用jq打印了一下生成的作業描述文件:
jq . $APP_LOCAL_HOME/start-job-run.json
這一步操作有兩個作用:一是利用jq校驗了json文件,這能幫助排查文件中的json格式錯誤,二是jq輸出的json經過了格式化和語法著色,更加易讀。
其實jq在AWS CLI上還有更多高級應用,只是在我們的參考范本中并沒有體現出來。在某些情況下,我們可以通過jq直接檢索和編輯作業描述文件,將jq和使用cat + heredoc的json編輯方式結合起來,可以創建更加復雜和動態化的作業描述文件。
最佳實踐 (6):可復用的依賴Jar包路徑拼接腳本
※ 此項最佳實踐參考《參考范本:3. 準備作業描述文件》
拼接依賴Jar包路徑幾乎是每個作業都要解決的問題,手動拼接雖然可行,但費力且容易出錯。過去在本地環境中,我們可以使用:--jars $(echo /path/*.jar | tr ' ' ',')
這種簡潔而優雅的方式拼接Jar包路徑。但是EMR Serverless作業的依賴Jar包是存放在S3上的,這此,我們針對性地編寫了一段可復用的腳本來拼接位于S3指定目錄下的Jar包路徑,供大家參考(請注意替換腳本中出現的兩處文件夾路徑):
aws s3 ls $APP_S3_HOME/jars/ | grep -o '\S*\.jar$'| awk '{print "'"$APP_S3_HOME/jars/"'"$1","}' | tr -d '\n' | sed 's/,$//'
最佳實踐 (7):可復用的作業監控腳本
※ 此項最佳實踐參考《參考范本:5. 監控作業》
使用命令行提交EMR Serverless作業后,用戶可以轉到AWS控制臺上查看作業的狀態,但是對開發者來說,這種切換會分散注意力,最完美的方式莫過于提交作業后繼續在命令行窗口監控作業狀態,直到其失敗或成功運行。為此,《參考范本:5. 監控作業》給出了一種實現,可復用于所有EMR Serverless作業,供大家參考。
最佳實踐 (8):可復用的日志錯誤信息檢索腳本
※ 此項最佳實踐參考《參考范本:6. 檢查錯誤》
在日常開發中,“提交作業報錯 -> 查看日志中的報錯信息 -> 修改代碼重新提交”是一個反復迭代的過程,在EMR Serverless中,用戶需要切換到AWS控制臺查看錯誤日志,并且有時日志量會非常大,在控制臺上查看效率很低。一種更高效的做法是:將存放于S3上的日志文件統一下載到本地并解壓,然后使用grep命令快速檢索日志中含有error, failed, exception等關鍵字的行,然后再打開具體文件仔細查看。將這些動作腳本化后,我們就能得到一段可復用的日志錯誤信息檢索腳本,對于調試和排查錯誤有很大的幫助。為此,《參考范本:6. 檢查錯誤》給出了一種實現,可復用于所有EMR Serverless作業,供大家參考。