總體架構
輸入部分
代碼實現:
導包
# -*-coding:utf-8-*-
import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
# -*-coding:utf-8-*-
import copy
import torch.nn.functional as F
import math
位置編碼器部分
詞嵌入WordEmbedding
# todo 作用:輸入數據進行詞嵌入升維處理
class Embeddings(nn.Module):def __init__(self, vocab_size, embed_dim):super().__init__()# vocab_size:代表單詞的總個數self.vocab_size = vocab_size# embed_dim:代表詞嵌入維度self.embed_dim = embed_dim# 定義Embedding層self.embed = nn.Embedding(vocab_size, embed_dim)def forward(self, x):# x--》[batch_size, seq_len]return self.embed(x) * math.sqrt(self.embed_dim)
位置編碼模型PositionEncoding
# todo 作用:生成位置編碼矩陣,與輸入數據x進行融合,并輸出-->加入了位置編碼信息的詞嵌入張量
class PositionEncoding(nn.Module):def __init__(self, d_model, dropout_p, max_len=60):super().__init__()# d_model:代表詞嵌入維度self.d_model = d_model# dropout_p:代表隨機失活的系數self.dropout_p = dropout_p# max_len:代表最大句子長度self.max_len = max_len# 定義dropout層self.dropout = nn.Dropout(p=dropout_p)# 根據三角函數的公式實現位置的編碼# 定義位置編碼矩陣[max_len, d_model]-->[60, 512]pe = torch.zeros(max_len, d_model)# 定義位置列矩陣--》[max_len, 1]-->[60, 1]position = torch.arange(0, max_len).unsqueeze(dim=1)# 定義轉換矩陣:根據三角函數的計算公式,是其中的除了pos之外的系數(頻率)# temp_vec-->[256]temp_vec = torch.exp(torch.arange(0, d_model, 2) * -(math.log(10000)/d_model))# 根據三角函數的計算公式,計算角度:pos_vec-->[60, 256]pos_vec = position * temp_vec# 將奇數位用sin處理,偶數位用cos處理pe[:, 0::2] = torch.sin(pos_vec)pe[:, 1::2] = torch.cos(pos_vec)# 需要對上述的位置編碼結果升維:pe-->[1, max_len, d_model]-->[1, 60, 512]#todo pe就是位置編碼矩陣 似乎每次結果一樣pe = pe.unsqueeze(dim=0)# pe位置編碼結果不隨著模型的訓練而更新,因此需要進行注冊到緩存區self.register_buffer('pe', pe)def forward(self, x):# x--》來自于embedding之后的結果--》[batch_size, seq_len, embed_dim]-->[2, 4, 512]# 將x和位置編碼的信息進行融合# todo x.size()[1]是指句子有多長pe就取出多長與x相加,pe的形狀為[1, max_len, d_model]-->[1, 4, 512]x = x + self.pe[:, :x.size()[1]]return self.dropout(x)
編碼器部分
掩碼矩陣部分:生成掩碼矩陣
# 生成一個下三角矩陣(sentence_mask)
def generate_triu(size):# a = np.triu(m=np.ones((1, size, size)), k=1).astype(int)# return torch.from_numpy(1-a)return 1-torch.triu(torch.ones(1, size, size, dtype=torch.int), 1)# 生成掩碼矩陣(padding_mask)
def generate_padding_mask(tensor_x):# tensor_x-->注意力權重分數--》張量tensor_x[tensor_x == 0] = 0tensor_x[tensor_x != 0] = 1return tensor_x.to(dtype=torch.int)# 繪圖:生成下三角矩陣
def show__triu():plt.figure(figsize=(5, 5))plt.imshow(generate_triu(20)[0])plt.show()
attention:基礎注意力計算方式,muti_head_atten將調用次模組
def attention(query, key, value, mask=None, dropout=None):# query/key/value-->[batch_size, seq_len, embed_dim]# mask-->shape-->[batch_size, seq_len, seq_len]# dropout--》實例化的對象# 第一步:獲得詞嵌入表達的維度d_k = query.size(-1)# 第二步:計算query和key之間的相似性分數(注意力權重分數(未經過softmax歸一化的結果))# query-->[2, 4, 512];key-->[2, 4, 512]-->轉置--》[2, 512,4]. 相乘后--》scores-->[2, 4, 4]scores = torch.matmul(query, torch.transpose(key, -1, -2)) / math.sqrt(d_k)# 第三步:判斷是否需要maskif mask is not None:scores = scores.masked_fill(mask==0, -1e9)# print(f'未歸一化的scores--》{scores}')# 第四步:進行softmax歸一化atten_weights = F.softmax(scores, dim=-1)# print(f'atten_weights--》{atten_weights}')# 第五步:如果有dropout 就進行隨機失活防止過擬合if dropout is not None:atten_weights = dropout(atten_weights)return torch.matmul(atten_weights, value), atten_weights # todo 返回注意力輸出,以及注意力權重
多頭注意力類與clones
多頭注意力機制原理(核心)
1. ??輸入與線性變換?
輸入序列(如詞向量)通過三個獨立的線性變換層生成查詢(Query, Q)、鍵(Key, K)和值(Value, V)矩陣:
- ??自注意力機制??:輸入為同一矩陣?X,通過不同權重矩陣?WQh?,WKh?,WVh??生成Q、K、V。
- ??交叉注意力機制??:輸入為兩個不同矩陣(如?Xq??和?Xkv?),分別生成Q和K、V
?
2. ??分頭處理與并行計算??
- ??分頭??:將Q、K、V按頭的數量?h?拆分為多個子矩陣,每個子矩陣對應一個注意力頭。例如,將?Q?拆分為?[Q1?,Q2?,...,Qh?],每個?Qi??的維度為?dk??。
- ??并行計算??:每個頭獨立計算縮放點積注意力(Scaled Dot-Product Attention):
3. ??多頭輸出的拼接與融合??
- ??拼接??:將所有頭的輸出?head1?,head2?,...,headh??沿特征維度拼接,形成組合輸出。
- ??線性變換??:通過權重矩陣?WO??將拼接后的結果映射回原始維度:
?
????????原論文中是先把qkv從[2,6,512]變成[2,6,8,64]后各經過8個權重矩陣總共24個權重矩陣得到變換后的qkv再進行注意力計算再concat拼接起來
? ? ? ? 這里的代碼實現是qkv[[2,6,512]]各經過1個矩陣總共3個矩陣得到[2,6,512]再變成[2,6,8,64]進行注意力計算再concat拼接起來
編碼器實例化一個多頭注意力類且無masked
# clones 的作用:將一個模塊復制N次,并返回一個ModuleList,ModuleList是一個Module的子類,可以迭代,并且可以保存多個Module
def clones(module, N):return nn.ModuleList([copy.deepcopy(module) for _ in range(N)])# todo:2. 定義多頭注意力類,注意 編碼器層的q=k=v=原句子 解碼器層的mask_muti_head輸入的q=k=v=預測的句子 解碼層的第二個muti_head的q=k=編碼器輸出,v=mask_muti_head輸出
class MutiHeadAttention(nn.Module):def __init__(self, head, embed_dim, dropout_p=0.1):super().__init__()# 第一步:確定embed_dim是否能被head整除assert embed_dim % head == 0# 第二步:確定每個head應該處理多少維度特征self.d_k = embed_dim // head# 第三步:定義head的屬性self.head = head# 第四步:定義4個全連接層self.linears = clones(nn.Linear(embed_dim, embed_dim), 4)# 第五步:定義atten權重屬性self.atten = None# 第六步:實例化dropout對象self.dropout = nn.Dropout(p=dropout_p)def forward(self, query, key, value, mask=None):# 需要對mask的形狀進行升維度# mask-->輸入的形狀--》[head, seq_len, seq_len]-->[8, 4, 4],升維之后--》[1, 8, 4, 4]if mask is not None:mask = mask.unsqueeze(dim=0)# 獲取當前輸入的batch_sizebatch_size = query.size(0)# 開始處理query,key,value,都要經過線性變化并且切分為8個頭# model(x)-->就是將數據經過linear層處理x-->[2, 4, 512]-->經過Linear-->[2, 4, 512]-->分割--》[2, 4, 8, 64]-->transpose-->[2, 8, 4, 64]# query,key,value--》shape-->[2, 8, 4, 64]query, key, value = [model(x).view(batch_size, -1, self.head, self.d_k).transpose(1, 2)for model, x in zip(self.linears, (query, key, value))]# 接下來將上述處理后的query,key,value--》shape-->[2, 8, 4, 64]送入attention方法進行注意力的計算:# query--》[2, 8, 4, 64]和key--》[2, 8, 4, 64]轉置結果[2, 8, 64, 4]進行相乘--》shape--》[2,8, 4, 4](所以傳的mask矩陣是4維的)# [2, 8, 4, 4]要和value-->[2, 8, 4, 64]-->相乘--》shape--》x-->[2, 8, 4, 64]x, self.atten = attention(query, key, value, mask=mask, dropout=self.dropout)# 需要將多頭注意力的結果進行合并# x.transpose(1, 2)-->【2,4, 8, 64】# y 合并后的結果-->[2, 4, 512]y = x.transpose(1, 2).contiguous().view(batch_size, -1, self.head*self.d_k)# 經過線性變化得到指定輸出維度的結果return self.linears[-1](y)
前饋全連接層
兩層線性層 作用:進行特征提取,進行非線性映射,簡單來說就是經過兩個線性層一個relu激活函數加入非線性,再dropout隨機失活防止過擬合
class FeedForward(nn.Module):def __init__(self, d_model, d_ff, dropout_p=0.1):super().__init__()# d_model:第一個全連接層輸入的特征維度;第二個全連接層輸出的特征維度self.d_model = d_model# d_ff: 第一個全連接層輸出的特征維度;第二個全連接層輸入的特征維度self.d_ff = d_ff# 定義第一個全連接層self.linear1 = nn.Linear(d_model, d_ff)# 定義第二個全連接層self.linear2 = nn.Linear(d_ff, d_model)# 定義dropout層self.dropout = nn.Dropout(p=dropout_p)def forward(self, x):return self.linear2(self.dropout(F.relu(self.linear1(x))))
規范化層
讓數據符合標準正態分布 作用機制:self.a * (x - x_mean) / (x_std + self.eps) + self.b, eps:防止分母為0
Add中是把原始經過embedding+position后得到的x與經過(多頭注意力層或者前饋全連接層)再進行規范化之后的結果進行相加得到殘差鏈接
殘差鏈接的作用:通過跨層連接(如恒等映射),梯度可直接通過“捷徑”回傳,避免因多層非線性變換導致的信號衰減或放大
class LayerNorm(nn.Module):def __init__(self, features, eps=1e-6):super().__init__()# 定義屬性self.features = features # 代表詞嵌入維度# epsself.eps = eps# 定義一個模型的參數(系數)self.a = nn.Parameter(torch.ones(features))self.b = nn.Parameter(torch.zeros(features))def forward(self, x):# x--->[2, 4, 512]# 1.求出均值:x_mean-->[2, 4, 1]x_mean = torch.mean(x, dim=-1, keepdim=True)# 2.求出標準差x_std = torch.std(x, dim=-1, keepdim=True)return self.a * (x - x_mean) / (x_std + self.eps) + self.b
子層鏈接結構
定義子層連接結構 把norm&add這一層與feedforward層或者muti_head_atten層進行連接,取決與輸入的sublayer是什么層
class SublayerConnection(nn.Module):def __init__(self, size, dropout_p=0.1):super().__init__()# 定義size屬性:詞嵌入的維度大小self.size = size# 實例化規范化層self.layer_norm = LayerNorm(features=size)# 實例化dropout層self.dropout = nn.Dropout(p=dropout_p)def forward(self, x, sublayer):# x--》來自于輸入部分:positionEncoding+WordEmbedding;[batch_size, seq_len, embed_dim]-->[2, 4, 512]# sublayer-->代表函數的對象:可以是處理多頭自注意力機制函數的對象,也可以是前饋全連接層對象# post_normx1 = x + self.dropout(self.layer_norm(sublayer(x)))# pre_norm# x1 = x + self.dropout(sublayer(self.layer_norm(x)))return x1
編碼器層
定義編碼器層 #超級拼裝:先試用子層鏈接拼成(norm&add+feedforward)層與(muti_head_atten+norm&add)層 把這兩個子層拼起來就是編碼器結構
輸入的是輸入部分:positionEncoding+WordEmbedding;[batch_size, seq_len, embed_dim]-->[2, 4, 512],輸出的是編碼器的結果-->送給解碼器當k和v使用
class EncoderLayer(nn.Module):def __init__(self, size, self_atten, feed_forward, dropout_p):super().__init__()# size:代表詞嵌入的維度self.size = size# self_atten:代表多頭自注意力機制的對象self.self_atten = self_atten# feed_forward:代表前饋全連接層的對象self.feed_forward = feed_forward# 定義兩層子層連接結構self.sub_layers = clones(SublayerConnection(size, dropout_p), 2)def forward(self, x, mask):# x-->來自輸入部分--》[batch_size, seq_len, embed_dim]:[2, 4, 512]# mask-->[head, seq_len, seq_len]-=-->[8, 4, 4]# 經過第一個子層連接結構:先經過多頭自注意力層--》然后經過norm-->最后殘差連接x1 = self.sub_layers[0](x, lambda x: self.self_atten(x, x, x, mask))# 經過第二個子層連接結構:先經過前饋全連接層--》然后經過norm-->最后殘差連接x2 = self.sub_layers[1](x1, self.feed_forward)return x2
編碼器
定義編碼器 超級拼裝2.0 n個編碼器層構成一個編碼器,按照這里的代碼,多個編碼器層是串聯執行,上一個編碼器的輸出作為下一個編碼器的輸入,最終輸出編碼器的結果給解碼器當v使用
class Encoder(nn.Module):def __init__(self, layer, N):super().__init__()# layer:代表編碼器層self.layer = layer# N:代表有幾個編碼器層# 定義N個編碼器層self.layers = clones(layer, N)# 實例化規范化層self.norm = LayerNorm(features=layer.size)def forward(self, x, mask):# x-->來自輸入部分--》[batch_size, seq_len, embed_dim]:[2, 4, 512]# mask-->[head, seq_len, seq_len]-=-->[8, 4, 4]# for循環迭代N個編碼器層得到最終的結果for layer in self.layers:x = layer(x, mask)return self.norm(x)
解碼器部分
解碼器層
依舊超級拼裝:先試用子層鏈接拼成(norm&add+feedforward)層,(muti_head_atten+norm&add)層,(mask_muti_head_atten+norm&add)層 ,然后把這三個子層拼起來就是解碼器結構
class DecoderLayer(nn.Module):def __init__(self, size, self_atten, src_atten, feed_forward, dropout_p):super().__init__()# size:代表詞嵌入維度的大小self.size = size# self_atten:自注意力機制的對象:Q=K=Vself.self_atten = self_atten# src_atten:一般注意力機制的對象:Q!=K=Vself.src_atten = src_atten# feed_forward:前饋全連接層對象self.feed_forward = feed_forward# 定義三個子層連接結構self.sub_layers = clones(SublayerConnection(size, dropout_p), 3)def forward(self, y, encoder_output, source_mask, target_mask):# y:代表解碼器的輸入--》[batch_size, seq_len, embed_dim]# encoder_output:代表編碼器的輸出結果--》[batch_size, seq_len, emebed_dim]# target_mask防止未來信息被提前看到/target_mask-->[head, y_seq_len, y_seq_len]# source_mask消除padding的影響# source_mask--shape-->[head, y_seq_len, x_seq_len]# 經過第一個子層連接結構 todo 看圖寫作:第一個子層連接結構是帶mask掩碼滴,輸入是q=k=v==預測值y的positionEncoding+WordEmbedding輸出,y1 = self.sub_layers[0](y, lambda x: self.self_atten(x, x, x, target_mask))# 經過第二個子層連接結構 todo 第二個子層鏈接是不帶mask掩碼,輸入的k=v==(源文本嵌入+位置編碼)再經過編碼器的輸出,v是第一個子層結構的輸出# query--》[2,6,512]-->[2, 8, 6, 64],key/value-->[2, 4, 512]-->[2, 8, 4, 64]# [2, 8, 6, 64]--和[2, 8, 4, 64]轉置[2,8, 64, 4]-->[2, 8, 6, 4]y2 = self.sub_layers[1](y1, lambda x: self.src_atten(x, encoder_output, encoder_output, source_mask))# 經過第三個子層連接結構 todo 這一層就是feed+norm&add 輸入什么維度輸出就是什么維度y3 = self.sub_layers[2](y2, self.feed_forward)return y3
解碼器
拼拼拼:n個解碼器層構成一個解碼器,這里的n==6 按照這里的代碼,多個解碼器層是串聯執行,上一個解碼器的輸出作為下一個解碼器的輸入,最終輸出解碼器的結果給輸出層
class Decoder(nn.Module):def __init__(self, layer, N):super().__init__()# layer:代表解碼器層self.layer = layer# N:代表有幾個解碼器層# 定義N個解碼層self.layers = clones(layer, N)# 實例化規范化層self.norm = LayerNorm(features=layer.size)def forward(self, y, encoder_output, source_mask, target_mask):# y:代表解碼器的輸入--》[batch_size, seq_len, embed_dim]# encoder_output:代表編碼器的輸出結果--》[batch_size, seq_len, emebed_dim]# target_mask防止未來信息被提前看到/target_mask-->[head, y_seq_len, y_seq_len]# source_mask消除padding的影響# source_mask--shape-->[head, y_seq_len, x_seq_len]# for循環迭代N個編碼器層得到最終的結果for layer in self.layers:y = layer(y, encoder_output, source_mask, target_mask)return self.norm(y)
輸出部分
生成器generator
輸出部分:將解碼器輸出經過一個線性層,再經過softmax,得到當前預測的結果
輸出[batch_size,seq_len,vocab_size] vocab_size是詞表詞個數,概率最大的為預測結果
class Generator(nn.Module):def __init__(self, d_model, vocab_size):# 參數d_model 線性層輸入特征尺寸大小# 參數vocab_size 線層輸出尺寸大小super(Generator, self).__init__()# 定義線性層self.project = nn.Linear(d_model, vocab_size)def forward(self, x):# 數據經過線性層 最后一個維度歸一化 log方式x = F.log_softmax(self.project(x), dim=-1)return x
使用部分
模擬使用,僅預測一個單詞
源文本輸入:也就是要翻譯的文本,兩個句子4個單詞
x = torch.tensor([[1, 40, 28, 100], [45, 89, 39, 10]])
上一步預測值輸入:如果為頭單詞,則為sos_token的詞向量表達
y0 = torch.tensor([[2, 4, 10, 29, 67, 89],[34, 56, 78, 20, 19, 6]])
詞表大小:要翻譯的語言總共有多少個單詞
vocab_size = 1000
詞向量維度
embed_dim = 512
總體流程概述:
編碼器部分:源文本輸入x經過embedding后與positionnal_encoding相加結果輸入encoder()
在每一個編碼器層經過兩個子層:
1.多頭自注意力子層+(殘差鏈接+規范化)
在編碼器的多頭注意力層中q=k=v
- 前饋全連接層+(殘差鏈接+規范化)
前饋全連接層的作用: 通過增加兩層網絡來增強模型的能力.
輸出x1
經過n個編碼器層后輸出xn給解碼器
解碼器部分:前一步的實際值或預測值 y0 = torch.tensor([[2, 4, 10, 29, 67, 89],[34, 56, 78, 20, 19, 6]])經過embedding后與positionnal_encoding相加輸入結果decoder()
在每一個解碼器層經過三個子層:
1.帶掩碼的多頭自注意力子層+(殘差鏈接+規范化)
在此層q=k=v mask的作用是防止未來信息被提前看見
2.多頭注意力子層+(殘差鏈接+規范化)
在此層k=v = 編碼器的輸出xn , q=上一個子層的輸入
- 前饋全連接子層+(殘差鏈接+規范化)
前饋全連接層的作用: 通過增加兩層網絡來增強模型的能力.
經過n個解碼器層后輸出yn給輸出部分
輸出部分:經過一個線性層和一個softmax層
線性層:通過對上一步的線性變化得到指定維度的輸出, 也就是轉換維度的作用.
softmax層:使最后一維的向量中的數字縮放到0-1的概率值域內, 并滿足他們的和為1.
輸出預測值result[batch_size,seq_len,vocab_size],batch_size句seq_len個單詞,其中概率最大的值為預測結果
def usb_position(): #todo 生成位置編碼vocab_size = 1000 # 定義詞匯大小embed_dim = 512 # 詞嵌入維度my_embed = Embeddings(vocab_size, embed_dim)x = torch.tensor([[1, 40, 28, 100], [45, 89, 39, 10]])embed_result = my_embed(x)my_position = PositionEncoding(d_model=512, dropout_p=0.1)position_result = my_position(embed_result)print(f'position_result-->{position_result.shape}')# print(f'position_result-->{position_result}')return position_result
def use_encoder(): # todo 輸入編碼器輸入(待翻譯的句子)以及位置編碼,得到編碼器輸出# 獲取編碼器輸入部分:[2, 4, 512]position_result = usb_position()# 實例化多頭注意力機制對象mutiHead_atten = MutiHeadAttention(head=8, embed_dim=512)# 實例化前饋全連接層對象ff = FeedForward(d_model=512, d_ff=1024)mask = torch.zeros(8, 4, 4)# 實例化編碼器層對象encoder_layer = EncoderLayer(size=512, self_atten=mutiHead_atten, feed_forward=ff, dropout_p=0.1)# 實例化編碼器對象encoder = Encoder(layer=encoder_layer, N=6)# 將數據送入編碼器 todo position_result先encoder_output = encoder(position_result, mask)print(f'encoder_output編碼器得到的結果--》{encoder_output}')print(f'encoder_output編碼器得到的結果--》{encoder_output.shape}')# todo 編碼器輸出return encoder_output
def use_decoder(): #todo 輸入編碼器輸出,得到解碼器輸出# 定義解碼器端的輸入 todo 編碼器的輸入是對應原始輸入序列(如待翻譯的源語言句子),解碼器輸入的是解碼器的輸入是右移(shifted right)的目標序列(如已生成的部分目標語言句子)。# todo 解碼器的輸出是解碼器逐步生成的目標序列(如翻譯結果),每次預測一個詞。y0 = torch.tensor([[2, 4, 10, 29, 67, 89],[34, 56, 78, 20, 19, 6]])vocab_size = 1000embed_dim = 512# 實例化Embedding層embed = Embeddings(vocab_size, embed_dim)# embed_y-->[2, 6, 512]embed_y = embed(y0)# # 實例化PositionEncoding層position_encode = PositionEncoding(d_model=512, dropout_p=0.1)# todo position_y 是預測的目標序列,位置編碼對預測結果進行編碼,從而提高預測效果。若為首字母則對應sos_tokenposition_y = position_encode(embed_y)# 實例化多頭注意力機制的對象muti_head_atten = MutiHeadAttention(head=8, embed_dim=512)self_atten = copy.deepcopy(muti_head_atten)src_atten = copy.deepcopy(muti_head_atten)# 實例化前饋全連接的對象ff = FeedForward(d_model=512, d_ff=1024)# 實例化解碼器層的對象decoder_layer = DecoderLayer(size=512, self_atten=self_atten, src_atten=src_atten, feed_forward=ff, dropout_p=0.1)# 準備數據# todo encoder_output是編碼器輸出結果,對應源語言句子encoder_output = use_encoder()source_mask = torch.zeros(8, 6, 4)target_mask = torch.zeros(8, 6, 6)# 實例化解碼器的對象decoder = Decoder(layer=decoder_layer, N=6)result = decoder(position_y, encoder_output, source_mask, target_mask)# print(f'解碼器得到的結果--》{result}')# print(f'解碼器得到的結果--》{result.shape}')return resultdef use_generator():x=use_decoder()my_generator = Generatorresult = my_generator(512, 1000)(x)print(f'生成器得到結果--》{result}')print(f'生成器得到結果--》{result.shape}')return result
if __name__ == '__main__':# use_decoder()use_generator()