在 PySpark 中解鎖窗口函數的力量,實現高級數據轉換

本篇文章Mastering PySpark Window Functions: A Practical Guide to Time-Based Analytics適合數據分析和工程師入門了解PySpark的窗口函數。文章的亮點在于詳細介紹了窗口函數的基本概念及其在銷售數據分析中的實際應用,幫助讀者理解如何進行復雜的數據計算而無需多次連接或聚合。


文章目錄

  • 1 理解窗口函數:基礎
  • 2 搭建分析管道
  • 3 客戶級別聚合:理解歷史模式
  • 4 滾動窗口:捕捉時間趨勢
  • 5 關鍵概念解釋
  • 6 月度滯后特征:季節性模式分析
  • 7 性能優化技巧
  • 8 行業級別基準
  • 9 結論


PySpark logo image

窗口函數是 Apache Spark 中最強大但卻未被充分利用的功能之一。它們允許您對與當前行相關的行執行復雜的計算,而無需昂貴的連接或多次聚合。在這篇文章中,我們將通過一個銷售分析場景來探討窗口函數的實際應用。

1 理解窗口函數:基礎

可以將窗口函數視為一種在處理每個單獨行時“窺視”相鄰行的方式。與將多行合并為一行的常規聚合不同,窗口函數會為每個輸入行返回一個結果,同時考慮一個相關行的“窗口”。

窗口函數的基本組成包括:

  • Partition By(按分區):將行分組到邏輯分區中
  • Order By(按排序):定義每個分區內的排序
  • Frame(框架):指定分區內要包含在計算中的行

2 搭建分析管道

讓我們從一個包含交易記錄的銷售數據集開始。我們將使用各種窗口函數技術來構建預測支付延遲的特征。

from pyspark.sql import functions as F
from pyspark.sql.window import WindowsalesDF = salesDF.withColumn('transaction_day', F.dayofmonth(F.col('transaction_date')))
salesDF = salesDF.withColumn('transaction_month', F.month(F.col('transaction_date')))
salesDF = salesDF.withColumn('transaction_year', F.year(F.col('transaction_date')))
salesDF = salesDF.withColumn('day_of_week', F.dayofweek(F.col('transaction_date')) - 1)
salesDF = salesDF.withColumn('payment_day', F.dayofmonth(F.col('payment_due_date')))
salesDF = salesDF.withColumn('payment_month', F.month(F.col('payment_due_date')))
salesDF = salesDF.withColumn('payment_year', F.year(F.col('payment_due_date')))
salesDF = salesDF.withColumn('payment_day_of_week', F.dayofweek(F.col('payment_due_date')) - 1)

3 客戶級別聚合:理解歷史模式

在深入了解窗口函數之前,我們通常需要客戶級別的統計數據。這些數據為理解當前行為是典型還是異常提供了背景。

salesDF = salesDF.join(salesDF.groupBy('client_id', 'transaction_type').agg(F.mean('delay_days').alias('client_delay_average'),F.expr('sum(delay_days * invoice_amount) / sum(invoice_amount)').alias('client_delay_weighted_avg'),F.stddev('delay_days').alias('client_delay_stddev'),F.sqrt(F.try_divide(F.sum(F.col('invoice_amount') * F.pow(F.col('delay_days'), 2)),F.sum('invoice_amount')) -F.pow(F.try_divide(F.sum(F.col('invoice_amount') * F.col('delay_days')),F.sum('invoice_amount')), 2)).alias('client_delay_weighted_stddev'),F.expr('percentile_approx(delay_days, 0.5)').alias('client_delay_median'),F.count('delay_days').alias('client_transaction_count')),on=['client_id', 'transaction_type'],how='left'
)

加權標準差的計算可能看起來很復雜,但它使用的是數學公式:E[X2]?(E[X])2\sqrt{E[X^2] - (E[X])^2}E[X2]?(E[X])2?,其中較大的交易對標準差計算的影響更大。

4 滾動窗口:捕捉時間趨勢

