1. Basic introduction to the three tokenizer types
word-level: split the text into words on whitespace or punctuation; the drawback is that the vocabulary becomes very large
subword-level: subword (word-piece) tokenization, the mainstream approach
char-level: split the text into individual characters as tokens (see the small sketch below)
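To make the three granularities concrete, here is a minimal sketch in plain Python (no tokenizer library; the subword split is a hand-made illustration of the idea, not real BPE output):

sentence = "unbelievable results"

# word-level: split on whitespace (and punctuation) -> very large vocabulary
word_tokens = sentence.split()            # ['unbelievable', 'results']

# char-level: split into single characters -> tiny vocabulary, long sequences
char_tokens = list(sentence)              # ['u', 'n', 'b', 'e', ...]

# subword-level (BPE/WordPiece-style): frequent pieces become tokens;
# this particular split is hand-written purely for illustration
subword_tokens = ["un", "believ", "able", "results"]

print(word_tokens)
print(char_tokens[:6])
print(subword_tokens)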
2. Char-level code
Imports:
import matplotlib as mpl
import matplotlib.pyplot as plt
%matplotlib inline
import numpy as np
import sklearn
import pandas as pd
import os
import sys
import time
from tqdm.auto import tqdm
import torch
import torch.nn as nn
import torch.nn.functional as F

print(sys.version_info)
for module in mpl, np, pd, sklearn, torch:
    print(module.__name__, module.__version__)

device = torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")
print(device)

seed = 42
torch.manual_seed(seed)
torch.cuda.manual_seed_all(seed)
Data preparation (the file needs to be downloaded):
# https://storage.googleapis.com/download.tensorflow.org/data/shakespeare.txt
# the file has already been downloaded locally
with open("./shakespeare.txt", "r", encoding="utf8") as file:
    text = file.read()
print("length", len(text))
print(text[0:100])
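If the file is not present yet, it can be fetched from the URL above with the standard library; a minimal sketch:

import os
import urllib.request

url = "https://storage.googleapis.com/download.tensorflow.org/data/shakespeare.txt"
if not os.path.exists("./shakespeare.txt"):
    urllib.request.urlretrieve(url, "./shakespeare.txt")  # download once and reuse the local copy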
Build the character vocabulary:
# 1. generate vocab
# 2. build mapping char->id
# 3. data -> id_data: convert every character in the data to its id
# 4. a b c d [EOS] -> [BOS] b c d: the model predicts the next character, i.e. input a, output b

# deduplicate to keep the unique characters, then sort (sorting just makes the vocab easier to read)
vocab = sorted(set(text))  # set removes duplicates, sorted orders them
print(len(vocab))
print(vocab)

# number every character: enumerate attaches an index to each position, used here in a dict comprehension
char2idx = {char: idx for idx, char in enumerate(vocab)}
print(char2idx)

# turn vocab from a list into an ndarray
idx2char = np.array(vocab)
print(idx2char)

# convert every character of the text to its id
text_as_int = np.array([char2idx[c] for c in text])
print(text_as_int.shape)
print(len(text_as_int))
print(text_as_int[0:10])
print(text[0:10])
enumerate() is a Python built-in that attaches a running index to an iterable.
- Syntax: enumerate(iterable, start=0)
- Purpose: turns a list/string/other iterable into a sequence of (index, element) tuples
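A tiny demonstration of enumerate on a few characters, mirroring how char2idx is built above:

for idx, char in enumerate(["a", "b", "c"], start=0):
    print(idx, char)
# 0 a
# 1 b
# 2 c
print({char: idx for idx, char in enumerate(["a", "b", "c"])})  # {'a': 0, 'b': 1, 'c': 2}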
The corpus has 1,115,394 characters in total and is split here into 11,043 samples of 101 characters each. The reason for the 101 is as follows:
Take the four characters of "Jeep": feeding J predicts e, feeding the first e predicts the second e, and feeding that e predicts p, so the target is simply the input shifted by one position. Within each 101-character sample the first 100 characters are the input and the last 100 (offset by one) are the target.
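The arithmetic and the shift can be checked in a couple of lines (a small sketch that assumes text from above is loaded):

# the shift-by-one relationship on "Jeep": input "Jee" predicts target "eep"
sample = "Jeep"
print(list(zip(sample[:-1], sample[1:])))   # [('J', 'e'), ('e', 'e'), ('e', 'p')]

# 101 characters are stored per sample, so the corpus yields
print(len(text) // 101)                     # 11043 samples out of 1115394 characters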
Split the text into samples:
from torch.utils.data import Dataset, DataLoader

class CharDataset(Dataset):
    # text_as_int is the array of character ids, seq_length is the input length of each sample
    def __init__(self, text_as_int, seq_length):
        self.sub_len = seq_length + 1  # stored length of one sample
        self.text_as_int = text_as_int
        self.num_seq = len(text_as_int) // self.sub_len  # number of samples

    def __getitem__(self, index):
        # index is the sample index; e.g. the first sample is characters 0-100, 101 characters in total
        return self.text_as_int[index * self.sub_len: (index + 1) * self.sub_len]

    def __len__(self):
        # number of samples
        return self.num_seq

# batch is a list whose elements are samples of 101 characters each:
# the first 100 characters are the input, the last 100 are the target
def collat_fct(batch):
    src_list = []  # inputs
    trg_list = []  # targets
    for part in batch:
        src_list.append(part[:-1])  # input
        trg_list.append(part[1:])   # target
    src_list = np.array(src_list)   # list -> ndarray
    trg_list = np.array(trg_list)   # list -> ndarray
    # return a tuple of two torch.Tensors
    return torch.Tensor(src_list).to(dtype=torch.int64), torch.Tensor(trg_list).to(dtype=torch.int64)

# each stored sample is 101 characters long: 100 input characters plus 1 extra character for the shifted target
train_ds = CharDataset(text_as_int, 100)
train_dl = DataLoader(train_ds, batch_size=64, shuffle=True, collate_fn=collat_fct)
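A quick sanity check on one batch from train_dl (an optional sketch, not part of the original script): both tensors should have shape (batch_size, seq_length) = (64, 100), and each target row is its input row shifted left by one character.

src_batch, trg_batch = next(iter(train_dl))
print(src_batch.shape, trg_batch.shape)               # torch.Size([64, 100]) torch.Size([64, 100])
print((src_batch[0, 1:] == trg_batch[0, :-1]).all())  # tensor(True): target = input shifted by one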
#%%
- seq_length: the sequence length fed to the model (e.g. 100)
- sub_len: the stored length per sample = seq_length + 1 (one extra character is stored to construct the shifted target)

Suppose the text is encoded as [1,2,3,4,5,6,7,8,9,10] and seq_length=3:
- Sample 1: [1,2,3,4] → input [1,2,3], target [2,3,4]
- Sample 2: [5,6,7,8] → input [5,6,7], target [6,7,8]
- The leftover characters [9,10] are discarded (verified in the small check below).
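The toy example can be reproduced with the CharDataset and collat_fct defined above; a small check, assuming both are already in scope:

toy = np.arange(1, 11)                   # [1, 2, ..., 10]
toy_ds = CharDataset(toy, seq_length=3)  # sub_len = 4
print(len(toy_ds))                       # 2 samples; the leftover [9, 10] is dropped
src, trg = collat_fct([toy_ds[0], toy_ds[1]])
print(src.tolist())                      # [[1, 2, 3], [5, 6, 7]]
print(trg.tolist())                      # [[2, 3, 4], [6, 7, 8]]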
Define the model:
class CharRNN(nn.Module):
    def __init__(self, vocab_size, embedding_dim=256, hidden_dim=1024):
        super(CharRNN, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        # batch_first=True: the RNN expects input of shape (batch_size, seq_len, embedding_dim)
        self.rnn = nn.RNN(embedding_dim, hidden_dim, batch_first=True)
        self.fc = nn.Linear(hidden_dim, vocab_size)

    def forward(self, x, hidden=None):
        # (batch_size, seq_len) -> (batch_size, seq_len, embedding_dim), e.g. (64, 100, 256)
        x = self.embedding(x)
        # unlike a classifier, we keep the output of every time step, not just the last one
        # (batch_size, seq_len, embedding_dim) -> (batch_size, seq_len, hidden_dim), e.g. (64, 100, 1024)
        output, hidden = self.rnn(x, hidden)
        # [bs, seq_len, hidden_dim] -> [bs, seq_len, vocab_size], e.g. (64, 100, 65)
        x = self.fc(output)
        # x has shape (batch_size, seq_len, vocab_size)
        return x, hidden

vocab_size = len(vocab)

print("{:=^80}".format(" one-layer unidirectional RNN "))
for key, value in CharRNN(vocab_size).named_parameters():
    print(f"{key:^40}parameters num: {np.prod(value.shape)}")
Because the vocabulary is so small, embedding_dim is enlarged: the input of shape (bs, seq) becomes an output of shape (bs, seq, emb_dim). Expanding the dimension lets the information be separated more clearly, whereas embeddings normally compress the representation. For generation we can no longer take only the last time step; the output of every time step is needed.
Forward pass: x → Embedding → RNN → Linear
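A quick sanity check of the shapes described above; a small sketch run on the CPU before the model is moved to the GPU (the variable names are just for illustration):

dummy = torch.randint(0, vocab_size, (64, 100))  # (batch_size, seq_len)
out, h = CharRNN(vocab_size)(dummy)
print(out.shape)  # torch.Size([64, 100, 65]): one logit vector per time step
print(h.shape)    # torch.Size([1, 64, 1024]): final hidden state of the single RNN layer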
Training:
class SaveCheckpointsCallback:
    def __init__(self, save_dir, save_step=5000, save_best_only=True):
        """Save a checkpoint every `save_step` training steps.

        Args:
            save_dir (str): dir to save checkpoints
            save_step (int, optional): how often (in steps) to save a checkpoint. Defaults to 5000.
            save_best_only (bool, optional): If True, only keep the best model; otherwise save a checkpoint at every save step.
        """
        self.save_dir = save_dir
        self.save_step = save_step
        self.save_best_only = save_best_only
        self.best_metrics = -1

        # mkdir
        if not os.path.exists(self.save_dir):
            os.mkdir(self.save_dir)

    def __call__(self, step, state_dict, metric=None):
        if step % self.save_step > 0:
            return
        if self.save_best_only:
            assert metric is not None
            if metric >= self.best_metrics:
                # save checkpoints
                torch.save(state_dict, os.path.join(self.save_dir, "best.ckpt"))
                # update best metrics
                self.best_metrics = metric
        else:
            torch.save(state_dict, os.path.join(self.save_dir, f"{step}.ckpt"))
#%%
# training
def training(
    model,
    train_loader,
    epoch,
    loss_fct,
    optimizer,
    save_ckpt_callback=None,
    stateful=False,  # to use a stateful hidden state the batches must be contiguous, i.e. the data must not be shuffled
):
    record_dict = {
        "train": [],
    }
    global_step = 0
    model.train()
    hidden = None
    with tqdm(total=epoch * len(train_loader)) as pbar:
        for epoch_id in range(epoch):
            # training
            for datas, labels in train_loader:
                datas = datas.to(device)
                labels = labels.to(device)
                # zero the gradients
                optimizer.zero_grad()
                # forward pass: if the dataset is shuffled (stateful=False) the hidden state is reset;
                # if it is not shuffled (stateful=True) the hidden state is carried over
                logits, hidden = model(datas, hidden=hidden if stateful else None)
                # compute the loss: CrossEntropyLoss expects 2-D logits and 1-D labels, hence the reshape
                loss = loss_fct(logits.reshape(-1, vocab_size), labels.reshape(-1))
                # backpropagate
                loss.backward()
                # optimizer step (learning-rate changes etc.)
                optimizer.step()

                loss = loss.cpu().item()
                # record
                record_dict["train"].append({
                    "loss": loss, "step": global_step
                })
                # save model checkpoint
                if save_ckpt_callback is not None:
                    save_ckpt_callback(global_step, model.state_dict(), metric=-loss)
                # update step
                global_step += 1
                pbar.update(1)
                pbar.set_postfix({"epoch": epoch_id})
    return record_dict

epoch = 100
model = CharRNN(vocab_size=vocab_size)

# 1. define the loss function: cross-entropy loss
loss_fct = nn.CrossEntropyLoss()
# 2. define the optimizer: Adam
# Optimizers specified in the torch.optim package
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# save best
if not os.path.exists("checkpoints"):
    os.makedirs("checkpoints")
save_ckpt_callback = SaveCheckpointsCallback("checkpoints/text_generation", save_step=1000, save_best_only=True)

model = model.to(device)
#%%
record = training(
    model,
    train_dl,
    epoch,
    loss_fct,
    optimizer,
    save_ckpt_callback=save_ckpt_callback,
)
#%%
plt.plot([i["step"] for i in record["train"][::50]], [i["loss"] for i in record["train"][::50]], label="train")
plt.grid()
plt.show()
#%% md
## Inference
#%%
# the following toy example shows what temperature does
logits = torch.tensor([400.0, 600.0])  # raw logits
probs1 = F.softmax(logits, dim=-1)
print(probs1)
#%%
# the same logits after dividing by a large temperature: softmax is now almost uniform
logits = torch.tensor([0.04, 0.06])
probs1 = F.softmax(logits, dim=-1)
print(probs1)
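The effect is easiest to see side by side: dividing the logits by different temperatures before softmax, where small temperatures keep the distribution sharp and large ones flatten it towards uniform (the temperature values below are just illustrative):

logits = torch.tensor([400.0, 600.0])
for t in [1.0, 100.0, 10000.0]:
    print(t, F.softmax(logits / t, dim=-1))
# 1.0     -> tensor([0., 1.])            the larger logit wins almost surely
# 100.0   -> tensor([0.1192, 0.8808])
# 10000.0 -> tensor([0.4950, 0.5050])    nearly uniform (the [0.04, 0.06] case above)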
#%%
import torch

# create a probability distribution: the probability of each class being sampled
# here we have a simple four-class distribution
prob_dist = torch.tensor([0.1, 0.45, 0.35, 0.1])

# sample with multinomial
# num_samples is the number of samples to draw
num_samples = 5

# draw a sample: the higher the probability, the more likely the index is drawn;
# 1 means draw a single sample, replacement=True allows repeated sampling
samples_index = torch.multinomial(prob_dist, 1, replacement=True)

print("probability distribution:", prob_dist)
print("sampled index:", samples_index)
# show the probability of the sampled index
print("probability of the sampled index:", prob_dist[samples_index])
#%%
def generate_text(model, start_string, max_len=1000, temperature=1.0, stream=True):
    # batch_size=1, seq_len can be any length, e.g. (1, 5)
    input_eval = torch.Tensor([char2idx[char] for char in start_string]).to(dtype=torch.int64, device=device).reshape(1, -1)
    hidden = None
    text_generated = []  # holds the generated character ids
    model.eval()
    pbar = tqdm(range(max_len))  # progress bar
    print(start_string, end="")
    # no_grad is a context manager: inside it no gradients are recorded,
    # so generating text does not affect the model weights
    with torch.no_grad():
        for i in pbar:
            logits, hidden = model(input_eval, hidden=hidden)
            # temperature sampling: a higher temperature increases diversity, a lower one is more conservative
            # take only the last time step (-1); its prediction is appended to the running input
            logits = logits[0, -1, :] / temperature  # logits is now 1-D
            # use multinomial to sample
            probs = F.softmax(logits, dim=-1)  # convert to a probability distribution
            idx = torch.multinomial(probs, 1).item()  # draw one index; higher-probability characters are drawn more often
            input_eval = torch.Tensor([idx]).to(dtype=torch.int64, device=device).reshape(1, -1)  # the sampled id becomes the next input
            text_generated.append(idx)
            if stream:
                print(idx2char[idx], end="", flush=True)
    return "".join([idx2char[i] for i in text_generated])

# load checkpoints
model.load_state_dict(torch.load("checkpoints/text_generation/best.ckpt", weights_only=True, map_location="cpu"))
start_string = "All: "  # the prompt; any starting text works
res = generate_text(model, start_string, max_len=1000, temperature=0.5, stream=True)