背景:一張上百億行的hive表,只有id和app兩列,其中app的去重量是8w多個(原app有上百萬枚舉值,此處已經用id數量進行過篩選,只留下有一定規模的app),id的去重量大概有八九億,最終希望生成pid和對應app的稀疏向量。
我們使用pyspark來實現:
# 處理app特征,生成id,app和app對應的稀疏向量
import time
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import StandardScaler,StringIndexer, OneHotEncoder
from pyspark.ml.linalg import Vectors
from pyspark.ml import Pipeline
from pyspark.ml.linalg import VectorUDT # 新增導入
from pyspark.ml.functions import vector_to_array # 新增關鍵導入
import sys
import os# 配置環境變量,否則報錯python3找不到
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
spark = SparkSession.builder.config("spark.metrics.conf", "/opt/mobdata/spark/spark-2.4.3.mob1-bin-2.6.5/conf/metrics.properties") \.config("spark.driver.memory", "48g")\.config("spark.driver.maxResultSize", "16g")\.appName("test_dj")\.enableHiveSupport()\.getOrCreate()# 1. 從Hive讀取數據
df = spark.sql("SELECT id,app FROM database.table")# 2. & 3. 定義特征轉換管道(Pipeline)
# 步驟1:將app字符串轉換為數值索引
indexer = StringIndexer(inputCol="app", outputCol="app_index")
# 步驟2:將索引進行One-Hot編碼,輸出為稀疏向量
encoder = OneHotEncoder(inputCol="app_index", outputCol="app_ohe_vector")# 將兩個步驟組合成一個管道
pipeline = Pipeline(stages=[indexer, encoder])# 擬合數據并轉換
model = pipeline.fit(df)
result = model.transform(df)# 查看結果(可選)
# result.select("id", "app", "app_ohe_vector").show(truncate=False)# 4. 將結果保存回HDFS(例如Parquet格式)
# result.select("id", "app_ohe_vector").write \
# .mode("overwrite") \
# .parquet("/path/to/your/output/onehot_result.parquet")# 需要跑6小時,表非常大 338億數據
result.createOrReplaceTempView("temp_view")
spark.sql("CREATE TABLE database.app_vec AS SELECT * FROM temp_view")# 停止SparkSession
spark.stop()
此方案的優點:
?高效:?? Spark是專為大規模數據處理設計的,性能遠超Hive UDF。
?節省空間:?? 輸出是稀疏向量,8萬個類別中每個用戶只有少量app,向量中大部分是0,稀疏表示非常緊湊。
?標準化:?? 這是ML領域處理類別特征的標準流程,與后續的Spark MLlib機器學習庫無縫集成。
此方案生成的結果數據示例如下:
id | app | app_index | app_ohe_vector |
---|---|---|---|
1001 | 微信 | 0 |
|
1001 | 王者榮耀 | 79999 |
|
1002 | 淘寶 | 1 |
|
在hive表中,app_ohe_vector的格式為row("type" tinyint, "size" integer, "indices" array(integer), "values" array(double))。
app_ohe_vector的結構是Spark ML的標準格式:
0
: 向量類型(0=稀疏向量,1=密集向量)80000
: 向量總長度(即app總數)[0, 1, 2, 3, ...]
: 非零元素的索引位置[1.0, 1.0, 1.0, ...]
: 對應索引位置的值
接下來我們對id進行聚合,同樣使用pyspark來實現:
# 處理app特征,按id聚合app對應的稀疏向量
import time
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import expr, col, collect_list, udf, first, size,struct
from pyspark.sql.window import Window
from pyspark.sql.types import ArrayType, DoubleType, IntegerType, StructType, StructField
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import StandardScaler,StringIndexer, OneHotEncoder
from pyspark.ml.linalg import Vectors,VectorUDT # 新增導入
from pyspark.ml import Pipeline
from pyspark.ml.functions import vector_to_array # 新增關鍵導入
from pyspark import StorageLevel
import json
import sys
import os# 配置環境變量,否則報錯python3找不到
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
spark = SparkSession.builder.config("spark.metrics.conf", "/opt/mobdata/spark/spark-2.4.3.mob1-bin-2.6.5/conf/metrics.properties") \.config("spark.sql.shuffle.partitions", "5000") \.config("spark.driver.memory", "48g")\.config("spark.driver.maxResultSize", "16g")\.appName("test_dj")\.enableHiveSupport()\.getOrCreate()for i in ['0','1','2','3','4','5','6','7','8','9','a','b','c','d','e','f']:# 1. 從Hive讀取數據print(i)sql_idapp = '''select id,app_ohe_vector from database.app_vec_par where flag = '{fflag}\''''.format(fflag=i)df = spark.sql(sql_idapp)# 打印數據概覽total_count = df.count()print(f"數據總量: {total_count:,}") # 高效UDAF聚合函數(針對單元素向量優化)def merge_sparse_vectors(vectors):"""高效合并稀疏向量,針對單元素向量優化"""if not vectors:return {"type": 0, "size": 0, "indices": [], "values": []}# 獲取向量尺寸(假設所有向量尺寸相同)size_val = vectors[0]["size"]# 使用字典高效聚合value_dict = {}for vec in vectors:# 直接訪問第一個(也是唯一一個)索引和值idx = vec["indices"][0]val = vec["values"][0]# 使用get方法避免兩次字典查找value_dict[idx] = value_dict.get(idx, 0.0) + val# 提取并排序索引sorted_indices = sorted(value_dict.keys())sorted_values = [value_dict[i] for i in sorted_indices]return {"type": 0, "size": size_val, "indices": sorted_indices, "values": sorted_values}# 注冊UDAFmerge_sparse_vectors_udf = udf(merge_sparse_vectors,StructType([StructField("type", IntegerType()),StructField("size", IntegerType()),StructField("indices", ArrayType(IntegerType())),StructField("values", ArrayType(DoubleType()))]))# 數據預處理:過濾無效記錄并重新分區print("開始數據預處理...")cleaned_df = df.filter((col("app_ohe_vector").isNotNull()) & (size(col("app_ohe_vector.indices")) > 0)).repartition(5000, "id") # 增加分區數處理數據傾斜# 釋放原始DF內存df.unpersist()# 兩階段聚合策略(處理數據傾斜)print("開始第一階段聚合(按id和索引分組)...")# 步驟1: 提取每個向量的索引和值expanded_df = cleaned_df.select("id",col("app_ohe_vector.indices")[0].alias("index"),col("app_ohe_vector.values")[0].alias("value"),col("app_ohe_vector.size").alias("size"))# 步驟2: 按(id, index)分組求和intermediate_df = expanded_df.groupBy("id", "index").agg(expr("sum(value)").alias("sum_value"),first("size").alias("size"))# 步驟3: 按id分組,收集所有(index, sum_value)對print("開始第二階段聚合(按id分組)...")grouped_df = intermediate_df.groupBy("id").agg(collect_list(struct("index", "sum_value")).alias("index_value_pairs"),first("size").alias("size"))# 步驟4: 轉換為稀疏向量格式def pairs_to_sparse_vector(pairs, size_val):"""將(index, value)對列表轉換為稀疏向量"""if not pairs:return {"type": 0, "size": size_val, "indices": [], "values": []}# 提取索引和值indices = [p["index"] for p in pairs]values = [p["sum_value"] for p in pairs]# 排序(如果需要)sorted_indices = sorted(indices)sorted_values = [values[indices.index(i)] for i in sorted_indices]return {"type": 0, "size": size_val, "indices": sorted_indices, "values": sorted_values}pairs_to_sparse_vector_udf = udf(pairs_to_sparse_vector,StructType([StructField("type", IntegerType()),StructField("size", IntegerType()),StructField("indices", ArrayType(IntegerType())),StructField("values", ArrayType(DoubleType()))]))# 生成最終結果result = grouped_df.withColumn("merged_vector",pairs_to_sparse_vector_udf("index_value_pairs", "size")).select("id", "merged_vector")print("開始第三階段數據插入...")# 創建臨時視圖result.createOrReplaceTempView("sparse_matrix_result")res_sql='''INSERT into TABLE database.app_vecagg_res PARTITION(flag='{fflag}')SELECT id,merged_vector from sparse_matrix_result'''.format(fflag=i)spark.sql(res_sql)print("數據插入完成")# 停止SparkSession
spark.stop()
此處因為原表有300億+數據,集群性能有限無法一次性處理,所以我將id進行了分區,然后循環分區進行的聚合。
聚合后的結果數據示例如下:
id | merged_vector |
---|---|
1001 |
|
merged_vector的結構是Spark ML的標準格式:
0
: 向量類型(0=稀疏向量,1=密集向量)80000
: 向量總長度(即app總數)[0, 1, 2, 3, ...]
: 非零元素的索引位置[1.0, 1.0, 1.0, ...]
: 對應索引位置的值
Spark ML的算法設計時就已經考慮了這種向量格式,所有內置算法都能正確處理這種結構:
- ?算法兼容性?:Spark ML的所有分類、回歸、聚類算法都接受這種格式的向量
- ?性能優化?:稀疏向量格式在內存使用和計算效率上都有優化
- 內置支持?:Spark ML的
VectorAssembler
、特征變換器等都能處理這種格式
至此我們就可以將此向量作為特征用于后續的建模操作了。