這就是窗口函數真正發揮作用的地方。滾動窗口允許我們計算滑動時間段內的指標,捕捉客戶行為中的趨勢和季節性。

time_windows = [30, 90, 365]for days in time_windows:rolling_window = (Window.partitionBy('client_id', 'transaction_type').orderBy(F.col('transaction_date').cast("timestamp").cast("long")).rangeBetween(-days * 86400, -1))salesDF = salesDF.withColumn(f'delay_rolling_avg_{days}d',F.avg('delay_days').over(rolling_window))salesDF = salesDF.withColumn(f'delay_rolling_std_{days}d',F.stddev('delay_days').over(rolling_window))salesDF = salesDF.withColumn(f'delay_rolling_weighted_avg_{days}d',F.try_divide(F.sum(F.col('delay_days') * F.col('invoice_amount')).over(rolling_window),F.sum(F.col('invoice_amount')).over(rolling_window)))salesDF = salesDF.withColumn(f'delay_rolling_weighted_std_{days}d',F.sqrt(F.try_divide(F.sum(F.col('invoice_amount') * F.pow(F.col('delay_days'), 2)).over(rolling_window),F.sum(F.col('invoice_amount')).over(rolling_window)) -F.pow(F.try_divide(F.sum(F.col('invoice_amount') * F.col('delay_days')).over(rolling_window),F.sum(F.col('invoice_amount')).over(rolling_window)), 2)))

5 關鍵概念解釋

Range(范圍)與 Rows(行)窗口:我們使用 rangeBetween(-days * 86400, -1) 而不是 rowsBetween(),因為我們想要一個基于時間的窗口。這確保我們能夠精確地捕獲指定天數的數據,而與交易頻率無關。

加權計算:通過按發票金額對指標進行加權,我們賦予了較大交易更高的重要性,這通常能更好地代表客戶的支付行為。

排除當前行:將 -1 作為上限可以排除當前交易,從而防止預測模型中的數據泄露。

6 月度滯后特征:季節性模式分析

為了進行長期趨勢分析,我們可以創建月度聚合并生成滯后特征以捕捉季節性模式。

monthlyDF = salesDF.groupBy("client_id", "transaction_type", "transaction_year", "transaction_month"
).agg(F.expr('sum(delay_days * invoice_amount) / sum(invoice_amount)').alias('monthly_weighted_delay_avg'),F.sqrt(F.try_divide(F.sum(F.col('invoice_amount') * F.pow(F.col('delay_days'), 2)),F.sum('invoice_amount')) -F.pow(F.try_divide(F.sum(F.col('invoice_amount') * F.col('delay_days')),F.sum('invoice_amount')), 2)).alias('monthly_weighted_delay_std')
)monthly_window = Window.partitionBy("client_id", "transaction_type") \.orderBy("transaction_year", "transaction_month")for lag_months in range(1, 13):monthlyDF = monthlyDF.withColumn(f"delay_avg_lag_{lag_months}m",F.lag("monthly_weighted_delay_avg", lag_months).over(monthly_window))monthlyDF = monthlyDF.withColumn(f"delay_std_lag_{lag_months}m",F.lag("monthly_weighted_delay_std", lag_months).over(monthly_window))salesDF = salesDF.join(monthlyDF,on=["client_id", "transaction_type", "transaction_year", "transaction_month"],how="left"
)

7 性能優化技巧

分區策略:始終根據邏輯上對數據進行分組的高基數列進行分區。這可以最大限度地減少數據混洗。

窗口框架優化:使用盡可能限制性的框架。無界窗口開銷大且通常不必要。

緩存:當對同一數據集執行多個窗口操作時,考慮緩存中間結果。

salesDF.cache()

8 行業級別基準

不要忘記創建行業或細分市場級別的基準進行比較:

salesDF = salesDF.join(salesDF.groupBy('industry_sector', "transaction_type").agg(F.expr('sum(delay_days * invoice_amount) / sum(invoice_amount)').alias('industry_delay_avg'),F.sqrt(F.try_divide(F.sum(F.col('invoice_amount') * F.pow(F.col('delay_days'), 2)),F.sum('invoice_amount')) -F.pow(F.try_divide(F.sum(F.col('invoice_amount') * F.col('delay_days')),F.sum('invoice_amount')), 2)).alias('industry_delay_std')),on=['industry_sector', 'transaction_type'],how='left'
)

