1. 數據采集層測試
用例編號 | 測試目標 | 測試場景 | 預期結果 |
---|---|---|---|
TC-001 | 驗證用戶行為日志采集完整性 | 模擬用戶瀏覽、點擊、加購行為 | Kafka Topic中日志記錄數與模擬量一致 |
TC-002 | 驗證無效數據過濾規則 | 發送爬蟲請求(高頻IP) | 清洗后數據中無該IP的日志記錄 |
2. 數據處理層測試
用例編號 | 測試目標 | 測試場景 | 預期結果 |
---|---|---|---|
TC-003 | 驗證用戶興趣標簽計算邏輯 | 用戶連續瀏覽3次“運動鞋”類目 | 用戶畫像中“運動鞋”興趣權重≥0.8 |
TC-004 | 驗證實時推薦算法覆蓋率 | 新用戶首次訪問無歷史行為 | 推薦列表包含熱門商品且無重復 |
3. 輸出層測試
用例編號 | 測試目標 | 測試場景 | 預期結果 |
---|---|---|---|
TC-005 | 驗證推薦結果寫入Redis的實時性 | 用戶下單后30秒內刷新頁面 | 推薦列表排除已下單商品 |
TC-006 | 驗證API返回結果的安全性 | 請求未授權用戶的推薦接口 | 返回403錯誤碼 |
4. 性能與容錯測試
用例編號 | 測試目標 | 測試場景 | 預期結果 |
---|---|---|---|
TC-007 | 驗證高并發下的數據處理延遲 | 每秒發送10萬條日志,持續5分鐘 | 端到端延遲≤1秒,無數據堆積 |
TC-008 | 驗證Spark任務容錯恢復能力 | 強制終止Spark Executor進程 | 任務自動恢復,數據計算結果一致 |
測試步驟詳解(以TC-001和TC-007為例)
用例TC-001:用戶行為日志采集完整性
步驟:
-
準備測試數據
-
使用Python腳本生成模擬用戶行為日志(JSON格式),包含:
{"user_id": "U123", "event": "click", "item_id": "I456", "timestamp": 1620000000}
-
總數據量:10,000條(含瀏覽、點擊、加購)。
-
-
發送數據到Kafka
-
使用
kafka-console-producer
命令行工具或自定義Producer發送測試數據到指定Topic。 -
命令示例:
cat test_logs.json | kafka-console-producer --broker-list localhost:9092 --topic user_behavior
-
-
驗證數據完整性
-
使用
kafka-console-consumer
消費Topic數據并統計數量:kafka-console-consumer --bootstrap-server localhost:9092 --topic user_behavior --from-beginning | wc -l
-
通過標準:消費到的數據量=10,000條,且字段無缺失。
-
用例TC-007:高并發數據處理延遲測試
步驟:
-
模擬高并發流量
-
使用
Apache JMeter
或Gatling
工具構造每秒10萬條日志的請求壓力。 -
配置JMeter線程組:
-
線程數:500
-
Ramp-up時間:10秒
-
循環次數:持續300秒
-
-
-
監控數據處理鏈路
-
Kafka吞吐量:通過Kafka Manager監控Topic的
Messages In/Seconds
是否達到10萬/秒。 -
Spark Streaming延遲:在Spark UI中查看
Processing Time
和Scheduling Delay
。 -
端到端延遲:在推薦API響應頭中記錄
X-Data-Latency
字段(從日志生成到推薦結果返回的時間)。
-
-
驗證資源與容錯
-
使用
Grafana
監控集群資源:CPU利用率≤80%,內存無OOM(Out of Memory)錯誤。 -
檢查Kafka消費者組是否有Lag(未消費消息堆積)。
-
-
結果校驗
-
數據一致性:對比原始日志與HDFS落地文件的總記錄數是否一致。
-
延遲達標:95%的請求端到端延遲≤1秒。
-
關鍵測試工具與技巧
-
數據生成工具
-
Python Faker庫:生成模擬用戶ID、商品ID、時間戳等字段。
-
Apache Kafka Tools:
kafka-producer-perf-test
用于壓測。
-
-
自動化校驗腳本
-
使用
PySpark
對比處理前后數據差異:# 對比原始數據與處理后的Hive表數據量 raw_count = spark.read.json("hdfs://raw_logs").count() processed_count = spark.sql("SELECT COUNT(*) FROM user_behavior_clean").collect()[0][0] assert raw_count == processed_count, "數據丟失!"
-
-
日志追蹤
-
在日志中植入唯一標識(如
trace_id
),通過ELK(Elasticsearch+Logstash+Kibana)追蹤全鏈路處理過程。
-
總結
數據測試需要系統化驗證大數據處理鏈路的功能性、性能、容錯能力。實際工作中需結合業務需求補充場景(如冷啟動推薦、數據回溯測試等),并利用自動化框架(如Airflow調度測試任務)提升效率。