Gymnasium Cart Pole Environment and the REINFORCE Algorithm: Introduction to Reinforcement Learning, Part 2


Contents

  • I. Gymnasium Cart Pole Environment
  • II. The REINFORCE Algorithm
    • 1. Principle
    • 2. REINFORCE Implementation


I. Gymnasium Cart Pole Environment

The Gymnasium Cart Pole environment is a dynamics simulation of an inverted pendulum mounted on a cart.

State space:

0: Cart Position

1: Cart Velocity

2: Pole Angle

3: Pole Angular Velocity

Action space:

0: Push cart to the left

1: Push cart to the right

Reward:

To encourage keeping the pole upright for as long as possible, every time step yields a reward of +1.

Episode end criteria:

Termination: Pole Angle is greater than ±12°

Termination: Cart Position is greater than ±2.4 (center of the cart reaches the edge of the display)

Truncation: Episode length is greater than 500 (200 for CartPole-v0); the training loop below additionally caps each episode at 200 steps
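
The episode end criteria above can be sketched in plain Python. This is an illustrative stand-in, not Gymnasium's own code: the function name episode_status and its parameters are hypothetical, the limits are the 12° and 2.4 values listed above, and the default step limit of 500 is the CartPole-v1 value (CartPole-v0 uses 200).

```python
import math

ANGLE_LIMIT_RAD = math.radians(12.0)  # pole angle limit of 12 degrees
POSITION_LIMIT = 2.4                  # cart position limit


def episode_status(cart_position, pole_angle_rad, step_count, max_steps=500):
    """Return (terminated, truncated) after one step of a cart-pole episode.

    Hypothetical helper that mirrors the rules listed above.
    """
    terminated = (abs(cart_position) > POSITION_LIMIT
                  or abs(pole_angle_rad) > ANGLE_LIMIT_RAD)
    truncated = step_count >= max_steps
    return terminated, truncated


print(episode_status(0.0, 0.05, 10))                # (False, False)
print(episode_status(2.5, 0.0, 10))                 # (True, False)
print(episode_status(0.0, math.radians(13.0), 10))  # (True, False)
print(episode_status(0.0, 0.0, 500))                # (False, True)
```

Termination means the controller failed; truncation only means the time limit was reached, which is why the two flags are reported separately.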


II. The REINFORCE Algorithm

1. Principle

For the principle of the REINFORCE algorithm and its Python implementation, we follow Foundations of Deep Reinforcement Learning: Theory and Practice in Python.
Note that we use the variant described there under "Improving REINFORCE", which subtracts a baseline from the return:
$$\nabla_{\theta} J(\pi_\theta) \approx \sum_{t=0}^{T} \left(R_t(\tau)-b\right) \nabla_{\theta}\log\pi_\theta(a_t|s_t)$$
where $b$ is the mean of the returns over the whole trajectory, a constant baseline for each trajectory:
$$b=\frac{1}{T} \sum_{t=0}^{T} R_t(\tau)$$
In addition, we stop REINFORCE training once the pole has been balanced successfully in 15 consecutive episodes, and then save the policy network.
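
The returns and the baseline can be checked by hand on a tiny trajectory. The following is a minimal pure-Python sketch; the helper names discounted_returns and subtract_mean_baseline and the value gamma = 0.5 are chosen here for illustration only (the article's code uses gamma = 0.99 with NumPy and PyTorch).

```python
def discounted_returns(rewards, gamma):
    """R_t = r_t + gamma * R_{t+1}, computed backwards over one trajectory."""
    returns = [0.0] * len(rewards)
    future_return = 0.0
    for t in reversed(range(len(rewards))):
        future_return = rewards[t] + gamma * future_return
        returns[t] = future_return
    return returns


def subtract_mean_baseline(returns):
    """Subtract the trajectory-mean return b from every R_t."""
    baseline = sum(returns) / len(returns)
    return [r - baseline for r in returns], baseline


rewards = [1.0, 1.0, 1.0]  # Cart Pole gives +1 per time step
returns = discounted_returns(rewards, gamma=0.5)
centered, b = subtract_mean_baseline(returns)
print(returns)   # [1.75, 1.5, 1.0]
print(b)         # mean of the three returns
print(centered)  # early steps end up positive, late steps negative
```

Because every reward is +1, earlier steps always have larger returns than later ones; subtracting the mean centers the weights around zero, so actions taken early in a long episode are reinforced and actions taken just before failure are discouraged.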

At test time, we load the saved policy network and run for more time steps; it still controls the pole well.


2. REINFORCE Implementation

The policy network for REINFORCE:

class Pi(nn.Module):
    # A policy network to be optimized by reinforcement learning.
    def __init__(self, in_dim, out_dim):  # in_dim = 4, out_dim = 2
        super().__init__()
        layers = [
            nn.Linear(in_dim, 64),   # 4 -> 64
            nn.ReLU(),               # activation function
            nn.Linear(64, out_dim),  # 64 -> 2
        ]
        self.model = nn.Sequential(*layers)
        self.onpolicy_reset()  # initialize memory
        self.train()  # set the model to training mode

    def onpolicy_reset(self):
        self.log_probs = []
        self.rewards = []

    def forward(self, x):  # x -> state
        # pdparam: parameters of the action distribution,
        # here the logits of a categorical distribution
        pdparam = self.model(x)
        return pdparam

    def act(self, state):
        # Sample an action from the policy and record its log-probability.
        # Convert the state from a NumPy array to a PyTorch tensor.
        x = torch.from_numpy(state.astype(np.float32))
        pdparam = self.forward(x)  # forward pass to obtain the logits
        pd = torch.distributions.Categorical(logits=pdparam)  # pi(.|s_t)
        action = pd.sample()  # a_t ~ pi(.|s_t)
        # log pi_theta(a_t|s_t): log-probability of the sampled action under pd,
        # where pi_theta is the policy network, a_t the action and s_t the state
        # at time step t
        log_prob = pd.log_prob(action)
        self.log_probs.append(log_prob)  # store for training
        return action.item()  # extract the scalar from the single-element tensor
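
To make the sampling step in act concrete, here is a pure-Python sketch of what a categorical distribution parameterized by logits does conceptually: normalize the logits into log-probabilities, sample an index, and report the log-probability of the sampled index. It is a stand-in written for illustration, not PyTorch's implementation; log_softmax and sample_action are hypothetical helper names.

```python
import math
import random


def log_softmax(logits):
    """Convert logits to log-probabilities in a numerically stable way."""
    m = max(logits)
    log_z = m + math.log(sum(math.exp(x - m) for x in logits))
    return [x - log_z for x in logits]


def sample_action(logits, rng=random):
    """Sample an action index and return it with its log-probability."""
    log_probs = log_softmax(logits)
    probs = [math.exp(lp) for lp in log_probs]
    action = rng.choices(range(len(logits)), weights=probs, k=1)[0]
    return action, log_probs[action]


logits = [0.0, 0.0]  # equal logits: left and right are equally likely
action, log_prob = sample_action(logits, random.Random(0))
print(action, log_prob)  # log_prob equals log(0.5) for either action
```

Only the log-probability of the action actually taken is stored, because that is the only term of the policy that appears in the gradient estimate.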

Training the policy network by backpropagation:

def train(pi, optimizer):
    # Inner gradient-ascent loop of the REINFORCE algorithm.
    # The loss is a Monte Carlo estimate. Strictly, it should be averaged over
    # many sampled trajectories; for simplicity a single trajectory is used.
    T = len(pi.rewards)
    rets = np.empty(T, dtype=np.float32)  # returns
    future_ret = 0.0
    # Compute the returns efficiently in reverse order:
    # R_t(\tau) = \Sigma_{t'=t}^{T} \gamma^{t'-t} r_{t'}
    for t in reversed(range(T)):
        future_ret = pi.rewards[t] + gamma * future_ret
        rets[t] = future_ret
    baseline = sum(rets) / T
    rets = torch.tensor(rets)
    rets = rets - baseline  # subtract the baseline from the returns
    log_probs = torch.stack(pi.log_probs)
    # loss = - \Sigma_{t=0}^{T} (R_t(\tau) - b) * log(\pi_{\theta}(a_t|s_t))
    # The negative sign turns minimizing the loss into maximizing the objective.
    loss = - log_probs * rets
    loss = torch.sum(loss)
    optimizer.zero_grad()
    loss.backward()   # backpropagate: gradients of the loss w.r.t. \theta
    optimizer.step()  # descent on the loss, i.e. ascent on the objective
    return loss
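
The link between the loss in train and the policy gradient estimate can be written out explicitly. As a sketch, assume the baseline-adjusted returns are constants with respect to $\theta$, which matches the code, since rets is built from plain reward numbers rather than from the network output:

```latex
L(\theta) = -\sum_{t=0}^{T} \left(R_t(\tau) - b\right) \log \pi_\theta(a_t \mid s_t)
\quad\Longrightarrow\quad
\nabla_\theta L(\theta)
  = -\sum_{t=0}^{T} \left(R_t(\tau) - b\right) \nabla_\theta \log \pi_\theta(a_t \mid s_t)
  \approx -\nabla_\theta J(\pi_\theta)
```

An optimizer step that decreases $L$ therefore moves $\theta$ in a direction that increases the estimated objective $J$, which is why a standard minimizing optimizer can be used for gradient ascent here.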

Multi-episode training: the whole REINFORCE training run ends once the pole has been balanced successfully in several consecutive episodes.

def train_main():
    env = gym.make('CartPole-v1', render_mode="human")
    in_dim = env.observation_space.shape[0]  # 4
    out_dim = env.action_space.n  # 2
    pi = Pi(in_dim, out_dim)  # an instance of the policy network for REINFORCE
    optimizer = optim.Adam(pi.parameters(), lr=0.01)
    episode = 0
    continuous_solved_episode = 0
    # Stop after 15 consecutive solved episodes.
    while continuous_solved_episode <= 14:
        state, _ = env.reset()  # gymnasium returns (observation, info)
        for t in range(200):  # cap each training episode at 200 steps
            action = pi.act(state)
            # gymnasium returns (obs, reward, terminated, truncated, info)
            state, reward, terminated, truncated, _ = env.step(action)
            pi.rewards.append(reward)
            env.render()
            if terminated or truncated:
                break
        loss = train(pi, optimizer)  # train once per episode
        total_reward = sum(pi.rewards)
        solved = total_reward > 195.0
        episode += 1
        if solved:
            continuous_solved_episode += 1
        else:
            continuous_solved_episode = 0
        print(f'Episode {episode}, loss: {loss}, '
              f'total_reward: {total_reward}, solved: {solved}, '
              f'continuous_solved: {continuous_solved_episode}')
        pi.onpolicy_reset()  # on-policy: clear memory after training
    save_model(pi)
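
The stopping rule in train_main can be isolated and checked without any environment. The sketch below replays a list of per-episode total rewards through the same streak counter; episodes_until_stop is a hypothetical helper, and the reward history is made-up illustration data, not a training result.

```python
def episodes_until_stop(total_rewards, threshold=195.0, required=15):
    """Return the 1-based episode index at which training would stop,
    or None if the streak of solved episodes never reaches `required`."""
    streak = 0
    for episode, total_reward in enumerate(total_rewards, start=1):
        if total_reward > threshold:
            streak += 1
        else:
            streak = 0  # any failed episode resets the streak
        if streak >= required:
            return episode
    return None


# Ten solved episodes, one failure, then fifteen solved episodes in a row.
history = [200.0] * 10 + [150.0] + [200.0] * 15
print(episodes_until_stop(history))  # 26
```

Requiring a streak rather than a single good episode guards against stopping on one lucky trajectory, since the policy samples its actions.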

A short screen recording of training:

REINFORCE_training

Testing should be run with the neural network in evaluation mode. During testing, the pole can be balanced for longer than in training.

def test_process():
    steps = 600
    # CartPole-v1 truncates episodes at 500 steps by default, so raise the
    # limit to allow a longer test run.
    env = gym.make('CartPole-v1', render_mode="human", max_episode_steps=steps)
    # The whole module was pickled by torch.save(pi, ...), so load it with
    # weights_only=False (newer PyTorch versions default to weights_only=True).
    pi_model = torch.load(model_path, weights_only=False)
    pi_model.eval()  # set the model to evaluation mode
    # Forward passes only, no gradient tracking.
    with torch.no_grad():
        pi_model.onpolicy_reset()  # clear memory before the test episode
        state, _ = env.reset()
        for t in range(steps):
            action = pi_model.act(state)
            state, reward, terminated, truncated, _ = env.step(action)
            pi_model.rewards.append(reward)
            env.render()
            if terminated or truncated:
                break
        total_reward = sum(pi_model.rewards)
        solved = total_reward >= steps
        print(f'[Test] total_reward: {total_reward}, solved: {solved}')

A short screen recording of testing:

REINFORCE_testing

Complete code:

import sys

import gymnasium as gym
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim

gamma = 0.99  # discount factor
model_path = "./reinforce_pi.pt"


class Pi(nn.Module):
    # A policy network to be optimized by reinforcement learning.
    def __init__(self, in_dim, out_dim):  # in_dim = 4, out_dim = 2
        super().__init__()
        layers = [
            nn.Linear(in_dim, 64),   # 4 -> 64
            nn.ReLU(),               # activation function
            nn.Linear(64, out_dim),  # 64 -> 2
        ]
        self.model = nn.Sequential(*layers)
        self.onpolicy_reset()  # initialize memory
        self.train()  # set the model to training mode

    def onpolicy_reset(self):
        self.log_probs = []
        self.rewards = []

    def forward(self, x):  # x -> state
        # pdparam: logits of a categorical distribution over actions
        pdparam = self.model(x)
        return pdparam

    def act(self, state):
        # Sample an action from the policy and record its log-probability.
        x = torch.from_numpy(state.astype(np.float32))
        pdparam = self.forward(x)
        pd = torch.distributions.Categorical(logits=pdparam)
        action = pd.sample()  # a_t ~ pi(.|s_t)
        log_prob = pd.log_prob(action)  # log pi_theta(a_t|s_t)
        self.log_probs.append(log_prob)  # store for training
        return action.item()


def train(pi, optimizer):
    # Inner gradient-ascent loop of the REINFORCE algorithm,
    # using a single sampled trajectory as the Monte Carlo estimate.
    T = len(pi.rewards)
    rets = np.empty(T, dtype=np.float32)  # returns
    future_ret = 0.0
    # R_t(\tau) = \Sigma_{t'=t}^{T} \gamma^{t'-t} r_{t'}, computed in reverse
    for t in reversed(range(T)):
        future_ret = pi.rewards[t] + gamma * future_ret
        rets[t] = future_ret
    baseline = sum(rets) / T
    rets = torch.tensor(rets)
    rets = rets - baseline  # subtract the baseline from the returns
    log_probs = torch.stack(pi.log_probs)
    # loss = - \Sigma_{t=0}^{T} (R_t(\tau) - b) * log(\pi_{\theta}(a_t|s_t))
    loss = - log_probs * rets
    loss = torch.sum(loss)
    optimizer.zero_grad()
    loss.backward()   # gradients of the loss w.r.t. \theta
    optimizer.step()  # descent on the loss, i.e. ascent on the objective
    return loss


def save_model(pi):
    print("pi.state_dict(): {}\n\n".format(pi.state_dict()))
    for param_tensor in pi.state_dict():
        print(param_tensor, "\t", pi.state_dict()[param_tensor].size())
    torch.save(pi, model_path)  # saves the whole module, not only the weights


def train_main():
    env = gym.make('CartPole-v1', render_mode="human")
    in_dim = env.observation_space.shape[0]  # 4
    out_dim = env.action_space.n  # 2
    pi = Pi(in_dim, out_dim)  # an instance of the policy network for REINFORCE
    optimizer = optim.Adam(pi.parameters(), lr=0.01)
    episode = 0
    continuous_solved_episode = 0
    # Stop after 15 consecutive solved episodes.
    while continuous_solved_episode <= 14:
        state, _ = env.reset()  # gymnasium returns (observation, info)
        for t in range(200):  # cap each training episode at 200 steps
            action = pi.act(state)
            state, reward, terminated, truncated, _ = env.step(action)
            pi.rewards.append(reward)
            env.render()
            if terminated or truncated:
                break
        loss = train(pi, optimizer)  # train once per episode
        total_reward = sum(pi.rewards)
        solved = total_reward > 195.0
        episode += 1
        if solved:
            continuous_solved_episode += 1
        else:
            continuous_solved_episode = 0
        print(f'Episode {episode}, loss: {loss}, '
              f'total_reward: {total_reward}, solved: {solved}, '
              f'continuous_solved: {continuous_solved_episode}')
        pi.onpolicy_reset()  # on-policy: clear memory after training
    save_model(pi)


def usage():
    if len(sys.argv) != 2:
        print("Usage: python ./REINFORCE.py --train/--test")
        sys.exit()
    mode = sys.argv[1]
    return mode


def test_process():
    steps = 600
    # CartPole-v1 truncates at 500 steps by default; raise the limit for testing.
    env = gym.make('CartPole-v1', render_mode="human", max_episode_steps=steps)
    # The whole module was pickled, so weights_only=False is needed on
    # newer PyTorch versions, which default to weights_only=True.
    pi_model = torch.load(model_path, weights_only=False)
    pi_model.eval()  # set the model to evaluation mode
    with torch.no_grad():  # forward passes only
        pi_model.onpolicy_reset()  # clear memory before the test episode
        state, _ = env.reset()
        for t in range(steps):
            action = pi_model.act(state)
            state, reward, terminated, truncated, _ = env.step(action)
            pi_model.rewards.append(reward)
            env.render()
            if terminated or truncated:
                break
        total_reward = sum(pi_model.rewards)
        solved = total_reward >= steps
        print(f'[Test] total_reward: {total_reward}, solved: {solved}')


if __name__ == '__main__':
    mode = usage()
    if mode == "--train":
        train_main()
    elif mode == "--test":
        test_process()

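Copyright notice: This is an original article by the author, released under the CC 4.0 BY license. When reposting, please include a link to the original source and this notice.
Article link: https://blog.csdn.net/woyaomaishu2/article/details/146382384
Author: wzf@robotics_notes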
