使用pyspark對上百億行的hive表生成稀疏向量

背景:一張上百億行的hive表,只有id和app兩列,其中app的去重量是8w多個(原app有上百萬枚舉值,此處已經用id數量進行過篩選,只留下有一定規模的app),id的去重量大概有八九億,最終希望生成pid和對應app的稀疏向量。

我們使用pyspark來實現:

# 處理app特征,生成id,app和app對應的稀疏向量
import time
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import StandardScaler,StringIndexer, OneHotEncoder
from pyspark.ml.linalg import Vectors
from pyspark.ml import Pipeline
from pyspark.ml.linalg import VectorUDT  # 新增導入
from pyspark.ml.functions import vector_to_array  # 新增關鍵導入
import sys
import os# 配置環境變量,否則報錯python3找不到
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
spark = SparkSession.builder.config("spark.metrics.conf", "/opt/mobdata/spark/spark-2.4.3.mob1-bin-2.6.5/conf/metrics.properties") \.config("spark.driver.memory", "48g")\.config("spark.driver.maxResultSize", "16g")\.appName("test_dj")\.enableHiveSupport()\.getOrCreate()# 1. 從Hive讀取數據
df = spark.sql("SELECT id,app FROM database.table")# 2. & 3. 定義特征轉換管道(Pipeline)
# 步驟1:將app字符串轉換為數值索引
indexer = StringIndexer(inputCol="app", outputCol="app_index")
# 步驟2:將索引進行One-Hot編碼,輸出為稀疏向量
encoder = OneHotEncoder(inputCol="app_index", outputCol="app_ohe_vector")# 將兩個步驟組合成一個管道
pipeline = Pipeline(stages=[indexer, encoder])# 擬合數據并轉換
model = pipeline.fit(df)
result = model.transform(df)# 查看結果(可選)
# result.select("id", "app", "app_ohe_vector").show(truncate=False)# 4. 將結果保存回HDFS(例如Parquet格式)
# result.select("id", "app_ohe_vector").write \
#     .mode("overwrite") \
#     .parquet("/path/to/your/output/onehot_result.parquet")# 需要跑6小時,表非常大 338億數據
result.createOrReplaceTempView("temp_view")
spark.sql("CREATE TABLE database.app_vec AS SELECT * FROM temp_view")# 停止SparkSession
spark.stop()

此方案的優點:

?高效:?? Spark是專為大規模數據處理設計的,性能遠超Hive UDF。

?節省空間:?? 輸出是稀疏向量,8萬個類別中每個用戶只有少量app,向量中大部分是0,稀疏表示非常緊湊。

?標準化:?? 這是ML領域處理類別特征的標準流程,與后續的Spark MLlib機器學習庫無縫集成。

此方案生成的結果數據示例如下:

id

app

app_index

app_ohe_vector

1001

微信

0

(0,80000, [0], [1.0])

1001

王者榮耀

79999

(0,80000, [79999], [1.0])

1002

淘寶

1

(0,80000, [1], [1.0])

在hive表中,app_ohe_vector的格式為row("type" tinyint, "size" integer, "indices" array(integer), "values" array(double))。

app_ohe_vector的結構是Spark ML的標準格式:

  • 0: 向量類型(0=稀疏向量,1=密集向量)
  • 80000: 向量總長度(即app總數)
  • [0, 1, 2, 3, ...]: 非零元素的索引位置
  • [1.0, 1.0, 1.0, ...]: 對應索引位置的值

接下來我們對id進行聚合,同樣使用pyspark來實現:

