Transformer 作為 NLP 領域的里程碑模型,徹底改變了序列建模的方式。它基于自注意力機制,擺脫了 RNN 的序列依賴,實現了并行計算,在機器翻譯、文本生成等任務中表現卓越。本文將從零開始,手寫一個簡化版 Transformer,并詳細講解其核心模塊的實現原理。
一、Transformer 整體架構回顧
Transformer 由編碼器(Encoder)?和解碼器(Decoder)?兩部分組成:
編碼器:將輸入序列(如源語言句子)編碼為上下文向量,也就是圖片中的左半
解碼器:根據編碼器輸出和已生成的目標序列,預測下一個詞,也就是圖片中的右半
核心模塊包括:詞嵌入、位置編碼、多頭注意力、前饋網絡、掩碼機制等。我們將逐一實現這些模塊,并最終組合成完整的 Transformer。
二、環境與數據準備
2.1 依賴環境
需要依賴的下載:
pip install torch torchvision torchaudio numpy
2.2 數據處理(data_deal.py)
我們使用簡單的德英翻譯數據集作為示例,先實現數據預處理邏輯:
這段代碼實現了一個 Transformer 模型的基礎數據處理部分,原理是將德語 - 英語平行語料轉換為模型可處理的索引序列,構建數據集和數據加載器,為后續模型訓練做準備:定義了詞匯表將文本映射為索引,通過自定義數據集類和 DataLoader 實現批量加載德語輸入、英語解碼器輸入(帶起始符)和解碼器輸出(帶結束符)的索引數據。
# data_deal.py
# 定義樣本數據(德語->英語)
sentences = [# enc_input dec_input dec_output['ich mochte ein bier P', 'S i want a beer .', 'i want a beer . E'],['ich mochte ein cola P', 'S i want a coke .', 'i want a coke . E']
]# 源語言(德語)詞匯表
src_vocab = {'P': 0, 'ich': 1, 'mochte': 2, 'ein': 3, 'bier': 4, 'cola': 5}
src_vocab_size = len(src_vocab)
src_idx2word = {i: w for i, w in enumerate(src_vocab)}# 目標語言(英語)詞匯表
tgt_vocab = {'P': 0, 'i': 1, 'want': 2, 'a': 3, 'beer': 4, 'coke': 5, 'S': 6, 'E': 7, '.': 8}
idx2word = {i: w for i, w in enumerate(tgt_vocab)}
tgt_vocab_size = len(tgt_vocab)# 序列長度配置
src_len = 5 # 源序列最大長度
tgt_len = 6 # 目標序列最大長度# Transformer超參數
d_model = 512 # 嵌入維度
d_ff = 2048 # 前饋網絡維度
n_heads = 8 # 多頭注意力頭數
n_layers = 6 # 編碼器/解碼器層數# 文本轉索引序列
def make_data(sentences):enc_inputs, dec_inputs, dec_outputs = [], [], []for i in range(len(sentences)):# 編碼器輸入:德語句子轉索引enc_input = [src_vocab[n] for n in sentences[i][0].split()]# 解碼器輸入:英語句子(帶起始符S)dec_input = [tgt_vocab[n] for n in sentences[i][1].split()]# 解碼器輸出:英語句子(帶結束符E)dec_output = [tgt_vocab[n] for n in sentences[i][2].split()]enc_inputs.append(enc_input)dec_inputs.append(dec_input)dec_outputs.append(dec_output)return torch.LongTensor(enc_inputs), torch.LongTensor(dec_inputs), torch.LongTensor(dec_outputs)# 構建數據集
class MyDataSet(Data.Dataset):def __init__(self, enc_inputs, dec_inputs, dec_outputs):super().__init__()self.enc_inputs = enc_inputsself.dec_inputs = dec_inputsself.dec_outputs = dec_outputsdef __len__(self):return self.enc_inputs.shape[0]def __getitem__(self, idx):return self.enc_inputs[idx], self.dec_inputs[idx], self.dec_outputs[idx]# 生成數據加載器
enc_inputs, dec_inputs, dec_outputs = make_data(sentences)
loader = Data.DataLoader(MyDataSet(enc_inputs, dec_inputs, dec_outputs),batch_size=2,shuffle=True
)
數據處理說明:
詞匯表:將源語言和目標語言的單詞映射為整數索引(P 為填充符,S 為起始符,E 為結束符)
數據集:自定義
Dataset
類,將輸入序列、解碼器輸入、解碼器輸出打包數據加載器:用于批量加載數據,方便訓練
三、核心模塊實現
3.1 位置編碼(position.py)
Transformer 沒有循環結構,需要通過位置編碼注入序列的位置信息。采用正弦余弦函數實現:
這段代碼實現了Transformer中的位置編碼,原理是通過正弦和余弦函數生成與輸入序列長度、嵌入維度匹配的位置信息,注入到詞嵌入中以體現序列順序:偶數維度用正弦函數、奇數維度用余弦函數計算不同位置的編碼值,作為非參數緩沖區存儲,前向傳播時將其與輸入嵌入相加并應用dropout。
# position.py
class PositionalEncoding(nn.Module):def __init__(self, d_model, dropout=0.1, max_len=5000):super().__init__()self.dropout = nn.Dropout(p=dropout)# 初始化位置編碼矩陣pe = torch.zeros(max_len, d_model)# 位置索引(0到max_len-1)position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)# 頻率項:10000^(-2i/d_model)div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))# 偶數維度用sin,奇數維度用cospe[:, 0::2] = torch.sin(position * div_term) # 0,2,4...維度pe[:, 1::2] = torch.cos(position * div_term) # 1,3,5...維度pe = pe.unsqueeze(0) # 增加批次維度:[1, max_len, d_model]self.register_buffer('pe', pe) # 注冊為非參數緩沖區def forward(self, x):# x: [batch_size, seq_len, d_model]x = x + self.pe[:, :x.size(1), :] # 注入位置信息return self.dropout(x)
位置編碼原理:
- 公式:
- 作用:通過不同頻率的正弦余弦函數,讓模型感知單詞的位置關系(如相對位置)
3.2 掩碼機制(mask.py)
這段代碼實現了Transformer中的兩種注意力掩碼,原理是通過掩碼矩陣遮擋不需要參與注意力計算的位置:填充掩碼(att_pad_mask)將序列中值為0的填充位置標記為需遮擋,生成[batch_size, len_q, len_k]的掩碼矩陣;序列掩碼(att_sub_mask)用上三角矩陣標記未來位置為需遮擋,確保解碼時只能關注當前及之前的詞。
# mask.py
def att_pad_mask(seq_q, seq_k):"""填充掩碼:遮擋padding位置(值為0的位置)"""batch_size, len_q = seq_q.size()batch_size, len_k = seq_k.size()# seq_k中值為0的位置標記為True(需要遮擋)mask = seq_k.eq(0).unsqueeze(1) # [batch_size, 1, len_k]return mask.expand(batch_size, len_q, len_k) # [batch_size, len_q, len_k]def att_sub_mask(seq):"""序列掩碼:上三角矩陣,遮擋未來的詞"""attn_shape = [seq.size(0), seq.size(1), seq.size(1)]# 生成上三角矩陣(k=1表示對角線以上為1)mask = np.triu(np.ones(attn_shape), k=1)return torch.from_numpy(mask).byte() # 轉為byte類型掩碼
掩碼說明:
填充掩碼:確保模型不關注無意義的填充符(如句子長度不足時補的 P)
序列掩碼:在解碼器自注意力中,當前位置只能關注之前的位置(避免信息泄露)
3.3 多頭注意力(MHA.py)
這段代碼實現了Transformer中的注意力機制,原理是通過將輸入映射到多個查詢(Q)、鍵(K)、值(V)空間并行計算注意力,再合并結果以捕捉不同維度的關聯:基礎注意力(Attention)計算Q與K的相似度得分,經掩碼和softmax得到權重后與V加權求和;多頭注意力(MultiHeadAttention)通過線性變換將輸入分成多個頭并行計算注意力,合并后經線性變換、殘差連接和層歸一化輸出,既支持自注意力也支持交叉注意力。
# MHA.py
class Attention(nn.Module):"""基礎注意力計算"""def __init__(self, dropout=0.1):super().__init__()self.softmax = nn.Softmax(dim=-1)self.dropout = nn.Dropout(dropout)def forward(self, q, k, v, mask=None):# q: [batch_size, n_heads, len_q, d_k]# k: [batch_size, n_heads, len_k, d_k]# v: [batch_size, n_heads, len_v, d_v](len_k=len_v)scores = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(k.size(-1)) # 注意力得分if mask is not None:# 掩碼位置置為負無窮(softmax后接近0)scores = scores.masked_fill_(mask, -1e9)att = self.softmax(scores) # 注意力權重att = self.dropout(att)output = torch.matmul(att, v) # 加權求和return outputclass MultiHeadAttention(nn.Module):"""多頭注意力"""def __init__(self, d_model, num_heads):super().__init__()self.d_model = d_modelself.num_heads = num_headsself.d_k = d_model // num_heads # 每個頭的維度# Q、K、V的線性變換self.w_q = nn.Linear(d_model, d_model)self.w_k = nn.Linear(d_model, d_model)self.w_v = nn.Linear(d_model, d_model)self.attention = Attention()self.w_o = nn.Linear(d_model, d_model) # 輸出線性變換self.dropout = nn.Dropout(0.1)self.layer_norm = nn.LayerNorm(d_model) # 層歸一化def forward(self, enc_inputs, dec_inputs, mask=None):"""enc_inputs: 編碼器輸入(自注意力時為QKV的源,交叉注意力時為K/V的源)dec_inputs: 解碼器輸入(自注意力時為QKV的源,交叉注意力時為Q的源)"""res = dec_inputs # 殘差連接的輸入batch_size = enc_inputs.size(0)# 線性變換 + 分頭([batch_size, seq_len, d_model] -> [batch_size, n_heads, seq_len, d_k])Q = self.w_q(dec_inputs).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)K = self.w_k(enc_inputs).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)V = self.w_v(enc_inputs).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)# 處理掩碼維度(擴展到多頭)if mask is not None:mask = mask.unsqueeze(1).repeat(1, self.num_heads, 1, 1) # [batch_size, n_heads, len_q, len_k]# 計算注意力att_out = self.attention(Q, K, V, mask)# 多頭合并([batch_size, n_heads, seq_len, d_k] -> [batch_size, seq_len, d_model])att_out = att_out.transpose(1, 2).contiguous().view(batch_size, -1, self.d_model)att_out = self.w_o(att_out) # 輸出線性變換# 殘差連接 + 層歸一化att_out = self.dropout(att_out)att_out = self.layer_norm(att_out + res)return att_out
多頭注意力原理:
將 Q、K、V 通過線性變換投影到低維(d_k = d_model /n_heads)
拆分到多個頭并行計算注意力
合并多頭結果,通過線性變換得到最終輸出
加入殘差連接和層歸一化(穩定訓練)
3.4 前饋網絡(FFN.py)
前饋網絡對每個位置進行獨立的非線性變換,增強模型表達能力:
# FFN.py
class FFN(nn.Module):def __init__(self, d_model, d_ff):super().__init__()self.ffn = nn.Sequential(nn.Linear(d_model, d_ff), # 升維nn.ReLU(),nn.Dropout(0.1),nn.Linear(d_ff, d_model) # 降維)self.dropout = nn.Dropout(0.1)self.layer_norm = nn.LayerNorm(d_model)def forward(self, x):res = x # 殘差連接x = self.ffn(x)x = self.dropout(x)return self.layer_norm(x + res) # 殘差 + 層歸一化
前饋網絡作用:
通過兩層線性變換和 ReLU 激活,對注意力輸出進行非線性映射
保持輸入輸出維度一致(d_model),方便殘差連接
四、編碼器與解碼器實現
4.1 編碼器(encoder.py)
編碼器由 N 個編碼器層堆疊而成,每個編碼器層包含:多頭自注意力 + 前饋網絡
這段代碼實現了Transformer的編碼器部分,原理是通過多層疊加的編碼器層對輸入序列進行深度特征提取:編碼器(Encoder)先將輸入序列通過詞嵌入和位置編碼轉換為向量表示,再傳入n_layers個編碼器層(EncoderLayer);每個編碼器層包含自注意力機制(捕捉序列內部詞與詞的關聯)和前饋網絡(進行非線性變換),并通過填充掩碼處理padding位置,最終輸出編碼后的序列特征。
# encoder.py
class EncoderLayer(nn.Module):"""編碼器層"""def __init__(self, d_model, d_ff, n_heads):super().__init__()self.multi_head_attention = MultiHeadAttention(d_model, n_heads) # 自注意力self.feed_forward = FFN(d_model, d_ff) # 前饋網絡def forward(self, enc_inputs, mask=None):# 自注意力(Q=K=V=enc_inputs)enc_outputs = self.multi_head_attention(enc_inputs, enc_inputs, mask)# 前饋網絡enc_outputs = self.feed_forward(enc_outputs)return enc_outputsclass Encoder(nn.Module):"""編碼器"""def __init__(self, vocab_size, d_model, d_ff, n_heads, n_layers, dropout=0.1):super().__init__()self.embedding = nn.Embedding(vocab_size, d_model) # 詞嵌入self.position_encoding = PositionalEncoding(d_model) # 位置編碼# 堆疊n_layers個編碼器層self.layers = nn.ModuleList([EncoderLayer(d_model, d_ff, n_heads)for _ in range(n_layers)])def forward(self, enc_inputs):# 詞嵌入 + 位置編碼enc_outputs = self.embedding(enc_inputs)enc_outputs = self.position_encoding(enc_outputs)# 生成填充掩碼(自注意力中,遮擋padding)mask = att_pad_mask(enc_inputs, enc_inputs)# 經過所有編碼器層for layer in self.layers:enc_outputs = layer(enc_outputs, mask)return enc_outputs
4.2 解碼器(decoder.py)
這段代碼實現了Transformer的解碼器部分,原理是通過多層疊加的解碼器層結合編碼器輸出生成目標序列,同時確保解碼時不依賴未來信息:解碼器(Decoder)先將目標序列經詞嵌入和位置編碼轉換為向量表示,再傳入n_layers個解碼器層(DecoderLayer);每個解碼器層包含三步處理——帶填充+序列合并掩碼的自注意力(僅關注當前及之前的詞)、以編碼器輸出為鍵值的交叉注意力(關聯源序列信息)、前饋網絡(非線性變換),最終輸出解碼后的序列特征。
# decoder.py
class DecoderLayer(nn.Module):"""解碼器層"""def __init__(self, d_model, d_ff, n_heads):super().__init__()self.self_attn = MultiHeadAttention(d_model, n_heads) # 解碼器自注意力(帶掩碼)self.cross_attn = MultiHeadAttention(d_model, n_heads) # 交叉注意力(與編碼器交互)self.feed_forward = FFN(d_model, d_ff) # 前饋網絡def forward(self, enc_outputs, dec_inputs, mask_self=None, mask_cross=None):# 1. 解碼器自注意力(Q=K=V=dec_inputs,帶掩碼)dec_outputs = self.self_attn(dec_inputs, dec_inputs, mask_self)# 2. 交叉注意力(Q=dec_outputs,K=V=enc_outputs)dec_outputs = self.cross_attn(enc_outputs, dec_outputs, mask_cross)# 3. 前饋網絡dec_outputs = self.feed_forward(dec_outputs)return dec_outputsclass Decoder(nn.Module):"""解碼器"""def __init__(self, vocab_size, d_model, d_ff, n_heads, n_layers, dropout=0.1):super().__init__()self.embedding = nn.Embedding(vocab_size, d_model) # 詞嵌入self.position_encoding = PositionalEncoding(d_model) # 位置編碼# 堆疊n_layers個解碼器層self.layers = nn.ModuleList([DecoderLayer(d_model, d_ff, n_heads)for _ in range(n_layers)])def forward(self, enc_inputs, enc_outputs, dec_inputs):# 詞嵌入 + 位置編碼dec_outputs = self.embedding(dec_inputs)dec_outputs = self.position_encoding(dec_outputs)# 1. 解碼器自注意力掩碼(填充掩碼 + 序列掩碼)mask_pad = att_pad_mask(dec_inputs, dec_inputs).to(dec_inputs.device) # 填充掩碼mask_sub = att_sub_mask(dec_inputs).to(dec_inputs.device) # 序列掩碼mask_self = torch.gt((mask_pad + mask_sub), 0) # 合并掩碼(True表示需要遮擋)# 2. 交叉注意力掩碼(目標序列對源序列的填充掩碼)mask_cross = att_pad_mask(dec_inputs, enc_inputs).to(dec_inputs.device)# 經過所有解碼器層for layer in self.layers:dec_outputs = layer(enc_outputs, dec_outputs, mask_self, mask_cross)return dec_outputs
五、Transformer 整體組裝(transformer.py)
將編碼器、解碼器和輸出投影層組合成完整模型:
# transformer.py
class Transformer(nn.Module):def __init__(self, enc_vocab_size, dec_vocab_size, d_model, d_ff, n_heads, n_layers):super().__init__()self.encoder = Encoder(enc_vocab_size, d_model, d_ff, n_heads, n_layers)self.decoder = Decoder(dec_vocab_size, d_model, d_ff, n_heads, n_layers)self.projection = nn.Linear(d_model, dec_vocab_size) # 輸出投影到目標詞匯表def forward(self, enc_inputs, dec_inputs):# 編碼器輸出enc_outputs = self.encoder(enc_inputs)# 解碼器輸出dec_outputs = self.decoder(enc_inputs, enc_outputs, dec_inputs)# 投影到詞匯表outputs = self.projection(dec_outputs)# 調整維度:[batch_size, tgt_len, vocab_size] -> [batch_size*tgt_len, vocab_size]return outputs.view(-1, outputs.size(2))
六、模型訓練(訓練.py)
使用交叉熵損失和 SGD 優化器訓練模型:
# 訓練.py
from torch import nn, optim
from transformer import Transformer
from data_deal import *# 設備選擇
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')# 初始化模型
model = Transformer(src_vocab_size, tgt_vocab_size, d_model, d_ff, n_heads, n_layers).to(device)# 損失函數(忽略填充符0)
criterion = nn.CrossEntropyLoss(ignore_index=0)
# 優化器(SGD+動量)
optimizer = optim.SGD(model.parameters(), lr=1e-3, momentum=0.99)# 訓練循環
for epoch in range(100):for idx, (enc_inputs, dec_inputs, dec_labels) in enumerate(loader):# 數據移至設備enc_inputs, dec_inputs, dec_labels = enc_inputs.to(device), dec_inputs.to(device), dec_labels.to(device)optimizer.zero_grad() # 清空梯度outputs = model(enc_inputs, dec_inputs) # 模型輸出loss = criterion(outputs, dec_labels.view(-1)) # 計算損失loss.backward() # 反向傳播optimizer.step() # 更新參數# 打印每輪損失print('Epoch:', '%04d' % (epoch + 1), 'loss =', '{:.6f}'.format(loss.item()))# 保存模型
torch.save(model.state_dict(), 'transformer.pth')
七、模型預測(預測.py)
使用貪婪解碼生成目標序列:
# 預測.py
from transformer import Transformer
from data_deal import *device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')# 加載模型
model = Transformer(src_vocab_size, tgt_vocab_size, d_model, d_ff, n_heads, n_layers).to(device)
model.load_state_dict(torch.load('transformer.pth', weights_only=False))
model.eval() # 切換到評估模式def greedy_decoder(model, enc_input, start_symbol):"""貪婪解碼:從起始符開始,每次選擇概率最大的詞"""enc_outputs = model.encoder(enc_input) # 編碼器輸出dec_input = torch.zeros(1, 0).type_as(enc_input.data) # 初始化解碼器輸入terminal = Falsenext_symbol = start_symbol # 起始符while not terminal:# 拼接下一個符號dec_input = torch.cat([dec_input.detach(), torch.tensor([[next_symbol]], dtype=enc_input.dtype).to(device)], -1)# 解碼器輸出dec_outputs = model.decoder(enc_input, enc_outputs, dec_input)# 投影到詞匯表projected = model.projection(dec_outputs)# 選擇概率最大的詞next_symbol = projected.squeeze(0).max(dim=-1)[1][-1].item()if next_symbol == tgt_vocab["E"]: # 遇到結束符則停止terminal = Truereturn dec_input# 測試
enc_inputs, _, _ = next(iter(loader))
enc_inputs = enc_inputs.to(device)
for i in range(len(enc_inputs)):# 生成解碼輸入greedy_dec_input = greedy_decoder(model, enc_inputs[i].view(1, -1), start_symbol=tgt_vocab["S"])# 預測predict = model(enc_inputs[i].view(1, -1), greedy_dec_input)predict = predict.view(-1, predict.size(-1)).max(1)[1]# 打印結果print([src_idx2word[word.item()] for word in enc_inputs[i]], '->', [idx2word[n.item()] for n in predict])