基于矩陣分解的CF算法實現(一):LFM
LFM也就是前面提到的Funk SVD矩陣分解
LFM原理解析
LFM(latent factor model) 隱語義模型核心思想是通過隱含特征聯系用戶和物品,如下圖:
- P矩陣是User-LF矩陣,即用戶和隱含特征矩陣。LF有三個,表示總共有三個隱含特征。
- Q矩陣是LF-Item矩陣,即隱含特征和物品的矩陣
- R矩陣是User-Item矩陣,有P*Q得來
- 能處理稀疏評分矩陣
利用矩陣分解技術,將原始User-Item的評分矩陣(稠密/稀疏)分解為P和Q矩陣,然后利用P?QP*QP?Q還原出User-Item評分矩陣RRR。整個過程相當于降維處理,其中:
-
矩陣值P11P_{11}P11?表示用戶1對隱含特征1的權重值
-
矩陣值Q11Q_{11}Q11?表示隱含特征1在物品1上的權重值
-
矩陣值R11R_{11}R11?就表示預測的用戶1對物品1的評分,且R11=P1,k??Qk,1?R_{11}=\vec{P_{1,k}}\cdot \vec{Q_{k,1}}R11?=P1,k???Qk,1??
利用LFM預測用戶對物品的評分,kkk表示隱含特征數量:
r^ui=puk??qik?=∑k=1kpukqik
\begin{split}
\hat {r}_{ui} &=\vec {p_{uk}}\cdot \vec {q_{ik}}
\\&={\sum_{k=1}}^k p_{uk}q_{ik}
\end{split}
r^ui??=puk???qik??=k=1∑?kpuk?qik??
因此最終,我們的目標也就是要求出P矩陣和Q矩陣及其當中的每一個值,然后再對用戶-物品的評分進行預測。
損失函數
同樣對于評分預測我們利用平方差來構建損失函數:
Cost=∑u,i∈R(rui?r^ui)2=∑u,i∈R(rui?∑k=1kpukqik)2
\begin{split}
Cost &= \sum_{u,i\in R} (r_{ui}-\hat{r}_{ui})^2
\\&=\sum_{u,i\in R} (r_{ui}-{\sum_{k=1}}^k p_{uk}q_{ik})^2
\end{split}
Cost?=u,i∈R∑?(rui??r^ui?)2=u,i∈R∑?(rui??k=1∑?kpuk?qik?)2?
加入L2正則化:
Cost=∑u,i∈R(rui?∑k=1kpukqik)2+λ(∑Upuk2+∑Iqik2)
Cost = \sum_{u,i\in R} (r_{ui}-{\sum_{k=1}}^k p_{uk}q_{ik})^2 + \lambda(\sum_U{p_{uk}}^2+\sum_I{q_{ik}}^2)
Cost=u,i∈R∑?(rui??k=1∑?kpuk?qik?)2+λ(U∑?puk?2+I∑?qik?2)
對損失函數求偏導:
??pukCost=??puk[∑u,i∈R(rui?∑k=1kpukqik)2+λ(∑Upuk2+∑Iqik2)]=2∑u,i∈R(rui?∑k=1kpukqik)(?qik)+2λpuk??qikCost=??qik[∑u,i∈R(rui?∑k=1kpukqik)2+λ(∑Upuk2+∑Iqik2)]=2∑u,i∈R(rui?∑k=1kpukqik)(?puk)+2λqik
\begin{split}
\cfrac {\partial}{\partial p_{uk}}Cost &= \cfrac {\partial}{\partial p_{uk}}[\sum_{u,i\in R} (r_{ui}-{\sum_{k=1}}^k p_{uk}q_{ik})^2 + \lambda(\sum_U{p_{uk}}^2+\sum_I{q_{ik}}^2)]
\\&=2\sum_{u,i\in R} (r_{ui}-{\sum_{k=1}}^k p_{uk}q_{ik})(-q_{ik}) + 2\lambda p_{uk}
\\\\
\cfrac {\partial}{\partial q_{ik}}Cost &= \cfrac {\partial}{\partial q_{ik}}[\sum_{u,i\in R} (r_{ui}-{\sum_{k=1}}^k p_{uk}q_{ik})^2 + \lambda(\sum_U{p_{uk}}^2+\sum_I{q_{ik}}^2)]
\\&=2\sum_{u,i\in R} (r_{ui}-{\sum_{k=1}}^k p_{uk}q_{ik})(-p_{uk}) + 2\lambda q_{ik}
\end{split}
?puk???Cost?qik???Cost?=?puk???[u,i∈R∑?(rui??k=1∑?kpuk?qik?)2+λ(U∑?puk?2+I∑?qik?2)]=2u,i∈R∑?(rui??k=1∑?kpuk?qik?)(?qik?)+2λpuk?=?qik???[u,i∈R∑?(rui??k=1∑?kpuk?qik?)2+λ(U∑?puk?2+I∑?qik?2)]=2u,i∈R∑?(rui??k=1∑?kpuk?qik?)(?puk?)+2λqik??
隨機梯度下降法優化
梯度下降更新參數pukp_{uk}puk?:
puk:=puk?α??pukCost:=puk?α[2∑u,i∈R(rui?∑k=1kpukqik)(?qik)+2λpuk]:=puk+α[∑u,i∈R(rui?∑k=1kpukqik)qik?λpuk]
\begin{split}
p_{uk}&:=p_{uk} - \alpha\cfrac {\partial}{\partial p_{uk}}Cost
\\&:=p_{uk}-\alpha [2\sum_{u,i\in R} (r_{ui}-{\sum_{k=1}}^k p_{uk}q_{ik})(-q_{ik}) + 2\lambda p_{uk}]
\\&:=p_{uk}+\alpha [\sum_{u,i\in R} (r_{ui}-{\sum_{k=1}}^k p_{uk}q_{ik})q_{ik} - \lambda p_{uk}]
\end{split}
puk??:=puk??α?puk???Cost:=puk??α[2u,i∈R∑?(rui??k=1∑?kpuk?qik?)(?qik?)+2λpuk?]:=puk?+α[u,i∈R∑?(rui??k=1∑?kpuk?qik?)qik??λpuk?]?
同理:
qik:=qik+α[∑u,i∈R(rui?∑k=1kpukqik)puk?λqik]
\begin{split}
q_{ik}&:=q_{ik} + \alpha[\sum_{u,i\in R} (r_{ui}-{\sum_{k=1}}^k p_{uk}q_{ik})p_{uk} - \lambda q_{ik}]
\end{split}
qik??:=qik?+α[u,i∈R∑?(rui??k=1∑?kpuk?qik?)puk??λqik?]?
隨機梯度下降: 向量乘法 每一個分量相乘 求和
puk:=puk+α[(rui?∑k=1kpukqik)qik?λ1puk]qik:=qik+α[(rui?∑k=1kpukqik)puk?λ2qik]
\begin{split}
&p_{uk}:=p_{uk}+\alpha [(r_{ui}-{\sum_{k=1}}^k p_{uk}q_{ik})q_{ik} - \lambda_1 p_{uk}]
\\&q_{ik}:=q_{ik} + \alpha[(r_{ui}-{\sum_{k=1}}^k p_{uk}q_{ik})p_{uk} - \lambda_2 q_{ik}]
\end{split}
?puk?:=puk?+α[(rui??k=1∑?kpuk?qik?)qik??λ1?puk?]qik?:=qik?+α[(rui??k=1∑?kpuk?qik?)puk??λ2?qik?]?
由于P矩陣和Q矩陣是兩個不同的矩陣,通常分別采取不同的正則參數,如λ1\lambda_1λ1?和λ2\lambda_2λ2?
算法實現
'''
LFM Model
'''
import pandas as pd
import numpy as np# 評分預測 1-5
class LFM(object):def __init__(self, alpha, reg_p, reg_q, number_LatentFactors=10, number_epochs=10, columns=["uid", "iid", "rating"]):self.alpha = alpha # 學習率self.reg_p = reg_p # P矩陣正則self.reg_q = reg_q # Q矩陣正則self.number_LatentFactors = number_LatentFactors # 隱式類別數量self.number_epochs = number_epochs # 最大迭代次數self.columns = columns#數據預處理和模型訓練def fit(self, dataset):'''fit dataset:param dataset: uid, iid, rating:return:'''# 轉換為 DataFrame:self.dataset = pd.DataFrame(dataset)'''① 對 dataset 進行操作。dataset 是一個 Pandas DataFrame,它包含了用戶ID(userId)、物品ID(movieId)和評分(rating).② 對數據集dataset按照self.columns[0]進行分組,分組后,每個分組代表一個用戶的數據。③ 分組后對數據進行agg聚合操作,list 表示將每個分組中的元素轉換為列表。具體來說,self.columns[1] 和 self.columns[2] 分別是 movieId 和 rating,這意味著對于每個用戶(userId),我們將會把該用戶評分的所有 movieId 和相應的 rating 收集到一個列表中.④ 最終結果是:對每個用戶(userId),movieId和rating的列表將作為該用戶的評分數據。userId movieId rating1001 [1, 2, 3] [4.0, 5.0, 3.5]'''#構建用戶評分記錄和物品評分記錄 # 用戶評分self.users_ratings = dataset.groupby(self.columns[0]).agg([list])[[self.columns[1], self.columns[2]]]# 物品評分self.items_ratings = dataset.groupby(self.columns[1]).agg([list])[[self.columns[0], self.columns[2]]]#計算全局平均評分(用于處理冷啟動,即用戶或物品不在訓練集中時的預測)self.globalMean = self.dataset[self.columns[2]].mean()#調用梯度下降函數,訓練矩陣分解模型self.P, self.Q = self.sgd()#初始化隱向量矩陣def _init_matrix(self):'''初始化P和Q矩陣,同時為設置0,1之間的隨機值作為初始值:return:'''# User-LF# 每個用戶一個長度為 number_LatentFactors 的向量P = dict(zip(self.users_ratings.index,np.random.rand(len(self.users_ratings), self.number_LatentFactors).astype(np.float32)))# Item-LF# 每個物品一個長度為 number_LatentFactors 的向量Q = dict(zip(self.items_ratings.index,np.random.rand(len(self.items_ratings), self.number_LatentFactors).astype(np.float32)))return P, Q#隨機梯度下降訓練函數def sgd(self):'''使用隨機梯度下降,優化結果:return:'''#初始化P、Q矩陣P, Q = self._init_matrix()for i in range(self.number_epochs):print("iter%d"%i)error_list = []for uid, iid, r_ui in self.dataset.itertuples(index=False):# User-LF P## Item-LF Qv_pu = P[uid] #當前用戶向量v_qi = Q[iid] #物品向量# 損失函數err = np.float32(r_ui - np.dot(v_pu, v_qi))#梯度下降更新參數p_ukv_pu += self.alpha * (err * v_qi - self.reg_p * v_pu)#梯度下降更新參數p_ikv_qi += self.alpha * (err * v_pu - self.reg_q * v_qi)P[uid] = v_pu Q[iid] = v_qi# for k in range(self.number_of_LatentFactors):# v_pu[k] += self.alpha*(err*v_qi[k] - self.reg_p*v_pu[k])# v_qi[k] += self.alpha*(err*v_pu[k] - self.reg_q*v_qi[k])error_list.append(err ** 2)print(np.sqrt(np.mean(error_list)))return P, Qdef predict(self, uid, iid):# 如果uid或iid不在,我們使用全劇平均分作為預測結果返回if uid not in self.users_ratings.index or iid not in self.items_ratings.index:return self.globalMeanp_u = self.P[uid]q_i = self.Q[iid]return np.dot(p_u, q_i)def test(self,testset):'''預測測試集數據'''for uid, iid, real_rating in testset.itertuples(index=False):try:pred_rating = self.predict(uid, iid)except Exception as e:print(e)else:#對測試集中的每條樣本 (uid, iid, rating) 預測評分:yield uid, iid, real_rating, pred_ratingif __name__ == '__main__':dtype = [("userId", np.int32), ("movieId", np.int32), ("rating", np.float32)]dataset = pd.read_csv("datasets/ml-latest-small/ratings.csv", usecols=range(3), dtype=dict(dtype))lfm = LFM(0.02, 0.01, 0.01, 10, 100, ["userId", "movieId", "rating"])lfm.fit(dataset)#進入交互式預測while True:uid = input("uid: ")iid = input("iid: ")print(lfm.predict(int(uid), int(iid)))
你這段代碼是一個經典的 基于矩陣分解(Matrix Factorization, MF) 的 協同過濾推薦系統,主要使用了 隨機梯度下降(SGD) 來優化模型參數。
下面我會逐模塊解釋代碼邏輯,并且補充清晰注釋,幫助你深入理解每個部分。
LFM算法解讀
🔧 類初始化:__init__
def __init__(self, alpha, reg_p, reg_q, number_LatentFactors=10, number_epochs=10, columns=["uid", "iid", "rating"]):
alpha
: 學習率(控制步長)reg_p
,reg_q
: 分別是用戶矩陣P和物品矩陣Q的正則化系數,用于防止過擬合number_LatentFactors
: 隱向量維度(Latent Factors 的數量),即將用戶/物品特征壓縮到的低維空間大小number_epochs
: 最大迭代次數columns
: 指定輸入數據中用戶、物品、評分的列名
📊 數據預處理和模型訓練:fit
def fit(self, dataset):
- 轉換為 DataFrame:
self.dataset = pd.DataFrame(dataset)
- 構建用戶評分記錄和物品評分記錄:
self.users_ratings = dataset.groupby(self.columns[0]).agg([list])[[self.columns[1], self.columns[2]]]
self.items_ratings = dataset.groupby(self.columns[1]).agg([list])[[self.columns[0], self.columns[2]]]
例如:
userId: movieId: [1, 2, 3], rating: [4.0, 3.5, 5.0]
這兩行代碼是為了獲取所有用戶和所有物品的評分記錄(用于初始化 P/Q 矩陣)。
- 計算全局平均評分(用于處理冷啟動,即用戶或物品不在訓練集中時的預測):
self.globalMean = self.dataset[self.columns[2]].mean()
- 訓練矩陣分解模型(SGD):
self.P, self.Q = self.sgd()
🧮 初始化隱向量矩陣:_init_matrix
def _init_matrix(self):
- 用戶矩陣
P
:每個用戶一個長度為number_LatentFactors
的向量 - 物品矩陣
Q
:每個物品一個長度為number_LatentFactors
的向量
P = dict(zip(self.users_ratings.index, np.random.rand(len(self.users_ratings), self.number_LatentFactors)))
Q = dict(zip(self.items_ratings.index, np.random.rand(len(self.items_ratings), self.number_LatentFactors)))
向量初始化為 [0, 1)
之間的隨機數。
🔁 隨機梯度下降訓練:sgd
def sgd(self):
核心部分:通過最小化均方誤差(MSE)損失函數,更新 P 和 Q。
損失函數目標:
L=∑(u,i)∈R(rui?r^ui)2+λp∥Pu∥2+λq∥Qi∥2 L = \sum_{(u, i) \in R} (r_{ui} - \hat{r}_{ui})^2 + \lambda_p \|P_u\|^2 + \lambda_q \|Q_i\|^2 L=(u,i)∈R∑?(rui??r^ui?)2+λp?∥Pu?∥2+λq?∥Qi?∥2
每一輪迭代步驟如下:
for uid, iid, r_ui in self.dataset.itertuples(index=False):
v_pu = P[uid]
: 當前用戶向量v_qi = Q[iid]
: 當前物品向量
預測評分:
err = r_ui - np.dot(v_pu, v_qi)
err = r_ui - np.dot(v_pu, v_qi)
這行代碼是 計算預測誤差,并且 np.dot()
計算的是兩個向量的 內積。
下面詳細解釋一下這部分代碼。
1. np.dot()
是什么?
np.dot()
是 NumPy 庫中的函數,用于計算兩個向量的 點積(內積)。它的具體含義是:
dot?product=∑i=1nai?bi \text{dot product} = \sum_{i=1}^{n} a_i \cdot b_i dot?product=i=1∑n?ai??bi?
假設有兩個向量 a=[a1,a2,…,an]\mathbf{a} = [a_1, a_2, \dots, a_n]a=[a1?,a2?,…,an?] 和 b=[b1,b2,…,bn]\mathbf{b} = [b_1, b_2, \dots, b_n]b=[b1?,b2?,…,bn?],則它們的點積(內積)是:
a?b=a1b1+a2b2+?+anbn \mathbf{a} \cdot \mathbf{b} = a_1 b_1 + a_2 b_2 + \dots + a_n b_n a?b=a1?b1?+a2?b2?+?+an?bn?
對于模型來說,v_pu
和 v_qi
分別是用戶 uuu 和物品 iii 的隱向量表示。
v_pu
是用戶 uuu 的隱向量,表示該用戶在number_LatentFactors
個隱特征上的表示。v_qi
是物品 iii 的隱向量,表示該物品在number_LatentFactors
個隱特征上的表示。
通過 內積,我們可以得到用戶 uuu 對物品 iii 的 預測評分。預測評分的公式就是:
r^ui=pu?qi \hat{r}_{ui} = \mathbf{p_u} \cdot \mathbf{q_i} r^ui?=pu??qi?
所以:
np.dot(v_pu, v_qi)
就是計算出用戶 uuu 對物品 iii 的預測評分。
2. 誤差 err = r_ui - np.dot(v_pu, v_qi)
在實際應用中,r_ui
是用戶 uuu 給物品 iii 的實際評分,而 np.dot(v_pu, v_qi)
是模型預測的評分。
因此,err
就是實際評分和預測評分之間的差值,也就是 殘差(residual),即:
err=rui?r^ui \text{err} = r_{ui} - \hat{r}_{ui} err=rui??r^ui?
這個 殘差 會被用來更新隱向量 pu\mathbf{p_u}pu? 和 qi\mathbf{q_i}qi?,從而使得模型逐步減小預測誤差。
3. RMSE (Root Mean Squared Error)
RMSE(均方根誤差)是評估推薦系統性能的一個常用指標,它衡量了預測評分與實際評分之間的差異。其公式為:
RMSE=1N∑i=1N(rui?r^ui)2 \text{RMSE} = \sqrt{\frac{1}{N} \sum_{i=1}^{N} (r_{ui} - \hat{r}_{ui})^2} RMSE=N1?i=1∑N?(rui??r^ui?)2?
- ruir_{ui}rui? 是實際評分
- r^ui\hat{r}_{ui}r^ui? 是預測評分
- NNN 是測試集中的樣本數
RMSE 是 均方誤差(MSE) 的平方根,旨在將預測誤差的度量還原到與原始評分相同的單位。它越小,模型的預測精度越高。
4. 在代碼中的實現
在代碼中,你會看到如下部分:
error_list.append(err ** 2)
print(np.sqrt(np.mean(error_list)))
這里 error_list
是一個列表,用于記錄每個樣本的誤差的平方(也就是 MSE 的一部分)。
err ** 2
計算的是當前樣本的平方誤差(即 MSE 的分子部分)。np.mean(error_list)
計算所有樣本的均方誤差(MSE),就是將所有平方誤差取平均。np.sqrt(np.mean(error_list))
是取均方誤差的平方根,得到 RMSE。
這樣做的效果是每經過一輪迭代,都會輸出當前模型在訓練集上的 RMSE,用來評估模型的收斂程度和訓練效果。
5. 具體代碼解釋
error_list.append(err ** 2)
這一行將 每一條樣本的平方誤差 添加到 error_list
列表中。這是計算 RMSE 的步驟之一。
np.sqrt(np.mean(error_list))
這個部分將所有樣本的 平方誤差 取平均(得到 MSE),然后再 開根號,得到最終的 RMSE。在每一輪的訓練中,RMSE 用于衡量模型在訓練數據上的誤差。
6. 總結
np.dot(v_pu, v_qi)
計算的是用戶和物品隱向量的 內積,即 預測評分。err = r_ui - np.dot(v_pu, v_qi)
是 預測誤差(殘差),用來衡量模型的預測與實際評分之間的差距。- RMSE 是 均方根誤差,用于衡量模型的預測準確性,公式為:
RMSE=1N∑i=1N(rui?r^ui)2 \text{RMSE} = \sqrt{\frac{1}{N} \sum_{i=1}^{N} (r_{ui} - \hat{r}_{ui})^2} RMSE=N1?i=1∑N?(rui??r^ui?)2?
在代碼中,RMSE 的實現是通過計算每個樣本的平方誤差,取均值后開根號來完成的。
🔮 評分預測:predict
def predict(self, uid, iid):
如果用戶或物品不在訓練集中,用全局平均評分作為預測:
if uid not in self.users_ratings.index or iid not in self.items_ratings.index:return self.globalMean
否則返回預測評分:
return np.dot(self.P[uid], self.Q[iid])
? 測試集預測:test
def test(self, testset):
對測試集中的每條樣本 (uid, iid, rating)
預測評分:
yield uid, iid, real_rating, pred_rating
🧪 主函數入口(測試)
if __name__ == '__main__':
- 從
ratings.csv
讀取數據(只取前三列):
dataset = pd.read_csv("datasets/ml-latest-small/ratings.csv", usecols=range(3), dtype=dict(dtype))
- 實例化模型并訓練:
lfm = LFM(0.02, 0.01, 0.01, 10, 100, ["userId", "movieId", "rating"])
lfm.fit(dataset)
- 進入交互式預測:
while True:uid = input("uid: ")iid = input("iid: ")print(lfm.predict(int(uid), int(iid)))
🔍 總結重點:
模塊 | 作用 |
---|---|
P , Q 矩陣 | 用戶和物品的低維向量表示 |
sgd() | 通過最小化損失函數優化向量 |
predict() | 用內積預測評分 |
globalMean | 處理冷啟動的回退策略 |
fit() | 數據預處理 + 模型訓練入口 |
test() | 在測試集上做預測評估 |