import torch
import torch.nn as nn
import torch.nn.functional as F
import math
import numpy as np
import pytest


class PositionalEncoding(nn.Module):
    """Sinusoidal positional encoding (Vaswani et al., 2017).

    Adds a fixed, non-learned position signal to token embeddings so the
    model can distinguish token order.  Assumes ``d_model`` is even — the
    odd-index cosine slice below requires it.
    """

    def __init__(self, d_model, max_seq_length=5000):
        super(PositionalEncoding, self).__init__()
        # Precompute the (max_seq_length, d_model) encoding table once.
        pe = torch.zeros(max_seq_length, d_model)
        position = torch.arange(0, max_seq_length, dtype=torch.float).unsqueeze(1)
        # Geometric frequency progression: exp(-2i * ln(10000) / d_model)
        # == 10000^(-2i/d_model), one frequency per even dimension.
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        # Sine on even dimensions, cosine on odd dimensions.
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0)  # (1, max_seq_length, d_model): broadcasts over batch
        # Register as a buffer: saved/moved with the module, but not trained.
        self.register_buffer('pe', pe)

    def forward(self, x):
        # x: (batch, seq_len, d_model); add the first seq_len rows of the table.
        return x + self.pe[:, :x.size(1)]


class MultiHeadAttention(nn.Module):
    """Multi-head scaled dot-product attention.

    Projects queries/keys/values into ``num_heads`` subspaces of size
    ``d_model // num_heads``, attends in each independently, then merges
    heads through a final linear layer.
    """

    def __init__(self, d_model, num_heads):
        super(MultiHeadAttention, self).__init__()
        assert d_model % num_heads == 0
        self.d_model = d_model
        self.num_heads = num_heads
        self.d_k = d_model // num_heads

        # One fused projection per role; heads are split by reshaping.
        self.q_linear = nn.Linear(d_model, d_model)
        self.k_linear = nn.Linear(d_model, d_model)
        self.v_linear = nn.Linear(d_model, d_model)
        self.out_linear = nn.Linear(d_model, d_model)

    def forward(self, q, k, v, mask=None):
        """Compute attention.

        q/k/v: (batch, seq, d_model); mask broadcasts against the
        (batch, heads, q_len, k_len) score tensor, with 0 meaning "hide".
        Returns (batch, q_len, d_model).
        """
        batch_size = q.size(0)

        # Project, then split heads: (batch, heads, seq, d_k).
        q = self.q_linear(q).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)
        k = self.k_linear(k).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)
        v = self.v_linear(v).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)

        # Scaled dot-product scores: (batch, heads, q_len, k_len).
        scores = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(self.d_k)

        # Masked positions get a large negative score so softmax ~zeroes them.
        if mask is not None:
            scores = scores.masked_fill(mask == 0, -1e9)

        attn_weights = F.softmax(scores, dim=-1)
        attn_output = torch.matmul(attn_weights, v)

        # Merge heads back to (batch, q_len, d_model) and project.
        attn_output = attn_output.transpose(1, 2).contiguous().view(batch_size, -1, self.d_model)
        output = self.out_linear(attn_output)
        return output


class FeedForward(nn.Module):
    """Position-wise feed-forward network: Linear -> ReLU -> Linear."""

    def __init__(self, d_model, d_ff):
        super(FeedForward, self).__init__()
        self.linear1 = nn.Linear(d_model, d_ff)
        self.linear2 = nn.Linear(d_ff, d_model)

    def forward(self, x):
        return self.linear2(F.relu(self.linear1(x)))


class EncoderLayer(nn.Module):
    """One encoder layer: self-attention and feed-forward, each wrapped in
    a residual connection followed by LayerNorm (post-norm)."""

    def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
        super(EncoderLayer, self).__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        self.feed_forward = FeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask=None):
        # Self-attention sublayer with residual connection.
        attn_output = self.self_attn(x, x, x, mask)
        x = self.norm1(x + self.dropout(attn_output))

        # Feed-forward sublayer with residual connection.
        ff_output = self.feed_forward(x)
        x = self.norm2(x + self.dropout(ff_output))
        return x


class DecoderLayer(nn.Module):
    """One decoder layer: masked self-attention, encoder-decoder
    cross-attention, and feed-forward — each with residual + LayerNorm."""

    def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
        super(DecoderLayer, self).__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        self.cross_attn = MultiHeadAttention(d_model, num_heads)
        self.feed_forward = FeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, enc_output, src_mask=None, tgt_mask=None):
        # Masked self-attention sublayer with residual connection.
        attn_output = self.self_attn(x, x, x, tgt_mask)
        x = self.norm1(x + self.dropout(attn_output))

        # Cross-attention over encoder output with residual connection.
        cross_attn_output = self.cross_attn(x, enc_output, enc_output, src_mask)
        x = self.norm2(x + self.dropout(cross_attn_output))

        # Feed-forward sublayer with residual connection.
        ff_output = self.feed_forward(x)
        x = self.norm3(x + self.dropout(ff_output))
        return x


