pyspark大規模數據加解密優化實踐

假如有1億行數據

方法1 spark udf解密

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyDes import *
import binasciispark=SparkSession.builder.getOrCreate()def dec_fun(text):key = triple_des(b"HHHHHHHHHHHHHHHHHHHHHHHH", CBC, b"XXXXXXXX", padmode=PAD_PKCS5)if not text:return Nonereturn k.decrypt(base64.b64decode(text)).decode('utf-8')
spark.udf.register("dec_fun",dec_fun)df.withColumn("dec_col",F.expr("dec_fun(en_col)")).write.mode("overwrite").save("OOOO")
  • 密鑰初始化1億次
  • 每條數據觸發一次JVM → Python進程的數據傳輸
  • 每次傳輸需序列化/反序列化數據(性能殺手)
  • 高頻進程間通信(IPC)產生巨大開銷

方法2 repartition+mapPartition

為了提高效率,我們可以利用mapPartitions在每個分區內部只初始化一次解密對象,避免重復初始化。

def dec_fun(text):if not text:return Noneelse:result = base64.b64decode(text)k = triple_des(b"HHHHHHHHHHHHHHHHHHHHHHHH", CBC, "XXXXXXXX", pad=None, padmode=PAD_PKCS5)d = k.decrypt(result)return d.decode("utf-8")spark.udf.register("dec_fun", dec_fun)# 分區解密
def rdd_decrypt(partitionData):for row in partitionData:try:yield [dec_fun(row.zj_no)]except:passdf.select("en_col").repartition(30).rdd.mapPartitions(rdd_decrypt).toDF(["dec_col"]).write.mode("overwrite").save("OOOO")

密鑰初始化依然是1億次,這個代碼寫的不好,應該每個分區初始化1次而不是每行。但相對方法1依然有答復性能提升,

  • mapPartitions
    • 分區級批量傳輸:每個分區一次性從JVM發送到Python進程(例如1個分區10萬條數據,僅1次傳輸)
    • 幾個分區就調用幾次函數(對比1億次的UDF調用)

方法3 repartition+mapPartition+分區1次初始化

def dec_fun(partitionData):k = triple_des(b"HHHHHHHHHHHHHHHHHHHHHHHH", CBC, "XXXXXXXX", pad=None, padmode=PAD_PKCS5)for row in partitionData:try:if row.zj_no:result = base64.b64decode(row.zj_no)d = k.decrypt(result)yield [d.decode("utf-8")]else:continueexcept:pass# 如果要保留原始一大堆列,更麻煩
df.select("en_col").repartition(20).rdd.mapPartitions(dec_fun).toDF(["dec_col"]).write.mode("overwrite").save("OOOO")
  • 密鑰初始化數=分區數
  • 通過repartition(20)合理調整分區
  • 利用RDD底層優化

方法4 scalar pandas_udf

import pyspark.sql.functions as F
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true") @F.pandas_udf("string")
def dec_fun(series: pd.Series) -> pd.Series:k = triple_des(b"HHHHHHHHHHHHHHHHHHHHHHHH", CBC, "XXXXXXXX", pad=None, padmode=PAD_PKCS5)def _decrypt(text):try:if text:result = base64.b64decode(text)d = k.decrypt(result)return d.decode("utf-8")except:passreturn series.apply(_decrypt)df.select("en_col").repartition(20).withColumn("dec_col",F.expr("dec_fun(en_col)")).write.mode("overwrite").save("OOOO")

🌟 優勢

  • 利用Apache Arrow高效內存傳輸
  • Pandas向量化操作潛力
  • 與DataFrame API無縫集成

?? 局限

  • 密鑰初始化數=batch數
  • 大分區內存壓力大

方法5 迭代器型pandas_udf

我們可以使用迭代器類型的Pandas UDF,在每次處理一個迭代器(一個迭代器對應一個batch)時只初始化一次密鑰對象:

