在 PySpark 中打包 Python 環境并調度到集群是處理依賴一致性的關鍵步驟。以下是完整的解決方案,包含環境打包、分發和配置方法:
一、環境打包方法
- 使用 Conda 打包環境
# 創建 Conda 環境
conda create -n pyspark_env python=3.8
conda activate pyspark_env
conda install numpy pandas pyarrow # 安裝所需依賴# 導出環境為 YAML 文件
conda env export > environment.yml# 打包環境(Linux/macOS)
conda-pack -n pyspark_env -o pyspark_env.tar.gz
- 使用 Virtualenv 打包環境
# 創建虛擬環境
python3 -m venv pyspark_env
source pyspark_env/bin/activate # Linux/macOS
pyspark_env\Scripts\activate # Windows# 安裝依賴
pip install numpy pandas pyarrow# 打包環境(需使用第三方工具)
pip install virtualenv-pack
virtualenv-pack -f -o pyspark_env.tar.gz
二、分發環境到集群
方法 1:通過 --archives 參數上傳
在提交作業時,使用 --archives 參數將打包的環境分發到所有節點:
# 將環境包上傳到 HDFS,避免每次提交都重新傳輸:
hdfs dfs -put pyspark_env.tar.gz /path/in/hdfs/spark-submit \--master yarn \--deploy-mode cluster \--py-files helper.py\ # python依賴文件,比如第三方代碼等--archives hdfs:///path/in/hdfs/pyspark_env.tar.gz#environment \your_script.py
三、配置 PySpark 使用打包環境
- 設置 Python 解釋器路徑
在代碼中指定 Executor 使用打包環境中的 Python:
import os
os.environ["PYSPARK_PYTHON"] = "./environment/bin/python" # 對應 --archives 指定的目錄名
os.environ["PYSPARK_DRIVER_PYTHON"] = "./environment/bin/python" # Cluster 模式需要,如果是client模式,driver_python配置本地python路徑,比如/opt/conda/bin/python, 需注意本地python和集群打包python的版本一致from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("PackagedEnvApp").getOrCreate()
- 編寫pyspark腳本
import os
os.environ["PYSPARK_PYTHON"] = "./environment/bin/python"from pyspark.sql import SparkSession
import pandas as pd # 使用打包環境中的 pandasspark = SparkSession.builder.appName("PackagedEnvExample").getOrCreate()# 使用 pandas 處理數據
pdf = pd.DataFrame({"col1": [1, 2, 3], "col2": ["a", "b", "c"]})
df = spark.createDataFrame(pdf)
df.show()spark.stop()
- 提交作業
spark-submit \--master yarn \--deploy-mode cluster \--archives pyspark_env.tar.gz#environment \example.py
- 配置優先級與運行模式詳解
-
配置優先級規則
Spark 配置的優先級從高到低如下: -
關鍵結論:
- SparkSession.builder.config() 優先級最高,會覆蓋其他配置
- spark-submit 參數優先級次之
- 特殊參數例外:–master 和 --deploy-mode 在 spark-submit 中具有最高優先級
-
deploy-mode 配置規則
設置方式 是否生效 說明 spark-submit
? 總是生效 命令行參數具有最終決定權 SparkSession
代碼? 當 spark-submit
指定時無效若 spark-submit
未指定,則代碼配置生效spark-defaults
?? 最低優先級 僅當其他方式未配置時生效
四、替代方案
- Docker 容器
使用 Docker 打包完整環境,通過 Kubernetes 調度:
# Dockerfile 示例
FROM apache/spark-py:v3.3.2
RUN pip install pandas numpy# 構建并推送鏡像
docker build -t my-spark-image:v1 .
docker push myregistry/my-spark-image:v1# 提交作業
spark-submit \--master k8s://https://kubernetes-host:port \--conf spark.kubernetes.container.image=myregistry/my-spark-image:v1 \...
- PySpark 內置依賴管理
通過 --py-files 參數上傳 Python 文件 / 包:
spark-submit \--py-files my_module.zip,another_dep.py \your_script.py
五、Pyspark調用Xgboost或者LightGBM
5.1 調用 XGBoost 模型
- 準備依賴
下載 XGBoost 的 Spark 擴展 jar 包:可以從 XGBoost 的官方 GitHub 發布頁面 或者 Maven 倉庫下載與你使用的 XGBoost 和 Spark 版本兼容的xgboost4j-spark和xgboost4j的 jar 包。例如,如果你使用的是 Spark 3.3.0 和 XGBoost 1.6.2,可以下載對應的版本。
下載其他依賴:確保scala-library等相關依賴也在合適的版本,因為xgboost4j-spark會依賴它們。 - 配置 Spark 提交參數
在使用spark-submit提交作業時,通過–jars參數指定上述下載的 jar 包路徑。例如:
spark-submit \--master yarn \--deploy-mode cluster \--jars /path/to/xgboost4j-spark-1.6.2.jar,/path/to/xgboost4j-1.6.2.jar,/path/to/scala-library-2.12.10.jar \your_script.py
也可以將這些 jar 包上傳到 HDFS,然后使用 HDFS 路徑:
hdfs dfs -put /path/to/xgboost4j-spark-1.6.2.jar /lib/
hdfs dfs -put /path/to/xgboost4j-1.6.2.jar /lib/
hdfs dfs -put /path/to/scala-library-2.12.10.jar /lib/spark-submit \--master yarn \--deploy-mode cluster \--jars hdfs:///lib/xgboost4j-spark-1.6.2.jar,hdfs:///lib/xgboost4j-1.6.2.jar,hdfs:///lib/scala-library-2.12.10.jar \your_script.py
- Python 代碼示例
在 Python 代碼中,導入相關模塊并使用 XGBoost 的 Spark 接口:
from pyspark.sql import SparkSession
from xgboost.spark import XGBoostClassifierspark = SparkSession.builder \.appName("XGBoostOnSpark") \.getOrCreate()# 假設data是一個包含特征和標簽的DataFrame
data = spark.read.csv("your_data.csv", header=True, inferSchema=True)
feature_cols = [col for col in data.columns if col != "label"]
label_col = "label"# 創建XGBoost分類器
model = XGBoostClassifier(num_round=10, objective="binary:logistic")
# 擬合模型
model.fit(data, label_col=label_col, features_col=feature_cols)
5.2 調用 LightGBM 模型
- 準備依賴
下載 LightGBM 的 Spark 擴展 jar 包:從 LightGBM 的官方 GitHub 發布頁面或者 Maven 倉庫獲取lightgbm4j-spark相關的 jar 包,以及lightgbm4j的 jar 包。注意選擇與你的 Spark 和 LightGBM 版本適配的版本。
處理其他依賴:同樣要保證scala-library等依賴的兼容性。 - 配置 Spark 提交參數
和 XGBoost 類似,使用spark-submit時通過–jars參數指定 jar 包路徑。例如:
spark-submit \--master yarn \--deploy-mode cluster \--jars /path/to/lightgbm4j-spark-3.3.1.jar,/path/to/lightgbm4j-3.3.1.jar,/path/to/scala-library-2.12.10.jar \your_script.py
或者上傳到 HDFS 后使用 HDFS 路徑:
hdfs dfs -put /path/to/lightgbm4j-spark-3.3.1.jar /lib/
hdfs dfs -put /path/to/lightgbm4j-3.3.1.jar /lib/
hdfs dfs -put /path/to/scala-library-2.12.10.jar /lib/spark-submit \--master yarn \--deploy-mode cluster \--jars hdfs:///lib/lightgbm4j-spark-3.3.1.jar,hdfs:///lib/lightgbm4j-3.3.1.jar,hdfs:///lib/scala-library-2.12.10.jar \your_script.py
- Python 代碼示例
在 Python 代碼中,導入模塊并使用 LightGBM 的 Spark 接口:
from pyspark.sql import SparkSession
from lightgbm4j.spark import LightGBMClassifierspark = SparkSession.builder \.appName("LightGBMOnSpark") \.getOrCreate()# 假設data是一個包含特征和標簽的DataFrame
data = spark.read.csv("your_data.csv", header=True, inferSchema=True)
feature_cols = [col for col in data.columns if col != "label"]
label_col = "label"# 創建LightGBM分類器
params = {"objective": "binary","num_leaves": 31,"learning_rate": 0.05,"feature_fraction": 0.9
}
model = LightGBMClassifier(params=params, num_round=10)
# 擬合模型
model.fit(data, label_col=label_col, features_col=feature_cols)