文章目錄
- 基于用戶的協同過濾推薦系統實戰項目
- 1. 推薦系統基礎理論
- 1.1 協同過濾概述
- 1.2 基于用戶的協同過濾原理
- 1.3 相似度計算方法
- 1.3.1 余弦相似度(Cosine Similarity)
- 1.3.2 皮爾遜相關系數(Pearson Correlation)
- 1.3.3 歐幾里得距離(Euclidean Distance)
- 1.3.4 調整余弦相似度(Adjusted Cosine Similarity)
- 1.4 評分預測方法
- 1.4.1 簡單加權平均
- 1.4.2 考慮用戶評分偏置的加權平均
- 1.5 評估指標
- 1.5.1 均方根誤差(RMSE)
- 1.5.2 平均絕對誤差(MAE)
- 1.6 TopN推薦
- 1.6.1 精確率(Precision)和召回率(Recall)
- 1.6.2 F1分數
- 2. 項目介紹
- 2.1 數據集介紹
- 2.2 項目架構設計
- 2.2.1 模塊化架構
- 3. 項目實施步驟
- 3.1 數據獲取與探索
- 3.2 數據加載模塊實現
- 3.3 構建評分矩陣和數據集拆分
- 3.4 日志系統設計
- 3.5 相似度計算模塊
- 3.6 評分預測模塊
- 3.7 推薦生成模塊
- 3.8 評估模塊
- 3.9 完整推薦系統模型
- 3.10 命令行接口設計
- 3.11 模型序列化與反序列化
- 3.12 Web應用開發
- 3.13 性能優化與錯誤處理
- 5. 模塊化設計的優勢
- 6. 結論與未來工作
- 7. 參考資料
基于用戶的協同過濾推薦系統實戰項目
1. 推薦系統基礎理論
1.1 協同過濾概述
協同過濾(Collaborative Filtering, CF)是推薦系統中最經典、應用最廣泛的技術之一。其核心思想是利用群體的智慧來進行推薦,基于"相似的用戶喜歡相似的物品"或"喜歡某物品的用戶也喜歡其他相似物品"的假設。
協同過濾主要分為兩大類:
- 基于記憶的協同過濾(Memory-based CF):直接使用用戶-物品交互數據進行推薦,包括基于用戶的協同過濾(User-based CF)和基于物品的協同過濾(Item-based CF)。
- 基于模型的協同過濾(Model-based CF):通過機器學習算法從數據中學習模型,如矩陣分解(Matrix Factorization)、奇異值分解(SVD)等。
本項目主要聚焦于基于用戶的協同過濾。
1.2 基于用戶的協同過濾原理
基于用戶的協同過濾的工作流程如下:
- 構建用戶-物品評分矩陣:每行代表一個用戶,每列代表一個物品,矩陣中的元素表示用戶對物品的評分。
- 計算用戶相似度:尋找與目標用戶具有相似品味的用戶群體。
- 預測評分:基于相似用戶的評分,預測目標用戶對未評分物品的可能評分。
- 生成推薦:為用戶推薦評分最高的未接觸物品。
1.3 相似度計算方法
在協同過濾中,常用的相似度計算方法包括:
1.3.1 余弦相似度(Cosine Similarity)
余弦相似度計算兩個向量之間夾角的余弦值,范圍從-1到1,值越大表示越相似。
對于用戶 u u u和用戶 v v v,其余弦相似度計算公式為:
s i m c o s ( u , v ) = ∑ i ∈ I u v r u i ? r v i ∑ i ∈ I u r u i 2 ? ∑ i ∈ I v r v i 2 sim_{cos}(u, v) = \frac{\sum_{i \in I_{uv}} r_{ui} \cdot r_{vi}}{\sqrt{\sum_{i \in I_{u}} r_{ui}^2} \cdot \sqrt{\sum_{i \in I_{v}} r_{vi}^2}} simcos?(u,v)=∑i∈Iu??rui2???∑i∈Iv??rvi2??∑i∈Iuv??rui??rvi??
其中:
- I u v I_{uv} Iuv?是用戶 u u u和用戶 v v v共同評分的物品集合
- r u i r_{ui} rui?是用戶 u u u對物品 i i i的評分
- r v i r_{vi} rvi?是用戶 v v v對物品 i i i的評分
- I u I_u Iu?是用戶 u u u評分的所有物品集合
- I v I_v Iv?是用戶 v v v評分的所有物品集合
在Python中,可以使用sklearn.metrics.pairwise
中的cosine_similarity
或1 - pairwise_distances(X, metric='cosine')
計算余弦相似度。
1.3.2 皮爾遜相關系數(Pearson Correlation)
皮爾遜相關系數衡量兩個變量之間的線性相關性,范圍從-1到1,也稱為"皮爾遜積矩相關系數"。
計算公式為:
s i m p e a r s o n ( u , v ) = ∑ i ∈ I u v ( r u i ? r ˉ u ) ? ( r v i ? r ˉ v ) ∑ i ∈ I u v ( r u i ? r ˉ u ) 2 ? ∑ i ∈ I u v ( r v i ? r ˉ v ) 2 sim_{pearson}(u, v) = \frac{\sum_{i \in I_{uv}} (r_{ui} - \bar{r}_u) \cdot (r_{vi} - \bar{r}_v)}{\sqrt{\sum_{i \in I_{uv}} (r_{ui} - \bar{r}_u)^2} \cdot \sqrt{\sum_{i \in I_{uv}} (r_{vi} - \bar{r}_v)^2}} simpearson?(u,v)=∑i∈Iuv??(rui??rˉu?)2??∑i∈Iuv??(rvi??rˉv?)2?∑i∈Iuv??(rui??rˉu?)?(rvi??rˉv?)?
其中:
- r ˉ u \bar{r}_u rˉu?是用戶 u u u的平均評分
- r ˉ v \bar{r}_v rˉv?是用戶 v v v的平均評分
皮爾遜相關系數考慮了用戶評分的偏置(bias),能更好地處理用戶評分標準不同的情況(有些用戶傾向于給高分,有些用戶傾向于給低分)。
在Python中,可以使用numpy.corrcoef()
計算皮爾遜相關系數。
1.3.3 歐幾里得距離(Euclidean Distance)
歐幾里得距離直接計算兩個向量在空間中的距離,距離越小表示越相似。
計算公式為:
d i s t a n c e e u c l i d e a n ( u , v ) = ∑ i ∈ I u v ( r u i ? r v i ) 2 distance_{euclidean}(u, v) = \sqrt{\sum_{i \in I_{uv}} (r_{ui} - r_{vi})^2} distanceeuclidean?(u,v)=∑i∈Iuv??(rui??rvi?)2?
為了將距離轉換為相似度,通常使用以下變換:
s i m e u c l i d e a n ( u , v ) = 1 1 + d i s t a n c e e u c l i d e a n ( u , v ) sim_{euclidean}(u, v) = \frac{1}{1 + distance_{euclidean}(u, v)} simeuclidean?(u,v)=1+distanceeuclidean?(u,v)1?
在Python中,可以使用sklearn.metrics.pairwise
中的euclidean_distances
計算歐幾里得距離。
1.3.4 調整余弦相似度(Adjusted Cosine Similarity)
調整余弦相似度在計算前先減去用戶的平均評分,解決了用戶評分標準不一致的問題:
s i m a d j _ c o s ( u , v ) = ∑ i ∈ I u v ( r u i ? r ˉ u ) ? ( r v i ? r ˉ v ) ∑ i ∈ I u v ( r u i ? r ˉ u ) 2 ? ∑ i ∈ I u v ( r v i ? r ˉ v ) 2 sim_{adj\_cos}(u, v) = \frac{\sum_{i \in I_{uv}} (r_{ui} - \bar{r}_u) \cdot (r_{vi} - \bar{r}_v)}{\sqrt{\sum_{i \in I_{uv}} (r_{ui} - \bar{r}_u)^2} \cdot \sqrt{\sum_{i \in I_{uv}} (r_{vi} - \bar{r}_v)^2}} simadj_cos?(u,v)=∑i∈Iuv??(rui??rˉu?)2??∑i∈Iuv??(rvi??rˉv?)2?∑i∈Iuv??(rui??rˉu?)?(rvi??rˉv?)?
1.4 評分預測方法
在確定用戶相似度后,需要預測目標用戶對未評分物品的可能評分。傳統的基于用戶的協同過濾通常采用加權平均的方式進行預測。
1.4.1 簡單加權平均
r ^ u i = ∑ v ∈ N u ( i ) s i m ( u , v ) ? r v i ∑ v ∈ N u ( i ) ∣ s i m ( u , v ) ∣ \hat{r}_{ui} = \frac{\sum_{v \in N_u(i)} sim(u, v) \cdot r_{vi}}{\sum_{v \in N_u(i)} |sim(u, v)|} r^ui?=∑v∈Nu?(i)?∣sim(u,v)∣∑v∈Nu?(i)?sim(u,v)?rvi??
其中:
- r ^ u i \hat{r}_{ui} r^ui?是預測的用戶 u u u對物品 i i i的評分
- N u ( i ) N_u(i) Nu?(i)是與用戶 u u u相似且評價過物品 i i i的用戶集合
- s i m ( u , v ) sim(u, v) sim(u,v)是用戶 u u u與用戶 v v v的相似度
- r v i r_{vi} rvi?是用戶 v v v對物品 i i i的實際評分
1.4.2 考慮用戶評分偏置的加權平均
為了解決不同用戶評分標準不同的問題,可以使用考慮評分偏置的改進公式:
r ^ u i = r ˉ u + ∑ v ∈ N u ( i ) s i m ( u , v ) ? ( r v i ? r ˉ v ) ∑ v ∈ N u ( i ) ∣ s i m ( u , v ) ∣ \hat{r}_{ui} = \bar{r}_u + \frac{\sum_{v \in N_u(i)} sim(u, v) \cdot (r_{vi} - \bar{r}_v)}{\sum_{v \in N_u(i)} |sim(u, v)|} r^ui?=rˉu?+∑v∈Nu?(i)?∣sim(u,v)∣∑v∈Nu?(i)?sim(u,v)?(rvi??rˉv?)?
其中:
- r ˉ u \bar{r}_u rˉu?是用戶 u u u的平均評分
- r ˉ v \bar{r}_v rˉv?是用戶 v v v的平均評分
這種方法不直接使用原始評分,而是使用評分與用戶平均評分的偏差,能夠更好地處理用戶評分偏好不同的情況。
1.5 評估指標
推薦系統的性能評估通常使用以下指標:
1.5.1 均方根誤差(RMSE)
RMSE是預測評分與實際評分之間差異的平方平均的平方根,值越小表示預測越準確。
R M S E = 1 ∣ T ∣ ∑ ( u , i ) ∈ T ( r ^ u i ? r u i ) 2 RMSE = \sqrt{\frac{1}{|T|} \sum_{(u,i) \in T} (\hat{r}_{ui} - r_{ui})^2} RMSE=∣T∣1?∑(u,i)∈T?(r^ui??rui?)2?
其中:
- T T T是測試集中的用戶-物品對集合
- r ^ u i \hat{r}_{ui} r^ui?是預測的評分
- r u i r_{ui} rui?是實際評分
1.5.2 平均絕對誤差(MAE)
MAE是預測評分與實際評分之間絕對差值的平均,同樣值越小表示預測越準確。
M A E = 1 ∣ T ∣ ∑ ( u , i ) ∈ T ∣ r ^ u i ? r u i ∣ MAE = \frac{1}{|T|} \sum_{(u,i) \in T} |\hat{r}_{ui} - r_{ui}| MAE=∣T∣1?∑(u,i)∈T?∣r^ui??rui?∣
與RMSE相比,MAE對異常值的敏感度較低,因此兩個指標通常一起使用,以全面評估推薦系統的性能。
1.6 TopN推薦
在實際應用中,我們通常不僅關注評分預測的準確度,還關注能否為用戶推薦最適合的N個物品,稱為TopN推薦。為此,可以使用額外的評估指標:
1.6.1 精確率(Precision)和召回率(Recall)
-
精確率:推薦的物品中實際相關的比例
P r e c i s i o n @ k = ∣ 推薦列表 ∩ 相關物品 ∣ ∣ 推薦列表 ∣ Precision@k = \frac{|推薦列表 \cap 相關物品|}{|推薦列表|} Precision@k=∣推薦列表∣∣推薦列表∩相關物品∣?
-
召回率:實際相關物品中被成功推薦的比例
R e c a l l @ k = ∣ 推薦列表 ∩ 相關物品 ∣ ∣ 相關物品 ∣ Recall@k = \frac{|推薦列表 \cap 相關物品|}{|相關物品|} Recall@k=∣相關物品∣∣推薦列表∩相關物品∣?
1.6.2 F1分數
F1分數是精確率和召回率的調和平均數,綜合考慮兩個指標。
F 1 = 2 ? P r e c i s i o n ? R e c a l l P r e c i s i o n + R e c a l l F1 = 2 \cdot \frac{Precision \cdot Recall}{Precision + Recall} F1=2?Precision+RecallPrecision?Recall?
2. 項目介紹
協同過濾是推薦系統的核心算法之一,它基于用戶行為數據來推薦物品。基于用戶的協同過濾(User-Based Collaborative Filtering)算法通過尋找與目標用戶相似的用戶群體,然后推薦這些相似用戶喜歡但目標用戶尚未接觸的物品。gitcode
本項目基于以下經典論文:
- Resnick P, Iacovou N, Suchak M, et al. Grouplens: An open architecture for collaborative filtering of netnews[C]//Proceedings of the 1994 ACM conference on Computer supported cooperative work. 1994: 175-186. https://doi.org/10.1145/192844.192905
- Breese J S, Heckerman D, Kadie C. Empirical analysis of predictive algorithms for collaborative filtering[J]. arXiv preprint arXiv:1301.7363, 2013. https://doi.org/10.48550/arXiv.1301.7363
2.1 數據集介紹
本項目使用經典的MovieLens 100K數據集,該數據集包含:
- 943個用戶
- 1682部電影
- 10萬條評分數據(1-5分)
- 用戶的人口統計學特征
- 電影的類型信息
2.2 項目架構設計
為了構建一個可維護、可擴展的推薦系統,本項目采用模塊化設計,將核心功能按照職責分散到不同模塊中。
2.2.1 模塊化架構
項目由以下核心模塊組成:
- 數據加載模塊(data_loader.py):負責數據讀取、預處理和矩陣構建
- 相似度計算模塊(similarity.py):實現多種相似度計算方法
- 評分預測模塊(prediction.py):實現評分預測算法
- 推薦生成模塊(recommendation.py):生成個性化推薦列表
- 評估模塊(evaluation.py):實現多種評估指標計算
- 日志模塊(logger.py):管理系統日志記錄
- 主模型模塊(model.py):集成各個組件,提供統一接口
- 命令行接口(main.py):提供命令行交互功能
- Web應用(app.py):提供Web界面
3. 項目實施步驟
3.1 數據獲取與探索
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import mean_squared_error
from sklearn.metrics.pairwise import pairwise_distances
from scipy.spatial.distance import cosine
import warnings
warnings.filterwarnings('ignore')# 設置隨機種子保證結果可重復
np.random.seed(42)# 加載數據集
# 從 https://grouplens.org/datasets/movielens/100k/ 下載
# 或使用以下代碼自動下載
!wget -nc http://files.grouplens.org/datasets/movielens/ml-100k.zip
!unzip -n ml-100k.zip# 加載用戶評分數據
column_names = ['user_id', 'item_id', 'rating', 'timestamp']
df = pd.read_csv('ml-100k/u.data', sep='\t', names=column_names)# 加載電影信息
movies_column_names = ['movie_id', 'movie_title', 'release_date', 'video_release_date','IMDb_URL', 'unknown', 'Action', 'Adventure', 'Animation','Children', 'Comedy', 'Crime', 'Documentary', 'Drama', 'Fantasy','Film-Noir', 'Horror', 'Musical', 'Mystery', 'Romance', 'Sci-Fi','Thriller', 'War', 'Western']
movies_df = pd.read_csv('ml-100k/u.item', sep='|', names=movies_column_names, encoding='latin-1')# 查看數據集基本信息
print(f"評分數據集形狀: {df.shape}")
print(f"電影數據集形狀: {movies_df.shape}")
print("\n評分數據預覽:")
print(df.head())
print("\n電影數據預覽:")
print(movies_df[['movie_id', 'movie_title', 'release_date']].head())
3.2 數據加載模塊實現
我們將數據加載和處理功能封裝在DataLoader
類中,提供以下功能:
- 從文件加載數據
- 構建用戶-物品評分矩陣
- 劃分訓練集和測試集
- 計算數據集統計信息
class DataLoader:"""負責加載和預處理MovieLens數據集的類"""def __init__(self, data_path="data/ml-100k"):"""初始化DataLoader并設置數據路徑"""self.data_path = data_pathself.ratings_df = Noneself.movies_df = Noneself.n_users = Noneself.n_items = Noneself.ratings_matrix = Noneself.train_data = Noneself.test_data = Nonedef load_data(self):"""加載評分數據和電影數據"""# 檢查數據路徑if not os.path.exists(self.data_path):raise FileNotFoundError(f"數據目錄 {self.data_path} 不存在")# 加載評分數據ratings_file = os.path.join(self.data_path, "u.data")if not os.path.exists(ratings_file):raise FileNotFoundError(f"評分文件 {ratings_file} 不存在")column_names = ["user_id", "item_id", "rating", "timestamp"]self.ratings_df = pd.read_csv(ratings_file, sep="\t", names=column_names)# 加載電影數據movies_file = os.path.join(self.data_path, "u.item")if not os.path.exists(movies_file):raise FileNotFoundError(f"電影文件 {movies_file} 不存在")movies_column_names = ["movie_id", "movie_title", "release_date", "video_release_date","IMDb_URL", "unknown", "Action", "Adventure", "Animation","Children", "Comedy", "Crime", "Documentary", "Drama", "Fantasy","Film-Noir", "Horror", "Musical", "Mystery", "Romance", "Sci-Fi","Thriller", "War", "Western"]self.movies_df = pd.read_csv(movies_file, sep="|", names=movies_column_names, encoding="latin-1")# 獲取用戶和電影數量self.n_users = self.ratings_df["user_id"].max()self.n_items = self.ratings_df["item_id"].max()return self.ratings_df, self.movies_df
3.3 構建評分矩陣和數據集拆分
def create_matrix(self):"""創建用戶-物品評分矩陣"""if self.ratings_df is None:self.load_data()# 創建矩陣self.ratings_matrix = np.zeros((self.n_users, self.n_items))# 填充評分for row in self.ratings_df.itertuples():# 調整為0-based索引self.ratings_matrix[row.user_id-1, row.item_id-1] = row.ratingreturn self.ratings_matrixdef split_data(self, test_size=0.2, random_state=42):"""將數據劃分為訓練集和測試集"""if self.ratings_df is None:self.load_data()from sklearn.model_selection import train_test_split# 劃分數據self.train_data, self.test_data = train_test_split(self.ratings_df, test_size=test_size, random_state=random_state)# 創建訓練集矩陣self.train_matrix = np.zeros((self.n_users, self.n_items))for row in self.train_data.itertuples():self.train_matrix[row.user_id-1, row.item_id-1] = row.rating# 創建測試集矩陣self.test_matrix = np.zeros((self.n_users, self.n_items))for row in self.test_data.itertuples():self.test_matrix[row.user_id-1, row.item_id-1] = row.ratingreturn self.train_data, self.test_data, self.train_matrix, self.test_matrix
3.4 日志系統設計
為了跟蹤系統運行狀態、性能和錯誤,我們實現了一個靈活的日志模塊:
import os
import logging
from logging.handlers import RotatingFileHandler
import timedef setup_logger(log_dir="results/logs", log_level=logging.INFO, silent=False):"""配置并返回推薦系統的日志記錄器參數:log_dir: 日志文件存儲目錄log_level: 日志級別silent: 是否靜默模式(不記錄日志)返回:logger: 配置好的日志記錄器"""# 創建日志記錄器logger = logging.getLogger("recommendation_system")logger.setLevel(log_level)# 清除已有的處理器(避免重復日志)if logger.handlers:logger.handlers = []if silent:# 添加NullHandler以防止"找不到處理器"警告logger.addHandler(logging.NullHandler())else:# 創建日志目錄os.makedirs(log_dir, exist_ok=True)# 使用時間戳創建唯一的日志文件名timestamp = time.strftime("%Y%m%d-%H%M%S")log_file = os.path.join(log_dir, f"recommender_{timestamp}.log")# 創建文件處理器(每文件最大10MB,保留5個備份)file_handler = RotatingFileHandler(log_file, maxBytes=10*1024*1024, backupCount=5, encoding="utf-8")file_handler.setLevel(log_level)# 創建控制臺處理器console_handler = logging.StreamHandler()console_handler.setLevel(log_level)# 創建格式化器并添加到處理器formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")file_handler.setFormatter(formatter)console_handler.setFormatter(formatter)# 將處理器添加到記錄器logger.addHandler(file_handler)logger.addHandler(console_handler)return loggerdef get_logger():"""獲取推薦系統日志記錄器"""logger = logging.getLogger("recommendation_system")# 如果記錄器沒有處理器,設置一個默認的靜默記錄器if not logger.handlers:return setup_logger(silent=True)return logger
3.5 相似度計算模塊
def calculate_similarity(ratings_matrix, method="cosine"):"""計算用戶相似度矩陣參數:ratings_matrix: 用戶-物品評分矩陣method: 相似度計算方法('cosine', 'pearson', 'euclidean', 'adjusted_cosine')返回:similarity: 用戶相似度矩陣"""if method == "cosine":return cosine_similarity(ratings_matrix)elif method == "pearson":return pearson_similarity(ratings_matrix)elif method == "euclidean":return euclidean_similarity(ratings_matrix)elif method == "adjusted_cosine":return adjusted_cosine_similarity(ratings_matrix)else:raise ValueError(f"未知的相似度計算方法: {method}")
每種相似度計算方法的具體實現:
def cosine_similarity(ratings_matrix):"""計算余弦相似度"""similarity = 1 - pairwise_distances(ratings_matrix, metric="cosine", n_jobs=-1)similarity = np.nan_to_num(similarity) # 處理NaN值return similaritydef pearson_similarity(ratings_matrix):"""計算皮爾遜相關系數"""similarity = np.corrcoef(ratings_matrix)similarity = np.nan_to_num(similarity) # 處理NaN值return similaritydef euclidean_similarity(ratings_matrix):"""計算基于歐幾里得距離的相似度"""distances = pairwise_distances(ratings_matrix, metric="euclidean", n_jobs=-1)similarity = 1 / (1 + distances) # 轉換距離為相似度return similaritydef adjusted_cosine_similarity(ratings_matrix):"""計算調整后的余弦相似度"""# 獲取非零元素以計算平均值rated_mask = ratings_matrix != 0# 計算用戶平均評分(僅考慮已評分項目)user_means = np.sum(ratings_matrix, axis=1) / np.sum(rated_mask, axis=1)# 通過減去用戶平均值歸一化評分(僅對已評分項目)normalized_matrix = np.zeros_like(ratings_matrix)for i, (user_ratings, mask, mean) in enumerate(zip(ratings_matrix, rated_mask, user_means)):normalized_matrix[i, mask] = user_ratings[mask] - mean# 計算余弦相似度similarity = 1 - pairwise_distances(normalized_matrix, metric="cosine", n_jobs=-1)# 處理NaN值similarity = np.nan_to_num(similarity)return similarity
3.6 評分預測模塊
def predict_ratings(ratings_matrix, similarity_matrix, method="bias_weighted", k=10):"""預測用戶對物品的評分參數:ratings_matrix: 用戶-物品評分矩陣similarity_matrix: 用戶相似度矩陣method: 預測方法('simple_weighted', 'bias_weighted')k: 考慮的近鄰數量返回:predicted_ratings: 預測評分矩陣"""if method == "simple_weighted":return simple_weighted_average(ratings_matrix, similarity_matrix, k)elif method == "bias_weighted":return bias_weighted_average(ratings_matrix, similarity_matrix, k)else:raise ValueError(f"未知的預測方法: {method}")def bias_weighted_average(ratings_matrix, similarity_matrix, k=10):"""使用考慮用戶偏置的加權平均預測評分"""n_users, n_items = ratings_matrix.shapepredicted_ratings = np.zeros((n_users, n_items))# 計算用戶平均評分user_rated_mask = ratings_matrix > 0user_ratings_count = np.sum(user_rated_mask, axis=1)user_ratings_sum = np.sum(ratings_matrix, axis=1)# 處理除零問題user_mean_ratings = np.where(user_ratings_count > 0, user_ratings_sum / user_ratings_count, 0)# 為每個用戶預測評分for u in range(n_users):# 找到k個最相似用戶(不包括自己)user_similarities = similarity_matrix[u]user_similarities[u] = -1 # 排除自己similar_users = np.argsort(user_similarities)[::-1][:k]# 用戶的平均評分u_mean = user_mean_ratings[u]# 為每個物品預測評分for i in range(n_items):# 如果用戶已經評分,保留原評分if ratings_matrix[u, i] > 0:predicted_ratings[u, i] = ratings_matrix[u, i]continue# 獲取評價過該物品的相似用戶sim_users_rated = [v for v in similar_users if ratings_matrix[v, i] > 0]# 如果沒有相似用戶評價過,使用用戶平均評分if len(sim_users_rated) == 0:predicted_ratings[u, i] = u_mean if u_mean > 0 else np.mean(ratings_matrix[ratings_matrix > 0])continue# 計算考慮偏置的加權平均sim_sum = sum(abs(similarity_matrix[u, v]) for v in sim_users_rated)if sim_sum == 0:predicted_ratings[u, i] = u_meancontinueweighted_sum = sum(similarity_matrix[u, v] * (ratings_matrix[v, i] - user_mean_ratings[v])for v in sim_users_rated)predicted_rating = u_mean + weighted_sum / sim_sum# 將預測評分限制在有效范圍[1,5]predicted_ratings[u, i] = max(1, min(5, predicted_rating))return predicted_ratings
3.7 推薦生成模塊
def recommend_items(user_id, ratings_matrix, predicted_ratings, movies_df, top_n=10):"""為特定用戶生成電影推薦參數:user_id: 用戶ID(從1開始)ratings_matrix: 用戶-物品評分矩陣predicted_ratings: 預測評分矩陣movies_df: 電影信息DataFrametop_n: 推薦數量返回:recommendations: 包含推薦電影的DataFrame"""# 調整為0-based索引user_idx = user_id - 1# 獲取用戶評分user_ratings = ratings_matrix[user_idx]user_predictions = predicted_ratings[user_idx]# 找出用戶未評分的物品unrated_items = np.where(user_ratings == 0)[0]# 如果用戶已評分所有物品,返回空DataFrameif len(unrated_items) == 0:return pd.DataFrame(columns=["movie_id", "movie_title", "predicted_rating"])# 獲取未評分物品的預測評分unrated_predictions = user_predictions[unrated_items]# 按預測評分降序排序sorted_indices = np.argsort(-unrated_predictions)# 獲取top_n推薦top_item_indices = sorted_indices[:top_n]top_items = unrated_items[top_item_indices]top_ratings = unrated_predictions[top_item_indices]# 轉換為1-based電影IDmovie_ids = top_items + 1# 創建推薦DataFramerecommendations = pd.DataFrame({"movie_id": movie_ids,"predicted_rating": top_ratings})# 合并電影信息recommendations = recommendations.merge(movies_df[["movie_id", "movie_title"]], on="movie_id")# 按預測評分降序排序recommendations = recommendations.sort_values("predicted_rating", ascending=False)return recommendations
3.8 評估模塊
def evaluate_recommendations(test_data, predicted_ratings, ratings_matrix=None, k_values=[5, 10], threshold=3.5):"""對推薦系統性能進行全面評估參數:test_data: 測試集DataFramepredicted_ratings: 預測評分矩陣ratings_matrix: 原始評分矩陣k_values: 評估的k值列表threshold: 判定物品相關性的評分閾值返回:results: 包含各項評估指標的字典"""# 評估評分預測rating_metrics = evaluate_rating_predictions(test_data, predicted_ratings)# 初始化結果字典results = rating_metrics.copy()# 評估top-k推薦for k in k_values:pr_metrics = calculate_precision_recall_at_k(test_data, predicted_ratings, ratings_matrix=ratings_matrix, k=k, threshold=threshold)precision_key = f"precision@{k}"recall_key = f"recall@{k}"results[precision_key] = pr_metrics[precision_key]results[recall_key] = pr_metrics[recall_key]# 計算F1分數precision = pr_metrics[precision_key]recall = pr_metrics[recall_key]f1 = calculate_f1_score(precision, recall)results[f"f1@{k}"] = f1return results
3.9 完整推薦系統模型
class UserBasedCF:"""基于用戶的協同過濾推薦系統這個類實現了一個完整的推薦系統,使用基于用戶的協同過濾算法。它通過識別具有相似評分模式的用戶,并推薦這些相似用戶喜歡但目標用戶尚未體驗的物品。"""def __init__(self, similarity_method="cosine", prediction_method="bias_weighted", k=30):"""初始化推薦系統模型參數:similarity_method: 計算用戶相似度的方法prediction_method: 預測評分的方法k: 考慮的相似用戶數量"""self.logger = get_logger()self.logger.info(f"初始化UserBasedCF模型,相似度方法={similarity_method},"f"預測方法={prediction_method},k={k}")self.similarity_method = similarity_methodself.prediction_method = prediction_methodself.k = k# 初始化數據屬性self.ratings_df = Noneself.movies_df = Noneself.train_data = Noneself.test_data = Noneself.ratings_matrix = Noneself.user_similarity = Noneself.predicted_ratings = Noneself.n_users = Noneself.n_items = None# 跟蹤模型是否已訓練self.is_trained = Falsedef fit(self, ratings_df, movies_df=None, test_size=0.2, random_state=42):"""訓練推薦模型參數:ratings_df: 包含評分數據的DataFramemovies_df: 包含電影信息的DataFrametest_size: 用于測試的數據比例random_state: 隨機數種子返回:self: 訓練后的模型實例"""from sklearn.model_selection import train_test_splitself.logger.info("開始模型訓練")train_start = time.time()# 存儲數據self.ratings_df = ratings_dfself.movies_df = movies_df# 獲取維度self.n_users = ratings_df["user_id"].max()self.n_items = ratings_df["item_id"].max()self.logger.info(f"使用{self.n_users}個用戶和{self.n_items}個物品訓練模型")# 如果test_size>0,劃分數據if test_size > 0:self.logger.info(f"劃分數據,test_size={test_size},random_state={random_state}")split_start = time.time()self.train_data, self.test_data = train_test_split(ratings_df, test_size=test_size, random_state=random_state)split_time = time.time() - split_startself.logger.info(f"數據劃分耗時{split_time:.2f}秒: {len(self.train_data)}訓練樣本,{len(self.test_data)}測試樣本")else:self.train_data = ratings_dfself.test_data = Noneself.logger.info("使用所有數據進行訓練(無測試集)")# 創建評分矩陣self.logger.info("從訓練數據創建評分矩陣")matrix_start = time.time()self.ratings_matrix = np.zeros((self.n_users, self.n_items))for row in self.train_data.itertuples():# 調整為0-based索引user_idx = row.user_id - 1item_idx = row.item_id - 1self.ratings_matrix[user_idx, item_idx] = row.ratingmatrix_time = time.time() - matrix_startself.logger.info(f"評分矩陣創建耗時{matrix_time:.2f}秒")# 計算矩陣密度n_ratings = np.sum(self.ratings_matrix > 0)density = n_ratings / (self.n_users * self.n_items)self.logger.info(f"評分矩陣密度: {density:.6f} ({n_ratings}條評分)")# 計算用戶相似度self.logger.info(f"使用{self.similarity_method}方法計算用戶相似度")sim_start = time.time()self.user_similarity = calculate_similarity(self.ratings_matrix, method=self.similarity_method)sim_time = time.time() - sim_startself.logger.info(f"用戶相似度計算耗時{sim_time:.2f}秒")# 預測評分self.logger.info(f"使用{self.prediction_method}方法預測評分,k={self.k}")pred_start = time.time()self.predicted_ratings = predict_ratings(self.ratings_matrix,self.user_similarity,method=self.prediction_method,k=self.k)pred_time = time.time() - pred_startself.logger.info(f"評分預測耗時{pred_time:.2f}秒")# 標記模型為已訓練self.is_trained = Truetotal_time = time.time() - train_startself.logger.info(f"模型訓練完成,總耗時{total_time:.2f}秒")return self
3.10 命令行接口設計
為了方便用戶使用推薦系統,我們設計了一個功能豐富的命令行接口:
def parse_args():"""解析命令行參數"""parser = argparse.ArgumentParser(description="基于用戶的協同過濾推薦系統")# 數據參數parser.add_argument("--data_path",type=str,default="data/ml-100k",help="MovieLens數據集目錄路徑")# 模型參數parser.add_argument("--similarity",type=str,default="cosine",choices=["cosine", "pearson", "euclidean", "adjusted_cosine"],help="使用的相似度度量")parser.add_argument("--prediction",type=str,default="bias_weighted",choices=["simple_weighted", "bias_weighted"],help="使用的預測方法")parser.add_argument("--k",type=int,default=30,help="考慮的相似用戶數量")# 輸出目錄參數parser.add_argument("--output_dir",type=str,default="results",help="保存可視化輸出的目錄")parser.add_argument("--log_dir",type=str,default="results/logs",help="保存日志文件的目錄")parser.add_argument("--log_level",type=str,default="INFO",choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],help="日志級別")# 推薦參數parser.add_argument("--user_id",type=int,required=False,help="為其生成推薦的用戶ID")parser.add_argument("--num_recommendations",type=int,default=10,help="生成的推薦數量")# 評估參數parser.add_argument("--test_size",type=float,default=0.2,help="用于測試的數據比例")parser.add_argument("--evaluate",action="store_true",help="評估模型")# 可視化參數parser.add_argument("--visualize",action="store_true",help="可視化用戶相似度和推薦")# 模型保存/加載parser.add_argument("--save_model",type=str,default=None,help="保存訓練模型的路徑")parser.add_argument("--load_model",type=str,default=None,help="加載預訓練模型的路徑")# 參數調優parser.add_argument("--tune",action="store_true",help="執行參數調優")return parser.parse_args()
使用示例:
# 生成推薦
python main.py --user_id 123 --num_recommendations 10 --similarity cosine --k 30# 評估模型
python main.py --evaluate --test_size 0.2# 參數調優
python main.py --tune --output_dir results# 保存和加載模型
python main.py --similarity pearson --k 40 --save_model models/my_model.pkl
python main.py --load_model models/my_model.pkl --user_id 123
3.11 模型序列化與反序列化
對于訓練好的模型,我們提供了保存和加載功能,以便在不同環境中復用:
def save_model(self, filepath):"""將訓練好的模型保存到文件參數:filepath: 保存模型的路徑"""if not self.is_trained:error_msg = "模型未訓練。保存前請先調用fit()"self.logger.error(error_msg)raise ValueError(error_msg)self.logger.info(f"保存模型到{filepath}")# 如果目錄不存在則創建os.makedirs(os.path.dirname(filepath), exist_ok=True)import picklemodel_data = {"similarity_method": self.similarity_method,"prediction_method": self.prediction_method,"k": self.k,"ratings_matrix": self.ratings_matrix,"user_similarity": self.user_similarity,"predicted_ratings": self.predicted_ratings,"n_users": self.n_users,"n_items": self.n_items,"is_trained": self.is_trained}save_start = time.time()with open(filepath, "wb") as f:pickle.dump(model_data, f)save_time = time.time() - save_start# 計算文件大小file_size = os.path.getsize(filepath) / (1024 * 1024) # MBself.logger.info(f"模型已保存到{filepath} ({file_size:.2f} MB),耗時{save_time:.2f}秒")@classmethod
def load_model(cls, filepath, movies_df=None):"""從文件加載訓練好的模型參數:filepath: 模型文件路徑movies_df: 電影信息DataFrame返回:model: 加載的模型實例"""logger = get_logger()logger.info(f"從{filepath}加載模型")if not os.path.exists(filepath):error_msg = f"模型文件{filepath}不存在"logger.error(error_msg)raise FileNotFoundError(error_msg)import pickleload_start = time.time()with open(filepath, "rb") as f:model_data = pickle.load(f)load_time = time.time() - load_start# 創建新實例model = cls(similarity_method=model_data["similarity_method"],prediction_method=model_data["prediction_method"],k=model_data["k"])# 恢復模型屬性model.ratings_matrix = model_data["ratings_matrix"]model.user_similarity = model_data["user_similarity"]model.predicted_ratings = model_data["predicted_ratings"]model.n_users = model_data["n_users"]model.n_items = model_data["n_items"]model.is_trained = model_data["is_trained"]model.movies_df = movies_df# 計算文件大小file_size = os.path.getsize(filepath) / (1024 * 1024) # MBlogger.info(f"從{filepath}加載模型 ({file_size:.2f} MB),耗時{load_time:.2f}秒")logger.info(f"模型維度: {model.n_users}用戶, {model.n_items}物品")return model
3.12 Web應用開發
為了提供友好的用戶界面,我們使用Flask開發了一個Web應用:
from flask import Flask, render_template, request, redirect, url_for, flash, jsonify, send_fileapp = Flask(__name__)
app.secret_key = os.urandom(24)# 全局變量
data_loader = None
model = None
MODEL_PATH = "../models/user_based_cf.pkl"
DATA_PATH = "../data/ml-100k"
OUTPUT_DIR = "../results"# 創建目錄
os.makedirs(os.path.dirname(MODEL_PATH), exist_ok=True)
os.makedirs(os.path.join(OUTPUT_DIR, "images"), exist_ok=True)def initialize_data_and_model():"""初始化數據加載器和模型"""global data_loader, modellogger.info("初始化數據和模型...")# 初始化數據加載器并加載數據if data_loader is None:try:data_loader = DataLoader(data_path=DATA_PATH)ratings_df, movies_df = data_loader.load_data()logger.info(f"數據加載完成: {len(ratings_df)}條評分, {len(movies_df)}部電影")except Exception as e:logger.error(f"加載數據出錯: {str(e)}")logger.error(traceback.format_exc())return False# 初始化模型(如果未加載)if model is None:try:# 檢查是否存在預訓練模型if os.path.exists(MODEL_PATH):logger.info(f"從{MODEL_PATH}加載預訓練模型")model = UserBasedCF.load_model(MODEL_PATH, movies_df=data_loader.movies_df)else:logger.info("使用默認參數訓練新模型")model = UserBasedCF(similarity_method="cosine", prediction_method="bias_weighted", k=30)model.fit(data_loader.ratings_df, data_loader.movies_df, test_size=0)# 保存模型以供將來使用model.save_model(MODEL_PATH)logger.info(f"模型訓練完成并保存到{MODEL_PATH}")except Exception as e:logger.error(f"初始化模型出錯: {str(e)}")logger.error(traceback.format_exc())return Falsereturn True@app.route("/")
def index():"""渲染首頁"""# 初始化數據和模型if not initialize_data_and_model():flash("初始化數據和模型出錯,請檢查日志", "danger")# 如果模型已初始化,獲取模型參數model_params = {}if model and model.is_trained:model_params = {"similarity_method": model.similarity_method,"prediction_method": model.prediction_method,"k": model.k,"n_users": model.n_users,"n_items": model.n_items}# 獲取可用的相似度和預測方法similarity_methods = ["cosine", "pearson", "euclidean", "adjusted_cosine"]prediction_methods = ["simple_weighted", "bias_weighted"]# 獲取數據集統計信息dataset_stats = {}if data_loader:dataset_stats = data_loader.get_dataset_stats()return render_template("index.html",model_params=model_params,similarity_methods=similarity_methods,prediction_methods=prediction_methods,dataset_stats=dataset_stats)@app.route("/recommend", methods=["GET", "POST"])
def recommend():"""基于表單輸入或URL參數生成推薦"""# 初始化數據和模型if not initialize_data_and_model():flash("初始化數據和模型出錯,請檢查日志", "danger")return redirect(url_for("index"))# 提取當前模型參數作為默認值current_similarity = model.similarity_method or "cosine"current_prediction = model.prediction_method or "bias_weighted"current_k = model.k or 30# 根據請求方法提取參數if request.method == "POST":# 獲取表單數據user_id = int(request.form.get("user_id", 1))num_recommendations = int(request.form.get("num_recommendations", 10))similarity_method = request.form.get("similarity_method", current_similarity)prediction_method = request.form.get("prediction_method", current_prediction)k = int(request.form.get("k", current_k))else: # GET請求# 獲取URL參數user_id = int(float(request.args.get("user_id", "1")))# 驗證user_id在有效范圍內if user_id < 1 or user_id > model.n_users:flash(f"無效的用戶ID: {user_id}。必須在1到{model.n_users}之間", "danger")return redirect(url_for("index"))num_recommendations = int(request.args.get("num_recommendations", 10))similarity_method = request.args.get("similarity_method", current_similarity)prediction_method = request.args.get("prediction_method", current_prediction)k = int(request.args.get("k", current_k))# 確保參數不為Nonesimilarity_method = similarity_method or "cosine"prediction_method = prediction_method or "bias_weighted"k = k or 30logger.info(f"推薦參數: user_id={user_id}, similarity={similarity_method}, prediction={prediction_method}, k={k}")# 檢查是否需要重新訓練模型retrain = model.similarity_method != similarity_method or model.prediction_method != prediction_method or model.k != k# 如需要,重新訓練if retrain:try:logger.info(f"使用參數重新訓練模型: similarity={similarity_method}, prediction={prediction_method}, k={k}")model = UserBasedCF(similarity_method=similarity_method,prediction_method=prediction_method,k=k)model.fit(data_loader.ratings_df, data_loader.movies_df, test_size=0)# 保存重新訓練的模型model.save_model(MODEL_PATH)logger.info("模型重新訓練完成并保存")except Exception as e:logger.error(f"重新訓練模型出錯: {str(e)}")logger.error(traceback.format_exc())flash(f"重新訓練模型出錯: {str(e)}", "danger")return redirect(url_for("index"))# 生成推薦try:start_time = time.time()recommendations = model.recommend(user_id, top_n=num_recommendations)generation_time = time.time() - start_time# 可視化推薦img_path = Noneif not recommendations.empty:img_path = visualize_recommendations(user_id, recommendations)# 獲取用戶當前評分(如果有)user_ratings = Noneif data_loader:try:user_ratings = data_loader.get_user_ratings(user_id)# 按評分降序排序user_ratings = user_ratings.sort_values("rating", ascending=False)except:pass# 獲取相似用戶similar_users = Nonetry:similar_users = model.get_similar_users(user_id, top_n=5)except:passreturn render_template("recommendations.html",user_id=user_id,recommendations=recommendations,generation_time=generation_time,img_path=img_path,user_ratings=user_ratings,similar_users=similar_users,model_params={"similarity_method": model.similarity_method,"prediction_method": model.prediction_method,"k": model.k})except Exception as e:logger.error(f"生成推薦出錯: {str(e)}")logger.error(traceback.format_exc())flash(f"生成推薦出錯: {str(e)}", "danger")return redirect(url_for("index"))
3.13 性能優化與錯誤處理
為了提高系統性能并增強魯棒性,我們實現了以下優化與錯誤處理機制:
# 矩陣運算優化
# 使用向量化操作代替循環
def calculate_similarity_optimized(ratings_matrix, method="cosine"):"""優化的相似度計算函數"""# 利用多核加速計算return 1 - pairwise_distances(ratings_matrix, metric=method, n_jobs=-1)# 大規模數據處理:使用稀疏矩陣表示
def create_sparse_matrix(self):"""創建稀疏評分矩陣以節省內存"""from scipy.sparse import csr_matrixrows, cols, data = [], [], []for row in self.ratings_df.itertuples():rows.append(row.user_id - 1)cols.append(row.item_id - 1)data.append(row.rating)self.sparse_ratings_matrix = csr_matrix((data, (rows, cols)), shape=(self.n_users, self.n_items))return self.sparse_ratings_matrix# 異常處理與數據驗證
def recommend(self, user_id, top_n=10):"""帶異常處理的推薦函數"""# 驗證模型狀態if not self.is_trained:raise ValueError("模型未訓練,請先調用fit()")# 驗證用戶IDif user_id < 1 or user_id > self.n_users:raise ValueError(f"無效的用戶ID: {user_id}。必須在1到{self.n_users}之間")# 驗證參數if top_n < 1:raise ValueError(f"無效的推薦數量: {top_n}。必須大于0")# 驗證必要的數據是否可用if self.movies_df is None:raise ValueError("缺少電影信息。請在fit()中提供movies_df")# 生成推薦try:return recommend_items(user_id, self.ratings_matrix, self.predicted_ratings, self.movies_df, top_n)except Exception as e:self.logger.error(f"為用戶{user_id}生成推薦時出錯: {str(e)}")# 重新拋出異常,附加上下文信息raise RuntimeError(f"推薦生成失敗: {str(e)}")
5. 模塊化設計的優勢
采用模塊化設計使得我們的推薦系統具有以下優勢:
- 代碼可維護性:每個模塊專注于單一功能,便于理解和修改
- 可擴展性:可以輕松添加新的相似度計算方法或評分預測算法
- 可重用性:各個組件可以在其他項目中重用
- 測試便利性:可以獨立測試每個模塊的功能
- 團隊協作:不同團隊成員可以同時開發不同模塊
6. 結論與未來工作
本項目成功實現了一個基于用戶的協同過濾推薦系統,并在MovieLens 100K數據集上進行了評估。系統具有以下特點:
- 模塊化設計:清晰的代碼結構,便于維護和擴展
- 多種算法支持:實現了多種相似度計算和評分預測方法
- 完整的評估體系:使用RMSE、MAE、精確率、召回率等指標全面評估性能
- 用戶友好界面:提供命令行和Web兩種交互方式
- 日志和錯誤處理:完善的日志系統和健壯的錯誤處理機制
7. 參考資料
- Resnick, P., Iacovou, N., Suchak, M., Bergstrom, P., & Riedl, J. (1994). GroupLens: An Open Architecture for Collaborative Filtering of Netnews. Proceedings of the 1994 ACM Conference on Computer Supported Cooperative Work.
- Breese, J. S., Heckerman, D., & Kadie, C. (1998). Empirical Analysis of Predictive Algorithms for Collaborative Filtering. Proceedings of the 14th Conference on Uncertainty in Artificial Intelligence.
- Sarwar, B., Karypis, G., Konstan, J., & Riedl, J. (2001). Item-based collaborative filtering recommendation algorithms. Proceedings of the 10th International Conference on World Wide Web.
- Harper, F. M., & Konstan, J. A. (2015). The MovieLens Datasets: History and Context. ACM Transactions on Interactive Intelligent Systems (TiiS).
- Koren, Y., Bell, R., & Volinsky, C. (2009). Matrix factorization techniques for recommender systems. Computer, 42(8), 30-37.
- Ning, X., Desrosiers, C., & Karypis, G. (2015). A comprehensive survey of neighborhood-based recommendation methods. Recommender systems handbook, 37-76.