大劫大難以后,人不該失去銳氣,不該失去熱度,你鎮定了卻依舊燃燒,你平靜了卻依舊浩蕩,致那個從絕望中走出來的自己,共勉
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?—— 25.1.31
使用深度學習在文本匹配任務上主要有兩種方式:① 表示型 ② 交互型
一、實現方式 ① 表示型文本匹配
表示型文本匹配要訓練的目標是:得到一個編碼器,用來把一句話轉化為向量
????????實際訓練中,通常會共享一個紅框內的編碼器 / 表示層(可以看作一個完整的模型:輸入文本過完embedding嵌入層后過一個網絡層,最終輸出一句話的向量),在訓練時,我們通常輸入兩句話過同一個模型(參數共享),分別編碼這兩句話,得到兩個向量,經過一個?matching layer?匹配層(相似度計算)得到一個分數用來衡量兩向量的相似度,若兩句話向量(句子語義)相似,則分數接近為1,若兩句話向量不相似(語義不相似),則分數接近為0
????????表示層文本匹配主要訓練的是文本轉換成向量的這一模型部分
????????兩邊的表示層是參數共享的兩個模型,分別編碼輸入的兩句話,用來把輸入的話轉成向量形式
1.matching layer 匹配層?—— 方式 Ⅰ、基于文本相似度
① 計算文本相似度 方式Ⅰ:拼接兩個句向量過網絡層
????????輸入兩個文本過Embedding層得到兩個向量,拼接兩向量(拼接兩向量與兩向量之差 u,v,u-v),然后過一個線性層Softmax歸一化,映射到 0 或 1 ,然后用 均方差MSE 或 交叉熵 損失函數計算 Loss,然后根據梯度進行迭代優化,給它提供足夠多的句子對,就可以做訓練
????????對于兩個匹配的樣本,預期輸出分值為 1,
????????對于兩個不匹配的樣本,預期輸出分值為 0,
????????本質上相當于二分類任務(兩句話相似或不相似)
????????編碼器部分可以使用?Bert+Pooling,也可以用 LSTM,也可以用 CNN、GatedCNN、RNN 等其他模型結構,得到兩句話對應的向量
????????兩向量的拼接方式也有多種,在實際場景中最好針對不同場景進行訓練實驗對比獲得答案
????????只要編碼器?Embedding?部分正確, 兩向量無論怎么組合,數據都可以得到有效的訓練
② 計算文本相似度 方式Ⅱ:余弦相似度計算
????????編碼層也可以簡單點,不使用向量拼接過網絡層,而是直接過一個余弦相似度的計算,利用Cosine Embedding Loss做訓練
????????對于兩個相似的樣本匹配預期輸出分值為 1
????????對于兩個不相似的樣本匹配預期輸出分值為 -1
????????本質上相當于2分類任務(兩句話相似或是不相似)
Cosine Embedding Loss?
????????Cosine Embedding Loss:是一種用于衡量兩個向量相似性的損失函數,通過余弦相似度結合標簽信息(相似/不相似)來優化模型。
- ?相似樣本?(標簽?
y=1
):迫使它們的余弦相似度趨近于 ?1?(即方向完全一致)。 - ?不相似樣本?(標簽?
y=-1
):迫使它們的余弦相似度低于某個閾值 ?margin?(默認為 0)。 - margin:懲罰項/正則項(實際訓練中,一般會存儲為一個默認值:例如0.1),作用:①?允許不相似樣本的余弦相似度在不超過?
margin
?的情況下被忽略,避免過度優化。②?較大的?margin
?迫使模型更嚴格地區分不相似樣本(例如,設?margin=0.5
?時,不相似樣本間的相似度必須低于 0.5)
公式:
????????其中,cos(x_1, x_2) 表示兩個向量的余弦相似度
????????訓練兩個文本向量的網絡是同一個網絡,權重參數共享,又被稱為?孿生網絡
2.matching layer 匹配層?—— 方式 Ⅱ、Triplet Loss
① 訓練目標:
? ? ? ? 1.使具有相同標簽的樣本在 embedding空間 盡量接近
? ? ? ? 2.使具有不同標簽的樣本在 embedding空間 盡量遠離
? ? ? ? 3.通過調整模型的權重來改變三者在向量空間的位置關系
例:?
設計一個Loss函數,目標:衡量三個向量在空間中位置的相互關系
Anchor:某句話對應的向量在embedding空間中對應的位置
Positive:與這句話語義相同的向量
Negative:與這句話語義不同的向量
② Triplet Loss:
輸入是一個三元組<a,p,n>
a:anchor 原點(任意一個樣本)
p:positive 與a同一類別的樣本
n:negative 與a不同類別的樣本
在 Embedding 空間中,三元組損失函數為:L = max(d(a, p) - d(a, n) + margin,?0)
公式:
訓練目標:使 相似文本 a,p 之間的距離?小于 不相似文本 a,n 之間的距離?
d:表示兩向量間的一個距離函數?
d(a, p):任意一個樣本 a?和?與其同一類別的樣本 p?兩個文本轉成的向量間的向量距離
d(a, n):任意一個樣本 a?和 與其不同類別的樣本 n?兩個文本轉成的向量間的向量距離
margin:懲罰項/正則項(實際訓練中,一般會存儲為一個默認值:例如0.1),作用:① 即使兩距離相等,還是會產生 margin 大小的Loss,只有當相似文本間的距離 小于 不相似文本間的距離 + margin 時,Loss才會為 0 ②?較大的?margin
?迫使模型更嚴格地區分不相似樣本(例如:設?margin=0.5
?時,相似樣本間距離差 和 不相似樣本間距離差必須低于0.5,才會停止迭代優化)
Triplet Loss中,margin的作用
1. ?控制正負樣本的最小間隔
????????Margin 定義了正樣本對(anchor 與 positive)和負樣本對(anchor 與 negative)之間的最小距離差。其核心目標是確保在嵌入空間中,正樣本與錨點的距離不僅要小于負樣本與錨點的距離,還要至少保持一個固定的間隔(即 margin)
- ?公式作用:當?
d(a, p) - d(a, n) + margin < 0
?時,損失為 0,此時模型已滿足“正樣本距離更近,負樣本距離更遠”的要求。反之,若未滿足,則通過損失函數強制優化- ?物理意義:Margin 類似于支持向量機(SVM)中的間隔概念,通過設定一個“安全區域”,避免不同類別的樣本在嵌入空間中過于接近
2. ?防止模型學習到退化解
????????如果沒有 Margin(或 Margin=0),模型可能將所有樣本映射到同一個點(即?
d(a, p) = d(a, n) = 0
),此時損失恒為 0,但模型完全失去區分能力。Margin 的引入迫使模型必須拉開正負樣本的距離差異,從而學習到更有判別性的特征表示
3. ?平衡訓練難度
Margin 的大小直接影響訓練的難易程度:
????????過大:模型需要更大的距離差才能滿足條件,可能導致訓練困難(損失長期不收斂)或過擬合
????????過小:模型容易滿足條件,但學到的特征區分度不足實踐中
Margin 的大小通常通過實驗調整,例如在人臉識別任務中常用 0.2~1.0 的范圍
4. ?動態篩選有效三元組
????????Margin 幫助模型自動忽略“簡單樣本”(如負樣本已足夠遠離錨點),而專注于優化“困難樣本”(如負樣本距離較近或與正樣本距離差異不足)
例如:
- ?當?
d(a, n) > d(a, p) + margin
:損失為 0,無需優化。- ?當?
d(a, n) < d(a, p) + margin
:損失為正,需通過梯度下降調整參數
5.? ?與距離度量的關系
????????Margin 的作用依賴于具體使用的距離函數:
?????????歐氏距離:Margin 是絕對距離差閾值。
? ? ? ? 余弦相似度:Margin 轉化為相似度差異(如要求負樣本的相似度比正樣本低至少一個 Margin)
????????在代碼實現中,通常會對嵌入向量進行歸一化(如 L2 歸一化),以確保不同距離度量下 Margin 的物理意義一致
最小化L:?d(a, p) —> 0,d(a, n) —> margin,則L最小化
Triplet Loss同樣適用于人臉識別模型(人臉匹配)的訓練
同一個人不同角度可以照出多張圖片,同一個意圖也可以用多個文本(問法)表示
人臉識別是CV領域的文本匹配,文本匹配是NLP領域的人臉識別
3.表示型文本匹配 —— 代碼示例 🚀
Ⅰ、配置文件 config.py
① 路徑相關參數
model_path:指定訓練后模型的保存路徑。若加載預訓練模型,需提供模型文件的具體存儲位置
schema_path:定義數據結構的配置文件路徑,通常用于數據預處理或驗證輸入格式(如JSON/YAML文件)
train_data_path:指向訓練集的數據文件或目錄。若為目錄,需包含多個訓練文件(如文本、圖像等)
valid_data_path:指向驗證集的數據文件或目錄。若為目錄,需包含多個訓練文件(如文本、圖像等)
vocab_path:詞表文件路徑,用于自然語言處理任務中定義詞匯的映射關系(如將單詞轉為ID)
② 模型結構參數?
max_length:序列數據的最大長度(如文本的單詞數)。超過此長度的序列會被截斷或填充
hidden_size:神經網絡隱藏層的維度大小,影響模型的表達能力。例如,LSTM或Transformer中每層的神經元數量
③ 訓練控制參數
epoch:訓練過程中遍歷整個數據集的次數。適當增加輪次可提升模型性能,但可能過擬合
batch_size:每次輸入模型的樣本數量。較大的批次可加速訓練,但需更多顯存
epoch_data_size:每輪訓練中采樣的數量
positive_sample_rate:正樣本在批次中的占比,常用于不平衡數據任務(如分類)。需結合負樣本比例調整
④ 優化和學習參數
optimizer:選擇優化算法(如SGD、Adam、AdamW),影響參數更新策略
learning_rate:控制參數更新的步長。學習率過高可能導致震蕩,過低則收斂緩慢
# -*- coding: utf-8 -*-"""
配置參數信息
"""Config = {"model_path": "model_output","schema_path": "../data/schema.json","train_data_path": "../data/train.json","valid_data_path": "../data/valid.json","vocab_path":"../chars.txt","max_length": 20,"hidden_size": 128,"epoch": 10,"batch_size": 32,"epoch_data_size": 200, #每輪訓練中采樣數量"positive_sample_rate":0.5, #正樣本比例"optimizer": "adam","learning_rate": 1e-3,
}
Ⅱ、數據加載 loader.py
① 初始化 def __init__()
屬性 | 類型 | 描述 |
---|---|---|
config | 字典 | 存儲傳入的配置字典。 |
path | 字符串 | 數據存儲的路徑。 |
vocab | 列表或字典 | 從?config["vocab_path"] ?加載的詞匯表。 |
config["vocab_size"] | 整數 | 詞匯表的大小,通過?len(self.vocab) ?計算得到。 |
schema | 字典或其他類型 | 從?config["schema_path"] ?加載的模式(schema)。 |
train_data_size | 整數 | 每個 epoch 的采樣數量,用于控制隨機采樣的數據量。 |
data_type | 字符串 | 標識當前加載的數據類型,可以是?"train" ?或?"test" |
load_vocab():加載字詞表文件
load_schema():加載schema文件
load():在文件中加載數據
len():?Python 內置函數,用于返回對象的長度或元素數量。它適用于多種數據類型,包括字符串、列表、元組、字典、集合等。
參數名 | 類型 | 描述 |
---|---|---|
obj | 可迭代對象 | 需要計算長度的對象,如字符串、列表、元組、字典、集合等。 |
class DataGenerator:def __init__(self, data_path, config):self.config = configself.path = data_pathself.vocab = load_vocab(config["vocab_path"])self.config["vocab_size"] = len(self.vocab)self.schema = load_schema(config["schema_path"])self.train_data_size = config["epoch_data_size"] #由于采取隨機采樣,所以需要設定一個采樣數量,否則可以一直采self.data_type = None #用來標識加載的是訓練集還是測試集 "train" or "test"self.load()
② 數據集加載(訓練 / 測試)def load()
初始化:測試數據集 self.data 和 標準問題與該標簽所有問題編碼結果的字典 self.knwb
打開文件并逐行讀取:使用with open
語句打開指定路徑的數據文件,編碼格式為utf8
,確保文件內容可以正確讀取
對每一行進行 JSON 解析:從解析后的JSON數據中提取tag
和title
字段,并將tag
轉換為對應的索引label
(通過self.label_to_index
字典映射)。
判斷數據類型并加載數據:根據每一行的數據格式判斷是訓練集還是測試集。????????
- 如果讀取的行是一個字典(即訓練集),首先設置
self.data_type
為"train"
。 - 然后提取該行的
"questions"
鍵對應的值(這是一個問題列表)以及"target"
鍵對應的值(這是一個標簽)。 - 對于問題列表中的每一個問題,先調用
self.encode_sentence(question)
方法對其進行詞匯編碼,然后再將其轉換為torch.LongTensor
格式,以適應后續的模型輸入。 - 最后,將編碼后的問題添加到
self.knwb
字典中對應標簽的列表里。 - 如果讀取的行是一個列表(即測試集),首先設置
self.data_type
為"test"
。 - 然后使用
assert
語句確保該行確實是一個列表,并從中提取問題和標簽。 - 對問題進行詞匯編碼,并轉換為
torch.LongTensor
格式。 - 同樣對標簽進行處理,根據
self.schema
字典將標簽轉換為索引,并也轉換為torch.LongTensor
格式。 - 最后,將編碼后的問題和標簽索引作為一個樣本添加到
self.data
列表中。
isinstance():Python 內置函數,用于檢查一個對象是否屬于某個類型或其子類的實例。
參數名 | 類型 | 描述 |
---|---|---|
object | 任意對象 | 需要檢查的對象。 |
classinfo | 類型或元組 | 可以是一個類型(如?int 、str ?等)或一個類型元組。如果?classinfo ?是元組,isinstance() ?會檢查對象是否屬于元組中任意一個類型的實例。 |
assert:Python 中的調試工具,用于檢查某個條件是否為真。如果條件為假,assert
?會拋出?AssertionError
?異常。
torch.LongTensor():創建一個包含整數的 PyTorch 張量(Tensor),元素類型為?torch.int64
參數名 | 類型 | 描述 |
---|---|---|
data | list ?或?array | 包含整數的列表或數組,用于初始化張量。 |
列表.append():在列表的末尾添加一個元素。
參數名 | 類型 | 描述 |
---|---|---|
element | any | 要添加到列表末尾的元素,可以是任意類型(如整數、字符串、列表等)。 |
def load(self):self.data = []self.knwb = defaultdict(list)with open(self.path, encoding="utf8") as f:for line in f:line = json.loads(line)#加載訓練集if isinstance(line, dict):self.data_type = "train"questions = line["questions"]label = line["target"]for question in questions:input_id = self.encode_sentence(question)input_id = torch.LongTensor(input_id)self.knwb[self.schema[label]].append(input_id)#加載測試集else:self.data_type = "test"assert isinstance(line, list)question, label = lineinput_id = self.encode_sentence(question)input_id = torch.LongTensor(input_id)label_index = torch.LongTensor([self.schema[label]])self.data.append([input_id, label_index])return
③ 文本編碼 def encode_sentence()
初始化空列表 input_id:用于存儲編碼后的結果
判斷使用詞表文件還是字符表:如果是此表文件,則使用jieba對輸入文本text進行分詞,如果指向的是字符表文件,則直接對每個字符進行編碼
對輸入序列進行填充或截斷:將編碼后的序列input_id
傳遞給padding
方法。這個方法會確保每個序列的長度都等于config["max_length"]
字典.get():dict.get()
?是 Python 字典的內置方法,用于安全地獲取字典中指定鍵的值。如果鍵存在,則返回對應的值;如果鍵不存在,則返回默認值(如果未提供默認值,則返回?None
)。它的主要作用是避免直接使用?dict[key]
?時可能引發的?KeyError
?異常。
參數名 | 類型 | 描述 |
---|---|---|
key | 任意類型 | 需要查找的鍵。 |
default | 任意類型 | 可選,如果鍵不存在時返回的默認值。默認為?None 。 |
列表.append():??Python 列表(list
)對象的一個方法,用于在列表的末尾添加一個元素。它會直接修改原列表,而不是返回一個新的列表。
參數名 | 類型 | 描述 |
---|---|---|
element | 任意類型 | 要添加到列表末尾的元素。可以是整數、字符串、列表、字典、元組等 |
def encode_sentence(self, text):input_id = []if self.config["vocab_path"] == "words.txt":for word in jieba.cut(text):input_id.append(self.vocab.get(word, self.vocab["[UNK]"]))else:for char in text:input_id.append(self.vocab.get(char, self.vocab["[UNK]"]))input_id = self.padding(input_id)return input_id
④ 數據規范 def padding()
截取 input_id 的前 max_length 個元素
如果 input_id 長度不足 max_length,則用 0 填充至 max_length
返回處理后的 input_id
len():?Python 內置函數,用于返回對象的長度或元素數量。它適用于多種數據類型,包括字符串、列表、元組、字典、集合等。
參數名 | 類型 | 描述 |
---|---|---|
obj | 可迭代對象 | 需要計算長度的對象,如字符串、列表、元組、字典、集合等。 |
#補齊或截斷輸入的序列,使其可以在一個batch內運算def padding(self, input_id):input_id = input_id[:self.config["max_length"]]input_id += [0] * (self.config["max_length"] - len(input_id))return input_id
⑤ 返回數據集的長度 def __len__()
該函數的作用是返回對象的“長度”,具體行為取決于對象的?data_type
?屬性。
如果?data_type
?為?"train"
,則返回?self.config["epoch_data_size"]
?的值,這通常表示訓練數據的規模。
如果?data_type
?為?"test"
,則返回?self.data
?的長度,即測試數據的元素個數。
如果?data_type
?不是?"train"
?或?"test"
,則會觸發?AssertionError
,并提示?self.data_type
?的值。
assert:Python 中的調試工具,用于檢查某個條件是否為真。如果條件為假,assert
?會拋出?AssertionError
?異常。?
len():?Python 內置函數,用于返回對象的長度或元素數量。它適用于多種數據類型,包括字符串、列表、元組、字典、集合等。
參數名 | 類型 | 描述 |
---|---|---|
obj | 可迭代對象 | 需要計算長度的對象,如字符串、列表、元組、字典、集合等。 |
def __len__(self):if self.data_type == "train":return self.config["epoch_data_size"]else:assert self.data_type == "test", self.data_typereturn len(self.data)
⑥ 通過索引訪問數據集中的樣本 def __getitem__()
判斷數據類型:?如果是訓練數據集,則調用self.random_train_sample()
方法來隨機生成一個訓練樣本。這是因為訓練數據集中的樣本是通過隨機采樣生成的,而不是固定的數據。
如果不是訓練數據集,則認為是測試數據集:直接返回self.data
列表中索引為index
的樣本。
def __getitem__(self, index):if self.data_type == "train":return self.random_train_sample() #隨機生成一個訓練樣本else:return self.data[index]
⑦ ?隨機生成訓練樣本?
兩兩為輸入,基于文本相似度的訓練方式?
????????如果決定生成正樣本:則從?standard_question_index
?中隨機選擇一個類別?p
。接著,檢查該類別下的問題數量是否大于等于2。
????????如果該類別下的問題數量不足2個,則重新調用?random_train_sample()
?方法,即重新隨機生成一個樣本。否則,從類別?p
?中隨機選取兩個問題?s1
?和?s2
,并返回這兩個問題及其標簽?[s1, s2, torch.LongTensor([1])]
。這里的標簽?1
?表示正樣本,即這兩個問題是屬于同一類別的。
????????如果決定生成負樣本:則從?standard_question_index
?中隨機選擇兩個不同的類別?p
?和?n
。接著,從每個類別中隨機選取一個問題,分別為?s1
?和?s2
,并返回這兩個問題及其標簽?[s1, s2, torch.LongTensor([-1])]
。這里的標簽?-1
?表示負樣本,即這兩個問題不屬于同一類別
list():將可迭代對象(如字符串、元組、集合等)轉換為列表。
參數名 | 類型 | 描述 |
---|---|---|
iterable | 可迭代對象 | 需要轉換為列表的可迭代對象,如字符串、元組、集合等。 |
字典.keys():返回字典中所有鍵的視圖對象。
random.random():生成一個范圍在?[0.0,1.0)?之間的隨機浮點數。
random.sample():?從序列中隨機選擇指定數量的唯一元素,返回一個新列表。
參數名 | 類型 | 描述 |
---|---|---|
population | 序列 | 需要從中選擇的序列,如列表、元組等。 |
k | 整數 | 需要選擇的元素數量。 |
random.choice():從非空序列中隨機選擇一個元素。
參數名 | 類型 | 描述 |
---|---|---|
seq | 序列 | 需要從中選擇的非空序列,如列表、元組等。 |
torch.LongTensor():創建一個包含整數的張量,元素類型為?torch.int64
。
參數名 | 類型 | 描述 |
---|---|---|
data | 列表或數組 | 包含整數的數據,用于創建張量。 |
#依照一定概率生成負樣本或正樣本#負樣本從隨機兩個不同的標準問題中各隨機選取一個#正樣本從隨機一個標準問題中隨機選取兩個def random_train_sample(self):standard_question_index = list(self.knwb.keys())#隨機正樣本if random.random() <= self.config["positive_sample_rate"]:p = random.choice(standard_question_index)#如果選取到的標準問下不足兩個問題,則無法選取,所以重新隨機一次if len(self.knwb[p]) < 2:return self.random_train_sample()else:s1, s2 = random.sample(self.knwb[p], 2)return [s1, s2, torch.LongTensor([1])]#隨機負樣本else:p, n = random.sample(standard_question_index, 2)s1 = random.choice(self.knwb[p])s2 = random.choice(self.knwb[n])return [s1, s2, torch.LongTensor([-1])]
⑧ 數據處理
????????加載詞匯表:從指定路徑的文件中讀取字表或詞表,并將其轉化為一個字典,其中字或詞作為鍵,對應的索引作為值。該函數確保索引從 1 開始,因為索引 0 被保留用于填充(padding)操作。這個字典在后續的數據編碼過程中會被頻繁使用,能夠將文本中的每個字或詞映射為唯一的數字索引,便于計算機處理。
open():用于打開文件并返回文件對象,支持讀取、寫入等文件操作。
參數名 | 類型 | 描述 |
---|---|---|
file | 字符串 | 文件路徑(包括文件名),可以是絕對路徑或相對路徑。 |
mode | 字符串 | 文件打開模式,默認為?'r' (只讀)。常見模式包括?'r' 、'w' 、'a' ?等。 |
buffering | 整數 | 緩沖設置,默認為?-1 (系統默認)。 |
encoding | 字符串 | 文件編碼方式,默認為?None 。 |
errors | 字符串 | 編碼錯誤的處理方式,默認為?None 。 |
newline | 字符串 | 換行符設置,默認為?None 。 |
closefd | 布爾值 | 控制?file ?參數的傳入值類型,默認為?True 。 |
enumerate():為可迭代對象(如列表、元組等)添加索引,返回一個枚舉對象,生成?(index, value)
?對。
參數名 | 類型 | 描述 |
---|---|---|
iterable | 可迭代對象 | 需要枚舉的對象,如列表、元組等。 |
start | 整數 | 索引的起始值,默認為?0 。 |
strip():去除字符串開頭和結尾的指定字符(默認去除空白字符,如空格、換行符等)。
參數名 | 類型 | 描述 |
---|---|---|
chars | 字符串 | 可選,指定要刪除的字符。默認為去除空白字符。 |
#加載字表或詞表
def load_vocab(vocab_path):token_dict = {}with open(vocab_path, encoding="utf8") as f:for index, line in enumerate(f):token = line.strip()token_dict[token] = index + 1 #0留給padding位置,所以從1開始return token_dict
加載schema:打開schema_path路徑下的文件,使用utf8編碼讀取文件內容;使用json.loads將讀取的內容解析為Python對象并返回
open():用于打開文件并返回文件對象,支持讀取、寫入等文件操作
參數名 | 類型 | 描述 |
---|---|---|
file | 字符串 | 文件路徑(包括文件名),可以是絕對路徑或相對路徑。 |
mode | 字符串 | 文件打開模式,默認為?'r' (只讀)。常見模式包括?'r' 、'w' 、'a' ?等。 |
buffering | 整數 | 緩沖設置,默認為?-1 (系統默認)。 |
encoding | 字符串 | 文件編碼方式,默認為?None 。 |
errors | 字符串 | 編碼錯誤的處理方式,默認為?None 。 |
newline | 字符串 | 換行符設置,默認為?None 。 |
closefd | 布爾值 | 控制?file ?參數的傳入值類型,默認為?True 。 |
json.loads():將 JSON 格式的字符串解析為 Python 對象(如字典、列表等)。
參數名 | 類型 | 描述 |
---|---|---|
s | 字符串 | 要解析的 JSON 字符串。 |
encoding | 字符串 | 字符編碼(Python 3 中已棄用)。 |
cls | 類 | 自定義解碼類,默認為?None 。 |
object_hook | 函數 | 可選函數,允許自定義將 JSON 對象轉換為其他類型的 Python 對象。 |
parse_float | 函數 | 自定義將 JSON 中的浮點數轉換為特定類型。 |
parse_int | 函數 | 自定義將 JSON 中的整數轉換為特定類型。 |
object_pairs_hook | 函數 | 用于處理 JSON 對象中的鍵值對,默認返回字典。 |
文件對象.read():從文件中讀取指定數量的字節或整個文件內容。
參數名 | 類型 | 描述 |
---|---|---|
size | 整數 | 可選,指定要讀取的字節數。如果未指定或為負數,則讀取整個文件內容。 |
#加載schema
def load_schema(schema_path):with open(schema_path, encoding="utf8") as f:return json.loads(f.read())
封裝數據加載器:根據給定的配置,將數據文件加載并編碼成模型可識別的格式,然后通過DataLoader
類封裝成一個數據加載器。
DataLoader():PyTorch 中的一個標準工具,用于高效地加載和處理數據。它支持批量加載、數據打亂、多線程加載等功能,是深度學習訓練中常用的數據加載器。
參數名 | 類型 | 描述 |
---|---|---|
dataset | 數據集對象 | 要加載的數據集,必須實現?__len__ ?和?__getitem__ ?方法。 |
batch_size | 整數 | 每個批次的大小,默認為?1 。 |
shuffle | 布爾值 | 是否在每個 epoch 開始時打亂數據順序,默認為?False 。 |
sampler | 采樣器對象 | 自定義采樣器,用于控制數據的采樣方式,默認為?None 。 |
batch_sampler | 批次采樣器對象 | 自定義批次采樣器,用于控制批次的采樣方式,默認為?None 。 |
num_workers | 整數 | 用于數據加載的子進程數量,默認為?0 (主進程加載)。 |
collate_fn | 函數 | 用于將一個批次的數據合并成一個張量或元組,默認為?None 。 |
pin_memory | 布爾值 | 是否將數據存儲在 pin memory 中(用于 GPU 加速),默認為?False 。 |
drop_last | 布爾值 | 如果數據不能完全分成批次,是否刪除最后一個不完整的批次,默認為?False 。 |
timeout | 整數 | 數據加載的最大等待時間(秒),默認為?0 (無限制)。 |
worker_init_fn | 函數 | 用于初始化每個數據加載器子進程的函數,默認為?None 。 |
#用torch自帶的DataLoader類封裝數據
def load_data(data_path, config, shuffle=True):dg = DataGenerator(data_path, config)dl = DataLoader(dg, batch_size=config["batch_size"], shuffle=shuffle)return dl
⑨ 加載數據文件
# -*- coding: utf-8 -*-import json
import re
import os
import torch
import random
import jieba
import numpy as np
from torch.utils.data import Dataset, DataLoader
from collections import defaultdict
"""
數據加載
"""class DataGenerator:def __init__(self, data_path, config):self.config = configself.path = data_pathself.vocab = load_vocab(config["vocab_path"])self.config["vocab_size"] = len(self.vocab)self.schema = load_schema(config["schema_path"])self.train_data_size = config["epoch_data_size"] #由于采取隨機采樣,所以需要設定一個采樣數量,否則可以一直采self.data_type = None #用來標識加載的是訓練集還是測試集 "train" or "test"self.load()def load(self):self.data = []self.knwb = defaultdict(list)with open(self.path, encoding="utf8") as f:for line in f:line = json.loads(line)#加載訓練集if isinstance(line, dict):self.data_type = "train"questions = line["questions"]label = line["target"]for question in questions:input_id = self.encode_sentence(question)input_id = torch.LongTensor(input_id)self.knwb[self.schema[label]].append(input_id)#加載測試集else:self.data_type = "test"assert isinstance(line, list)question, label = lineinput_id = self.encode_sentence(question)input_id = torch.LongTensor(input_id)label_index = torch.LongTensor([self.schema[label]])self.data.append([input_id, label_index])returndef encode_sentence(self, text):input_id = []if self.config["vocab_path"] == "words.txt":for word in jieba.cut(text):input_id.append(self.vocab.get(word, self.vocab["[UNK]"]))else:for char in text:input_id.append(self.vocab.get(char, self.vocab["[UNK]"]))input_id = self.padding(input_id)return input_id#補齊或截斷輸入的序列,使其可以在一個batch內運算def padding(self, input_id):input_id = input_id[:self.config["max_length"]]input_id += [0] * (self.config["max_length"] - len(input_id))return input_iddef __len__(self):if self.data_type == "train":return self.config["epoch_data_size"]else:assert self.data_type == "test", self.data_typereturn len(self.data)def __getitem__(self, index):if self.data_type == "train":return self.random_train_sample() #隨機生成一個訓練樣本else:return self.data[index]#依照一定概率生成負樣本或正樣本#負樣本從隨機兩個不同的標準問題中各隨機選取一個#正樣本從隨機一個標準問題中隨機選取兩個def random_train_sample(self):standard_question_index = list(self.knwb.keys())#隨機正樣本if random.random() <= self.config["positive_sample_rate"]:p = random.choice(standard_question_index)#如果選取到的標準問下不足兩個問題,則無法選取,所以重新隨機一次if len(self.knwb[p]) < 2:return self.random_train_sample()else:s1, s2 = random.sample(self.knwb[p], 2)return [s1, s2, torch.LongTensor([1])]#隨機負樣本else:p, n = random.sample(standard_question_index, 2)s1 = random.choice(self.knwb[p])s2 = random.choice(self.knwb[n])return [s1, s2, torch.LongTensor([-1])]#加載字表或詞表
def load_vocab(vocab_path):token_dict = {}with open(vocab_path, encoding="utf8") as f:for index, line in enumerate(f):token = line.strip()token_dict[token] = index + 1 #0留給padding位置,所以從1開始return token_dict#加載schema
def load_schema(schema_path):with open(schema_path, encoding="utf8") as f:return json.loads(f.read())#用torch自帶的DataLoader類封裝數據
def load_data(data_path, config, shuffle=True):dg = DataGenerator(data_path, config)dl = DataLoader(dg, batch_size=config["batch_size"], shuffle=shuffle)return dlif __name__ == "__main__":from config import Configdg = DataGenerator(r"F:\人工智能NLP\NLP\Day8_文本匹配問題\data\train.json", Config)print(dg[1])
Ⅲ、模型定義?model.py
① 生成句子的嵌入表示(文本轉向量)
模型初始化
hidden_size
:隱藏層的大小,決定了 Embedding 和線性層的輸出維度。vocab_size
:詞匯表的大小,加 1 是為了處理填充索引(padding_idx=0
)。max_length
:句子的最大長度(雖然代碼中未直接使用,但可能是為后續處理預留的)。self.embedding
:nn.Embedding
?層:將詞索引映射為?hidden_size
?維的向量。self.layer
:nn.Linear
?層:對 Embedding 層的輸出進行線性變換。self.dropout
:nn.Dropout
?層,設置丟棄率為 0.5。
nn.Embedding():用于將離散的類別索引(如單詞索引)映射到連續的稠密向量空間中。它常用于自然語言處理(NLP)任務中,將單詞或其他類別信息轉換為向量表示。
參數名 | 類型 | 描述 |
---|---|---|
num_embeddings | 整數 | 嵌入字典的大小,即不同類別的總數(如詞匯表大小)。 |
embedding_dim | 整數 | 每個嵌入向量的維度。 |
padding_idx | 整數(可選) | 指定一個索引,用于填充(padding),該索引對應的嵌入向量不會被更新。 |
max_norm | 浮點數(可選) | 如果設置了該值,嵌入向量的范數會被裁剪到不超過?max_norm 。 |
norm_type | 浮點數(可選) | 用于裁剪范數的類型,默認為 2(L2 范數)。 |
scale_grad_by_freq | 布爾值(可選) | 如果為?True ,梯度會根據單詞在 mini-batch 中出現的頻率進行縮放。 |
sparse | 布爾值(可選) | 如果為?True ,梯度將變為稀疏張量,適用于大型詞匯表以節省內存。 |
nn.Linear():是一個全連接層,用于對輸入數據進行線性變換。它將輸入特征與權重矩陣相乘,并加上偏置項,輸出變換后的結果。
參數名 | 類型 | 描述 |
---|---|---|
in_features | 整數 | 輸入特征的數量。 |
out_features | 整數 | 輸出特征的數量。 |
bias | 布爾值(可選) | 是否使用偏置項,默認為?True 。 |
nn.Dropout():?一種正則化技術,用于防止模型過擬合。它在訓練過程中隨機將一部分神經元的輸出設置為 0,從而減少神經元之間的依賴性。
參數名 | 類型 | 描述 |
---|---|---|
p | 浮點數 | 神經元被丟棄的概率,默認為 0.5。 |
inplace | 布爾值(可選) | 是否就地操作,默認為?False 。 |
class SentenceEncoder(nn.Module):def __init__(self, config):super(SentenceEncoder, self).__init__()hidden_size = config["hidden_size"]vocab_size = config["vocab_size"] + 1max_length = config["max_length"]self.embedding = nn.Embedding(vocab_size, hidden_size, padding_idx=0)# self.lstm = nn.LSTM(hidden_size, hidden_size, batch_first=True, bidirectional=True)self.layer = nn.Linear(hidden_size, hidden_size)self.dropout = nn.Dropout(0.5)
前向傳播
transpose():用于將數組或矩陣的行和列進行轉置。例如,將行數據轉換為列數據,或將列數據轉換為行數據。
參數名 | 類型 | 描述 |
---|---|---|
array | 數組或矩陣 | 需要進行轉置的數組或矩陣。 |
axes | 元組(可選) | 指定轉置的軸順序,默認為?None ,表示反轉所有軸。 |
squeeze():?用于移除數組中維度為 1 的維度,從而簡化數組的形狀。
參數名 | 類型 | 描述 |
---|---|---|
a | 數組 | 輸入的數組。 |
axis | 整數或元組(可選) | 指定要移除的維度,默認為?None ,表示移除所有維度為 1 的軸。 |
shape():?用于獲取數組或矩陣的形狀,返回一個表示各維度大小的元組。
參數名 | 類型 | 描述 |
---|---|---|
array | 數組或矩陣 | 需要獲取形狀的數組或矩陣。 |
#輸入為問題字符編碼def forward(self, x):x = self.embedding(x)#使用lstm# x, _ = self.lstm(x)#使用線性層x = self.layer(x)x = nn.functional.max_pool1d(x.transpose(1, 2), x.shape[1]).squeeze()return x
② 計算句子間相似度
模型初始化
super(SiameseNetwork, self).__init__()
:調用父類?nn.Module
?的初始化方法。self.sentence_encoder = SentenceEncoder(config)
:初始化一個?SentenceEncoder
?實例,用于對句子進行編碼。self.loss = nn.CosineEmbeddingLoss()
:初始化余弦嵌入損失函數,用于計算兩個句子向量之間的相似度損失。
nn.CosineEmbeddingLoss():?PyTorch 中的一個損失函數,用于衡量兩個輸入向量的相似性。它通過計算兩個輸入向量的余弦相似度來評估它們的相似性,并根據標簽值(1 或 -1)調整損失。
- 如果?y=1,表示兩個輸入向量應相似,損失為?1?cos(x1?,x2?)。
- 如果?y=?1,表示兩個輸入向量應不相似,損失為?max(0,cos(x1?,x2?))。
參數名 | 類型 | 描述 |
---|---|---|
margin | 浮點數(可選) | 用于調整不相似樣本的損失,默認為?0.0 。 |
reduction | 字符串(可選) | 指定損失的聚合方式,可選值為?'none' 、'mean' ?或?'sum' ,默認為?'mean' 。 |
class SiameseNetwork(nn.Module):def __init__(self, config):super(SiameseNetwork, self).__init__()self.sentence_encoder = SentenceEncoder(config)self.loss = nn.CosineEmbeddingLoss()
計算余弦距離:計算兩個輸入張量之間的余弦距離。它首先對輸入的張量進行歸一化處理,確保每個向量的長度為 1,然后計算歸一化張量之間的點積,最后通過?1 - 點積
?轉化為余弦距離。
torch.nn.functional.normalize():用于對輸入張量在指定維度上進行?Lp??范數歸一化。默認情況下,它使用?L2??范數(歐幾里得范數)對向量進行歸一化。
公式:
- v?是輸入張量的某一維度上的向量。
- ||v||_p??是?Lp??范數。
- ??是一個小值,用于避免除以零。
參數名 | 類型 | 描述 |
---|---|---|
input | Tensor | 輸入張量。 |
p | float | 范數的指數值,默認為?2 (L2??范數)。 |
dim | int 或 tuple | 歸一化的維度,默認為?1 。 |
eps | float | 用于避免除以零的小值,默認為?1e-12 。 |
out | Tensor(可選) | 輸出張量。如果指定,操作不可微分。 |
torch.sum():計算輸入張量在指定維度上的元素和。如果不指定維度,則計算所有元素的和。
參數名 | 類型 | 描述 |
---|---|---|
input | Tensor | 輸入張量。 |
dim | int 或 tuple | 求和的維度,默認為?None (計算所有元素的和)。 |
keepdim | bool | 是否保留求和后的維度,默認為?False 。 |
dtype | dtype(可選) | 輸出張量的數據類型,默認為?None 。 |
torch.mul():對兩個張量進行逐元素乘法。如果輸入張量的形狀不同,會進行廣播操作。
參數名 | 類型 | 描述 |
---|---|---|
input | Tensor | 第一個輸入張量。 |
other | Tensor 或 float | 第二個輸入張量或標量。 |
out | Tensor(可選) | 輸出張量。 |
# 計算余弦距離 1-cos(a,b)# cos=1時兩個向量相同,余弦距離為0;cos=0時,兩個向量正交,余弦距離為1def cosine_distance(self, tensor1, tensor2):tensor1 = torch.nn.functional.normalize(tensor1, dim=-1)tensor2 = torch.nn.functional.normalize(tensor2, dim=-1)cosine = torch.sum(torch.mul(tensor1, tensor2), axis=-1)return 1 - cosine
計算Triplet_Loss三元組損失:三元組損失可以用于訓練神經網絡以學習特征向量之間的相對距離,確保對于錨點而言,正樣本更接近(較小的余弦距離),而負樣本更遠(較大的余弦距離)。通過這種方式,可以有效地訓練分類器或聚類模型,使得同一類的樣本點之間的距離小于不同類的樣本點之間的距離。這里的cosine_triplet_loss
的具體實現可以通過余弦距離來衡量樣本之間的相似性,并通過margin
參數來控制正負樣本之間距離的差距。
squeeze():?用于移除數組中維度為 1 的維度,從而簡化數組的形狀。
參數名 | 類型 | 描述 |
---|---|---|
a | 數組 | 輸入的數組。 |
axis | 整數或元組(可選) | 指定要移除的維度,默認為?None ,表示移除所有維度為 1 的軸。 |
torch.mean():?PyTorch 中的一個函數,用于計算張量(Tensor)的平均值。它可以計算整個張量所有元素的平均值,或者指定某個維度上的平均值。該函數在數據預處理、統計分析以及神經網絡訓練過程中計算損失等場景中非常有用
參數名 | 類型 | 描述 |
---|---|---|
input | Tensor | 需要計算平均值的輸入張量。 |
dim | int 或 tuple of ints | 指定在哪個維度上計算平均值。例如,dim=0 ?表示按行計算,dim=1 ?表示按列計算。默認為?None ,計算所有元素的平均值。 |
keepdim | bool | 是否保持輸出張量的維度數量與輸入張量一致。默認為?False ,即減少維度數量。 |
out | Tensor (可選) | 指定輸出的張量。如果提供,結果將存儲在該張量中。 |
def cosine_triplet_loss(self, a, p, n, margin=None):ap = self.cosine_distance(a, p)an = self.cosine_distance(a, n)if margin is None:diff = ap - an + 0.1else:diff = ap - an + margin.squeeze()return torch.mean(diff[diff.gt(0)]) #greater than
前向傳播:處理單個或兩個句子的輸入。當輸入兩個句子時,模型會計算它們的向量表示,并根據是否提供標簽計算損失或余弦距離;當只提供一個句子時,模型返回該句子的向量表示。這種
squeeze():?用于移除數組中維度為 1 的維度,從而簡化數組的形狀。
參數名 | 類型 | 描述 |
---|---|---|
a | 數組 | 輸入的數組。 |
axis | 整數或元組(可選) | 指定要移除的維度,默認為?None ,表示移除所有維度為 1 的軸。 |
#sentence : (batch_size, max_length)def forward(self, sentence1, sentence2=None, target=None):#同時傳入兩個句子if sentence2 is not None:vector1 = self.sentence_encoder(sentence1) #vec:(batch_size, hidden_size)vector2 = self.sentence_encoder(sentence2)#如果有標簽,則計算lossif target is not None:return self.loss(vector1, vector2, target.squeeze())#如果無標簽,計算余弦距離else:return self.cosine_distance(vector1, vector2)#單獨傳入一個句子時,認為正在使用向量化能力else:return self.sentence_encoder(sentence1)
③ 根據配置選擇優化器
????????根據用戶在配置字典中指定的優化器類型("adam" 或 "sgd")來選擇并初始化相應的優化器,并設置學習率。
model.parameters():PyTorch 中?torch.nn.Module
?類的一個方法,用于獲取模型中所有可訓練參數的迭代器。這些參數通常是模型的權重和偏置,它們會在訓練過程中通過優化器進行更新。
def choose_optimizer(config, model):optimizer = config["optimizer"]learning_rate = config["learning_rate"]if optimizer == "adam":return Adam(model.parameters(), lr=learning_rate)elif optimizer == "sgd":return SGD(model.parameters(), lr=learning_rate)
④ 模型驗證
????????設置配置參數、構造一個Siamese網絡模型,并在此模型上運行兩個句子的相似度計算。通過輸入一對句子的編碼和相應的標簽,模型可以評估這兩個句子的相似性,同時計算損失。最終的輸出將是損失值或相似度度量。?
torch.LongTensor():?PyTorch 中的一個函數,用于創建一個包含整數(64 位整型)的張量。它是一種特定的張量類型,通常用于存儲索引、標簽或其他整數值數據。
參數名 | 類型 | 描述 |
---|---|---|
data | 列表、元組或數組 | 輸入數據,用于創建張量。 |
dtype | torch.dtype | 張量的數據類型,默認為?torch.int64 (64 位整數)。 |
device | torch.device | 張量存儲的設備(如 CPU 或 GPU),默認為?None (使用默認設備)。 |
requires_grad | bool | 是否需要計算梯度,默認為?False 。 |
# -*- coding: utf-8 -*-import torch
import torch.nn as nn
from torch.optim import Adam, SGD
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence
"""
建立網絡模型結構
"""class SentenceEncoder(nn.Module):def __init__(self, config):super(SentenceEncoder, self).__init__()hidden_size = config["hidden_size"]vocab_size = config["vocab_size"] + 1max_length = config["max_length"]self.embedding = nn.Embedding(vocab_size, hidden_size, padding_idx=0)# self.lstm = nn.LSTM(hidden_size, hidden_size, batch_first=True, bidirectional=True)self.layer = nn.Linear(hidden_size, hidden_size)self.dropout = nn.Dropout(0.5)#輸入為問題字符編碼def forward(self, x):x = self.embedding(x)#使用lstm# x, _ = self.lstm(x)#使用線性層x = self.layer(x)x = nn.functional.max_pool1d(x.transpose(1, 2), x.shape[1]).squeeze()return xclass SiameseNetwork(nn.Module):def __init__(self, config):super(SiameseNetwork, self).__init__()self.sentence_encoder = SentenceEncoder(config)self.loss = nn.CosineEmbeddingLoss()# 計算余弦距離 1-cos(a,b)# cos=1時兩個向量相同,余弦距離為0;cos=0時,兩個向量正交,余弦距離為1def cosine_distance(self, tensor1, tensor2):tensor1 = torch.nn.functional.normalize(tensor1, dim=-1)tensor2 = torch.nn.functional.normalize(tensor2, dim=-1)cosine = torch.sum(torch.mul(tensor1, tensor2), axis=-1)return 1 - cosinedef cosine_triplet_loss(self, a, p, n, margin=None):ap = self.cosine_distance(a, p)an = self.cosine_distance(a, n)if margin is None:diff = ap - an + 0.1else:diff = ap - an + margin.squeeze()return torch.mean(diff[diff.gt(0)]) #greater than#sentence : (batch_size, max_length)def forward(self, sentence1, sentence2=None, target=None):#同時傳入兩個句子if sentence2 is not None:vector1 = self.sentence_encoder(sentence1) #vec:(batch_size, hidden_size)vector2 = self.sentence_encoder(sentence2)#如果有標簽,則計算lossif target is not None:return self.loss(vector1, vector2, target.squeeze())#如果無標簽,計算余弦距離else:return self.cosine_distance(vector1, vector2)#單獨傳入一個句子時,認為正在使用向量化能力else:return self.sentence_encoder(sentence1)def choose_optimizer(config, model):optimizer = config["optimizer"]learning_rate = config["learning_rate"]if optimizer == "adam":return Adam(model.parameters(), lr=learning_rate)elif optimizer == "sgd":return SGD(model.parameters(), lr=learning_rate)if __name__ == "__main__":from config import ConfigConfig["vocab_size"] = 10Config["max_length"] = 4model = SiameseNetwork(Config)s1 = torch.LongTensor([[1,2,3,0], [2,2,0,0]])s2 = torch.LongTensor([[1,2,3,4], [3,2,3,4]])l = torch.LongTensor([[1],[0]])y = model(s1, s2, l)print(y)# print(model.state_dict())
Ⅳ、模型效果測試?evaluate.py
① 模型初始化
????????初始化Evaluator類,設置配置、模型和日志記錄器。加載驗證數據集和訓練數據集(用于效果測試),并初始化統計字典以記錄正確和錯誤的數量。
class Evaluator:def __init__(self, config, model, logger):self.config = configself.model = modelself.logger = loggerself.valid_data = load_data(config["valid_data_path"], config, shuffle=False)# 由于效果測試需要訓練集當做知識庫,再次加載訓練集。# 事實上可以通過傳參把前面加載的訓練集傳進來更合理,但是為了主流程代碼改動量小,在這里重新加載一遍self.train_data = load_data(config["train_data_path"], config)self.stats_dict = {"correct":0, "wrong":0} #用于存儲測試結果
② 將問題轉換為向量表示 ?
????????將訓練數據集中的問題轉換為向量形式,并記錄每個問題編號到標準問題編號的映射關系。這樣做的目的是為了在模型評估過程中,通過計算輸入問題與知識庫中所有問題向量的相似度,來判斷模型的預測結果是否正確。歸一化處理確保了向量之間的相似度計算更加準確可靠。?
初始化字典和列表
- 創建了一個空字典?
question_index_to_standard_question_index
,用于存儲問題編號到標準問題編號的映射。 - 創建了一個空列表?
question_ids
,用于存儲所有訓練問題的ID。
遍歷訓練數據集中的問題
- 使用?
for
?循環遍歷訓練數據集中的每個標準問題及其對應的問題列表。 - 對于每個問題ID,將其添加到?
question_ids
?列表中,并在?question_index_to_standard_question_index
?中記錄問題編號到標準問題編號的映射關系。
將問題ID轉換為向量
- 使用?
torch.no_grad()
?上下文管理器,確保在執行這段代碼時不會計算梯度,因為這里不需要進行反向傳播。 - 使用?
torch.stack
?將?question_ids
?列表中的所有問題ID堆疊在一起,形成一個二維的?question_matrixs
?張量,其中每一行對應一個問題ID。 - 檢查CUDA是否可用,如果可用,則將?
question_matrixs
?移動到GPU上。 - 將?
question_matrixs
?輸入到模型?self.model
?中,得到問題的向量化表示?knwb_vectors
。 - 對?
knwb_vectors
?中的所有向量進行歸一化處理,即將每個向量除以其自身的范數,這樣可以使得不同向量之間的相似度計算更加合理。
items():Python 字典的方法,返回字典中所有鍵值對的視圖,每個鍵值對以元組形式返回。
append():Python 列表的方法,用于在列表末尾添加一個元素。
參數名 | 類型 | 描述 |
---|---|---|
element | 任意類型 | 要添加到列表末尾的元素。 |
torch.no_grad():PyTorch 的上下文管理器,用于禁用梯度計算,通常在模型推理或評估時使用,以減少內存消耗并加速計算。
torch.stack():用于將多個張量沿著新的維度進行堆疊,所有輸入張量的形狀必須相同。
參數名 | 類型 | 描述 |
---|---|---|
tensors | 張量序列 | 要堆疊的張量序列。 |
dim | int | 指定堆疊的維度,默認為?0 。 |
torch.cuda.is_available():用于檢查當前系統是否支持 CUDA(即是否有可用的 GPU)。
cuda():PyTorch 張量或模型的方法,用于將張量或模型移動到 GPU 上。
參數名 | 類型 | 描述 |
---|---|---|
device | int 或 str | 指定目標 GPU 設備,默認為?None (使用默認 GPU)。 |
torch.nn.functional.normalize():用于對輸入張量在指定維度上進行?Lp??范數(p 是范數的指數值)歸一化,默認使用?L2??范數。
參數名 | 類型 | 描述 |
---|---|---|
input | Tensor | 輸入張量。 |
p | float | 范數的指數值,默認為?2 (L2??范數)。 |
dim | int | 歸一化的維度,默認為?1 。 |
eps | float | 用于避免除以零的小值,默認為?1e-12 。 |
#將知識庫中的問題向量化,為匹配做準備#每輪訓練的模型參數不一樣,生成的向量也不一樣,所以需要每輪測試都重新進行向量化def knwb_to_vector(self):self.question_index_to_standard_question_index = {}self.question_ids = []for standard_question_index, question_ids in self.train_data.dataset.knwb.items():for question_id in question_ids:#記錄問題編號到標準問題標號的映射,用來確認答案是否正確self.question_index_to_standard_question_index[len(self.question_ids)] = standard_question_indexself.question_ids.append(question_id)with torch.no_grad():question_matrixs = torch.stack(self.question_ids, dim=0)if torch.cuda.is_available():question_matrixs = question_matrixs.cuda()self.knwb_vectors = self.model(question_matrixs)#將所有向量都作歸一化 v / |v|self.knwb_vectors = torch.nn.functional.normalize(self.knwb_vectors, dim=-1)return
③ 評估模型表現
1.記錄日志并初始化統計字典。
2.將模型設置為評估模式,并將知識庫轉換為向量表示。
3.遍歷驗證數據集,處理每一批次的數據:
????????如果有GPU可用,將數據移動到GPU。
????????使用模型進行預測,不計算梯度。
????????更新統計信息。
4.顯示最終的統計結果。
logger.info():Python 的?logging
?模塊中用于記錄信息級別(info level)日志的函數。它通常用于記錄程序運行時的關鍵信息,如狀態、進度等,而不是調試信息或錯誤。
參數名 | 類型 | 描述 |
---|---|---|
msg | str | 要記錄的日志信息。 |
*args | 任意類型 | 用于格式化日志信息的參數。 |
**kwargs | dict | 可選參數,如?exc_info 、stack_info ?等,用于附加異常或堆棧信息。 |
model.eval():PyTorch 中用于將模型設置為評估模式的方法。在評估模式下,模型會禁用?Dropout
?和?Batch Normalization
?等訓練時的特定行為,以確保測試結果的穩定性。
enumerate():Python 的內置函數,用于在遍歷可迭代對象(如列表、元組、字符串)時同時獲取索引和值。
參數名 | 類型 | 描述 |
---|---|---|
iterable | 可迭代對象 | 要遍歷的對象(如列表、元組、字符串)。 |
start | int | 索引的起始值,默認為?0 。 |
torch.cuda.is_available():PyTorch 中用于檢查當前系統是否支持 CUDA(即是否有可用的 GPU)的函數。
參數名 | 類型 | 描述 |
---|---|---|
device | int 或 str | 指定目標 GPU 設備,默認為?None (使用默認 GPU)。 |
cuda():PyTorch 中用于將張量或模型移動到 GPU 上的方法。
torch.no_grad():PyTorch 的上下文管理器,用于禁用梯度計算,通常在模型推理或評估時使用,以減少內存消耗并加速計算。
def eval(self, epoch):self.logger.info("開始測試第%d輪模型效果:" % epoch)self.stats_dict = {"correct":0, "wrong":0} #清空前一輪的測試結果self.model.eval()self.knwb_to_vector()for index, batch_data in enumerate(self.valid_data):if torch.cuda.is_available():batch_data = [d.cuda() for d in batch_data]input_id, labels = batch_data #輸入變化時這里需要修改,比如多輸入,多輸出的情況with torch.no_grad():test_question_vectors = self.model(input_id) #不輸入labels,使用模型當前參數進行預測self.write_stats(test_question_vectors, labels)self.show_stats()return
④ 計算輸入問題向量與知識庫問題向量的相似度
斷言檢查:首先通過?assert
?確保?labels
?和?test_question_vectors
?的長度一致。
?矩陣乘法:對于每個測試問題向量?test_question_vector
,通過?torch.mm
?函數計算其與知識庫中所有問題向量?self.knwb_vectors
?的相似度。
? ? torch.mm
?是 PyTorch 中的矩陣乘法函數,用于計算兩個矩陣的乘積
? ? test_question_vector.unsqueeze(0)
:將?test_question_vector
?的形狀從?[vec_size]
?擴展為?[1, vec_size]
,以便與?self.knwb_vectors.T
?進行矩陣乘法。
???self.knwb_vectors.T
:知識庫問題向量的轉置,形狀為?[vec_size, n]
,其中?n
?是知識庫中問題的數量。
? ? res
:矩陣乘法的結果,形狀為?[1, n]
,表示測試問題與知識庫中每個問題的相似度。
命中問題標號:通過?torch.argmax
?找到相似度最高的索引?hit_index
,即命中問題的標號。
標準問編號轉換:將?hit_index
?轉換為標準問編號self.question_index_to_standard_question_index[hit_index]
。?
統計更新:如果命中問題的標準問編號與標簽?label
?一致,則增加?
self.stats_dict["correct"]
?的計數;否則增加?self.stats_dict["wrong"]
?的計數。
assert:是 Python 中的調試工具,用于檢查某個條件是否為真。如果條件為假,程序會拋出?AssertionError
?并終止執行。
參數名 | 類型 | 描述 |
---|---|---|
condition | bool | 要檢查的條件表達式。如果為假,則拋出?AssertionError 。 |
message | str | 可選參數,用于在斷言失敗時顯示的錯誤信息。 |
zip():Python 的內置函數,用于將多個可迭代對象(如列表、元組)的元素按索引配對,返回一個 zip 對象。
參數名 | 類型 | 描述 |
---|---|---|
iterables | 可迭代對象 | 要配對的多個可迭代對象。 |
unsqueeze():?是 PyTorch 中的函數,用于在指定維度上插入一個大小為 1 的維度,從而改變張量的形狀。
參數名 | 類型 | 描述 |
---|---|---|
input | Tensor | 輸入張量。 |
dim | int | 要插入新維度的位置,范圍是?[-input.dim()-1, input.dim()] 。 |
.T:PyTorch 和 NumPy 中的屬性,用于返回張量或矩陣的轉置。
torch.mm():?PyTorch 中的函數,用于計算兩個二維張量(矩陣)的矩陣乘法。
參數名 | 類型 | 描述 |
---|---|---|
input | Tensor | 第一個矩陣,形狀為?(m, n) 。 |
mat2 | Tensor | 第二個矩陣,形狀為?(n, p) 。 |
torch.argmax():PyTorch 中的函數,用于返回張量中最大值所在的索引。
參數名 | 類型 | 描述 |
---|---|---|
input | Tensor | 輸入張量。 |
dim | int | 指定沿哪個維度計算最大值索引。 |
keepdim | bool | 是否保持輸出張量的維度,默認為?False 。 |
squeeze():是 PyTorch 和 NumPy 中的函數,用于移除張量或數組中大小為 1 的維度。
參數名 | 類型 | 描述 |
---|---|---|
input | Tensor | 輸入張量。 |
dim | int | 可選參數,指定要移除的維度。 |
def write_stats(self, test_question_vectors, labels):assert len(labels) == len(test_question_vectors)for test_question_vector, label in zip(test_question_vectors, labels):#通過一次矩陣乘法,計算輸入問題和知識庫中所有問題的相似度#test_question_vector shape [vec_size] knwb_vectors shape = [n, vec_size]res = torch.mm(test_question_vector.unsqueeze(0), self.knwb_vectors.T)hit_index = int(torch.argmax(res.squeeze())) #命中問題標號hit_index = self.question_index_to_standard_question_index[hit_index] #轉化成標準問編號if int(hit_index) == int(label):self.stats_dict["correct"] += 1else:self.stats_dict["wrong"] += 1return
⑤ 展示模型預測的統計信息
- 統計信息提取:
correct = self.stats_dict["correct"]
:從?self.stats_dict
?中獲取預測正確的條目數。wrong = self.stats_dict["wrong"]
:從?self.stats_dict
?中獲取預測錯誤的條目數。
- ?日志輸出:
self.logger.info("預測集合條目總量:%d" % (correct + wrong))
:輸出預測集合的總條目數,即正確條目數與錯誤條目數之和。self.logger.info("預測正確條目:%d,預測錯誤條目:%d" % (correct, wrong))
:輸出預測正確的條目數和預測錯誤的條目數。self.logger.info("預測準確率:%f" % (correct / (correct + wrong)))
:輸出預測的準確率,即正確條目數占總條目數的比例。self.logger.info("--------------------")
:輸出分隔線,用于區分不同的日志信息。
logger.info():?是 Python 的?logging
?模塊中用于記錄信息級別(info level)日志的函數。它通常用于記錄程序運行時的關鍵信息,如狀態、進度等,而不是調試信息或錯誤。
參數名 | 類型 | 描述 |
---|---|---|
msg | str | 要記錄的日志信息。 |
*args | 任意類型 | 用于格式化日志信息的參數。 |
**kwargs | dict | 可選參數,如?exc_info 、stack_info ?等,用于附加異常或堆棧信息。 |
# -*- coding: utf-8 -*-
import torch
from loader import load_data"""
模型效果測試
"""class Evaluator:def __init__(self, config, model, logger):self.config = configself.model = modelself.logger = loggerself.valid_data = load_data(config["valid_data_path"], config, shuffle=False)# 由于效果測試需要訓練集當做知識庫,再次加載訓練集。# 事實上可以通過傳參把前面加載的訓練集傳進來更合理,但是為了主流程代碼改動量小,在這里重新加載一遍self.train_data = load_data(config["train_data_path"], config)self.stats_dict = {"correct":0, "wrong":0} #用于存儲測試結果#將知識庫中的問題向量化,為匹配做準備#每輪訓練的模型參數不一樣,生成的向量也不一樣,所以需要每輪測試都重新進行向量化def knwb_to_vector(self):self.question_index_to_standard_question_index = {}self.question_ids = []for standard_question_index, question_ids in self.train_data.dataset.knwb.items():for question_id in question_ids:#記錄問題編號到標準問題標號的映射,用來確認答案是否正確self.question_index_to_standard_question_index[len(self.question_ids)] = standard_question_indexself.question_ids.append(question_id)with torch.no_grad():question_matrixs = torch.stack(self.question_ids, dim=0)if torch.cuda.is_available():question_matrixs = question_matrixs.cuda()self.knwb_vectors = self.model(question_matrixs)#將所有向量都作歸一化 v / |v|self.knwb_vectors = torch.nn.functional.normalize(self.knwb_vectors, dim=-1)returndef eval(self, epoch):self.logger.info("開始測試第%d輪模型效果:" % epoch)self.stats_dict = {"correct":0, "wrong":0} #清空前一輪的測試結果self.model.eval()self.knwb_to_vector()for index, batch_data in enumerate(self.valid_data):if torch.cuda.is_available():batch_data = [d.cuda() for d in batch_data]input_id, labels = batch_data #輸入變化時這里需要修改,比如多輸入,多輸出的情況with torch.no_grad():test_question_vectors = self.model(input_id) #不輸入labels,使用模型當前參數進行預測self.write_stats(test_question_vectors, labels)self.show_stats()returndef write_stats(self, test_question_vectors, labels):assert len(labels) == len(test_question_vectors)for test_question_vector, label in zip(test_question_vectors, labels):#通過一次矩陣乘法,計算輸入問題和知識庫中所有問題的相似度#test_question_vector shape [vec_size] knwb_vectors shape = [n, vec_size]res = torch.mm(test_question_vector.unsqueeze(0), self.knwb_vectors.T)hit_index = int(torch.argmax(res.squeeze())) #命中問題標號hit_index = self.question_index_to_standard_question_index[hit_index] #轉化成標準問編號if int(hit_index) == int(label):self.stats_dict["correct"] += 1else:self.stats_dict["wrong"] += 1returndef show_stats(self):correct = self.stats_dict["correct"]wrong = self.stats_dict["wrong"]self.logger.info("預測集合條目總量:%d" % (correct +wrong))self.logger.info("預測正確條目:%d,預測錯誤條目:%d" % (correct, wrong))self.logger.info("預測準確率:%f" % (correct / (correct + wrong)))self.logger.info("--------------------")return
Ⅴ、訓練主流程 main.py
① 配置日志記錄
logging.basicConfig():用于配置日志系統的基本設置,包括日志級別、輸出格式、輸出目標(如控制臺或文件)等。它通常在程序初始化時調用一次。?
參數名 | 類型 | 描述 |
---|---|---|
level | int | 設置日志級別,如?logging.DEBUG 、logging.INFO ?等。 |
format | str | 設置日志輸出格式,如?'%(asctime)s - %(levelname)s - %(message)s' 。 |
filename | str | 設置日志輸出到文件,指定文件名。 |
filemode | str | 設置文件打開模式,默認為?'a' (追加模式)。 |
handlers | list | 設置自定義的日志處理器。 |
logging.getLogger():?用于獲取一個日志記錄器(Logger)對象。每個記錄器都有一個名稱(name),可以用來區分不同的日志記錄器。
參數名 | 類型 | 描述 |
---|---|---|
name | str | 日志記錄器的名稱。如果未提供或為?None ,則返回根記錄器(root logger)。 |
logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
② 創建保存模型的目錄
?????????檢查指定的模型保存路徑是否存在,如果不存在,則創建該路徑作為目錄。
os.path.isdir():?Python 中?os.path
?模塊的函數,用于檢查指定的路徑是否為一個存在的目錄。如果路徑存在且是一個目錄,則返回?True
,否則返回?False
。
參數名 | 類型 | 描述 |
---|---|---|
path | str | 表示文件系統路徑的類路徑對象(可以是相對路徑或絕對路徑)。 |
os.mkdir():Python 中?os
?模塊的函數,用于創建一個新的目錄。如果目錄已經存在或路徑無效,會拋出?FileExistsError
?或?OSError
參數名 | 類型 | 描述 |
---|---|---|
path | str | 要創建的目錄路徑。 |
mode | int | 可選參數,設置目錄的權限(八進制模式),默認為?0o777 。 |
#創建保存模型的目錄if not os.path.isdir(config["model_path"]):os.mkdir(config["model_path"])
③ 加載文件
torch.cuda.is_available():PyTorch 中用于檢查當前系統是否支持 CUDA(即是否有可用的 GPU)的函數。
參數名 | 類型 | 描述 |
---|---|---|
device | int 或 str | 指定目標 GPU 設備,默認為?None (使用默認 GPU)。 |
cuda():PyTorch 張量或模型的方法,用于將張量或模型移動到 GPU 上。
#加載訓練數據train_data = load_data(config["train_data_path"], config)#加載模型model = SiameseNetwork(config)# 標識是否使用gpucuda_flag = torch.cuda.is_available()if cuda_flag:logger.info("gpu可以使用,遷移模型至gpu")model = model.cuda()#加載優化器optimizer = choose_optimizer(config, model)#加載效果測試類evaluator = Evaluator(config, model, logger)
④ 訓練的核心過程 ?
-
?訓練循環:
for epoch in range(config["epoch"])
:遍歷每個訓練周期(epoch),config["epoch"]
?是訓練的總周期數。epoch += 1
:將當前周期數加1,用于日志記錄。model.train()
:將模型設置為訓練模式,啟用Dropout和Batch Normalization等訓練特定行為。
-
?數據加載與處理:
for index, batch_data in enumerate(train_data)
:遍歷訓練數據集的每個批次(batch)。optimizer.zero_grad()
:清空優化器的梯度緩存,避免梯度累積。if cuda_flag: batch_data = [d.cuda() for d in batch_data]
:如果啟用了CUDA(GPU加速),將數據移動到GPU上。
-
?模型前向傳播與損失計算:
input_id1, input_id2, labels = batch_data
:從批次數據中提取兩個輸入(input_id1
?和?input_id2
)以及對應的標簽(labels
)。
-
?反向傳播與優化:
train_loss.append(loss.item())
:將當前批次的損失值記錄下來。loss.backward()
:執行反向傳播,計算梯度。optimizer.step()
:更新模型參數,優化損失函數。
-
?日志記錄與評估:
logger.info("epoch average loss: %f" % np.mean(train_loss))
:記錄當前周期的平均損失值。evaluator.eval(epoch)
:調用評估器對模型進行評估,通常是在驗證集上計算準確率或其他指標。
model.train():將模型設置為訓練模式。在訓練模式下,模型會啟用?Dropout
?和?Batch Normalization
?等層的行為,以確保模型在訓練時能夠正常工作
enumerate():Python 的內置函數,用于在遍歷可迭代對象時同時獲取索引和值
參數名 | 類型 | 描述 |
---|---|---|
iterable | 可迭代對象 | 要遍歷的對象(如列表、元組、字符串)。 |
start | int | 索引的起始值,默認為?0 。 |
optimizer.zero_grad():將優化器中所有參數的梯度清零,避免梯度累積
cuda():將張量或模型移動到 GPU 上,以利用 GPU 的并行計算能力加速計算
參數名 | 類型 | 描述 |
---|---|---|
device | int 或 str | 指定目標 GPU 設備,默認為?None (使用默認 GPU)。 |
append():在列表末尾添加一個新元素
參數名 | 類型 | 描述 |
---|---|---|
element | 任意類型 | 要添加到列表末尾的元素。 |
item():將包含單個元素的張量轉換為 Python 標量(如?int
?或?float
)
loss.backward():計算損失函數對模型參數的梯度,用于反向傳播
optimizer.step():?根據計算出的梯度更新模型參數
#訓練for epoch in range(config["epoch"]):epoch += 1model.train()logger.info("epoch %d begin" % epoch)train_loss = []for index, batch_data in enumerate(train_data):optimizer.zero_grad()if cuda_flag:batch_data = [d.cuda() for d in batch_data]input_id1, input_id2, labels = batch_data #輸入變化時這里需要修改,比如多輸入,多輸出的情況loss = model(input_id1, input_id2, labels)train_loss.append(loss.item())# if index % int(len(train_data) / 2) == 0:# logger.info("batch loss %f" % loss)loss.backward()optimizer.step()logger.info("epoch average loss: %f" % np.mean(train_loss))evaluator.eval(epoch)
⑤ 保存模型
os.path.join():用于將多個路徑片段拼接成一個完整的路徑,并自動根據操作系統選擇正確的路徑分隔符(如 Windows 使用?\
,Linux 和 macOS 使用?/
)
參數名 | 類型 | 描述 |
---|---|---|
path | str | 初始路徑片段。 |
*paths | str | 需要拼接的后續路徑片段,可以接受任意數量的參數。 |
torch.save():用于將 PyTorch 對象(如模型、張量、字典等)保存到磁盤文件中,通常用于保存模型的權重或訓練狀態
參數名 | 類型 | 描述 |
---|---|---|
obj | 任意對象 | 需要保存的對象,如模型、張量、字典等。 |
f | str 或文件對象 | 保存的目標文件路徑或文件對象。 |
model.state_dict():?返回一個包含模型所有可學習參數(如權重和偏置)的有序字典,通常用于保存或加載模型參數
model_path = os.path.join(config["model_path"], "epoch_%d.pth" % epoch)torch.save(model.state_dict(), model_path)
模型訓練主程序?
# -*- coding: utf-8 -*-import torch
import os
import random
import os
import numpy as np
import logging
from config import Config
from model import SiameseNetwork, choose_optimizer
from evaluate import Evaluator
from loader import load_datalogging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)"""
模型訓練主程序
"""def main(config):#創建保存模型的目錄if not os.path.isdir(config["model_path"]):os.mkdir(config["model_path"])#加載訓練數據train_data = load_data(config["train_data_path"], config)#加載模型model = SiameseNetwork(config)# 標識是否使用gpucuda_flag = torch.cuda.is_available()if cuda_flag:logger.info("gpu可以使用,遷移模型至gpu")model = model.cuda()#加載優化器optimizer = choose_optimizer(config, model)#加載效果測試類evaluator = Evaluator(config, model, logger)#訓練for epoch in range(config["epoch"]):epoch += 1model.train()logger.info("epoch %d begin" % epoch)train_loss = []for index, batch_data in enumerate(train_data):optimizer.zero_grad()if cuda_flag:batch_data = [d.cuda() for d in batch_data]input_id1, input_id2, labels = batch_data #輸入變化時這里需要修改,比如多輸入,多輸出的情況loss = model(input_id1, input_id2, labels)train_loss.append(loss.item())# if index % int(len(train_data) / 2) == 0:# logger.info("batch loss %f" % loss)loss.backward()optimizer.step()logger.info("epoch average loss: %f" % np.mean(train_loss))evaluator.eval(epoch)model_path = os.path.join(config["model_path"], "epoch_%d.pth" % epoch)torch.save(model.state_dict(), model_path)returnif __name__ == "__main__":main(Config)
Ⅵ、預測文件 predict.py
① 初始化預測器對象
- ?模型初始化:
self.config = config
:將傳入的配置信息保存到類屬性中。self.model = model
:將傳入的模型保存到類屬性中。self.train_data = knwb_data
:將知識庫數據保存到類屬性中。
- ?設備選擇:
if torch.cuda.is_available(): self.model = model.cuda()
:如果 GPU 可用,則將模型移動到 GPU 上。else: self.model = model.cpu()
:如果 GPU 不可用,則將模型移動到 CPU 上。
- ?模型模式設置:
self.model.eval()
:將模型設置為評估模式,禁用 Dropout 和 Batch Normalization 等訓練特定行為。
- ?知識庫向量化:
self.knwb_to_vector()
:調用?knwb_to_vector
?方法,將知識庫數據轉換為向量形式,通常用于后續的相似度計算或檢索。
torch.cuda.is_available():檢查當前系統是否支持 CUDA(即是否有可用的 NVIDIA GPU),并且 PyTorch 是否已編譯為支持 CUDA。如果返回?True
,則表示可以使用 GPU 加速計算;如果返回?False
,則表示只能使用 CPU。
model.cuda():將模型從 CPU 移動到 GPU 上,以便利用 GPU 的并行計算能力加速訓練和推理。
參數名 | 類型 | 描述 |
---|---|---|
device | int 或 str | 指定目標 GPU 設備,默認為?None (使用默認 GPU) |
model.cpu():?將模型從 GPU 移動回 CPU 上
model.eval():將模型設置為評估模式,禁用?Dropout
?和?Batch Normalization
?等層的隨機行為。
class Predictor:def __init__(self, config, model, knwb_data):self.config = configself.model = modelself.train_data = knwb_dataif torch.cuda.is_available():self.model = model.cuda()else:self.model = model.cpu()self.model.eval()self.knwb_to_vector()
② 將知識庫中的問題向量化
-
?初始化映射和列表:
self.question_index_to_standard_question_index = {}
:創建一個空字典,用于記錄問題編號到標準問題編號的映射。self.question_ids = []
:創建一個空列表,用于存儲問題編號。
-
?加載詞匯表和模式:
self.vocab = self.train_data.dataset.vocab
:從訓練數據集中加載詞匯表。self.schema = self.train_data.dataset.schema
:從訓練數據集中加載模式。self.index_to_standard_question = dict((y, x) for x, y in self.schema.items())
:創建一個字典,將索引映射到標準問題。
-
?記錄問題編號到標準問題編號的映射:
- 遍歷知識庫中的每個標準問題及其對應的問題編號。
- 對于每個問題編號,記錄其到標準問題編號的映射,并將其添加到?
self.question_ids
?列表中。
-
?問題向量化:
with torch.no_grad():
:禁用梯度計算,因為這是推理階段,不需要更新模型參數。question_matrixs = torch.stack(self.question_ids, dim=0)
:將問題編號列表堆疊成一個矩陣。if torch.cuda.is_available(): question_matrixs = question_matrixs.cuda()
:如果 GPU 可用,則將矩陣移動到 GPU 上。self.knwb_vectors = self.model(question_matrixs)
:通過模型將問題矩陣轉換為向量。self.knwb_vectors = torch.nn.functional.normalize(self.knwb_vectors, dim=-1)
:對所有向量進行歸一化處理,使得每個向量的長度為 1。
dict():用于創建字典
參數名 | 類型 | 描述 |
---|---|---|
**kwargs | 關鍵字參數 | 關鍵字參數會被視為字典中的鍵值對。 |
mapping | 映射對象 | 映射對象中的元素會被復制到新創建的字典中。 |
iterable | 可迭代對象 | 可迭代對象的元素通常是長度為二的元組,每個元組的第一個元素作為鍵,第二個元素作為值。 |
items():返回字典中所有鍵值對的視圖
列表.append():在列表末尾添加一個元素
參數名 | 類型 | 描述 |
---|---|---|
item | 任意對象 | 要添加到列表末尾的元素。 |
torch.no_grad():禁用梯度計算,適用于模型評估或推理階段
torch.stack():沿指定維度連接張量序列,所有張量必須具有相同的大小
參數名 | 類型 | 描述 |
---|---|---|
tensors | 張量序列 | 需要連接的張量序列。 |
dim | int | 指定連接的維度,必須在 0 和所需連接的張量維度之間。 |
out | Tensor | 輸出張量,默認為?None 。 |
torch.cuda.is_available():檢查當前系統是否支持 CUDA(即是否有可用的 NVIDIA GPU),并且 PyTorch 是否已編譯為支持 CUDA。如果返回?True
,則表示可以使用 GPU 加速計算;如果返回?False
,則表示只能使用 CPU。
cuda():將張量從 CPU 移動到 GPU 上,以便利用 GPU 的并行計算能力加速訓練和推理。
參數名 | 類型 | 描述 |
---|---|---|
device | int 或 str | 指定目標 GPU 設備,默認為?None (使用默認 GPU) |
torch.nn.functional.normalize():?對輸入張量沿指定維度進行歸一化
參數名 | 類型 | 描述 |
---|---|---|
input | Tensor | 輸入張量。 |
p | float | 計算范數的類型,默認為 2(L2 范數)。 |
dim | int | 計算范數的維度。 |
eps | float | 防止分母為 0 的小數,默認為?1e-12 。 |
out | Tensor | 輸出張量,默認為?None 。 |
#將知識庫中的問題向量化,為匹配做準備#每輪訓練的模型參數不一樣,生成的向量也不一樣,所以需要每輪測試都重新進行向量化def knwb_to_vector(self):self.question_index_to_standard_question_index = {}self.question_ids = []self.vocab = self.train_data.dataset.vocabself.schema = self.train_data.dataset.schemaself.index_to_standard_question = dict((y, x) for x, y in self.schema.items())for standard_question_index, question_ids in self.train_data.dataset.knwb.items():for question_id in question_ids:#記錄問題編號到標準問題標號的映射,用來確認答案是否正確self.question_index_to_standard_question_index[len(self.question_ids)] = standard_question_indexself.question_ids.append(question_id)with torch.no_grad():question_matrixs = torch.stack(self.question_ids, dim=0)if torch.cuda.is_available():question_matrixs = question_matrixs.cuda()self.knwb_vectors = self.model(question_matrixs)#將所有向量都作歸一化 v / |v|self.knwb_vectors = torch.nn.functional.normalize(self.knwb_vectors, dim=-1)return
③ 文本編碼
- 根據配置中的?
vocab_path
?值,選擇不同的分詞方式:- 如果?
vocab_path
?為?"words.txt"
,則使用?jieba
?對文本進行分詞,并將每個分詞結果映射到詞匯表(self.vocab
)中的 ID。 - 否則,將文本按字符逐個映射到詞匯表中的 ID。
- 如果?
- 如果詞匯表中不存在某個詞或字符,則使用?
"[UNK]"
(未知詞)的 ID 作為默認值。
jieba.cut():是?jieba
?庫中的函數,用于對中文文本進行分詞。它支持多種分詞模式,包括精確模式、全模式和搜索引擎模式。
參數名 | 類型 | 描述 |
---|---|---|
sentence | str | 需要分詞的中文字符串。 |
cut_all | bool | 是否使用全模式,默認為?False (精確模式)。 |
HMM | bool | 是否使用隱馬爾可夫模型(HMM)進行新詞發現,默認為?True |
字典.get():Python 字典對象的方法,用于安全地獲取字典中指定鍵的值。如果鍵不存在,返回默認值(默認為?None
),而不會引發?KeyError
?異常。
參數名 | 類型 | 描述 |
---|---|---|
key | 任意類型 | 需要檢索的鍵。 |
default | 任意類型 | 如果鍵不存在時返回的默認值,默認為?None 。 |
列表.append():在列表末尾添加一個元素
參數名 | 類型 | 描述 |
---|---|---|
item | 任意對象 | 要添加到列表末尾的元素。 |
def encode_sentence(self, text):input_id = []if self.config["vocab_path"] == "words.txt":for word in jieba.cut(text):input_id.append(self.vocab.get(word, self.vocab["[UNK]"]))else:for char in text:input_id.append(self.vocab.get(char, self.vocab["[UNK]"]))return input_id
④ 匹配相似的標準問題
-
輸入處理:
input_id = self.encode_sentence(sentence)
:調用?encode_sentence
?方法將輸入的句子轉換為 ID 序列。input_id = torch.LongTensor([input_id])
:將 ID 序列轉換為 PyTorch 的?LongTensor
?類型,并增加一個維度以適配模型輸入。
-
?設備選擇:
if torch.cuda.is_available(): input_id = input_id.cuda()
:如果 GPU 可用,則將輸入數據移動到 GPU 上。
-
?模型推理:
with torch.no_grad():
:禁用梯度計算,因為這是推理階段,不需要更新模型參數。test_question_vector = self.model(input_id)
:將輸入數據傳遞給模型,生成句子的向量表示。res = torch.mm(test_question_vector.unsqueeze(0), self.knwb_vectors.T)
:計算輸入句子向量與知識庫中所有問題向量的相似度(通過矩陣乘法)。hit_index = int(torch.argmax(res.squeeze()))
:找到相似度最高的知識庫問題的索引。hit_index = self.question_index_to_standard_question_index[hit_index]
:將命中問題的索引轉換為標準問題的索引。
torch.LongTensor():?PyTorch 中的一個函數,用于創建一個包含整數(64 位整型)的張量。它是一種特定的張量類型,通常用于存儲索引、標簽或其他整數值數據。
參數名 | 類型 | 描述 |
---|---|---|
data | 列表、元組或數組 | 輸入數據,用于創建張量。 |
dtype | torch.dtype | 張量的數據類型,默認為?torch.int64 (64 位整數)。 |
device | torch.device | 張量存儲的設備(如 CPU 或 GPU),默認為?None (使用默認設備)。 |
requires_grad | bool | 是否需要計算梯度,默認為?False 。 |
torch.cuda.is_available():檢查當前系統是否支持 CUDA(即是否有可用的 NVIDIA GPU),并且 PyTorch 是否已編譯為支持 CUDA。如果返回?True
,則表示可以使用 GPU 加速計算;如果返回?False
,則表示只能使用 CPU。
cuda():將張量從 CPU 移動到 GPU 上,以便利用 GPU 的并行計算能力加速訓練和推理。
參數名 | 類型 | 描述 |
---|---|---|
device | int 或 str | 指定目標 GPU 設備,默認為?None (使用默認 GPU) |
torch.no_grad():禁用梯度計算,適用于模型評估或推理階段?
squeeze():是 PyTorch 和 NumPy 中的函數,用于移除張量或數組中大小為 1 的維度。
參數名 | 類型 | 描述 |
---|---|---|
input | Tensor | 輸入張量。 |
dim | int | 可選參數,指定要移除的維度。 |
unsqueeze():?是 PyTorch 中的函數,用于在指定維度上插入一個大小為 1 的維度,從而改變張量的形狀。
參數名 | 類型 | 描述 |
---|---|---|
input | Tensor | 輸入張量。 |
dim | int | 要插入新維度的位置,范圍是?[-input.dim()-1, input.dim()] 。 |
torch.argmax():PyTorch 中的函數,用于返回張量中最大值所在的索引。
參數名 | 類型 | 描述 |
---|---|---|
input | Tensor | 輸入張量。 |
dim | int | 指定沿哪個維度計算最大值索引。 |
keepdim | bool | 是否保持輸出張量的維度,默認為?False 。 |
def predict(self, sentence):input_id = self.encode_sentence(sentence)input_id = torch.LongTensor([input_id])if torch.cuda.is_available():input_id = input_id.cuda()with torch.no_grad():test_question_vector = self.model(input_id) #不輸入labels,使用模型當前參數進行預測res = torch.mm(test_question_vector.unsqueeze(0), self.knwb_vectors.T)hit_index = int(torch.argmax(res.squeeze())) #命中問題標號hit_index = self.question_index_to_standard_question_index[hit_index] #轉化成標準問編號 return self.index_to_standard_question[hit_index]
⑤ 實現與用戶的交互問答
torch.load():PyTorch 中用于加載由?torch.save()
?保存的模型、張量或其他對象的函數。它可以將保存在文件中的數據加載到程序中,以便進行推理、繼續訓練或其他操作
參數名 | 類型/值 | 說明 |
---|---|---|
f | 類文件對象或字符串 | 類文件對象(必須實現?read() 、readline() 、tell() ?和?seek() ),或包含文件名的字符串 |
map_location | 函數、torch.device 、字符串或字典 | 指定如何重新映射存儲位置。例如,將 GPU 保存的模型加載到 CPU 上 |
pickle_module | 模塊(默認:pickle ) | 用于反序列化的模塊,必須與序列化時使用的模塊匹配。 |
**pickle_load_args | 可選關鍵字參數 | 傳遞給?
的可選參數 |
input():Python 內置函數,用于從標準輸入(通常是鍵盤)讀取用戶輸入的數據。它會將用戶輸入的內容作為字符串返回。
參數名 | 類型 | 說明 |
---|---|---|
prompt | 字符串 | 可選參數,用于在等待用戶輸入時顯示的提示信息。如果未提供,則不顯示任何提示。 |
# -*- coding: utf-8 -*-
import torch
from loader import load_data
from config import Config
from model import SiameseNetwork, choose_optimizer"""
模型效果測試
"""class Predictor:def __init__(self, config, model, knwb_data):self.config = configself.model = modelself.train_data = knwb_dataif torch.cuda.is_available():self.model = model.cuda()else:self.model = model.cpu()self.model.eval()self.knwb_to_vector()#將知識庫中的問題向量化,為匹配做準備#每輪訓練的模型參數不一樣,生成的向量也不一樣,所以需要每輪測試都重新進行向量化def knwb_to_vector(self):self.question_index_to_standard_question_index = {}self.question_ids = []self.vocab = self.train_data.dataset.vocabself.schema = self.train_data.dataset.schemaself.index_to_standard_question = dict((y, x) for x, y in self.schema.items())for standard_question_index, question_ids in self.train_data.dataset.knwb.items():for question_id in question_ids:#記錄問題編號到標準問題標號的映射,用來確認答案是否正確self.question_index_to_standard_question_index[len(self.question_ids)] = standard_question_indexself.question_ids.append(question_id)with torch.no_grad():question_matrixs = torch.stack(self.question_ids, dim=0)if torch.cuda.is_available():question_matrixs = question_matrixs.cuda()self.knwb_vectors = self.model(question_matrixs)#將所有向量都作歸一化 v / |v|self.knwb_vectors = torch.nn.functional.normalize(self.knwb_vectors, dim=-1)returndef encode_sentence(self, text):input_id = []if self.config["vocab_path"] == "words.txt":for word in jieba.cut(text):input_id.append(self.vocab.get(word, self.vocab["[UNK]"]))else:for char in text:input_id.append(self.vocab.get(char, self.vocab["[UNK]"]))return input_iddef predict(self, sentence):input_id = self.encode_sentence(sentence)input_id = torch.LongTensor([input_id])if torch.cuda.is_available():input_id = input_id.cuda()with torch.no_grad():test_question_vector = self.model(input_id) #不輸入labels,使用模型當前參數進行預測res = torch.mm(test_question_vector.unsqueeze(0), self.knwb_vectors.T)hit_index = int(torch.argmax(res.squeeze())) #命中問題標號hit_index = self.question_index_to_standard_question_index[hit_index] #轉化成標準問編號 return self.index_to_standard_question[hit_index]if __name__ == "__main__":knwb_data = load_data(Config["train_data_path"], Config)model = SiameseNetwork(Config)model.load_state_dict(torch.load("model_output/epoch_10.pth"))pd = Predictor(Config, model, knwb_data)while True:# sentence = "固定寬帶服務密碼修改"sentence = input("請輸入問題:")res = pd.predict(sentence)print(res)
二、實現方式 ② 交互型文本匹配
同時輸入兩句話,對兩句話之間進行分割,然后同時送入這個模型,然后過模型后取出【CLS】token 位置對應的向量送入這個模型,過模型后取出【CLS】token 位置對應的向量做一個二分類
Bert中進行匹配這兩句話判斷是否是上下文關系,而文本匹配模型中,將數據換為文本匹配的數據,最終匹配兩句話的語義相似度
表示型文本匹配:兩句話分別單獨進入這個模型,兩句話分別編碼,在編碼的過程中互不影響
交互式文本匹配:兩句話一同送入這個模型,計算交互時與self-attention計算類似,對比可以看到另一句話的信息,對比發現兩句話的主要區別,例:今天下雨了? ? ? ? 今天下雪了
representation layer:表示層輸出是否匹配,二分類任務
interaction layer:交互層進行信息融合,常以attention的方式
Embedding layer:嵌入層,權重共享
將兩句話同時輸入這個模型,讓模型看到,讓模型進行對比發現文本的重點
1.交互型文本匹配 —— 代碼示例🚀
Ⅰ、配置文件 config.py
① 路徑相關參數
model_path:指定訓練后模型的保存路徑。若加載預訓練模型,需提供模型文件的具體存儲位置
schema_path:定義數據結構的配置文件路徑,通常用于數據預處理或驗證輸入格式(如JSON/YAML文件)
train_data_path:指向訓練集的數據文件或目錄。若為目錄,需包含多個訓練文件(如文本、圖像等)
valid_data_path:指向驗證集的數據文件或目錄。若為目錄,需包含多個訓練文件(如文本、圖像等)
vocab_path:詞表文件路徑,用于自然語言處理任務中定義詞匯的映射關系(如將單詞轉為ID)
② 模型結構參數?
max_length:序列數據的最大長度(如文本的單詞數)。超過此長度的序列會被截斷或填充
hidden_size:神經網絡隱藏層的維度大小,影響模型的表達能力。例如,LSTM或Transformer中每層的神經元數量
③ 訓練控制參數
epoch:訓練過程中遍歷整個數據集的次數。適當增加輪次可提升模型性能,但可能過擬合
batch_size:每次輸入模型的樣本數量。較大的批次可加速訓練,但需更多顯存
epoch_data_size:每輪訓練中采樣的數量
positive_sample_rate:正樣本在批次中的占比,常用于不平衡數據任務(如分類)。需結合負樣本比例調整
④ 優化和學習參數
optimizer:選擇優化算法(如SGD、Adam、AdamW),影響參數更新策略
learning_rate:控制參數更新的步長。學習率過高可能導致震蕩,過低則收斂緩慢
# -*- coding: utf-8 -*-"""
配置參數信息
"""Config = {"model_path": "model_output","schema_path": "../data/schema.json","train_data_path": "../data/train.json","valid_data_path": "../data/valid.json","pretrain_model_path":r"F:\人工智能NLP\NLP資料\week6 語言模型\bert-base-chinese","vocab_path":r"F:\人工智能NLP\NLP資料\week6 語言模型\bert-base-chinese\vocab.txt","max_length": 20,"hidden_size": 256,"epoch": 10,"batch_size": 128,"epoch_data_size": 10000, #每輪訓練中采樣數量"positive_sample_rate":0.5, #正樣本比例"optimizer": "adam","learning_rate": 1e-3,
}
Ⅱ、數據加載 loader.py
① 設置日志級別
logging.getLogger():Python 中?logging
?模塊的核心函數,用于獲取或創建一個日志記錄器(Logger)實例。每個日志記錄器都有一個唯一的名稱,用于標識和配置日志記錄行為。如果未提供名稱,則返回根記錄器
參數名 | 類型 | 說明 |
---|---|---|
name | 字符串 | 可選參數,指定日志記錄器的名稱。如果未提供或為?None ,則返回根記錄器。 |
setLevel():Logger
?對象的方法,用于設置日志記錄器的日志級別。只有等于或高于該級別的日志消息才會被處理,低于該級別的日志消息將被忽略
參數名 | 類型 | 說明 |
---|---|---|
level | 整數 | 指定日志級別,常用值包括?logging.DEBUG 、logging.INFO 、logging.WARNING 、logging.ERROR ?和?logging.CRITICAL |
logging.getLogger("transformers").setLevel(logging.ERROR)
② ?初始化 def __init__()
self.config = config
:將傳入的配置字典保存到實例變量中。
self.path = data_path
:將數據路徑保存到實例變量中。
self.tokenizer = load_vocab(config["vocab_path"])
:加載詞匯表文件,初始化分詞器。config["vocab_path"]
?是詞匯表文件的路徑。
self.config["vocab_size"] = len(self.tokenizer.vocab)
:計算詞匯表的大小,并將其更新到配置字典中。
self.schema = load_schema(config["schema_path"])
:加載模式文件(如數據格式定義),config["schema_path"]
?是模式文件的路徑。
self.train_data_size = config["epoch_data_size"]
:設置每個 epoch 的采樣數據量。由于采用隨機采樣,需要指定一個固定的采樣數量,否則可以一直采樣self.max_length = config["max_length"]
**:設置輸入序列的最大長度。
self.data_type = None
:用于標識加載的數據類型是訓練集還是測試集,初始值為?None
,后續可以通過方法設置為?"train"
?或?"test"
。
self.load()
:調用?load
?方法,加載數據或執行其他初始化操作。
def __init__(self, data_path, config):self.config = configself.path = data_pathself.tokenizer = load_vocab(config["vocab_path"])self.config["vocab_size"] = len(self.tokenizer.vocab)self.schema = load_schema(config["schema_path"])self.train_data_size = config["epoch_data_size"] #由于采取隨機采樣,所以需要設定一個采樣數量,否則可以一直采self.max_length = config["max_length"]self.data_type = None #用來標識加載的是訓練集還是測試集 "train" or "test"self.load()
③? 從指定路徑文件加載數據
- 打開文件(
self.path
),逐行讀取并解析為 JSON 對象。 - 根據數據的類型(字典或列表)進行分類:
- ?訓練集:如果數據是字典類型,提取?
questions
?和?target
,將問題存儲到?self.knwb
?中,鍵為標簽(通過?self.schema
?映射)。 - ?測試集:如果數據是列表類型,提取問題和標簽,將標簽轉換為?
torch.LongTensor
?并存儲到?self.data
?中。
- ?訓練集:如果數據是字典類型,提取?
defaultdict():是?collections
?模塊中的一個類,繼承自?dict
。它允許在訪問不存在的鍵時返回一個默認值,而不是拋出?KeyError
?異常。
參數名 | 類型 | 說明 |
---|---|---|
default_factory | 可調用對象 | 指定默認值的生成函數,如?list 、int 、set ?等。如果未提供,默認為?None 。 |
json.loads():將 JSON 格式的字符串解析為 Python 對象(如字典、列表等)。
參數名 | 類型 | 說明 |
---|---|---|
s | 字符串 | 要解析的 JSON 字符串。 |
encoding | 字符串 | 字符編碼(Python 3 中已棄用)。 |
cls | 類 | 自定義解碼類,默認為?None 。 |
object_hook | 函數 | 用于自定義 JSON 對象轉換為 Python 對象的函數。 |
parse_float | 函數 | 自定義將 JSON 中的浮點數轉換為特定類型的函數。 |
parse_int | 函數 | 自定義將 JSON 中的整數轉換為特定類型的函數。 |
parse_constant | 函數 | 自定義將 JSON 中的常量(如?Infinity )轉換為特定類型的函數。 |
object_pairs_hook | 函數 | 用于處理 JSON 對象中的鍵值對的函數,默認返回字典。 |
isinstance():檢查一個對象是否是指定類型或其子類的實例。
參數名 | 類型 | 說明 |
---|---|---|
object | 對象 | 需要檢查的對象。 |
classinfo | 類型或元組 | 可以是一個類型(如?int 、str ?等),也可以是這些類型的元組。 |
append():在列表的末尾添加一個元素。
參數名 | 類型 | 說明 |
---|---|---|
element | 任意類型 | 要添加到列表末尾的元素。 |
assert:用于調試,檢查一個條件是否為真。如果條件為假,則拋出?AssertionError
?異常。
參數名 | 類型 | 說明 |
---|---|---|
condition | 布爾表達式 | 需要檢查的條件。 |
message | 字符串 | 可選參數,當條件為假時拋出的錯誤信息。 |
torch.LongTensor():創建一個包含整數的張量(Tensor),元素類型為 64 位整數
參數名 | 類型 | 說明 |
---|---|---|
data | 列表或數組 | 用于初始化張量的數據。 |
dtype | 類型 | 指定張量的數據類型,默認為?torch.long 。 |
device | 設備 | 指定張量存儲的設備(如?cpu ?或?gpu ),默認為?cpu 。 |
requires_grad | 布爾值 | 指定是否需要計算張量的梯度,默認為?False 。 |
def load(self):self.data = []self.knwb = defaultdict(list)with open(self.path, encoding="utf8") as f:for line in f:line = json.loads(line)#加載訓練集if isinstance(line, dict):self.data_type = "train"questions = line["questions"]label = line["target"]for question in questions:self.knwb[self.schema[label]].append(question)#加載測試集else:self.data_type = "test"assert isinstance(line, list)question, label = linelabel_index = torch.LongTensor([self.schema[label]])self.data.append([question, label_index])return
④? 拼接兩句話轉成向量
-
拼接與編碼:
- 使用?
self.tokenizer.encode
?方法對?text1
?和?text2
?進行拼接和編碼。 - 拼接后的文本會被轉換為模型可接受的輸入格式(如?
input_ids
)。
- 使用?
-
?參數說明:
truncation='longest_first'
:如果拼接后的文本長度超過?max_length
,則從較長的部分開始截斷。max_length=self.max_length
:指定編碼后的最大長度,超過的部分會被截斷。padding='max_length'
:如果拼接后的文本長度不足?max_length
,則用填充符(如?[PAD]
)補齊。
-
?返回結果:
- 返回編碼后的?
input_id
,通常是一個包含 token ID 的列表或張量。
- 返回編碼后的?
tokenizer.encode():transformers
?庫中?Tokenizer
?類的一個方法,用于將文本轉換為模型可接受的 token ID 序列。它通常用于自然語言處理任務中,將輸入文本編碼為模型輸入格式。
參數名 | 類型 | 說明 |
---|---|---|
text | str ?或?List[str] | 需要編碼的文本,可以是單個字符串或字符串列表。 |
text_pair | str ?或?List[str] | 可選參數,第二個文本(如句子對任務中的第二句話)。 |
add_special_tokens | bool | 是否添加特殊標記(如?[CLS] ?和?[SEP] ),默認為?True 。 |
max_length | int | 指定編碼后的最大長度,超過的部分會被截斷。 |
truncation | bool ?或?str | 指定截斷策略,如?'longest_first' 、'only_first' ?或?'only_second' 。 |
padding | bool ?或?str | 指定填充策略,如?'max_length' ?或?'longest' 。 |
return_tensors | str | 指定返回的 tensor 類型,如?'pt' (PyTorch)或?'tf' (TensorFlow)。 |
return_token_type_ids | bool | 是否返回?token_type_ids (用于區分句子對任務中的兩個句子)。 |
return_attention_mask | bool | 是否返回?attention_mask (用于標識有效 token)。 |
#每次加載兩個文本,輸出他們的拼接后編碼def encode_sentence(self, text1, text2):input_id = self.tokenizer.encode(text1, text2,truncation='longest_first',max_length=self.max_length,padding='max_length',)return input_id
⑤ 返回數據集的長度 def __len__()
該函數的作用是返回對象的“長度”,具體行為取決于對象的?data_type
?屬性。
如果?data_type
?為?"train"
,則返回?self.config["epoch_data_size"]
?的值,這通常表示訓練數據的規模。
如果?data_type
?為?"test"
,則返回?self.data
?的長度,即測試數據的元素個數。
如果?data_type
?不是?"train"
?或?"test"
,則會觸發?AssertionError
,并提示?self.data_type
?的值。
assert:Python 中的調試工具,用于檢查某個條件是否為真。如果條件為假,assert
?會拋出?AssertionError
?異常。?
len():?Python 內置函數,用于返回對象的長度或元素數量。它適用于多種數據類型,包括字符串、列表、元組、字典、集合等。
參數名 | 類型 | 描述 |
---|---|---|
obj | 可迭代對象 | 需要計算長度的對象,如字符串、列表、元組、字典、集合等。 |
def __len__(self):if self.data_type == "train":return self.config["epoch_data_size"]else:assert self.data_type == "test", self.data_typereturn len(self.data)
⑥ 通過索引訪問數據集中的樣本 def __getitem__()
判斷數據類型:?如果是訓練數據集,則調用self.random_train_sample()
方法來隨機生成一個訓練樣本。這是因為訓練數據集中的樣本是通過隨機采樣生成的,而不是固定的數據。
如果不是訓練數據集,則認為是測試數據集:直接返回self.data
列表中索引為index
的樣本。
def __getitem__(self, index):if self.data_type == "train":return self.random_train_sample() #隨機生成一個訓練樣本else:return self.data[index]
⑦ 隨機生成訓練樣本 ?
?正樣本生成:從所有標準問題中隨機選擇一個標準問題?p
。如果該標準問題下至少有兩個問題,則從中隨機選取兩個問題?s1
?和?s2
。使用?encode_sentence
?方法將?s1
?和?s2
?編碼為模型輸入格式,并返回編碼結果及標簽?1
(表示正樣本)。如果標準問題下不足兩個問題,則重新隨機選擇。
負樣本生成:從所有標準問題中隨機選擇兩個不同的標準問題?p
?和?n
。分別從?p
?和?n
?中隨機選取一個問題?s1
?和?s2
。使用?encode_sentence
?方法將?s1
?和?s2
?編碼為模型輸入格式,并返回編碼結果及標簽?0
(表示負樣本)。
?概率控制:使用?random.random()
?和?self.config["positive_sample_rate"]
?控制生成正樣本的概率。
list():將可迭代對象(如字符串、元組、集合等)轉換為列表。
參數名 | 類型 | 說明 |
---|---|---|
iterable | 可迭代對象 | 需要轉換為列表的對象,如字符串、元組、集合等。如果未提供,則返回空列表。 |
字典.keys():返回字典中所有鍵的視圖對象(dict_keys
),可以轉換為列表
random.random():生成一個范圍在?[0.0, 1.0)
?之間的隨機浮點數。
random.choice():從非空序列(如列表、元組、字符串等)中隨機選擇一個元素。
參數名 | 類型 | 說明 |
---|---|---|
seq | 序列 | 非空序列,如列表、元組、字符串等。 |
random.sample():從序列中隨機選擇指定數量的唯一元素,返回一個新列表。
參數名 | 類型 | 說明 |
---|---|---|
population | 序列 | 需要從中選擇的序列,如列表、元組、字符串等。 |
k | 整數 | 需要選擇的元素數量。 |
torch.LongTensor():創建一個包含 64 位整數的張量(Tensor)
參數名 | 類型 | 說明 |
---|---|---|
data | 列表或數組 | 用于初始化張量的數據,如列表或數組。 |
#依照一定概率生成負樣本或正樣本#負樣本從隨機兩個不同的標準問題中各隨機選取一個#正樣本從隨機一個標準問題中隨機選取兩個def random_train_sample(self):standard_question_index = list(self.knwb.keys())#隨機正樣本if random.random() <= self.config["positive_sample_rate"]:p = random.choice(standard_question_index)#如果選取到的標準問下不足兩個問題,則無法選取,所以重新隨機一次if len(self.knwb[p]) < 2:return self.random_train_sample()else:s1, s2 = random.sample(self.knwb[p], 2)input_ids = self.encode_sentence(s1, s2)input_ids = torch.LongTensor(input_ids)return [input_ids, torch.LongTensor([1])]#隨機負樣本else:p, n = random.sample(standard_question_index, 2)s1 = random.choice(self.knwb[p])s2 = random.choice(self.knwb[n])input_ids = self.encode_sentence(s1, s2)input_ids = torch.LongTensor(input_ids)return [input_ids, torch.LongTensor([0])]
⑧ 數據處理
加載詞匯表:
?????????加載詞匯表:使用?vocab_path
?參數指定詞匯表文件的路徑。通過?BertTokenizer
?類加載詞匯表,并初始化分詞器。?
????????返回分詞器:返回初始化好的?BertTokenizer
?分詞器,用于后續的文本編碼和分詞操作。
#加載字表或詞表
def load_vocab(vocab_path):tokenizer = BertTokenizer(vocab_path)return tokenizer
加載schema:
?????????加載模式文件:使用?schema_path
?參數指定模式文件的路徑。打開文件并讀取其內容。
????????解析 JSON 數據:使用?json.loads
?函數將文件內容解析為 Python 對象(通常是字典)。
????????返回解析結果:返回解析后的 Python 對象,用于后續的處理或配置。
open():用于打開文件,并返回一個文件對象,以便進行讀取或寫入操作。
參數名 | 類型 | 說明 |
---|---|---|
file | 字符串 | 文件路徑,可以是相對路徑或絕對路徑。 |
mode | 字符串 | 打開文件的模式,如?'r' (只讀)、'w' (只寫)、'a' (追加)等。 |
encoding | 字符串 | 文件編碼格式,如?'utf-8' 。 |
errors | 字符串 | 指定編碼錯誤處理方式,如?'ignore' ?或?'strict' 。 |
newline | 字符串 | 控制換行符的行為,如?None (默認)或?'\n' 。 |
closefd | 布爾值 | 是否在關閉文件時關閉文件描述符,默認為?True 。 |
opener | 函數 | 自定義文件打開器。 |
json.loads():將 JSON 格式的字符串解析為 Python 對象(如字典、列表等)。
參數名 | 類型 | 說明 |
---|---|---|
s | 字符串 | 要解析的 JSON 字符串。 |
encoding | 字符串 | 字符編碼(Python 3 中已棄用)。 |
cls | 類 | 自定義解碼類,默認為?None 。 |
object_hook | 函數 | 自定義將 JSON 對象轉換為其他類型的 Python 對象。 |
parse_float | 函數 | 自定義將 JSON 中的浮點數轉換為特定類型。 |
parse_int | 函數 | 自定義將 JSON 中的整數轉換為特定類型。 |
文件對象.read():?從文件中讀取指定數量的字節或整個文件內容。
參數名 | 類型 | 說明 |
---|---|---|
size | 整數 | 要讀取的字節數。如果未指定或為負數,則讀取整個文件。 |
#加載schema
def load_schema(schema_path):with open(schema_path, encoding="utf8") as f:return json.loads(f.read())
封裝數據加載器:
????????創建數據生成器:使用?DataGenerator
?類(假設已定義)從?data_path
?和?config
?中加載數據,并生成數據集對象?dg
。?
? ? ? ? 封裝為 DataLoader:使用?DataLoader
?將?dg
?封裝為可迭代的數據加載器?dl
。batch_size
?從?config
?中獲取,用于指定每個批次的樣本數量。shuffle
?參數控制是否在每個訓練周期開始時打亂數據順序。????????
?????????返回 DataLoader:返回封裝好的?DataLoader
?對象?dl
,用于后續的訓練或驗證。
DataLoader():PyTorch 中的一個標準工具,用于高效地加載和處理數據。它支持批量加載、數據打亂、多線程加載等功能,是深度學習訓練中常用的數據加載器。
參數名 | 類型 | 描述 |
---|---|---|
dataset | 數據集對象 | 要加載的數據集,必須實現?__len__ ?和?__getitem__ ?方法。 |
batch_size | 整數 | 每個批次的大小,默認為?1 。 |
shuffle | 布爾值 | 是否在每個 epoch 開始時打亂數據順序,默認為?False 。 |
sampler | 采樣器對象 | 自定義采樣器,用于控制數據的采樣方式,默認為?None 。 |
batch_sampler | 批次采樣器對象 | 自定義批次采樣器,用于控制批次的采樣方式,默認為?None 。 |
num_workers | 整數 | 用于數據加載的子進程數量,默認為?0 (主進程加載)。 |
collate_fn | 函數 | 用于將一個批次的數據合并成一個張量或元組,默認為?None 。 |
pin_memory | 布爾值 | 是否將數據存儲在 pin memory 中(用于 GPU 加速),默認為?False 。 |
drop_last | 布爾值 | 如果數據不能完全分成批次,是否刪除最后一個不完整的批次,默認為?False 。 |
timeout | 整數 | 數據加載的最大等待時間(秒),默認為?0 (無限制)。 |
worker_init_fn | 函數 | 用于初始化每個數據加載器子進程的函數,默認為?None 。 |
#用torch自帶的DataLoader類封裝數據
def load_data(data_path, config, shuffle=True):dg = DataGenerator(data_path, config)dl = DataLoader(dg, batch_size=config["batch_size"], shuffle=shuffle)return dl
⑨?加載數據文件
# -*- coding: utf-8 -*-import json
import re
import os
import torch
import random
import logging
from torch.utils.data import Dataset, DataLoader
from collections import defaultdict
from transformers import BertTokenizer
"""
數據加載
"""logging.getLogger("transformers").setLevel(logging.ERROR)class DataGenerator:def __init__(self, data_path, config):self.config = configself.path = data_pathself.tokenizer = load_vocab(config["vocab_path"])self.config["vocab_size"] = len(self.tokenizer.vocab)self.schema = load_schema(config["schema_path"])self.train_data_size = config["epoch_data_size"] #由于采取隨機采樣,所以需要設定一個采樣數量,否則可以一直采self.max_length = config["max_length"]self.data_type = None #用來標識加載的是訓練集還是測試集 "train" or "test"self.load()def load(self):self.data = []self.knwb = defaultdict(list)with open(self.path, encoding="utf8") as f:for line in f:line = json.loads(line)#加載訓練集if isinstance(line, dict):self.data_type = "train"questions = line["questions"]label = line["target"]for question in questions:self.knwb[self.schema[label]].append(question)#加載測試集else:self.data_type = "test"assert isinstance(line, list)question, label = linelabel_index = torch.LongTensor([self.schema[label]])self.data.append([question, label_index])return#每次加載兩個文本,輸出他們的拼接后編碼def encode_sentence(self, text1, text2):input_id = self.tokenizer.encode(text1, text2,truncation='longest_first',max_length=self.max_length,padding='max_length',)return input_iddef __len__(self):if self.data_type == "train":return self.config["epoch_data_size"]else:assert self.data_type == "test", self.data_typereturn len(self.data)def __getitem__(self, index):if self.data_type == "train":return self.random_train_sample() #隨機生成一個訓練樣本else:return self.data[index]#依照一定概率生成負樣本或正樣本#負樣本從隨機兩個不同的標準問題中各隨機選取一個#正樣本從隨機一個標準問題中隨機選取兩個def random_train_sample(self):standard_question_index = list(self.knwb.keys())#隨機正樣本if random.random() <= self.config["positive_sample_rate"]:p = random.choice(standard_question_index)#如果選取到的標準問下不足兩個問題,則無法選取,所以重新隨機一次if len(self.knwb[p]) < 2:return self.random_train_sample()else:s1, s2 = random.sample(self.knwb[p], 2)input_ids = self.encode_sentence(s1, s2)input_ids = torch.LongTensor(input_ids)return [input_ids, torch.LongTensor([1])]#隨機負樣本else:p, n = random.sample(standard_question_index, 2)s1 = random.choice(self.knwb[p])s2 = random.choice(self.knwb[n])input_ids = self.encode_sentence(s1, s2)input_ids = torch.LongTensor(input_ids)return [input_ids, torch.LongTensor([0])]#加載字表或詞表
def load_vocab(vocab_path):tokenizer = BertTokenizer(vocab_path)return tokenizer#加載schema
def load_schema(schema_path):with open(schema_path, encoding="utf8") as f:return json.loads(f.read())#用torch自帶的DataLoader類封裝數據
def load_data(data_path, config, shuffle=True):dg = DataGenerator(data_path, config)dl = DataLoader(dg, batch_size=config["batch_size"], shuffle=shuffle)return dlif __name__ == "__main__":from config import Configdg = DataGenerator("../data/valid.json", Config)
Ⅲ、模型定義?model.py
① 從輸入張量提取第一個元素并返回
class GetFirst(nn.Module):def __init__(self):super(GetFirst, self).__init__()def forward(self, x):return x[0]
② 文本匹配網絡初始化?
輸入:輸入是一個詞索引序列,形狀為?(batch_size, max_len)
,其中?max_len
?是序列的最大長度。?
嵌入層:將詞索引映射為稠密向量,輸出形狀為?(batch_size, max_len, hidden_size)
。
?LSTM 編碼器:雙向 LSTM 編碼輸入序列,輸出形狀為?(batch_size, max_len, hidden_size * 2)
。GetFirst()
?提取第一個時間步的隱藏狀態,輸出形狀為?(batch_size, hidden_size * 2)
。線性層和 ReLU 激活函數進一步處理,輸出形狀為?(batch_size, hidden_size)
。
分類層:將隱藏層的輸出映射為 2 維向量,輸出形狀為?(batch_size, 2)
。
損失計算:使用交叉熵損失函數計算模型預測值與真實標簽之間的損失。
nn.Embedding():將離散的整數(如單詞索引)映射到低維的稠密向量空間,常用于自然語言處理任務。
參數名 | 類型 | 說明 |
---|---|---|
num_embeddings | int | 嵌入字典的大小,即詞匯表的大小。 |
embedding_dim | int | 每個嵌入向量的維度。 |
padding_idx | int, optional | 指定一個索引,用于填充(padding),在計算梯度時不會被更新。 |
max_norm | float, optional | 如果設置,嵌入向量的范數會被裁剪到不超過該值。 |
norm_type | float, optional | 用于裁剪范數的類型,默認為 2(L2 范數)。 |
scale_grad_by_freq | bool, optional | 如果為 True,梯度會根據單詞在 mini-batch 中的頻率進行縮放。 |
sparse | bool, optional | 如果為 True,梯度將會是稀疏張量。 |
nn.Sequential():按順序組織多個神經網絡層或模塊,簡化模型定義。
參數名 | 類型 | 說明 |
---|---|---|
*args | Module | 按順序傳入多個模塊(如層或激活函數)。 |
nn.ReLU():修正線性單元激活函數,將負值置為 0,正值保持不變,用于引入非線性。
參數名 | 類型 | 說明 |
---|---|---|
inplace | bool, optional | 如果為 True,會直接修改輸入數據,節省內存。默認為 False。 |
nn.Linear():全連接層,對輸入進行線性變換(矩陣乘法和偏置加法)。
參數名 | 類型 | 說明 |
---|---|---|
in_features | int | 輸入特征維度。 |
out_features | int | 輸出特征維度。 |
bias | bool, optional | 是否添加偏置項,默認為 True。 |
nn.CrossEntropyLoss():用于多分類任務的損失函數,結合了nn.LogSoftmax()
?和?nn.NLLLoss()
參數名 | 類型 | 說明 |
---|---|---|
weight | Tensor, optional | 類別權重,用于處理類別不平衡問題。 |
ignore_index | int, optional | 指定一個索引,忽略該類別的損失計算。 |
reduction | str, optional | 指定損失計算的方式,如?'mean' 、'sum' ?或?'none' 。默認為?'mean' 。 |
def __init__(self, config):super(SentenceMatchNetwork, self).__init__()# 可以用bert,參考下面# pretrain_model_path = config["pretrain_model_path"]# self.bert_encoder = BertModel.from_pretrained(pretrain_model_path)# 常規的embedding + layerhidden_size = config["hidden_size"]#20000應為詞表大小,這里借用bert的詞表,沒有用它精確的數字,因為里面有很多無用詞,舍棄一部分,不影響效果self.embedding = nn.Embedding(20000, hidden_size)#一種多層按順序執行的寫法,具體的層可以換#unidirection:batch_size, max_len, hidden_size#bidirection:batch_size, max_len, hidden_size * 2self.encoder = nn.Sequential(nn.LSTM(hidden_size, hidden_size, bidirectional=True, batch_first=True),GetFirst(),nn.ReLU(),nn.Linear(hidden_size * 2, hidden_size), #batch_size, max_len, hidden_sizenn.ReLU(),)self.classify_layer = nn.Linear(hidden_size, 2)self.loss = nn.CrossEntropyLoss()
③? 計算句子間相似度
?輸入:接收?input_ids
(兩個句子的拼接編碼)和可選的?target
(標簽)。
嵌入層:將?input_ids
?轉換為稠密向量?x
。?
編碼器:通過 LSTM、ReLU 和線性層處理?x
,得到編碼后的輸出。
?池化:對編碼后的輸出進行最大池化,提取特征。?
分類層:將池化后的輸出映射為 2 維向量,表示匹配和不匹配的得分。
輸出:如果有?target
,計算交叉熵損失并返回。如果沒有?target
,返回兩句話匹配的概率。
squeeze():移除張量中所有大小為 1 的維度,或根據需要移除特定維度。例如,形狀為?(1,3,1,4)?的張量,經過?squeeze()
?后將變為?(3,4)。
參數名 | 類型 | 說明 |
---|---|---|
input | Tensor | 輸入張量。 |
dim | int, optional | 指定要移除的維度。如果未指定,則移除所有大小為 1 的維度。 |
torch.softmax():?將輸入張量的元素轉換為概率分布,適用于多分類任務。輸出的每個元素值在?(0,1)?之間,且所有元素的和為 1。
參數名 | 類型 | 說明 |
---|---|---|
input | Tensor | 輸入張量。 |
dim | int | 指定在哪個維度上計算 Softmax。例如,dim=1 ?表示對每一行計算 Softmax。 |
dtype | dtype, optional | 輸出張量的數據類型。 |
# 同時傳入兩個句子的拼接編碼# 輸出一個相似度預測,不匹配的概率def forward(self, input_ids, target=None):# x = self.bert_encoder(input_ids)[1]#input_ids = batch_size, max_lengthx = self.embedding(input_ids) #x:batch_size, max_length, embedding_sizex = self.encoder(x) ##x: batch_size, max_len, hidden_sizex = nn.MaxPool1d(x.shape[1])(x.transpose(1,2)).squeeze()#x: batch_size, hidden_sizex = self.classify_layer(x)#x: batch_size, 2#如果有標簽,則計算lossif target is not None:return self.loss(x, target.squeeze())#如果無標簽,預測相似度else:return torch.softmax(x, dim=-1)[:, 1] #如果改為x[:,0]則是兩句話不匹配的概率
④?根據配置選擇優化器
????????根據用戶在配置字典中指定的優化器類型("adam" 或 "sgd")來選擇并初始化相應的優化器,并設置學習率。
model.parameters():PyTorch 中?torch.nn.Module
?類的一個方法,用于獲取模型中所有可訓練參數的迭代器。這些參數通常是模型的權重和偏置,它們會在訓練過程中通過優化器進行更新。
def choose_optimizer(config, model):optimizer = config["optimizer"]learning_rate = config["learning_rate"]if optimizer == "adam":return Adam(model.parameters(), lr=learning_rate)elif optimizer == "sgd":return SGD(model.parameters(), lr=learning_rate)
⑨ 加載數據文件
torch.LongTensor():?PyTorch 中的一個函數,用于創建一個包含整數(64 位整型)的張量。它主要用于處理整數類型的數據,例如索引、標簽或其他整數值。
參數名 | 類型 | 說明 |
---|---|---|
data | list, tuple, numpy array | 輸入數據,可以是列表、元組或 NumPy 數組,包含整數值。 |
device | torch.device, optional | 指定張量存儲的設備(如 CPU 或 GPU)。默認為 CPU。 |
requires_grad | bool, optional | 是否需要對張量計算梯度。默認為?False 。 |
# -*- coding: utf-8 -*-import torch
import torch.nn as nn
from torch.optim import Adam, SGD
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence
from transformers import BertModel, BertConfig"""
建立網絡模型結構
"""class GetFirst(nn.Module):def __init__(self):super(GetFirst, self).__init__()def forward(self, x):return x[0]class SentenceMatchNetwork(nn.Module):def __init__(self, config):super(SentenceMatchNetwork, self).__init__()# 可以用bert,參考下面# pretrain_model_path = config["pretrain_model_path"]# self.bert_encoder = BertModel.from_pretrained(pretrain_model_path)# 常規的embedding + layerhidden_size = config["hidden_size"]#20000應為詞表大小,這里借用bert的詞表,沒有用它精確的數字,因為里面有很多無用詞,舍棄一部分,不影響效果self.embedding = nn.Embedding(20000, hidden_size)#一種多層按順序執行的寫法,具體的層可以換#unidirection:batch_size, max_len, hidden_size#bidirection:batch_size, max_len, hidden_size * 2self.encoder = nn.Sequential(nn.LSTM(hidden_size, hidden_size, bidirectional=True, batch_first=True),GetFirst(),nn.ReLU(),nn.Linear(hidden_size * 2, hidden_size), #batch_size, max_len, hidden_sizenn.ReLU(),)self.classify_layer = nn.Linear(hidden_size, 2)self.loss = nn.CrossEntropyLoss()# 同時傳入兩個句子的拼接編碼# 輸出一個相似度預測,不匹配的概率def forward(self, input_ids, target=None):# x = self.bert_encoder(input_ids)[1]#input_ids = batch_size, max_lengthx = self.embedding(input_ids) #x:batch_size, max_length, embedding_sizex = self.encoder(x) ##x: batch_size, max_len, hidden_sizex = nn.MaxPool1d(x.shape[1])(x.transpose(1,2)).squeeze()#x: batch_size, hidden_sizex = self.classify_layer(x)#x: batch_size, 2#如果有標簽,則計算lossif target is not None:return self.loss(x, target.squeeze())#如果無標簽,預測相似度else:return torch.softmax(x, dim=-1)[:, 1] #如果改為x[:,0]則是兩句話不匹配的概率def choose_optimizer(config, model):optimizer = config["optimizer"]learning_rate = config["learning_rate"]if optimizer == "adam":return Adam(model.parameters(), lr=learning_rate)elif optimizer == "sgd":return SGD(model.parameters(), lr=learning_rate)if __name__ == "__main__":from config import ConfigConfig["vocab_size"] = 10Config["max_length"] = 4model = SentenceMatchNetwork(Config)s1 = torch.LongTensor([[1,2,3,0], [2,2,0,0]])s2 = torch.LongTensor([[1,2,3,4], [3,2,3,4]])l = torch.LongTensor([[1],[0]])# y = model(s1, s2, l)# print(y)# print(model.state_dict())
Ⅳ、模型效果測試?evaluate.py
① 模型初始化
?保存配置、模型和日志:self.config = config
:將傳入的配置字典保存到實例變量中。self.model = model
:將傳入的模型對象保存到實例變量中。self.logger = logger
:將傳入的日志對象保存到實例變量中。?
加載驗證數據:self.valid_data = load_data(config["valid_data_path"], config, shuffle=False)
:加載驗證數據,并通過?load_data
?函數封裝為?DataLoader
?對象。shuffle=False
?表示不打亂數據順序。?
加載訓練數據:self.train_data = load_data(config["train_data_path"], config)
:加載訓練數據,并通過?load_data
?函數封裝為?DataLoader
?對象。這里提到重新加載訓練數據是為了將訓練集作為知識庫進行效果測試。
獲取分詞器:self.tokenizer = self.train_data.dataset.tokenizer
:從訓練數據集中獲取分詞器對象,用于后續的文本處理。
初始化統計字典:self.stats_dict = {"correct":0, "wrong":0}
:用于存儲測試結果的字典,記錄正確和錯誤的樣本數量。
def __init__(self, config, model, logger):self.config = configself.model = modelself.logger = loggerself.valid_data = load_data(config["valid_data_path"], config, shuffle=False)# 由于效果測試需要訓練集當做知識庫,再次加載訓練集。# 事實上可以通過傳參把前面加載的訓練集傳進來更合理,但是為了主流程代碼改動量小,在這里重新加載一遍self.train_data = load_data(config["train_data_path"], config)self.tokenizer = self.train_data.dataset.tokenizerself.stats_dict = {"correct":0, "wrong":0} #用于存儲測試結果
②? 將問題轉換為向量表示 ?
初始化?self.question_index_to_standard_question_index
?字典,用于存儲問題編號到標準問題編號的映射。
初始化?self.questions
?列表,用于存儲所有問題。
遍歷?self.train_data.dataset.knwb
,獲取每個標準問題編號及其對應的問題列表。對于每個問題,記錄其編號到標準問題編號的映射,并將問題添加到?self.questions
?列表中。
items():Python 字典的內置方法,用于返回字典中所有鍵值對的視圖對象。每個鍵值對以元組形式表示,方便遍歷或操作字典數據。
len():是 Python 的內置函數,用于返回序列(如字符串、列表、元組)或集合(如字典、集合)中元素的數量。
參數名 | 類型 | 說明 |
---|---|---|
sequence | 序列或集合 | 需要計算長度的對象,如字符串、列表、元組、字典等。 |
列表.append():Python 列表的內置方法,用于在列表末尾添加一個元素。該方法會直接修改原列表,而不是返回一個新列表。
參數名 | 類型 | 說明 |
---|---|---|
element | 任意類型 | 要添加到列表末尾的元素,可以是字符串、數字、列表、元組等。 |
#將知識庫中的問題向量化,為匹配做準備#每輪訓練的模型參數不一樣,生成的向量也不一樣,所以需要每輪測試都重新進行向量化def knwb_to_vector(self):self.question_index_to_standard_question_index = {}self.questions = []for standard_question_index, questions in self.train_data.dataset.knwb.items():for question in questions:#記錄問題編號到標準問題標號的映射,用來確認答案是否正確self.question_index_to_standard_question_index[len(self.questions)] = standard_question_indexself.questions.append(question)return
③? 評估模型表現?
- 日志記錄:使用?
self.logger.info
?記錄當前測試的輪次。? - 初始化統計字典:
self.stats_dict
?用于記錄正確和錯誤的預測數量。 - 模型設置為評估模式:
self.model.eval()
?將模型設置為評估模式,關閉 dropout 和 batch normalization 的隨機性。 - 知識庫向量化:調用?
self.knwb_to_vector()
?將知識庫中的問題向量化,為匹配做準備。? - 遍歷驗證數據:
- 對每個批次的數據,提取測試問題和標簽;
- 對每個測試問題,計算其與知識庫中所有問題的相似度得分;
- 使用 torch.no_grad0) 關閉梯度計算,提高效率將得分轉換為列表,并找到最高得分的索引(即最匹配的問題)
- 將預測結果添加到predicts 列表中?
- 記錄統計結果:調用?
self.write_stats
?記錄預測結果。 - 顯示統計結果:調用?
self.show_stats
?顯示測試結果。
logger.info():用于記錄信息級別的日志,通常用于記錄程序運行時的狀態或過程信息。
參數名 | 類型 | 說明 |
---|---|---|
msg | 字符串 | 要記錄的日志信息。 |
*args | 可變參數 | 用于格式化日志信息的參數。 |
**kwargs | 關鍵字參數 | 可選參數,如?exc_info=True ?用于記錄異常信息。 |
model.eval():將模型設置為評估模式,關閉 Dropout 和 Batch Normalization 的訓練模式,確保模型在推理時行為一致。
enumerate():將可迭代對象(如列表、元組、字符串)組合為索引序列,返回一個枚舉對象,包含索引和值。
參數名 | 類型 | 說明 |
---|---|---|
iterable | 可迭代對象 | 需要枚舉的對象,如列表、元組、字符串等。 |
start | 整數 | 可選參數,指定索引的起始值,默認為 0。 |
列表.append():在列表末尾添加一個元素。
參數名 | 類型 | 說明 |
---|---|---|
element | 任意類型 | 要添加到列表末尾的元素。 |
torch.no_grad():上下文管理器,用于禁用梯度計算,通常在模型評估或推理時使用,以減少內存消耗并加速計算。
torch.LongTensor():創建一個包含 64 位整數的張量。
參數名 | 類型 | 說明 |
---|---|---|
data | 列表、元組 | 輸入數據,用于初始化張量。 |
torch.cuda.is_available():檢查當前系統是否支持 CUDA(即是否有可用的 GPU)。
cuda():將張量或模型移動到 GPU 上,以利用 GPU 進行計算。
參數名 | 類型 | 說明 |
---|---|---|
device | 整數或設備 | 可選參數,指定目標 GPU 設備,默認為當前設備。 |
.detach():從計算圖中分離張量,返回一個不需要梯度的新張量
.cpu():將張量或模型移動到 CPU 上。
.tolist():將張量轉換為 Python 列表。
np.argmax():?返回數組中最大值的索引。
參數名 | 類型 | 說明 |
---|---|---|
array | 數組 | 輸入數組。 |
axis | 整數 | 可選參數,指定沿哪個軸查找最大值索引,默認為 None(展平數組)。 |
def eval(self, epoch):self.logger.info("開始測試第%d輪模型效果:" % epoch)self.stats_dict = {"correct":0, "wrong":0} #清空前一輪的測試結果self.model.eval()self.knwb_to_vector()for index, batch_data in enumerate(self.valid_data):test_questions, labels = batch_datapredicts = []for test_question in test_questions:input_ids = []for question in self.questions:input_ids.append(self.train_data.dataset.encode_sentence(test_question, question))with torch.no_grad():input_ids = torch.LongTensor(input_ids)if torch.cuda.is_available():input_ids = input_ids.cuda()scores = self.model(input_ids).detach().cpu().tolist()hit_index = np.argmax(scores)# print(hit_index)predicts.append(hit_index)self.write_stats(predicts, labels)self.show_stats()return
④ 記錄模型預測結果
斷言檢查:assert len(labels) == len(predicts)
:確保預測結果和標簽的數量一致,否則拋出異常。?
遍歷預測結果和標簽:使用?zip(predicts, labels)
?將預測結果和標簽一一對應。?
轉換預測索引:hit_index = self.question_index_to_standard_question_index[hit_index]
:將預測的問題索引轉換為標準問題索引。?
統計正確和錯誤的預測:如果預測的標準問題索引與標簽一致,則?self.stats_dict["correct"] += 1
。否則,self.stats_dict["wrong"] += 1
。
?返回:方法執行完畢后返回?None
。
zip():用于將多個可迭代對象的元素打包成元組,返回一個可迭代對象
參數名 | 類型 | 說明 |
---|---|---|
*iterables | 可迭代對象 | 一個或多個可迭代對象(如列表、元組、字符串等),用于打包成元組。 |
strict | 布爾值 | 可選參數,默認為?False 。如果為?True ,當可迭代對象長度不一致時會拋出?ValueError 。 |
len():用于返回對象的長度或項目數量,支持多種數據類型
參數名 | 類型 | 說明 |
---|---|---|
obj | 對象 | 需要計算長度的對象,可以是字符串、列表、元組、字典、集合等。 |
int():用于返回對象的長度或項目數量,支持多種數據類型
參數名 | 類型 | 說明 |
---|---|---|
x | 字符串、數字 | 需要轉換為整數的對象,可以是字符串、類似字節的對象或數字。 |
base | 整數 | 可選參數,默認為 10。指定?x ?的進制,例如 2(二進制)、16(十六進制)等。 |
def write_stats(self, predicts, labels):assert len(labels) == len(predicts)for hit_index, label in zip(predicts, labels):hit_index = self.question_index_to_standard_question_index[hit_index] #轉化成標準問編號if int(hit_index) == int(label):self.stats_dict["correct"] += 1else:self.stats_dict["wrong"] += 1return
⑤ 展示模型預測的統計信息?
- 統計信息提取:
correct = self.stats_dict["correct"]
:從?self.stats_dict
?中獲取預測正確的條目數。wrong = self.stats_dict["wrong"]
:從?self.stats_dict
?中獲取預測錯誤的條目數。
- ?日志輸出:
self.logger.info("預測集合條目總量:%d" % (correct + wrong))
:輸出預測集合的總條目數,即正確條目數與錯誤條目數之和。self.logger.info("預測正確條目:%d,預測錯誤條目:%d" % (correct, wrong))
:輸出預測正確的條目數和預測錯誤的條目數。self.logger.info("預測準確率:%f" % (correct / (correct + wrong)))
:輸出預測的準確率,即正確條目數占總條目數的比例。self.logger.info("--------------------")
:輸出分隔線,用于區分不同的日志信息。
logger.info():?是 Python 的?logging
?模塊中用于記錄信息級別(info level)日志的函數。它通常用于記錄程序運行時的關鍵信息,如狀態、進度等,而不是調試信息或錯誤。
參數名 | 類型 | 描述 |
---|---|---|
msg | str | 要記錄的日志信息。 |
*args | 任意類型 | 用于格式化日志信息的參數。 |
**kwargs | dict | 可選參數,如?exc_info 、stack_info ?等,用于附加異常或堆棧信息。 |
# -*- coding: utf-8 -*-
import torch
from loader import load_data
import numpy as np"""
模型效果測試
"""class Evaluator:def __init__(self, config, model, logger):self.config = configself.model = modelself.logger = loggerself.valid_data = load_data(config["valid_data_path"], config, shuffle=False)# 由于效果測試需要訓練集當做知識庫,再次加載訓練集。# 事實上可以通過傳參把前面加載的訓練集傳進來更合理,但是為了主流程代碼改動量小,在這里重新加載一遍self.train_data = load_data(config["train_data_path"], config)self.tokenizer = self.train_data.dataset.tokenizerself.stats_dict = {"correct":0, "wrong":0} #用于存儲測試結果#將知識庫中的問題向量化,為匹配做準備#每輪訓練的模型參數不一樣,生成的向量也不一樣,所以需要每輪測試都重新進行向量化def knwb_to_vector(self):self.question_index_to_standard_question_index = {}self.questions = []for standard_question_index, questions in self.train_data.dataset.knwb.items():for question in questions:#記錄問題編號到標準問題標號的映射,用來確認答案是否正確self.question_index_to_standard_question_index[len(self.questions)] = standard_question_indexself.questions.append(question)returndef eval(self, epoch):self.logger.info("開始測試第%d輪模型效果:" % epoch)self.stats_dict = {"correct":0, "wrong":0} #清空前一輪的測試結果self.model.eval()self.knwb_to_vector()for index, batch_data in enumerate(self.valid_data):test_questions, labels = batch_datapredicts = []for test_question in test_questions:input_ids = []for question in self.questions:input_ids.append(self.train_data.dataset.encode_sentence(test_question, question))with torch.no_grad():input_ids = torch.LongTensor(input_ids)if torch.cuda.is_available():input_ids = input_ids.cuda()scores = self.model(input_ids).detach().cpu().tolist()hit_index = np.argmax(scores)# print(hit_index)predicts.append(hit_index)self.write_stats(predicts, labels)self.show_stats()returndef write_stats(self, predicts, labels):assert len(labels) == len(predicts)for hit_index, label in zip(predicts, labels):hit_index = self.question_index_to_standard_question_index[hit_index] #轉化成標準問編號if int(hit_index) == int(label):self.stats_dict["correct"] += 1else:self.stats_dict["wrong"] += 1returndef show_stats(self):correct = self.stats_dict["correct"]wrong = self.stats_dict["wrong"]self.logger.info("預測集合條目總量:%d" % (correct +wrong))self.logger.info("預測正確條目:%d,預測錯誤條目:%d" % (correct, wrong))self.logger.info("預測準確率:%f" % (correct / (correct + wrong)))self.logger.info("--------------------")return
Ⅴ、訓練主流程 main.py
① 配置日志記錄
logging.basicConfig():用于配置日志系統的基本設置,包括日志級別、輸出格式、輸出目標(如控制臺或文件)等。它通常在程序初始化時調用一次。?
參數名 | 類型 | 描述 |
---|---|---|
level | int | 設置日志級別,如?logging.DEBUG 、logging.INFO ?等。 |
format | str | 設置日志輸出格式,如?'%(asctime)s - %(levelname)s - %(message)s' 。 |
filename | str | 設置日志輸出到文件,指定文件名。 |
filemode | str | 設置文件打開模式,默認為?'a' (追加模式)。 |
handlers | list | 設置自定義的日志處理器。 |
logging.getLogger():?用于獲取一個日志記錄器(Logger)對象。每個記錄器都有一個名稱(name),可以用來區分不同的日志記錄器。
參數名 | 類型 | 描述 |
---|---|---|
name | str | 日志記錄器的名稱。如果未提供或為?None ,則返回根記錄器(root logger)。 |
logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
② 創建保存模型的目錄
??????????檢查指定的模型保存路徑是否存在,如果不存在,則創建該路徑作為目錄。
os.path.isdir():?Python 中?os.path
?模塊的函數,用于檢查指定的路徑是否為一個存在的目錄。如果路徑存在且是一個目錄,則返回?True
,否則返回?False
。
參數名 | 類型 | 描述 |
---|---|---|
path | str | 表示文件系統路徑的類路徑對象(可以是相對路徑或絕對路徑)。 |
os.mkdir():Python 中?os
?模塊的函數,用于創建一個新的目錄。如果目錄已經存在或路徑無效,會拋出?FileExistsError
?或?OSError
參數名 | 類型 | 描述 |
---|---|---|
path | str | 要創建的目錄路徑。 |
mode | int | 可選參數,設置目錄的權限(八進制模式),默認為?0o777 。 |
#創建保存模型的目錄if not os.path.isdir(config["model_path"]):os.mkdir(config["model_path"])
③ 加載文件
?torch.cuda.is_available():PyTorch 中用于檢查當前系統是否支持 CUDA(即是否有可用的 GPU)的函數。
參數名 | 類型 | 描述 |
---|---|---|
device | int 或 str | 指定目標 GPU 設備,默認為?None (使用默認 GPU)。 |
cuda():PyTorch 張量或模型的方法,用于將張量或模型移動到 GPU 上。
#加載訓練數據train_data = load_data(config["train_data_path"], config)#加載模型model = SiameseNetwork(config)# 標識是否使用gpucuda_flag = torch.cuda.is_available()if cuda_flag:logger.info("gpu可以使用,遷移模型至gpu")model = model.cuda()#加載優化器optimizer = choose_optimizer(config, model)#加載效果測試類evaluator = Evaluator(config, model, logger)
④ 訓練的核心過程 ?
-
?訓練循環:
for epoch in range(config["epoch"])
:遍歷每個訓練周期(epoch),config["epoch"]
?是訓練的總周期數。epoch += 1
:將當前周期數加1,用于日志記錄。model.train()
:將模型設置為訓練模式,啟用Dropout和Batch Normalization等訓練特定行為。
-
?數據加載與處理:
for index, batch_data in enumerate(train_data)
:遍歷訓練數據集的每個批次(batch)。optimizer.zero_grad()
:清空優化器的梯度緩存,避免梯度累積。if cuda_flag: batch_data = [d.cuda() for d in batch_data]
:如果啟用了CUDA(GPU加速),將數據移動到GPU上。
-
?模型前向傳播與損失計算:
input_id1, input_id2, labels = batch_data
:從批次數據中提取兩個輸入(input_id1
?和?input_id2
)以及對應的標簽(labels
)。
-
?反向傳播與優化:
train_loss.append(loss.item())
:將當前批次的損失值記錄下來。loss.backward()
:執行反向傳播,計算梯度。optimizer.step()
:更新模型參數,優化損失函數。
-
?日志記錄與評估:
logger.info("epoch average loss: %f" % np.mean(train_loss))
:記錄當前周期的平均損失值。evaluator.eval(epoch)
:調用評估器對模型進行評估,通常是在驗證集上計算準確率或其他指標。
model.train():將模型設置為訓練模式。在訓練模式下,模型會啟用?Dropout
?和?Batch Normalization
?等層的行為,以確保模型在訓練時能夠正常工作
enumerate():Python 的內置函數,用于在遍歷可迭代對象時同時獲取索引和值
參數名 | 類型 | 描述 |
---|---|---|
iterable | 可迭代對象 | 要遍歷的對象(如列表、元組、字符串)。 |
start | int | 索引的起始值,默認為?0 。 |
optimizer.zero_grad():將優化器中所有參數的梯度清零,避免梯度累積
cuda():將張量或模型移動到 GPU 上,以利用 GPU 的并行計算能力加速計算
參數名 | 類型 | 描述 |
---|---|---|
device | int 或 str | 指定目標 GPU 設備,默認為?None (使用默認 GPU)。 |
append():在列表末尾添加一個新元素
參數名 | 類型 | 描述 |
---|---|---|
element | 任意類型 | 要添加到列表末尾的元素。 |
item():將包含單個元素的張量轉換為 Python 標量(如?int
?或?float
)
loss.backward():計算損失函數對模型參數的梯度,用于反向傳播
optimizer.step():?根據計算出的梯度更新模型參數
#訓練for epoch in range(config["epoch"]):epoch += 1model.train()logger.info("epoch %d begin" % epoch)train_loss = []for index, batch_data in enumerate(train_data):optimizer.zero_grad()if cuda_flag: #如果gpu可用則使用gpu加速batch_data = [d.cuda() for d in batch_data]input_ids, labels = batch_dataloss = model(input_ids, labels) #計算losstrain_loss.append(loss.item())#每輪訓練一半的時候輸出一下loss,觀察下降情況if index % int(len(train_data) / 2) == 0:logger.info("batch loss %f" % loss)loss.backward() #梯度計算optimizer.step() #梯度更新logger.info("epoch average loss: %f" % np.mean(train_loss))evaluator.eval(config["epoch"])
⑤ 保存模型
os.path.join():用于將多個路徑片段拼接成一個完整的路徑,并自動根據操作系統選擇正確的路徑分隔符(如 Windows 使用?\
,Linux 和 macOS 使用?/
)
參數名 | 類型 | 描述 |
---|---|---|
path | str | 初始路徑片段。 |
*paths | str | 需要拼接的后續路徑片段,可以接受任意數量的參數。 |
torch.save():用于將 PyTorch 對象(如模型、張量、字典等)保存到磁盤文件中,通常用于保存模型的權重或訓練狀態
參數名 | 類型 | 描述 |
---|---|---|
obj | 任意對象 | 需要保存的對象,如模型、張量、字典等。 |
f | str 或文件對象 | 保存的目標文件路徑或文件對象。 |
model.state_dict():?返回一個包含模型所有可學習參數(如權重和偏置)的有序字典,通常用于保存或加載模型參數
model_path = os.path.join(config["model_path"], "epoch_%d.pth" % epoch)torch.save(model.state_dict(), model_path)
模型訓練主程序
# -*- coding: utf-8 -*-import torch
import os
import random
import os
import numpy as np
import logging
from config import Config
from model import SentenceMatchNetwork, choose_optimizer
from evaluate import Evaluator
from loader import load_datalogging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)"""
模型訓練主程序
"""def main(config):#創建保存模型的目錄if not os.path.isdir(config["model_path"]):os.mkdir(config["model_path"])#加載訓練數據train_data = load_data(config["train_data_path"], config)#加載模型model = SentenceMatchNetwork(config)# 標識是否使用gpucuda_flag = torch.cuda.is_available()if cuda_flag:logger.info("gpu可以使用,遷移模型至gpu")model = model.cuda()#加載優化器optimizer = choose_optimizer(config, model)#加載效果測試類evaluator = Evaluator(config, model, logger)#訓練for epoch in range(config["epoch"]):epoch += 1model.train()logger.info("epoch %d begin" % epoch)train_loss = []for index, batch_data in enumerate(train_data):optimizer.zero_grad()if cuda_flag: #如果gpu可用則使用gpu加速batch_data = [d.cuda() for d in batch_data]input_ids, labels = batch_dataloss = model(input_ids, labels) #計算losstrain_loss.append(loss.item())#每輪訓練一半的時候輸出一下loss,觀察下降情況if index % int(len(train_data) / 2) == 0:logger.info("batch loss %f" % loss)loss.backward() #梯度計算optimizer.step() #梯度更新logger.info("epoch average loss: %f" % np.mean(train_loss))evaluator.eval(config["epoch"])model_path = os.path.join(config["model_path"], "epoch_%d.pth" % epoch)torch.save(model.state_dict(), model_path)returnif __name__ == "__main__":main(Config)
三、對比:交互型 vs 表示型
1.表示型
Ⅰ、模型結構與交互機制
① 結構特點
????????采用雙塔(Siamese)架構,兩個文本分別通過共享參數的編碼器(如MLP、CNN、LSTM、Transformer等)獨立生成固定維度的語義向量,再通過余弦相似度、點積等方式計算匹配得分。
② 典型模型
????????DSSM、CDSSM、MV-LSTM、ARC-I等。
③ 交互時機
????????僅在最終匹配層進行淺層交互(如相似度計算),編碼過程中文本間無信息交換。
Ⅱ、表示型模型的優勢
① 計算效率高
????????可預先計算文本向量,在線匹配僅需向量相似度計算,適用于大規模檢索場景(如搜索引擎)
?②?參數共享
????????雙塔共享編碼器參數,減少模型復雜度并提升泛化能力
Ⅲ、表示型模型的局限性
① 語義焦點丟失
????????全局向量可能受無關詞干擾(如長文本中冗余信息淹沒關鍵短語)
② 詞級信息缺失
????????無法捕捉詞法、句法層面的局部匹配特征(如近義詞替換或語序變化)
Ⅳ、應用場景
????????適合對響應速度要求高、候選集規模大的場景,如搜索引擎召回、推薦系統粗排
2.交互型
Ⅰ、模型結構與交互機制
① 結構特點
????????將兩個文本拼接后輸入單一編碼器(如BERT),通過自注意力機制或交叉注意力直接在詞 / 短語級別進行細粒度交互,再通過池化或分類器輸出匹配結果
② 典型模型
????????BERT-based模型、ESIM、InferSent等
③ 交互時機
????????在編碼層即引入詞級交互,構建交互矩陣或注意力圖,捕捉局部語義關聯
Ⅱ、交互型模型的優勢
① 匹配精度高
????????通過詞級交互矩陣捕捉細粒度語義關聯(如同義詞、反義詞、指代關系),顯著提升復雜語義匹配的準確率
② 上下文建模強
????????自注意力機制可動態加權重要詞匯,避免語義漂移
Ⅲ、交互型模型的局限性
① 計算成本高
????????需實時拼接文本對輸入模型,難以支持海量候選集的快速檢索
② 交互型模板
????????需實時拼接文本對輸入模型,難以支持海量候選集的快速檢索
Ⅳ、應用場景
????????適合對精度要求高、候選集較小的場景,如問答系統精排、復述檢測、語義相似度評測
3.總結
表示型:
優點:訓練好的模型可以對知識庫內的問題計算向量,在實際查找過程中,只對輸入文本做一次向量化,訓練速度快
缺點:在向量化的過程中不知道文本重點
交互型:
優點:通過對比把握句子重點
缺點:每次計算都需要兩個輸入,需要等到問題來了,再去知識庫中的問題作拼接,需要調入模型n次,比較耗時,模型效率低。
?維度? | ?表示型模型? | ?交互型模型? |
---|---|---|
?交互時機? | 后期(匹配層) | 早期(編碼層) |
?計算效率? | 高(支持預計算) | 低(需實時計算) |
?匹配精度? | 一般 | 高 |
?適用場景? | 大規模檢索、粗排 | 精細化匹配、精排 |
?典型優化方向? | 增強embedding層編碼器(如Transformer) | 輕量化交互(如蒸餾、剪枝) |
四、對比學習
主要解決的問題:文本表示 / 圖像表示
主要目標:訓練一個好的文本 / 圖像 編碼器(model)
輸入一個樣本,對樣本做一些處理(數據增強),就構造出了一個正樣本 / 相似樣本,再尋找一些負樣本,傳入模型讓其進行學習,相當于一種無監督的表示型學習方式
五、海量向量查找
假如我們有1億以上的候選向量
對于一個給定向量,希望查找距離最接近的
如何高效的完成?
快速的向量查找在問答,搜索,推薦等場景下均會使用
1.KD樹
原理 —— 空間切割:
事先構造一棵樹,步驟:
????????① 先選取計算維度,計算每個點x、y軸集合的方差,方差大的更加均勻
????????② 然后再選取當前維度下數據的中值位置
????????③ 將維度方差大的軸的中值位置作為根節點,軸的兩邊列為二叉樹的左右子樹
????????④ 然后對于子樹重復選取點的步驟,直到將所有點全部加入樹中
查詢步驟:?
????????① 依照建索引方式(左右子樹)找到查詢點的空間位置
????????② 向上回退,計算查詢點與離其最近的節點距離 和 查詢點到各個切割平面的距離
????????③ 如果到切割平面的距離大于到已知點的距離,就沒必要跨過平面計算距離,根據情況判斷是否需要查找平面另一側節點
????????④ 回退到根節點為止
優點:減少余弦距離的計算次數,帶來了效率的提升
2.KD樹 —— 代碼實現 🚀
Ⅰ、定義KD樹的節點類
① 初始化KD樹的結點
self.father:父節點
self.left:左子結點
self.right:右子結點
self.feature:當前結點用于分割數據的特征(維度)的索引
self.split:包含分割點向量和其對應的標簽(索引) ,在葉子節點中,split
包含了數據點本身及其標簽
class Node(object):def __init__(self):"""Node class to build tree leaves."""self.father = Noneself.left = Noneself.right = Noneself.feature = Noneself.split = None
② 字符串表示方法
????????返回了節點分割點向量的字符串表示,方便用戶查看單個節點的信息。
self.split:包含分割點向量和其對應的標簽(索引) ,在葉子節點中,split
包含了數據點本身及其標簽
def __str__(self):return str(self.split[0])
③ 獲取當前節點的兄弟節點
????????如果節點沒有父節點(即它本身是根節點),則返回None
。否則,檢查當前節點是父節點的左子節點還是右子節點,然后返回另一個子節點作為兄弟節點。
@property:@property
裝飾器是實現面向對象編程中屬性管理的重要工具,它通過將方法轉換為屬性訪問的形式,既保持了代碼的簡潔性,又能實現數據校驗、動態計算等高級功能
屬性式訪問:通過將方法偽裝成屬性,調用時無需添加括號,直接通過對象.屬性名
訪問
數據校驗與控制:通過@屬性名.setter
定義設置邏輯,攔截非法賦值操作
只讀屬性與刪除控制:若未定義setter
方法,則屬性為只讀;通過@屬性名.deleter
可自定義刪除行為
@propertydef brother(self):"""Find the node's brother.Returns:node -- Brother node."""if not self.father:ret = Noneelse:if self.father.left is self:ret = self.father.rightelse:ret = self.father.leftreturn ret
Ⅱ、定義KD樹類
① 初始化
????????初始化KDTree類,創建一個根節點。根節點是KD樹的起始點,用于后續的搜索和插入操作。
class KDTree(object):def __init__(self):"""KD Tree class to improve search efficiency in KNN.Attributes:root: the root node of KDTree."""self.root = Node()
②? 打印樹節點信息
????????使用廣度優先搜索遍歷樹節點,將節點信息格式化為字符串并返回。
list.pop():移除列表中指定索引位置的元素并返回該元素。若未指定索引,默認移除最后一個元素
參數 | 類型 | 是否可選 | 描述 |
---|---|---|---|
index | int | 是 | 要移除元素的索引(默認-1 ,即最后一個元素)。若索引越界會引發IndexError |
list.append():在列表末尾添加單個元素,直接修改原列表且無返回值
參數 | 類型 | 是否可選 | 描述 |
---|---|---|---|
element | 任意類型 | 否 | 要添加的元素(可以是數字、字符串、列表等復合類型) |
str.join():用指定字符串連接可迭代對象中的元素,生成新字符串。要求可迭代對象內的元素均為字符串類型
參數 | 類型 | 是否可選 | 描述 |
---|---|---|---|
iterable | 可迭代對象 | 否 | 需連接的序列(如列表、元組),非字符串元素會引發TypeError |
def __str__(self):"""Show the relationship of each node in the KD Tree.Returns:str -- KDTree Nodes information."""ret = []i = 0que = [(self.root, -1)]while que:nd, idx_father = que.pop(0)ret.append("%d -> %d: %s" % (idx_father, i, str(nd)))if nd.left:que.append((nd.left, i))if nd.right:que.append((nd.right, i))i += 1return "\n".join(ret)
③ 計算中位數索引
????????計算給定二維數組中指定特征列的中位數所對應的行索引。通過從指定的列收集數據、排序并提取中位數的索引,該方法能有效地用于構建 KD 樹等數據結構中,以此提高相似度搜索的效率。
map():將指定函數依次作用于可迭代對象的每個元素,返回一個包含結果的迭代器(需轉換為列表或元組等容器類型)
參數 | 類型 | 是否可選 | 描述 |
---|---|---|---|
function | 可調用對象 | 否 | 處理元素的函數(如內置函數、lambda ?或自定義函數) |
iterable | 可迭代對象 | 否 | 待處理的數據序列(如列表、元組),可傳入多個(函數需匹配參數數量) |
lambda:創建匿名函數,簡化一次性使用的簡單邏輯,常用于配合?map()
、filter()
?等高階函數
參數 | 類型 | 是否可選 | 描述 |
---|---|---|---|
arguments | 參數列表 | 否 | 函數參數(如?x, y ) |
expression | 表達式 | 否 | 單行表達式,計算結果作為返回值(如?x + y ?或?x**2 ) |
sorted():對可迭代對象進行排序,返回新列表(原數據不變)
參數 | 類型 | 是否可選 | 描述 |
---|---|---|---|
iterable | 可迭代對象 | 否 | 待排序的數據(如列表、元組) |
key | 函數 | 是 | 排序依據的函數(常用?lambda ,如?key=lambda x: x["age"] ) 8 |
reverse | 布爾值 | 是 | 是否降序排列(默認?False ?升序) |
list():?將可迭代對象(如字符串、元組、集合)轉換為列表
參數 | 類型 | 是否可選 | 描述 |
---|---|---|---|
iterable | 可迭代對象 | 是 | 若省略則創建空列表;若傳入則轉換元素(如?list("abc") →["a","b","c"] ) |
def _get_median_idx(self, X, idxs, feature):"""Calculate the median of a column of data.Arguments:X {list} -- 2d list object with int or float.idxs {list} -- 1D list with int.feature {int} -- Feature number.sorted_idxs_2d {list} -- 2D list with int.Returns:list -- The row index corresponding to the median of this column."""n = len(idxs)# Ignoring the number of column elements is odd and even.k = n // 2# Get all the indexes and elements of column j as tuples.col = map(lambda i: (i, X[i][feature]), idxs)# Sort the tuples by the elements' values# and get the corresponding indexes.sorted_idxs = map(lambda x: x[0], sorted(col, key=lambda x: x[1]))# Search the median value.median_idx = list(sorted_idxs)[k]return median_idx
④ 計算給定二維列表在指定特征(維度)上的方差
?????????計算指定特征在給定數據集上的方差,這在構建 KD 樹時用于選擇最優的分割特征(即具有最大方差的特征),以提高后續最近鄰搜索的效率。通過這種方法,可以有效地找到數據集中最具區分度的特征,從而優化 KD 樹的構建和查詢過程。
def _get_variance(self, X, idxs, feature):"""Calculate the variance of a column of data.Arguments:X {list} -- 2d list object with int or float.idxs {list} -- 1D list with int.feature {int} -- Feature number.Returns:float -- variance"""n = len(idxs)col_sum = col_sum_sqr = 0for idx in idxs:xi = X[idx][feature]col_sum += xicol_sum_sqr += xi ** 2# D(X) = E{[X-E(X)]^2} = E(X^2)-[E(X)]^2return col_sum_sqr / n - (col_sum / n) ** 2
⑤ 指定特征維度
????????在KD樹構建過程中,選擇具有最大方差的特征作為當前節點的分割特征。這樣做可以確保樹的每個層上的數據分布盡可能均勻,從而提高后續的搜索效率。
map():將指定函數依次作用于可迭代對象的每個元素,返回一個包含結果的迭代器(需轉換為列表或元組等容器類型)
參數 | 類型 | 是否可選 | 描述 |
---|---|---|---|
function | 可調用對象 | 否 | 處理元素的函數(如內置函數、lambda ?或自定義函數) |
iterable | 可迭代對象 | 否 | 待處理的數據序列(如列表、元組),可傳入多個(函數需匹配參數數量) |
lambda:創建匿名函數,簡化一次性使用的簡單邏輯,常用于配合?map()
、filter()
?等高階函數
參數 | 類型 | 是否可選 | 描述 |
---|---|---|---|
arguments | 參數列表 | 否 | 函數參數(如?x, y ) |
expression | 表達式 | 否 | 單行表達式,計算結果作為返回值(如?x + y ?或?x**2 ) |
max():?Python 內置的高效函數,用于獲取可迭代對象或多個參數中的最大值。其功能靈活,支持多種數據類型和自定義比較邏輯。
參數 | 類型 | 是否可選 | 描述 |
---|---|---|---|
iterable | 可迭代對象 | 是 | 如列表、元組、字符串等(需元素可比較) |
args | 多個參數 | 是 | 直接傳入多個參數進行比較(如?max(3,5,7) ) |
key | 函數 | 是 | 自定義比較規則(如?key=lambda x: x["age"] ) |
default | 任意類型 | 是 | 當可迭代對象為空時返回此默認值,否則引發?ValueError |
def _choose_feature(self, X, idxs):"""Choose the feature which has maximum variance.Arguments:X {list} -- 2d list object with int or float.idxs {list} -- 1D list with int.Returns:feature number {int}"""m = len(X[0])variances = map(lambda j: (j, self._get_variance(X, idxs, j)), range(m))return max(variances, key=lambda x: x[1])[0]
⑥??根據給定特征分割數據集?
????????根據特征的中位數將給定的索引列表分割成兩個子集。這在構建k-d樹(k維空間樹)時非常重要,有助于在構造樹的過程中有效地將數據劃分為更小的、更可管理的部分,從而在后續的查找中提高效率。
????????通過在指定特征上比較中位數,函數將數據分為小于和大于(或等于)中位數的兩部分,這種分割方式是 k-d樹構建的關鍵步驟之一,旨在平衡樹的高度,優化查找性能。
list.append():?Python 列表的內置方法,用于在列表末尾添加單個元素。它會直接修改原列表,?不返回新列表?(返回?None
)
參數 | 類型 | 是否可選 | 說明 |
---|---|---|---|
object | 任意類型 | 否 | 要添加的元素(支持數字、字符串、列表、元組、字典、None ?等所有類型) |
def _split_feature(self, X, idxs, feature, median_idx):"""Split indexes into two arrays according to split point.Arguments:X {list} -- 2d list object with int or float.idx {list} -- Indexes, 1d list object with int.feature {int} -- Feature number.median_idx {float} -- Median index of the feature.Returns:list -- [left idx, right idx]"""idxs_split = [[], []]split_val = X[median_idx][feature]for idx in idxs:# Keep the split point in current node.if idx == median_idx:continue# Splitxi = X[idx][feature]if xi < split_val: # 根據當前索引 idx 對應的特征值 xi 來判斷其相對于 split_val 的位置:idxs_split[0].append(idx)else:idxs_split[1].append(idx)return idxs_split
Ⅲ、構建KD樹
????????根據給定的數據集和標簽構建一棵Kd樹。通過選擇具有最大方差的特征來進行數據點的分割,并將分割點存儲在節點中。構建的過程采用廣度優先搜索的方式,逐層構建樹的結構。這樣構建的Kd樹可以有效地用于在多維空間中快速查找最近鄰點
列表,pop():移除列表中指定索引位置的元素并返回該元素。若未指定索引,默認移除最后一個元素
參數 | 類型 | 是否可選 | 描述 |
---|---|---|---|
index | int | 是 | 要移除元素的索引(默認?-1 ,即最后一個元素)。若索引越界會引發?IndexError |
str.split():將字符串按指定分隔符拆分為列表,默認按空格分割
參數 | 類型 | 是否可選 | 描述 |
---|---|---|---|
sep | str | 是 | 分隔符(默認空格)。 |
maxsplit | int | 是 | 最大分割次數(默認全部拆分) |
列表.append():在列表末尾添加單個元素,直接修改原列表且無返回值
參數 | 類型 | 是否可選 | 描述 |
---|---|---|---|
element | 任意類型 | 否 | 要添加的元素(支持數字、字符串、列表等所有類型) |
def build_tree(self, X, y):"""Build a KD Tree. The data should be scaled so as to calculate variances.Arguments:X {list} -- 2d list object with int or float.y {list} -- 1d list object with int or float."""# Initialize with node, indexesnd = self.rootidxs = range(len(X))que = [(nd, idxs)]while que:nd, idxs = que.pop(0)n = len(idxs)# Stop split if there is only one element in this nodeif n == 1:nd.split = (X[idxs[0]], y[idxs[0]])continue# Splitfeature = self._choose_feature(X, idxs)median_idx = self._get_median_idx(X, idxs, feature)idxs_left, idxs_right = self._split_feature(X, idxs, feature, median_idx)# Update properties of current nodend.feature = featurend.split = (X[median_idx], y[median_idx])# Put children of current node in queif idxs_left != []:nd.left = Node()nd.left.father = ndque.append((nd.left, idxs_left))if idxs_right != []:nd.right = Node()nd.right.father = ndque.append((nd.right, idxs_right))
Ⅳ、最近鄰搜索?
① 查找葉子節點
????????從KD樹的根節點開始,沿著樹的路徑向下搜索,直到找到包含搜索樣本Xi
的葉節點,并返回該葉節點。
str.split():將字符串按指定分隔符拆分為列表,默認按空格分割
參數 | 類型 | 是否可選 | 描述 |
---|---|---|---|
sep | str | 是 | 分隔符(默認空格)。 |
maxsplit | int | 是 | 最大分割次數(默認全部拆分) |
def _search(self, Xi, nd):"""Search Xi from the KDTree until Xi is at an leafnode.Arguments:Xi {list} -- 1d list with int or float.Returns:node -- Leafnode."""while nd.left or nd.right:if not nd.left:nd = nd.rightelif not nd.right:nd = nd.leftelse:if Xi[nd.feature] < nd.split[0][nd.feature]:nd = nd.leftelse:nd = nd.rightreturn nd
② 計算兩向量歐幾里得距離
????????從給定的KD樹節點中獲取與其分割點向量的歐幾里得距離。具體來說,它首先從節點對象中提取出分割點向量X0
,然后調用類中的另一個方法get_eu_dist
來計算輸入向量Xi
與分割點向量X0
之間的歐幾里得距離。
????????get_eu_dist:
計算兩個一維向量之間的歐幾里得距離。它通過將兩個向量對應位置的元素相減,求平方和,再取平方根來實現。
str.split():將字符串按指定分隔符拆分為列表,默認按空格分割
參數 | 類型 | 是否可選 | 描述 |
---|---|---|---|
sep | str | 是 | 分隔符(默認空格)。 |
maxsplit | int | 是 | 最大分割次數(默認全部拆分) |
sum():對可迭代對象中的數值元素求和,支持自定義起始值
參數 | 類型 | 是否可選 | 默認值 | 描述 |
---|---|---|---|---|
iterable | 可迭代對象 | 否 | 無 | 必須為數值類型(整數、浮點數等)的可迭代對象(如列表、元組、集合)。 |
start | 數值 | 是 | 0 | 求和的初始值,會加到最終結果中。 |
zip():將多個可迭代對象的對應元素打包為元組,返回一個迭代器
參數 | 類型 | 是否可選 | 默認值 | 描述 |
---|---|---|---|---|
*iterables | 多個可迭代對象 | 否 | 無 | 支持列表、元組、字符串等可迭代對象。 |
strict | 布爾值 | 是 | False | 若設為?True ,強制要求所有可迭代對象長度一致(Python 3.10+)。 |
列表推導式:?通過簡潔語法快速生成列表,支持條件篩選和嵌套循環
語法部分 | 類型 | 是否可選 | 描述 |
---|---|---|---|
expression | 表達式 | 否 | 對當前元素的操作(如?x**2 )。 |
item | 變量名 | 否 | 從可迭代對象中逐個提取元素。 |
iterable | 可迭代對象 | 否 | 數據來源(如?range(5) )。 |
if condition | 條件表達式 | 是 | 篩選元素的條件(如?x % 2 == 0 )。 |
def _get_eu_dist(self, Xi, nd):"""Calculate euclidean distance between Xi and node.Arguments:Xi {list} -- 1d list with int or float.nd {node}Returns:float -- Euclidean distance."""X0 = nd.split[0]return self.get_eu_dist(Xi, X0)def get_eu_dist(self, arr1, arr2):"""Calculate the Euclidean distance of two vectors.Arguments:arr1 {list} -- 1d list object with int or floatarr2 {list} -- 1d list object with int or floatReturns:float -- Euclidean distance"""return sum((x1 - x2) ** 2 for x1, x2 in zip(arr1, arr2)) ** 0.5
③ 計算點到分割超平面的距離
????????計算給定點?Xi
?到 KD 樹節點?nd
?所表示的超平面的歐幾里得距離。具體來說,它通過比較點?Xi
?在某個特征上的值與節點?nd
?的分割點在該特征上的值,來得到點?Xi
?到超平面的距離。這個距離用于判斷在最近鄰搜索過程中是否需要訪問兄弟節點。
nd.feature:當前結點用于分割數據的特征(維度)的索引?
str.split():將字符串按指定分隔符分割為列表,支持限制分割次數
參數 | 類型 | 是否可選 | 默認值 | 描述 |
---|---|---|---|---|
sep | 字符串 | 是 | 所有空字符 | 分隔符(如?"," 、"*" ),若未指定則按空格、換行符等分割 |
maxsplit | 整數 | 是 | 不限分割次數 | 指定最大分割次數,剩余未分割部分作為列表最后一個元素 |
abs():返回數值的絕對值,支持整數、浮點數和復數
參數 | 類型 | 是否可選 | 默認值 | 描述 |
---|---|---|---|---|
number | 數值(含復數) | 否 | 無 | 接受整數、浮點數或復數(返回復數的模) |
def _get_hyper_plane_dist(self, Xi, nd):"""Calculate euclidean distance between Xi and hyper plane.Arguments:Xi {list} -- 1d list with int or float.nd {node}Returns:float -- Euclidean distance."""j = nd.featureX0 = nd.split[0]return abs(Xi[j] - X0[j])
④ 執行最近鄰搜索
????????基于KD樹執行最近鄰搜索。首先,通過遞歸的方式找到待查找向量Xi
所屬的葉子節點nd_best
,然后在此基礎上進行回溯,檢查是否在兄弟子樹中存在距離更近的節點,最終返回距離Xi
最近的節點。
float():?將字符串或數字轉換為浮點數。如果未提供參數,則返回?0.0
參數 | 類型 | 是否可選 | 默認值 | 描述 |
---|---|---|---|---|
x | 字符串或數字 | 是 | 無 | 要轉換為浮點數的字符串或數字。若未提供參數,則返回?0.0 。 |
列表.pop():移除列表中指定索引處的元素并返回該元素。若未指定索引,則移除并返回最后一個元素。
參數 | 類型 | 是否可選 | 默認值 | 描述 |
---|---|---|---|---|
index | 整數 | 是 | -1 | 要移除元素的索引。若未提供參數,則移除并返回最后一個元素。 |
列表.append():在列表末尾添加一個元素。
參數 | 類型 | 是否可選 | 默認值 | 描述 |
---|---|---|---|---|
obj | 任意類型 | 否 | 無 | 要添加到列表末尾的元素。 |
def nearest_neighbour_search(self, Xi):"""Nearest neighbour search and backtracking.Arguments:Xi {list} -- 1d list with int or float.Returns:node -- The nearest node to Xi."""# The leaf node after searching Xi.dist_best = float("inf")nd_best = self._search(Xi, self.root)que = [(self.root, nd_best)]while que:nd_root, nd_cur = que.pop(0)# Calculate distance between Xi and root nodedist = self._get_eu_dist(Xi, nd_root)# Update best node and distance.if dist < dist_best:dist_best, nd_best = dist, nd_rootwhile nd_cur is not nd_root:# Calculate distance between Xi and current nodedist = self._get_eu_dist(Xi, nd_cur)# Update best node, distance and visit flag.if dist < dist_best:dist_best, nd_best = dist, nd_cur# If it's necessary to visit brother node.if nd_cur.brother and dist_best > \self._get_hyper_plane_dist(Xi, nd_cur.father):_nd_best = self._search(Xi, nd_cur.brother)que.append((nd_cur.brother, _nd_best))# Back track.nd_cur = nd_cur.fatherreturn nd_best
Ⅴ、傳統搜索方法
?????????在一個給定的數據集matrix
中找到與目標向量arr1
最近的鄰居。通過逐個計算目標向量與數據集中每個向量之間的歐幾里得距離,并將這些距離與其對應的索引存儲在一個列表中,最后對這個列表進行排序以找到距離最近的向量。
enumerate():將可迭代對象組合為一個索引序列,返回枚舉對象(包含索引和值)。
參數 | 類型 | 是否可選 | 默認值 | 描述 |
---|---|---|---|---|
iterable | 可迭代對象 | 否 | 無 | 需要枚舉的可迭代對象(如列表、字符串等)。 |
start | 整數 | 是 | 0 | 索引的起始值。 |
sum():對可迭代對象中的數值元素求和,支持自定義起始值。
參數 | 類型 | 是否可選 | 默認值 | 描述 |
---|---|---|---|---|
iterable | 可迭代對象 | 否 | 無 | 包含數值元素的可迭代對象(如列表、元組等)。 |
start | 數值 | 是 | 0 | 求和的初始值,會加到最終結果中。 |
zip():將多個可迭代對象的對應元素打包為元組,返回一個迭代器。
參數 | 類型 | 是否可選 | 默認值 | 描述 |
---|---|---|---|---|
*iterables | 多個可迭代對象 | 否 | 無 | 支持列表、元組、字符串等可迭代對象。 |
strict | 布爾值 | 是 | False | 若設為?True ,強制要求所有可迭代對象長度一致(Python 3.10+)。 |
列表.append():在列表末尾添加一個元素。
參數 | 類型 | 是否可選 | 默認值 | 描述 |
---|---|---|---|---|
obj | 任意類型 | 否 | 無 | 要添加到列表末尾的元素。 |
sorted():對所有可迭代對象進行排序,返回一個新的列表。
參數 | 類型 | 是否可選 | 默認值 | 描述 |
---|---|---|---|---|
iterable | 可迭代對象 | 否 | 無 | 需要排序的可迭代對象(如列表、元組等)。 |
key | 函數 | 是 | 無 | 用于排序的函數(如?lambda x: x[1] )。 |
reverse | 布爾值 | 是 | False | 若為?True ,則降序排序;若為?False ,則升序排序。 |
#傳統方式,逐個計算并排序
def traditional_search(arr1, matrix):res = []for index, arr2 in enumerate(matrix):score = sum((x1 - x2) ** 2 for x1, x2 in zip(arr1, arr2)) ** 0.5res.append([score, index])res = sorted(res, key=lambda x:x[0])return matrix[res[0][1]]
Ⅵ、性能對比
?????????構建KD樹并使用KD樹搜索方法與傳統的逐個計算并排序的方法來查找最近鄰向量,并比較這兩種方法在相同數據集上搜索100次的耗時。
vec_dim:定義向量的維度為 8
matrix:創建了一個1000行、8列的隨機數矩陣matrix
np.random.random():生成一個或多個介于 0 和 1 之間的隨機浮點數,返回一個 NumPy 數組或單個浮點數。
參數 | 類型 | 是否可選 | 默認值 | 描述 |
---|---|---|---|---|
size | 整數或元組 | 是 | None | 輸出的形狀。若為?None ,則返回單個浮點數;否則返回指定形狀的數組。 |
list():將可迭代對象(如字符串、元組、集合等)轉換為列表。
參數 | 類型 | 是否可選 | 默認值 | 描述 |
---|---|---|---|---|
iterable | 可迭代對象 | 是 | 無 | 要轉換為列表的可迭代對象。若未提供參數,則返回空列表。 |
range():生成一個不可變的整數序列,通常用于循環控制或生成數字序列。
參數 | 類型 | 是否可選 | 默認值 | 描述 |
---|---|---|---|---|
start | 整數 | 是 | 0 | 序列的起始值(包含)。 |
stop | 整數 | 否 | 無 | 序列的結束值(不包含)。 |
step | 整數 | 是 | 1 | 步長,表示每次遞增或遞減的值。 |
len():返回對象(如字符串、列表、字典等)的長度或元素個數。
參數 | 類型 | 是否可選 | 默認值 | 描述 |
---|---|---|---|---|
obj | 可迭代對象 | 否 | 無 | 要計算長度的對象(如字符串、列表、字典等)。 |
time.time():返回當前時間的時間戳(從 1970 年 1 月 1 日 00:00:00 UTC 開始到現在的秒數)。
import time
import numpy as np'''
基于kd樹的向量快速查找
'''class Node(object):def __init__(self):"""Node class to build tree leaves."""self.father = Noneself.left = Noneself.right = Noneself.feature = Noneself.split = Nonedef __str__(self):return str(self.split[0])@propertydef brother(self):"""Find the node's brother.Returns:node -- Brother node."""if not self.father:ret = Noneelse:if self.father.left is self:ret = self.father.rightelse:ret = self.father.leftreturn retclass KDTree(object):def __init__(self):"""KD Tree class to improve search efficiency in KNN.Attributes:root: the root node of KDTree."""self.root = Node()def __str__(self):"""Show the relationship of each node in the KD Tree.Returns:str -- KDTree Nodes information."""ret = []i = 0que = [(self.root, -1)]while que:nd, idx_father = que.pop(0)ret.append("%d -> %d: %s" % (idx_father, i, str(nd)))if nd.left:que.append((nd.left, i))if nd.right:que.append((nd.right, i))i += 1return "\n".join(ret)def _get_median_idx(self, X, idxs, feature):"""Calculate the median of a column of data.Arguments:X {list} -- 2d list object with int or float.idxs {list} -- 1D list with int.feature {int} -- Feature number.sorted_idxs_2d {list} -- 2D list with int.Returns:list -- The row index corresponding to the median of this column."""n = len(idxs)# Ignoring the number of column elements is odd and even.k = n // 2# Get all the indexes and elements of column j as tuples.col = map(lambda i: (i, X[i][feature]), idxs)# Sort the tuples by the elements' values# and get the corresponding indexes.sorted_idxs = map(lambda x: x[0], sorted(col, key=lambda x: x[1]))# Search the median value.median_idx = list(sorted_idxs)[k]return median_idxdef _get_variance(self, X, idxs, feature):"""Calculate the variance of a column of data.Arguments:X {list} -- 2d list object with int or float.idxs {list} -- 1D list with int.feature {int} -- Feature number.Returns:float -- variance"""n = len(idxs)col_sum = col_sum_sqr = 0for idx in idxs:xi = X[idx][feature]col_sum += xicol_sum_sqr += xi ** 2# D(X) = E{[X-E(X)]^2} = E(X^2)-[E(X)]^2return col_sum_sqr / n - (col_sum / n) ** 2def _choose_feature(self, X, idxs):"""Choose the feature which has maximum variance.Arguments:X {list} -- 2d list object with int or float.idxs {list} -- 1D list with int.Returns:feature number {int}"""m = len(X[0])variances = map(lambda j: (j, self._get_variance(X, idxs, j)), range(m))return max(variances, key=lambda x: x[1])[0]def _split_feature(self, X, idxs, feature, median_idx):"""Split indexes into two arrays according to split point.Arguments:X {list} -- 2d list object with int or float.idx {list} -- Indexes, 1d list object with int.feature {int} -- Feature number.median_idx {float} -- Median index of the feature.Returns:list -- [left idx, right idx]"""idxs_split = [[], []]split_val = X[median_idx][feature]for idx in idxs:# Keep the split point in current node.if idx == median_idx:continue# Splitxi = X[idx][feature]if xi < split_val:idxs_split[0].append(idx)else:idxs_split[1].append(idx)return idxs_splitdef build_tree(self, X, y):"""Build a KD Tree. The data should be scaled so as to calculate variances.Arguments:X {list} -- 2d list object with int or float.y {list} -- 1d list object with int or float."""# Initialize with node, indexesnd = self.rootidxs = range(len(X))que = [(nd, idxs)]while que:nd, idxs = que.pop(0)n = len(idxs)# Stop split if there is only one element in this nodeif n == 1:nd.split = (X[idxs[0]], y[idxs[0]])continue# Splitfeature = self._choose_feature(X, idxs)median_idx = self._get_median_idx(X, idxs, feature)idxs_left, idxs_right = self._split_feature(X, idxs, feature, median_idx)# Update properties of current nodend.feature = featurend.split = (X[median_idx], y[median_idx])# Put children of current node in queif idxs_left != []:nd.left = Node()nd.left.father = ndque.append((nd.left, idxs_left))if idxs_right != []:nd.right = Node()nd.right.father = ndque.append((nd.right, idxs_right))def _search(self, Xi, nd):"""Search Xi from the KDTree until Xi is at an leafnode.Arguments:Xi {list} -- 1d list with int or float.Returns:node -- Leafnode."""while nd.left or nd.right:if not nd.left:nd = nd.rightelif not nd.right:nd = nd.leftelse:if Xi[nd.feature] < nd.split[0][nd.feature]:nd = nd.leftelse:nd = nd.rightreturn nddef _get_eu_dist(self, Xi, nd):"""Calculate euclidean distance between Xi and node.Arguments:Xi {list} -- 1d list with int or float.nd {node}Returns:float -- Euclidean distance."""X0 = nd.split[0]return self.get_eu_dist(Xi, X0)def get_eu_dist(self, arr1, arr2):"""Calculate the Euclidean distance of two vectors.Arguments:arr1 {list} -- 1d list object with int or floatarr2 {list} -- 1d list object with int or floatReturns:float -- Euclidean distance"""return sum((x1 - x2) ** 2 for x1, x2 in zip(arr1, arr2)) ** 0.5def _get_hyper_plane_dist(self, Xi, nd):"""Calculate euclidean distance between Xi and hyper plane.Arguments:Xi {list} -- 1d list with int or float.nd {node}Returns:float -- Euclidean distance."""j = nd.featureX0 = nd.split[0]return abs(Xi[j] - X0[j])def nearest_neighbour_search(self, Xi):"""Nearest neighbour search and backtracking.Arguments:Xi {list} -- 1d list with int or float.Returns:node -- The nearest node to Xi."""# The leaf node after searching Xi.dist_best = float("inf")nd_best = self._search(Xi, self.root)que = [(self.root, nd_best)]while que:nd_root, nd_cur = que.pop(0)# Calculate distance between Xi and root nodedist = self._get_eu_dist(Xi, nd_root)# Update best node and distance.if dist < dist_best:dist_best, nd_best = dist, nd_rootwhile nd_cur is not nd_root:# Calculate distance between Xi and current nodedist = self._get_eu_dist(Xi, nd_cur)# Update best node, distance and visit flag.if dist < dist_best:dist_best, nd_best = dist, nd_cur# If it's necessary to visit brother node.if nd_cur.brother and dist_best > \self._get_hyper_plane_dist(Xi, nd_cur.father):_nd_best = self._search(Xi, nd_cur.brother)que.append((nd_cur.brother, _nd_best))# Back track.nd_cur = nd_cur.fatherreturn nd_best#傳統方式,逐個計算并排序
def traditional_search(arr1, matrix):res = []for index, arr2 in enumerate(matrix):score = sum((x1 - x2) ** 2 for x1, x2 in zip(arr1, arr2)) ** 0.5res.append([score, index])res = sorted(res, key=lambda x:x[0])return matrix[res[0][1]]vec_dim = 8
matrix = np.random.random((1000, vec_dim))kd_tree = KDTree()
kd_tree.build_tree(matrix, list(range(len(matrix))))# x = np.random.random((vec_dim))
# print(kd_tree.nearest_neighbour_search(x))
# print(traditional_search(x, matrix))start_time = time.time()
for i in range(100):x = np.random.random((vec_dim))best = kd_tree.nearest_neighbour_search(x)
print("kd樹搜索耗時:%s"%(time.time() - start_time))start_time = time.time()
for i in range(100):x = np.random.random((vec_dim))best = traditional_search(x, matrix)
print("窮舉搜索耗時:%s"%(time.time() - start_time))
2.Annoy?
也是依據空間分割的原理來做,空間分割的過程相當于Kmeans聚類
重復分割過程,直到每個空間內的點個數小于設定值
可以同時在多個接近的分支上查找 或 通過不同初始劃分,生成多個樹