文章目錄
- 前言
- 一、class CVRPModel(nn.Module):__init__(self, **model_params)
- 函數功能
- 函數代碼
- 二、class CVRPModel(nn.Module):pre_forward(self, reset_state)
- 函數功能
- 函數代碼
- 三、class CVRPModel(nn.Module):forward(self, state)
- 函數功能
- 函數代碼
- 四、def _get_encoding(encoded_nodes, node_index_to_pick)
- 函數功能
- 函數代碼
- 五、class CVRP_Encoder(nn.Module)
- 六、class EncoderLayer(nn.Module)
- 七、CVRP_Decoder(nn.Module)
- 八、def reshape_by_heads(qkv, head_num)
- 函數功能
- 函數代碼
- 九、def multi_head_attention(q, k, v, rank2_ninf_mask=None, rank3_ninf_mask=None)
- 函數功能
- 函數代碼
- 十、class AddAndInstanceNormalization(nn.Module):__init__(self, **model_params)
- 函數功能
- Batch Normalization (BN) 是什么?
- Batch Normalization 的具體操作
- 1. **計算均值和方差**
- 2. **標準化**
- 3. **縮放和平移**
- Batch Normalization 的優勢
- 函數代碼
- 十一、class AddAndInstanceNormalization(nn.Module):forward(self, input1, input2)
- 函數功能
- 函數代碼
- 十二、class FeedForward(nn.Module):__init__(self, **model_params)
- 函數功能
- 函數代碼
- 十三、class FeedForward(nn.Module):forward(self, input1)
- 函數功能
- 函數代碼
- 附錄
- 代碼(全)
前言
學習代碼:
class CVRPModel(nn.Module):
class CVRP_Encoder(nn.Module):
class EncoderLayer(nn.Module):
class CVRP_Decoder(nn.Module):
class AddAndInstanceNormalization(nn.Module):
class AddAndBatchNormalization(nn.Module):
class FeedForward(nn.Module):
/home/tang/RL_exa/NCO_code-main/single_objective/LCH-Regret/Regret-POMO/CVRP/POMO/CVRPModel.py
一、class CVRPModel(nn.Module):init(self, **model_params)
函數功能
init 是 CVRPModel 類的構造函數,負責初始化模型的各個組件。
主要任務包括:
- 接收和存儲模型的參數(model_params)。
- 初始化編碼器(encoder)和解碼器(decoder)子模塊。
- 初始化 encoded_nodes 變量,用于存儲經過編碼的節點數據。
執行流程圖鏈接
函數代碼
def __init__(self, **model_params):super().__init__()self.model_params = model_paramsself.encoder = CVRP_Encoder(**model_params)self.decoder = CVRP_Decoder(**model_params)self.encoded_nodes = None# shape: (batch, problem+1, EMBEDDING_DIM)
二、class CVRPModel(nn.Module):pre_forward(self, reset_state)
函數功能
pre_forward 是 CVRPModel 類的一個前向傳播前的準備函數。它的主要任務是根據給定的初始狀態(reset_state)準備和編碼數據,為模型的后續前向傳播(forward)過程做準備。
具體來說,函數的作用是:
- 提取并處理初始狀態的數據。
- 使用編碼器對節點進行編碼,得到編碼后的節點表示。
- 為解碼器設置額外的嵌入信息,并將編碼后的節點與額外的嵌入信息拼接。
- 設置解碼器中的 kv(key-value)信息,為解碼過程做準備。
執行流程圖鏈接
函數代碼
def pre_forward(self, reset_state):depot_xy = reset_state.depot_xy# shape: (batch, 1, 2)node_xy = reset_state.node_xy# shape: (batch, problem, 2)node_demand = reset_state.node_demand# shape: (batch, problem)node_xy_demand = torch.cat((node_xy, node_demand[:, :, None]), dim=2)# shape: (batch, problem, 3)encoded_nodes = self.encoder(depot_xy, node_xy_demand)# shape: (batch, problem+1, embedding)_ = self.decoder.regret_embedding[None, None, :].expand(encoded_nodes.size(0), 1,self.decoder.regret_embedding.size(-1))# _ 的shape:(batch,1,embedding)self.encoded_nodes = torch.cat((encoded_nodes, _), dim=1)# self.encoded_nodes的shape:(batch,problem+2,embedding)self.decoder.set_kv(self.encoded_nodes)
三、class CVRPModel(nn.Module):forward(self, state)
函數功能
forward 是 CVRPModel 類的核心前向傳播函數,用于根據當前狀態(state)生成模型的輸出,包括選擇的節點(selected)和相關的概率(prob)。
它的主要功能是基于當前的狀態和歷史選擇來決定接下來應該選擇哪個節點,并輸出相應的概率。
執行流程圖鏈接
函數代碼
def forward(self, state):batch_size = state.BATCH_IDX.size(0)pomo_size = state.BATCH_IDX.size(1)if state.selected_count == 0: # First Move, depotselected = torch.zeros(size=(batch_size, pomo_size), dtype=torch.long)prob = torch.ones(size=(batch_size, pomo_size))# # Use Averaged encoded nodes for decoder input_1# encoded_nodes_mean = self.encoded_nodes.mean(dim=1, keepdim=True)# # shape: (batch, 1, embedding)# self.decoder.set_q1(encoded_nodes_mean)# Use encoded_depot for decoder input_2encoded_first_node = self.encoded_nodes[:, [0], :]# shape: (batch, 1, embedding)self.decoder.set_q2(encoded_first_node)elif state.selected_count == 1: # Second Move, POMOselected = torch.arange(start=1, end=pomo_size+1)[None, :].expand(batch_size, pomo_size)prob = torch.ones(size=(batch_size, pomo_size))else:encoded_last_node = _get_encoding(self.encoded_nodes, state.current_node)# shape: (batch, pomo, embedding)probs = self.decoder(encoded_last_node, state.load, ninf_mask=state.ninf_mask)# shape: (batch, pomo, problem+1)if self.training or self.model_params['eval_type'] == 'softmax':while True: # to fix pytorch.multinomial bug on selecting 0 probability elementswith torch.no_grad():selected = probs.reshape(batch_size * pomo_size, -1).multinomial(1) \.squeeze(dim=1).reshape(batch_size, pomo_size)# shape: (batch, pomo)prob = probs[state.BATCH_IDX, state.POMO_IDX, selected].reshape(batch_size, pomo_size)# shape: (batch, pomo)if (prob != 0).all():breakelse:probs=probs[:,:,:-1]selected = probs.argmax(dim=2)# shape: (batch, pomo)prob = None # value not needed. Can be anything.return selected, prob
四、def _get_encoding(encoded_nodes, node_index_to_pick)
函數功能
_get_encoding 的作用是從 encoded_nodes 中按照 node_index_to_pick 選擇相應的編碼,并返回選中的編碼信息。
函數執行流程圖鏈接
函數代碼
def _get_encoding(encoded_nodes, node_index_to_pick):# encoded_nodes.shape: (batch, problem, embedding)# node_index_to_pick.shape: (batch, pomo)batch_size = node_index_to_pick.size(0)pomo_size = node_index_to_pick.size(1)embedding_dim = encoded_nodes.size(2)gathering_index = node_index_to_pick[:, :, None].expand(batch_size, pomo_size, embedding_dim)# shape: (batch, pomo, embedding)picked_nodes = encoded_nodes.gather(dim=1, index=gathering_index)# shape: (batch, pomo, embedding)return picked_nodes
五、class CVRP_Encoder(nn.Module)
筆記:20250226-代碼筆記04-class CVRP_Encoder AND class EncoderLayer
六、class EncoderLayer(nn.Module)
筆記:20250226-代碼筆記04-class CVRP_Encoder AND class EncoderLayer
七、CVRP_Decoder(nn.Module)
筆記:20250226-代碼筆記05-class CVRP_Decoder
八、def reshape_by_heads(qkv, head_num)
函數功能
reshape_by_heads
函數的功能是將輸入的張量(如查詢 q
, 鍵 k
, 或值 v
)從一個緊湊的多頭結構 (batch, n, head_num * key_dim)
轉換為適合多頭注意力機制計算的結構 (batch, head_num, n, key_dim)
。
此操作將多個注意力頭的維度進行拆分,并將其調整為每個頭獨立計算的格式。
執行流程圖鏈接
函數代碼
def reshape_by_heads(qkv, head_num):# q.shape: (batch, n, head_num*key_dim) : n can be either 1 or PROBLEM_SIZEbatch_s = qkv.size(0)n = qkv.size(1)q_reshaped = qkv.reshape(batch_s, n, head_num, -1)# shape: (batch, n, head_num, key_dim)q_transposed = q_reshaped.transpose(1, 2)# shape: (batch, head_num, n, key_dim)return q_transposed
九、def multi_head_attention(q, k, v, rank2_ninf_mask=None, rank3_ninf_mask=None)
函數功能
multi_head_attention
函數的主要功能是實現 多頭注意力機制。該函數接收查詢(Q)、鍵(K)和值(V),并計算多頭注意力輸出。它通過計算查詢與鍵之間的相似度,生成加權值的結果,并結合所有頭的輸出生成最終的注意力表示。
執行流程圖鏈接
函數代碼
def multi_head_attention(q, k, v, rank2_ninf_mask=None, rank3_ninf_mask=None):# q shape: (batch, head_num, n, key_dim) : n can be either 1 or PROBLEM_SIZE# k,v shape: (batch, head_num, problem, key_dim)# rank2_ninf_mask.shape: (batch, problem)# rank3_ninf_mask.shape: (batch, group, problem)batch_s = q.size(0)head_num = q.size(1)n = q.size(2)key_dim = q.size(3)input_s = k.size(2)score = torch.matmul(q, k.transpose(2, 3))# shape: (batch, head_num, n, problem)score_scaled = score / torch.sqrt(torch.tensor(key_dim, dtype=torch.float))if rank2_ninf_mask is not None:score_scaled = score_scaled + rank2_ninf_mask[:, None, None, :].expand(batch_s, head_num, n, input_s)if rank3_ninf_mask is not None:score_scaled = score_scaled + rank3_ninf_mask[:, None, :, :].expand(batch_s, head_num, n, input_s)weights = nn.Softmax(dim=3)(score_scaled)# shape: (batch, head_num, n, problem)out = torch.matmul(weights, v)# shape: (batch, head_num, n, key_dim)out_transposed = out.transpose(1, 2)# shape: (batch, n, head_num, key_dim)out_concat = out_transposed.reshape(batch_s, n, head_num * key_dim)# shape: (batch, n, head_num*key_dim)return out_concat
十、class AddAndInstanceNormalization(nn.Module):init(self, **model_params)
函數功能
對輸入數據進行基于嵌入維度的批量標準化操作,從而使得模型在訓練過程中能夠更好地收斂和提高穩定性。
Batch Normalization (BN) 是什么?
Batch Normalization (BN) 是一種在訓練深度神經網絡時常用的技術,它的目的是提高網絡的訓練速度、穩定性,并幫助避免梯度消失或爆炸問題。
Batch Normalization 操作的核心思想是對每一層的輸入數據進行標準化,使得輸入數據的均值接近 0,方差接近 1。這樣可以避免激活函數輸出過大或過小的問題,幫助優化過程更加穩定。
Batch Normalization 的具體操作
1. 計算均值和方差
對于一批輸入樣本(batch),在每個特征維度上計算均值和方差:
-
均值:
μ B = 1 m ∑ i = 1 m x i \mu_B = \frac{1}{m} \sum_{i=1}^{m} x_i μB?=m1?∑i=1m?xi? -
方差:
σ B 2 = 1 m ∑ i = 1 m ( x i ? μ B ) 2 \sigma_B^2 = \frac{1}{m} \sum_{i=1}^{m} (x_i - \mu_B)^2 σB2?=m1?∑i=1m?(xi??μB?)2
其中, m m m 是一個批次中的樣本數, x i x_i xi?是每個樣本的輸入值。
2. 標準化
使用計算出的均值和方差將輸入數據標準化,使得每個特征的均值為 0,方差為 1:
x ^ i = x i ? μ B σ B 2 + ? \hat{x}_i = \frac{x_i - \mu_B}{\sqrt{\sigma_B^2 + \epsilon}} x^i?=σB2?+??xi??μB??
這里 ? \epsilon ?是一個非常小的數值,用來防止除以零的情況。
3. 縮放和平移
由于標準化可能會影響到模型的表達能力,Batch Normalization 還會引入兩個可學習的參數 γ \gamma γ(縮放參數)和 β \beta β(平移參數),它們允許模型重新調整標準化后的數據:
y i = γ x ^ i + β y_i = \gamma \hat{x}_i + \beta yi?=γx^i?+β
其中, γ \gamma γ 和 β \beta β是學習的參數,通常會通過反向傳播進行優化。
Batch Normalization 的優勢
- 加速訓練:Batch Normalization 通過減少輸入數據的偏移(internal covariate shift),使得每一層的輸入分布更加穩定,從而加速了網絡的訓練過程。
- 提高穩定性:由于它通過標準化輸入避免了梯度爆炸或梯度消失問題,使得訓練更加穩定。
- 緩解過擬合:在一些情況下,Batch Normalization 也可以起到正則化的作用,減少了模型對訓練數據的過擬合。
- 減少對初始化的依賴:Batch Normalization 可以在一定程度上緩解對權重初始化的敏感性。
函數代碼
def __init__(self, **model_params):super().__init__()embedding_dim = model_params['embedding_dim']self.norm = nn.InstanceNorm1d(embedding_dim, affine=True, track_running_stats=False)
十一、class AddAndInstanceNormalization(nn.Module):forward(self, input1, input2)
函數功能
forward 方法,它執行了加法和批量歸一化操作。
forward
方法的主要功能是:
- 加法操作:將兩個輸入張量
input1
和input2
相加。 - 批量歸一化:將加法結果進行批量歸一化(Batch Normalization),標準化其特征維度。
- 形狀恢復:批量歸一化后,將張量的形狀恢復到原來的維度。
執行流程:
函數代碼
- 獲取輸入張量的維度:
batch_s = input1.size(0)
problem_s = input1.size(1)
embedding_dim = input1.size(2)
batch_s
表示批次大小,problem_s
表示問題的大小(特征的數量),embedding_dim
表示嵌入的維度。- 這些維度來自輸入張量
input1
,并且假設input2
具有相同的形狀。
- 加法操作:
added = input1 + input2
- 對
input1
和nput2
進行逐元素加法。此時,added
張量的形狀與input1
和input2
相同,仍為(batch_s, problem_s, embedding_dim)
。
- 批量歸一化:
normalized = self.norm_by_EMB(added.reshape(batch_s * problem_s, embedding_dim))
- 將
added
張量的形狀重塑為(batch_s * problem_s, embedding_dim)
,將批次維度和問題維度合并,以便進行批量歸一化操作。這樣就對每個特征維度(embedding_dim)
做了批量標準化。 self.norm_by_EMB
是一個BatchNorm1d
層,它會對每個特征維度執行標準化,使得每個特征的均值接近 0,方差接近 1。
- 恢復形狀:
back_trans = normalized.reshape(batch_s, problem_s, embedding_dim)
- 批量歸一化后,將
normalized
張量的形狀恢復回(batch_s, problem_s, embedding_dim)
,即恢復原本的輸入形狀。
- 返回結果:
return back_trans
- 返回經過批量歸一化的張量
back_trans
,它的形狀與輸入相同,并且每個特征維度已經經過標準化。
def forward(self, input1, input2):# input.shape: (batch, problem, embedding)added = input1 + input2# shape: (batch, problem, embedding)transposed = added.transpose(1, 2)# shape: (batch, embedding, problem)normalized = self.norm(transposed)# shape: (batch, embedding, problem)back_trans = normalized.transpose(1, 2)# shape: (batch, problem, embedding)return back_trans
十二、class FeedForward(nn.Module):init(self, **model_params)
函數功能
FeedForward
的類,它是一個典型的前饋神經網絡(Feedforward Neural Network)模塊,實現了一個簡單的兩層神經網絡。
__init__
方法是類的構造函數,用來初始化網絡的層和超參數。embedding_dim
和ff_hidden_dim
是通過model_params
傳遞的超參數,分別表示嵌入維度和前饋神經網絡隱藏層的維度。embedding_dim
是輸入和輸出的維度。ff_hidden_dim
是隱藏層的維度,即在網絡的中間層。
self.W1
和self.W2
是兩個全連接層(nn.Linear
):self.W1
將輸入的embedding_dim
維度的向量轉換為ff_hidden_dim
維度的向量。self.W2
將ff_hidden_dim
維度的向量轉換回embedding_dim
維度的向量。
函數代碼
def __init__(self, **model_params):super().__init__()embedding_dim = model_params['embedding_dim']ff_hidden_dim = model_params['ff_hidden_dim']self.W1 = nn.Linear(embedding_dim, ff_hidden_dim)self.W2 = nn.Linear(ff_hidden_dim, embedding_dim)
十三、class FeedForward(nn.Module):forward(self, input1)
函數功能
forward
方法定義了數據流通過網絡的方式,也就是前向傳播過程。- 輸入
input1
的形狀為(batch, problem, embedding)
,即批次大小batch
、問題數量problem
和每個問題的嵌入維度embedding
。 - 執行的步驟如下:
-
1.第一層線性變換(
self.W1
):輸入通過self.W1
進行線性變換,將輸入的嵌入維度轉換為隱藏層的維度(ff_hidden_dim
)。變換公式為:
其中x
是輸入,W1
是權重矩陣,b1
是偏置。 -
2.激活函數(ReLU):對
self.W1
的輸出應用 ReLU 激活函數,ReLU 將負值歸零,保留正值。公式為:
-
3.第二層線性變換(
self.W2
):通過self.W2
進行線性變換,將隱藏層的輸出轉換回原始的嵌入維度(embedding_dim
)。變換公式為:
-
- 最終輸出是經過兩層線性變換和 ReLU 激活函數處理的結果,形狀仍然是 (batch, problem, embedding)。
函數代碼
def forward(self, input1):# input.shape: (batch, problem, embedding)return self.W2(F.relu(self.W1(input1)))
附錄
代碼(全)
import torch
import torch.nn as nn
import torch.nn.functional as Fclass CVRPModel(nn.Module):def __init__(self, **model_params):super().__init__()self.model_params = model_paramsself.encoder = CVRP_Encoder(**model_params)self.decoder = CVRP_Decoder(**model_params)self.encoded_nodes = None# shape: (batch, problem+1, EMBEDDING_DIM)def pre_forward(self, reset_state):depot_xy = reset_state.depot_xy# shape: (batch, 1, 2)node_xy = reset_state.node_xy# shape: (batch, problem, 2)node_demand = reset_state.node_demand# shape: (batch, problem)node_xy_demand = torch.cat((node_xy, node_demand[:, :, None]), dim=2)# shape: (batch, problem, 3)encoded_nodes = self.encoder(depot_xy, node_xy_demand)# shape: (batch, problem+1, embedding)_ = self.decoder.regret_embedding[None, None, :].expand(encoded_nodes.size(0), 1,self.decoder.regret_embedding.size(-1))# _ 的shape:(batch,1,embedding)self.encoded_nodes = torch.cat((encoded_nodes, _), dim=1)# self.encoded_nodes的shape:(batch,problem+2,embedding)self.decoder.set_kv(self.encoded_nodes)def forward(self, state):batch_size = state.BATCH_IDX.size(0)pomo_size = state.BATCH_IDX.size(1)if state.selected_count == 0: # First Move, depotselected = torch.zeros(size=(batch_size, pomo_size), dtype=torch.long)prob = torch.ones(size=(batch_size, pomo_size))# # Use Averaged encoded nodes for decoder input_1# encoded_nodes_mean = self.encoded_nodes.mean(dim=1, keepdim=True)# # shape: (batch, 1, embedding)# self.decoder.set_q1(encoded_nodes_mean)# Use encoded_depot for decoder input_2encoded_first_node = self.encoded_nodes[:, [0], :]# shape: (batch, 1, embedding)self.decoder.set_q2(encoded_first_node)elif state.selected_count == 1: # Second Move, POMOselected = torch.arange(start=1, end=pomo_size+1)[None, :].expand(batch_size, pomo_size)prob = torch.ones(size=(batch_size, pomo_size))else:encoded_last_node = _get_encoding(self.encoded_nodes, state.current_node)# shape: (batch, pomo, embedding)probs = self.decoder(encoded_last_node, state.load, ninf_mask=state.ninf_mask)# shape: (batch, pomo, problem+1)if self.training or self.model_params['eval_type'] == 'softmax':while True: # to fix pytorch.multinomial bug on selecting 0 probability elementswith torch.no_grad():selected = probs.reshape(batch_size * pomo_size, -1).multinomial(1) \.squeeze(dim=1).reshape(batch_size, pomo_size)# shape: (batch, pomo)prob = probs[state.BATCH_IDX, state.POMO_IDX, selected].reshape(batch_size, pomo_size)# shape: (batch, pomo)if (prob != 0).all():breakelse:probs=probs[:,:,:-1]selected = probs.argmax(dim=2)# shape: (batch, pomo)prob = None # value not needed. Can be anything.return selected, probdef _get_encoding(encoded_nodes, node_index_to_pick):# encoded_nodes.shape: (batch, problem, embedding)# node_index_to_pick.shape: (batch, pomo)batch_size = node_index_to_pick.size(0)pomo_size = node_index_to_pick.size(1)embedding_dim = encoded_nodes.size(2)gathering_index = node_index_to_pick[:, :, None].expand(batch_size, pomo_size, embedding_dim)# shape: (batch, pomo, embedding)picked_nodes = encoded_nodes.gather(dim=1, index=gathering_index)# shape: (batch, pomo, embedding)return picked_nodes########################################
# ENCODER
########################################class CVRP_Encoder(nn.Module):def __init__(self, **model_params):super().__init__()self.model_params = model_paramsembedding_dim = self.model_params['embedding_dim']encoder_layer_num = self.model_params['encoder_layer_num']self.embedding_depot = nn.Linear(2, embedding_dim)self.embedding_node = nn.Linear(3, embedding_dim)self.layers = nn.ModuleList([EncoderLayer(**model_params) for _ in range(encoder_layer_num)])def forward(self, depot_xy, node_xy_demand):# depot_xy.shape: (batch, 1, 2)# node_xy_demand.shape: (batch, problem, 3)embedded_depot = self.embedding_depot(depot_xy)# shape: (batch, 1, embedding)embedded_node = self.embedding_node(node_xy_demand)# shape: (batch, problem, embedding)out = torch.cat((embedded_depot, embedded_node), dim=1)# shape: (batch, problem+1, embedding)for layer in self.layers:out = layer(out)return out# shape: (batch, problem+1, embedding)class EncoderLayer(nn.Module):def __init__(self, **model_params):super().__init__()self.model_params = model_paramsembedding_dim = self.model_params['embedding_dim']head_num = self.model_params['head_num']qkv_dim = self.model_params['qkv_dim']self.Wq = nn.Linear(embedding_dim, head_num * qkv_dim, bias=False)self.Wk = nn.Linear(embedding_dim, head_num * qkv_dim, bias=False)self.Wv = nn.Linear(embedding_dim, head_num * qkv_dim, bias=False)self.multi_head_combine = nn.Linear(head_num * qkv_dim, embedding_dim)self.add_n_normalization_1 = AddAndInstanceNormalization(**model_params)self.feed_forward = FeedForward(**model_params)self.add_n_normalization_2 = AddAndInstanceNormalization(**model_params)def forward(self, input1):# input1.shape: (batch, problem+1, embedding)head_num = self.model_params['head_num']q = reshape_by_heads(self.Wq(input1), head_num=head_num)k = reshape_by_heads(self.Wk(input1), head_num=head_num)v = reshape_by_heads(self.Wv(input1), head_num=head_num)# qkv shape: (batch, head_num, problem, qkv_dim)out_concat = multi_head_attention(q, k, v)# shape: (batch, problem, head_num*qkv_dim)multi_head_out = self.multi_head_combine(out_concat)# shape: (batch, problem, embedding)out1 = self.add_n_normalization_1(input1, multi_head_out)out2 = self.feed_forward(out1)out3 = self.add_n_normalization_2(out1, out2)return out3# shape: (batch, problem, embedding)########################################
# DECODER
########################################class CVRP_Decoder(nn.Module):def __init__(self, **model_params):super().__init__()self.model_params = model_paramsembedding_dim = self.model_params['embedding_dim']head_num = self.model_params['head_num']qkv_dim = self.model_params['qkv_dim']# self.Wq_1 = nn.Linear(embedding_dim, head_num * qkv_dim, bias=False)self.Wq_2 = nn.Linear(embedding_dim, head_num * qkv_dim, bias=False)self.Wq_last = nn.Linear(embedding_dim+1, head_num * qkv_dim, bias=False)self.Wk = nn.Linear(embedding_dim, head_num * qkv_dim, bias=False)self.Wv = nn.Linear(embedding_dim, head_num * qkv_dim, bias=False)self.regret_embedding = nn.Parameter(torch.Tensor(embedding_dim))self.regret_embedding.data.uniform_(-1, 1)self.multi_head_combine = nn.Linear(head_num * qkv_dim, embedding_dim)self.k = None # saved key, for multi-head attentionself.v = None # saved value, for multi-head_attentionself.single_head_key = None # saved, for single-head attention# self.q1 = None # saved q1, for multi-head attentionself.q2 = None # saved q2, for multi-head attentiondef set_kv(self, encoded_nodes):# encoded_nodes.shape: (batch, problem+1, embedding)head_num = self.model_params['head_num']self.k = reshape_by_heads(self.Wk(encoded_nodes), head_num=head_num)self.v = reshape_by_heads(self.Wv(encoded_nodes), head_num=head_num)# shape: (batch, head_num, problem+1, qkv_dim)self.single_head_key = encoded_nodes.transpose(1, 2)# shape: (batch, embedding, problem+1)def set_q1(self, encoded_q1):# encoded_q.shape: (batch, n, embedding) # n can be 1 or pomohead_num = self.model_params['head_num']self.q1 = reshape_by_heads(self.Wq_1(encoded_q1), head_num=head_num)# shape: (batch, head_num, n, qkv_dim)def set_q2(self, encoded_q2):# encoded_q.shape: (batch, n, embedding) # n can be 1 or pomohead_num = self.model_params['head_num']self.q2 = reshape_by_heads(self.Wq_2(encoded_q2), head_num=head_num)# shape: (batch, head_num, n, qkv_dim)def forward(self, encoded_last_node, load, ninf_mask):# encoded_last_node.shape: (batch, pomo, embedding)# load.shape: (batch, pomo)# ninf_mask.shape: (batch, pomo, problem)head_num = self.model_params['head_num']# Multi-Head Attention#######################################################input_cat = torch.cat((encoded_last_node, load[:, :, None]), dim=2)# shape = (batch, group, EMBEDDING_DIM+1)q_last = reshape_by_heads(self.Wq_last(input_cat), head_num=head_num)# shape: (batch, head_num, pomo, qkv_dim)# q = self.q1 + self.q2 + q_last# # shape: (batch, head_num, pomo, qkv_dim)# q = q_last# shape: (batch, head_num, pomo, qkv_dim)q = self.q2 + q_last# # shape: (batch, head_num, pomo, qkv_dim)out_concat = multi_head_attention(q, self.k, self.v, rank3_ninf_mask=ninf_mask)# shape: (batch, pomo, head_num*qkv_dim)mh_atten_out = self.multi_head_combine(out_concat)# shape: (batch, pomo, embedding)# Single-Head Attention, for probability calculation#######################################################score = torch.matmul(mh_atten_out, self.single_head_key)# shape: (batch, pomo, problem)sqrt_embedding_dim = self.model_params['sqrt_embedding_dim']logit_clipping = self.model_params['logit_clipping']score_scaled = score / sqrt_embedding_dim# shape: (batch, pomo, problem)score_clipped = logit_clipping * torch.tanh(score_scaled)score_masked = score_clipped + ninf_maskprobs = F.softmax(score_masked, dim=2)# shape: (batch, pomo, problem)return probs########################################
# NN SUB CLASS / FUNCTIONS
########################################def reshape_by_heads(qkv, head_num):# q.shape: (batch, n, head_num*key_dim) : n can be either 1 or PROBLEM_SIZEbatch_s = qkv.size(0)n = qkv.size(1)q_reshaped = qkv.reshape(batch_s, n, head_num, -1)# shape: (batch, n, head_num, key_dim)q_transposed = q_reshaped.transpose(1, 2)# shape: (batch, head_num, n, key_dim)return q_transposeddef multi_head_attention(q, k, v, rank2_ninf_mask=None, rank3_ninf_mask=None):# q shape: (batch, head_num, n, key_dim) : n can be either 1 or PROBLEM_SIZE# k,v shape: (batch, head_num, problem, key_dim)# rank2_ninf_mask.shape: (batch, problem)# rank3_ninf_mask.shape: (batch, group, problem)batch_s = q.size(0)head_num = q.size(1)n = q.size(2)key_dim = q.size(3)input_s = k.size(2)score = torch.matmul(q, k.transpose(2, 3))# shape: (batch, head_num, n, problem)score_scaled = score / torch.sqrt(torch.tensor(key_dim, dtype=torch.float))if rank2_ninf_mask is not None:score_scaled = score_scaled + rank2_ninf_mask[:, None, None, :].expand(batch_s, head_num, n, input_s)if rank3_ninf_mask is not None:score_scaled = score_scaled + rank3_ninf_mask[:, None, :, :].expand(batch_s, head_num, n, input_s)weights = nn.Softmax(dim=3)(score_scaled)# shape: (batch, head_num, n, problem)out = torch.matmul(weights, v)# shape: (batch, head_num, n, key_dim)out_transposed = out.transpose(1, 2)# shape: (batch, n, head_num, key_dim)out_concat = out_transposed.reshape(batch_s, n, head_num * key_dim)# shape: (batch, n, head_num*key_dim)return out_concatclass AddAndInstanceNormalization(nn.Module):def __init__(self, **model_params):super().__init__()embedding_dim = model_params['embedding_dim']self.norm = nn.InstanceNorm1d(embedding_dim, affine=True, track_running_stats=False)def forward(self, input1, input2):# input.shape: (batch, problem, embedding)added = input1 + input2# shape: (batch, problem, embedding)transposed = added.transpose(1, 2)# shape: (batch, embedding, problem)normalized = self.norm(transposed)# shape: (batch, embedding, problem)back_trans = normalized.transpose(1, 2)# shape: (batch, problem, embedding)return back_transclass AddAndBatchNormalization(nn.Module):def __init__(self, **model_params):super().__init__()embedding_dim = model_params['embedding_dim']self.norm_by_EMB = nn.BatchNorm1d(embedding_dim, affine=True)# 'Funny' Batch_Norm, as it will normalized by EMB dimdef forward(self, input1, input2):# input.shape: (batch, problem, embedding)batch_s = input1.size(0)problem_s = input1.size(1)embedding_dim = input1.size(2)added = input1 + input2normalized = self.norm_by_EMB(added.reshape(batch_s * problem_s, embedding_dim))back_trans = normalized.reshape(batch_s, problem_s, embedding_dim)return back_transclass FeedForward(nn.Module):def __init__(self, **model_params):super().__init__()embedding_dim = model_params['embedding_dim']ff_hidden_dim = model_params['ff_hidden_dim']self.W1 = nn.Linear(embedding_dim, ff_hidden_dim)self.W2 = nn.Linear(ff_hidden_dim, embedding_dim)def forward(self, input1):# input.shape: (batch, problem, embedding)return self.W2(F.relu(self.W1(input1)))