學習目標
- 目標
- 掌握seq2seq模型特點
- 掌握集束搜索方式
- 掌握BLEU評估方法
- 掌握Attention機制
- 應用
- 應用Keras實現seq2seq對日期格式的翻譯
目錄
學習目標
1.seq2seq
1.1.定義
1.2.條件語言模型理解
1.3.應用場景
2.注意力機制
2.1.長句子問題
2.2.定義
2.3.公式
3.機器翻譯案例
3.1.環境配置
?3.2.代碼分析
3.3.加載數據
3.4.定義網絡
3.5.定義模型
3.6.測試模型
1.seq2seq
seq2seq模型是在2014年,是由Google Brain團隊和Yoshua Bengio 兩個團隊各自獨立的提出來。
1.1.定義
seq2seq是一個Encoder–Decoder 結構的網絡,它的輸入是一個序列,輸出也是一個序列,?Encoder 中將一個可變長度的信號序列變為固定長度的向量表達,Decoder 將這個固定長度的向量變成可變長度的目標的信號序列。
注:Cell可以用 RNN ,GRU,LSTM 等結構。
- 相當于將RNN模型當中的s0輸入變成一個encoder
1.2.條件語言模型理解
- 1、編解碼器作用
- 編碼器的作用是把一個不定長的輸入序列x?1??,…,x?t??,輸出到一個編碼狀態C
- 解碼器輸出y?t??的條件概率將基于之前的輸出序列y?1??,y?t?1??和編碼狀態C
argmaxP(y1,…,yT′∣x1,…,xT),給定輸入的序列,使得輸出序列的概率值最大。
- 2、根據最大似然估計,最大化輸出序列的概率
由于這個公式需要求出:...這個概率連乘會非常非常小不利于計算存儲,所以需要對公式取對數計算:
所以這樣就變成了..概率相加。
這樣也可以看成輸出結果通過softmax就變成了概率最大,而損失最小的問題,輸出序列損失最小化。
1.3.應用場景
- 神經機器翻譯(NMT)
- 聊天機器人
接下來我們來看注意力機制,那么普通的seq2seq會面臨什么樣的問題?
2.注意力機制
2.1.長句子問題
對于更長的句子,seq2seq就顯得力不從心了,無法做到準確的翻譯,一下是通常BLEU的分數隨著句子的長度變化,可以看到句子非常長的時候,分數就很低。
BLEU(Bilingual Evaluation Understudy)是一種用于評估機器翻譯質量的自動評價指標,它通過比較機器翻譯結果與人工參考翻譯之間的相似度來打分。BLEU 分數范圍在 0 到 1 之間(或 0% 到 100%),分數越高表示翻譯質量越好。
??BLEU 的核心思想??
- ??N-gram 匹配??:計算機器翻譯(候選文本)和參考翻譯之間的 ??n-gram(連續詞序列)?? 重疊程度。
- 例如,1-gram(單個詞)、2-gram(雙詞組合)等。
- ??精度(Precision)??:統計候選翻譯中有多少 n-gram 出現在參考翻譯中,并做標準化(避免長句懲罰)。
- ??短句懲罰(Brevity Penalty, BP)??:防止短翻譯因匹配少量 n-gram 而獲得高分。
本質原因:在Encoder-Decoder結構中,Encoder把所有的輸入序列都編碼成一個統一的語義特征C再解碼,因此,?C中必須包含原始序列中的所有信息,它的長度就成了限制模型性能的瓶頸。當要翻譯的句子較長時,一個C可能存不下那么多信息,就會造成翻譯精度的下降。
2.2.定義
- 建立Encoder的隱層狀態輸出到Decoder對應輸出y所需要的上下文信息
- 目的:增加編碼器信息輸入到解碼器中相同時刻的聯系,其它時刻信息減弱
2.3.公式
注意上述的幾個細節,顏色的連接深淺不一樣,假設Encoder的時刻記為t,而Decoder的時刻記為t??′????。
-
1、
- α?t??′????t??為參數,在網絡中訓練得到
- 理解:藍色的解碼器中的cell舉例子
-
-
2、α?t??′????t??的N個權重系數由來?
- 權重系數通過softmax計算:
- et′t??是由t時刻的編碼器隱層狀態輸出和解碼器t??′?????1時刻的隱層狀態輸出計算出來的
- s為解碼器隱層狀態輸出,h為編碼器隱層狀態輸出
- v,Ws,Wh??都是網絡學習的參數
- 權重系數通過softmax計算:
3.機器翻譯案例
使用簡單的“日期轉換”任務代替翻譯任務,為了不然訓練時間變得太長。
網絡將輸入以各種可能格式(例如“1958年8月29日”,“03/30/1968”,“1987年6月24日”,“July 3, 2025”)編寫的日期,并將其翻譯成標準化的機器可讀日期(例如“1958 -08-29“,”1968-03-30“,”1987-06-24“)。使用seq2seq網絡學習以通用機器可讀格式YYYY-MM-DD輸出日期。
3.1.環境配置
pip install faker
pip install tqdm
pip install babel
pip install keras==2.2.4
- faker:生成數據包
- tqdm:python擴展包
- babel:代碼裝換器
- keras:更加方便簡潔的深度學習庫
- 為了快速編寫代碼
?3.2.代碼分析
-
Seq2seq():
- 序列模型類
- load_data(self,m):加載數據類,選擇加載多少條數據
- init_seq2seq(self):初始化模型,需要自定義自己的模型
- self.get_encoder(self):定義編碼器
- self.get_decoder(self):定義解碼器
- self.get_attention(self):定義注意力機制
- self.get_output_layer(self):定義解碼器輸出層
- model(self):定義模型整體輸入輸出邏輯
- train(self, X_onehot, Y_onehot):訓練模型
- test(self):測試模型
-
訓練
if __name__ == '__main__':s2s = Seq2seq()X_onehot, Y_onehot = s2s.load_data(10000)s2s.init_seq2seq()s2s.train(X_onehot, Y_onehot)#s2s.test()
整個數據集特征值的形狀: (10000, 30, 37)
整個數據集目標值的形狀: (10000, 10, 11)
查看第一條數據集格式:特征值:9 may 1998, 目標值: 1998-05-09
[12 ?0 24 13 34 ?0 ?4 12 12 11 36 36 36 36 36 36 36 36 36 36 36 36 36 36
?36 36 36 36 36 36] [ 2 10 10 ?9 ?0 ?1 ?6 ?0 ?1 10]
one_hot編碼: [[0. 0. 0. ... 0. 0. 0.]
?[1. 0. 0. ... 0. 0. 0.]
?[0. 0. 0. ... 0. 0. 0.]
?...
?[0. 0. 0. ... 0. 0. 1.]
?[0. 0. 0. ... 0. 0. 1.]
?[0. 0. 0. ... 0. 0. 1.]] [[0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0.]
?[0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1.]
?[0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1.]
?[0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0.]
?[1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
?[0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
?[0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0.]
?[1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
?[0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
?[0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1.]]
Epoch 1/1
? 100/10000 [..............................] - ETA: 10:52 - loss: 23.9884 - dense_1_loss: 2.3992 - dense_1_acc: 0.3200 - dense_1_acc_1: 0.0000e+00 - dense_1_acc_2: 0.0100 - dense_1_acc_3: 0.1300 - dense_1_acc_4: 0.0000e+00 - dense_1_acc_5: 0.0400 - dense_1_acc_6: 0.0900 - dense_1_acc_7: 0.0000e+00 - dense_1_acc_8: 0.3500 - dense_1_acc_9: 0.1100
? 200/10000 [..............................] - ETA: 5:27 - loss: 23.9289 - dense_1_loss: 2.3991 - dense_1_acc: 0.2550 - dense_1_acc_1: 0.0000e+00 - dense_1_acc_2: 0.0050 - dense_1_acc_3: 0.1150 - dense_1_acc_4: 0.0950 - dense_1_acc_5: 0.0250 - dense_1_acc_6: 0.1150 - dense_1_acc_7: 0.0800 - dense_1_acc_8: 0.3400 - dense_1_acc_9: 0.1050
- 測試
3.3.加載數據
- 模型參數:
def __init__(self, Tx=30, Ty=10, n_x=32, n_y=64):# 定義網絡的相關參數self.model_param = {"Tx": Tx, # 定義encoder序列最大長度"Ty": Ty, # decoder序列最大長度"n_x": n_x, # encoder的隱層輸出值大小"n_y": n_y # decoder的隱層輸出值大小和cell輸出值大小}
對于加載數據來說,我們不需要去進行編寫邏輯了,看一下大概的邏輯.
- 加載數據
- 加載的代碼邏輯在nmt_utils當中
- from nmt_utils import *
- 加載的代碼邏輯在nmt_utils當中
nmt_utils.py:
import numpy as np
from faker import Faker # 用于生成虛假數據
import random
from tqdm import tqdm # 用于顯示進度條
from babel.dates import format_date # 用于格式化日期
from tensorflow.keras.utils import to_categorical # 用于one-hot編碼
import tensorflow.keras.backend as K # Keras后端引擎
import matplotlib.pyplot as plt # 繪圖庫# 設置隨機種子以保證結果可復現
from faker import Faker
import random# 設置全局隨機種子(推薦方式)
Faker.seed(12345) # ? 使用類方法設置 Faker 的種子
random.seed(12345) # 設置 Python random 模塊的種子fake = Faker() # 創建實例# 定義日期格式列表
FORMATS = ['short','medium','long','full','full','full','full','full','full','full','full','full','full','d MMM YYY','d MMMM YYY','dd MMM YYY','d MMM, YYY','d MMMM, YYY','dd, MMM YYY','d MM YY','d MMMM YYY','MMMM d YYY','MMMM d, YYY','dd.MM.YY']# 設置語言環境(可修改為其他語言)
LOCALES = ['en_US']#這是us英文,可修改為其他語言def load_date():"""生成虛假日期數據返回: 元組包含(人類可讀日期字符串, 機器可讀日期字符串, 日期對象)"""dt = fake.date_object() # 生成隨機日期對象,# 輸出: datetime.date(2023, 5, 15)try:# 格式化人類可讀日期human_readable = format_date(dt, format=random.choice(FORMATS), locale='en_US')# 格式化日期,locale為'en_US',可修改為其他語言,輸出: 'May 15, 2023'human_readable = human_readable.lower() # 轉為小寫,輸出: 'may 15, 2023'human_readable = human_readable.replace(',', '') # 移除逗號,輸出:'may 15 2023'machine_readable = dt.isoformat() # 機器可讀格式,輸出: '2023-05-15'except AttributeError as e:return None, None, Nonereturn human_readable, machine_readable, dtdef load_dataset(m):"""加載包含m個樣本的數據集并構建詞匯表:m: 要生成的樣本數量返回:dataset -- 包含(人類可讀日期字符串, 機器可讀日期字符串)對的列表human -- 人類可讀詞匯表字典machine -- 機器可讀詞匯表字典"""human_vocab = set() # 人類可讀日期字符集machine_vocab = set() # 機器可讀日期字符集dataset = [] # 數據集Tx = 30 # 輸入序列長度# 生成m個樣本for i in tqdm(range(m)):h, m, _ = load_date()if h is not None:dataset.append((h, m)) # 添加(人類可讀, 機器可讀)對human_vocab.update(tuple(h)) # 更新人類可讀字符集machine_vocab.update(tuple(m)) # 更新機器可讀字符集# 構建人類可讀詞匯表字典(添加未知詞和填充詞標記)human = dict(zip(sorted(human_vocab) + ['<unk>', '<pad>'],list(range(len(human_vocab) + 2))))# 構建機器可讀詞匯表字典和反向字典inv_machine = dict(enumerate(sorted(machine_vocab)))machine = {v: k for k, v in inv_machine.items()}return dataset, human, machinedef preprocess_data(dataset, human_vocab, machine_vocab, Tx, Ty):"""預處理數據:將字符串轉換為整數序列并進行one-hot編碼"""X, Y = zip(*dataset) # 解壓數據集# 將字符串轉換為整數序列X = np.array([string_to_int(i, Tx, human_vocab) for i in X])Y = [string_to_int(t, Ty, machine_vocab) for t in Y]# 進行one-hot編碼Xoh = np.array(list(map(lambda x: to_categorical(x, num_classes=len(human_vocab)), X)))Yoh = np.array(list(map(lambda x: to_categorical(x, num_classes=len(machine_vocab)), Y)))return X, np.array(Y), Xoh, Yohdef string_to_int(string, length, vocab):"""將字符串轉換為整數序列表示參數:string -- 輸入字符串,如 'Wed 10 Jul 2007'length -- 時間步長,決定輸出是填充還是截斷vocab -- 詞匯表字典返回:rep -- 整數列表(或'<unk>'),表示字符串字符在詞匯表中的位置"""# 標準化處理:轉為小寫并移除逗號string = string.lower()string = string.replace(',', '')# 如果字符串過長則截斷if len(string) > length:string = string[:length]# 將每個字符映射到詞匯表中的索引,未知字符用'<unk>'表示rep = list(map(lambda x: vocab.get(x, '<unk>'), string))# 如果字符串過短則填充if len(string) < length:rep += [vocab['<pad>']] * (length - len(string))return repdef softmax(x, axis=1):"""Softmax激活函數參數:x: 張量axis: 應用softmax歸一化的軸返回:softmax變換后的張量異常:當輸入是一維張量時拋出ValueError"""ndim = K.ndim(x)if ndim == 2:return K.softmax(x)elif ndim > 2:e = K.exp(x - K.max(x, axis=axis, keepdims=True))s = K.sum(e, axis=axis, keepdims=True)return e / selse:raise ValueError('Cannot apply softmax to a tensor that is 1D')
函數名 | 核心功能 | 典型應用場景 |
---|---|---|
??load_date()?? | 1. 生成隨機日期對象 2. 格式化為人類可讀字符串(如"may 15 2023") 3. 生成機器可讀ISO格式(如"2023-05-15") | 生成單條日期訓練樣本 |
??load_dataset(m)?? | m個日期樣本 2. 構建人類可讀字符詞匯表 3. 構建機器可讀字符詞匯表 4. 自動添加 <unk> 和<pad> 特殊標記 | 初始化訓練數據集和詞匯表 |
??preprocess_data()?? | 1. 將字符串轉為整數序列 2. 統一序列長度(填充/截斷) 3. 生成one-hot編碼 4. 輸出可直接用于Keras模型的張量 | 數據預處理管道 |
??string_to_int()?? | 1. 字符到詞匯表索引的映射 2. 處理未知字符(返回 <unk> )3. 序列長度標準化(填充 <pad> ) | 文本到數值的轉換 |
??softmax()?? | 1. 實現多維度softmax 2. 數值穩定性優化(減最大值) 3. 支持Keras后端運算 | 神經網絡激活函數 |
?主函數加載數據:
我們先看整個訓練的邏輯,并從中來實現整個模型的定義,計算邏輯
def load_data(self, m):"""指定獲取m條數據:param m: 數據的總樣本數:return:dataset:[('9 may 1998', '1998-05-09'), ('10.09.70', '1970-09-10')]x_vocab:翻譯前的格式對應數字{' ': 0, '.': 1, '/': 2, '0': 3, '1': 4, '2': 5, '3': 6, '4': 7,....}y_vocab:翻譯后的格式對應數字{'-': 0, '0': 1, '1': 2, '2': 3, '3': 4, '4': 5, '5': 6, '6': 7, '7': 8, '8': 9, '9': 10}"""# 獲取3個值:數據集,特征詞的字典映射,目標詞字典映射dataset, x_vocab, y_vocab = load_dataset(m)# 獲取處理好的數據:特征x以及目標y的one_hot編碼X, Y, X_onehot, Y_onehot = preprocess_data(dataset, x_vocab, y_vocab, self.model_param["Tx"], self.model_param["Ty"])print("整個數據集特征值的形狀:", X_onehot.shape)print("整個數據集目標值的形狀:", Y_onehot.shape)# 打印數據集print("查看第一條數據集格式:特征值:%s, 目標值: %s" % (dataset[0][0], dataset[0][1]))print(X[0], Y[0])print("one_hot編碼:", X_onehot[0], Y_onehot[0])# 添加特征詞個不重復個數以及目標詞的不重復個數self.model_param["x_vocab"] = x_vocabself.model_param["y_vocab"] = y_vocabself.model_param["x_vocab_size"] = len(x_vocab)self.model_param["y_vocab_size"] = len(y_vocab)return X_onehot, Y_onehot
3.4.定義網絡
- (1)定義好網絡的輸入輸出格式
- (2)定義好優化器(選擇Adam,參數lr=0.005, beta_1=0.9, beta_2=0.999, epsilon=None, decay=0.001)
- from keras.optimizers import Adam
- model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
- (3)模型訓練,
- model.fit(inputs, outputs, epochs=1,batch_size=100)
def train(self, X_onehot, Y_onehot):"""訓練:param X_onehot: 特征值的one_hot編碼:param Y_onehot: 目標值的one_hot編碼:return:"""# 利用網絡結構定義好模型輸入輸出model = self.model()opt = Adam(lr=0.005, beta_1=0.9, beta_2=0.999, epsilon=None, decay=0.001)model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])s0 = np.zeros((10000, self.model_param["n_y"]))c0 = np.zeros((10000, self.model_param["n_y"]))outputs = list(Y_onehot.swapaxes(0, 1))# 輸入x,以及decoder中LSTM的兩個初始化值model.fit([X_onehot, s0, c0], outputs, epochs=1, batch_size=100)return None
- (1)定義網絡好的輸入到輸出流程
- 步驟1、定義模型的輸入
- 步驟2:使用encoder的雙向LSTM結構得輸出a
- 步驟3:循環decoder的Ty次序列輸入,獲取decoder最后輸出
- 1: 定義decoder第t'時刻的注意力結構并輸出context
- context = self.computer_one_attention(a, s)(需要實現Attention結構的計算過程)
- 2: 對"context" 和初始兩個狀態s0,c0輸入到deocder當中,返回兩個輸出
- s, _, c = self.decoder(context, initial_state=[s, c])
- 3: 應用 Dense layere獲取deocder的t'時刻的輸出 out = self.output_layer(s)
- 1: 定義decoder第t'時刻的注意力結構并輸出context
- 步驟 4: 創建model實例,定義輸入輸出
- from keras.models import Model
定義整個網絡模型:
def model(self):"""定義整個網絡模型:return: keras 當中的model類型"""# 1、定義encoder的輸入X (30, 37)X = Input(shape=(self.model_param["Tx"], self.model_param["x_vocab_size"]), name="X")# 定義一個初始輸入的s0, 64大小s0 = Input(shape=(self.model_param["n_y"],), name="s0")c0 = Input(shape=(self.model_param["n_y"],), name="c0")s = s0c = c0# 定義一個裝有輸出的列表outputs = []# 2、輸入到encoder當中,得到aa = self.encoder(X)# 3、計算輸出結果,循環deocder當中t'個時刻,計算每個LSTM的輸出結果for t in range(self.model_param["Ty"]):# (1)循環計算每一個時刻的contextcontext = self.computer_one_attention(a, s)# (2)輸入s0,c0,context到某個時刻decoder得到下次的輸出s, c# 因為是LSTM結構,所以有兩個隱層狀態,其中s可以用作輸出s, _, c = self.decoder(context, initial_state=[s, c])# (3)s輸出到最后一層softmax得到預測結果out = self.output_layer(s)outputs.append(out)# 輸入輸出定義好了model = Model(inputs=(X, s0, c0), outputs=outputs)return model
3.5.定義模型
模型初始化模型結構定義
- 在訓練中有一些模型結構,所以現需要定義這些結構統一初始化,這些模型結構作為整個Seq2Seq類的屬性,初始化邏輯。
def init_seq2seq(self):"""初始化網絡結構:return:"""# 添加encoder屬性self.get_encoder()# 添加decoder屬性self.get_decoder()# 添加attention屬性self.get_attention()# 添加get_output_layer屬性self.get_output_layer()return None
- 定義編解碼器、Attention機制、輸出層
Keras是一個高級神經網絡API,用Python編寫,能夠在TensorFlow之上運行。它的開發重點是實現快速實驗。能夠以最小的延遲從想法到結果是進行良好研究的關鍵。
如果您需要深度學習庫,請使用Keras:允許簡單快速的原型設計(通過用戶友好性,模塊化和可擴展性)
- 編碼器
- 編碼器:使用雙向LSTM(隱層傳遞有雙向值傳遞)
- from keras.layers import LSTM, Bidirectional
- LSTM(units, return_sequences=False,name="")
- units: 正整數, units狀態輸出的維度
- return_sequences:布爾類型 是否返回輸出序列
- return:LSTM layer
- Bidirectional(layer, merge_mode='concat')
- 對RNN、LSTM進行雙向裝飾
- layer:RNN layer或者LSTM layer
- merge_mode:將RNN/LSTM的前向和后向輸出值進行合并
- {'sum', 'mul', 'concat', 'ave', None}
def get_encoder(self):"""定義編碼器結構:return:"""# 指定隱層值輸出的大小self.encoder = Bidirectional(LSTM(self.model_param["n_x"], return_sequences=True, name='bidirectional_1'), merge_mode='concat')return None
- 解碼器
- return_state: 布爾,是否返回輸出以及狀態值
- if return_state: 第一個值是output. 后面的值是狀態值shape 為 (batch_size, units).
- return_state: 布爾,是否返回輸出以及狀態值
def get_decoder(self):"""定義解碼器結構:return:"""# 定義decoder結構,指定隱層值的形狀大小,return_state=Trueself.decoder = LSTM(self.model_param["n_y"], return_state=True)return None
- 輸出層
- from keras.layer import Dense
- 指定一個普通的全連接層,并且可以指定激活函數
- Dense(units, activation=None)
- 神經元個數(輸出大小)
- activation=None:激活函數
def get_output_layer(self):"""定義輸出層:return: output_layer"""# 對decoder輸出進行softmax,輸出向量大小為y_vocab大小self.output_layer = Dense(self.model_param["y_vocab_size"], activation=softmax)return None
computer_one_attention函數實現:attention層結構
- 1、定義結構
- 2、實現輸入輸出結果
?
- from keras.layers import RepeatVector, Concatenate, Dot, Activation
def get_attention(self):"""定義Attention的結構:return: attention結構"""# 定義RepeatVector復制成多個維度repeator = RepeatVector(self.model_param["Tx"])# 進行矩陣拼接concatenator = Concatenate(axis=-1)# 進行全連接層10個神經元densor1 = Dense(10, activation="tanh", name='Dense1')# 接著relu函數densor2 = Dense(1, activation="relu", name='Dense2')# softmaxactivator = Activation(softmax,name='attention_weights')# context計算dotor = Dot(axes=1)# 將結構存儲在attention當中self.attention = {"repeator": repeator,"concatenator": concatenator,"densor1": densor1,"densor2": densor2,"activator": activator,"dotor": dotor}return None
- Attention輸入輸出邏輯
- 使用Attention結構去實現輸入到輸出的邏輯
def computer_one_attention(self, a, s_prev):"""利用定義好的attention結構計算中的alpha系數與a對應輸出:param a:隱層狀態值 (m, Tx, 2*n_a):param s_prev: LSTM的初始隱層狀態值, 形狀(sample, n_s):return: context"""# 使用repeator擴大數據s_prev的維度為(sample, Tx, n_y),這樣可以與a進行合并s_prev = self.attention["repeator"](s_prev)# 將a和s_prev 按照最后一個維度進行合并計算concat = self.attention["concatenator"]([a, s_prev])# 使用densor1全連接層網絡計算出ee = self.attention["densor1"](concat)# 使用densor2增加relu激活函數計算energies = self.attention["densor2"](e)# 使用"activator"的softmax函數計算權重"alphas"# 這樣一個attention的系數計算完成alphas = self.attention["activator"](energies)# 使用dotor,矩陣乘法,將 "alphas" and "a" 去計算context/ccontext = self.attention["dotor"]([alphas, a])return context
3.6.測試模型
訓練:
def train(self, X_onehot, Y_onehot):"""訓練:param X_onehot: 特征值的one_hot編碼:param Y_onehot: 目標值的one_hot編碼:return:"""# 利用網絡結構定義好模型輸入輸出model = self.model()opt = Adam(lr=0.005, beta_1=0.9, beta_2=0.999, epsilon=None, decay=0.001)model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])s0 = np.zeros((10000, self.model_param["n_y"]))c0 = np.zeros((10000, self.model_param["n_y"]))outputs = list(Y_onehot.swapaxes(0, 1))# 輸入x,以及decoder中LSTM的兩個初始化值model.fit([X_onehot, s0, c0], outputs, epochs=1, batch_size=100)return None
測試邏輯
- model.load_weights(path):加載模型
def test(self):"""測試:return:"""model = self.model()model.load_weights("./models/model.h5")example = '1 March 2001'source = string_to_int(example, self.model_param["Tx"], self.model_param["x_vocab"])source = np.expand_dims(np.array(list(map(lambda x:to_categorical(x, num_classes=self.model_param["x_vocab_size"]),source))), axis=0)s0 = np.zeros((10000, self.model_param["n_y"]))c0 = np.zeros((10000, self.model_param["n_y"]))prediction = model.predict([source, s0, c0])prediction = np.argmax(prediction, axis=-1)output = [dict(zip(self.model_param["y_vocab"].values(), self.model_param["y_vocab"].keys()))[int(i)] for i in prediction]print("source:", example)print("output:", ''.join(output))return None
?
完整代碼:
?RNN.py:
# 替換原來的導入方式
# 使用 tensorflow.keras 替代 keras 或 keras.src
from tensorflow.keras.layers import Input, Dense, RepeatVector, Concatenate, Dot, Activation, LSTM, Bidirectional
from tensorflow.keras.optimizers import Adam, SGD, RMSprop
from tensorflow.keras.models import Model
from tensorflow.keras.utils import to_categorical
from nmt_utils import *
import numpy as npclass Seq2seq(object):"""序列模型去進行日期的翻譯"""def __init__(self, Tx=30, Ty=10, n_x=32, n_y=64):self.model_param = {"Tx": Tx, # 定義encoder序列最大長度"Ty": Ty, # decoder序列最大長度"n_x": n_x, # encoder的隱層輸出值大小"n_y": n_y # decoder的隱層輸出值大小和cell輸出值大小}def load_data(self, m):"""指定獲取m條數據:param m: 數據的總樣本數:return:dataset:[('9 may 1998', '1998-05-09'), ('10.09.70', '1970-09-10')]x_vocab:翻譯前的格式對應數字{' ': 0, '.': 1, '/': 2, '0': 3, '1': 4, '2': 5, '3': 6, '4': 7,....}y_vocab:翻譯后的格式對應數字{'-': 0, '0': 1, '1': 2, '2': 3, '3': 4, '4': 5, '5': 6, '6': 7, '7': 8, '8': 9, '9': 10}"""# 獲取3個值:數據集,特征詞的字典映射,目標詞字典映射dataset, x_vocab, y_vocab = load_dataset(m)# 獲取處理好的數據:特征x以及目標y的one_hot編碼X, Y, X_onehot, Y_onehot = preprocess_data(dataset, x_vocab, y_vocab, self.model_param["Tx"], self.model_param["Ty"])print("整個數據集特征值的形狀:", X_onehot.shape)print("整個數據集目標值的形狀:", Y_onehot.shape)# 打印數據集print("查看第一條數據集格式:特征值:%s, 目標值: %s" % (dataset[0][0], dataset[0][1]))print(X[0], Y[0])print("one_hot編碼:", X_onehot[0], Y_onehot[0])# 添加特征詞個不重復個數以及目標詞的不重復個數self.model_param["x_vocab"] = x_vocabself.model_param["y_vocab"] = y_vocabself.model_param["x_vocab_size"] = len(x_vocab)self.model_param["y_vocab_size"] = len(y_vocab)return X_onehot, Y_onehotdef get_encoder(self):"""獲取encoder屬性:return: None"""self.encoder = Bidirectional(LSTM(self.model_param["n_x"], return_sequences=True, name="bidirectional_1"), merge_mode='concat')return Nonedef get_decoder(self):"""獲取deocder屬性:return: None"""self.decoder = LSTM(self.model_param["n_y"], return_state=True)return Nonedef get_output_layer(self):"""獲取輸出層:return: None"""self.output_layer = Dense(self.model_param["y_vocab_size"], activation=softmax)return Nonedef get_attention(self):"""實現attention的結構屬性:return: None"""# 1、定義Repeat函數repeator = RepeatVector(self.model_param["Tx"])# 2、定義Concat函數concatenator = Concatenate(axis=-1)# 3、定義Densedensor1 = Dense(10, activation="tanh", name="Dense1")densor2 = Dense(1, activation="relu", name='Dense2')# 4、Activatationactivator = Activation(softmax,name='attention_weights')# 5、Dot相當于npt.dotdotor = Dot(axes=1)# 將結構存儲在attention當中self.attention = {"repeator": repeator,"concatenator": concatenator,"densor1": densor1,"densor2": densor2,"activator": activator,"dotor": dotor}return Nonedef init_seq2seq(self):"""初始化網絡結構:return:"""# 添加encoder屬性self.get_encoder()# 添加decoder屬性self.get_decoder()# 添加attention屬性self.get_attention()# 添加get_output_layer屬性self.get_output_layer()return Nonedef computer_one_attention(self, a, s_prev):"""邏輯函數,計算context:param a: encoder的所有輸出,t'時刻,a=t=1,2,3,4,......Tx:param s_prev: decoder的輸出,t'-1:return: context"""# - 1、擴展s_prev的維度到encoder的所有時刻,編程Tx份s_prev = self.attention["repeator"](s_prev)# - 2、進行s_prev和a進行拼接concat = self.attention["concatenator"]([a, s_prev])# - 3、進行全連接計算得到e, 經過激活函數relu計算出e'e = self.attention["densor1"](concat)en = self.attention["densor2"](e)# - 4、e'進過softmax計算,得到系數,每個attention 有Tx個alphas參數alphas = self.attention["activator"](en)# - 5、系數與a進行計算得到contextcontext = self.attention["dotor"]([alphas, a])return contextdef model(self):"""定義整個網絡模型:return: keras 當中的model類型"""# 1、定義encoder的輸入X (30, 37)X = Input(shape=(self.model_param["Tx"], self.model_param["x_vocab_size"]), name="X")# 定義一個初始輸入的s0, 64大小s0 = Input(shape=(self.model_param["n_y"],), name="s0")c0 = Input(shape=(self.model_param["n_y"],), name="c0")s = s0c = c0# 定義一個裝有輸出的列表outputs = []# 2、輸入到encoder當中,得到aa = self.encoder(X)# 3、計算輸出結果,循環deocder當中t'個時刻,計算每個LSTM的輸出結果for t in range(self.model_param["Ty"]):# (1)循環計算每一個時刻的contextcontext = self.computer_one_attention(a, s)# (2)輸入s0,c0,context到某個時刻decoder得到下次的輸出s, c# 因為是LSTM結構,所以有兩個隱層狀態,其中s可以用作輸出s, _, c = self.decoder(context, initial_state=[s, c])# (3)s輸出到最后一層softmax得到預測結果out = self.output_layer(s)outputs.append(out)# 輸入輸出定義好了model = Model(inputs=(X, s0, c0), outputs=outputs)return modeldef train(self, X_onehot, Y_onehot):"""訓練:param X_onehot: 特征值的one_hot編碼:param Y_onehot: 目標值的one_hot編碼:return:"""# 利用網絡結構定義好模型輸入輸出model = self.model()opt = Adam(lr=0.005, beta_1=0.9, beta_2=0.999, epsilon=None, decay=0.001)model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])s0 = np.zeros((10000, self.model_param["n_y"]))c0 = np.zeros((10000, self.model_param["n_y"]))outputs = list(Y_onehot.swapaxes(0, 1))# 輸入x,以及decoder中LSTM的兩個初始化值model.fit([X_onehot, s0, c0], outputs, epochs=1, batch_size=100)return Nonedef test(self):model = self.model()model.load_weights("./models/model.h5")example = '1 March 2001'source = string_to_int(example, self.model_param["Tx"], self.model_param["x_vocab"])source = np.expand_dims(np.array(list(map(lambda x:to_categorical(x, num_classes=self.model_param["x_vocab_size"]),source))), axis=0)# 修改這里:s0和c0的形狀應與source一致(1個樣本)s0 = np.zeros((1, self.model_param["n_y"])) # 原來是(10000, n_y)c0 = np.zeros((1, self.model_param["n_y"])) # 原來是(10000, n_y)prediction = model.predict([source, s0, c0])prediction = np.argmax(prediction, axis=-1)output = [dict(zip(self.model_param["y_vocab"].values(),self.model_param["y_vocab"].keys()))[int(i)] for i in prediction]print("source:", example)print("output:", ''.join(output))if __name__ == '__main__':s2s = Seq2seq()X_onehot, Y_onehot = s2s.load_data(10000)s2s.init_seq2seq()# s2s.train(X_onehot, Y_onehot)s2s.test()
nmt_utils.py:
import numpy as np
from faker import Faker # 用于生成虛假數據
import random
from tqdm import tqdm # 用于顯示進度條
from babel.dates import format_date # 用于格式化日期
from tensorflow.keras.utils import to_categorical # 用于one-hot編碼
import tensorflow.keras.backend as K # Keras后端引擎
import matplotlib.pyplot as plt # 繪圖庫# 設置隨機種子以保證結果可復現
from faker import Faker
import random# 設置全局隨機種子(推薦方式)
Faker.seed(12345) # ? 使用類方法設置 Faker 的種子
random.seed(12345) # 設置 Python random 模塊的種子fake = Faker() # 創建實例# 定義日期格式列表
FORMATS = ['short','medium','long','full','full','full','full','full','full','full','full','full','full','d MMM YYY','d MMMM YYY','dd MMM YYY','d MMM, YYY','d MMMM, YYY','dd, MMM YYY','d MM YY','d MMMM YYY','MMMM d YYY','MMMM d, YYY','dd.MM.YY']# 設置語言環境(可修改為其他語言)
LOCALES = ['en_US']#這是us英文,可修改為其他語言def load_date():"""生成虛假日期數據返回: 元組包含(人類可讀日期字符串, 機器可讀日期字符串, 日期對象)"""dt = fake.date_object() # 生成隨機日期對象,# 輸出: datetime.date(2023, 5, 15)try:# 格式化人類可讀日期human_readable = format_date(dt, format=random.choice(FORMATS), locale='en_US')# 格式化日期,locale為'en_US',可修改為其他語言,輸出: 'May 15, 2023'human_readable = human_readable.lower() # 轉為小寫,輸出: 'may 15, 2023'human_readable = human_readable.replace(',', '') # 移除逗號,輸出:'may 15 2023'machine_readable = dt.isoformat() # 機器可讀格式,輸出: '2023-05-15'except AttributeError as e:return None, None, Nonereturn human_readable, machine_readable, dtdef load_dataset(m):"""加載包含m個樣本的數據集并構建詞匯表:m: 要生成的樣本數量返回:dataset -- 包含(人類可讀日期字符串, 機器可讀日期字符串)對的列表human -- 人類可讀詞匯表字典machine -- 機器可讀詞匯表字典"""human_vocab = set() # 人類可讀日期字符集machine_vocab = set() # 機器可讀日期字符集dataset = [] # 數據集Tx = 30 # 輸入序列長度# 生成m個樣本for i in tqdm(range(m)):h, m, _ = load_date()if h is not None:dataset.append((h, m)) # 添加(人類可讀, 機器可讀)對human_vocab.update(tuple(h)) # 更新人類可讀字符集machine_vocab.update(tuple(m)) # 更新機器可讀字符集# 構建人類可讀詞匯表字典(添加未知詞和填充詞標記)human = dict(zip(sorted(human_vocab) + ['<unk>', '<pad>'],list(range(len(human_vocab) + 2))))# 構建機器可讀詞匯表字典和反向字典inv_machine = dict(enumerate(sorted(machine_vocab)))machine = {v: k for k, v in inv_machine.items()}return dataset, human, machinedef preprocess_data(dataset, human_vocab, machine_vocab, Tx, Ty):"""預處理數據:將字符串轉換為整數序列并進行one-hot編碼"""X, Y = zip(*dataset) # 解壓數據集# 將字符串轉換為整數序列X = np.array([string_to_int(i, Tx, human_vocab) for i in X])Y = [string_to_int(t, Ty, machine_vocab) for t in Y]# 進行one-hot編碼Xoh = np.array(list(map(lambda x: to_categorical(x, num_classes=len(human_vocab)), X)))Yoh = np.array(list(map(lambda x: to_categorical(x, num_classes=len(machine_vocab)), Y)))return X, np.array(Y), Xoh, Yohdef string_to_int(string, length, vocab):"""將字符串轉換為整數序列表示參數:string -- 輸入字符串,如 'Wed 10 Jul 2007'length -- 時間步長,決定輸出是填充還是截斷vocab -- 詞匯表字典返回:rep -- 整數列表(或'<unk>'),表示字符串字符在詞匯表中的位置"""# 標準化處理:轉為小寫并移除逗號string = string.lower()string = string.replace(',', '')# 如果字符串過長則截斷if len(string) > length:string = string[:length]# 將每個字符映射到詞匯表中的索引,未知字符用'<unk>'表示rep = list(map(lambda x: vocab.get(x, '<unk>'), string))# 如果字符串過短則填充if len(string) < length:rep += [vocab['<pad>']] * (length - len(string))return repdef softmax(x, axis=1):"""Softmax激活函數參數:x: 張量axis: 應用softmax歸一化的軸返回:softmax變換后的張量異常:當輸入是一維張量時拋出ValueError"""ndim = K.ndim(x)if ndim == 2:return K.softmax(x)elif ndim > 2:e = K.exp(x - K.max(x, axis=axis, keepdims=True))s = K.sum(e, axis=axis, keepdims=True)return e / selse:raise ValueError('Cannot apply softmax to a tensor that is 1D')