假設我在Amazon S3上有銷售表的Parquet數據文件的路徑,包含ID主鍵、門店ID、日期、銷售員姓名和銷售額,需要分別用PySpark的SparkSQL和Dataframe API統計出每個月所有門店和各門店銷售額最高的人,不一定是一個人,以及他所在的門店ID和月總銷售額。
使用DataFrame API實現:
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, max, date_format, col
from pyspark.sql.window import Window# 初始化Spark會話
spark = SparkSession.builder.appName("SalesAnalysis").getOrCreate()# 讀取S3上的Parquet文件
df = spark.read.parquet("s3://path/to/sales/data")# 處理日期字段并計算每月各門店各銷售員的銷售額總和
sales_aggregated = df.withColumn("month", date_format(col("日期"), "yyyy-MM")) \.groupBy("門店ID", "month", "銷售員姓名") \.agg(sum("銷售額").alias("sales_total"))# 定義窗口規范(按門店和月份分區)
window_spec = Window.partitionBy("門店ID", "month")# 使用窗口函數計算最大銷售額和月總銷售額
result_df = sales_aggregated \.withColumn("max_sales", max("sales_total").over(window_spec)) \.withColumn("monthly_total", sum("sales_total").over(window_spec)) \.filter(col("sales_total") == col("max_sales")) \.select("month", "門店ID", "monthly_total", "銷售員姓名", "sales_total") \.orderBy("month", "門店ID", "銷售員姓名")# 顯示結果
result_df.show()
使用SparkSQL實現:
# 注冊DataFrame為臨時視圖
df.createOrReplaceTempView("sales_data")# 執行SQL查詢
sql_result = spark.sql("""
WITH sales_aggregated AS (SELECT 門店ID,date_format(日期, 'yyyy-MM') AS month,銷售員姓名,SUM(銷售額) AS sales_totalFROM sales_dataGROUP BY 門店ID, date_format(日期, 'yyyy-MM'), 銷售員姓名
)
SELECT month,門店ID,monthly_total,銷售員姓名,sales_total
FROM (SELECT month,門店ID,銷售員姓名,sales_total,MAX(sales_total) OVER (PARTITION BY 門店ID, month) AS max_sales,SUM(sales_total) OVER (PARTITION BY 門店ID, month) AS monthly_totalFROM sales_aggregated
)
WHERE sales_total = max_sales
ORDER BY month, 門店ID, 銷售員姓名
""")# 顯示結果
sql_result.show()
說明:
- 數據準備:從S3讀取Parquet文件,并使用
date_format
處理日期字段為年月格式。 - 聚合計算:
- 先按門店、月份和銷售員分組,計算每個銷售員當月的總銷售額。
- 窗口函數:
- 使用窗口函數分別計算每個門店每月的最大銷售額(用于識別最高銷售員)和月總銷售額。
- 結果過濾:
- 篩選出銷售額等于當月最大銷售額的記錄(可能包含多個銷售員)。
- 排序輸出:按月份、門店ID和銷售員姓名排序,確保結果有序。
兩種實現方式均會輸出以下列:
month
:年月格式(yyyy-MM)門店ID
:門店標識monthly_total
:該門店當月的總銷售額銷售員姓名
:當月銷售額最高的銷售員sales_total
:該銷售員當月的銷售額(等于當月最高銷售額)
假設我在Amazon S3上有銷售表的Parquet數據文件的路徑,包含ID主鍵、門店ID、日期、銷售員姓名和銷售額,需要分別用PySpark的SparkSQL和Dataframe API統計出按月統計的同比和環比數據,當前月如果不是月底的話,同比或環比數據需要取得上個月或者去年1日到對應的日期的總銷售額值。
1. 使用DataFrame API實現
from pyspark.sql import SparkSession
from pyspark.sql import functions as Fspark = SparkSession.builder.appName("SalesAnalysis").getOrCreate()# 讀取Parquet數據
df = spark.read.parquet("s3://your-bucket/path/to/sales_data")# 獲取當前日期信息
current_date = spark.sql("SELECT current_date()").first()[0]
current_year = current_date.year
current_month = current_date.month
current_day = current_date.day# 數據預處理
processed_df = (df.withColumn("date", F.col("date").cast("date")).withColumn("last_day", F.last_day("date")).withColumn("max_day", F.dayofmonth("last_day")).withColumn("cutoff_day", F.least(F.lit(current_day), F.col("max_day"))).filter(F.dayofmonth("date") <= F.col("cutoff_day"))
)# 按月聚合銷售額
monthly_sales = (processed_df.groupBy(F.year("date").alias("year"), F.month("date").alias("month")).agg(F.sum("sales").alias("total_sales"))
)# 計算前月和去年同月信息
monthly_sales = (monthly_sales.withColumn("prev_month_year", F.when(F.col("month") == 1, F.col("year") - 1).otherwise(F.col("year"))).withColumn("prev_month_month", F.when(F.col("month") == 1, 12).otherwise(F.col("month") - 1)).withColumn("prev_year_year", F.col("year") - 1).withColumn("prev_year_month", F.col("month"))
)# 創建臨時視圖
monthly_sales.createOrReplaceTempView("monthly_sales")# 通過自連接獲取比較數據
final_result = (monthly_sales.alias("curr").join(monthly_sales.alias("prev_month"),(F.col("curr.prev_month_year") == F.col("prev_month.year")) &(F.col("curr.prev_month_month") == F.col("prev_month.month")),"left").join(monthly_sales.alias("prev_year"),(F.col("curr.prev_year_year") == F.col("prev_year.year")) &(F.col("curr.prev_year_month") == F.col("prev_year.month")),"left").select(F.col("curr.year"),F.col("curr.month"),F.col("curr.total_sales"),F.col("prev_month.total_sales").alias("prev_month_sales"),F.col("prev_year.total_sales").alias("prev_year_sales"))
)# 計算增長率
final_result = final_result.withColumn("month_over_month",((F.col("total_sales") - F.col("prev_month_sales")) / F.col("prev_month_sales") * 100
).withColumn("year_over_year",((F.col("total_sales") - F.col("prev_year_sales")) / F.col("prev_year_sales") * 100
)# 顯示結果
final_result.show()
2. 使用SparkSQL實現
# 注冊預處理后的視圖
processed_df.createOrReplaceTempView("processed_sales")# 執行SQL查詢
sql_query = """
WITH monthly_sales AS (SELECTYEAR(date) AS year,MONTH(date) AS month,SUM(sales) AS total_salesFROM processed_salesGROUP BY YEAR(date), MONTH(date)
),
comparison_data AS (SELECTcurr.year,curr.month,curr.total_sales,prev_month.total_sales AS prev_month_sales,prev_year.total_sales AS prev_year_salesFROM monthly_sales currLEFT JOIN monthly_sales prev_monthON (curr.year = prev_month.year AND curr.month = prev_month.month + 1)OR (curr.month = 1 AND prev_month.month = 12 AND curr.year = prev_month.year + 1)LEFT JOIN monthly_sales prev_yearON curr.year = prev_year.year + 1 AND curr.month = prev_year.month
)
SELECTyear,month,total_sales,ROUND((total_sales - prev_month_sales) / prev_month_sales * 100, 2) AS mom_growth,ROUND((total_sales - prev_year_sales) / prev_year_sales * 100, 2) AS yoy_growth
FROM comparison_data
ORDER BY year, month
"""spark.sql(sql_query).show()
說明:
-
數據預處理:
- 轉換日期類型并計算每個月的最后一天
- 動態計算每個月的有效截止日期(考慮當前日期和月份長度)
- 過濾出有效日期范圍內的數據
-
聚合計算:
- 按年月分組計算總銷售額
- 使用自連接獲取前月和去年同月的銷售額數據
-
增長率計算:
- 環比增長率 = (本月銷售額 - 上月銷售額) / 上月銷售額 * 100
- 同比增長率 = (本月銷售額 - 去年同期銷售額) / 去年同期銷售額 * 100
-
特殊處理:
- 自動處理月份邊界(如1月的前月是去年12月)
- 處理NULL值避免除零錯誤
- 動態適應不同月份的天數差異