基于Spark的空氣質量數據分析可視化系統設計與實現
項目概述
本項目是一個基于Apache Spark的大數據分析和可視化系統,專門用于空氣質量數據的采集、分析、預測和可視化展示。系統采用分布式計算架構,結合機器學習算法,實現了對全國12個主要城市空氣質量數據的全面分析和預測功能。
項目背景與意義
隨著城市化進程的加快和工業化的快速發展,空氣質量問題日益成為公眾關注的焦點。傳統的空氣質量監測方式存在數據分散、分析效率低、可視化效果差等問題。本項目旨在構建一個完整的大數據分析和可視化平臺,為空氣質量監測提供科學、高效的技術解決方案。
項目目標
- 數據采集自動化:實現多城市空氣質量數據的自動采集和更新
- 大數據分析:利用Spark分布式計算能力,處理大規模空氣質量數據
- 智能預測:基于機器學習算法,預測空氣質量變化趨勢
- 可視化展示:提供直觀、交互式的數據可視化界面
- 系統集成:構建完整的數據科學流程,從采集到展示
項目特色
- 技術先進性:采用最新的Spark 3.x版本和機器學習技術
- 架構完整性:涵蓋數據采集、存儲、分析、預測、可視化的完整流程
- 擴展性強:支持新城市、新指標的快速接入
- 用戶友好:提供直觀的Web界面和豐富的交互功能
項目演示
項目演示視頻
91-基于Spark的空氣質量數據分析預測系統的設計與實現
技術架構
核心技術棧
后端框架
- Django 3.1.14 - 成熟的Python Web應用框架,提供完整的MVT架構
- Apache Spark 3.x - 分布式大數據處理引擎,支持內存計算和流處理
- PySpark - Spark的Python API接口,提供DataFrame和SQL操作
數據庫系統
- MySQL 8.0 - 主數據庫,存儲分析結果、用戶數據和系統配置
- Apache Hive - 數據倉庫,存儲原始空氣質量數據,支持SQL查詢
- SQLite - 本地開發數據庫,輕量級,便于開發和測試
數據科學與機器學習
- scikit-learn 1.3.2 - 機器學習算法庫,提供分類、回歸、聚類等算法
- pandas 1.4.3 - 數據處理和分析庫,提供DataFrame操作
- numpy 1.23.1 - 數值計算庫,提供高效的數組操作
- matplotlib 3.5.2 - 數據可視化庫,支持多種圖表類型
數據采集技術
- requests 2.31.0 - HTTP請求庫,支持GET/POST請求和會話管理
- BeautifulSoup4 4.12.3 - 網頁解析庫,支持HTML/XML解析
- lxml 4.9.3 - XML/HTML解析器,提供高性能的解析能力
前端技術
- Bootstrap 4.x - 響應式CSS框架,提供移動優先的設計
- ECharts 5.x - 數據可視化圖表庫,支持多種交互式圖表
- jQuery 3.x - JavaScript庫,簡化DOM操作和AJAX請求
- Material Design Icons - 現代化圖標系統,提供豐富的圖標資源
自然語言處理
- jieba 0.42.1 - 中文分詞庫,支持精確模式和全模式分詞
- wordcloud 1.8.2.2 - 詞云生成庫,支持自定義形狀和顏色
- snownlp 0.12.3 - 中文自然語言處理庫,提供情感分析等功能
大數據處理
- PyHive 0.7.0 - Hive連接器,支持Python連接Hive數據庫
- thrift 0.21.0 - 跨語言RPC框架,用于Hive連接
- mysqlclient 2.2.4 - MySQL數據庫驅動,提供高性能的數據庫連接
系統架構設計
整體架構圖
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ 數據采集層 │ │ 數據存儲層 │ │ 數據處理層 │
│ │ │ │ │ │
│ ? 網絡爬蟲 │───?│ ? MySQL │───?│ ? Apache Spark │
│ ? 數據清洗 │ │ ? Hive │ │ ? PySpark │
│ ? 實時更新 │ │ ? SQLite │ │ ? 分布式計算 │
│ ? 數據驗證 │ │ ? 數據備份 │ │ ? 內存計算 │
└─────────────────┘ └─────────────────┘ └─────────────────┘│
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ 機器學習層 │?───│ 數據分析層 │?───│ 數據展示層 │
│ │ │ │ │ │
│ ? 線性回歸 │ │ ? 統計分析 │ │ ? Web界面 │
│ ? 模型訓練 │ │ ? 趨勢分析 │ │ ? 圖表可視化 │
│ ? 預測算法 │ │ ? 關聯分析 │ │ ? 交互式展示 │
│ ? 模型評估 │ │ ? 聚類分析 │ │ ? 響應式設計 │
└─────────────────┘ └─────────────────┘ └─────────────────┘
數據流向圖
數據源網站 ──→ 爬蟲程序 ──→ 數據清洗 ──→ Hive數據倉庫│ │ │ │▼ ▼ ▼ ▼原始數據 結構化數據 清洗后數據 存儲數據│ │ │ │▼ ▼ ▼ ▼
Spark分析 ──→ 分析結果 ──→ MySQL存儲 ──→ Web展示│ │ │ │▼ ▼ ▼ ▼
機器學習 ──→ 預測模型 ──→ 預測結果 ──→ 可視化圖表
技術架構特點
1. 分層架構設計
- 表現層:Django Web框架,提供用戶界面和交互
- 業務邏輯層:Spark分布式計算,處理大規模數據分析
- 數據訪問層:多數據庫支持,實現數據的高效存儲和查詢
- 基礎設施層:Hadoop生態系統,提供分布式存儲和計算能力
2. 微服務架構思想
- 數據采集服務:獨立的爬蟲服務,支持多城市并行采集
- 數據分析服務:Spark集群服務,提供分布式計算能力
- 機器學習服務:獨立的模型訓練和預測服務
- Web展示服務:Django應用服務,提供用戶界面
3. 數據流設計
- 實時數據流:支持實時數據采集和處理
- 批量數據流:支持大規模歷史數據的批量分析
- 流式處理:支持數據流的實時處理和響應
核心功能模塊
1. 數據采集模塊
功能特點
- 多城市支持:支持12個主要城市的數據采集:北京、天津、上海、重慶、廣州、深圳、杭州、成都、沈陽、南京、長沙、南昌
- 歷史數據獲取:自動爬取歷史空氣質量數據,支持指定年份和月份的數據采集
- 完整指標采集:采集完整的空氣質量指標:AQI、PM2.5、PM10、SO2、NO2、CO、O3
- 數據質量控制:實現數據去重、清洗和格式標準化,確保數據質量
- 反爬蟲策略:采用隨機延時、User-Agent輪換等策略,避免被目標網站封禁
- 錯誤處理機制:完善的異常處理和重試機制,確保數據采集的穩定性
技術實現細節
1. 爬蟲架構設計
class AqiSpider:def __init__(self, cityname, realname):self.cityname = citynameself.realname = realname# 配置請求頭,模擬真實瀏覽器self.headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.0.0 Safari/537.36","Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8","Accept-Language": "zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3","Accept-Encoding": "gzip, deflate","Connection": "keep-alive","Upgrade-Insecure-Requests": "1"}# 配置CSV文件寫入器self.f = open(f'data.csv', 'a', encoding='utf-8-sig', newline='')self.writer = csv.DictWriter(self.f, fieldnames=['city', 'date', 'airQuality', 'AQI', 'rank', 'PM2.5', 'PM10', 'So2', 'No2', 'Co', 'O3'])
2. 數據解析策略
def parse_response(self, response):soup = BeautifulSoup(response, 'html.parser')tr = soup.find_all('tr')for j in tr[1:]: # 跳過表頭td = j.find_all('td')if len(td) < 10: # 數據完整性檢查continue# 數據提取和清洗Date = td[0].get_text().strip()Quality_level = td[1].get_text().strip()AQI = td[2].get_text().strip()AQI_rank = td[3].get_text().strip()PM25 = td[4].get_text().strip()PM10 = td[5].get_text().strip()SO2 = td[6].get_text().strip()NO2 = td[7].get_text().strip()CO = td[8].get_text().strip()O3 = td[9].get_text().strip()# 數據驗證和轉換data_dict = self.validate_and_convert_data({'city': self.realname,'date': Date,'airQuality': Quality_level,'AQI': AQI,'rank': AQI_rank,'PM2.5': PM25,'PM10': PM10,'So2': SO2,'No2': NO2,'Co': CO,'O3': O3,})if data_dict: # 只保存有效數據self.save_data(data_dict)
3. 數據驗證和清洗
def validate_and_convert_data(self, data_dict):"""數據驗證和類型轉換"""try:# 驗證日期格式if not self.is_valid_date(data_dict['date']):return None# 轉換數值類型numeric_fields = ['AQI', 'PM2.5', 'PM10', 'So2', 'No2', 'Co', 'O3']for field in numeric_fields:value = data_dict[field]if value and value != '—':try:data_dict[field] = float(value)except ValueError:data_dict[field] = 0.0else:data_dict[field] = 0.0# 驗證AQI范圍if data_dict['AQI'] < 0 or data_dict['AQI'] > 500:return Nonereturn data_dictexcept Exception as e:print(f"數據驗證失敗: {e}")return Nonedef is_valid_date(self, date_str):"""驗證日期格式"""try:from datetime import datetimedatetime.strptime(date_str, '%Y-%m-%d')return Trueexcept ValueError:return False
4. 錯誤處理和重試機制
def send_request_with_retry(self, year, month, max_retries=3):"""帶重試機制的請求發送"""for attempt in range(max_retries):try:url = f"http://www.tianqihoubao.com/aqi/{self.cityname}-{year}{month:02d}.html"response = requests.get(url, headers=self.headers, timeout=60)if response.status_code == 200:return self.parse_response(response.text)else:print(f"請求失敗,狀態碼: {response.status_code}")except requests.exceptions.RequestException as e:print(f"請求異常 (嘗試 {attempt + 1}/{max_retries}): {e}")if attempt < max_retries - 1:time.sleep(2 ** attempt) # 指數退避else:print(f"請求失敗,已重試{max_retries}次")except Exception as e:print(f"未知錯誤: {e}")break
技術實現
import csv
import time
import requests
from bs4 import BeautifulSoupclass AqiSpider:def __init__(self, cityname, realname):self.cityname = citynameself.realname = realnameself.headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.0.0 Safari/537.36"}self.f = open(f'data.csv', 'a', encoding='utf-8-sig', newline='')self.writer = csv.DictWriter(self.f, fieldnames=['city', 'date', 'airQuality', 'AQI', 'rank', 'PM2.5', 'PM10', 'So2', 'No2', 'Co', 'O3'])def send_request(self, year, month):url = f"http://www.tianqihoubao.com/aqi/{self.cityname}-{year}{month:02d}.html"response = requests.get(url, headers=self.headers, timeout=60)time.sleep(2) # 避免請求過于頻繁print(f"響應狀態碼:{response.status_code}")return self.parse_response(response.text)def parse_response(self, response):soup = BeautifulSoup(response, 'html.parser')tr = soup.find_all('tr')for j in tr[1:]: # 跳過表頭td = j.find_all('td')Date = td[0].get_text().strip() # 日期Quality_level = td[1].get_text().strip() # 空氣質量等級AQI = td[2].get_text().strip()AQI_rank = td[3].get_text().strip()PM25 = td[4].get_text().strip()PM10 = td[5].get_text().strip()SO2 = td[6].get_text().strip()NO2 = td[7].get_text().strip()CO = td[8].get_text().strip()O3 = td[9].get_text().strip()data_dict = {'city': self.realname,'date': Date,'airQuality': Quality_level,'AQI': AQI,'rank': AQI_rank,'PM2.5': PM25,'PM10': PM10,'So2': SO2,'No2': NO2,'Co': CO,'O3': O3,}self.save_data(data_dict)def save_data(self, data_dict):self.writer.writerow(data_dict)def run(self):for month in range(1, 6):print(f"正在爬取2025年{month}月的數據")self.send_request(2025, month)if __name__ == '__main__':cityList = ['beijing', 'tianjin', 'shanghai', 'chongqing', 'guangzhou', 'shenzhen', 'hangzhou', 'chengdu', 'shenyang', 'nanjing','changsha', 'nanchang']nameList = ['北京', '天津', '上海', '重慶', '廣州', '深圳', '杭州', '成都', '沈陽', '南京', '長沙', '南昌']city_dict = dict(zip(cityList, nameList))for k, v in city_dict.items():AS = AqiSpider(k, v)AS.run()
2. Spark大數據分析模塊
分析維度與業務價值
1. 城市平均AQI分析
- 業務價值:識別空氣質量最佳和最差的城市,為城市環境治理提供數據支撐
- 分析指標:計算各城市平均空氣質量指數,進行城市排名
- 應用場景:城市環境評估、政策制定參考、公眾健康指導
2. 六項污染物分析
- 業務價值:全面了解各污染物的分布特征和貢獻度
- 分析指標:PM、PM10、SO2、NO2、CO、O3的統計分析和相關性研究
- 應用場景:污染物來源分析、治理效果評估、健康風險評估
3. 時間序列分析
- 業務價值:發現空氣質量的時間變化規律和季節性特征
- 分析指標:按年月分析AQI的最大值、最小值變化趨勢
- 應用場景:季節性污染預測、治理措施效果評估、公眾出行指導
4. 污染物分布分析
- 業務價值:了解污染物濃度的分布特征和超標情況
- 分析指標:統計不同濃度區間的污染物分布情況
- 應用場景:污染等級評估、預警閾值設定、治理目標制定
技術實現架構
1. Spark集群配置
# Spark會話配置
spark = SparkSession.builder.appName("sparkSQL").master("local[*]").\config("spark.sql.shuffle.partitions", 2). \config("spark.sql.warehouse.dir", "hdfs://node1:8020/user/hive/warehouse"). \config("hive.metastore.uris", "thrift://node1:9083"). \config("spark.sql.adaptive.enabled", "true"). \config("spark.sql.adaptive.coalescePartitions.enabled", "true"). \config("spark.sql.adaptive.skewJoin.enabled", "true"). \enableHiveSupport().\getOrCreate()# 性能優化配置
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
2. 數據預處理和清洗
def preprocess_air_data(spark):"""數據預處理和清洗"""# 讀取原始數據airdata = spark.read.table("airdata")# 數據類型轉換airdata = airdata.withColumn("date", airdata["date"].cast("date"))airdata = airdata.withColumn("AQI", airdata["AQI"].cast("double"))airdata = airdata.withColumn("PM", airdata["PM"].cast("double"))airdata = airdata.withColumn("PM10", airdata["PM10"].cast("double"))airdata = airdata.withColumn("So2", airdata["So2"].cast("double"))airdata = airdata.withColumn("No2", airdata["No2"].cast("double"))airdata = airdata.withColumn("Co", airdata["Co"].cast("double"))airdata = airdata.withColumn("O3", airdata["O3"].cast("double"))# 數據清洗:處理缺失值和異常值airdata = airdata.na.fill(0, subset=["AQI", "PM", "PM10", "So2", "No2", "Co", "O3"])# 異常值處理:AQI范圍檢查airdata = airdata.filter((col("AQI") >= 0) & (col("AQI") <= 500))return airdata
3. 高級分析功能
def advanced_analysis(airdata):"""高級分析功能"""# 1. 空氣質量等級分析airdata = airdata.withColumn("air_quality_level",when(col("AQI") <= 50, "優").when(col("AQI") <= 100, "良").when(col("AQI") <= 150, "輕度污染").when(col("AQI") <= 200, "中度污染").when(col("AQI") <= 300, "重度污染").otherwise("嚴重污染"))# 2. 污染物相關性分析correlation_analysis = airdata.select(corr("AQI", "PM").alias("AQI_PM_corr"),corr("AQI", "PM10").alias("AQI_PM10_corr"),corr("AQI", "So2").alias("AQI_So2_corr"),corr("AQI", "No2").alias("AQI_No2_corr"),corr("AQI", "Co").alias("AQI_Co_corr"),corr("AQI", "O3").alias("AQI_O3_corr"))# 3. 季節性分析seasonal_analysis = airdata.groupby(year("date").alias("year"),month("date").alias("month")).agg(avg("AQI").alias("avg_aqi"),stddev("AQI").alias("std_aqi"),count("*").alias("data_count")).orderBy("year", "month")# 4. 城市聚類分析city_clustering = airdata.groupby("city").agg(avg("AQI").alias("avg_aqi"),avg("PM").alias("avg_pm"),avg("PM10").alias("avg_pm10"),avg("So2").alias("avg_so2"),avg("No2").alias("avg_no2"),avg("Co").alias("avg_co"),avg("O3").alias("avg_o3"))return correlation_analysis, seasonal_analysis, city_clustering
4. 性能優化策略
def optimize_spark_performance(spark):"""Spark性能優化配置"""# 1. 內存優化spark.conf.set("spark.sql.adaptive.enabled", "true")spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")# 2. 緩存策略spark.conf.set("spark.sql.crossJoin.enabled", "true")spark.conf.set("spark.sql.adaptive.localShuffleReader.enabled", "true")# 3. 并行度優化spark.conf.set("spark.sql.shuffle.partitions", "200")spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128m")# 4. 數據傾斜處理spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256m")
5. 結果存儲和導出
def save_analysis_results(results, spark):"""保存分析結果到不同存儲系統"""# 1. 保存到MySQLfor table_name, data in results.items():data.write.mode("overwrite"). \format("jdbc"). \option("url", "jdbc:mysql://node1:3306/bigdata?useSSL=false&useUnicode=true&charset=utf8"). \option("dbtable", table_name). \option("user", "root"). \option("password", "root"). \option("encoding", "utf-8"). \save()# 2. 保存到Hivefor table_name, data in results.items():data.write.mode("overwrite").saveAsTable(table_name, "parquet")# 3. 導出為CSV文件for table_name, data in results.items():data.toPandas().to_csv(f"results/{table_name}.csv", index=False, encoding='utf-8-sig')
Spark SQL實現示例
#coding:utf8
from pyspark.sql import SparkSession
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.functions import count, mean, col, sum, when, year, month, max, min, avg
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatTypeif __name__ == '__main__':# 構建Spark會話spark = SparkSession.builder.appName("sparkSQL").master("local[*]").\config("spark.sql.shuffle.partitions", 2). \config("spark.sql.warehouse.dir", "hdfs://node1:8020/user/hive/warehouse"). \config("hive.metastore.uris", "thrift://node1:9083"). \enableHiveSupport().\getOrCreate()sc = spark.sparkContext# 讀取數據airdata = spark.read.table("airdata")# 需求一:城市平均AQI分析result1 = airdata.groupby("city")\.agg(mean("AQI").alias("avg_AQI"))\.orderBy("avg_AQI", ascending=False)# 需求二:六項污染物分析result2 = airdata.groupby("city") \.agg(mean("PM").alias("avg_PM"),mean("PM10").alias("avg_PM10"),mean("So2").alias("avg_So2"),mean("No2").alias("avg_No2"),mean("Co").alias("avg_Co"),mean("O3").alias("avg_O3"))# 需求三:年度空氣質量趨勢分析airdata = airdata.withColumn("date", airdata["date"].cast("date"))result3 = airdata.groupby("city", year("date").alias("year"), month("date").alias("month"))\.agg(max("AQI").alias("max_AQI"),min("AQI").alias("min_AQI"))# 需求四:月度PM污染物分析result4 = airdata.groupby("city", year("date").alias("year"), month("date").alias("month")) \.agg(avg("PM").alias("max_PM"),avg("PM10").alias("min_PM10"))# 需求五:優質空氣天數統計result5 = airdata.groupby("city", year("date").alias("year"), month("date").alias("month"))\.agg(count(when(airdata["AQI"] < 50, True)).alias("greatAirCount"))# 需求六:污染物最大值分析result6 = airdata.groupby("city")\.agg(max("So2").alias("max_So2"),max("No2").alias("max_No2"))# 需求七:CO濃度分級統計airdata = airdata.withColumn("Co_category",when((col("Co") >= 0) & (col("Co") < 0.25), '0-0.25').when((col("Co") >= 0.25) & (col("Co") < 0.5), '0.25-0.5').when((col("Co") >= 0.5) & (col("Co") < 0.75), '0.5-0.75').when((col("Co") >= 0.75) & (col("Co") < 1.0), '0.75-1').otherwise("1以上"))result7 = airdata.groupby("Co_category").agg(count('*').alias('Co_count'))# 需求八:O3濃度分級統計airdata = airdata.withColumn("O3_category",when((col("O3") >= 0) & (col("O3") < 25), '0-25').when((col("O3") >= 0.25) & (col("O3") < 50), '25-50').when((col("O3") >= 50) & (col("O3") < 75), '50-75').when((col("O3") >= 75) & (col("O3") < 100), '75-100').otherwise("100以上"))result8 = airdata.groupby("O3_category").agg(count('*').alias('O3_count'))# 保存結果到MySQLresult1.write.mode("overwrite"). \format("jdbc"). \option("url", "jdbc:mysql://node1:3306/bigdata?useSSL=false&useUnicode=true&charset=utf8"). \option("dbtable", "avgCityAqi"). \option("user", "root"). \option("password", "root"). \option("encoding", "utf-8"). \save()
3. 機器學習預測模塊
預測模型設計與業務價值
1. 模型選擇與理論基礎
- 線性回歸模型:基于污染物濃度與AQI的線性關系,適合快速預測
- 理論基礎:AQI計算公式中各項污染物都有對應的權重系數
- 業務價值:為環境監測和公眾健康提供實時預測服務
- 應用場景:空氣質量預警、出行建議、健康防護指導
2. 特征工程與數據預處理
- 特征選擇:PM2.5、SO2、NO2、O3作為主要預測特征
- 數據清洗:處理缺失值、異常值和數據標準化
- 特征重要性:通過模型分析各污染物對AQI的貢獻度
- 數據質量:確保訓練數據的準確性和完整性
3. 模型評估與優化
- 評估指標:R2、MAE、RMSE等指標評估模型性能
- 交叉驗證:使用K折交叉驗證確保模型泛化能力
- 超參數調優:通過網格搜索優化模型參數
- 模型監控:實時監控模型性能,及時更新模型
技術實現細節
1. 數據預處理與特征工程
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.model_selection import train_test_split, cross_val_score, GridSearchCV
from sklearn.linear_model import LinearRegression, Ridge, Lasso
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from sklearn.ensemble import RandomForestRegressor
import joblibclass AirQualityPredictor:def __init__(self):self.scaler = StandardScaler()self.model = Noneself.feature_names = ['PM2_5', 'SO2', 'NO2', 'O3']def preprocess_data(self, data):"""數據預處理和特征工程"""# 1. 數據清洗data = data.dropna(subset=['AQI', 'PM2_5', 'SO2', 'NO2', 'O3'])# 2. 異常值處理data = data[(data['AQI'] >= 0) & (data['AQI'] <= 500)]data = data[(data['PM2_5'] >= 0) & (data['PM2_5'] <= 500)]data = data[(data['SO2'] >= 0) & (data['SO2'] <= 1000)]data = data[(data['NO2'] >= 0) & (data['NO2'] <= 400)]data = data[(data['O3'] >= 0) & (data['O3'] <= 300)]# 3. 特征工程# 添加時間特征data['date'] = pd.to_datetime(data['date'])data['year'] = data['date'].dt.yeardata['month'] = data['date'].dt.monthdata['day'] = data['date'].dt.daydata['day_of_week'] = data['date'].dt.dayofweek# 添加空氣質量等級特征level_map = {'優': 1, '良': 2, '輕度污染': 3, '中度污染': 4, '重度污染': 5, '嚴重污染': 6}data['level_numeric'] = data['level'].map(level_map).fillna(0)# 添加污染物交互特征data['PM_SO2_ratio'] = data['PM2_5'] / (data['SO2'] + 1)data['NO2_O3_ratio'] = data['NO2'] / (data['O3'] + 1)return datadef feature_selection(self, data):"""特征選擇"""# 基礎特征base_features = ['PM2_5', 'SO2', 'NO2', 'O3']# 時間特征time_features = ['year', 'month', 'day', 'day_of_week']# 交互特征interaction_features = ['PM_SO2_ratio', 'NO2_O3_ratio']# 組合所有特征all_features = base_features + time_features + interaction_featuresreturn data[all_features], all_features
2. 模型訓練與優化
def train_model(self, X_train, y_train):"""模型訓練與優化"""# 1. 數據標準化X_train_scaled = self.scaler.fit_transform(X_train)# 2. 基礎線性回歸lr_model = LinearRegression()lr_model.fit(X_train_scaled, y_train)# 3. 正則化模型ridge_model = Ridge(alpha=1.0)lasso_model = Lasso(alpha=0.1)# 4. 隨機森林模型rf_model = RandomForestRegressor(n_estimators=100, random_state=42)# 5. 模型評估models = {'LinearRegression': lr_model,'Ridge': ridge_model,'Lasso': lasso_model,'RandomForest': rf_model}best_model = Nonebest_score = -float('inf')for name, model in models.items():# 交叉驗證cv_scores = cross_val_score(model, X_train_scaled, y_train, cv=5, scoring='r2')avg_score = cv_scores.mean()print(f"{name}: CV R2 Score = {avg_score:.4f} (+/- {cv_scores.std() * 2:.4f})")if avg_score > best_score:best_score = avg_scorebest_model = model# 訓練最佳模型best_model.fit(X_train_scaled, y_train)self.model = best_modelreturn best_model, best_scoredef hyperparameter_tuning(self, X_train, y_train):"""超參數調優"""# 定義參數網格param_grid = {'alpha': [0.001, 0.01, 0.1, 1.0, 10.0],'max_iter': [1000, 2000, 3000]}# 網格搜索grid_search = GridSearchCV(Ridge(),param_grid,cv=5,scoring='r2',n_jobs=-1)X_train_scaled = self.scaler.transform(X_train)grid_search.fit(X_train_scaled, y_train)print(f"最佳參數: {grid_search.best_params_}")print(f"最佳得分: {grid_search.best_score_:.4f}")return grid_search.best_estimator_
3. 模型評估與預測
def evaluate_model(self, X_test, y_test):"""模型評估"""X_test_scaled = self.scaler.transform(X_test)y_pred = self.model.predict(X_test_scaled)# 計算評估指標mse = mean_squared_error(y_test, y_pred)mae = mean_absolute_error(y_test, y_pred)r2 = r2_score(y_test, y_pred)rmse = np.sqrt(mse)print(f"模型評估結果:")print(f"R2 Score: {r2:.4f}")print(f"MAE: {mae:.4f}")print(f"RMSE: {rmse:.4f}")print(f"MSE: {mse:.4f}")# 特征重要性分析if hasattr(self.model, 'feature_importances_'):feature_importance = pd.DataFrame({'feature': self.feature_names,'importance': self.model.feature_importances_}).sort_values('importance', ascending=False)print(f"\n特征重要性:")print(feature_importance)return {'r2': r2,'mae': mae,'rmse': rmse,'mse': mse,'predictions': y_pred}def predict_aqi(self, pm25, so2, no2, o3):"""實時AQI預測"""# 構建輸入數據input_data = np.array([[pm25, so2, no2, o3]])# 數據標準化input_scaled = self.scaler.transform(input_data)# 預測prediction = self.model.predict(input_scaled)[0]# 預測結果解釋aqi_level = self.get_aqi_level(prediction)return {'predicted_aqi': round(prediction, 2),'aqi_level': aqi_level,'confidence': self.get_prediction_confidence(input_scaled)}def get_aqi_level(self, aqi):"""獲取AQI等級"""if aqi <= 50:return "優"elif aqi <= 100:return "良"elif aqi <= 150:return "輕度污染"elif aqi <= 200:return "中度污染"elif aqi <= 300:return "重度污染"else:return "嚴重污染"def get_prediction_confidence(self, input_data):"""獲取預測置信度"""# 基于模型的不確定性估計if hasattr(self.model, 'predict_proba'):confidence = np.max(self.model.predict_proba(input_data))else:# 對于回歸模型,使用預測值的合理性作為置信度prediction = self.model.predict(input_data)[0]confidence = max(0, min(1, 1 - abs(prediction - 100) / 100))return round(confidence, 3)
4. 模型持久化與部署
def save_model(self, model_path):"""保存模型"""model_data = {'model': self.model,'scaler': self.scaler,'feature_names': self.feature_names}joblib.dump(model_data, model_path)print(f"模型已保存到: {model_path}")def load_model(self, model_path):"""加載模型"""model_data = joblib.load(model_path)self.model = model_data['model']self.scaler = model_data['scaler']self.feature_names = model_data['feature_names']print(f"模型已從 {model_path} 加載")def batch_predict(self, data_batch):"""批量預測"""predictions = []for data in data_batch:pred = self.predict_aqi(data['PM2_5'], data['SO2'], data['NO2'], data['O3'])predictions.append(pred)return predictions
模型實現
import numpy as np
import pandas as pd
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression
from utils.query import querysdef getData():# 從數據庫獲取數據data = querys('select * from airData', [], 'select')print("數據庫返回的列數:", len(data[0]) if data else 0)print("第一行數據:", data[0] if data else None)# 轉換為DataFrame,根據實際數據調整列名products = pd.DataFrame(data, columns=['city', 'date', 'level', 'AQI', 'PM2_5', 'PM10', 'SO2', 'NO2', 'CO', 'O3', 'predict', 'extra'])# 處理空氣質量等級,將文本轉換為數值level_map = {'優': 1, '良': 2, '輕度污染': 3, '中度污染': 4, '重度污染': 5, '嚴重污染': 6}products['level'] = products['level'].map(level_map).fillna(0).astype('int')# 確保數值列為數值類型numeric_columns = ['AQI', 'PM2_5', 'PM10', 'SO2', 'NO2', 'CO', 'O3']for col in numeric_columns:products[col] = pd.to_numeric(products[col], errors='coerce').fillna(0).astype('float')return productsdef model_train(data):# 特征選擇:PM2.5、SO2、NO2、O3x_train, x_test, y_train, y_test = train_test_split(data[["PM2_5", "SO2", "NO2", "O3"]], data['AQI'],test_size=0.25, random_state=1)model = LinearRegression()model.fit(x_train, y_train)return modeldef model_predict(model, data):return model.predict(data)def pred(model, *args):# 將輸入參數轉換為numpy數組data = np.array([args]).reshape(1, -1)pred = model.predict(data)return predif __name__ == '__main__':trainData = getData()model = model_train(trainData)print(pred(model, 10, 2, 18, 30))
4. 數據可視化模塊
圖表類型
- 年度空氣質量分析圖表:展示年度AQI變化趨勢和季節性規律
- 月度分析圖表:按月展示空氣質量變化,識別污染高峰期
- 氣體分析圖表:六項污染物的對比分析,展示污染物相關性
- 城市分布圖表:各城市空氣質量對比,識別區域差異
- 數據詞云圖:空氣質量相關詞匯的可視化展示
圖表數據處理
def getIndexData(defaultCity):avgCityAqiList = list(getavgCityAqi())avgCitySixList = list(getavgCitySix())realSixList = querys('select * from avgCitySix where city = %s', [defaultCity], 'select')[0]yLine = list(realSixList[1:])yLine = [round(x, 1) for x in yLine]xLine = ['PM2.5', 'PM10', 'So2', 'No2', 'Co', 'O3']xBar = [x[0] for x in avgCityAqiList]yBar = [round(x[1], 0) for x in avgCityAqiList]return xBar, yBar, xLine, yLinedef getYearChartData(city):maxCityAqiList = querys('select * from maxCityAqi where city = %s', [city], 'select')y1DataM = [x[3] if x[3] is not None else 0 for x in maxCityAqiList]y1DataN = [x[4] if x[4] is not None else 0 for x in maxCityAqiList]xData = []for i in range(1, 13):xData.append(str(i) + '月')avgCityPMList = querys('select * from avgCityPM where city = %s', [city], 'select')y2Data2 = [round(x[3], 1) if x[3] is not None else 0 for x in avgCityPMList]y2Data10 = [round(x[4], 1) if x[4] is not None else 0 for x in avgCityPMList]return xData, y1DataM, y1DataN, y2Data2, y2Data10def getAirMonthData(city, month):airDataList = querys('select * from airData where city = %s and SUBSTRING(date,6,2) = %s', [city, month], 'select')# 按日期排序,確保日期格式正確airDataList = sorted(airDataList, key=lambda x: int(x[1].split('-')[2]))y1Data = [x[3] if x[3] is not None else 0 for x in airDataList]y2Data = [int(x[4]) if x[4] is not None else 0 for x in airDataList]dateList = [x[1] for x in airDataList]xData = []for date in dateList:year, month, day = date.split('-')xData.append(day)greatAirList = querys('select * from greatAir where city = %s', [city], 'select')funnelData = []for i in greatAirList:funnelData.append({'name': str(i[2]) + '月','value': i[3] if i[3] is not None else 0})return xData, y1Data, y2Data, funnelData
ECharts實現示例
// 年度AQI趨勢圖
var option = {title: { text: '年度空氣質量趨勢分析' },tooltip: { trigger: 'axis' },legend: {},toolbox: {show: true,feature: {dataZoom: { yAxisIndex: 'none' },dataView: { readOnly: false },magicType: { type: ['line', 'bar'] },restore: {},saveAsImage: {}}},xAxis: { type: 'category', boundaryGap: false,data: {{ xData | safe }}},yAxis: { type: 'value',axisLabel: { formatter: '{value} ' }},series: [{name: 'So2',type: 'line',data: {{ y1Data }},markPoint: {data: [{ type: 'max', name: 'Max' },{ type: 'min', name: 'Min' }]},markLine: {data: [{ type: 'average', name: 'Avg' }]}},{name: 'No2',type: 'line',data: {{ y2Data }},markPoint: {data: [{ name: '周最低', value: -2, xAxis: 1, yAxis: -1.5 }]},markLine: {data: [{ type: 'average', name: 'Avg' },[{symbol: 'none',x: '90%',yAxis: 'max'},{symbol: 'circle',label: {position: 'start',formatter: 'Max'},type: 'max',name: '最高點'}]]}}]
};
5. Web用戶界面模塊
界面功能
- 用戶注冊登錄系統
- 個人信息管理
- 數據總覽表格展示
- 交互式日期選擇器
- 響應式設計,支持多設備訪問
數據模型設計
from django.db import modelsclass User(models.Model):id = models.AutoField("id", primary_key=True)username = models.CharField("用戶名", max_length=255, default='')password = models.CharField("密碼", max_length=255, default='')creteTime = models.DateField("創建時間", auto_now_add=True)class Meta:db_table = 'user'
數據庫查詢工具
from pyhive import hive
import time
from thrift.Thrift import TApplicationExceptiondef get_connection():try:return hive.Connection(host='node1', port='10000', username='hadoop')except Exception as e:print(f"連接Hive失敗: {str(e)}")return Nonedef querys(sql, params, type='no_select', max_retries=3):retry_count = 0while retry_count < max_retries:try:conn = get_connection()if not conn:raise Exception("無法建立Hive連接")cursor = conn.cursor()params = tuple(params)cursor.execute(sql, params)if type != 'no_select':data_list = cursor.fetchall()conn.commit()return data_listelse:conn.commit()return '數據庫語句執行成功'except TApplicationException as e:print(f"查詢執行失敗 (嘗試 {retry_count + 1}/{max_retries}): {str(e)}")retry_count += 1if retry_count == max_retries:raise Exception(f"查詢執行失敗,已重試{max_retries}次: {str(e)}")time.sleep(2) # 等待2秒后重試except Exception as e:print(f"發生錯誤: {str(e)}")raisefinally:try:if 'cursor' in locals():cursor.close()if 'conn' in locals():conn.close()except:pass
Django視圖實現
def index(request):uname = request.session.get('username')userInfo = User.objects.get(username=uname)airdataList = list(getairdata())dateList = list(set([x[1] for x in airdataList]))# 提取年月日數據yearList = []monthList = []dayList = []for date in dateList:year, month, day = date.split('-')yearList.append(year)monthList.append(month)dayList.append(day)yearList = sorted(set(yearList))monthList = sorted(set(monthList))dayList = sorted(set(dayList))defaultYear = '2023'defaultMonth = '01'defaultDay = '01'cityList = list(set(x[0] for x in airdataList))if request.method == 'POST':defaultYear = request.POST.get('defaultYear')defaultMonth = request.POST.get('defaultMonth')defaultDay = request.POST.get('defaultDay')defaultCity = request.session.get('defaultCity')currentDate = defaultYear + '-' + defaultMonth + '-' + defaultDay# 獲取圖表數據xBar, yBar, xLine, yLine = getIndexData(defaultCity)currentData = querys('select * from airdata where city = %s and date = %s',[defaultCity, currentDate], 'select')[0]currentData = convert_none_to_null(currentData)return render(request, 'index.html', {'userInfo': userInfo,'yearList': yearList,'monthList': monthList,'dayList': dayList,'defaultYear': defaultYear,'defaultMonth': defaultMonth,'defaultDay': defaultDay,'cityList': cityList,'defaultCity': defaultCity,'currentData': currentData,'xBar': convert_none_to_null(xBar),'yBar': convert_none_to_null(yBar),'xLine': convert_none_to_null(xLine),'yLine': convert_none_to_null(yLine)})
系統特色功能
1. 分布式數據處理
- 利用Spark的分布式計算能力,處理大規模空氣質量數據
- 支持實時數據流處理和批量數據分析
- 實現數據并行計算,提高處理效率
2. 智能預測分析
- 基于歷史數據訓練機器學習模型
- 支持多維度特征工程和模型優化
- 提供預測結果的可信度評估
3. 多維度數據可視化
- 支持多種圖表類型:折線圖、柱狀圖、散點圖、熱力圖等
- 實現交互式數據探索和動態篩選
- 提供圖表導出和分享功能
4. 實時數據更新
- 定時任務自動更新空氣質量數據
- 支持增量數據同步和全量數據更新
- 確保數據的時效性和準確性
項目部署與運維
環境配置
# 安裝Python依賴
pip install -r requirements.txt# 配置數據庫
mysql -u root -p < design_91_airdata.sql# 啟動Spark集群
spark-submit --master local[*] sparks/sparkAna.py# 啟動Django應用
python manage.py runserver
系統要求
- Python 3.8+
- Apache Spark 3.0+
- MySQL 8.0+
- 內存:8GB+
- 存儲:100GB+
性能優化
1. 數據處理優化
- 使用Spark的緩存機制減少重復計算
- 優化SQL查詢,使用索引提高查詢效率
- 實現數據分區,提高并行處理能力
2. 前端性能優化
- 使用CDN加速靜態資源加載
- 實現圖表懶加載和分頁顯示
- 優化JavaScript代碼,減少頁面加載時間
3. 數據庫優化
- 建立合適的索引提高查詢速度
- 使用連接池管理數據庫連接
- 定期清理歷史數據,保持數據庫性能
項目成果
1. 技術成果
- 成功構建了完整的大數據分析和可視化系統
- 實現了從數據采集到預測分析的完整流程
- 建立了可擴展的系統架構,支持功能擴展
2. 業務價值
- 為空氣質量監測提供了科學的數據分析工具
- 支持多城市空氣質量對比和趨勢分析
- 為環境決策提供了數據支撐
3. 用戶體驗
- 提供了直觀友好的用戶界面
- 支持多維度數據探索和可視化
- 實現了響應式設計,適配多種設備
未來展望
1. 功能擴展
- 增加更多城市的空氣質量數據
- 支持更多污染物指標的監測
- 集成更多機器學習算法
2. 技術升級
- 升級到Spark 3.x版本,利用新特性
- 引入深度學習模型提高預測精度
- 實現實時數據流處理
3. 用戶體驗優化
- 開發移動端APP
- 增加個性化推薦功能
- 提供API接口供第三方調用
總結
本項目成功構建了一個基于Spark的空氣質量數據分析可視化系統,實現了從數據采集、存儲、分析、預測到可視化的完整數據科學流程。系統采用現代化的技術棧,具備良好的可擴展性和維護性,為空氣質量監測和分析提供了強有力的技術支撐。
通過本項目的開發,不僅掌握了大數據處理、機器學習、Web開發等多項技術,更重要的是理解了如何將復雜的技術棧整合成一個完整的業務系統,為用戶提供有價值的服務。
聯系方式:如需源碼等情況,歡迎主頁聯系
許可證:MIT License