### Bi-LSTM Conditional Random Field
### PyTorch tutorial: https://pytorch.org/tutorials/beginner/nlp/advanced_tutorial.html
### Main model structure:

A PyTorch BiLSTM-CRF tutorial; note that this version does not support batch processing.
Python version: 3.7.4 (default, Aug 13 2019, 20:35:49) [GCC 7.3.0]
Torch version: 1.4.0
# Author: Robert Guthrie

import torch
import torch.autograd as autograd
import torch.nn as nn
import torch.optim as optim

torch.manual_seed(1)


def argmax(vec):
    # return the argmax as a python int
    # (index of the maximum value of vec along dim 1)
    _, idx = torch.max(vec, 1)
    return idx.item()


def prepare_sequence(seq, to_ix):
    # convert a sentence into a tensor of word IDs
    idxs = [to_ix[w] for w in seq]
    return torch.tensor(idxs, dtype=torch.long)


# Compute log sum exp in a numerically stable way for the forward algorithm
# The forward algorithm keeps accumulating results from previous steps, which has a drawback:
# once the accumulated sum of exponentials is large enough it exceeds the largest representable
# float and becomes inf, and taking its log then also gives inf.
# To avoid this, factor a suitable constant "clip" out of the exponential sum so that
# no single term becomes too large to compute:
# SUM = log(exp(s1)+exp(s2)+...+exp(s100))
#     = log{exp(clip)*[exp(s1-clip)+exp(s2-clip)+...+exp(s100-clip)]}
#     = clip + log[exp(s1-clip)+exp(s2-clip)+...+exp(s100-clip)]
# where clip = max
def log_sum_exp(vec):
    max_score = vec[0, argmax(vec)]
    max_score_broadcast = max_score.view(1, -1).expand(1, vec.size()[1])
    return max_score + \
        torch.log(torch.sum(torch.exp(vec - max_score_broadcast)))


class BiLSTM_CRF(nn.Module):

    def __init__(self, vocab_size, tag_to_ix, embedding_dim, hidden_dim):
        super(BiLSTM_CRF, self).__init__()
        self.embedding_dim = embedding_dim  # word embedding dim
        self.hidden_dim = hidden_dim        # Bi-LSTM hidden dim
        self.vocab_size = vocab_size
        self.tag_to_ix = tag_to_ix
        self.tagset_size = len(tag_to_ix)

        self.word_embeds = nn.Embedding(vocab_size, embedding_dim)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim // 2,
                            num_layers=1, bidirectional=True)

        # Maps the output of the LSTM into tag space.
        # The fully connected layer turns the BiLSTM features into emission scores.
        self.hidden2tag = nn.Linear(hidden_dim, self.tagset_size)

        # Matrix of transition parameters. Entry i,j is the score of
        # transitioning *to* i *from* j (i.e. from tag j to tag i).
        self.transitions = nn.Parameter(
            torch.randn(self.tagset_size, self.tagset_size))

        # These two statements enforce the constraint that we never transfer
        # to the start tag and we never transfer from the stop tag:
        # transitions from any tag into START_TAG and out of STOP_TAG get a
        # very low score, so such paths are effectively impossible.
        self.transitions.data[tag_to_ix[START_TAG], :] = -10000
        self.transitions.data[:, tag_to_ix[STOP_TAG]] = -10000

        self.hidden = self.init_hidden()

    def init_hidden(self):
        # Initialize the LSTM hidden state
        return (torch.randn(2, 1, self.hidden_dim // 2),
                torch.randn(2, 1, self.hidden_dim // 2))

    def _get_lstm_features(self, sentence):
        # Extract features with the Bi-LSTM
        self.hidden = self.init_hidden()
        # embeds is a 3-D tensor of shape (seq_len, 1, embed_dim):
        #   seq_len is the sentence length (number of words),
        #   1 is the batch size (a single sentence is processed at a time),
        #   embed_dim is the word embedding dimension.
        # This is exactly the layout PyTorch's LSTM expects: sequence length,
        # batch size, input size. To process several sentences at once, the
        # code would have to be changed so that `sentence` holds multiple
        # sequences and the batch size is no longer fixed to 1.
        embeds = self.word_embeds(sentence).view(len(sentence), 1, -1)
        lstm_out, self.hidden = self.lstm(embeds, self.hidden)
        lstm_out = lstm_out.view(len(sentence), self.hidden_dim)
        lstm_feats = self.hidden2tag(lstm_out)
        return lstm_feats

    def _score_sentence(self, feats, tags):
        # Gives the score of a provided tag sequence (the score of one path)
        score = torch.zeros(1)
        tags = torch.cat([torch.tensor([self.tag_to_ix[START_TAG]], dtype=torch.long), tags])
        for i, feat in enumerate(feats):
            # Accumulate the path score: transition score + emission score
            score = score + \
                self.transitions[tags[i + 1], tags[i]] + feat[tags[i + 1]]
        score = score + self.transitions[self.tag_to_ix[STOP_TAG], tags[-1]]
        return score

    def _forward_alg(self, feats):
        # Do the forward algorithm to compute the partition function
        init_alphas = torch.full((1, self.tagset_size), -10000.)
        # START_TAG has all of the score:
        # at step 0 the START position gets 0 and every other tag gets -10000
        init_alphas[0][self.tag_to_ix[START_TAG]] = 0.

        # Wrap in a variable so that we will get automatic backprop
        previous = init_alphas

        # Iterate through the sentence
        for obs in feats:
            # The forward tensors at this timestep
            alphas_t = []
            for next_tag in range(self.tagset_size):
                # broadcast the emission score: it is the same regardless of
                # the previous tag
                emit_score = obs[next_tag].view(1, -1).expand(1, self.tagset_size)
                # the ith entry of trans_score is the score of transitioning
                # to next_tag from i
                trans_score = self.transitions[next_tag].view(1, -1)
                # The ith entry of next_tag_var is the value for the edge
                # (i -> next_tag) before we do log-sum-exp:
                # previous score + transition score + emission score
                next_tag_var = previous + trans_score + emit_score
                # The forward variable for this tag is log-sum-exp of all the scores.
                alphas_t.append(log_sum_exp(next_tag_var).view(1))
            # Update previous and recurse to the next timestep
            previous = torch.cat(alphas_t).view(1, -1)
        # Account for the final transition to STOP_TAG
        terminal_var = previous + self.transitions[self.tag_to_ix[STOP_TAG]]
        # The partition function is the log-sum-exp of the terminal scores
        scores = log_sum_exp(terminal_var)
        return scores

    def _viterbi_decode(self, feats):
        backpointers = []

        # Initialize the viterbi variables in log space
        init_vvars = torch.full((1, self.tagset_size), -10000.)
        init_vvars[0][self.tag_to_ix[START_TAG]] = 0

        previous = init_vvars
        for obs in feats:
            # holds the backpointers for this step
            bptrs_t = []
            # holds the viterbi variables for this step
            viterbivars_t = []

            for next_tag in range(self.tagset_size):
                # next_tag_var[i] holds the viterbi variable for tag i at the
                # previous step, plus the score of transitioning
                # from tag i to next_tag.
                # We don't include the emission scores here because the max
                # does not depend on them (we add them in below)
                next_tag_var = previous + self.transitions[next_tag]
                best_tag_id = argmax(next_tag_var)
                bptrs_t.append(best_tag_id)
                viterbivars_t.append(next_tag_var[0][best_tag_id].view(1))
            # Now add in the emission scores, and assign forward_var to the set
            # of viterbi variables we just computed
            previous = (torch.cat(viterbivars_t) + obs).view(1, -1)
            # The backpointers record, for every tag, the best previous tag
            backpointers.append(bptrs_t)

        # Transition to STOP_TAG
        terminal_var = previous + self.transitions[self.tag_to_ix[STOP_TAG]]
        best_tag_id = argmax(terminal_var)
        path_score = terminal_var[0][best_tag_id]

        # Follow the back pointers to decode the best path.
        best_path = [best_tag_id]
        # Starting from best_tag_id, walk backpointers in reverse to recover
        # the best path
        for bptrs_t in reversed(backpointers):
            best_tag_id = bptrs_t[best_tag_id]
            best_path.append(best_tag_id)
        # Pop off the start tag (we dont want to return that to the caller)
        start = best_path.pop()
        assert start == self.tag_to_ix[START_TAG]  # Sanity check
        best_path.reverse()
        return path_score, best_path

    def neg_log_likelihood(self, sentence, tags):
        # The CRF loss has two parts: the score of the gold tag sequence and
        # the log-sum of the scores of all possible paths. The gold path
        # should score highest, so we maximize
        # (gold path score - log-sum over all paths) and take its negation as
        # the loss, which we then minimize.
        feats = self._get_lstm_features(sentence)
        forward_score = self._forward_alg(feats)
        gold_score = self._score_sentence(feats, tags)
        return forward_score - gold_score

    def forward(self, sentence):  # dont confuse this with _forward_alg above.
        # Get the emission scores from the BiLSTM
        lstm_feats = self._get_lstm_features(sentence)

        # Find the best path, given the features:
        # Viterbi-decode the optimal path from emission and transition scores
        score, tag_seq = self._viterbi_decode(lstm_feats)
        return score, tag_seq


START_TAG = "<START>"
STOP_TAG = "<STOP>"
EMBEDDING_DIM = 5
HIDDEN_DIM = 4

# Make up some training data
training_data = [(
    "the wall street journal reported today that apple corporation made money".split(),
    "B I I I O O O B I O O".split()
), (
    "georgia tech is a university in georgia".split(),
    "B I O O O O B".split()
)]

word_to_ix = {}
for sentence, tags in training_data:
    for word in sentence:
        if word not in word_to_ix:
            word_to_ix[word] = len(word_to_ix)

tag_to_ix = {"B": 0, "I": 1, "O": 2, START_TAG: 3, STOP_TAG: 4}

model = BiLSTM_CRF(len(word_to_ix), tag_to_ix, EMBEDDING_DIM, HIDDEN_DIM)
optimizer = optim.SGD(model.parameters(), lr=0.01, weight_decay=1e-4)

# Check predictions before training
with torch.no_grad():
    precheck_sent = prepare_sequence(training_data[0][0], word_to_ix)
    precheck_tags = torch.tensor([tag_to_ix[t] for t in training_data[0][1]], dtype=torch.long)
    print(model(precheck_sent))

# Make sure prepare_sequence from earlier in the LSTM section is loaded
for epoch in range(300):  # again, normally you would NOT do 300 epochs, it is toy data
    for sentence, tags in training_data:
        # Step 1. Remember that Pytorch accumulates gradients.
        # We need to clear them out before each instance
        model.zero_grad()

        # Step 2. Get our inputs ready for the network, that is,
        # turn them into Tensors of word indices.
        sentence_in = prepare_sequence(sentence, word_to_ix)
        targets = torch.tensor([tag_to_ix[t] for t in tags], dtype=torch.long)

        # Step 3. Run our forward pass and get the CRF loss.
        loss = model.neg_log_likelihood(sentence_in, targets)

        # Step 4. Compute the loss, gradients, and update the parameters by
        # calling optimizer.step()
        loss.backward()
        optimizer.step()

# Check predictions after training to see whether the model has learned the tags
with torch.no_grad():
    precheck_sent = prepare_sequence(training_data[0][0], word_to_ix)
    print(model(precheck_sent))
# We got it!
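As a quick sanity check of the numerically stable log_sum_exp above (a small sketch, not part of the original tutorial), the handwritten version can be compared against torch.logsumexp on scores large enough to overflow a naive implementation:

vec = torch.tensor([[1000., 1001., 1002.]])
print(log_sum_exp(vec))                      # ≈ 1002.4076
print(torch.logsumexp(vec, dim=1))           # same value from PyTorch's built-in stable version
print(torch.log(torch.sum(torch.exp(vec))))  # inf -- the overflow the max-shift trick avoids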
The key line when converting this to batch processing is previous_score = score[t - 1].view(batch_size, -1, 1), as used in the batched viterbi_decode excerpt below.
def viterbi_decode(self, h: FloatTensor, mask: BoolTensor) -> List[List[int]]:
    """decode labels using viterbi algorithm
    :param h: hidden matrix (batch_size, seq_len, num_labels)
    :param mask: mask tensor of each sequence in mini batch (batch_size, seq_len)
    :return: labels of each sequence in mini batch
    """
    batch_size, seq_len, _ = h.size()
    # prepare the sequence lengths in each sequence
    seq_lens = mask.sum(dim=1)
    # In mini batch, prepare the score
    # from the start sequence to the first label
    score = [self.start_trans.data + h[:, 0]]
    path = []

    for t in range(1, seq_len):
        # extract the score of previous sequence
        # (batch_size, num_labels, 1)
        previous_score = score[t - 1].view(batch_size, -1, 1)

        # extract the score of hidden matrix of sequence
        # (batch_size, 1, num_labels)
        h_t = h[:, t].view(batch_size, 1, -1)

        # extract the score in transition
        # from label of t-1 sequence to label of sequence of t
        # self.trans_matrix has the score of the transition
        # from sequence A to sequence B
        # (batch_size, num_labels, num_labels)
        score_t = previous_score + self.trans_matrix + h_t

        # keep the maximum value
        # and point where maximum value of each sequence
        # (batch_size, num_labels)
        best_score, best_path = score_t.max(1)
        score.append(best_score)
        path.append(best_path)

    # ... (the rest of the function backtracks through `path` to recover
    #      the best label sequence for each sequence in the batch)
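To make the broadcasting in score_t = previous_score + self.trans_matrix + h_t easier to follow, here is a minimal shape-only sketch (toy sizes and standalone tensors rather than the class attributes above) of how one Viterbi step scores every (previous label, current label) pair for the whole batch at once:

import torch

batch_size, num_labels = 4, 5
previous_score = torch.randn(batch_size, num_labels)  # best score ending in each label at step t-1
trans_matrix = torch.randn(num_labels, num_labels)     # transition score from label i to label j
h_t = torch.randn(batch_size, num_labels)              # emission scores at step t

score_t = (previous_score.view(batch_size, num_labels, 1)   # (B, L, 1)
           + trans_matrix                                    # (L, L), broadcast over the batch
           + h_t.view(batch_size, 1, num_labels))            # (B, 1, L)
print(score_t.shape)                                         # torch.Size([4, 5, 5])

best_score, best_path = score_t.max(dim=1)   # maximize over the previous label
print(best_score.shape, best_path.shape)     # both (batch_size, num_labels)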
Using torchcrf (supports batch processing). The following is from the CSDN post "torchcrf的簡單使用" (https://blog.csdn.net/csdndogo/article/details/125541213), reposted here in case the original article disappears. It covers installing and using the TorchCRF library in PyTorch, CRF parameter settings, custom masks, and how the loss is computed, and it explores combining the CRF NLL loss with cross-entropy using adaptive weights to balance the two during training. The gain is modest for a single task, but the approach is useful for multi-task learning.
1. Install torchcrf and use the model
Install: pip install TorchCRF
Using the CRF: the official project page has brief usage instructions.
Pay attention to the input format. The torchcrf packages available from different sources come in several versions; some have a batch_first argument and some do not, so check whether yours has it. By default the batch size is treated as the first dimension here.
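If you are not sure which variant you have installed, a quick check of the constructor signature (a small sketch; adjust the import to whichever package you actually use) tells you whether batch_first is supported:

import inspect
from TorchCRF import CRF   # or: from torchcrf import CRF

print('batch_first' in inspect.signature(CRF.__init__).parameters)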
The code below is what I used to get familiar with the CRF model and its loss function. It simulates a multi-class tagging task with random inputs and random labels, so the final predictions cannot be expected to match the labels well.
import torch
import torch.nn as nn
import numpy as np
import random
from TorchCRF import CRF
from torch.optim import Adam
seed = 100
def seed_everything(seed=seed):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.deterministic = True
num_tags = 5
model = CRF(num_tags, batch_first=True)  # whether batch_first exists depends on your torchcrf version
seq_len = 3
batch_size = 50
seed_everything()
trainset = torch.randn(batch_size, seq_len, num_tags)  # features
traintags = (torch.rand([batch_size, seq_len]) * 4).floor().long()  # (batch_size, seq_len)
testset = torch.randn(5, seq_len, num_tags)  # features
testtags = (torch.rand([5, seq_len]) * 4).floor().long()  # (batch_size, seq_len)
# Training phase
optimizer = Adam(model.parameters(), lr=0.05)  # create the optimizer once, outside the loop
model.train()
for e in range(50):
    optimizer.zero_grad()
    loss = -model(trainset, traintags)
    print('epoch{}: loss score is {}'.format(e, loss))
    loss.backward()
    torch.nn.utils.clip_grad_norm_(model.parameters(), 5)
    optimizer.step()
# Test phase
model.eval()
loss = model(testset, testtags)        # log-likelihood of the test batch
predictions = model.decode(testset)    # best label sequence for each test sample
1.1 Model parameters and notes on custom masks

def forward(self, emissions, labels: LongTensor, mask: BoolTensor)

The arguments are the emission matrix (the predicted scores for each label), the labels, and the mask (note that the mask is a BoolTensor).
Note: when building the mask yourself, passing a LongTensor such as [1, 1, 1, 1, 0, 0] raises an error; it has to be converted to a ByteTensor. Below is a simple helper that builds the mask from the label data:
    def get_crfmask(self, labels):
        crfmask = []
        for batch in labels:
            res = [0 if d == -1 else 1 for d in batch]
            crfmask.append(res)
        return torch.ByteTensor(crfmask)
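A minimal usage sketch of this helper (the tensors here are made up for illustration, get_crfmask is written as a plain function without the self argument, and the crf_layer name and -1 padding convention follow the snippets in this post; the constructor arguments may differ depending on the installed package):

import torch
from TorchCRF import CRF

num_tags = 5
crf_layer = CRF(num_tags, batch_first=True)

def get_crfmask(labels):
    # same logic as the method above, as a standalone function
    return torch.ByteTensor([[0 if d == -1 else 1 for d in row] for row in labels])

# hypothetical toy batch: 2 sequences of length 4, padded with label -1
labels = torch.tensor([[2, 0, 1, -1],
                       [1, 1, -1, -1]])
emissions = torch.randn(2, 4, num_tags)   # (batch_size, seq_len, num_tags)

crfmask = get_crfmask(labels)             # tensor([[1, 1, 1, 0], [1, 1, 0, 0]], dtype=torch.uint8)
labels_in = labels.clamp(min=0)           # padded positions need a valid index; the mask ignores them
loss = -crf_layer(emissions, labels_in, mask=crfmask)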
2. What is the CRF loss function?
The loss is made up of two parts: the score of the true transition path and the total score of all possible paths. The underlying quantity is the path probability

    P(y|x) = exp(score(x, y)) / Σ_{y'} exp(score(x, y'))

where the numerator is the score of the true path and the denominator is the total score over all paths. In the original CRF code the log of this formula is computed as:
    def forward(
            self, h: FloatTensor, labels: LongTensor, mask: BoolTensor) -> FloatTensor:
        log_numerator = self._compute_numerator_log_likelihood(h, labels, mask)
        log_denominator = self._compute_denominator_log_likelihood(h, mask)
        return log_numerator - log_denominator
The CRF loss is the negative log-likelihood (NLL), while forward above returns the log-likelihood itself, so if your model otherwise uses a cross-entropy loss, the value returned by the CRF has to be negated before the two losses are added:
? ? loss = -model(trainset, traintags)
3. How do you train with the CRF loss combined with your own network's cross-entropy loss?
To add a CRF on top of my own model, I need to combine the original cross-entropy loss with the CRF loss. Since the CRF output has to be negated to act as a loss (the NLL), I simply negate it and add it to the original loss:
        loss2 = -crf_layer(log_prob, label, mask=crfmask)
        loss1 = loss_function(log_prob.permute(0, 2, 1), label)
        loss = loss1 + loss2
        loss.backward()
Drawback: the results are not great; you can try putting a weight on loss2. Below is a function for training with an adaptive weight over the two losses.
3.1 Adaptive loss-function weights
Because the loss returned by the CRF is on a different scale from the original loss, adaptive weights are used to balance the two. Adaptive weighting originally comes from multi-task learning; I have not studied it in depth, and the code comes from a blog post reproducing a paper.
class AutomaticWeightedLoss(nn.Module):
    def __init__(self, num=2):
        super(AutomaticWeightedLoss, self).__init__()
        params = torch.ones(num, requires_grad=True)
        self.params = torch.nn.Parameter(params)

    def forward(self, *x):
        loss_sum = 0
        for i, loss in enumerate(x):
            loss_sum += 0.5 / (self.params[i] ** 2) * loss + torch.log(1 + self.params[i] ** 2)
        return loss_sum
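A sketch of how this could be wired into the joint training step from section 3 (the names inputs, log_prob, label, crfmask, loss_function, crf_layer and model come from the earlier snippets and are assumed to be produced by your own network; the key point is that awl.parameters() must also be passed to the optimizer so the two weights are actually learned):

awl = AutomaticWeightedLoss(num=2)
optimizer = Adam(list(model.parameters()) + list(awl.parameters()), lr=0.05)

for e in range(50):
    optimizer.zero_grad()
    log_prob = model(inputs)                                  # your network's per-label scores
    loss1 = loss_function(log_prob.permute(0, 2, 1), label)   # cross-entropy loss of the base model
    loss2 = -crf_layer(log_prob, label, mask=crfmask)         # negated CRF log-likelihood, as above
    loss = awl(loss1, loss2)                                  # adaptively weighted sum of the two losses
    loss.backward()
    optimizer.step()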