# 處理app特征,按id聚合app對應的稀疏向量
import time
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import expr, col, collect_list, udf, first, size,struct
from pyspark.sql.window import Window
from pyspark.sql.types import ArrayType, DoubleType, IntegerType, StructType, StructField
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import StandardScaler,StringIndexer, OneHotEncoder
from pyspark.ml.linalg import Vectors,VectorUDT  # 新增導入
from pyspark.ml import Pipeline
from pyspark.ml.functions import vector_to_array  # 新增關鍵導入
from pyspark import StorageLevel
import json
import sys
import os# 配置環境變量,否則報錯python3找不到
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
spark = SparkSession.builder.config("spark.metrics.conf", "/opt/mobdata/spark/spark-2.4.3.mob1-bin-2.6.5/conf/metrics.properties") \.config("spark.sql.shuffle.partitions", "5000") \.config("spark.driver.memory", "48g")\.config("spark.driver.maxResultSize", "16g")\.appName("test_dj")\.enableHiveSupport()\.getOrCreate()for i in ['0','1','2','3','4','5','6','7','8','9','a','b','c','d','e','f']:# 1. 從Hive讀取數據print(i)sql_idapp = '''select id,app_ohe_vector from database.app_vec_par where flag = '{fflag}\''''.format(fflag=i)df = spark.sql(sql_idapp)# 打印數據概覽total_count = df.count()print(f"數據總量: {total_count:,}")    # 高效UDAF聚合函數(針對單元素向量優化)def merge_sparse_vectors(vectors):"""高效合并稀疏向量,針對單元素向量優化"""if not vectors:return {"type": 0, "size": 0, "indices": [], "values": []}# 獲取向量尺寸(假設所有向量尺寸相同)size_val = vectors[0]["size"]# 使用字典高效聚合value_dict = {}for vec in vectors:# 直接訪問第一個(也是唯一一個)索引和值idx = vec["indices"][0]val = vec["values"][0]# 使用get方法避免兩次字典查找value_dict[idx] = value_dict.get(idx, 0.0) + val# 提取并排序索引sorted_indices = sorted(value_dict.keys())sorted_values = [value_dict[i] for i in sorted_indices]return {"type": 0, "size": size_val, "indices": sorted_indices, "values": sorted_values}# 注冊UDAFmerge_sparse_vectors_udf = udf(merge_sparse_vectors,StructType([StructField("type", IntegerType()),StructField("size", IntegerType()),StructField("indices", ArrayType(IntegerType())),StructField("values", ArrayType(DoubleType()))]))# 數據預處理:過濾無效記錄并重新分區print("開始數據預處理...")cleaned_df = df.filter((col("app_ohe_vector").isNotNull()) & (size(col("app_ohe_vector.indices")) > 0)).repartition(5000, "id")  # 增加分區數處理數據傾斜# 釋放原始DF內存df.unpersist()# 兩階段聚合策略(處理數據傾斜)print("開始第一階段聚合(按id和索引分組)...")# 步驟1: 提取每個向量的索引和值expanded_df = cleaned_df.select("id",col("app_ohe_vector.indices")[0].alias("index"),col("app_ohe_vector.values")[0].alias("value"),col("app_ohe_vector.size").alias("size"))# 步驟2: 按(id, index)分組求和intermediate_df = expanded_df.groupBy("id", "index").agg(expr("sum(value)").alias("sum_value"),first("size").alias("size"))# 步驟3: 按id分組,收集所有(index, sum_value)對print("開始第二階段聚合(按id分組)...")grouped_df = intermediate_df.groupBy("id").agg(collect_list(struct("index", "sum_value")).alias("index_value_pairs"),first("size").alias("size"))# 步驟4: 轉換為稀疏向量格式def pairs_to_sparse_vector(pairs, size_val):"""將(index, value)對列表轉換為稀疏向量"""if not pairs:return {"type": 0, "size": size_val, "indices": [], "values": []}# 提取索引和值indices = [p["index"] for p in pairs]values = [p["sum_value"] for p in pairs]# 排序(如果需要)sorted_indices = sorted(indices)sorted_values = [values[indices.index(i)] for i in sorted_indices]return {"type": 0, "size": size_val, "indices": sorted_indices, "values": sorted_values}pairs_to_sparse_vector_udf = udf(pairs_to_sparse_vector,StructType([StructField("type", IntegerType()),StructField("size", IntegerType()),StructField("indices", ArrayType(IntegerType())),StructField("values", ArrayType(DoubleType()))]))# 生成最終結果result = grouped_df.withColumn("merged_vector",pairs_to_sparse_vector_udf("index_value_pairs", "size")).select("id", "merged_vector")print("開始第三階段數據插入...")# 創建臨時視圖result.createOrReplaceTempView("sparse_matrix_result")res_sql='''INSERT into TABLE database.app_vecagg_res PARTITION(flag='{fflag}')SELECT id,merged_vector from sparse_matrix_result'''.format(fflag=i)spark.sql(res_sql)print("數據插入完成")# 停止SparkSession
spark.stop()

此處因為原表有300億+數據,集群性能有限無法一次性處理,所以我將id進行了分區,然后循環分區進行的聚合。

聚合后的結果數據示例如下:

