本篇文章Mastering PySpark Window Functions: A Practical Guide to Time-Based Analytics適合數據分析和工程師入門了解PySpark的窗口函數。文章的亮點在于詳細介紹了窗口函數的基本概念及其在銷售數據分析中的實際應用,幫助讀者理解如何進行復雜的數據計算而無需多次連接或聚合。
文章目錄
- 1 理解窗口函數:基礎
- 2 搭建分析管道
- 3 客戶級別聚合:理解歷史模式
- 4 滾動窗口:捕捉時間趨勢
- 5 關鍵概念解釋
- 6 月度滯后特征:季節性模式分析
- 7 性能優化技巧
- 8 行業級別基準
- 9 結論
窗口函數是 Apache Spark 中最強大但卻未被充分利用的功能之一。它們允許您對與當前行相關的行執行復雜的計算,而無需昂貴的連接或多次聚合。在這篇文章中,我們將通過一個銷售分析場景來探討窗口函數的實際應用。
1 理解窗口函數:基礎
可以將窗口函數視為一種在處理每個單獨行時“窺視”相鄰行的方式。與將多行合并為一行的常規聚合不同,窗口函數會為每個輸入行返回一個結果,同時考慮一個相關行的“窗口”。
窗口函數的基本組成包括:
- Partition By(按分區):將行分組到邏輯分區中
- Order By(按排序):定義每個分區內的排序
- Frame(框架):指定分區內要包含在計算中的行
2 搭建分析管道
讓我們從一個包含交易記錄的銷售數據集開始。我們將使用各種窗口函數技術來構建預測支付延遲的特征。
from pyspark.sql import functions as F
from pyspark.sql.window import WindowsalesDF = salesDF.withColumn('transaction_day', F.dayofmonth(F.col('transaction_date')))
salesDF = salesDF.withColumn('transaction_month', F.month(F.col('transaction_date')))
salesDF = salesDF.withColumn('transaction_year', F.year(F.col('transaction_date')))
salesDF = salesDF.withColumn('day_of_week', F.dayofweek(F.col('transaction_date')) - 1)
salesDF = salesDF.withColumn('payment_day', F.dayofmonth(F.col('payment_due_date')))
salesDF = salesDF.withColumn('payment_month', F.month(F.col('payment_due_date')))
salesDF = salesDF.withColumn('payment_year', F.year(F.col('payment_due_date')))
salesDF = salesDF.withColumn('payment_day_of_week', F.dayofweek(F.col('payment_due_date')) - 1)
3 客戶級別聚合:理解歷史模式
在深入了解窗口函數之前,我們通常需要客戶級別的統計數據。這些數據為理解當前行為是典型還是異常提供了背景。
salesDF = salesDF.join(salesDF.groupBy('client_id', 'transaction_type').agg(F.mean('delay_days').alias('client_delay_average'),F.expr('sum(delay_days * invoice_amount) / sum(invoice_amount)').alias('client_delay_weighted_avg'),F.stddev('delay_days').alias('client_delay_stddev'),F.sqrt(F.try_divide(F.sum(F.col('invoice_amount') * F.pow(F.col('delay_days'), 2)),F.sum('invoice_amount')) -F.pow(F.try_divide(F.sum(F.col('invoice_amount') * F.col('delay_days')),F.sum('invoice_amount')), 2)).alias('client_delay_weighted_stddev'),F.expr('percentile_approx(delay_days, 0.5)').alias('client_delay_median'),F.count('delay_days').alias('client_transaction_count')),on=['client_id', 'transaction_type'],how='left'
)
加權標準差的計算可能看起來很復雜,但它使用的是數學公式:E[X2]?(E[X])2\sqrt{E[X^2] - (E[X])^2}E[X2]?(E[X])2?,其中較大的交易對標準差計算的影響更大。
4 滾動窗口:捕捉時間趨勢
這就是窗口函數真正發揮作用的地方。滾動窗口允許我們計算滑動時間段內的指標,捕捉客戶行為中的趨勢和季節性。
time_windows = [30, 90, 365]for days in time_windows:rolling_window = (Window.partitionBy('client_id', 'transaction_type').orderBy(F.col('transaction_date').cast("timestamp").cast("long")).rangeBetween(-days * 86400, -1))salesDF = salesDF.withColumn(f'delay_rolling_avg_{days}d',F.avg('delay_days').over(rolling_window))salesDF = salesDF.withColumn(f'delay_rolling_std_{days}d',F.stddev('delay_days').over(rolling_window))salesDF = salesDF.withColumn(f'delay_rolling_weighted_avg_{days}d',F.try_divide(F.sum(F.col('delay_days') * F.col('invoice_amount')).over(rolling_window),F.sum(F.col('invoice_amount')).over(rolling_window)))salesDF = salesDF.withColumn(f'delay_rolling_weighted_std_{days}d',F.sqrt(F.try_divide(F.sum(F.col('invoice_amount') * F.pow(F.col('delay_days'), 2)).over(rolling_window),F.sum(F.col('invoice_amount')).over(rolling_window)) -F.pow(F.try_divide(F.sum(F.col('invoice_amount') * F.col('delay_days')).over(rolling_window),F.sum(F.col('invoice_amount')).over(rolling_window)), 2)))
5 關鍵概念解釋
Range(范圍)與 Rows(行)窗口:我們使用 rangeBetween(-days * 86400, -1)
而不是 rowsBetween()
,因為我們想要一個基于時間的窗口。這確保我們能夠精確地捕獲指定天數的數據,而與交易頻率無關。
加權計算:通過按發票金額對指標進行加權,我們賦予了較大交易更高的重要性,這通常能更好地代表客戶的支付行為。
排除當前行:將 -1
作為上限可以排除當前交易,從而防止預測模型中的數據泄露。
6 月度滯后特征:季節性模式分析
為了進行長期趨勢分析,我們可以創建月度聚合并生成滯后特征以捕捉季節性模式。
monthlyDF = salesDF.groupBy("client_id", "transaction_type", "transaction_year", "transaction_month"
).agg(F.expr('sum(delay_days * invoice_amount) / sum(invoice_amount)').alias('monthly_weighted_delay_avg'),F.sqrt(F.try_divide(F.sum(F.col('invoice_amount') * F.pow(F.col('delay_days'), 2)),F.sum('invoice_amount')) -F.pow(F.try_divide(F.sum(F.col('invoice_amount') * F.col('delay_days')),F.sum('invoice_amount')), 2)).alias('monthly_weighted_delay_std')
)monthly_window = Window.partitionBy("client_id", "transaction_type") \.orderBy("transaction_year", "transaction_month")for lag_months in range(1, 13):monthlyDF = monthlyDF.withColumn(f"delay_avg_lag_{lag_months}m",F.lag("monthly_weighted_delay_avg", lag_months).over(monthly_window))monthlyDF = monthlyDF.withColumn(f"delay_std_lag_{lag_months}m",F.lag("monthly_weighted_delay_std", lag_months).over(monthly_window))salesDF = salesDF.join(monthlyDF,on=["client_id", "transaction_type", "transaction_year", "transaction_month"],how="left"
)
7 性能優化技巧
分區策略:始終根據邏輯上對數據進行分組的高基數列進行分區。這可以最大限度地減少數據混洗。
窗口框架優化:使用盡可能限制性的框架。無界窗口開銷大且通常不必要。
緩存:當對同一數據集執行多個窗口操作時,考慮緩存中間結果。
salesDF.cache()
8 行業級別基準
不要忘記創建行業或細分市場級別的基準進行比較:
salesDF = salesDF.join(salesDF.groupBy('industry_sector', "transaction_type").agg(F.expr('sum(delay_days * invoice_amount) / sum(invoice_amount)').alias('industry_delay_avg'),F.sqrt(F.try_divide(F.sum(F.col('invoice_amount') * F.pow(F.col('delay_days'), 2)),F.sum('invoice_amount')) -F.pow(F.try_divide(F.sum(F.col('invoice_amount') * F.col('delay_days')),F.sum('invoice_amount')), 2)).alias('industry_delay_std')),on=['industry_sector', 'transaction_type'],how='left'
)
9 結論
窗口函數解鎖了 PySpark 中復雜的分析能力,使您能夠為機器學習和高級分析創建豐富的特征集。關鍵在于理解何時使用不同類型的窗口:
- 無界窗口:用于累積指標
- 基于范圍的窗口:用于時間序列分析
- 基于行的窗口:用于排名和百分位數
- 滯后函數:用于趨勢和季節性檢測
通過將這些技術與適當的分區和優化策略相結合,您可以構建健壯、可擴展的分析管道,捕捉數據中復雜的時間模式。
開始在您自己的數據集中嘗試這些模式,您很快就會發現窗口函數在將原始數據轉化為可操作洞察方面的真正力量。