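Title: Gymnasium Cart Pole Environment and the REINFORCE Algorithm (Introduction to Reinforcement Learning, Part 2)
Contents
- I. Gymnasium Cart Pole Environment
- II. REINFORCE Algorithm
- 1. Principle
- 2. REINFORCE Implementation
I. Gymnasium Cart Pole Environment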
The Gymnasium Cart Pole environment is a dynamics simulation of an inverted pendulum mounted on a cart.
State space:
0: Cart Position
1: Cart Velocity
2: Pole Angle
3: Pole Angular Velocity
Action space:
0: Push cart to the left
1: Push cart to the right
Reward:
A reward of +1 is given at every time step, so keeping the pole upright for longer earns a larger return.
Episode end criteria:
Termination: Pole Angle is greater than ±12°
Termination: Cart Position is greater than ±2.4 (center of the cart reaches the edge of the display)
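Truncation: Episode length is greater than 500 for CartPole-v1 (200 for CartPole-v0); the training loop in this article caps each episode at 200 steps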
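The end-of-episode criteria above can be sketched as a small check in plain Python. This is only an illustration, not Gymnasium's own code: the function name `episode_status`, the constants, and the `max_steps` parameter are hypothetical, and the pole angle is assumed to be given in radians, as in the environment's observation vector.

```python
import math

X_THRESHOLD = 2.4                     # cart position limit from the criteria above
THETA_THRESHOLD = math.radians(12.0)  # 12 degrees expressed in radians


def episode_status(cart_position, pole_angle, steps_taken, max_steps):
    """Return (terminated, truncated) for one CartPole-style state.

    Hypothetical helper: max_steps is whichever time limit applies.
    """
    terminated = (
        abs(cart_position) > X_THRESHOLD
        or abs(pole_angle) > THETA_THRESHOLD
    )
    truncated = steps_taken >= max_steps
    return terminated, truncated


if __name__ == "__main__":
    print(episode_status(0.0, 0.05, 10, max_steps=200))   # (False, False)
    print(episode_status(2.5, 0.0, 10, max_steps=200))    # (True, False)
    print(episode_status(0.0, 0.0, 200, max_steps=200))   # (False, True)
```

Termination means the control task failed, while truncation only means the time limit was reached, which is why the two flags are kept separate.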
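II. REINFORCE Algorithm
1. Principle
The principle and the Python implementation of REINFORCE follow Foundations of Deep Reinforcement Learning: Theory and Practice in Python.
Note that we adopt the "Improving REINFORCE" variant here, which subtracts a baseline $b$ from the return: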
$$\nabla_{\theta} J(\pi_\theta) \approx \sum_{t=0}^{T} \left(R_t(\tau)-b\right) \nabla_{\theta}\log\pi_\theta(a_t|s_t)$$
where $b$ is the mean of the returns over the whole trajectory, that is, a constant baseline for each trajectory:
$$b=\frac{1}{T} \sum_{t=0}^{T} R_t(\tau)$$
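The return $R_t(\tau)$ and the baseline $b$ can be worked through in plain Python. The sketch below is a minimal illustration under stated assumptions: the function names `discounted_returns` and `centered_returns` are hypothetical, and the mean is taken over the number of recorded steps in the trajectory.

```python
def discounted_returns(rewards, gamma=0.99):
    """Return R_t = sum over t' >= t of gamma^(t'-t) * r_t' for every step t."""
    returns = [0.0] * len(rewards)
    future_ret = 0.0
    # Walk backwards so each return reuses the one computed for the next step.
    for t in reversed(range(len(rewards))):
        future_ret = rewards[t] + gamma * future_ret
        returns[t] = future_ret
    return returns


def centered_returns(rewards, gamma=0.99):
    """Subtract the per-trajectory mean return b from every R_t."""
    returns = discounted_returns(rewards, gamma)
    baseline = sum(returns) / len(returns)
    return [r - baseline for r in returns]


if __name__ == "__main__":
    # Three steps with reward +1 each; gamma = 0.5 keeps the arithmetic simple.
    print(discounted_returns([1.0, 1.0, 1.0], gamma=0.5))  # [1.75, 1.5, 1.0]
    print(centered_returns([1.0, 1.0, 1.0], gamma=0.5))
```

After centering, early steps of a trajectory get positive weights and late steps negative ones, and the weights sum to zero over the trajectory.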
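In addition, training stops once the pole has been controlled successfully in 15 consecutive episodes, and the policy network is then saved.
At test time, the saved policy network is loaded; even with a longer test horizon it still controls the pole well.
2. REINFORCE Implementation
The policy network of the REINFORCE algorithm: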
    class Pi(nn.Module):
        # A policy network to be optimized in reinforcement learning.
        def __init__(self, in_dim, out_dim):  # in_dim = 4, out_dim = 2
            super().__init__()
            layers = [
                nn.Linear(in_dim, 64),   # 4 -> 64
                nn.ReLU(),               # activation function
                nn.Linear(64, out_dim),  # 64 -> 2
            ]
            self.model = nn.Sequential(*layers)
            self.onpolicy_reset()  # initialize memory
            self.train()           # set the model to training mode

        def onpolicy_reset(self):
            self.log_probs = []
            self.rewards = []

        def forward(self, x):  # x -> state
            # pdparam: parameters of the action distribution,
            # here the logits of a categorical distribution
            pdparam = self.model(x)
            return pdparam

        def act(self, state):
            # Sample an action from the policy and store its log-probability.
            # Convert the state from a NumPy array to a PyTorch tensor.
            x = torch.from_numpy(state.astype(np.float32))
            pdparam = self.forward(x)  # forward pass -> logits
            pd = torch.distributions.Categorical(logits=pdparam)
            action = pd.sample()  # a_t ~ pi_theta(. | s_t)
            # $\log \pi_{\theta}(a_t|s_t)$: the normalized logit of the sampled
            # action, where $\pi_{\theta}$ is the policy network, $a_t$ the
            # action and $s_t$ the state at time step $t$
            log_prob = pd.log_prob(action)
            self.log_probs.append(log_prob)  # store for training
            return action.item()  # single-element tensor -> Python scalar
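What `act` obtains from `Categorical(logits=...)` can be illustrated without PyTorch. The following is a plain-Python sketch of the same idea (normalize the logits, sample an action, return its log-probability); it is not PyTorch's implementation, and the names `log_softmax` and `sample_action` are hypothetical.

```python
import math
import random


def log_softmax(logits):
    """Turn raw logits into log-probabilities in a numerically stable way."""
    m = max(logits)
    log_z = m + math.log(sum(math.exp(v - m) for v in logits))
    return [v - log_z for v in logits]


def sample_action(logits, rng=random):
    """Sample an action index and return it with its log-probability."""
    log_probs = log_softmax(logits)
    probs = [math.exp(lp) for lp in log_probs]
    action = rng.choices(range(len(logits)), weights=probs, k=1)[0]
    return action, log_probs[action]


if __name__ == "__main__":
    # Equal logits give each of the two actions probability 0.5.
    action, log_prob = sample_action([0.0, 0.0])
    print(action, log_prob)
```

The stored log-probability is what the training step later multiplies by the baseline-corrected return.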
Training the policy network by backpropagation:
    def train(pi, optimizer):
        # Monte Carlo estimate of the policy-gradient loss, followed by one
        # gradient-ascent update of the policy network parameters.
        # The estimate should average over many sampled trajectories; for
        # simplicity a single trajectory is used as the sample mean.
        T = len(pi.rewards)
        rets = np.empty(T, dtype=np.float32)  # returns
        future_ret = 0.0
        # Compute the returns efficiently in reverse order:
        # R_t(\tau) = \sum_{t'=t}^{T} \gamma^{t'-t} r_{t'}
        for t in reversed(range(T)):
            future_ret = pi.rewards[t] + gamma * future_ret
            rets[t] = future_ret
        baseline = sum(rets) / T  # mean return over the trajectory
        rets = torch.tensor(rets)
        rets = rets - baseline  # subtract the baseline from the returns
        log_probs = torch.stack(pi.log_probs)
        # loss = -\sum_{t=0}^{T} (R_t(\tau) - b) \log \pi_{\theta}(a_t|s_t)
        # The sign is negative because the optimizer minimizes the loss.
        loss = -log_probs * rets
        loss = torch.sum(loss)
        optimizer.zero_grad()
        loss.backward()   # backpropagate: gradients of the loss w.r.t. \theta
        optimizer.step()  # update the weights of the policy network
        return loss
Multi-episode training: the whole REINFORCE training run ends once the pole has been balanced successfully in several consecutive episodes.
    def train_main():
        env = gym.make('CartPole-v1', render_mode="human")
        in_dim = env.observation_space.shape[0]  # 4
        out_dim = env.action_space.n             # 2
        pi = Pi(in_dim, out_dim)  # an instance of the policy network
        optimizer = optim.Adam(pi.parameters(), lr=0.01)
        episode = 0
        continuous_solved_episode = 0
        # Stop after 15 consecutive solved episodes.
        while continuous_solved_episode <= 14:
            state, _ = env.reset()  # gymnasium API: (observation, info)
            for t in range(200):  # cap each training episode at 200 steps
                action = pi.act(state)
                # gymnasium API: (obs, reward, terminated, truncated, info)
                state, reward, terminated, truncated, _ = env.step(action)
                pi.rewards.append(reward)
                env.render()
                if terminated or truncated:
                    break
            loss = train(pi, optimizer)  # train once per episode
            total_reward = sum(pi.rewards)
            solved = total_reward > 195.0
            episode += 1
            if solved:
                continuous_solved_episode += 1
            else:
                continuous_solved_episode = 0
            print(f'Episode {episode}, loss: {loss}, '
                  f'total_reward: {total_reward}, solved: {solved}, '
                  f'continuous_solved: {continuous_solved_episode}')
            pi.onpolicy_reset()  # on-policy: clear memory after training
        save_model(pi)
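The stopping rule in `train_main` (a streak of solved episodes that resets on any failure) can be isolated from the environment. The sketch below is an illustration only; the helper name `episodes_until_stop` and its parameters are hypothetical, with defaults taken from the values in the training loop.

```python
def episodes_until_stop(episode_rewards, threshold=195.0, required_streak=15):
    """Return the 1-based episode at which training would stop, or None.

    An episode counts as solved when its total reward exceeds threshold;
    a single unsolved episode resets the streak to zero.
    """
    streak = 0
    for episode, total_reward in enumerate(episode_rewards, start=1):
        streak = streak + 1 if total_reward > threshold else 0
        if streak >= required_streak:
            return episode
    return None


if __name__ == "__main__":
    # 14 solved episodes, one failure, then 15 solved episodes in a row.
    history = [200.0] * 14 + [100.0] + [200.0] * 15
    print(episodes_until_stop(history))  # 30
```

Requiring a streak rather than a single good episode guards against stopping on one lucky trajectory.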
A short screen recording of a training run:

Testing must be done with the neural network in evaluation mode; during testing the policy can keep the pole balanced for longer than in training.
    def test_process():
        env = gym.make('CartPole-v1', render_mode="human")
        # save_model pickled the whole module, so load it back as an object.
        # weights_only=False is needed on recent PyTorch releases, whose
        # default only unpickles tensors; load only files you trust.
        pi_model = torch.load(model_path, weights_only=False)
        pi_model.eval()  # set the model to evaluation mode
        # Forward passes only, no gradient tracking.
        with torch.no_grad():
            pi_model.onpolicy_reset()  # clear any stored memory
            state, _ = env.reset()  # gymnasium API: (observation, info)
            steps = 600
            for t in range(steps):  # run for at most `steps` time steps
                action = pi_model.act(state)
                state, reward, terminated, truncated, _ = env.step(action)
                pi_model.rewards.append(reward)
                env.render()
                # CartPole-v1 flags truncation after 500 steps; it is
                # deliberately ignored so the run can reach `steps`.
                if terminated:
                    break
            total_reward = sum(pi_model.rewards)
            solved = total_reward >= steps
            print(f'[Test] total_reward: {total_reward}, solved: {solved}')
A short screen recording of a test run:

Complete code:
    import sys

    import gymnasium as gym
    import numpy as np
    import torch
    import torch.nn as nn
    import torch.optim as optim

    gamma = 0.99  # discount factor
    model_path = "./reinforce_pi.pt"


    class Pi(nn.Module):
        # A policy network to be optimized in reinforcement learning.
        def __init__(self, in_dim, out_dim):  # in_dim = 4, out_dim = 2
            super().__init__()
            layers = [
                nn.Linear(in_dim, 64),   # 4 -> 64
                nn.ReLU(),               # activation function
                nn.Linear(64, out_dim),  # 64 -> 2
            ]
            self.model = nn.Sequential(*layers)
            self.onpolicy_reset()  # initialize memory
            self.train()           # set the model to training mode

        def onpolicy_reset(self):
            self.log_probs = []
            self.rewards = []

        def forward(self, x):  # x -> state
            # pdparam: parameters of the action distribution,
            # here the logits of a categorical distribution
            pdparam = self.model(x)
            return pdparam

        def act(self, state):
            # Sample an action from the policy and store its log-probability.
            # Convert the state from a NumPy array to a PyTorch tensor.
            x = torch.from_numpy(state.astype(np.float32))
            pdparam = self.forward(x)  # forward pass -> logits
            pd = torch.distributions.Categorical(logits=pdparam)
            action = pd.sample()  # a_t ~ pi_theta(. | s_t)
            # $\log \pi_{\theta}(a_t|s_t)$: the normalized logit of the sampled
            # action, where $\pi_{\theta}$ is the policy network, $a_t$ the
            # action and $s_t$ the state at time step $t$
            log_prob = pd.log_prob(action)
            self.log_probs.append(log_prob)  # store for training
            return action.item()  # single-element tensor -> Python scalar


    def train(pi, optimizer):
        # Monte Carlo estimate of the policy-gradient loss, followed by one
        # gradient-ascent update of the policy network parameters.
        # The estimate should average over many sampled trajectories; for
        # simplicity a single trajectory is used as the sample mean.
        T = len(pi.rewards)
        rets = np.empty(T, dtype=np.float32)  # returns
        future_ret = 0.0
        # Compute the returns efficiently in reverse order:
        # R_t(\tau) = \sum_{t'=t}^{T} \gamma^{t'-t} r_{t'}
        for t in reversed(range(T)):
            future_ret = pi.rewards[t] + gamma * future_ret
            rets[t] = future_ret
        baseline = sum(rets) / T  # mean return over the trajectory
        rets = torch.tensor(rets)
        rets = rets - baseline  # subtract the baseline from the returns
        log_probs = torch.stack(pi.log_probs)
        # loss = -\sum_{t=0}^{T} (R_t(\tau) - b) \log \pi_{\theta}(a_t|s_t)
        # The sign is negative because the optimizer minimizes the loss.
        loss = -log_probs * rets
        loss = torch.sum(loss)
        optimizer.zero_grad()
        loss.backward()   # backpropagate: gradients of the loss w.r.t. \theta
        optimizer.step()  # update the weights of the policy network
        return loss


    def save_model(pi):
        print("pi.state_dict(): {}\n\n".format(pi.state_dict()))
        for param_tensor in pi.state_dict():
            print(param_tensor, "\t", pi.state_dict()[param_tensor].size())
        torch.save(pi, model_path)  # pickles the whole module, not only the weights


    def train_main():
        env = gym.make('CartPole-v1', render_mode="human")
        in_dim = env.observation_space.shape[0]  # 4
        out_dim = env.action_space.n             # 2
        pi = Pi(in_dim, out_dim)  # an instance of the policy network
        optimizer = optim.Adam(pi.parameters(), lr=0.01)
        episode = 0
        continuous_solved_episode = 0
        # Stop after 15 consecutive solved episodes.
        while continuous_solved_episode <= 14:
            state, _ = env.reset()  # gymnasium API: (observation, info)
            for t in range(200):  # cap each training episode at 200 steps
                action = pi.act(state)
                # gymnasium API: (obs, reward, terminated, truncated, info)
                state, reward, terminated, truncated, _ = env.step(action)
                pi.rewards.append(reward)
                env.render()
                if terminated or truncated:
                    break
            loss = train(pi, optimizer)  # train once per episode
            total_reward = sum(pi.rewards)
            solved = total_reward > 195.0
            episode += 1
            if solved:
                continuous_solved_episode += 1
            else:
                continuous_solved_episode = 0
            print(f'Episode {episode}, loss: {loss}, '
                  f'total_reward: {total_reward}, solved: {solved}, '
                  f'continuous_solved: {continuous_solved_episode}')
            pi.onpolicy_reset()  # on-policy: clear memory after training
        save_model(pi)


    def usage():
        if len(sys.argv) != 2:
            print("Usage: python ./REINFORCE.py --train/--test")
            sys.exit()
        mode = sys.argv[1]
        return mode


    def test_process():
        env = gym.make('CartPole-v1', render_mode="human")
        # save_model pickled the whole module, so load it back as an object.
        # weights_only=False is needed on recent PyTorch releases, whose
        # default only unpickles tensors; load only files you trust.
        pi_model = torch.load(model_path, weights_only=False)
        pi_model.eval()  # set the model to evaluation mode
        # Forward passes only, no gradient tracking.
        with torch.no_grad():
            pi_model.onpolicy_reset()  # clear any stored memory
            state, _ = env.reset()  # gymnasium API: (observation, info)
            steps = 600
            for t in range(steps):  # run for at most `steps` time steps
                action = pi_model.act(state)
                state, reward, terminated, truncated, _ = env.step(action)
                pi_model.rewards.append(reward)
                env.render()
                # CartPole-v1 flags truncation after 500 steps; it is
                # deliberately ignored so the run can reach `steps`.
                if terminated:
                    break
            total_reward = sum(pi_model.rewards)
            solved = total_reward >= steps
            print(f'[Test] total_reward: {total_reward}, solved: {solved}')


    if __name__ == '__main__':
        mode = usage()
        if mode == "--train":
            train_main()
        elif mode == "--test":
            test_process()
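Copyright notice: This is an original article by the blogger, released under the CC 4.0 BY license. When reposting, please include a link to the original source and this notice.
Article link: https://blog.csdn.net/woyaomaishu2/article/details/146382384
Author: wzf@robotics_notes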