9 結論

窗口函數解鎖了 PySpark 中復雜的分析能力,使您能夠為機器學習和高級分析創建豐富的特征集。關鍵在于理解何時使用不同類型的窗口:

  • 無界窗口:用于累積指標
  • 基于范圍的窗口:用于時間序列分析
  • 基于行的窗口:用于排名和百分位數
  • 滯后函數:用于趨勢和季節性檢測

通過將這些技術與適當的分區和優化策略相結合,您可以構建健壯、可擴展的分析管道,捕捉數據中復雜的時間模式。

開始在您自己的數據集中嘗試這些模式,您很快就會發現窗口函數在將原始數據轉化為可操作洞察方面的真正力量。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/97746.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/97746.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/97746.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

從理念到實踐:三層解耦架構與“無系統”論

在上一篇中,我們揭示了“五層雙閉環”治理模型如何像骨骼一樣,為數字化轉型提供支撐和定型。但再宏偉的藍圖也需要堅實的施工來實現。今天,我們將深入最具體的實施層面,將“業務重塑”和“以人為本”的理念,轉化為可落…

詳細介紹Linux 內存管理struct page數據結構中的_count和_mapcount有什么區別?

在Linux內核的struct page中,_count(或_refcount)和_mapcount是兩個關鍵的引用計數成員,它們各自承擔不同的職責。以下是深度解析和代碼案例:1. _count vs _mapcount 區別詳解_count(或_refcount&#xff0…

面陣 vs 線陣相機:怎么選不踩坑?選型公式直接套用

面陣vs線陣相機:怎么選不踩坑?選型公式直接套用🎯面陣vs線陣相機怎么選不踩坑?🎯一、面陣相機:工業檢測的“萬能選手”,拍全圖靠它🎯二、線陣相機:大視野/高精度的“專屬…

Spring Security 如何使用@PreAuthorize注解

🧱 第一步:環境準備? 1. 創建數據庫(MySQL)-- 創建數據庫,使用 utf8mb4 字符集支持 emoji 和多語言 CREATE DATABASE security_demo CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;-- 使用該數據庫 USE security…

JVM中產生OOM(內存溢出)的8種典型情況及解決方案

Java中的OutOfMemoryError(OOM)是當JVM內存不足時拋出的錯誤。本文將全面剖析JVM中產生OOM的各種情況,包括堆內存溢出、方法區溢出、棧溢出等,并提供詳細的診斷方法和解決方案。 一、OOM基礎概念 1.1 OOM錯誤類型 Java中的OOM是…

【IEEE出版、EI檢索、往屆會后3個月檢索】第四屆信號處理、計算機網絡與通信國際學術會議(SPCNC 2025)

第四屆信號處理、計算機網絡與通信國際學術會議(SPCNC 2025)將于2025年12月5-7日于中國武漢召開(線上同步)。為本次會議旨在齊聚海內外信號處理、計算機網絡與通信等計算機領域的專家學者,為相關領域研究和從業人員提供…

Spring boot注解介紹

1. Spring 核心注解Spring Boot 是基于 Spring 框架的,所以核心注解依然適用。? 常見核心注解Component表示一個通用組件,Spring 會自動掃描并注入到容器中。Component public class MyComponent {public void sayHello() {System.out.println("He…

撤銷回退 情況?:已經 add ,但沒有 commit

撤銷回退 情況?:已經 add ,但沒有 commit add 后還是保存到了暫存區呢?怎么撤銷呢? 1 # 向ReadMe中新增??代碼 2 hyb139-159-150-152:~/gitcode$ vim ReadMe 3 hyb139-159-150-152:~/gitcode$ cat ReadMe 4 hello bit 5 hell…

【Linux筆記】命令行與vim基礎

一、Linux命令行基礎 1. 基本語法命令空格參數(可寫可不寫)空格文件,文件夾(可寫可不寫)ls列出文件夾中的內容/opt 根目錄下的opt文件夾ls-a all顯示出所有文件以及隱藏文件/optls-a如果不寫則輸出一個點,當…

Redis 的整數集合:像分類收納盒一樣的整數專屬存儲

目錄 一、先懂定位:為什么需要整數集合?(銜接哈希表) 二、整數集合的結構:像 “貼了規格標簽的收納盒” 1. encoding:收納盒的 “規格標簽”(核心:決定格子大小) 2. …

Linux 進程狀態 — 僵尸進程

🎁個人主頁:工藤新一 🔍系列專欄:C面向對象(類和對象篇) 🌟心中的天空之城,終會照亮我前方的路 🎉歡迎大家點贊👍評論📝收藏?文章 文章目錄進…

React 中 key 的作用

React 中 key 的作用是什么? Date: August 31, 2025 Area: 原理key 概念 在 React 中,key 用于識別哪些元素是變化、添加或刪除的。 在列表渲染中,key 尤其重要,因為它能提高渲染性能和確保組件狀態的一致性。key 的作用 1&#x…

wpf之附加屬性

前言 附加屬性是 WPF 中一個非常強大和獨特的概念。簡單來說,它允許一個對象為另一個在其本身類定義中未定義的屬性賦值。 1、定義附加屬性 定義一個Watermark的附加屬性,該屬性的作用是將TextBox的附加屬性改變時,TextBox的字體顏色改成灰…

深入淺出 RabbitMQ-消息可靠性投遞

大家好,我是工藤學編程 🦉一個正在努力學習的小博主,期待你的關注實戰代碼系列最新文章😉C實現圖書管理系統(Qt C GUI界面版)SpringBoot實戰系列🐷【SpringBoot實戰系列】SpringBoot3.X 整合 Mi…

數字化時代,中小企業如何落地數字化轉型

大數據時代,各行各業的行業龍頭和大型集團都已經開始了數據管理,讓數據成為數據資產。但是在我國,中小企業的數量巨大,很多管理者忽視了這一點,今天我們就來聊一聊中小企業的數字化轉型。中小企業需要數字化轉型首先要…

Unity筆記(九)——畫線功能Linerenderer、范圍檢測、射線檢測

寫在前面:寫本系列(自用)的目的是回顧已經學過的知識、記錄新學習的知識或是記錄心得理解,方便自己以后快速復習,減少遺忘。這里只記錄代碼知識。十一、畫線功能Linerenderer畫線功能Linerenderer是Unity提供的畫線腳本,創建一個空…

刷題記錄(8)string類操作使用

一、僅反轉字母 917. 僅僅反轉字母 - 力扣(LeetCode) 簡單來說輸入字符串,要求你返回所有僅字母位置反轉后的字符串。 簡單看一個樣例加深理解: 前后互換,我想思路基本很明顯了,雙指針,或者說…

用好AI,從提示詞工程到上下文工程

前言 隨著 AI 大模型的爆發,提示詞工程(prompt engineering ) 一度是用戶應用 AI ,發揮 AI 能力最重要、也最應該掌握的技術。 但現在,在 “提示詞工程”的基礎上,一個更寬泛也更強力的演化概念被提出,也就是本文我們要介紹的 “上下文工程(Context Engineering)” …

計算機Python畢業設計推薦:基于Django+Vue用戶評論挖掘旅游系統

精彩專欄推薦訂閱:在下方主頁👇🏻👇🏻👇🏻👇🏻 💖🔥作者主頁:計算機畢設木哥🔥 💖 文章目錄 一、項目介紹二、…

? 肆 ? ? 默認安全:安全建設方案 ? a.信息安全基線

👍點「贊」📌收「藏」👀關「注」💬評「論」 在金融科技深度融合的背景下,信息安全已從單純的技術攻防擴展至架構、合規、流程與創新的系統工程。作為一名從業十多年的老兵,將系統闡述數字銀行安全體系的建設…