背景說明
在 PySpark 3.1.3 環境中,當需要使用與集群環境不同版本的 PyArrow (如 1.0.0 版本)時,可以通過以下方法實現,而無需更改集群環境配置
完整操作說明
- 去pyarrow·PyPI下載對應版本的whl文件
- 后綴whl直接改成zip
- 解壓后有兩個文件夾,分別是pyarrow和pyarrow-1.0.0.dist-info
- 直接把那兩個文件夾打包成pyarrow.zip
因為pyarrow里不是單純的python代碼,還有C擴展,所以不能用--py-files
參數,只能放在--archives
參數里
spark -submit spark-submit \--master yarn \--deploy-mode cluster \ --executor-memory 4G \--num-executors 10 \--archives /my_path/pyarrow-1.0.0.zip#pyarrow #必須添加的參數your_script.py
#pyarrow
?表示在容器內解壓到?./pyarrow
?目錄
pyspark里要添加對應代碼
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.functions import pandas_udf
import pyspark.sql.types as T
import pandas as pd
import os, sys# 初始化Spark并配置Arrow支持
spark = SparkSession.builder.config("spark.sql.execution.arrow.pyspark.enabled", "true").getOrCreate()# 加載自定義PyArrow
**pyarrow_dir = os.path.join(os.getcwd(), "pyarrow") # 對應 --archives 中的解壓目錄
sys.path.insert(0, pyarrow_dir) # 添加到 Python 路徑**import pyarrow
print("pyarrow version ", pyarrow.__version__) # 應顯示 1.0.0# 定義大寫轉換UDF
@pandas_udf(T.StringType())
def uppercase(s: pd.Series) -> pd.Series:return s.str.upper()# 數據讀取和處理
df = spark.read.load("my_path_tofile/*").select("version")
df.withColumn("hh", uppercase(F.col("version"))).show()