前言
今天是Transformer的編碼實戰階段,照著示例代碼執行一遍吧
embedding
self.tok_embeddings = nn.Embedding(args.vocab_size, args.dim)
把token向量轉為embedding矩陣(一個token一個embedding向量)
位置編碼
為了解決“我喜歡你”和“你喜歡我”結果一致的問題,加入了位置的影響。
小技巧
import pdb
pdb.set_trace()
在需要打斷點的地方加這個,可以暫停python程序進行調試。
Transformer源碼和結構圖
來自:happy-llm,代碼部分做了第2章代碼的合并總結,方便本地直接運行。
import torch
import torch.nn as nn
import torch.nn.functional as F
import math'''多頭自注意力計算模塊'''
class MultiHeadAttention(nn.Module):def __init__(self, args, is_causal=False):# 構造函數# args: 配置對象super().__init__()# 隱藏層維度必須是頭數的整數倍,因為后面我們會將輸入拆成頭數個矩陣assert args.dim % args.n_heads == 0# 模型并行處理大小,默認為1。model_parallel_size = 1# 本地計算頭數,等于總頭數除以模型并行處理大小。self.n_local_heads = args.n_heads // model_parallel_size# 每個頭的維度,等于模型維度除以頭的總數。self.head_dim = args.dim // args.n_heads# Wq, Wk, Wv 參數矩陣,每個參數矩陣為 n_embd x n_embd# 這里通過三個組合矩陣來代替了n個參數矩陣的組合,其邏輯在于矩陣內積再拼接其實等同于拼接矩陣再內積,# 不理解的讀者可以自行模擬一下,每一個線性層其實相當于n個參數矩陣的拼接self.wq = nn.Linear(args.dim, args.n_heads * self.head_dim, bias=False)self.wk = nn.Linear(args.dim, args.n_heads * self.head_dim, bias=False)self.wv = nn.Linear(args.dim, args.n_heads * self.head_dim, bias=False)# 輸出權重矩陣,維度為 dim x n_embd(head_dim = n_embeds / n_heads)self.wo = nn.Linear(args.n_heads * self.head_dim, args.dim, bias=False)# 注意力的 dropoutself.attn_dropout = nn.Dropout(args.dropout)# 殘差連接的 dropoutself.resid_dropout = nn.Dropout(args.dropout)self.is_causal = is_causal# 創建一個上三角矩陣,用于遮蔽未來信息# 注意,因為是多頭注意力,Mask 矩陣比之前我們定義的多一個維度if is_causal:mask = torch.full((1, 1, args.max_seq_len, args.max_seq_len), float("-inf"))mask = torch.triu(mask, diagonal=1)# 注冊為模型的緩沖區self.register_buffer("mask", mask)def forward(self, q: torch.Tensor, k: torch.Tensor, v: torch.Tensor):# 獲取批次大小和序列長度,[batch_size, seq_len, dim]bsz, seqlen, _ = q.shape# 計算查詢(Q)、鍵(K)、值(V),輸入通過參數矩陣層,維度為 (B, T, n_embed) x (n_embed, n_embed) -> (B, T, n_embed)xq, xk, xv = self.wq(q), self.wk(k), self.wv(v)# 將 Q、K、V 拆分成多頭,維度為 (B, T, n_head, C // n_head),然后交換維度,變成 (B, n_head, T, C // n_head)# 因為在注意力計算中我們是取了后兩個維度參與計算# 為什么要先按B*T*n_head*C//n_head展開再互換1、2維度而不是直接按注意力輸入展開,是因為view的展開方式是直接把輸入全部排開,# 然后按要求構造,可以發現只有上述操作能夠實現我們將每個頭對應部分取出來的目標xq = xq.view(bsz, seqlen, self.n_local_heads, self.head_dim)xk = xk.view(bsz, seqlen, self.n_local_heads, self.head_dim)xv = xv.view(bsz, seqlen, self.n_local_heads, self.head_dim)xq = xq.transpose(1, 2)xk = xk.transpose(1, 2)xv = xv.transpose(1, 2)# 注意力計算# 計算 QK^T / sqrt(d_k),維度為 (B, nh, T, hs) x (B, nh, hs, T) -> (B, nh, T, T)scores = torch.matmul(xq, xk.transpose(2, 3)) / math.sqrt(self.head_dim)# 掩碼自注意力必須有注意力掩碼if self.is_causal:assert hasattr(self, 'mask')# 這里截取到序列長度,因為有些序列可能比 max_seq_len 短scores = scores + self.mask[:, :, :seqlen, :seqlen]# 計算 softmax,維度為 (B, nh, T, T)scores = F.softmax(scores.float(), dim=-1).type_as(xq)# 做 Dropoutscores = self.attn_dropout(scores)# V * Score,維度為(B, nh, T, T) x (B, nh, T, hs) -> (B, nh, T, hs)output = torch.matmul(scores, xv)# 恢復時間維度并合并頭。# 將多頭的結果拼接起來, 先交換維度為 (B, T, n_head, C // n_head),再拼接成 (B, T, n_head * C // n_head)# contiguous 函數用于重新開辟一塊新內存存儲,因為Pytorch設置先transpose再view會報錯,# 因為view直接基于底層存儲得到,然而transpose并不會改變底層存儲,因此需要額外存儲output = output.transpose(1, 2).contiguous().view(bsz, seqlen, -1)# 最終投影回殘差流。output = self.wo(output)output = self.resid_dropout(output)return outputclass MLP(nn.Module):'''前饋神經網絡'''def __init__(self, dim: int, hidden_dim: int, dropout: float):super().__init__()# 定義第一層線性變換,從輸入維度到隱藏維度self.w1 = nn.Linear(dim, hidden_dim, bias=False)# 定義第二層線性變換,從隱藏維度到輸入維度self.w2 = nn.Linear(hidden_dim, dim, bias=False)# 定義dropout層,用于防止過擬合self.dropout = nn.Dropout(dropout)def forward(self, x):# 前向傳播函數# 首先,輸入x通過第一層線性變換和RELU激活函數# 然后,結果乘以輸入x通過第三層線性變換的結果# 最后,通過第二層線性變換和dropout層return self.dropout(self.w2(F.relu(self.w1(x))))class LayerNorm(nn.Module):''' Layer Norm 層'''def __init__(self, features, eps=1e-6):super(LayerNorm, self).__init__()# 線性矩陣做映射self.a_2 = nn.Parameter(torch.ones(features))self.b_2 = nn.Parameter(torch.zeros(features))self.eps = epsdef forward(self, x):# 在統計每個樣本所有維度的值,求均值和方差mean = x.mean(-1, keepdim=True) # mean: [bsz, max_len, 1]std = x.std(-1, keepdim=True) # std: [bsz, max_len, 1]# 注意這里也在最后一個維度發生了廣播return self.a_2 * (x - mean) / (std + self.eps) + self.b_2class EncoderLayer(nn.Module):'''Encoder層'''def __init__(self, args):super().__init__()# 一個 Layer 中有兩個 LayerNorm,分別在 Attention 之前和 MLP 之前self.attention_norm = LayerNorm(args.n_embd)# Encoder 不需要掩碼,傳入 is_causal=Falseself.attention = MultiHeadAttention(args, is_causal=False)self.fnn_norm = LayerNorm(args.n_embd)self.feed_forward = MLP(args.n_embd, args.hidden_dim, args.dropout)def forward(self, x):# Layer Normnorm_x = self.attention_norm(x)# 自注意力h = x + self.attention.forward(norm_x, norm_x, norm_x)# 經過前饋神經網絡out = h + self.feed_forward.forward(self.fnn_norm(h))return outclass Encoder(nn.Module):'''Encoder 塊'''def __init__(self, args):super(Encoder, self).__init__() # 一個 Encoder 由 N 個 Encoder Layer 組成self.layers = nn.ModuleList([EncoderLayer(args) for _ in range(args.n_layer)])self.norm = LayerNorm(args.n_embd)def forward(self, x):"分別通過 N 層 Encoder Layer"for layer in self.layers:x = layer(x)return self.norm(x)class DecoderLayer(nn.Module):'''解碼層'''def __init__(self, args):super().__init__()# 一個 Layer 中有三個 LayerNorm,分別在 Mask Attention 之前、Self Attention 之前和 MLP 之前self.attention_norm_1 = LayerNorm(args.n_embd)# Decoder 的第一個部分是 Mask Attention,傳入 is_causal=Trueself.mask_attention = MultiHeadAttention(args, is_causal=True)self.attention_norm_2 = LayerNorm(args.n_embd)# Decoder 的第二個部分是 類似于 Encoder 的 Attention,傳入 is_causal=Falseself.attention = MultiHeadAttention(args, is_causal=False)self.ffn_norm = LayerNorm(args.n_embd)# 第三個部分是 MLPself.feed_forward = MLP(args.n_embd, args.hidden_dim, args.dropout)def forward(self, x, enc_out):# Layer Normnorm_x = self.attention_norm_1(x)# 掩碼自注意力x = x + self.mask_attention.forward(norm_x, norm_x, norm_x)# 多頭注意力norm_x = self.attention_norm_2(x)h = x + self.attention.forward(norm_x, enc_out, enc_out)# 經過前饋神經網絡out = h + self.feed_forward.forward(self.ffn_norm(h))return outclass Decoder(nn.Module):'''解碼器'''def __init__(self, args):super(Decoder, self).__init__() # 一個 Decoder 由 N 個 Decoder Layer 組成self.layers = nn.ModuleList([DecoderLayer(args) for _ in range(args.n_layer)])self.norm = LayerNorm(args.n_embd)def forward(self, x, enc_out):"Pass the input (and mask) through each layer in turn."for layer in self.layers:x = layer(x, enc_out)return self.norm(x)class PositionalEncoding(nn.Module):'''位置編碼模塊'''def __init__(self, args):super(PositionalEncoding, self).__init__()# Dropout 層self.dropout = nn.Dropout(p=args.dropout)# block size 是序列的最大長度pe = torch.zeros(args.block_size, args.n_embd)position = torch.arange(0, args.block_size).unsqueeze(1)# 計算 thetadiv_term = torch.exp(torch.arange(0, args.n_embd, 2) * -(math.log(10000.0) / args.n_embd))# 分別計算 sin、cos 結果pe[:, 0::2] = torch.sin(position * div_term)pe[:, 1::2] = torch.cos(position * div_term)pe = pe.unsqueeze(0)self.register_buffer("pe", pe)def forward(self, x):# 將位置編碼加到 Embedding 結果上x = x + self.pe[:, : x.size(1)].requires_grad_(False)return self.dropout(x)class Transformer(nn.Module):'''整體模型'''def __init__(self, args):super().__init__()# 必須輸入詞表大小和 block sizeassert args.vocab_size is not Noneassert args.block_size is not Noneself.args = argsself.transformer = nn.ModuleDict(dict(wte = nn.Embedding(args.vocab_size, args.n_embd),wpe = PositionalEncoding(args),drop = nn.Dropout(args.dropout),encoder = Encoder(args),decoder = Decoder(args),))# 最后的線性層,輸入是 n_embd,輸出是詞表大小self.lm_head = nn.Linear(args.n_embd, args.vocab_size, bias=False)# 初始化所有的權重self.apply(self._init_weights)# 查看所有參數的數量print("number of parameters: %.2fM" % (self.get_num_params()/1e6,))'''統計所有參數的數量'''def get_num_params(self, non_embedding=False):# non_embedding: 是否統計 embedding 的參數n_params = sum(p.numel() for p in self.parameters())# 如果不統計 embedding 的參數,就減去if non_embedding:n_params -= self.transformer.wpe.weight.numel()return n_params'''初始化權重'''def _init_weights(self, module):# 線性層和 Embedding 層初始化為正則分布if isinstance(module, nn.Linear):torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)if module.bias is not None:torch.nn.init.zeros_(module.bias)elif isinstance(module, nn.Embedding):torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)'''前向計算函數'''def forward(self, idx, targets=None):# 輸入為 idx,維度為 (batch size, sequence length, 1);targets 為目標序列,用于計算 lossdevice = idx.deviceb, t = idx.size()assert t <= self.args.block_size, f"不能計算該序列,該序列長度為 {t}, 最大序列長度只有 {self.args.block_size}"# 通過 self.transformer# 首先將輸入 idx 通過 Embedding 層,得到維度為 (batch size, sequence length, n_embd)print("idx",idx.size())# 通過 Embedding 層tok_emb = self.transformer.wte(idx)print("tok_emb",tok_emb.size())# 然后通過位置編碼pos_emb = self.transformer.wpe(tok_emb) # 再進行 Dropoutx = self.transformer.drop(pos_emb)# 然后通過 Encoderprint("x after wpe:",x.size())enc_out = self.transformer.encoder(x)print("enc_out:",enc_out.size())# 再通過 Decoderx = self.transformer.decoder(x, enc_out)print("x after decoder:",x.size())if targets is not None:# 訓練階段,如果我們給了 targets,就計算 loss# 先通過最后的 Linear 層,得到維度為 (batch size, sequence length, vocab size)logits = self.lm_head(x)# 再跟 targets 計算交叉熵loss = F.cross_entropy(logits.view(-1, logits.size(-1)), targets.view(-1), ignore_index=-1)else:# 推理階段,我們只需要 logits,loss 為 None# 取 -1 是只取序列中的最后一個作為輸出logits = self.lm_head(x[:, [-1], :]) # note: using list [-1] to preserve the time dimloss = Nonereturn logits, lossif __name__ == '__main__':class Args:vocab_size = 10000block_size = 128n_embd = 256dropout = 0.1n_layer = 6n_heads = 8dim = 256hidden_dim = 512dropout = 0.1max_seq_len = 128args = Args()model = Transformer(args)print(model)# 訓練模型是有targetprint("########## Training ##########")target = torch.randint(0, args.vocab_size, (2, args.block_size))logits, loss = model(target, target)print("Training logits shape:", logits.shape)print("logits shape:", logits.shape)print("loss:", loss)# 測試模型的前向計算print("########## Inference ##########")idx = torch.randint(0, args.vocab_size, (2, args.block_size))logits, loss = model(idx)print("Inference logits shape:", logits.shape)print("logits shape:", logits.shape)print("loss:", loss)
將上面代碼保存為 transformer_demo.py
后,直接運行即可
python .\transformer_demo.py
運行結果
可以看到訓練模式下計算了loss(真實GPU訓練還會再稍復雜一些)。推理模式下直接推理了下一個的輸出,[2,1,10000]中,10000是詞表大小,2是batch(如同時2個人在用),1是下一次token。10000中最大的數值最大的位置就是下次概率最大的token。
參考鏈接
1、happy-llm/docs/chapter2/第二章 Transformer架構.md