通過門店銷售明細表用PySpark得到每月每個門店的銷冠和按月的同比環比數據

假設我在Amazon S3上有銷售表的Parquet數據文件的路徑,包含ID主鍵、門店ID、日期、銷售員姓名和銷售額,需要分別用PySpark的SparkSQL和Dataframe API統計出每個月所有門店和各門店銷售額最高的人,不一定是一個人,以及他所在的門店ID和月總銷售額。

使用DataFrame API實現:

from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, max, date_format, col
from pyspark.sql.window import Window# 初始化Spark會話
spark = SparkSession.builder.appName("SalesAnalysis").getOrCreate()# 讀取S3上的Parquet文件
df = spark.read.parquet("s3://path/to/sales/data")# 處理日期字段并計算每月各門店各銷售員的銷售額總和
sales_aggregated = df.withColumn("month", date_format(col("日期"), "yyyy-MM")) \.groupBy("門店ID", "month", "銷售員姓名") \.agg(sum("銷售額").alias("sales_total"))# 定義窗口規范(按門店和月份分區)
window_spec = Window.partitionBy("門店ID", "month")# 使用窗口函數計算最大銷售額和月總銷售額
result_df = sales_aggregated \.withColumn("max_sales", max("sales_total").over(window_spec)) \.withColumn("monthly_total", sum("sales_total").over(window_spec)) \.filter(col("sales_total") == col("max_sales")) \.select("month", "門店ID", "monthly_total", "銷售員姓名", "sales_total") \.orderBy("month", "門店ID", "銷售員姓名")# 顯示結果
result_df.show()

使用SparkSQL實現:

# 注冊DataFrame為臨時視圖
df.createOrReplaceTempView("sales_data")# 執行SQL查詢
sql_result = spark.sql("""
WITH sales_aggregated AS (SELECT 門店ID,date_format(日期, 'yyyy-MM') AS month,銷售員姓名,SUM(銷售額) AS sales_totalFROM sales_dataGROUP BY 門店ID, date_format(日期, 'yyyy-MM'), 銷售員姓名
)
SELECT month,門店ID,monthly_total,銷售員姓名,sales_total
FROM (SELECT month,門店ID,銷售員姓名,sales_total,MAX(sales_total) OVER (PARTITION BY 門店ID, month) AS max_sales,SUM(sales_total) OVER (PARTITION BY 門店ID, month) AS monthly_totalFROM sales_aggregated
) 
WHERE sales_total = max_sales
ORDER BY month, 門店ID, 銷售員姓名
""")# 顯示結果
sql_result.show()

說明:

  1. 數據準備:從S3讀取Parquet文件,并使用date_format處理日期字段為年月格式。
  2. 聚合計算
    • 先按門店、月份和銷售員分組,計算每個銷售員當月的總銷售額。
  3. 窗口函數
    • 使用窗口函數分別計算每個門店每月的最大銷售額(用于識別最高銷售員)和月總銷售額。
  4. 結果過濾
    • 篩選出銷售額等于當月最大銷售額的記錄(可能包含多個銷售員)。
  5. 排序輸出:按月份、門店ID和銷售員姓名排序,確保結果有序。

兩種實現方式均會輸出以下列:

  • month:年月格式(yyyy-MM)
  • 門店ID:門店標識
  • monthly_total:該門店當月的總銷售額
  • 銷售員姓名:當月銷售額最高的銷售員
  • sales_total:該銷售員當月的銷售額(等于當月最高銷售額)

假設我在Amazon S3上有銷售表的Parquet數據文件的路徑,包含ID主鍵、門店ID、日期、銷售員姓名和銷售額,需要分別用PySpark的SparkSQL和Dataframe API統計出按月統計的同比和環比數據,當前月如果不是月底的話,同比或環比數據需要取得上個月或者去年1日到對應的日期的總銷售額值。

1. 使用DataFrame API實現

