并行計算的K-Means聚類算法實現
一,實驗介紹
聚類是擁有相同屬性的對象或記錄的集合,屬于無監督學習,K-Means聚類算法是其中較為簡單的聚類算法之一,具有易理解,運算深度塊的特點.
1.1 實驗內容
通過本次課程我們將使用C++語言實現一個完整的面向對象的可并行K-Means算法.這里我們一起圍繞著算法需求實現各種類,最終打造出一個健壯的程序.所以為了更好地完成這個實驗,需要你有C++語言基礎,會安裝一些常用庫,喜歡或愿意學習面向對象的編程思維.
1.2 實驗知識點
C++語言語法
K-Means算法思路與實現
并行計算思路與實現
boost庫的常用技巧(Smart Pointers,Variant,tokenizer)
1.3 實驗環境
Xfce 終端(Xfce Terminal):
Linux 命令行終端,打開后會進入 Bash 環境,可以用來執行 Linux 命令和調用系統調用.
GVim:非常好用的編輯器,不會使用的可以參考課程 《Vim編輯器》.
boost,MPICH2庫
1.4 適合人群
本課程適合有C++語言基礎,對聚類算法感興趣并希望在動手能力上得到提升的同學.
1.5 代碼獲取
1.6 效果圖
完成時間顯示:
單進程
completed in 31.9997 seconds
number of processes: 1
4進程
completed in 9.87732 seconds
number of processes: 4
輸出結果文件
圖1 輸出文件圖
1.7 項目結構與框架
項目的整個文件目錄:
├── clusters
│ ├── distance.hpp
│ └── record.hpp
├── datasets
│ ├── attrinfo.hpp
│ ├── dataset.hpp
│ └── dcattrinfo.hpp
├── mainalgorithm
│ ├── kmean.hpp
│ └── kmeanmain.cpp
└── utilities
├── datasetreader.hpp
├── exceptions.hpp
├── null.hpp
└── types.hpp
這里簡單介紹一下功能模塊,在具體實踐每一個類的時候會有詳細UML圖或流程圖.
主要分為4個模塊:數據集類,聚集類,實用工具類,算法類.
實用工具類:定義各種需要的數據類型;常用的異常處理;文件讀取.
數據集類:將文件中的數據通過智能指針建立一個統一數據類,擁有豐富的屬性和操作.
聚集類:在數據類基礎上實現中心簇.
算法類:完成對聚集類的初始化,通過算法進行更新迭代,最終實現數據集的聚類并輸出聚類結果.
二, 實驗原理
這一章我們將配置好我們的實驗環境并介紹一些基礎知識.
2.1 依賴庫安裝
安裝boost和mpich2
mpich2下載:
wget -c http://www.mpich.org/static/downloads/3.2.1/mpich-3.2.1.tar.gz
解壓:
tar xvfz mpich-3.2.1.tar.gz
配置:
cd mpich-3.2.1
./configure
編譯:
make
安裝:
make install
boost下載:
wget -c https://dl.bintray.com/boostorg/release/1.68.0/source/boost_1_68_0.tar.gz
解壓
tar xvfz boost_1_68_0.tar.gz
cd boost_1_68_0
編譯:
sh bootstrap.sh
修改project-config.jam 文件
第24行添加一句:using mpi ;(注意中間的空格)
安裝:
sudo ./bjam --with-program_options --with-mpi install
mpi支持的進程數和計算機的配置有關,通過cat /proc/cpuinfo |grep "processor"|wc -l命令,可以查看得知實驗樓支持4個進程.
檢驗boost是否安裝成功,可以檢測一下,這里我們測試開啟3個進程:
運行源碼,test/mpitest.cpp
mpic++ -o mpitest mpitest.cpp -L/usr/local/lib -lboost_mpi -lboost_serialization
mpirun -n 3 ./mpitest
若結果如下,有三個Process則證明安裝成功!
Process 1: a msg from master
Process 2: a msg from master
Process 2:
Process 1:
Process 0: zero one two
Process 0: zero one two
Process 1: zero one two
Process 2: zero one two
2.2 boost的小技巧
Smart Pointers
在Boost中,智能指針是存儲指向動態分配對象的指針的對象.智能指針非常有用,因為它們確保正確銷毀動態分配的對象,即使在異常情況下也是如此.事實上,智能指針被視為擁有指向的對象,因此負責在不再需要時刪除對象.Boost智能指針庫提供了六個智能指針類模板.表給出了這些類模板的描述.本實驗中將大量使用智能指針.
類
描述
scoped_ptr
單個對象的簡單唯一所有權,不可復制.
scoped_array
數組的簡單唯一所有權.不可復制
shared_ptr
對象所有權在多個指針之間共享
shared_array
多個指針共享的數組所有權
weak_ptr
shared_ptr擁有的對象的非擁有觀察者
intrusive_ptr
具有嵌入引用計數的對象的共享所有權.
表1 智能指針類型簡介
Variant versus Any
Boost Variant類模板是一個安全通用的聯合容器,和std::vector不同儲存單個類型的多個值,variant可以儲存多個類型的單個值,本實驗中將使用variant儲存雙精度和整數類型來表示不同類型的數據.
與variant一樣,Boost any是另一個異構容器.雖然Boost anys有許多與Boost variant相同的功能.
根據Boost庫文檔,Boost variant比Boost any具有以下優勢:
1,variant保證其內容的類型是用戶指定的有限類型集之一.
2,variant提供對其內容的編譯時檢查訪問.
3,variant通過提供有效的,基于堆棧的存儲方案,可以避免動態分配的開銷.
同樣Boost any也有一些優勢:
1,any幾乎允許任何類型的內容.
2,很少使用模板元編程技術.
3,any 對交換操作提供安全的不拋出異常保證.
Tokenizer
Tokenizer提供了一種靈活而簡單的方法通過分割符(如:" , ")將一個完整的string分隔開.
字符串為:”A flexible,easy tokenizer“
如果通過","分割,則結果為:
[A flexible] [ easy tokenizer>]
以" " 為分隔符:
分割結果為:
[A] [flexible,] [easy] [tokenizer]
三,實驗步驟
接下來將具體實踐各個類,會給出每一個類的聲明并解釋其成員函數和數據成員以及相關聯類之間的繼承關系和邏輯關系.涉及到重要的成員函數的實現會給出其定義代碼,一些普通的成員函數的源碼可以到下載的源文件中查看,里面也會有詳細的注解.
3.1 數據集的構建
數據對于一個聚類算法來說非常重要,在這里我們將一個數據集描述為一個記錄(record),一個記錄由一些屬性(Attribute)表征.因此自然而然將依次建立attributes,records,最后是數據集datasets.
在此之前我們需要了解一下我們在聚類中實際接觸到的數據類型.
這里有一個示例,心臟數據集.
//heart.data
70.0,1.0,4.0,130.0,322.0,0.0,2.0,109.0,0.0,2.4,2.0,3.0,3.0,2
67.0,0.0,3.0,115.0,564.0,0.0,2.0,160.0,0.0,1.6,2.0,0.0,7.0,1
57.0,1.0,2.0,124.0,261.0,0.0,0.0,141.0,0.0,0.3,1.0,0.0,7.0,2
64.0,1.0,4.0,128.0,263.0,0.0,0.0,105.0,1.0,0.2,2.0,1.0,7.0,1
74.0,0.0,2.0,120.0,269.0,0.0,2.0,121.0,1.0,0.2,1.0,1.0,3.0,1
65.0,1.0,4.0,120.0,177.0,0.0,0.0,140.0,0.0,0.4,1.0,0.0,7.0,1
......
包含13個屬性,age,sex,chest pain type(4 values),resting blood pressure......
為了更好地表述不同數據相同屬性的差異,我們需要對這些數據進行離散/連續處理,即對于有些數據我們認為它是連續的如:age,有些是離散的如:年齡.這樣我們建立一個描述數據類型的文件:
//heart.names
schema file for heart.dat
///: schema
1, Continuous
2, Discrete
3, Discrete
4, Continuous
5, Continuous
6, Discrete
7, Discrete
8, Continuous
9, Discrete
10, Continuous
11, Discrete
12, Continuous
13, Discrete
14, Class
3.1.1 AttrValue類
AttrValue類有一個私有變量,有兩個友元函數,一個公有成員函數.
_value是一個variant類型變量,它可以存儲一個雙精度或無符號整形的數據,分類數據用無符號整形數據表示.
AttrValue類自身無法存儲或獲取數據.它的兩個友元函數可以獲取和修改數據_value.
圖2 數據類UML關系圖
//source:datasets.attrinfo.hpp
class AttrValue
{
public:
friend class DAttrInfo;//友元函數可以訪問_value
friend class CAttrInfo;//友元函數可以訪問_value
typedef boost::variant value_type;//可存儲雙精度和無符號整形數據
AttrValue();
private:
value_type _value;
};
inline AttrValue::AttrValue(): _value(Null()) {
}//構造函數,將_value初始化為Null(定義在utillities/null.hpp中)
3.1.2 AttrInfo類
AttrInfo是一個基類,包括了許多虛函數和純虛函數.這些函數都將在它的派生類中具體實現,基類中僅進行聲明和簡單定義.
//source:datasets.attrinfo.hpp
//三種數據類型:未知型,連續型(雙精度),離散型(無符號整形)
enum AttrType
{
Unknow,
Continuous,
Discrete
};
class DAttrInfo;
class CAttrInfo;
class AttrInfo
{
public:
AttrInfo(const std::string &name,AttrType type);//每一欄的屬性名(id,attr,label,...)和該屬性的數據類型(離散或連續)
virtual ~AttrInfo(){}//虛析構函數
std::string &name();//返回標簽
AttrType type() const;//返回數據類型
virtual Real distance(const AttrValue&,const AttrValue&) const = 0;
virtual void set_d_val(AttrValue&, Size) const;//AttrValue賦值;適用于DAttrInfo
virtual Size get_d_val(const AttrValue&) const;//獲取_value
virtual void set_c_val(AttrValue&, Real) const;//AttrValue賦值;適用于CAttrInfo
virtual Real get_c_val(const AttrValue&) const;//獲取_value
virtual bool can_cast_to_d() const;//布爾值,對于DAttrInfo類來說其返回值為true,相反為false.在基類的聲明中全部初始化為false.
virtual bool can_cast_to_c() const;
virtual DAttrInfo& cast_to_d();//返回DAttrInfo本身
virtual bool is_unknown(const AttrValue&) const = 0;
virtual void set_unknown(AttrValue&) const = 0;
private:
std::string _name;
AttrType _type;
};
3.1.3 CAttrInfo類和DAttrInfo類
CAttrInfo主要是用來表示連續型數據的一些屬性和方法.有兩個數據成員:_min和_max.表示最小值和最大值屬性,在初始化時都將設置為Null .這兩個屬性將在歸一化的時候用到.CAttrInfo將會繼承AttrInfo的一些函數,并且重新定義.
//source:datasets/dcattrinfo.hpp
class CAttrInfo: public AttrInfo
{
public:
CAttrInfo(const std::string& name);//構造函數
Real distance(const AttrValue&,const AttrValue&)const;//兩個距離
void set_c_val(AttrValue &, Real) const;
void set_min(Real);//設置最小值
void set_max(Real);//設置最大值
Real get_min() const;//獲取最小值
Real get_max() const;//獲取最大值
Real get_c_val(const AttrValue&) const;
bool is_unknown(const AttrValue&) const;
bool can_cast_to_c() const;
void set_unknown(AttrValue&) const;
protected:
Real _min;
Real _max;
};
CAttrInfo::CAttrInfo(const std::string& name)
: AttrInfo(name, Continuous) {
_min = Null();
_max = Null();
}
DAttrInfo類有一個私有變量_values,它是一個string類型的vector,用來存儲一些離散的字符串.在DAttrInfo對象中所有的離散值都將由字符串轉化為唯一的無符號整形.
//source:datasets/dcattrinfo.hpp
class DAttrInfo: public AttrInfo //繼承AttrInfo
{
public:
DAttrInfo(const std::string& name);//構造函數,傳入屬性字符串
const std::string& int_to_str(Size i) const;
Size num_values() const;//獲取長度
Size get_d_val(const AttrValue&) const; //接口定義
void set_d_val(AttrValue& , Size)const;//接口定義
Size add_value(const std::string&,
bool bAllowDuplicate = true);//將一組離散值加入到_values中,比如“X,X,Y,Z",
//則values=[X,Y,Z],對應的二進制數字為[0,0,1,2]
//對于屬性值,則可以重復,但對于id則具有唯一性,不能重復
DAttrInfo& cast_to_d();
Real distance(const AttrValue&, const AttrValue&) const; //比較兩個離散型變量的距離
bool is_unknown(const AttrValue& av) const;//值有缺省
bool can_cast_to_d() const;
void set_unknown(AttrValue&) const;
protected:
std::vector<:string> _values;
};
add_value 是一個將字符串轉化為無符號整形數據的重要函數,返回值為該字符所表示的整形,并將為出現的字符添加進_values.
Record
Attribute
AttrValue
1
"A"
0
2
"B"
1
3
"A"
0
4
"C"
2
5
"B"
1
Record
Attribute
0
"A"
1
"B"
2
"C"
表2 DAttrInfo的一個具體實例
通過上面表格中我們可以看到一組字符類型的數據被存儲為該字符串所在的inex,如果該字符串第一次出現則為上一個字符串的index+1.這樣相同的字符串都被轉化為唯一的無符號整形._value這個輔助變量可以幫助實現這一功能.
//source:datasets/dcattrinfo.hpp
Size DAttrInfo::add_value(const std::string& s,
bool bAllowDuplicate) {
Size ind = Null();
//如果該字符串已經出現,則返回該字符串在_values中的index
for(Size i=0;i<_values.size>
if(_values[i] == s) {
ind = i;
break;
}
}
//如果未出現,則返回_values的大小-1.
//同時對于不允許重復字符串的數據,如ID,當出現重復字符串時則會錯誤提示.
if(ind == Null()) {
_values.push_back(s);
return _values.size()-1;
} else {
if(bAllowDuplicate) {
return ind;
} else {
FAIL("value "<
return Null();
}
}
}
這里需要看一下distance這個函數的定義,它返回的是一個雙精度類型數值.如果傳入的兩個數據類型為Unknow則返回為0.0,其中一個為Unknow則為1,對于兩個雙精度類型的數據返回其差值.
//source:datasets/dcattrinfo.hpp
Real CAttrInfo::distance(const AttrValue& av1,const AttrValue& av2) const {
if(is_unknown(av1) && is_unknown(av2)){
return 0.0;
}
if(is_unknown(av1) ^ is_unknown(av2)){
return 1.0;
}
return boost::get(av1._value) -
boost::get(av2._value);
}
對于離散型數據,兩個離散數據之間的距離定義也會不同,這里主要是考慮到離散型數據都轉化為相差為1的整形,所以只要兩個DAttrInfo的值不同則距離就為1.0,所以在含有離散型和連續型數據的混合數據中連續型數據要進行歸一化處理以滿足量綱統一.
//source:datasets/dcattrinfo.hpp
Real DAttrInfo::distance(const AttrValue& av1,
const AttrValue& av2) const {
if(is_unknown(av1) && is_unknown(av2)) {
return 0.0; //如果兩個值都有缺省,則距離為0
}
if(is_unknown(av1) ^ is_unknown(av2)) {
return 1.0;//如果有一個值缺省,距離為1
}
if(boost::get(av1._value) ==
boost::get(av2._value) ) {
return 0.0;//如果兩個值相等,則無差距
} else {
return 1.0;//否則為最大距離1
}
}
3.1.4 Container類
Container類是一個基類模板,有一個vector的數據成員_data.add函數可以將T類型的數據添加進入_data,同樣erase可以刪除數據.[]是一個操作符重載,返回索引i對應的數據.
//source:clusters/record.hpp
template
class Container//基類模板
{
public:
typedef typename std::vector::iterator iterator;
iterator begin();
iterator end();
void erase(const T& val);
void add(const T&val);//將val添加到向量中
Size size() const; //返回_data的長度
T& operator[](Size i);//下標索引,建立Schema與data的關系
protected:
~Container(){}
std::vector_data;
};
Record和Schema是繼承Container類的兩個重要的類,他們之間的關系如下:
圖3 Container關系圖
3.1.5 Schema類
Schema有兩個保護數據成員_labelInfo,_idInfo.和一個繼承父類的成員_data,_data是一個元素為AttrInfo的vector,表示每一個數據的屬性(離散/連續)._labelInfo是一個指向DattrInfo的共享指針,其包含了輸入數據的分類情況.
Schema的目的是為一個Record對象設置label和id.set_id和set_label函數是為了實現此功能,但是他們又依賴與Record所以我們在Record類中具體定義.
//source:clusters/record.hpp
class Record;
class Schema:public Container<:shared_ptr> >
{
public:
const boost::shared_ptr& labelInfo() const;//標簽信息,整形
const boost::shared_ptr& idInfo() const;//id信息,整形
boost::shared_ptr& idInfo();//可以修改成員變量,_labelInfo
boost::shared_ptr& labelInfo();//可以修改成員變量,_idInfo
void set_label(const boost::shared_ptr& r,const std::string& val);
//設置記錄的label
void set_id(boost::shared_ptr& r,const std::string& val);
//設置記錄的id
protected:
boost::shared_ptr _labelInfo;
boost::shared_ptr _idInfo;
};
3.1.6 Record類
Record繼承帶參數AttrValue的模板類Container,有四個私有數據成員_label,_data,id和_schema._data繼承自父類.每一個Record類都有一個指向Schema類的共享指針,可以將類型為AttrValue的數據儲存在_data中,同樣每一個record都有一個label和id.Record的構造函數需要傳入一個指向Schema的共享指針,并將_data的長度設置為與_schema一樣,將_data里的值設置為默認值.我們就可以通過Schema來操控Record,因為Schema的_data類型為AttrInfo有很多函數如add,set_c_val,add_value等函數可以對離散/類型數據進行操作.所以Record和Schema的關系為通過Schema定義了每一條數據的規范(label,id,每一條屬性的類型),然后按照這個規范將數據填充到record中,因為record直接接觸的類型是AttrValue.
//source:clusters/record.hpp
class Record:public Container
{
public:
Record(const boost::shared_ptr& schema);//構造函數
const boost::shared_ptr& schema() const;
const AttrValue& labelValue() const;
const AttrValue& idValue() const;
AttrValue& labelValue();
AttrValue& idValue();
Size get_id() const;
Size get_label() const;
private:
boost::shared_ptr _schema;//通過_schema創建記錄
AttrValue _label;
AttrValue _id;
};
3.1.7 dataset類
上面已經實現了一條數據的儲存就是一個Record,我們最終需要n條數據.這里新定義一個類Dataset.很明顯按照上面的思路,Record依賴Schema,則Dataset依賴Record.
所以Dataset類繼承類型為Record的Container.因為最后我們使用的的Dataset類,我們一些我們需要用到的屬性可以在這里直接給出.num_attr(),返回屬性的個數,is_numeric()判斷該列屬性值是否是連續行(對于Kmeans算法這里需要連續型數據),為了更加方便第獲取每一個數據,使用操作符重載.
//source:datasets/dataset.hpp
inline const AttrValue& Dataset::operator()(Size i, Size j) const {
return (*_data[i])[j];
}
//source:datasets/dataset.hpp
class Dataset:public Container<:shared_ptr> >
{
public:
Dataset(const boost::shared_ptr&);//構造函數,傳入含有屬性值的schema
Size num_attr() const;//返回屬性個數
const boost::shared_ptr &schema() const;//返回_schrma
const AttrValue& operator()(Size i, Size j) const;//返回第i條第j個屬性的值
std::vector get_CM() const;
bool is_numeric() const;
bool is_categorical() const;
protected:
boost::shared_ptr _schema;
};
3.2 創建一個數據實例
前面關于如何構建dataset相關類已經花了很多時間,下面就讓我們實際操作如何創建一個具體的dataset.
假設我們有這樣的一組數據:
ID
Attr1
Attr2
Attr3
Label
r1
1.2
A
-0.5
1
r2
-2.1
B
1.5
2
r3
1.5
A
-0.1
1
表3 數據實例
那么我們如何將以上數據用我們的dataset類來表示呢?
//test/datasettest.cpp
#include"../clusters/record.hpp"
#include "../datasets/dataset.hpp"
#include
#include
#include
using namespace std;
int main()
{
boost::shared_ptr schema(new Schema);
boost::shared_ptr labelInfo(new DAttrInfo("Label"));
boost::shared_ptridInfo(new DAttrInfo("id"));
schema->labelInfo() = labelInfo;
schema->idInfo() = idInfo;
stringstream ss;
boost::shared_ptr ai;
for(Size j=0;j<3;++j)
{
ss.str("");
ss<
if(j==0||j==2)
{
ai = boost::shared_ptr(new CAttrInfo(ss.str()));
}
else{
ai = boost::shared_ptr(new DAttrInfo(ss.str()));
}
schema->add(ai);
}
boost::shared_ptr ds(new Dataset(schema));
Size val;
boost::shared_ptr r;
r = boost::shared_ptr(new Record(schema));
schema->set_id(r,"r1");
schema->set_label(r,"1");
(*schema)[0]->set_c_val((*r)[0],1.2);
val = (*schema)[1]->cast_to_d().add_value("A");
(*schema)[1]->set_d_val((*r)[1],val);
(*schema)[2]->set_c_val((*r)[2],-0.5);
ds->add(r);
r = boost::shared_ptr(new Record(schema));
schema->set_id(r, "r2");
schema->set_label(r, "2");
(*schema)[0]->set_c_val((*r)[0], -2.1);
val = (*schema)[1]->cast_to_d().add_value("B");
(*schema)[1]->set_d_val((*r)[1], val);
(*schema)[2]->set_c_val((*r)[2], 1.5);
ds->add(r);
r = boost::shared_ptr(new Record(schema));
schema->set_id(r, "r3");
schema->set_label(r, "1");
(*schema)[0]->set_c_val((*r)[0], 1.5);
val = (*schema)[1]->cast_to_d().add_value("A");
(*schema)[1]->set_d_val((*r)[1], val);
(*schema)[2]->set_c_val((*r)[2], -0.1);
ds->add(r);
cout<
cout<
for(Size j=0; jnum_attr(); ++j) {
stringstream ss;
ss<
cout<
}
cout<
for(Size i=0; isize(); ++i) {
cout<get_id();
for(Size j=0; jnum_attr(); ++j) {
if((*schema)[j]->can_cast_to_c()) {
cout<
<get_c_val((*ds)(i,j));
} else {
cout<
<get_d_val((*ds)(i,j));
}
}
cout<get_label()<
}
return 0;
}
輸出結果與我們預想的一樣:
Data:
RecordID Attr(1) Attr(2) Attr(3) Label
0 1.2 0 -0.5 0
1 -2.1 1 1.5 1
2 1.5 0 -0.1 0
3.3 構建簇
構建簇的目的就是為了將dataset中的record進行重新組合,所以我們定義一個基類Cluster直接接觸Record,
有一個數據成員id.
class Cluster:public Container<:shared_ptr> >
{
public:
virtual ~Cluster() {}
void set_id(Size id);
Size get_id() const;
protected:
Size _id;
};
inline void Cluster::set_id(Size id) {
_id = id;
}
inline Size Cluster::get_id() const {
return _id;
}
定義一個中心簇,來表示一個簇的中心.中心簇只有一個數據成員_center即表示中心簇的指向Record的共享指針.
//clusters/record.hpp
class CenterCluster : public Cluster
{
public:
CenterCluster(){}
CenterCluster(const boost::shared_ptr& center);//構造函數傳入一個record
const boost::shared_ptr& center() const;//返回中心點的record,不可更改
protected:
boost::shared_ptr_center; //成員變量,中心點的record
};
CenterCluster::CenterCluster(const boost::shared_ptr& center):_center(center){}
const boost::shared_ptr& CenterCluster::center()
const {
return _center;
}
為了實現更多豐富的功能,我們需要再定義一個類PClustering.
圖4 PClustering 關系圖
PClustering繼承Container,通過add函數添加了中心簇Center.Center也擁有add函數,它添加屬于和他同一簇的record,每一個record都有自己的id信息.這樣我們就能通過PClustering儲存了聚類信息.PClustering的一個數據成員為_CM,是用來儲存每一條record的所屬聚類.如:[1,1,1,2,2,2],同一簇擁有相同的數值.calculate函數是用來從_data中提取相關聚類信息,然后更新_CM.
//clusters/record.hpp
class PClustering:public Container<:shared_ptr> >
{
public:
PClustering();//構造函數
friend std::ostream& operator<
PClustering& pc);//操作符重載,輸出聚類結構相關信息
void removeEmptyClusters();//移除空的record
void createClusterID();//創建聚類id
void save(const std::string& filename);//保存聚類結果相關信息至文件
private:
void print(std::ostream &os);//打印聚類結果相關信息
void calculate();//更新_CM和_CMGiven
void crosstab();//將一些聚類結果儲存為交叉表
bool _bCalculated;/如果數據文件沒有標簽信息,則不需要計算_numclustGiven
Size _numclust;//聚類數
Size _numclustGiven;//文件提供的label數
std::vector _clustsize;//記錄每一簇的數據量
std::vector<:string> _clustLabel;//記錄原文件中的每個分類的數量
std::vector _CM;//每一條記錄數據的所屬index
std::vector _CMGiven;//原文件每一條記錄所屬標簽
iiiMapB _crosstab;//交叉表儲存數據
};
這里我們介紹一個模板鍵-值映射類nnmap(utilities/nnmap.hpp),在這里我們用來儲存聚類和原標簽的數量信息.
如有6條數據,計算的_CM為[1,1,2,2,2,3],所給標簽為[0,0,1,1,2,2].
我們需要通過下面_crosstab.填充下面的表格
Cluster ID 1 2 3
0 # # #
1 # # #
2 # # #
_crosstab(1,0)表示聚類為1,標簽為0的數量.通過下面的函數,可以為2.同理_crosstab(2,0)=0,_crosstab(3,0) = 0.最終可以打印交叉表:
Cluster ID 1 2 3
0 2 0 0
1 0 2 0
2 0 1 1
如果以標簽信息為準的化,則(2,2)那個信息有誤,每一行只能有一個數據占據,且不能與之前有相同的列.
//clusters/record.hpp
void PClustering::crosstab() {
Size c1, c2;
for(Size i=0; i<_cm.size>
c1 = _CM[i];
c2 = _CMGiven[i];
if (_crosstab.contain_key(c1,c2)) {
_crosstab(c1,c2) += 1;
} else {
_crosstab.add_item(c1,c2,1);
}
}
}
3.4 K-Means算法
3.4.1 算法思路
圖5 K-Means算法流程圖
在代碼實現中我們還會有一個多次運行循環的結構,這樣的目的是通過多次的隨機初始化確保能夠消除一些極端的情況,最后取這些循環中最好的結果輸出.
3.4.2 并行化思路
我們使用一種序列 - 均值算法的思路.即計算所有記錄n和所有中心之間的距離.p個進程,讓每一個參與計算的進程處理 n/p條數據.主要步驟如下:
(a)主進程:讀取數據文件,并將數據塊發送至每一個進程.
(b)主進程:初始化簇中心,并將這些簇中心發送至每一個程.
(c)所有進程:計算所給數據塊與簇中心的距離,并將這些數據塊歸屬到與它距離最近的中心.
(d)所有進程:更新新的簇中心.
(e)所有進程:重復(c)和(4)直至滿足停止條件.
(f)主進程:收集聚類結果.
3.4.3 MPIKmean類
將所有的中心簇的數據編碼成一個向量_clusters,這樣可以很方便第從一個進程發送至其他進程.同樣_data表示所有的數據的值.
//source:mainalgorithm/mpikmean.hpp
class MPIKmean
{
public:
Arguments& getArguments();//獲取初始參數
const Results& getResults() const;//獲取結果_CM
void reset() const;//清除結果
void clusterize();//執行計算(初始化,更新,迭代,...)
protected:
void setupArguments();//設置初始參數
void fetchResults() const;//獲取結果
virtual void initialization() const;//隨機初始中心簇
virtual void iteration() const;//迭代更新
virtual Real dist(Size i, Size j) const;//返回與中心簇的距離
mutable vector _centers;//中心簇的屬性值
mutable vector _data;//數據值
mutable Size _numObj;//分發給每一個進程的數據量
mutable Size _numAttr;//數據屬性量
mutable vector _CM;//數據的所屬簇index
mutable vector<:shared_ptr> >
_clusters;//中心簇
mutable Real _error;//簇之間的總距離
mutable Size _numiter;//迭代次數
mutable Results _results;//結果
boost::shared_ptr _ds;//dataset
Arguments _arguments;
Size _numclust;//聚類數目
Size _maxiter;//最大迭代數目
Size _seed;//種子
boost::mpi::communicator _world;//mpi通信
};
主進程負責初始化中心簇(4-33行),一旦中心簇被初始化,就會將中心簇(_centers)和每個進程的數據的數目(_numRecords)和屬性數(_numAttr)發送給所有進程(34-36行).
一旦這些數據被進程接收到,每個進程就會劃分自己的數據塊數量和剩余量(37-38行).首先主進程會將第一個數據塊分配給自己(40-49行),剩余的數據通過send函數發送給其他進程(51-63行).其他進程通過recv進行接收數據(67行).
void MPIKmean::initialization() const {
Size numRecords;
Size rank = _world.rank();
if (rank == 0) {
numRecords = _ds->size();
_numAttr = _ds->num_attr();
_centers.resize(_numclust * _numAttr);
vector index(numRecords,0);
for(Size i=0;i
index[i] = i;
}
boost::shared_ptr schema = _ds->schema();
boost::minstd_rand generator(_seed);
for(Size i=0;i<_numclust>
boost::uniform_int<> uni_dist(0,numRecords-i-1);
boost::variate_generator<:minstd_rand>
boost::uniform_int<> >
uni(generator,uni_dist);
Integer r = uni();
boost::shared_ptr cr = boost::shared_ptr
(new Record(*(*_ds)[r]));
boost::shared_ptr c =
boost::shared_ptr(
new CenterCluster(cr));
c->set_id(i);
_clusters.push_back(c);
for(Size j=0; j<_numattr>
_centers[i*_numAttr + j] =
(*schema)[j]->get_c_val((*_ds)(r,j));
}
index.erase(index.begin()+r);
}
}
boost::mpi::broadcast(_world, _centers, 0);
boost::mpi::broadcast(_world, numRecords, 0);
boost::mpi::broadcast(_world, _numAttr, 0);
Size nDiv = numRecords / _world.size();
Size nRem = numRecords % _world.size();
if(rank == 0) {
boost::shared_ptr schema = _ds->schema();
_numObj = (nRem >0) ? nDiv+1: nDiv;
_data.resize(_numObj * _numAttr);
_CM.resize(_numObj);
for(Size i=0; i<_numobj>
for(Size j=0; j<_numattr>
_data[i*_numAttr +j] =
(*schema)[j]->get_c_val((*_ds)(i, j));
}
}
Size nCount = _numObj;
for(Size p=1; p<_world.size>
Size s = (p< nRem) ? nDiv +1 : nDiv;
vector dv(s*_numAttr);
for(Size i=0; i
for(Size j=0; j<_numattr>
dv[i*_numAttr+j] =
(*schema)[j]->get_c_val(
(*_ds)(i+nCount,j));
}
}
nCount += s;
_world.send(p, 0, dv);
}
} else {
_numObj = (rank < nRem) ? nDiv+1: nDiv;
_CM.resize(_numObj);
_world.recv(0,0,_data);
}
}
進行初始化之后,就開始迭代中心簇.
首先定義一個單元素的vector來控制循環(2行).在while循環內,定義三個局部變量nChangedLocal,newCenters,newSize.每一個進程將會處理自己的數據塊與每一個中心簇的距離(11-30行).變量newCenters包含了一個聚類中所有數據的和.newSize包含了一個聚類中的數據的數量.一旦所有的數據通過并行處理完畢.all_reduce方法將會對所有的進程的數據進行收集,如
all_reduce(_world, nChangedLocal, nChanged,vplus());
對所有進程中的nChangedLocal進行相加(通過操作符vplus,具體見源文件定義),只有有一個進程的nChangedLocal>0(中心簇未收斂)則nChange都會>0,整個迭代都會繼續進行.(31-36行),在對這些數據進行收集之后會更新_center(37-41行).
收斂之后,所有進程會將聚類的index _CM發送給主進程.主進程會將自己的_CM添加進去就形成了整個數據集的_CM(47-58行).
void MPIKmean::iteration() const {
vector nChanged(1,1);//初始化nChanged,表示中心簇是否有變化.
_numiter = 1;//初始迭代次數
while(nChanged[0] > 0) {
nChanged[0] = 0;
Size s;
Real dMin,dDist;
vector nChangedLocal(1,0);
vector newCenters(_numclust*_numAttr,0.0);
vector newSize(_numclust,0);
for(Size i=0;i<_numobj>
dMin = MAX_REAL;
for(Size k=0;k<_numclust>
dDist = dist(i, k);
if (dMin > dDist) {
dMin = dDist;
s = k;
}
}
for(Size j=0; j<_numattr>
newCenters[s*_numAttr+j] +=
_data[i*_numAttr+j];
}
newSize[s] +=1;
if (_CM[i] != s){
_CM[i] = s;
nChangedLocal[0]++;
}
}
all_reduce(_world, nChangedLocal, nChanged,
vplus());
all_reduce(_world, newCenters, _centers,
vplus());
vector totalSize(_numclust,0);
all_reduce(_world, newSize, totalSize, vplus());
for(Size k=0; k<_numclust>
for(Size j=0; j<_numattr>
_centers[k*_numAttr+j] /= totalSize[k];
}
}
++_numiter;
if (_numiter > _maxiter){
break;
}
}
if(_world.rank() > 0) {
_world.send(0,0,_CM);
} else {
for(Size p=1; p<_world.size>
vector msg;
_world.recv(p,0,msg);
for(Size j=0; j
_CM.push_back(msg[j]);
}
}
}
}
其他幾個函數的定義就不再贅述,相信通過看源文件一定可以看懂.
3.4.5 主函數
從前面建立數據集,到構建簇類,編寫一些輔助類到算法的實現.最后我們將具體對一個文件數據進行聚類.
代碼如下:
//source:mainalgorithm/mpikmeanmain.cpp
#include
#include
#include
#include
#include
#include
#include
#include "mpikmean.hpp"
#include "../utilities/datasetreader.hpp"
using namespace std;
using namespace boost::program_options;
namespace mpi=boost::mpi;
int main(int ac, char* av[]){
try{
mpi::environment env(ac, av);
mpi::communicator world;
options_description desc("Allowed options");
desc.add_options()
("help", "produce help message")
("datafile", value(), "the data file")
("k", value()->default_value(3),
"number of clusters")
("seed", value()->default_value(1),
"seed used to choose random initial centers")
("maxiter", value()->default_value(100),
"maximum number of iterations")
("numrun", value()->default_value(1),
"number of runs");
variables_map vm;
store(parse_command_line(ac, av, desc), vm);
notify(vm);
if (vm.count("help") || ac==1) {
cout << desc << "\n";
return 1;
}
Size numclust = vm["k"].as();
Size maxiter = vm["maxiter"].as();
Size numrun = vm["numrun"].as();
Size seed = vm["seed"].as();
string datafile;
if (vm.count("datafile")) {
datafile = vm["datafile"].as();
} else {
cout << "Please provide a data file\n";
return 1;
}
boost::shared_ptr ds;
if (world.rank() ==0) {
DatasetReader reader(datafile);
reader.fill(ds);
}
boost::timer t;
t.restart();
Results Res;
Real avgiter = 0.0;
Real avgerror = 0.0;
Real dMin = MAX_REAL;
Real error;
for(Size i=1; i<=numrun; ++i) {
MPIKmean ca;
Arguments &Arg = ca.getArguments();
Arg.ds = ds;
Arg.insert("numclust", numclust);
Arg.insert("maxiter", maxiter);
Arg.insert("seed", seed);
if (numrun == 1) {
Arg.additional["seed"] = seed;
} else {
Arg.additional["seed"] = i;
}
ca.clusterize();
if(world.rank() == 0) {
const Results &tmp = ca.getResults();
avgiter +=
boost::any_cast(tmp.get("numiter"));
error = boost::any_cast(tmp.get("error"));
avgerror += error;
if (error < dMin) {
dMin = error;
Res = tmp;
}
}
}
double seconds = t.elapsed();
if(world.rank() == 0) {
avgiter /= numrun;
avgerror /= numrun;
std::cout<
<
std::cout<
<
PClustering pc =
boost::any_cast(Res.get("pc"));
std::cout<
std::cout<
std::cout<
<
std::cout<
std::cout<
std::string prefix;
size_t ind = datafile.find_last_of('.');
if(ind != std::string::npos ) {
prefix = datafile.substr(0,ind);
} else {
prefix = datafile;
}
std::stringstream ss;
ss<
pc.save(ss.str());
}
return 0;
} catch (std::exception& e) {
std::cout<
return 1;
} catch (...){
std::cout<
return 2;
}
}
編譯:
mpic++ -o mpikmean mpikmeanmain.cpp -L/usr/local/lib -lboost_program_options -lboost_mpi -lboost_serialization
運行時需要傳入一些參數:datafile(文件路徑),k(聚類數目),seed(隨機初始化種子),maxiter(最大迭代次數),numrun(運行次數).除了datafile之外其他參數在主函數中均有默認值.下面我們進行一個15000條數據的3分類,執行50次.
運行:
mpirun -n 4 ./mpikmean --datafile=../testdata/15000points.csv --k=3 --numrun=50
運行結果如第一章所示.
可以比較使用不同的進程數目的不同運行時間,使用多進程確實可以提高運行速度,但是因為I/O操作會占用一些時間,運行效率并沒有出現倍數的提升.
對于小數據集,I/O操作的開銷與數據計算開銷相差無幾,多進程沒有明顯優勢.對于大數據集,I/O操作開銷會小于數據計算的時間,這時候多進程會帶來效率上的提升.
四 ,實驗總結
到此,我們的K-Means算法的實驗就到此結束了.由于考慮到整個內容的繁雜度,有很多小的細節可能沒有拿出來細講,如果小伙伴對有些地方沒有弄懂,希望自己能夠繼續從源碼中尋找答案.雖然我們最后只實現了一個簡單的聚類算法,但前面介紹的關于構建聚類數據集卻具有一定的通用性,對于其他聚類算法也很適用,如果小伙伴愿意嘗試其他聚類算法,也可以按照此思路進行改寫.并行處理是一種技巧,如果使用恰當,能夠給計算效率帶來很大的提升,本例的并行處理思路同樣可以推廣到其他算法當中.
感謝你能夠看到最后,希望你有所收獲!