class Transformer(nn.Module):
    """Full encoder-decoder Transformer for sequence-to-sequence tasks.

    forward(src, tgt, src_mask, tgt_mask) returns unnormalized logits of
    shape (batch, tgt_len, tgt_vocab_size).
    """

    def __init__(self, src_vocab_size, tgt_vocab_size, d_model, num_heads, num_encoder_layers,
                 num_decoder_layers, d_ff, max_seq_length, dropout=0.1):
        super(Transformer, self).__init__()

        # Token embedding layers.
        self.src_embedding = nn.Embedding(src_vocab_size, d_model)
        self.tgt_embedding = nn.Embedding(tgt_vocab_size, d_model)

        # Shared positional encoding table.
        self.positional_encoding = PositionalEncoding(d_model, max_seq_length)

        # Encoder and decoder stacks.
        self.encoder_layers = nn.ModuleList([
            EncoderLayer(d_model, num_heads, d_ff, dropout)
            for _ in range(num_encoder_layers)
        ])
        self.decoder_layers = nn.ModuleList([
            DecoderLayer(d_model, num_heads, d_ff, dropout)
            for _ in range(num_decoder_layers)
        ])

        # Projection from model dimension to target-vocabulary logits.
        self.output_layer = nn.Linear(d_model, tgt_vocab_size)
        self.dropout = nn.Dropout(dropout)
        self.d_model = d_model

        self._init_parameters()

    def _init_parameters(self):
        # Xavier-initialize all weight matrices (skip 1-D bias/norm params).
        for p in self.parameters():
            if p.dim() > 1:
                nn.init.xavier_uniform_(p)

    def forward(self, src, tgt, src_mask=None, tgt_mask=None):
        # Embed + scale by sqrt(d_model) (per the original paper), then
        # add positional encodings and apply dropout.
        src = self.src_embedding(src) * math.sqrt(self.d_model)
        src = self.positional_encoding(src)
        src = self.dropout(src)

        tgt = self.tgt_embedding(tgt) * math.sqrt(self.d_model)
        tgt = self.positional_encoding(tgt)
        tgt = self.dropout(tgt)

        # Encoder pass.
        enc_output = src
        for enc_layer in self.encoder_layers:
            enc_output = enc_layer(enc_output, src_mask)

        # Decoder pass, attending to the encoder output.
        dec_output = tgt
        for dec_layer in self.decoder_layers:
            dec_output = dec_layer(dec_output, enc_output, src_mask, tgt_mask)

        # Per-position vocabulary logits.
        output = self.output_layer(dec_output)
        return output


def create_masks(src, tgt):
    """Build attention masks; 0 is assumed to be the padding index.

    Returns:
        src_mask: (batch, 1, 1, src_len) bool — hides source padding.
        tgt_mask: (batch, 1, tgt_len, tgt_len) bool — hides target padding
                  and future positions (causal mask).
    """
    # Source mask: hide padding tokens.
    src_mask = (src != 0).unsqueeze(1).unsqueeze(2)

    # Target padding mask, shaped to broadcast against the causal mask.
    tgt_mask = (tgt != 0).unsqueeze(1).unsqueeze(3)

    # Causal ("no peek") mask for autoregressive decoding; created on the
    # same device as tgt so this works on GPU inputs too.
    seq_length = tgt.size(1)
    nopeak_mask = (1 - torch.triu(
        torch.ones(1, seq_length, seq_length, device=tgt.device), diagonal=1)).bool()

    # Combine padding and causal masks.
    tgt_mask = tgt_mask & nopeak_mask
    return src_mask, tgt_mask


def train_transformer(model, optimizer, criterion, train_loader, epochs):
    """Teacher-forced training loop.

    Feeds tgt[:, :-1] as decoder input and scores against tgt[:, 1:]
    (shift-by-one next-token prediction). Prints mean loss per epoch.
    """
    model.train()
    for epoch in range(epochs):
        total_loss = 0
        for src, tgt in train_loader:
            # Masks for the shifted decoder input.
            src_mask, tgt_mask = create_masks(src, tgt[:, :-1])

            output = model(src, tgt[:, :-1], src_mask, tgt_mask)

            # Flatten to (batch*seq, vocab) vs (batch*seq,) for CE loss.
            loss = criterion(output.contiguous().view(-1, output.size(-1)),
                             tgt[:, 1:].contiguous().view(-1))

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            total_loss += loss.item()
        print(f'Epoch {epoch + 1}, Loss: {total_loss / len(train_loader):.4f}')