from pyspark.sql import SparkSession
from pyspark.sql import functions as Fspark = SparkSession.builder.appName("SalesAnalysis").getOrCreate()# 讀取Parquet數據
df = spark.read.parquet("s3://your-bucket/path/to/sales_data")# 獲取當前日期信息
current_date = spark.sql("SELECT current_date()").first()[0]
current_year = current_date.year
current_month = current_date.month
current_day = current_date.day# 數據預處理
processed_df = (df.withColumn("date", F.col("date").cast("date")).withColumn("last_day", F.last_day("date")).withColumn("max_day", F.dayofmonth("last_day")).withColumn("cutoff_day", F.least(F.lit(current_day), F.col("max_day"))).filter(F.dayofmonth("date") <= F.col("cutoff_day"))
)# 按月聚合銷售額
monthly_sales = (processed_df.groupBy(F.year("date").alias("year"), F.month("date").alias("month")).agg(F.sum("sales").alias("total_sales"))
)# 計算前月和去年同月信息
monthly_sales = (monthly_sales.withColumn("prev_month_year", F.when(F.col("month") == 1, F.col("year") - 1).otherwise(F.col("year"))).withColumn("prev_month_month", F.when(F.col("month") == 1, 12).otherwise(F.col("month") - 1)).withColumn("prev_year_year", F.col("year") - 1).withColumn("prev_year_month", F.col("month"))
)# 創建臨時視圖
monthly_sales.createOrReplaceTempView("monthly_sales")# 通過自連接獲取比較數據
final_result = (monthly_sales.alias("curr").join(monthly_sales.alias("prev_month"),(F.col("curr.prev_month_year") == F.col("prev_month.year")) &(F.col("curr.prev_month_month") == F.col("prev_month.month")),"left").join(monthly_sales.alias("prev_year"),(F.col("curr.prev_year_year") == F.col("prev_year.year")) &(F.col("curr.prev_year_month") == F.col("prev_year.month")),"left").select(F.col("curr.year"),F.col("curr.month"),F.col("curr.total_sales"),F.col("prev_month.total_sales").alias("prev_month_sales"),F.col("prev_year.total_sales").alias("prev_year_sales"))
)# 計算增長率
final_result = final_result.withColumn("month_over_month",((F.col("total_sales") - F.col("prev_month_sales")) / F.col("prev_month_sales") * 100
).withColumn("year_over_year",((F.col("total_sales") - F.col("prev_year_sales")) / F.col("prev_year_sales") * 100
)# 顯示結果
final_result.show()

2. 使用SparkSQL實現

# 注冊預處理后的視圖
processed_df.createOrReplaceTempView("processed_sales")# 執行SQL查詢
sql_query = """
WITH monthly_sales AS (SELECTYEAR(date) AS year,MONTH(date) AS month,SUM(sales) AS total_salesFROM processed_salesGROUP BY YEAR(date), MONTH(date)
),
comparison_data AS (SELECTcurr.year,curr.month,curr.total_sales,prev_month.total_sales AS prev_month_sales,prev_year.total_sales AS prev_year_salesFROM monthly_sales currLEFT JOIN monthly_sales prev_monthON (curr.year = prev_month.year AND curr.month = prev_month.month + 1)OR (curr.month = 1 AND prev_month.month = 12 AND curr.year = prev_month.year + 1)LEFT JOIN monthly_sales prev_yearON curr.year = prev_year.year + 1 AND curr.month = prev_year.month
)
SELECTyear,month,total_sales,ROUND((total_sales - prev_month_sales) / prev_month_sales * 100, 2) AS mom_growth,ROUND((total_sales - prev_year_sales) / prev_year_sales * 100, 2) AS yoy_growth
FROM comparison_data
ORDER BY year, month
"""spark.sql(sql_query).show()

說明:

  1. 數據預處理

    • 轉換日期類型并計算每個月的最后一天
    • 動態計算每個月的有效截止日期(考慮當前日期和月份長度)
    • 過濾出有效日期范圍內的數據
  2. 聚合計算

    • 按年月分組計算總銷售額
    • 使用自連接獲取前月和去年同月的銷售額數據
  3. 增長率計算

    • 環比增長率 = (本月銷售額 - 上月銷售額) / 上月銷售額 * 100
    • 同比增長率 = (本月銷售額 - 去年同期銷售額) / 去年同期銷售額 * 100
  4. 特殊處理

    • 自動處理月份邊界(如1月的前月是去年12月)
    • 處理NULL值避免除零錯誤
    • 動態適應不同月份的天數差異

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/pingmian/78395.shtml
繁體地址,請注明出處:http://hk.pswp.cn/pingmian/78395.shtml
英文地址,請注明出處:http://en.pswp.cn/pingmian/78395.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

PostgreSQL 常用日志

PostgreSQL 常用日志詳解 PostgreSQL 提供了多種日志類型&#xff0c;用于監控數據庫活動、排查問題和優化性能。以下是 PostgreSQL 中最常用的日志類型及其配置和使用方法。 一、主要日志類型 日志類型文件位置主要內容用途服務器日志postgresql-<日期>.log服務器運行…

MySQL 存儲過程:解鎖數據庫編程的高效密碼

目錄 一、什么是存儲過程?二、創建存儲過程示例 1:創建一個簡單的存儲過程示例 2:創建帶輸入參數的存儲過程示例 3:創建帶輸出參數的存儲過程三、調用存儲過程調用無參數存儲過程調用帶輸入參數的存儲過程調用帶輸出參數的存儲過程四、存儲過程中的流控制語句示例 1:使用 …

基于STM32的物流搬運機器人

功能&#xff1a;智能循跡、定距夾取、顏色切換、自動跟隨、自動避障、聲音夾取、藍牙遙控、手柄遙控、顏色識別夾取、循跡避障、循跡定距…… 包含內容&#xff1a;完整源碼、使用手冊、原理圖、視頻演示、PPT、論文參考、其余資料 資料只私聊

pg_jieba 中文分詞

os: centos 7.9.2009 pg: 14.7 pg_jieba 依賴 cppjieba、limonp pg_jieba 下載 su - postgreswget https://github.com/jaiminpan/pg_jieba/archive/refs/tags/vmaster.tar.gzunzip ./pg_jieba-master cd ~/pg_jieba-mastercppjieba、limonp 下載 su - postgrescd ~/pg_jie…

基于Python+Flask的MCP SDK響應式文檔展示系統設計與實現

以下是使用Python Flask HTML實現的MCP文檔展示系統&#xff1a; # app.py from flask import Flask, render_templateapp Flask(__name__)app.route(/) def index():return render_template(index.html)app.route(/installation) def installation():return render_templa…

【“星睿O6”AI PC開發套件評測】GPU矩陣指令算力,GPU帶寬和NPU算力測試

【“星睿O6”AI PC開發套件評測】GPU矩陣指令算力&#xff0c;GPU帶寬和NPU算力測試 安謀科技、此芯科技與瑞莎計算機聯合打造了面向AI PC、邊緣、機器人等不同場景的“星睿O6”開發套件 該套件異構集成了Armv9 CPU核心、Arm Immortalis? GPU以及安謀科技“周易”NPU 開箱和…

【Go語言】RPC 使用指南(初學者版)

RPC&#xff08;Remote Procedure Call&#xff0c;遠程過程調用&#xff09;是一種計算機通信協議&#xff0c;允許程序調用另一臺計算機上的子程序&#xff0c;就像調用本地程序一樣。Go 語言內置了 RPC 支持&#xff0c;下面我會詳細介紹如何使用。 一、基本概念 在 Go 中&…

11、Refs:直接操控元素——React 19 DOM操作秘籍

一、元素操控的魔法本質 "Refs是巫師與麻瓜世界的連接通道&#xff0c;讓開發者能像操控魔杖般精準控制DOM元素&#xff01;"魔杖工坊的奧利凡德先生輕撫著魔杖&#xff0c;React/Vue的refs能量在杖尖躍動。 ——以神秘事務司的量子糾纏理論為基&#xff0c;揭示DOM…

MinIO 教程:從入門到Spring Boot集成

文章目錄 一. MinIO 簡介1. 什么是MinIO&#xff1f;2. 應用場景 二. 文件系統存儲發展史1. 服務器磁盤&#xff08;本地存儲&#xff09;2. 分布式文件系統(如 HDFS、Ceph、GlusterFS)3. 對象存儲&#xff08;如 MinIO、AWS S3&#xff09;4.對比總結5.選型建議6.示例方案 三.…

電競俱樂部護航點單小程序,和平地鐵俱樂部點單系統,三角洲護航小程序,暗區突圍俱樂部小程序

電競俱樂部護航點單小程序開發&#xff0c;和平地鐵俱樂部點單系統&#xff0c;三角洲護航小程序&#xff0c;暗區突圍俱樂部小程序開發 端口包含&#xff1a; 超管后臺&#xff0c; 老板端&#xff0c;打手端&#xff0c;商家端&#xff0c;客服端&#xff0c;管事端&#x…

基于 IPMI + Kickstart + Jenkins 的 OS 自動化安裝

Author&#xff1a;Arsen Date&#xff1a;2025/04/26 目錄 環境要求實現步驟自定義 ISO安裝 ipmitool安裝 NFS定義 ks.cfg安裝 HTTP編寫 Pipeline 功能驗證 環境要求 目標服務器支持 IPMI / Redfish 遠程管理&#xff08;如 DELL iDRAC、HPE iLO、華為 iBMC&#xff09;&…

如何在SpringBoot中通過@Value注入Map和List并使用YAML配置?

在SpringBoot開發中&#xff0c;我們經常需要從配置文件中讀取各種參數。對于簡單的字符串或數值&#xff0c;直接使用Value注解就可以了。但當我們需要注入更復雜的數據結構&#xff0c;比如Map或者List時&#xff0c;該怎么操作呢&#xff1f;特別是使用YAML這種更人性化的配…

短信驗證碼安全實戰:三網API+多語言適配開發指南

在短信服務中&#xff0c;創建自定義簽名是發送通知、驗證信息和其他類型消息的重要步驟。萬維易源提供的“三網短信驗證碼”API為開發者和企業提供了高效、便捷的自定義簽名創建服務&#xff0c;可以通過簡單的接口調用提交簽名給運營商審核。本文將詳細介紹如何使用該API&…

RabbitMQ和Seata沖突嗎?Seata與Spring中的事務管理沖突嗎

1. GlobalTransactional 和 Transactional 是否沖突&#xff1f; 答&#xff1a;不沖突&#xff0c;它們可以協同工作&#xff0c;但作用域不同。 Transactional: 這是 Spring 提供的注解&#xff0c;用于管理單個數據源內的本地事務。在你當前的 register 方法中&#xff0c…

一臺服務器已經有個python3.11版本了,如何手動安裝 Python 3.10,兩個版本共存

環境&#xff1a; debian12.8 python3.11 python3.10 問題描述&#xff1a; 一臺服務器已經有個python3.11版本了&#xff0c;如何手動安裝 Python 3.10&#xff0c;兩個版本共存 解決方案&#xff1a; 1.下載 Python 3.10 源碼&#xff1a; wget https://www.python.or…

c++中的enum變量 和 constexpr說明符

author: hjjdebug date: 2025年 04月 23日 星期三 13:40:21 CST description: c中的enum變量 和 constexpr說明符 文章目錄 1.Q:enum 類型變量可以有,--操作嗎&#xff1f;1.1補充: c/c中enum的另一個細微差別. 2.Q: constexpr 修飾的函數,要求傳入的參數必需是常量嗎&#xff…

postman工具

postman工具 進入postman官網 www.postman.com/downloads/ https://www.postman.com/downloads/ https://www.postman.com/postman/published-postman-templates/documentation/ae2ja6x/postman-echo?ctxdocumentation Postman Echo is a service you can use to test your …

Spring和Spring Boot集成MyBatis的完整對比示例,包含從項目創建到測試的全流程代碼

以下是Spring和Spring Boot集成MyBatis的完整對比示例&#xff0c;包含從項目創建到測試的全流程代碼&#xff1a; 一、Spring集成MyBatis示例 1. 項目結構 spring-mybatis-demo/ ├── src/ │ ├── main/ │ │ ├── java/ │ │ │ └── com.example/…

【數據可視化-24】巧克力銷售數據的多維度可視化分析

?? 博主簡介:曾任某智慧城市類企業算法總監,目前在美國市場的物流公司從事高級算法工程師一職,深耕人工智能領域,精通python數據挖掘、可視化、機器學習等,發表過AI相關的專利并多次在AI類比賽中獲獎。CSDN人工智能領域的優質創作者,提供AI相關的技術咨詢、項目開發和個…

c語言-分支結構

以下是我初學C語言的筆記記錄&#xff0c;歡迎留言補充 一&#xff0c;分支結構分為幾個 兩個&#xff0c;一個是if語句&#xff0c;一個是Switch語句 二&#xff0c;if語句 &#xff08;1&#xff09;結構體 int main() {if()//判斷條件{//表達式}else if()//判斷條件{//表達式…