假如有1億行數據
方法1 spark udf解密
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyDes import *
import binasciispark=SparkSession.builder.getOrCreate()def dec_fun(text):key = triple_des(b"HHHHHHHHHHHHHHHHHHHHHHHH", CBC, b"XXXXXXXX", padmode=PAD_PKCS5)if not text:return Nonereturn k.decrypt(base64.b64decode(text)).decode('utf-8')
spark.udf.register("dec_fun",dec_fun)df.withColumn("dec_col",F.expr("dec_fun(en_col)")).write.mode("overwrite").save("OOOO")
- 密鑰初始化1億次
- 每條數據觸發一次JVM → Python進程的數據傳輸
- 每次傳輸需序列化/反序列化數據(性能殺手)
- 高頻進程間通信(IPC)產生巨大開銷
方法2 repartition+mapPartition
為了提高效率,我們可以利用mapPartitions
在每個分區內部只初始化一次解密對象,避免重復初始化。
def dec_fun(text):if not text:return Noneelse:result = base64.b64decode(text)k = triple_des(b"HHHHHHHHHHHHHHHHHHHHHHHH", CBC, "XXXXXXXX", pad=None, padmode=PAD_PKCS5)d = k.decrypt(result)return d.decode("utf-8")spark.udf.register("dec_fun", dec_fun)# 分區解密
def rdd_decrypt(partitionData):for row in partitionData:try:yield [dec_fun(row.zj_no)]except:passdf.select("en_col").repartition(30).rdd.mapPartitions(rdd_decrypt).toDF(["dec_col"]).write.mode("overwrite").save("OOOO")
密鑰初始化依然是1億次,這個代碼寫的不好,應該每個分區初始化1次而不是每行。但相對方法1依然有答復性能提升,
- mapPartitions:
- 分區級批量傳輸:每個分區一次性從JVM發送到Python進程(例如1個分區10萬條數據,僅1次傳輸)
- 幾個分區就調用幾次函數(對比1億次的UDF調用)
方法3 repartition+mapPartition+分區1次初始化
def dec_fun(partitionData):k = triple_des(b"HHHHHHHHHHHHHHHHHHHHHHHH", CBC, "XXXXXXXX", pad=None, padmode=PAD_PKCS5)for row in partitionData:try:if row.zj_no:result = base64.b64decode(row.zj_no)d = k.decrypt(result)yield [d.decode("utf-8")]else:continueexcept:pass# 如果要保留原始一大堆列,更麻煩
df.select("en_col").repartition(20).rdd.mapPartitions(dec_fun).toDF(["dec_col"]).write.mode("overwrite").save("OOOO")
- 密鑰初始化數=分區數
- 通過
repartition(20)
合理調整分區 - 利用RDD底層優化
方法4 scalar pandas_udf
import pyspark.sql.functions as F
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true") @F.pandas_udf("string")
def dec_fun(series: pd.Series) -> pd.Series:k = triple_des(b"HHHHHHHHHHHHHHHHHHHHHHHH", CBC, "XXXXXXXX", pad=None, padmode=PAD_PKCS5)def _decrypt(text):try:if text:result = base64.b64decode(text)d = k.decrypt(result)return d.decode("utf-8")except:passreturn series.apply(_decrypt)df.select("en_col").repartition(20).withColumn("dec_col",F.expr("dec_fun(en_col)")).write.mode("overwrite").save("OOOO")
🌟 優勢:
- 利用Apache Arrow高效內存傳輸
- Pandas向量化操作潛力
- 與DataFrame API無縫集成
?? 局限:
- 密鑰初始化數=batch數
- 大分區內存壓力大
方法5 迭代器型pandas_udf
我們可以使用迭代器類型的Pandas UDF,在每次處理一個迭代器(一個迭代器對應一個batch)時只初始化一次密鑰對象:
import pyspark.sql.functions as F
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true") @F.pandas_udf("string")
def dec_fun(series_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:k = triple_des(b"HHHHHHHHHHHHHHHHHHHHHHHH", CBC, "XXXXXXXX", pad=None, padmode=PAD_PKCS5)def _decrypt(text):try:if text:result = base64.b64decode(text)d = k.decrypt(result)return d.decode("utf-8")except:passfor series in series_iter:de_series = series.apply(_decrypt)yield de_seriesdf.select("en_col").repartition(20).withColumn("dec_col",F.expr("dec_fun(en_col)")).write.mode("overwrite").save("OOOO")
- 密鑰初始化數=分區數
- 內存友好的批處理流
- 向量化
- Arrow優化零拷貝傳輸
- 完美平衡初始化開銷和并行效率
綜合
方法5>(方法4,方法3)>方法2>方法1
方法4,方法3這兩者,我也傾向方法4