一、項目概述
(一)實驗目標
- 統計有效數據條數:篩選出uid、phone、addr三個字段均無空值的記錄并計數。
- 提取用戶數量最多的前 20 個地址:按地址分組統計用戶數,按降序排序后取前 20 名。
(二)數據說明
- 數據格式
- 輸入數據為 JSON 格式,字段包括uid(用戶 ID)、phone(電話號碼)、addr(地址)。
- 數據特點:部分記錄存在格式不規范問題(如單引號混用、字段值缺失、地址格式不統一,例如 “江蘇省 蘇州”“廣東省 中山” 等),需先清洗轉換。
- 示例數據:
json
{"uid":"1000166111","phone":"17703771999","addr":"河南省 南陽"}
{"uid":"1000432103","phone":"15388889881","addr":"云南省 昆明"}
- 有效數據定義
同時滿足以下條件的記錄:- uid不為空(非null且非空字符串);
- phone不為空(非null且非空字符串);
- addr不為空(非null且非空字符串)。
二、實驗準備
(一)環境配置
- 軟件依賴
- Spark 3.x+(需啟用 Hive 支持以使用get_json_object函數);
- 編程語言:Scala/Python(本文以 Scala 為例,Python 代碼可通過 PySpark 實現);
- 配置文件:確保spark.sql.warehouse.dir指向 HDFS 或本地路徑(如hdfs://node01:9000/user/hive/warehouse)。
- 數據準備
- 將 JSON 數據保存為文件(如user_data.json),確保每行一個 JSON 對象;
- 若存在格式錯誤(如單引號),先用文本處理工具(如sed 's/\'/"/g')統一為雙引號。
三、數據處理流程
(一)數據讀取與格式轉換
1. 讀取原始數據
使用 Spark 的 JSON 數據源直接加載數據,自動推斷 Schema:
scala
val rawDF = spark.read.json("path/to/user_data.json")
rawDF.printSchema() // 檢查字段是否正確解析(可能因格式問題導致字段類型為String)
2. 字段提取與清洗
通過get_json_object函數(Spark SQL 內置函數)解析 JSON 字段,處理不規范格式:
scala
// 方法1:Spark SQL語句(推薦,清晰易讀)
rawDF.createOrReplaceTempView("raw_data")
val parsedDF = spark.sql("""
SELECT
get_json_object(raw_data.data, '$.uid') AS uid, -- 提取uid
get_json_object(raw_data.data, '$.phone') AS phone, -- 提取phone
trim(get_json_object(raw_data.data, '$.addr')) AS addr -- 提取addr并去除前后空格
FROM raw_data
""")
// 方法2:DataFrame API(適合編程式處理)
import org.apache.spark.sql.functions.expr
val parsedDF = rawDF.select(
expr("get_json_object(data, '$.uid')").as("uid"),
expr("get_json_object(data, '$.phone')").as("phone"),
expr("trim(get_json_object(data, '$.addr'))").as("addr")
)
(二)統計有效數據條數
1. 篩選有效數據
過濾掉任一字段為空的記錄:
scala
val validDF = parsedDF.filter(
col("uid").isNotNull &&
col("phone").isNotNull &&
col("addr").isNotNull
)
或通過SQL語句:
spark.sql("SELECT * FROM parsed_data WHERE uid IS NOT NULL AND phone IS NOT NULL AND addr IS NOT NULL")
2. 計數
scala
val validCount = validDF.count()
println(s"有效數據條數:$validCount")
或通過SQL返回結果:
spark.sql("SELECT COUNT(*) AS valid_data_count FROM valid_data").show()
(三)統計用戶數量最多的前 20 個地址
1. 分組聚合
按addr分組,統計每個地址的用戶數(直接使用count(*),因uid唯一,也可count(DISTINCT uid),需根據業務需求選擇):
scala
val addrGroupDF = validDF.groupBy("addr").count().withColumnRenamed("count", "user_count")
2. 排序與篩選
按用戶數降序排序,取前 20 條:
scala
val top20Addresses = addrGroupDF.orderBy(desc("user_count")).limit(20)
top20Addresses.show(false) // 展示結果,地址不換行
3. SQL 完整實現
spark.sql("""
SELECT
addr,
COUNT(*) AS user_count-- 或COUNT(DISTINCT uid)去重統計
FROM valid_data
GROUP BY addr
ORDER BY user_count DESC
LIMIT 20
""").show()
五、擴展與優化建議
(一)數據清洗增強
- 地址標準化:使用正則表達式或自定義函數清洗地址(如 “江蘇省蘇州” 統一為 “江蘇省蘇州市”);
- 手機號格式校驗:添加正則表達式過濾無效手機號(如^1[3-9]\d{9}$)。
(二)性能優化
- 分區與緩存:對大數據集使用repartition分區,對高頻訪問的中間表(如validDF)調用cache();
- 列式存儲:將結果數據保存為 Parquet 格式(validDF.write.parquet("output/valid_data")),提升后續查詢效率。
(三)結果輸出
將最終結果導出到 HDFS、本地文件或數據庫:
scala
top20Addresses.write
.mode("overwrite")
.csv("output/top20_addresses") // 保存為CSV文件