文章目錄
- 一、實驗介紹
- 1. 算法流程
- 2. 算法解釋
- 3. 算法特點
- 4. 應用場景
- 5. 注意事項
- 二、實驗環境
- 1. 配置虛擬環境
- 2. 庫版本介紹
- 三、實驗內容
- 0. 導入必要的庫
- 1. Kmeans類
- a. 構造函數
- b. 閔可夫斯基距離
- c. 初始化簇心
- d. K-means聚類
- e. 聚類結果可視化
- 2. 輔助函數
- 3. 主函數
- a. 命令行界面 (CLI)
- b. 數據加載
- c. 模型訓練及可視化
- 4. 運行腳本的命令
- 5. 代碼整合
??原型聚類中的K均值算法是一種常用的聚類方法,該算法的目標是通過迭代過程找到數據集的簇劃分,使得每個簇內的樣本與簇內均值的平方誤差最小化。這一過程通過不斷迭代更新簇的均值來實現。
一、實驗介紹
1. 算法流程
- 初始化: 從樣本集中隨機選擇k個樣本作為初始均值向量。
- 迭代過程: 重復以下步驟直至均值向量不再更新:
- 對每個樣本計算與各均值向量的距離。
- 將樣本劃分到距離最近的均值向量所對應的簇。
- 更新每個簇的均值向量為該簇內樣本的平均值。
- 輸出: 返回最終的簇劃分。
2. 算法解釋
- 步驟1中,通過隨機選擇初始化k個均值向量。
- 步驟2中,通過計算樣本與均值向量的距離,將每個樣本分配到最近的簇。然后,更新每個簇的均值向量為該簇內樣本的平均值。
- 算法通過迭代更新,不斷優化簇內樣本與均值向量的相似度,最終得到較好的聚類結果。
3. 算法特點
- K均值算法是一種貪心算法,通過局部最優解逐步逼近全局最優解。
- 由于需要對每個樣本與均值向量的距離進行計算,算法復雜度較高。
- 對于大型數據集和高維數據,K均值算法的效果可能受到影響。
4. 應用場景
- K均值算法適用于樣本集可以被均值向量較好表示的情況,特別是當簇呈現球形或近似球形分布時效果較好。
- 在圖像分割、用戶行為分析等領域廣泛應用。
5. 注意事項
- 對于K均值算法,初始均值向量的選擇可能影響最終聚類結果,因此有時需要多次運行算法,選擇最優的結果。
- 算法對異常值敏感,可能導致簇的均值向量被拉向異常值,因此在處理異常值時需要謹慎。
二、實驗環境
1. 配置虛擬環境
conda create -n ML python==3.9
conda activate ML
conda install scikit-learn matplotlib seaborn
2. 庫版本介紹
軟件包 | 本實驗版本 |
---|---|
matplotlib | 3.5.2 |
numpy | 1.21.5 |
python | 3.9.13 |
scikit-learn | 1.0.2 |
seaborn | 0.11.2 |
三、實驗內容
0. 導入必要的庫
import numpy as np
import random
import seaborn as sns
import matplotlib.pyplot as plt
import argparse
1. Kmeans類
__init__
:初始化K均值聚類的參數,包括聚類數目k
、數據data
、初始化模式mode
(默認為 “random”)、最大迭代次數max_iters
、閔可夫斯基距離的階數p
、隨機種子seed
等。minkowski_distance
函數:計算兩個樣本點之間的閔可夫斯基距離。center_init
函數:根據指定的模式初始化聚類中心。fit
方法:執行K均值聚類的迭代過程,包括分配樣本到最近的簇、更新簇中心,直到滿足停止條件。visualization
函數:使用Seaborn和Matplotlib可視化聚類結果。
a. 構造函數
class Kmeans(object):def __init__(self, k, data: np.ndarray, mode="random", max_iters=0, p=2, seed=0):self.k = kself.data = dataself.mode = modeself.max_iter = max_iters if max_iters > 0 else int(1e8)self.p = pself.seed = seedself.centers = Noneself.clu_idx = np.zeros(len(self.data), dtype=np.int32) # 樣本的分類簇self.clu_dist = np.zeros(len(self.data), dtype=np.float64) # 樣本與簇心的距離
- 參數:
- 聚類數目
k
- 數據集
data
- 初始化模式
mode
- 最大迭代次數
max_iters
- 閔可夫斯基距離的階數
p
以及隨機種子seed
。
- 聚類數目
- 初始化類的上述屬性,此外
self.centers
被初始化為None
,表示簇心尚未計算self.clu_idx
和self.clu_dist
被初始化為全零數組,表示每個樣本的分類簇和與簇心的距離。
b. 閔可夫斯基距離
def minkowski_distance(self, x, y=0):return np.linalg.norm(x - y, ord=self.p)
- 使用了NumPy的
linalg.norm
函數,其中ord
參數用于指定距離的階數。
c. 初始化簇心
def center_init(self):random.seed(self.seed)if self.mode == "random":ids = random.sample(range(len(self.data)), k=self.k) # 隨機抽取k個樣本下標self.centers = self.data[ids] # 選取k個樣本作為簇中心else:ids = [random.randint(0, self.data.shape[0])]for _ in range(1, self.k):max_idx = 0max_dis = 0for i, x in enumerate(self.data):if i in ids:continuedis = 0for y in self.data[ids]:dis += self.minkowski_distance(x - y)if max_dis < dis:max_dis = dismax_idx = iids.append(max_idx)self.centers = self.data[ids]
- 根據指定的初始化模式,選擇隨機樣本或使用 “far” 模式。
- 在 “random” 模式下,通過隨機抽樣選擇
k
個樣本作為簇心; - 在 “far” 模式下,通過計算每個樣本到已選簇心的距離之和,選擇距離總和最大的樣本作為下一個簇心。
- 在 “random” 模式下,通過隨機抽樣選擇
d. K-means聚類
def fit(self):self.center_init() # 簇心初始化for _ in range(self.max_iter):flag = False # 判斷是否有樣本被重新分類# 遍歷每個樣本for i, x in enumerate(self.data):min_idx = -1 # 最近簇心下標min_dist = np.inf # 最小距離for j, y in enumerate(self.centers): # 遍歷每個簇,計算與該樣本的距離# 計算樣本i到簇j的距離distdist = self.minkowski_distance(x, y)if min_dist > dist:min_dist = distmin_idx = jif self.clu_idx[i] != min_idx:# 有樣本改變分類簇,需要繼續迭代更新簇心flag = True# 記錄樣本i與簇的最小距離min_dist,及對應簇的下標min_idxself.clu_idx[i] = min_idxself.clu_dist[i] = min_dist# 樣本的簇劃分好之后,用樣本均值更新簇心for i in range(self.k):x = self.data[self.clu_idx == i]# 用樣本均值更新簇心self.centers[i] = np.mean(x, axis=0)if not flag:break
- 在每次迭代中
- 遍歷每個樣本,計算其到各個簇心的距離,將樣本分配到距離最近的簇中。
- 更新每個簇的均值(簇心)為該簇內所有樣本的平均值。
- 上述過程迭代進行,直到滿足停止條件(樣本不再重新分配到不同的簇)或達到最大迭代次數。
e. 聚類結果可視化
def visualization(self, k=3):current_palette = sns.color_palette()sns.set_theme(context="talk", palette=current_palette)for i in range(self.k):x = self.data[self.clu_idx == i]sns.scatterplot(x=x[:, 0], y=x[:, 1], alpha=0.8)sns.scatterplot(x=self.centers[:, 0], y=self.centers[:, 1], marker="+", s=500)plt.title("k=" + str(k))plt.show()
2. 輔助函數
def order_type(v: str):if v.lower() in ("-inf", "inf"):return -np.inf if v.startswith("-") else np.infelse:try:return float(v)except ValueError:raise argparse.ArgumentTypeError("Unsupported value encountered")def mode_type(v: str):if v.lower() in ("random", "far"):return v.lower()else:raise argparse.ArgumentTypeError("Unsupported value encountered")
order_type
函數:用于處理命令行參數中的-p
(距離測量參數),將字符串轉換為浮點數。mode_type
函數:用于處理命令行參數中的--mode
(初始化模式參數),將字符串轉換為合法的初始化模式。
3. 主函數
a. 命令行界面 (CLI)
- 使用
argparse
解析命令行參數
parser = argparse.ArgumentParser(description="Kmeans Demo")parser.add_argument("-k", type=int, default=3, help="The number of clusters")parser.add_argument("--mode", type=mode_type, default="random", help="Initial centroid selection")parser.add_argument("-m", "--max-iters", type=int, default=40, help="Maximum iterations")parser.add_argument("-p", type=order_type, default=2., help="Distance measurement")parser.add_argument("--seed", type=int, default=0, help="Random seed")parser.add_argument("--dataset", type=str, default="./kmeans.2.txt", help="Path to dataset")args = parser.parse_args()
b. 數據加載
- 從指定路徑加載數據集。
dataset = np.loadtxt(args.dataset)
c. 模型訓練及可視化
model = Kmeans(k=args.k, data=dataset, mode=args.mode, max_iters=args.max_iters, p=args.p,seed=args.seed)model.fit()# 聚類結果可視化model.visualization(k=args.k)
4. 運行腳本的命令
- 通過命令行傳遞參數來運行腳本,指定聚類數目、初始化模式、最大迭代次數等。
python kmeans.py -k 3 --mode random -m 40 -p 2 --seed 0 --dataset ./kmeans.2.txt
5. 代碼整合
import numpy as np
import random
import seaborn as sns
import matplotlib.pyplot as plt
import argparseclass Kmeans(object):def __init__(self, k, data: np.ndarray, mode="random", max_iters=0, p=2, seed=0):self.k = kself.data = dataself.mode = modeself.max_iter = max_iters if max_iters > 0 else int(1e8)self.p = pself.seed = seedself.centers = Noneself.clu_idx = np.zeros(len(self.data), dtype=np.int32) # 樣本的分類簇self.clu_dist = np.zeros(len(self.data), dtype=np.float64) # 樣本與簇心的距離def minkowski_distance(self, x, y=0):return np.linalg.norm(x - y, ord=self.p)# 簇心初始化def center_init(self):random.seed(self.seed)if self.mode == "random":ids = random.sample(range(len(self.data)), k=self.k) # 隨機抽取k個樣本下標self.centers = self.data[ids] # 選取k個樣本作為簇中心else:ids = [random.randint(0, self.data.shape[0])]for _ in range(1, self.k):max_idx = 0max_dis = 0for i, x in enumerate(self.data):if i in ids:continuedis = 0for y in self.data[ids]:dis += self.minkowski_distance(x - y)if max_dis < dis:max_dis = dismax_idx = iids.append(max_idx)self.centers = self.data[ids]def fit(self):self.center_init() # 簇心初始化for _ in range(self.max_iter):flag = False # 判斷是否有樣本被重新分類# 遍歷每個樣本for i, x in enumerate(self.data):min_idx = -1 # 最近簇心下標min_dist = np.inf # 最小距離for j, y in enumerate(self.centers): # 遍歷每個簇,計算與該樣本的距離# 計算樣本i到簇j的距離distdist = self.minkowski_distance(x, y)if min_dist > dist:min_dist = distmin_idx = jif self.clu_idx[i] != min_idx:# 有樣本改變分類簇,需要繼續迭代更新簇心flag = True# 記錄樣本i與簇的最小距離min_dist,及對應簇的下標min_idxself.clu_idx[i] = min_idxself.clu_dist[i] = min_dist# 樣本的簇劃分好之后,用樣本均值更新簇心for i in range(self.k):x = self.data[self.clu_idx == i]# 用樣本均值更新簇心self.centers[i] = np.mean(x, axis=0)if not flag:breakdef visualization(self, k=3):current_palette = sns.color_palette()sns.set_theme(context="talk", palette=current_palette)for i in range(self.k):x = self.data[self.clu_idx == i]sns.scatterplot(x=x[:, 0], y=x[:, 1], alpha=0.8)sns.scatterplot(x=self.centers[:, 0], y=self.centers[:, 1], marker="+", s=500)plt.title("k=" + str(k))plt.show()def order_type(v: str):if v.lower() in ("-inf", "inf"):return -np.inf if v.startswith("-") else np.infelse:try:return float(v)except ValueError:raise argparse.ArgumentTypeError("Unsupported value encountered")def mode_type(v: str):if v.lower() in ("random", "far"):return v.lower()else:raise argparse.ArgumentTypeError("Unsupported value encountered")if __name__ == '__main__':parser = argparse.ArgumentParser(description="Kmeans Demo")parser.add_argument("-k", type=int, default=3, help="The number of clusters")parser.add_argument("--mode", type=mode_type, default="random", help="Initial centroid selection")parser.add_argument("-m", "--max-iters", type=int, default=40, help="Maximum iterations")parser.add_argument("-p", type=order_type, default=2., help="Distance measurement")parser.add_argument("--seed", type=int, default=0, help="Random seed")parser.add_argument("--dataset", type=str, default="./kmeans.2.txt", help="Path to dataset")args = parser.parse_args()dataset = np.loadtxt(args.dataset)model = Kmeans(k=args.k, data=dataset, mode=args.mode, max_iters=args.max_iters, p=args.p,seed=args.seed) # args.seed)model.fit()# 聚類結果可視化model.visualization(k=args.k)