id

merged_vector

1001

(0,80000, [0,79999], [1.0,1.0])

merged_vector的結構是Spark ML的標準格式:

  • 0: 向量類型(0=稀疏向量,1=密集向量)
  • 80000: 向量總長度(即app總數)
  • [0, 1, 2, 3, ...]: 非零元素的索引位置
  • [1.0, 1.0, 1.0, ...]: 對應索引位置的值

Spark ML的算法設計時就已經考慮了這種向量格式,所有內置算法都能正確處理這種結構:

  1. ?算法兼容性?:Spark ML的所有分類、回歸、聚類算法都接受這種格式的向量
  2. ?性能優化?:稀疏向量格式在內存使用和計算效率上都有優化
  3. 內置支持?:Spark ML的VectorAssembler、特征變換器等都能處理這種格式

至此我們就可以將此向量作為特征用于后續的建模操作了。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/95961.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/95961.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/95961.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

【設計模式】關于學習《重學Java設計模式》的一些成長筆記

【設計模式】關于學習《重學Java設計模式》的一些成長筆記 沒有幾個人是一說就會的,掌握一些技能,不僅要用心,而且還需要從溫故中知新。 為此,好記性不如爛筆頭,我干脆一步一腳印地系統學習一遍設計模式! (關注不迷路哈!!!) 文章目錄 【設計模式】關于學習《重學Jav…

【基礎-判斷】@Entry裝飾的自定義組件將作為頁面的入口。在單個頁面中可以使用多個@Entry裝飾不同自定義組件。

@Entry裝飾的自定義組件將作為頁面的入口。在單個頁面中可以使用多個@Entry裝飾不同自定義組件。 解釋: @Entry 的核心作用與唯一性:@Entry 裝飾器用于明確聲明該組件是一個頁面的入口組件,即整個頁面的“根”和“起點”。當UIAbility實例加載并顯示頁面時,系統需要明確知道…

醫學影像AI應用-實踐:使用MONAI實現肺部CT圖像分割的原理與實踐

?? 博主簡介:CSDN博客專家、CSDN平臺優質創作者,高級開發工程師,數學專業,10年以上C/C++, C#,Java等多種編程語言開發經驗,擁有高級工程師證書;擅長C/C++、C#等開發語言,熟悉Java常用開發技術,能熟練應用常用數據庫SQL server,Oracle,mysql,postgresql等進行開發應用…

如何訓練一個簡單的Transformer模型(附源碼)李宏毅2025大模型-作業4

摘要:一、作業目標:使用只有2層transformer的GPT-2,生成完整寶可夢圖像。二、源碼&解析:使用提供的Transformer模型(GPT-2)進行訓練,FID Score: 96.3425一、作業目標1)目標使用T…

leetcode211.添加與搜索單詞-數據結構設計

與208.前綴樹的設計是一樣的,關鍵點在于word中存在通配符“.",所以針對該特殊情況,在search時針對這里進行全子節點的深度搜索class WordDictionary {TrieNode root;private class TrieNode {char val;// 當前節點的值,冗余了…

項目中的一些比較實用的自定義控件