@pytest.fixture
def model():
    """A full-size Transformer instance for the pytest tests below."""
    d_model = 512
    num_heads = 8
    num_encoder_layers = 6
    num_decoder_layers = 6
    d_ff = 2048
    max_seq_length = 100
    dropout = 0.1

    # Assumed vocabulary sizes.
    src_vocab_size = 10000
    tgt_vocab_size = 10000

    model = Transformer(src_vocab_size, tgt_vocab_size, d_model, num_heads,
                        num_encoder_layers, num_decoder_layers, d_ff, max_seq_length, dropout)
    return model


@pytest.fixture
def test_loader():
    """A tiny DataLoader of random (src, tgt) index pairs for testing."""
    batch_size = 2
    seq_length = 10

    # Random token indices in [1, 10000) — 0 is reserved for padding.
    src_data = torch.randint(1, 10000, (batch_size, seq_length))
    tgt_data = torch.randint(1, 10000, (batch_size, seq_length))

    from torch.utils.data import TensorDataset, DataLoader
    dataset = TensorDataset(src_data, tgt_data)
    test_loader = DataLoader(dataset, batch_size=batch_size)
    return test_loader


def test_transformer(model, test_loader):
    """Smoke test: run the model over the loader and report token accuracy.

    NOTE(review): accuracy counts every position, including any padding —
    fine for this random-data smoke test, but mask padding for real eval.
    """
    model.eval()
    correct = 0
    total = 0

    with torch.no_grad():
        for src, tgt in test_loader:
            src_mask, _ = create_masks(src, tgt)

            output = model(src, tgt, src_mask, None)
            pred = output.argmax(dim=-1)

            total += tgt.size(0) * tgt.size(1)
            correct += (pred == tgt).sum().item()

    accuracy = correct / total
    print(f'Test Accuracy: {accuracy:.4f}')


def translate(model, src_sequence, src_vocab, tgt_vocab, max_length=50):
    """Greedy autoregressive decode of one source sequence.

    src_vocab / tgt_vocab map token -> index. Decoding starts from <sos>
    and stops at <eos> or after max_length steps. Returns the generated
    token list (including <sos> and, if reached, <eos>).
    """
    model.eval()

    # Tokens to indices; unknown tokens fall back to <unk>.
    src_indices = [src_vocab.get(token, src_vocab['<unk>']) for token in src_sequence]
    src_tensor = torch.LongTensor(src_indices).unsqueeze(0)

    # Source padding mask (0 is the padding index).
    src_mask = (src_tensor != 0).unsqueeze(1).unsqueeze(2)

    # Start generation with the <sos> token.
    tgt_indices = [tgt_vocab['<sos>']]

    with torch.no_grad():
        for i in range(max_length):
            tgt_tensor = torch.LongTensor(tgt_indices).unsqueeze(0)

            _, tgt_mask = create_masks(src_tensor, tgt_tensor)

            # Greedy pick of the next token from the last position's logits.
            output = model(src_tensor, tgt_tensor, src_mask, tgt_mask)
            next_token_logits = output[:, -1, :]
            next_token = next_token_logits.argmax(dim=-1).item()

            tgt_indices.append(next_token)

            if next_token == tgt_vocab['<eos>']:
                break

    # BUG FIX: tgt_vocab maps token -> index, so looking an *index* up in it
    # always missed and returned '<unk>'. Invert the mapping to recover tokens.
    idx_to_token = {index: token for token, index in tgt_vocab.items()}
    tgt_sequence = [idx_to_token.get(index, '<unk>') for index in tgt_indices]
    return tgt_sequence


if __name__ == "__main__":
    # Hyperparameters.
    d_model = 512
    num_heads = 8
    num_encoder_layers = 6
    num_decoder_layers = 6
    d_ff = 2048
    max_seq_length = 100
    dropout = 0.1

    # Assumed vocabulary sizes.
    src_vocab_size = 10000
    tgt_vocab_size = 10000

    model = Transformer(src_vocab_size, tgt_vocab_size, d_model, num_heads,
                        num_encoder_layers, num_decoder_layers, d_ff, max_seq_length, dropout)

    # Optimizer and loss (padding index 0 is ignored).
    optimizer = torch.optim.Adam(model.parameters(), lr=0.0001, betas=(0.9, 0.98), eps=1e-9)
    criterion = nn.CrossEntropyLoss(ignore_index=0)

    # Real data-loading code would go here.
    # train_loader = ...
    # test_loader = ...

    # Train the model.
    # train_transformer(model, optimizer, criterion, train_loader, epochs=10)

    # Test the model.
    # test_transformer(model, test_loader)

    # Translation example.
    # src_vocab = ...
    # tgt_vocab = ...
    # src_sequence = ["hello", "world", "!"]
    # translation = translate(model, src_sequence, src_vocab, tgt_vocab)
    # print(f"Source: {' '.join(src_sequence)}")
    # print(f"Translation: {' '.join(translation)}")