?前言の碎碎念
由于我做的模仿學習,可能由于沒有完全模仿,可以說效果很爛……后來用強化學習優化,這個倒是不用自己做數據集了,為方便大家只搞代碼,這里只說這部分的經歷和方法。
實踐基礎介紹
1-動作
先介紹一個強化學習中的“行為空間”的概念。
玩過游戲機的都知道,游戲按鍵就那幾個,頂多加上組合,所以你玩游戲可以操作的空間組合就那么大,你與游戲世界連接的輸入方式只有這幾個按鈕。
比如坦克大戰,你操縱的坦克只能上下左右和攻擊,那你只能在這幾個里進行操作,就好比你的手柄,你不能做出手柄按鈕以外的操作來作為游戲的輸入。
很自然的,智能體能做的只有行為空間中的行為,如果根據概率來選擇行為來行動,是很自然的一種方式。
2-狀態
我們玩游戲還得看游戲畫面才能采取下一個決策,這就是反饋給智能體的狀態。在實際工程中,需要將狀態中的各種要素摘取出來(比如坦克大戰,需要知道敵人的位置,自己的位置,哪里有墻,其他的信息就無所謂了)。
3-獎懲
我們打游戲,如果失誤掉血了,肯定以后就會盡量避免這個操作;如果試出來的組合絕招管用,那以后肯定要記下來好好利用。這就是獎勵和懲罰。衡量該獎還是懲罰的函數就叫獎勵函數。
4-智能體
就類比成玩游戲的人,不同人玩游戲的技術進步也不一樣,這就涉及到神經網絡設計,或者簡單的函數套路策略。
5-環境
游戲本身,畢竟玩坦克大戰的經驗在玩寶可夢的時候用不上,所以環境是根本。
范式
環境初始化:定義游戲規則和狀態表示。
智能體設計:選擇合適的算法(如 DQN),設計要被優化的目標神經網絡。
訓練循環:
初始化環境,獲取初始狀態。
選擇動作(epsilon-greedy策略)。
執行動作,獲取新的狀態、獎勵和是否終止。
存儲經驗到回放緩存。
從回放緩存中采樣一批數據,更新目標網絡參數。
更新目標網絡(每隔一定頻率步)。
保存模型(每隔一定局數,因為一局的獎勵不平均,所以以幾局為單位)。
評估:定期評估智能體的性能,記錄關鍵指標。(每隔一定輪數評估智能體,評估時禁止對目標網絡的改動,輸出模型得到的動作,然后評估)
原理的簡要介紹
1. 強化學習的基本概念
智能體(Agent):學習者和決策者,負責根據當前狀態選擇動作。
環境(Environment):智能體所處的外部世界,接收智能體的動作并返回新的狀態和獎勵。
狀態(State):環境的當前狀況,描述了智能體所處的環境信息。
動作(Action):智能體在某個狀態下可以采取的行為。
獎勵(Reward):環境對智能體動作的反饋,通常是一個標量值,用于衡量動作的好壞。
策略(Policy):智能體的行為規則,決定了在給定狀態下選擇哪個動作。
價值函數(Value Function):衡量某個狀態或動作的價值,通常用 V(s) 或 Q(s,a) 表示。
回報(Return):從某個時間步開始到結束的所有獎勵的累積和,通常用 Gt? 表示。
2. 馬爾科夫決策過程(Markov Decision Process, MDP)
強化學習的核心是馬爾科夫決策過程,它是一個數學框架,用于描述智能體與環境的交互過程。
馬爾科夫性(Markov Property):當前狀態包含了所有歷史信息,即未來的狀態只依賴于當前狀態,而與之前的狀態無關。
P(st+1?∣st?)=P(st+1?∣st?,st?1?,…,s1?)MDP 的組成:
狀態集合 S
動作集合 A
轉移概率 P(s′∣s,a):在狀態 s 下采取動作 a 轉移到狀態 s′ 的概率。
獎勵函數 R(s,a,s′):在狀態 s 下采取動作 a 轉移到狀態 s′ 時獲得的獎勵。
3. 貝爾曼方程(Bellman Equation)
貝爾曼方程是強化學習中的核心方程,用于遞歸地定義價值函數。
貝爾曼期望方程(Bellman Expectation Equation):
V(s)=a∈A∑?π(a∣s)[R(s,a)+γs′∈S∑?P(s′∣s,a)V(s′)]Q(s,a)=R(s,a)+γs′∈S∑?P(s′∣s,a)a′∈A∑?π(a′∣s′)Q(s′,a′)其中,π(a∣s) 是策略,γ 是折扣因子(0≤γ<1)。
貝爾曼最優方程(Bellman Optimality Equation):
V?(s)=a∈Amax?[R(s,a)+γs′∈S∑?P(s′∣s,a)V?(s′)]Q?(s,a)=R(s,a)+γs′∈S∑?P(s′∣s,a)a′∈Amax?Q?(s′,a′)這些方程用于定義最優價值函數和最優策略。
4. 價值函數與策略
價值函數(Value Function):
狀態價值函數 V(s):在策略 π 下,從狀態 s 開始的期望回報。
動作價值函數 Q(s,a):在策略 π 下,從狀態 s 開始并采取動作 a 的期望回報。
策略(Policy):
確定性策略:在每個狀態下選擇一個固定的動作。
隨機性策略:在每個狀態下根據概率分布選擇動作。
5. 策略迭代與價值迭代
策略迭代(Policy Iteration):
策略評估(Policy Evaluation):計算當前策略的價值函數。
策略改進(Policy Improvement):根據價值函數更新策略。
交替進行策略評估和策略改進,直到策略收斂。
價值迭代(Value Iteration):
直接求解貝爾曼最優方程,更新價值函數,直到收斂。
從初始價值函數開始,逐步逼近最優價值函數。
6. 探索與利用(Exploration vs. Exploitation)
探索(Exploration):嘗試新的動作以獲取更多信息。
利用(Exploitation):使用當前已知的最佳動作以獲取最大回報。
平衡方法:
?-貪心策略:以概率 ? 隨機選擇動作,以概率 1?? 選擇當前最優動作。
軟最大化策略(Softmax Policy):根據動作的價值以概率分布選擇動作。
7. 深度強化學習(Deep Reinforcement Learning)
將深度學習與強化學習結合,使用神經網絡來近似價值函數或策略函數。
DQN(Deep Q-Network):
使用神經網絡近似 Q(s,a)。
引入經驗回放緩存(Experience Replay)和目標網絡(Target Network)以提高訓練穩定性。
PPO(Proximal Policy Optimization):
基于策略梯度的方法,通過截斷概率比來限制策略更新的幅度,提高訓練的穩定性和效率。
8. 必須補充的基礎
概率論與數理統計:理解隨機過程、概率分布、期望值等概念。
線性代數:掌握向量、矩陣運算,用于表示狀態和動作。
動態規劃:理解貝爾曼方程和策略迭代、價值迭代等概念。
神經網絡:了解神經網絡的基本結構和訓練方法,用于近似價值函數或策略函數。
優化算法:掌握梯度下降、Adam 等優化算法,用于更新網絡參數。
總結
強化學習通過智能體與環境的交互來學習最優策略。其核心包括馬爾科夫決策過程、貝爾曼方程、價值函數、策略迭代和價值迭代等。深度強化學習結合了深度學習技術,進一步提升了強化學習的性能。理解這些基礎知識是掌握強化學習的關鍵。
最后,代碼。
由于大家沒有數據集和模仿學習的部分的神經網絡,這里放一個基本的梯度下降來優化目標網絡的強化學習的樣例代碼,需要裝一些包。
這部分由于設計的依賴比較多,都放出來放不下,這部分只做參考
1-環境準備
def init_wargame(redAgent , blueAgent):"""run demo in single agent mode"""print("running in single agent mode...")# instantiate agents and envred1 = redAgent #MyAgent1()#自己的agent,因為樣本數據都來自紅方blue1 = blueAgentenv1 = TrainEnv()begin = time.time()# get data ready, data can from files, web, or any other sourceswith open(gopt.scenpath+"201033029601.json", encoding='utf8') as f:scenario_data = json.load(f)with open(gopt.mappath+"basic.json", encoding='utf8') as f:basic_data = json.load(f)with open(gopt.mappath+"cost.pickle", 'rb') as file:cost_data = pickle.load(file)see_data = numpy.load(gopt.mappath+"see.npz")['data']# varialbe to build replayall_states = []# player setup infoplayer_info = [{"seat": 1,"faction": 0,"role": 1,"user_name": myAgentname,"user_id": 0},{"seat": 11,"faction": 1,"role": 1,"user_name": "demo","user_id": 0}]# env setup infoenv_step_info = {"scenario_data": scenario_data,"basic_data": basic_data,"cost_data": cost_data,"see_data": see_data,"player_info": player_info}# setup env - 初始化環境并獲取指針state = env1.setup(env_step_info)import copyall_states.append(copy.deepcopy(state[GREEN]))#初始幀print("Environment is ready.")# setup AIs - 注冊AI信息red1.setup({"scenario": scenario_data,"basic_data": basic_data,"cost_data": cost_data,"see_data": see_data,"seat": 1,"faction": 0,"role": 0,"user_name": myAgentname,"user_id": 0,"state": state,})blue1.setup({"scenario": scenario_data,"basic_data": basic_data,"cost_data": cost_data,"see_data": see_data,"seat": 11,"faction": 1,"role": 0,"user_name": "demo","user_id": 0,"state": state,})print("agents are ready.")return red1, blue1, env1, state, all_states, begin
2-訓練腳本
def train():"""智能體-主訓練函數"""# 初始化環境和智能體# 日志記錄writer = create_writer(LOG_DIR)# TensorBoardbest_reward = -float('inf')for episode in tqdm(range(EPOCH), desc="Training Episodes"):print(f"Episode {episode + 1}/{EPOCH} started...")red1, blue1, env1, state, all_states, begin = init_wargame(MyAgent56(), Agent())rewards = []losses = []# ====訓練階段episode_reward = 0episode_loss = 0step = 0# loop until the end of gameprint("steping")done = False#傳入actions-處理step-記錄all_states循環while not done :actions = []actions += red1.step(state[RED])actions += blue1.step(state[BLUE])state, done = env1.step(actions) #更新環境狀態print("step_", state[-1]["time"]["cur_step"])print("stage", state[-1]["time"]["stage"])all_states.append(copy.deepcopy(state[GREEN]))#原因可能是 state 和 all_states 中的元素共享相同的引用# # 方便驗證-cq20250421# if state[-1]["time"]["cur_step"] >= 29:# breakepisode_reward += red1.trainer.outrewardepisode_loss += red1.trainer.outlossstep += 1if done:break# 一局結束了,如果你的任務是分局進行的(例如,一局有1800幀step記錄了),并且每局的獎勵是獨立計算的,那么每局結束后保存一次模型是一個合理的選擇。# 記錄訓練指標rewards.append(episode_reward)losses.append(episode_loss / (step + 1))writer.add_scalar("Train/Reward", episode_reward, episode)writer.add_scalar("Train/Loss", episode_loss / (step + 1), episode)mean_eval_reward = np.mean(rewards)writer.add_scalar("Eval/Reward", mean_eval_reward , episode)# 保存模型的策略if episode_reward > best_reward:best_reward = episode_rewardred1.trainer.save(gopt.modelpath+'best_rl_model.pth') # 保存最佳模型print(f"回合:{episode}/{EPOCH},獎勵:{episode_reward:.2f}, \評估獎勵:{mean_eval_reward:.2f},最佳評估獎勵:{best_reward:.2f}")# if episode % save_interval == 0:# agent.save_model(f'model_episode_{episode}.pth') # 定期保存# 探索率衰減red1.trainer.epsilon = max(red1.trainer.epsilon * 0.995, red1.trainer.epsilon_min)# # ====評估階段,每5局評估一次# if episode % 5 == 0:# # 在訓練過程中定期凍結策略(暫停梯度更新),讓智能體在固定環境中運行多個回合,計算平均獎勵以衡量當前策略的優劣。# # 保存最佳模型# if mean_eval_reward > best_reward:# best_reward = mean_eval_reward# save_model_lite(red1.trainer.policy_net, f"{gopt.rlmodelsavepath}best_model.pth")# print(f"回合:{episode}/{1800},獎勵:{episode_reward:.2f}, \# 評估獎勵:{mean_eval_reward:.2f},最佳評估獎勵:{best_reward:.2f}")print(f"Total time: {time.time() - begin:.3f}s")begin_formatted = time.strftime("%Y%m%d%H%M%S", time.localtime(begin)) #年月日時分秒的字符串# 舊的連續方式zip_name = gopt.logdir+"replays/"+f"replay_{begin_formatted}_old.zip"with zipfile.ZipFile(zip_name, 'w', zipfile.ZIP_DEFLATED, compresslevel=9) as z:z.writestr(f"replay_{begin_formatted}.json", json.dumps(all_states, ensure_ascii=False, indent=None, separators=(',', ':')).encode('gbk'))print(f"Replay saved to {zip_name}")# 添加資源釋放# env1.reset()# red1.reset()# blue1.reset()del red1, blue1, env1, state, all_statesif torch.cuda.is_available():torch.cuda.empty_cache()# 訓練結束writer.close()print(f"Training completed. Best eval reward: {best_reward:.2f}")
3-強化學習部分(優化的目標網絡是self.policy_net)
def get_pred(policy_net ,obs_tensor: torch.Tensor) -> List[int]:"""預測動作的函數"""# 這里可以直接調用模型進行預測# 假設模型輸出的是一個概率分布with torch.no_grad():probs = policy_net(obs_tensor) # 輸出概率分布[1, 3, 8]# 在最后一個維度上取【概率最大】的類別索引max_indices = torch.argmax(probs, dim=-1) # 結果形狀為 [1, 3]# 如果需要去掉第一個維度(batch 維度),可以使用 squeezemax_indices = max_indices.squeeze(0) # 結果形狀為 [3]# 將 max_indices 轉換為整數列表actionLists = max_indices.tolist()return actionLists# ====可插拔強化學習類
class RLTrainer:def __init__(self, policy_net: nn.Module, lr: float = 1e-3):# 加載模仿學習的預訓練模型self.policy_net = policy_netself.target_update_freq = 20 # 目標網絡更新頻率(TARGET_UPDATE)# self.batch_size = 32 # 訓練批量大小(BATCH_SIZE)self.epsilon_min = 0.1 # 最小探索率self.epsilon = 0.1 # 探索率self.gamma=0.99 # 折扣因子self.step_counter = 0 # 全局步數計數器,用于調試和監控訓練進度self.optimizer = optim.Adam(self.policy_net.parameters(), lr=lr)self.memory = [] # 簡單 bufferself.outreward = 0 # 初始化輸出獎勵(對外可獲取)self.outloss = 0 # 初始化輸出損失(對外可獲取)def select_action(self, obs_tensor: torch.Tensor) -> int:actions = get_pred(self.policy_net, obs_tensor)return actionsdef store_transition(self, obs, action, reward, next_obs): if len(self.memory) >= BUFFERSIZE:self.memory.pop(0) # 移除最舊的經驗self.memory.append((obs, action, reward, next_obs))def loss_fn(self, obs_batch_dict, next_obs_batch_dict, action_batch, reward_batch):# 計算損失函數的示例# 這里可以使用 PPO 或其他算法的損失函數# 這里只是一個簡單的示例# 將輸入數據移動到模型所在的設備# device = next(self.policy_net.parameters()).device # 獲取模型所在的設備action_batch.to(device) # 將 action_batch 移動到同一設備reward_batch = reward_batch.to(device) # 將 reward_batch 移動到同一設備i = 'PolicyGradient'if i == 'PolicyGradient':# 策略梯度 log_probs = torch.log(self.policy_net(obs_batch_dict))log_probs = log_probs.squeeze(0) # 去掉第 0 維,形狀變為 (3, 8)action_batch = action_batch.squeeze(0) # 去掉第 0 維,形狀變為 (3,)selected_log_probs = log_probs[range(len(action_batch)), action_batch]loss = -torch.mean(selected_log_probs * reward_batch) # 計算策略梯度損失elif i == "DQN":if next_obs_batch_dict is None:raise ValueError("next_obs_batch_dict must be provided for DQN loss calculation")# 假設有 Q 網絡 q_net 和目標 Q 網絡 target_q_net if not hasattr(self, 'q_net'): # 首次調用時初始化self.q_net = Noneq_values = self.q_net(obs_batch_dict)next_q_values = self.target_q_net(next_obs_batch_dict).max(dim=1)[0].detach()target_q_values = reward_batch + self.gamma * next_q_valuesloss = torch.mean((q_values[range(len(action_batch)), action_batch] - target_q_values) ** 2)else:raise ValueError("Unsupported loss function type")return lossdef train_step(self ):# 經驗回放訓練if not self.memory:return# 簡單策略梯度損失示意(可替換成 PPO)obs_batch, action_batch, reward_batch, _ = zip(*self.memory) # `zip` 函數:將多個可迭代對象的對應元素打包成元組。` 操作符*:將列表解包為獨立的參數(即拆開 `self.memory` 的外層列表)。# 將 obs_batch經過memory后會變成元組,tmd還得轉回來 -cq20250522obs_batch_temp = obs_batch[-1]obs_batch_dict = {"game_stats": obs_batch_temp["game_stats"],"bop_features": obs_batch_temp["bop_features"],"spatial_features": obs_batch_temp["spatial_features"],"bop_embeddings": obs_batch_temp["bop_embeddings"]}if len(self.memory) >= 20: # 使用len()獲取列表長度 obs_batch_temp = obs_batch[-20]else:obs_batch_temp = obs_batch[-1] # 如果只有一條記錄,使用最后一條記錄next_obs_batch_dict = {"game_stats": obs_batch_temp["game_stats"],"bop_features": obs_batch_temp["bop_features"],"spatial_features": obs_batch_temp["spatial_features"],"bop_embeddings": obs_batch_temp["bop_embeddings"]}action_batch = torch.tensor(action_batch)reward_batch = torch.tensor(reward_batch, dtype=torch.float32)# 構建loss --可替換的策略優化器(PPO, A2C, BCQ, DQN)loss = self.loss_fn(obs_batch_dict, next_obs_batch_dict, action_batch, reward_batch)# 更新參數self.optimizer.zero_grad()loss.backward()self.optimizer.step()self.memory.clear()# 監控訓練進度self.step_counter += 1if self.step_counter % 100 == 0: # 每100步打印一次損失,保存一次網絡模型print(f"Step: {self.step_counter}, Loss: {loss.item()}")# # 定期更新目標網絡# if self.count % self.target_update_freq == 0:# self.policy_net.load_state_dict(self.policy_net.state_dict())# 監控與保存import ossvaepath = RL_MODEL_PATH + f"_{current_date}/"if not os.path.exists(svaepath):os.makedirs(svaepath)self.save(svaepath + f"trainer_step_{self.step_counter}.pth") # 保存模型
4-智能體網絡設計?(比較復雜,考慮了各種信息作為了向量輸入)
這部分我沒全放出來,需要大家自己設計
而且這部分要搭配數據向量化處理的代碼,神經網絡的形狀也要考慮,十分繁瑣……
class troopsNNv2(nn.Module):def __init__(self, game_stat_dim=12,bop_feat_dim=43,enemy_num=3,ally_num=3,spatial_channels=6, # 適配精簡版空間通道數:enemy(3) + exposure(1) + see(1) + fire(1)output_dim=8,use_opponent_modeling=False):super(troopsNNv2, self).__init__()self.ally_num = ally_num# 游戲狀態信息(全局統計)self.mlp_game = nn.Linear(game_stat_dim, 32)# 單位特征處理self.unit_fc = nn.Linear(bop_feat_dim, 128)self.unit_gru = nn.GRU(input_size=128, hidden_size=128, batch_first=True)......................................# 最終動作預測頭(7方向 + 停止 = 8個動作)self.policy_head = nn.Sequential(nn.Linear(512, 256),nn.ReLU(),nn.Linear(256, output_dim))def forward(self, inputs):# print(type(inputs)) # 打印類型# 將輸入張量移動到設備device = torch.device(DEVICE)game_stats = inputs['game_stats'].to(device)bop_features = inputs['bop_features'].to(device)spatial_features = inputs['spatial_features'].to(device)bop_embeddings = inputs['bop_embeddings'].to(device)# 對敵的張量沒有t步的事情,所以不用進入下面的循環enemy_strategy = inputs['enemy_strategy'].to(device) if self.is_using_opponent else torch.empty(0, dtype=torch.float32, device=device)bop_enemy_embeddings = inputs['bop_enemy_embeddings'].to(device) if self.is_using_opponent else torch.empty(0, dtype=torch.float32, device=device)batch_size, seq_len = game_stats.shape[:2]outputs = []for t in range(seq_len):# 當前幀特征提取g = game_stats[:, t].float()bop = bop_features[:, t].float()spatial = spatial_features[:, t].float() # shape (B, 6, 13, 23)embeddings = bop_embeddings[:, t].float() # shape (B, 3, 13, 23)f_game = self.mlp_game(g) # (B, 32)# 單位特征編碼bop_embed = self.unit_fc(bop)_, bop_hidden = self.unit_gru(bop_embed)bop_hidden = bop_hidden.squeeze(0) # (B, 128)
......................................temporal_input = torch.stack(outputs, dim=1) # (B, T, D)_, final_hidden = self.temporal_gru(temporal_input) # final_hidden: (1, B, 512)final_feature = final_hidden.squeeze(0).unsqueeze(1).repeat(1, self.ally_num, 1) # (B, 3, 512)logits = self.policy_head(final_feature) # (B, 3, 8)# logits_all = []# logits_all.append(logits)return logits
在實際編碼時我曾經把3-4-1都放到一個python中,但是調試起來自己都亂了,所以分成這么4大塊挺好的,這是自己的經驗。
參考:
深度強化學習之模仿學習(Imitation Learning)-騰訊云開發者社區-騰訊云