本文是記錄項目開發中一些相對復雜但都比較實用的控件,這些控件都是基于自定義的方式去實現,如果有需要的朋友,這個可以作為一個參考,同時也做一個自我總結。 (1)子項大小不一致的RecyclerView(…

[iOS] 折疊 cell

目錄 前言 1.原理 2.折疊 cell 的點擊選中 3.折疊 cell 高度的變化 4.實現效果 5.總結 前言 折疊 cell 是在 3GShare 中寫過的一個小控件,這篇博客是一個小小的總結。 1.原理 在這里的核心就是我們可以通過改變按鈕的 tag 值來判斷我們是否應該展開還是回收…

MySQL的組復制(MGR)高可用集群搭建

一、MySQL 組復制(MGR)核心概念 MySQL Group Replication(簡稱 MGR)是 MySQL 官方推出的 高可用(HA) 強一致性 解決方案,基于改進的 Paxos 協議實現,核心能力可概括為 3 點&#xf…

使用Shell腳本實現Linux系統資源監控郵件告警

前言 1. 問題背景與需求 2. Bash 腳本監控資源 3. Bash 腳本判斷閾值 4. 配置 msmtp 發送郵件 4.1 安裝 msmtp 4.2 創建配置文件 /etc/msmtprc 5. 發送郵件 5.1 給別人發郵件 6. 完整示例腳本 7. 測試方法 8. 常見問題解答 9. 總結 前言 在運維過程中&#xff0c…

設計整體 的 序分(三“釋”)、正宗分(雙“門”)和流通分(統一的通行表達式) 之3 “自明性”(騰訊元寶 之2)

Q&AQ11、可能還需要補充 魂軸、體軸 和 中心軸 并行 上升 的內容Q11.1、我剛才說“可能還需要補充 魂軸、體軸 和 中心軸 并行 上升 的內容” 是指的 我們今天前面討論 得出的整體設計 的一個概念整體 的一個雙螺旋上升結構中的三個軸。 您剛才是這樣 理解的嗎?…

使用Ansible自動化部署Hadoop集群(含源碼)--環境準備

現在我們有5臺虛擬機,已經配置好了主機名和網絡我們的目標是通過Ansible實現自動化部署hadoop集群。在此之前,我們先編寫一個shell腳本來配置hadoop集群的環境,包括安裝軟件、安裝配置Ansible(一個主節點四個工作節點)…

C#海康車牌識別實戰指南帶源碼

C#海康車牌識別實戰指南帶源碼前言車牌識別技術在智能交通、停車場管理等領域有著廣泛的應用。海康威視作為國內領先的安防廠商,其車牌識別相機提供了豐富的SDK接口供開發者使用。本文將詳細介紹如何使用C#語言結合海康威視SDK實現車牌識別功能,并解析關…

智慧能源新范式:數字孿生平臺如何驅動風電場的精細化管理?

摘要你有沒有想過,一座風力發電場背后,藏著一個“看不見的孿生兄弟”?它能提前預知風機故障,實時模擬極端天氣的影響,甚至能“訓練”運維人員在虛擬場景中演練搶修。這就是數字孿生——一個讓風電場從“靠經驗管理”轉…

STM32-FreeRTOS操作系統-任務管理

引言 隨著嵌入式技術的飛速發展,STM32與FreeRTOS的融合愈發緊密。本文聚焦于STM32平臺下FreeRTOS操作系統的任務管理,旨在為開發者提供清晰的思路與實用的技巧,助力高效開發。 為什么要進行任務管理? 在嵌入式系統中,…

工業領域 ACP 協議全解析:從入門到實戰案例

工業領域 ACP 協議全解析:從入門到實戰案例 文章目錄工業領域 ACP 協議全解析:從入門到實戰案例一、前言二、ACP 協議是什么?1. 基本定義2. 與數據傳輸協議的區別三、ACP 協議的核心功能1. 身份認證(Authentication)2.…

計算機組成原理:計算機硬件的基本組成

📌目錄🖥? 計算機硬件的基本組成:從經典到現代的架構演進🧩 一、計算機硬件的五大部分:功能與協同📥 (一)輸入設備:人機交互的“入口”📤 (二&am…

AI歌手功能終于上線!Suno AI 帶你保存歌曲的靈魂

當我們談論一首歌時,究竟是什么讓它“獨一無二”?是主唱的聲音質感?是旋律里的氛圍?還是那種無法復制的風格氣息? 如今,Suno AI 給出了答案—— AI歌手功能正式上線! 🌟什么是「AI…

Dubbo3.3 Triple協議處理東西向流量

前言 Apache Dubbo 3.3 對 Triple 協議做了升級,現在 Dubbo 不僅可以處理東西向流量,也可以處理南北向流量。 **東西向流量(East-West Traffic) ** 指數據中心或網絡內部同級設備/服務之間的通信。例如,微服務之間的…

操作系統核心特點詳解:從并發到分布式,一文搞懂考研必備知識

操作系統核心特點詳解:從并發到分布式,一文搞懂考研必備知識 大家好,今天咱們來聊聊操作系統(OS)這個計算機世界的“大管家”。想象一下,你的電腦就像一個忙碌的廚房,操作系統就是那個廚師長&am…

2025精選5款AI視頻轉文字工具,高效轉錄秒變文字!

視頻轉文本的需求早已滲透到生活的方方面面:網課學習需要提取課件臺詞、會議記錄想快速整理要點、追劇時急需生肉轉字幕…… 手動記錄不僅費時,還容易遺漏關鍵信息。今天就分享5款實用工具,從免費到專業全覆蓋,幾步操作就能讓視頻…