前言
上星期寫了Kaggle競賽的詳細介紹及入門指導,但對于真正想要玩這個競賽的伙伴,機器學習中的相關算法是必不可少的,即使是你不想獲得名次和獎牌。那么,從本周開始,我將介紹在Kaggle比賽中的最基本的也是運用最廣的機器學習算法,很多項目用這些基本的模型就能解決基礎問題了。
1 概述
隱馬爾可夫模型(Hidden Markov Model,HMM)是結構最簡單的動態貝葉斯網,這是一種著名的有向圖模型,主要用于時序數據建模(語音識別、自然語言處理等)。
假設有三個不同的骰子(6面、4面、8面),每次先從三個骰子里選一個,每個骰子選中的概率為1/3,如下圖所示,重復上述過程,得到一串數字[1 6 3 5 2 7]。這些可觀測變量組成可觀測狀態鏈。
同時,在隱馬爾可夫模型中還有一條由隱變量組成的隱含狀態鏈,在本例中即骰子的序列。比如得到這串數字骰子的序列可能為[D6 D8 D8 D6 D4 D8]。
隱馬爾可夫模型示意圖如下所示:
圖中,箭頭表示變量之間的依賴關系。在任意時刻,觀測變量(骰子點數)僅依賴于狀態變量(哪類骰子),“觀測獨立性假設”。
同時,t時刻的狀態qt僅依賴于t-1時刻的狀態qt-1。這就是馬爾可夫鏈,即系統的下一時刻的狀態僅由當前狀態決定不依賴以往的任何狀態(無記憶性),“齊次馬爾可夫性假設”。
2 隱馬爾可夫模型三要素
對于一個隱馬爾可夫模型,它的所有N個可能的狀態的集合Q={q1,q2,…,qN},所有M個可能的觀測集合V={v1,v2,…,vM}
隱馬爾可夫模型三要素:狀態轉移概率矩陣A,A=[aij]NxN,aij表示任意時刻t狀態qi條件下,下一時刻t+1狀態為qj的概率;
觀測概率矩陣B,B=[bij]NxM,bij表示任意時刻t狀態qi條件下,生成觀測值vj的概率;
初始狀態概率向量Π,Π=(π1,π2,…,πN),πi表示初始時刻t=1,狀態為qi的概率。
一個隱馬爾可夫模型可由λ=(A, B, Π)來指代。
3 隱馬爾可夫模型的三個基本問題(1) 給定模型λ=(A, B, Π),計算其產生觀測序列O={o1,o2,…,oT}的概率P(O|λ);
計算擲出點數163527的概率
(2) 給定模型λ=(A, B, Π)和觀測序列O={o1,o2,…,oT},推斷能夠最大概率產生此觀測序列的狀態序列I={i1,i2,…,iT},即使P(I|O)最大的I;
推斷擲出點數163527的骰子種類
(3) 給定觀測序列O={o1,o2,…,oT},估計模型λ=(A, B, Π)的參數,使P(O|λ)最大;
已知骰子有幾種,不知道骰子的種類,根據多次擲出骰子的結果,反推出骰子的種類
這三個基本問題在現實應用中非常重要,例如根據觀測序列O={o1,o2,…,oT-1,}推測當前時刻最有可能出現的觀測值OT,這就轉換成基本問題(1);
在語音識別中,觀測值為語音信號,隱藏狀態為文字,根據觀測信號推斷最有可能的狀態序列,即基本問題(2);
在大多數應用中,人工指定參數模型已變得越來越不可行,如何根據訓練樣本學得最優參數模型,就是基本問題(3)。
4 三個基本問題的解法
基于兩個條件獨立假設,隱馬爾可夫模型的這三個基本問題均能被高效求解。
4.1 基本問題(1)解法
4.1.1 直接計算法(概念上可行,計算上不可行)
通過列舉所有可能的長度為T的狀態序列I=(i1,i2,…,iT),求各個狀態序列I與觀測序列O同時出現的聯合概率P(I,O|λ),然后對所有可能求和得到P(O|λ)。
狀態序列I=(i1,i2,…,iT)的概率是P(I|λ)=π1a12a23…a(T-1)T
對于固定狀態序列 I,觀測序O=(o1,o2,…,oT)的概率P(O|I,λ)= b11b22…bTT
I 和 O同時出現的聯合概率P(I,O|λ)= P(I|λ) P(O|I,λ)
然后對所有可能的 I 求和,得到P(O|λ)
這種方法計算量很大,算法不可行。
4.1.2 前向算法(forward algorithm)(t=1,一步一步向前計算)
前向概率αt(i)= P(o1,o2,…,ot,ii=qi|λ),表示模型λ,時刻 t,觀測序列為o1,o2,…,ot且狀態為qi的概率。
(1) 初始化前向概率
狀態為qi和觀測值為o1的聯合概率 α1(i)=π1bi1
(2) 遞推t=1,2,…,T-1
根據下圖,得到αt+1(i)= [Σj=1,…,Nαt(j)αji]bi(t+1)
(3) 終止 P(O|λ) = Σi=1,…,NαT(i)
前向算法高效的關鍵是其局部計算前向概率,根據路徑結構,如下圖所示,每次計算直接利用前一時刻計算結果,避免重復計算,減少計算量。
4.1.3 后向算法(backward algorithm)
后向概率βt(i) = P(o1,o2,…,ot,ii=qi|λ),表示模型λ,時刻t,從t+1到時刻T觀測序列o1,o2,…,ot且狀態為qi的概率。
(1)初始化后向概率
對最終時刻的所有狀態為qi規定βT(i) = 1。
(2)遞推t=T-1,T-2,…,1
根據下圖,計算t時刻狀態為qi,且時刻t+1之后的觀測序列為ot+1,ot+2,…,ot的后向概率。只需考慮t+1時刻所有可能的N個狀態qi的轉移概率(transition probability) αij,以及在此狀態下的觀測概率bi(t+1),然后考慮qj之后的后向概率βt+1(j),得到
βt(i) = Σj=1,…,Nβt+1(j)αijbi(t+1).
?(3) 終止 P(O|λ) = Σi=1,…,Nβ1(i)πibi1
前向算法高效的關鍵是其局部計算前向概率,根據路徑結構,如下圖所示,每次計算直接利用前一時刻計算結果,避免重復計算,減少計算量。
4.2 基本問題(2)解法
4.2.1 近似算法
選擇每一時刻最有可能出現的狀態,從而得到一個狀態序列。這個方法計算簡單,但是不能保證整個狀態序列的出現概率最大。因為可能出現轉移概率為0的相鄰狀態。
4.2.2 Viterbi算法
使用動態規劃求解概率最大(最優)路徑。t=1時刻開始,遞推地計算在時刻t狀態為i的各條部分路徑的最大概率,直到計算到時刻T,狀態為i的各條路徑的最大概率,時刻T的最大概率即為最優路徑的概率,最優路徑的節點也同時得到。
如果還不明白,看一下李航《統計學習方法》的186-187頁的例題就能明白算法的原理。
狀態[3 3 3]極為概率最大路徑。
4.3 基本問題(3)解法
4.3.1 監督學習方法
給定S個長度相同的(觀測序列,狀態序列)作為訓練集(O1,I1),O2,I3),…, (Os,Is)使用極大似然估計法來估計模型參數。
轉移概率αij 的估計:樣本中t時刻處于狀態i,t+1時刻轉移到狀態j的頻數為αij,則
觀測概率bij和初始狀態概率πi的估計類似。
4.3.2 Baum-Welch算法
使用EM算法得到模型參數估計式
EM算法是常用的估計參數隱變量的利器,它是一種迭代方法,基本思想是:
(1) 選擇模型參數初始值;
(2) (E步)根據給定的觀測數據和模型參數,求隱變量的期望;
(3) (M步)根據已得隱變量期望和觀測數據,對模型參數做極大似然估計,得到新的模型參數,重復第二步。
5 Python代碼實現
5.1 Baum-Welch算法的程序實現
這里提供Baum-Welch算法的代碼實現。
本來打算試一下用自己寫的HMM跑一下中文分詞,但很可惜,代碼運行的比較慢。所以改成 模擬 三角波 以及 正弦波。
# encoding=utf8
?
import numpy as np
import csv
?
class HMM(object):
def __init__(self,N,M):
self.A = np.zeros((N,N)) # 狀態轉移概率矩陣
self.B = np.zeros((N,M)) # 觀測概率矩陣
self.Pi = np.array([1.0/N]*N) # 初始狀態概率矩陣
?
self.N = N # 可能的狀態數
self.M = M # 可能的觀測數
?
def cal_probality(self, O):
self.T = len(O)
self.O = O
?
self.forward()
return sum(self.alpha[self.T-1])
?
def forward(self):
"""前向算法"""
self.alpha = np.zeros((self.T,self.N))
?
# 公式 10.15
for i in range(self.N):
self.alpha[0][i] = self.Pi[i]*self.B[i][self.O[0]]
?
# 公式10.16
for t in range(1,self.T):
for i in range(self.N):
sum = 0
for j in range(self.N):
sum += self.alpha[t-1][j]*self.A[j][i]
self.alpha[t][i] = sum * self.B[i][self.O[t]]
?
def backward(self):
"""后向算法"""
self.beta = np.zeros((self.T,self.N))
?
# 公式10.19
for i in range(self.N):
self.beta[self.T-1][i] = 1
?
# 公式10.20
for t in range(self.T-2,-1,-1):
for i in range(self.N):
for j in range(self.N):
self.beta[t][i] += self.A[i][j]*self.B[j][self.O[t+1]]*self.beta[t+1][j]
?
def cal_gamma(self, i, t):
"""公式 10.24"""
numerator = self.alpha[t][i]*self.beta[t][i]
denominator = 0
?
for j in range(self.N):
denominator += self.alpha[t][j]*self.beta[t][j]
?
return numerator/denominator
?
def cal_ksi(self, i, j, t):
"""公式 10.26"""
?
numerator = self.alpha[t][i]*self.A[i][j]*self.B[j][self.O[t+1]]*self.beta[t+1][j]
denominator = 0
?
for i in range(self.N):
for j in range(self.N):
denominator += self.alpha[t][i]*self.A[i][j]*self.B[j][self.O[t+1]]*self.beta[t+1][j]
?
return numerator/denominator
?
def init(self):
"""隨機生成 A,B,Pi并保證每行相加等于 1"""
import random
for i in range(self.N):
randomlist = [random.randint(0,100) for t in range(self.N)]
Sum = sum(randomlist)
for j in range(self.N):
self.A[i][j] = randomlist[j]/Sum
?
for i in range(self.N):
randomlist = [random.randint(0,100) for t in range(self.M)]
Sum = sum(randomlist)
for j in range(self.M):
self.B[i][j] = randomlist[j]/Sum
?
def train(self, O, MaxSteps = 100):
self.T = len(O)
self.O = O
?
# 初始化
self.init()
?
step = 0
# 遞推
while step
step+=1
print(step)
tmp_A = np.zeros((self.N,self.N))
tmp_B = np.zeros((self.N,self.M))
tmp_pi = np.array([0.0]*self.N)
?
self.forward()
self.backward()
?
# a_{ij}
for i in range(self.N):
for j in range(self.N):
numerator=0.0
denominator=0.0
for t in range(self.T-1):
numerator += self.cal_ksi(i,j,t)
denominator += self.cal_gamma(i,t)
tmp_A[i][j] = numerator/denominator
?
# b_{jk}
for j in range(self.N):
for k in range(self.M):
numerator = 0.0
denominator = 0.0
for t in range(self.T):
if k == self.O[t]:
numerator += self.cal_gamma(j,t)
denominator += self.cal_gamma(j,t)
tmp_B[j][k] = numerator / denominator
?
# pi_i
for i in range(self.N):
tmp_pi[i] = self.cal_gamma(i,0)
?
self.A = tmp_A
self.B = tmp_B
self.Pi = tmp_pi
?
def generate(self, length):
import random
I = []
?
# start
ran = random.randint(0,1000)/1000.0
i = 0
while self.Pi[i]
ran -= self.Pi[i]
i += 1
I.append(i)
?
# 生成狀態序列
for i in range(1,length):
last = I[-1]
ran = random.randint(0, 1000) / 1000.0
i = 0
while self.A[last][i] < ran or self.A[last][i]<0.0001:
ran -= self.A[last][i]
i += 1
I.append(i)
?
# 生成觀測序列
Y = []
for i in range(length):
k = 0
ran = random.randint(0, 1000) / 1000.0
while self.B[I[i]][k] < ran or self.B[I[i]][k]<0.0001:
ran -= self.B[I[i]][k]
k += 1
Y.append(k)
?
return Y
?
?
?
def triangle(length):
'''三角波'''
X = [i for i in range(length)]
Y = []
?
for x in X:
x = x % 6
if x <= 3:
Y.append(x)
else:
Y.append(6-x)
return X,Y
?
?
?
def show_data(x,y):
import matplotlib.pyplot as plt
plt.plot(x, y, 'g')
plt.show()
?
return y
?
?
if __name__ == '__main__':
hmm = HMM(10,4)
tri_x, tri_y = triangle(20)
?
hmm.train(tri_y)
y = hmm.generate(100)
print(y)
x = [i for i in range(100)]
show_data(x,y)
5.2 運行結果
5.2.1 三角波
三角波比較簡單,我設置N=10,扔進去長度為20的序列,訓練100次,下圖是其生成的長度為100的序列。
可以看出效果還是很不錯的。
5.2.2 正弦波
正弦波有些麻煩,因為觀測序列不能太大,所以我設置N=15,M=100,扔進去長度為40的序列,訓練100次,訓練的非常慢,下圖是其生成的長度為100的序列。
可以看出效果一般,如果改成C的代碼,并增加N應該是能模擬的。
6 HMM的實際應用舉例
參考文獻:
[2]《機器學習》周志華
[3]《統計學習方法》李航
延伸閱讀:
△長按關注「邁微電子研發社」
△長按加入「邁微電子研發社」學習輔導群
給個關注么么噠!