import pyspark.sql.functions as F
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true") @F.pandas_udf("string")
def dec_fun(series_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:k = triple_des(b"HHHHHHHHHHHHHHHHHHHHHHHH", CBC, "XXXXXXXX", pad=None, padmode=PAD_PKCS5)def _decrypt(text):try:if text:result = base64.b64decode(text)d = k.decrypt(result)return d.decode("utf-8")except:passfor series in series_iter:de_series = series.apply(_decrypt)yield de_seriesdf.select("en_col").repartition(20).withColumn("dec_col",F.expr("dec_fun(en_col)")).write.mode("overwrite").save("OOOO")
  • 密鑰初始化數=分區數
  • 內存友好的批處理流
  • 向量化
  • Arrow優化零拷貝傳輸
  • 完美平衡初始化開銷和并行效率

綜合

方法5>(方法4,方法3)>方法2>方法1

方法4,方法3這兩者,我也傾向方法4

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/913254.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/913254.shtml
英文地址,請注明出處:http://en.pswp.cn/news/913254.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

華為云Flexus+DeepSeek征文|華為云ECS與CCE:從介紹到架構部署·僅需要此文足矣

前引:當今的企業面臨著前所未有的技術挑戰:如何構建既安全又高效、既靈活又可靠的云服務架構?如何有效整合人工智能技術,打造智能化的運維和服務體系?這些問題的答案,正在悄然改變著企業級IT基礎設施的生態…

DAY 50 預訓練模型+CBAM模塊

浙大疏錦行https://blog.csdn.net/weixin_45655710 知識點回顧: resnet結構解析CBAM放置位置的思考針對預訓練模型的訓練策略 差異化學習率三階段微調 作業: 好好理解下resnet18的模型結構嘗試對vgg16cbam進行微調策略 ResNet-18 結構核心思想 可以將R…

docker連接mysql

查看在運行的容器:docker ps -s 進入容器:docker exec -it 容器號或名 /bin/bash,如:docker exec -it c04c438ff177 /bin/bash 或docker exec -it mysql /bin/bash。 3. 登錄mysql:mysql -uroot -p123456

javaweb第182節Linux概述~ 虛擬機連接不上FinalShell

問題描述 虛擬機無法連接到finalshell 報錯 session.connect:java.net.socketexception:connection reset 或者 connection is closed by foreign host 解決 我經過一系列的排查,花費了一天的時間后,發現,只是因為,我將連接…

高壓電纜護層安全的智能防線:TLKS-PLGD 監控設備深度解析

在現代電力系統龐大復雜的網絡中,高壓電纜護層是守護電力傳輸的 "隱形鎧甲",其安全直接影響電網穩定。傳統監測手段響應慢、精度低,難以滿足安全運維需求。TLKS-PLGD 高壓電纜護層環流監控設備應運而生,提供智能化解決方…

Element-Plus Cascader 級聯選擇器獲取節點名稱和value值方法

html 部分 <template><el-cascaderref"selectAeraRef":options"areas":disabled"disabled":props"optionProps"v-model"selectedOptions"filterablechange"handleChange"><template #default"…

STM32中實現shell控制臺(命令解析實現)

文章目錄一、核心設計思想二、命令系統實現詳解&#xff08;含完整注釋&#xff09;1. 示例命令函數實現2. 初始化命令系統3. 命令注冊函數4. 命令查找函數5. 命令執行函數三、命令結構體&#xff08;cmd\_t&#xff09;四、運行效果示例五、小結在嵌入式系統的命令行控制臺&am…

基于matlab的二連桿機械臂PD控制的仿真

基于matlab的二連桿機械臂PD控制的仿真。。。 chap3_5input.m , 1206 d2plant1.m , 1364 hs_err_pid2808.log , 15398 hs_err_pid4008.log , 15494 lx_plot.m , 885 PD_Control.mdl , 35066 tiaojie.m , 737 chap2_1ctrl.asv , 988 chap2_1ctrl.m , 905

TCP、HTTP/1.1 和HTTP/2 協議

TCP、HTTP/1.1 和 HTTP/2 是互聯網通信中的核心協議&#xff0c;它們在網絡分層中處于不同層級&#xff0c;各有特點且逐步演進。以下是它們的詳細對比和關鍵特性&#xff1a;1. TCP&#xff08;傳輸控制協議&#xff09; 層級&#xff1a;傳輸層&#xff08;OSI第4層&#xff…

Java+Vue開發的進銷存ERP系統,集采購、銷售、庫存管理,助力企業數字化運營

前言&#xff1a;在當今競爭激烈的商業環境中&#xff0c;企業對于高效管理商品流通、采購、銷售、庫存以及財務結算等核心業務流程的需求日益迫切。進銷存ERP系統作為一種集成化的企業管理解決方案&#xff0c;能夠整合企業資源&#xff0c;實現信息的實時共享與協同運作&…

【趣談】Android多用戶導致的UserID、UID、shareUserId、UserHandle術語混亂討論

【趣談】Android多用戶導致的UserID、UID、shareUserId、UserHandle術語混亂討論 備注一、概述二、概念對比1.UID2.shareUserId3.UserHandle4.UserID 三、結論 備注 2025/07/02 星期三 在與Android打交道時總遇到UserID、UID、shareUserId、UserHandle這些術語&#xff0c;但是…

P1424 小魚的航程(改進版)

題目描述有一只小魚&#xff0c;它平日每天游泳 250 公里&#xff0c;周末休息&#xff08;實行雙休日)&#xff0c;假設從周 x 開始算起&#xff0c;過了 n 天以后&#xff0c;小魚一共累計游泳了多少公里呢&#xff1f;輸入格式輸入兩個正整數 x,n&#xff0c;表示從周 x 算起…

<二>Sping-AI alibaba 入門-記憶聊天及持久化

請看文檔&#xff0c;流程不再贅述&#xff1a;官網及其示例 簡易聊天 環境變量 引入Spring AI Alibaba 記憶對話還需要我們有數據庫進行存儲&#xff0c;mysql&#xff1a;mysql-connector-java <?xml version"1.0" encoding"UTF-8"?> <pr…

【機器學習深度學習】模型參數量、微調效率和硬件資源的平衡點

目錄 一、核心矛盾是什么&#xff1f; 二、微調本質&#xff1a;不是全調&#xff0c;是“挑著調” 三、如何平衡&#xff1f; 3.1 核心策略 3.2 參數量 vs 微調難度 四、主流輕量微調方案盤點 4.1 凍結部分參數 4.2 LoRA&#xff08;低秩微調&#xff09; 4.3 量化訓…

【V13.0 - 戰略篇】從“完播率”到“價值網絡”:訓練能預測商業潛力的AI矩陣

在上一篇 《超越“平均分”&#xff1a;用多目標預測捕捉觀眾的“心跳曲線”》 中&#xff0c;我們成功地讓AI學會了預測觀眾留存曲線&#xff0c;它的診斷能力已經深入到了視頻的“過程”層面&#xff0c;能精確地指出觀眾是在哪個瞬間失去耐心。 我的AI現在像一個頂級的‘心…

java微服務(Springboot篇)——————IDEA搭建第一個Springboot入門項目

在正文開始之前我們先來解決一些概念性的問題 &#x1f355;&#x1f355;&#x1f355; 問題1&#xff1a;Spring&#xff0c;Spring MVC&#xff0c;Spring Boot和Spring Cloud之間的區別與聯系&#xff1f; &#x1f36c;&#x1f36c;&#x1f36c;&#xff08;1&#xff0…

服務器間接口安全問題的全面分析

一、服務器接口安全核心威脅 文章目錄**一、服務器接口安全核心威脅**![在這里插入圖片描述](https://i-blog.csdnimg.cn/direct/6f54698b9a22439892f0c213bc0fd1f4.png)**二、六大安全方案深度對比****1. IP白名單機制****2. 雙向TLS認證(mTLS)****3. JWT簽名認證****4. OAuth…

vs code關閉函數形參提示

問題&#xff1a;函數內出現灰色的形參提示 需求/矛盾&#xff1a; 這個提示對老牛來說可能是一種干擾&#xff0c;比如不好對齊控制一行代碼的長度&#xff0c;或者容易看走眼&#xff0c;造成眼花繚亂的體驗。 關閉方法&#xff1a; 進入設置&#xff0c;輸入inlay Hints&…

ESXi 8.0安裝

使用群暉&#xff0c;突然nvme固態壞了 新nvme固態&#xff0c;先在PC上格式化下&#xff0c;不然可能N100可能不認 啟動&#xff0c;等待很長時間 回車 F11 輸入密碼&#xff0c;字母小寫字母大寫數字 拔掉U盤&#xff0c;回車重啟 網絡配置 按F2&#xff0c; 輸入密碼&…

【git學習】第2課:查看歷史與版本回退

好的&#xff0c;我們進入 第2課&#xff1a;版本查看與回退機制&#xff0c;本課你將學會如何查看提交歷史、對比更改&#xff0c;并掌握多種回退版本的方法。&#x1f4d8; 第2課&#xff1a;查看歷史與版本回退&#x1f3af; 本課目標熟練查看 Git 提交記錄掌握差異查看、版…