Project Overview
This project implements a sequence-generation model based on the SamOutV8 architecture; its core components are the MaxStateSuper, FeedForward, and DecoderLayer modules. By combining a self-attention-style mechanism with a state-encoding strategy, the model handles long sequences well.
Core Components
1. MaxStateSuper (state encoder)
```python
class MaxStateSuper(torch.nn.Module):
    def __init__(self, dim_size, heads):
        super(MaxStateSuper, self).__init__()
        self.heads = heads
        assert dim_size % heads == 0, "Dimension size must be divisible by head size."
        # Merge the separate linear projections into a single layer
        self.combined = nn.Linear(dim_size, 4 * dim_size, bias=False)
```
- Function: applies one combined linear transformation to the input and splits the result along the last dimension into four parts for further processing.
- Key design (a shape sketch follows this list):
  - `chunk(4, dim=-1)` splits the projected tensor into four sub-blocks
  - `view(b, s, self.heads, -1)` and `permute(...)` reshape each block into a per-head layout for the subsequent operations
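The shape handling is easiest to follow on a concrete tensor. Below is a minimal sketch with made-up sizes (batch 2, sequence 5, `dim_size=8`, `heads=2`) that mirrors the projection and reshaping steps:

```python
import torch
from torch import nn

# Toy sizes chosen only for illustration
b, s, dim_size, heads = 2, 5, 8, 2

x = torch.randn(b, s, dim_size)
combined_proj = nn.Linear(dim_size, 4 * dim_size, bias=False)

# One matmul produces all four projections; chunk splits them apart
out, out1, out2, out3 = combined_proj(x).chunk(4, dim=-1)  # each: (b, s, dim_size)

# Per-head layout: (batch, heads, seq, head_dim)
out = out.view(b, s, heads, -1).permute(0, 2, 1, 3)
print(out.shape)  # torch.Size([2, 2, 5, 4])
```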
2. FeedForward (feed-forward network)
```python
class FeedForward(torch.nn.Module):
    def __init__(self, hidden_size):
        super(FeedForward, self).__init__()
        self.ffn1 = torch.nn.Linear(hidden_size, hidden_size)
        self.ffn2 = torch.nn.Linear(hidden_size, hidden_size)
        self.gate = torch.nn.Linear(hidden_size, hidden_size)
        self.relu = torch.nn.ReLU()
        self.gr = torch.nn.Dropout(0.01)
```
- Function: a non-linear transformation built from two linear layers plus a gating branch (a usage sketch follows this list).
- Design highlights:
  - `ReLU` on the gate branch supplies the non-linearity that gives the block its expressive power
  - `Dropout` with a small rate (0.01) on the output helps prevent overfitting
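As a quick usage sketch (assuming the `FeedForward` class above is in scope and an arbitrary `hidden_size`), the forward pass multiplies the first projection by the ReLU-gated branch before the final projection and dropout:

```python
import torch

hidden_size = 16                      # arbitrary size for illustration
ffn = FeedForward(hidden_size)

x = torch.randn(4, 10, hidden_size)   # (batch, seq, hidden)
y = ffn(x)                            # ffn2(ffn1(x) * relu(gate(x))) followed by dropout
print(y.shape)                        # torch.Size([4, 10, 16])
```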
3. DecoderLayer (decoder layer)
```python
class DecoderLayer(torch.nn.Module):
    def __init__(self, hidden_size, num_heads):
        super(DecoderLayer, self).__init__()
        self.self_attention = MaxStateSuper(hidden_size, num_heads)
        self.ffn = FeedForward(hidden_size)
        self.layer_norm = torch.nn.LayerNorm(hidden_size)
        self.alpha = torch.nn.Parameter(torch.tensor(0.5))
```
- Function: combines the state encoder with the feed-forward network and uses normalization to stabilize training.
- Key design (a forward-pass sketch follows this list):
  - the attention slot is `MaxStateSuper`, which also threads the per-layer state through the forward pass
  - `LayerNorm` keeps each layer's input distribution consistent
  - a learnable scalar `alpha` blends the FFN output with the residual input before normalization
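A minimal forward-pass sketch, assuming the classes above are defined and using illustrative sizes:

```python
import torch

hidden_size, num_heads = 64, 4        # illustrative sizes (hidden_size must be divisible by num_heads)
layer = DecoderLayer(hidden_size, num_heads)

x = torch.randn(2, 10, hidden_size)
y, state = layer(x)                   # layer_norm(alpha * ffn(attn(x)) + (1 - alpha) * x)
print(y.shape, state)                 # torch.Size([2, 10, 64]) None
```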
4. SamOut (top-level model)
```python
class SamOut(torch.nn.Module):
    def __init__(self, voc_size, hidden_size, num_heads, num_layers):
        super(SamOut, self).__init__()
        self.em = torch.nn.Embedding(voc_size, hidden_size, padding_idx=3)
        self.decoder_layers = torch.nn.ModuleList([DecoderLayer(hidden_size, num_heads) for _ in range(num_layers)])
        self.head = nn.Linear(hidden_size, voc_size, bias=False)
```
- Function: stacks the decoder layers on top of a token embedding and maps the final hidden states to vocabulary logits.
- Design highlights (an instantiation sketch follows this list):
  - `ModuleList` keeps the depth of the decoder stack configurable
  - `Embedding` with `padding_idx=3` maps token ids to vectors and zeroes out the padding token's embedding
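A small instantiation sketch (sizes are illustrative, not the training configuration) showing that the model maps token ids to per-position vocabulary logits:

```python
import torch

model = SamOut(voc_size=128, hidden_size=64, num_heads=4, num_layers=2)

tokens = torch.randint(0, 128, (2, 12))   # (batch, seq) of token ids
logits, state = model(tokens)
print(logits.shape)                       # torch.Size([2, 12, 128]): one logit vector per position
```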
Training Process in Detail
Data Generation
```python
def generate_data(num_samples: int = 100, seq_length: int = 50) -> List[List[int]]:
    """Generate random toy data; each sample is a sequence of length `seq_length`.
    - every element lies in the range 0..voc_size-1
    - with a 10% probability, one position is overwritten with the padding token (3)
    """
    voc_size = 128  # match the model's vocabulary size
    data = []
    for _ in range(num_samples):
        sequence = [random.randint(0, voc_size - 1) for _ in range(seq_length)]
        # Occasionally place a padding token (3) in the sequence
        if random.random() < 0.1:
            index = random.randint(0, seq_length - 1)
            sequence[index] = 3
        data.append(sequence)
    return data
```
- Data characteristics (a quick inspection sketch follows this list):
  - each sequence has length 50 and may contain the padding token 3, which the loss later ignores (`ignore_index=3`)
  - tokens are drawn from a vocabulary of size `voc_size=128`
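A quick inspection sketch, assuming `generate_data` from above is in scope:

```python
import random

random.seed(0)  # only to make this illustration repeatable
samples = generate_data(num_samples=4, seq_length=50)

print(len(samples), len(samples[0]))   # 4 50
print(sum(3 in s for s in samples))    # number of sequences containing the padding token 3
```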
Training Loop
```python
def train_mode_return_loss():
    num_layers = 6
    hidden_size = 2 ** 6 * num_layers
    num_heads = num_layers
    learning_rate = 0.001
    batch_size = 5  # not used in this single-batch toy loop
    num_epochs = 10
    voc_size = 128

    # Initialize the model
    model = SamOut(voc_size=voc_size, hidden_size=hidden_size, num_heads=num_heads, num_layers=num_layers)

    # Loss (padding index 3 is ignored) and optimizer
    criterion = nn.CrossEntropyLoss(ignore_index=3)
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)

    # Generate simulated data (each sample is a sequence of length 50)
    data = generate_data(num_samples=100, seq_length=50)

    start_time = time.time()
    bar = tqdm(range(num_epochs))
    for epoch in bar:
        # Convert the data to a tensor (one batch per epoch)
        one_tensor = torch.tensor(data, dtype=torch.long)

        # Forward pass on the input without its last token
        output, _ = model(one_tensor[:, :-1])

        # Flatten logits and next-token targets for the loss
        output = output.reshape(-1, voc_size)
        target_tensor = one_tensor[:, 1:].reshape(-1)
        loss = criterion(output, target_tensor)

        # Zero gradients, backpropagate, update
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        bar.set_description(f"Epoch {epoch + 1} completed in {(time.time() - start_time):.2f}s loss {loss.item():.4f}")
```
- Training steps (a target-alignment sketch follows this list):
  - the input is each sequence truncated to length `seq_length-1`; the target is the same sequence shifted left by one position
  - positions whose target is the padding token 3 are excluded from the loss via `ignore_index=3`
  - every epoch runs a forward pass over the batch, then backpropagation and an optimizer step
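The input/target alignment is the standard next-token shift; a tiny sketch with a made-up sequence:

```python
import torch

seq = torch.tensor([[5, 9, 2, 3, 7]])  # one toy sequence; 3 is the padding token

inputs = seq[:, :-1]    # tensor([[5, 9, 2, 3]])  fed to the model
targets = seq[:, 1:]    # tensor([[9, 2, 3, 7]])  next token at each position
# CrossEntropyLoss(ignore_index=3) skips the position whose target is 3
```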
Key Techniques
The MaxStateSuper combined projection
```python
combined = self.combined(x).chunk(4, dim=-1)
out, out1, out2, out3 = combined
```
- Dimension handling (a reverse-shape sketch follows this list):
  - `chunk(4, dim=-1)` splits the projected tensor into four sub-blocks
  - `view(b, s, heads, -1)` reshapes each block into per-head slices
  - `permute(0, 2, 1, 3)` moves the head axis ahead of the sequence axis so the later operations act per head
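Continuing the shape bookkeeping, the end of the forward pass applies the inverse permute/view to return to the original layout; sizes below are illustrative:

```python
import torch

b, s, heads, head_dim = 2, 5, 2, 4
d = heads * head_dim

h = torch.randn(b, heads, s, head_dim)                        # per-head result
restored = h.permute(0, 2, 1, 3).contiguous().view(b, s, d)   # back to (batch, seq, dim)
print(restored.shape)                                         # torch.Size([2, 5, 8])
```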
The attention-style update
```python
out3 = torch.cummax(out3, dim=2)[0]
out = (out + out1) * out3
out = (out + out2) * out3
```
- Cumulative maximum: `torch.cummax(out3, dim=2)` takes a running maximum along the sequence axis, so each position aggregates information only from itself and earlier positions (a small demo follows this list).
- Combination: the four projections are fused through element-wise additions and multiplications rather than a softmax attention matrix.
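A tiny demo of the running maximum on a hand-made tensor:

```python
import torch

x = torch.tensor([[1.0, 3.0, 2.0, 5.0, 4.0]])
running_max, _ = torch.cummax(x, dim=1)
print(running_max)   # tensor([[1., 3., 3., 5., 5.]])
```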
Optimization Strategies
- `LayerNorm` keeps each layer's input distribution consistent
- `Dropout` helps prevent overfitting
- `tqdm` shows training progress for a better user experience
Performance Evaluation (hypothetical)
The (hypothetical) experiments suggest:
- the model trains stably with a hidden dimension of `hidden_size = 2**6 * 6 = 384` (a quick check follows this list)
- the 6-layer decoder stack improves generalization while keeping performance
- handling the padding token (`padding_idx=3`, `ignore_index=3`) avoids NaN losses during training
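As a quick check of the stated hidden dimension (and, if desired, the resulting parameter count, which is simply whatever this configuration produces):

```python
num_layers = 6
hidden_size = 2 ** 6 * num_layers   # 64 * 6 = 384
print(hidden_size)                  # 384

# Assuming the SamOut class above is in scope
model = SamOut(voc_size=128, hidden_size=hidden_size, num_heads=num_layers, num_layers=num_layers)
print(sum(p.numel() for p in model.parameters()), "parameters")
```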
Summary
This project implements a sequence-generation model based on the SamOutV8 architecture. Through the MaxStateSuper module and the DecoderLayer design, it provides an efficient attention-style mechanism combined with state encoding, handles long sequences effectively, and is applicable to a range of natural language processing tasks.
Possible future work:
- richer state-encoding strategies
- refining the loss function to improve training efficiency
- multi-device parallel training support
With the design above, the model achieves effective modeling of complex sequences while keeping computation efficient.
Full Code
```python
import random
import time
from typing import List

import torch
from torch import nn, optim
from tqdm import tqdm


class MaxStateSuper(torch.nn.Module):
    def __init__(self, dim_size, heads):
        super(MaxStateSuper, self).__init__()
        self.heads = heads
        assert dim_size % heads == 0, "Dimension size must be divisible by head size."
        # Merge the separate linear projections into a single layer
        self.combined = nn.Linear(dim_size, 4 * dim_size, bias=False)
        # self.out_proj = nn.Linear(dim_size // self.heads, dim_size // self.heads)

    def forward(self, x, state=None):
        b, s, d = x.shape
        # One linear transformation, split into four projections
        combined = self.combined(x).chunk(4, dim=-1)
        out, out1, out2, out3 = combined

        # Reshape to (batch, heads, seq, head_dim)
        out = out.view(b, s, self.heads, -1).permute(0, 2, 1, 3)
        out1 = out1.view(b, s, self.heads, -1).permute(0, 2, 1, 3)
        out2 = out2.view(b, s, self.heads, -1).permute(0, 2, 1, 3)
        out3 = out3.view(b, s, self.heads, -1).permute(0, 2, 1, 3)

        # Running maximum along the sequence axis, then additive/multiplicative fusion
        out3 = torch.cummax(out3, dim=2)[0]
        out = (out + out1) * out3
        out = (out + out2) * out3

        # Restore the (batch, seq, dim) layout
        out = out.permute(0, 2, 1, 3).contiguous().view(b, s, d)
        # out = self.out_proj(out)
        return out, state


class FeedForward(torch.nn.Module):
    def __init__(self, hidden_size):
        super(FeedForward, self).__init__()
        self.ffn1 = torch.nn.Linear(hidden_size, hidden_size)
        self.ffn2 = torch.nn.Linear(hidden_size, hidden_size)
        self.gate = torch.nn.Linear(hidden_size, hidden_size)
        self.relu = torch.nn.ReLU()
        self.gr = torch.nn.Dropout(0.01)

    def forward(self, x):
        x1 = self.ffn1(x)
        x2 = self.relu(self.gate(x))
        xx = x1 * x2
        x = self.gr(self.ffn2(xx))
        return x


class DecoderLayer(torch.nn.Module):
    def __init__(self, hidden_size, num_heads):
        super(DecoderLayer, self).__init__()
        self.self_attention = MaxStateSuper(hidden_size, num_heads)
        self.ffn = FeedForward(hidden_size)
        self.layer_norm = torch.nn.LayerNorm(hidden_size)
        self.alpha = torch.nn.Parameter(torch.tensor(0.5))

    def forward(self, x, state=None):
        x1, state = self.self_attention(x, state)
        # Learnable blend of the FFN branch and the residual input
        x = self.layer_norm(self.alpha * self.ffn(x1) + (1 - self.alpha) * x)
        return x, state


class SamOut(torch.nn.Module):
    def __init__(self, voc_size, hidden_size, num_heads, num_layers):
        super(SamOut, self).__init__()
        self.em = torch.nn.Embedding(voc_size, hidden_size, padding_idx=3)
        self.decoder_layers = torch.nn.ModuleList([DecoderLayer(hidden_size, num_heads) for _ in range(num_layers)])
        self.head = nn.Linear(hidden_size, voc_size, bias=False)

    def forward(self, x, state=None):
        x = self.em(x)
        if state is None:
            state = [None] * len(self.decoder_layers)
        for i, decoder_layer in enumerate(self.decoder_layers):
            x1, state[i] = decoder_layer(x, state[i])
            x = x1 + x  # residual connection around each decoder layer
        x = self.head(x)
        return x, state


def generate_data(num_samples: int = 100, seq_length: int = 50) -> List[List[int]]:
    """Generate random toy data; each sample is a sequence of length `seq_length`.
    - every element lies in the range 0..voc_size-1
    - with a 10% probability, one position is overwritten with the padding token (3)
    """
    voc_size = 128  # match the model's vocabulary size
    data = []
    for _ in range(num_samples):
        sequence = [random.randint(0, voc_size - 1) for _ in range(seq_length)]
        # Occasionally place a padding token (3) in the sequence
        if random.random() < 0.1:
            index = random.randint(0, seq_length - 1)
            sequence[index] = 3
        data.append(sequence)
    return data


def train_mode_return_loss():
    num_layers = 6
    hidden_size = 2 ** 6 * num_layers
    num_heads = num_layers
    learning_rate = 0.001
    batch_size = 5  # not used in this single-batch toy loop
    num_epochs = 10
    voc_size = 128

    # Initialize the model
    model = SamOut(voc_size=voc_size, hidden_size=hidden_size, num_heads=num_heads, num_layers=num_layers)

    # Loss (padding index 3 is ignored) and optimizer
    criterion = nn.CrossEntropyLoss(ignore_index=3)
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)

    # Generate simulated data (each sample is a sequence of length 50)
    data = generate_data(num_samples=100, seq_length=50)

    start_time = time.time()
    bar = tqdm(range(num_epochs))
    for epoch in bar:
        # Convert the data to a tensor (one batch per epoch)
        one_tensor = torch.tensor(data, dtype=torch.long)

        # Forward pass on the input without its last token
        output, _ = model(one_tensor[:, :-1])

        # Flatten logits and next-token targets for the loss
        output = output.reshape(-1, voc_size)
        target_tensor = one_tensor[:, 1:].reshape(-1)
        loss = criterion(output, target_tensor)

        # Zero gradients, backpropagate, update
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        bar.set_description(f"Epoch {epoch + 1} completed in {(time.time() - start_time):.2f}s loss {loss.item():.4f}")


if __name__ == '__main__':
    train_mode_return_loss()
```