長久已來,SQL以其簡單易用、開發效率高等優勢一直是ETL的首選編程語言,在構建數據倉庫和數據湖的過程中發揮著不可替代的作用。Hive和Spark SQL也正是立足于這一點,才在今天的大數據生態中牢牢占據著主力位置。在常規的Spark環境中,開發者可以使用spark-sql
命令直接執行SQL文件,這是一項看似平平無奇實則非常重要的功能:一方面,這一方式極大地降低了Spark的使用門檻,用戶只要會寫SQL就可以使用Spark;另一方面,通過命令行驅動SQL文件的執行可以極大簡化SQL作業的提交工作,使得作業提交本身被“代碼化”,為大規模工程開發和自動化部署提供了便利。
但遺憾的是,Amazon EMR Serverless 未能針對執行SQL文件提供原生支持,用戶只能在Scala/Python代碼中嵌入SQL語句,這對于倚重純SQL開發數倉或數據湖的用戶來說并不友好。為此,我們專門開發了一組用于讀取、解析和執行SQL文件的工具類,借助這組工具類,用戶可以在 Amazon EMR Serverless 上直接執行SQL文件,本文將詳細介紹一下這一方案。
1. 方案設計
鑒于在Spark編程環境中執行SQL語句的方法是:spark.sql("...")
,我們可以設計一個通用的作業類,該類在啟動時會根據傳入的參數讀取指定位置上的SQL文件,然后拆分成單條SQL并調用spark.sql("...")
執行。為了讓作業類更加靈活和通用,還可以引入通配符一次加載并執行多個SQL文件。此外,ETL作業經常需要根據作業調度工具生成的時間參數去執行相應的批次,這些參數同樣會作用到SQL中,所以,作業類還應允許用戶在SQL文件中嵌入自定義變量,并在提交作業時以參數形式為自定義變量賦值。基于這種設計思路,我們開發了一個項目,實現了上述功能,項目地址為:
項目名稱 | 項目地址 |
---|---|
Amazon EMR Serverless Utilities | https://github.com/bluishglc/emr-serverless-utils |
項目中的com.github.emr.serverless.SparkSqlJob
類即為通用的SQL作業類,該類接受兩個可選參數,分別是:
參數 | 說明 | 取值示例 |
---|---|---|
–sql-files | 指定要執行的SQL文件路徑,支持Java文件系統通配符,可指定多個文件一起執行 | s3://my-spark-sql-job/sqls/insert-into-*.sql |
–sql-params | 以K1=V1,K2=V2,... 形式為SQL文件中定義的${K1} ,${K2} ,…形式的變量設值 | CUST_CITY=NEW YORK,ORD_DATE=2008-07-15 |
該方案具備如下特性:
① 允許單一SQL文件包含多條SQL語句
② 允許在SQL文件中使用${K1}
,${K2}
,…的形式定義變量,并在執行作業時使用K1=V1,K2=V2,...
形式的參數進行變量賦值
③ 支持Java文件系統通配符,可一次執行多個SQL文件
下面,我們將分別在AWS控制臺和命令行兩種環境下介紹并演示如何使用該項目的工具類提交純SQL作業。
2. 實操演示
2.1. 環境準備
在EMR Serverless上提交作業時需要準備一個“EMR Serverless Application”和一個“EMR Serverless Job Execution Role”,其中后者應具有S3和Glue Data Catalog的讀寫權限。Application可以在EMR Serverless控制臺(EMR Studio)上通過向導輕松創建(全默認配置即可),Execution Role可以使用 《CDC一鍵入湖:當 Apache Hudi DeltaStreamer 遇見 Serverless Spark》 一文第5節提供的腳本快速創建。
接下來要準備提交作業所需的Jar包和SQL文件。首先在S3上創建一個存儲桶,本文使用的桶取名:my-spark-sql-job
(當您在自己的環境中操作時請注意替換桶名),然后從 [ 此處 ] 下載編譯好的 emr-serverless-utils.jar
包并上傳至s3://my-spark-sql-job/jars/
目錄下:
在演示過程中還將使用到5個SQL示例文件,從 [ 此處 ] 下載解壓后上傳至s3://my-spark-sql-job/sqls/
目錄下:
2.2. 在控制臺上提交純SQL文件作業
2.2.1. 執行單一SQL文件
打開EMR Serverless的控制臺(EMR Studio),在選定的EMR Serverless Application下提交一個如下的Job:
① Script location:設定為此前上傳的Jar包路徑 s3://my-spark-sql-job/jars/emr-serverless-utils-1.0.jar
② Main class:設定為 com.github.emr.serverless.SparkSqlJob
③ Script arguments:設定為 ["--sql-files","s3://my-spark-sql-job/sqls/drop-tables.sql"]
至于其他選項,無需特別設定,保持默認配置即可,對于在生產環境中部署的作業,您可以結合自身作業的需要靈活配置,例如Spark Driver/Executor的資源分配等。需要提醒的是:通過控制臺創建的作業默認會啟用Glue Data Catalog(即:Additional settings -> Metastore configuration -> Use AWS Glue Data Catalog 默認是勾選的),為了方便在Glue和Athena中檢查SQL腳本的執行結果,建議您不要修改此項默認配置。
上述配置描述了這樣一項工作:以s3://my-spark-sql-job/jars/emr-serverless-utils-1.0.jar
中的com.github.emr.serverless.SparkSqlJob
作為主類,提起一個Spark作業。其中["--sql-files","s3://my-spark-sql-job/sqls/drop-tables.sql"]
是傳遞給SparkSqlJob
的參數,用于告知作業所要執行的SQL文件位置。本次作業執行的SQL文件只有三條簡單的DROP TABLE語句,是一個基礎示例,用以展示工具類執行單一文件內多條SQL語句的能力。
2.2.2. 執行帶自定義參數的SQL文件
接下來要演示的是工具類的第二項功能:執行帶自定義參數的SQL文件。新建或直接復制上一個作業(在控制臺上選定上一個作業,依次點擊 Actions -> Clone job),然后將“Script arguments”的值設定為:
["--sql-files","s3://my-spark-sql-job/sqls/create-tables.sql","--sql-params","APP_S3_HOME=s3://my-spark-sql-job"]
如下圖所示:
這次的作業設定除了使用--sql-files
參數指定了SQL文件外,還通過--sql-params
參數為SQL中出現的用戶自定義變量進行了賦值。根據此前的介紹,APP_S3_HOME=s3://my-spark-sql-job
是一個“Key=Value”字符串,其含義是將值s3://my-spark-sql-job
賦予了變量APP_S3_HOME
,SQL中所有出現${APP_S3_HOME}
的地方都將被s3://my-spark-sql-job
所替代。查看create-tables.sql
文件,在建表語句的LOCATION部分可以發現自定義變量${APP_S3_HOME}
:
CREATE EXTERNAL TABLE IF NOT EXISTS ORDERS (... ...
)
... ...
LOCATION '${APP_S3_HOME}/data/orders/';
當SparkSqlJob
讀取該SQL文件時,會根據鍵值對字符串APP_S3_HOME=s3://my-spark-sql-job
將SQL文件中所有的${APP_S3_HOME}
替換為s3://my-spark-sql-job
,實際執行的SQL將變為:
CREATE EXTERNAL TABLE IF NOT EXISTS ORDERS (... ...
)
... ...
LOCATION 's3://my-spark-sql-job/data/orders/';
提交作業并執行完畢后,可登錄Athena控制臺,查看數據表是否創建成功。
2.2.3. 使用通配符執行多個文件
有時候,我們需要批量執行一個文件夾下的所有SQL文件,或者使用通配符選擇性的執行部分SQL文件,SparkSqlJob
使用了Java文件系統通配符來支持這類需求。下面的作業就演示了通配符的使用方法,同樣是新建或直接復制上一個作業,然后將“Script arguments”的值設定為:
["--sql-files","s3://my-spark-sql-job/sqls/insert-into-*.sql"]
如下圖所示:
這次作業的--sql-files
參數使用了路徑通配符,insert-into-*.sql
將同時匹配insert-into-orders.sql
和insert-into-customers.sql
兩個SQL文件,它們將分別向ORDERS
和CUSTOMERS
兩張表插入多條記錄。執行完畢后,可以可登錄Athena控制臺,查看數據表中是否有數據產生。
2.2.4. 一個復合示例
最后,我們來提交一個更有代表性的復合示例:文件通配符 + 用戶自定義參數。再次新建或直接復制上一個作業,然后將“Script arguments”的值設定為:
["--sql-files","s3://my-spark-sql-job/sqls/select-*.sql","--sql-params","APP_S3_HOME=s3://my-spark-sql-job,CUST_CITY=NEW YORK,ORD_DATE=2008-07-15"]
如下圖所示:
![emr-serverless-snapshot-4.jpg-150.8kB][6]
本次作業的--sql-files
參數使用路徑通配符select-*.sql
匹配select-tables.sql
文件,該文件中存在三個用戶自定義變量,分別是${APP_S3_HOME}
、${CUST_CITY}
、${ORD_DATE}
:
CREATE EXTERNAL TABLE ORDERS_CUSTOMERS... ...LOCATION '${APP_S3_HOME}/data/orders_customers/'
AS SELECT... ...
WHEREC.CUST_CITY = '${CUST_CITY}' ANDO.ORD_DATE = CAST('${ORD_DATE}' AS DATE);
--sql-params
參數為這三個自定義變量設置了取值,分別是:APP_S3_HOME=s3://my-spark-sql-job
,CUST_CITY=NEW YORK
,ORD_DATE=2008-07-15
,于是上述SQL將被轉化為如下內容去執行:
CREATE EXTERNAL TABLE ORDERS_CUSTOMERS... ...LOCATION 's3://my-spark-sql-job/data/orders_customers/'
AS SELECT... ...
WHEREC.CUST_CITY = 'NEW YORK' ANDO.ORD_DATE = CAST('2008-07-15' AS DATE);
至此,通過控制臺提交純SQL文件作業的所有功能演示完畢。
2.3. 通過命令行提交純SQL文件作業
實際上,很多EMR Serverless用戶并不在控制臺上提交自己的作業,而是通過AWS CLI提交,這種方式方式多見于工程代碼或作業調度中。所以,我們再來介紹一下如何通過命令行提交純SQL文件作業。
本文使用命令行提交EMR Serverless作業的方式遵循了《最佳實踐:如何優雅地提交一個 Amazon EMR Serverless 作業?》一文給出的最佳實踐。首先,登錄一個安裝了AWS CLI并配置有用戶憑證的Linux環境(建議使用Amazon Linux2),先使用命令sudo yum -y install jq
安裝操作json文件的命令行工具:jq(后續腳本會使用到它),然后完成如下前期準備工作:
① 創建或選擇一個作業專屬工作目錄和S3存儲桶
② 創建或選擇一個EMR Serverless Execution Role
③ 創建或選擇一個EMR Serverless Application
接下來將所有環境相關變量悉數導出(請根據您的AWS賬號和本地環境替換命令行中的相應值):
export APP_NAME='change-to-your-app-name'
export APP_S3_HOME='change-to-your-app-s3-home'
export APP_LOCAL_HOME='change-to-your-app-local-home'
export EMR_SERVERLESS_APP_ID='change-to-your-application-id'
export EMR_SERVERLESS_EXECUTION_ROLE_ARN='change-to-your-execution-role-arn'
以下是一份示例:
export APP_NAME='my-spark-sql-job'
export APP_S3_HOME='s3://my-spark-sql-job'
export APP_LOCAL_HOME='/home/ec2-user/my-spark-sql-job'
export EMR_SERVERLESS_APP_ID='00fbfel40ee59k09'
export EMR_SERVERLESS_EXECUTION_ROLE_ARN='arn:aws:iam::123456789000:role/EMR_SERVERLESS_ADMIN'
《最佳實踐:如何優雅地提交一個 Amazon EMR Serverless 作業?》一文提供了多個操作Job的通用腳本,都非常實用,本文也會直接復用這些腳本,但是由于我們需要多次提交且每次的參數又有所不同,為了便于使用和簡化行文,我們將原文中的部分腳本封裝為一個Shell函數,取名為submit-spark-sql-job
:
submit-spark-sql-job() {sqlFiles="$1"sqlParams="$2"cat << EOF > $APP_LOCAL_HOME/start-job-run.json
{"name":"my-spark-sql-job","applicationId":"$EMR_SERVERLESS_APP_ID","executionRoleArn":"$EMR_SERVERLESS_EXECUTION_ROLE_ARN","jobDriver":{"sparkSubmit":{"entryPoint":"$APP_S3_HOME/jars/emr-serverless-utils-1.0.jar","entryPointArguments":[$([[ -n "$sqlFiles" ]] && echo "\"--sql-files\", \"$sqlFiles\"")$([[ -n "$sqlParams" ]] && echo ",\"--sql-params\", \"$sqlParams\"")],"sparkSubmitParameters":"--class com.github.emr.serverless.SparkSqlJob --conf spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"}},"configurationOverrides":{"monitoringConfiguration":{"s3MonitoringConfiguration":{"logUri":"$APP_S3_HOME/logs"}}}
}
EOFjq . $APP_LOCAL_HOME/start-job-run.jsonexport EMR_SERVERLESS_JOB_RUN_ID=$(aws emr-serverless start-job-run \--no-paginate --no-cli-pager --output text \--name my-spark-sql-job \--application-id $EMR_SERVERLESS_APP_ID \--execution-role-arn $EMR_SERVERLESS_EXECUTION_ROLE_ARN \--execution-timeout-minutes 0 \--cli-input-json file://$APP_LOCAL_HOME/start-job-run.json \--query jobRunId)now=$(date +%s)secwhile true; dojobStatus=$(aws emr-serverless get-job-run \--no-paginate --no-cli-pager --output text \--application-id $EMR_SERVERLESS_APP_ID \--job-run-id $EMR_SERVERLESS_JOB_RUN_ID \--query jobRun.state)if [ "$jobStatus" = "PENDING" ] || [ "$jobStatus" = "SCHEDULED" ] || [ "$jobStatus" = "RUNNING" ]; thenfor i in {0..5}; doecho -ne "\E[33;5m>>> The job [ $EMR_SERVERLESS_JOB_RUN_ID ] state is [ $jobStatus ], duration [ $(date -u --date now-$now +%H:%M:%S) ] ....\r\E[0m"sleep 1doneelseprintf "The job [ $EMR_SERVERLESS_JOB_RUN_ID ] is [ $jobStatus ]%50s\n\n"breakfidone
}
該函數接受兩個位置參數:
① 第一位置上的參數用于指定SQL文件路徑,其值會傳遞給SparkSqlJob
的--sql-files
② 第二位置上的參數用于指定SQL文件中的用戶自定義變量,其值會傳遞給SparkSqlJob
的--sql-params
函數中使用的Jar包和SQL文件與《2.1. 環境準備》一節準備的Jar包和SQL文件一致,所以使用腳本提交作業前同樣需要完成2.1節的環境準備工作。接下來,我們就使用該函數完成與2.2節一樣的操作。
2.3.1. 執行單一SQL文件
本節操作與2.2.1節完全一致,只是改用了命令行方式實現,命令如下:
submit-spark-sql-job "$APP_S3_HOME/sqls/drop-tables.sql"
2.3.2. 執行帶自定義參數的SQL文件
本節操作與2.2.2節完全一致,只是改用了命令行方式實現,命令如下:
submit-spark-sql-job "$APP_S3_HOME/sqls/create-tables.sql" "APP_S3_HOME=$APP_S3_HOME"
2.3.3. 使用通配符執行多個文件
本節操作與2.2.3節完全一致,只是改用了命令行方式實現,命令如下:
submit-spark-sql-job "$APP_S3_HOME/sqls/insert-into-*.sql"
2.3.4. 一個復合示例
本節操作與2.2.4節完全一致,只是改用了命令行方式實現,命令如下:
submit-spark-sql-job "$APP_S3_HOME/sqls/select-tables.sql" "APP_S3_HOME=$APP_S3_HOME,CUST_CITY=NEW YORK,ORD_DATE=2008-07-15"
3. 在源代碼中調用工具類
盡管在Spark編程環境中可以使用spark.sql(...)
形式直接執行SQL語句,但是,從前文示例中可以看出 emr-serverless-utils 提供的SQL文件執行能力更便捷也更強大一些,所以,最后我們簡單介紹一下如何在源代碼中調用相關的工具類獲得上述SQL文件的處理能力。具體做法非常簡單,你只需要:
① 將emr-serverless-utils-1.0.jar
加載到你的類路徑中
② 聲明隱式類型轉換
③ 在spark上直接調用execSqlFile()
# 初始化SparkSession及其他操作
...# 聲明隱式類型轉換
import com.github.emr.serverless.SparkSqlSupport._# 在spark上直接調用execSqlFile()
spark.execSqlFile("s3://YOUR/XXX.sql")# 在spark上直接調用execSqlFile()
spark.execSqlFile("s3://YOUR/XXX.sql", "K1=V1,K2=V2,...")# 其他操作
...