增量學習ASAP的源碼剖析:如何實現人形的運動追蹤和全身控制(核心涉及HumanoidVerse中的agents模塊)

前言

過去一周,我司「七月在線」長沙分部的具身團隊在機械臂和人形上并行發力

  • 關于機械臂
    一方面,在IL和VLA的路線下,先后采集了抓杯子、桌面收納、插入耳機孔的數據,然后云端訓-本地5090推理
    二方面,在RL的路線下,通過復現UC伯克利的HIL-SERL先后在仿真、真機上抓方塊
  • 關于人形
    一者,對于manipulation,本周暫先實現了通過VR遙操宇樹G1采集數據
    二者,對于locomotion,我司長沙具身團隊的其中一個小組準備復現基于PBHC的KungfuBot

    故為了和團隊更好的復現之,我在解讀完該論文之后,準備把其源碼也好好解析下
    而根據此文《KungfuBot——基于物理約束和自適應運動追蹤的人形全身控制PBHC,用于學習打拳或跳舞(即RL下的動作模仿和運控)》可知,KungfuBot中的RL模塊是基于ASAP實現的

故為了更好的復現KungfuBot,本文先來解讀下ASAP目前已經對外開源的代碼

根據ASAP的GitHub代碼倉庫可知,ASAP專注于學習敏捷的人形機器人全身技能,構建在HumanoidVerse多模擬器框架之上,其

  1. 支持多種物理模擬器:IsaacGym、IsaacSim、Genesis
  2. 實現人形機器人的運動追蹤和全身控制
  3. 提供從仿真到現實的轉移能力
  4. 模塊化設計,支持算法、環境、模擬器的分離
┌─────────────────────────────────────────────────┐
│                配置系統 (config/)                │
│  base.yaml → algo/ → env/ → robot/ → rewards/   │
└─────────────┬───────────────────────────────────┘│ Hydra instantiate()▼
┌─────────────────────────────────────────────────┐
│            環境層 (envs/)                       │
│  BaseTask ← locomotion/motion_tracking          │
└─────────────┬───────────────────────────────────┘│ composition▼
┌─────────────────────────────────────────────────┐
│          模擬器層 (simulator/)                   │
│  BaseSimulator ← IsaacGym/IsaacSim/Genesis      │
└─────────────┬───────────────────────────────────┘│ used by▼
┌─────────────────────────────────────────────────┐
│            算法層 (agents/)                     │
│  BaseAlgo ← PPO/DAGGER/DeltaA/ForceControl     │
└─────────────────────────────────────────────────┘

運行時調用順序如下

1. train_agent.py::main()
2. ├── config加載 (Hydra)
3. ├── 模擬器選擇 (IsaacGym/IsaacSim/Genesis)
4. ├── env實例化 (BaseTask子類)
5. │   ├── simulator實例化 (BaseSimulator子類)
6. │   ├── robot配置加載
7. │   └── terrain/obs/rewards設置
8. ├── algo實例化 (BaseAlgo子類)
9. │   ├── actor/critic網絡創建
10. │   └── 數據緩沖區初始化
11. └── algo.learn() 訓練循環

第一部分?算法模塊 agents/

包含

  • train_agent.py: 訓練智能體的主入口
    train_agent.py:
    ├── hydra → 配置加載
    ├── utils.config_utils → 配置預處理
    ├── BaseTask → 環境創建
    ├── BaseAlgo → 算法創建
    └── wandb → 日志記錄
  • eval_agent.py: 評估智能體的主入口
    提供實時鍵盤控制接口
    支持力控制調試功能
    可視化評估結果

agents模塊的完整目錄如下

agents/
├── base_algo/              # 算法基類定義
│   └── base_algo.py       # BaseAlgo抽象基類
├── modules/                # 神經網絡核心組件
│   ├── ppo_modules.py     # PPO Actor/Critic網絡
│   ├── modules.py         # 通用神經網絡模塊
│   ├── encoder_modules.py # 編碼器模塊
│   ├── world_models.py    # 世界模型實現
│   └── data_utils.py      # 數據存儲和GAE計算
├── callbacks/              # 實時分析和可視化系統
│   ├── base_callback.py   # 回調基類
│   ├── analysis_plot_*.py # 各類分析繪圖回調
│   └── analysis_plot_template.html # 實時可視化模板
├── ppo/                   # 標準PPO算法
├── dagger/                # 模仿學習算法
├── decouple/              # 解耦合控制算法
├── delta_a/               # ASAP核心創新:增量學習
├── delta_dynamics/        # Delta動力學模型
├── force_control/         # 力感知控制算法
├── mppi/                  # 模型預測路徑積分控制
└── ppo_locomanip.py       # 運動操作專用PPO

1.1 模塊組件agents/modules/

1.1.1 ppo_modules.py: PPO算法的Actor-Critic網絡

1.1.2 modules.py: 通用神經網絡模塊

1.1.3 encoder_modules.py:?編碼器模塊

1.1.4 world_models.py: 世界模型實現

1.1.5 data_utils.py: 數據處理工具,包含經驗回放緩沖區

// 待更

1.2 回調系統 agents/callbacks/

**分析和可視化:**
- **analysis_plot_*.py**: 各種分析圖表生成
- **base_callback.py**: 回調基類
- 支持力估計、運動分析、開環跟蹤等多種分析

1.3 模仿學習:DAgger 算法實現(dagger.py)

本文件實現了 DAgger(Dataset Aggregation)模仿學習算法,是 ASAP 框架中用于“模仿學習/專家演示學習”的核心類。它讓智能體在自己的策略下探索環境,并在這些狀態下查詢專家動作,逐步聚合數據,提升策略泛化能力

整個算法的流程為

  1. 初始化:加載學生和專家網絡,準備數據存儲
  2. 訓練循環:
    用學生策略探索環境,記錄每一步的狀態和專家動作
    用專家動作作為標簽,訓練學生策略模仿專家
    記錄和可視化訓練過程
  3. 評估:支持回調式評估和推理

DAgger(BaseAlgo)

- 繼承自 BaseAlgo,擁有統一的 setup、learn、evaluate_policy 等接口
- 關鍵成員變量包括:
? - self.actor:待訓練的策略網絡
? - self.gt_actor:專家(教師)策略網絡
? - self.optimizer:優化器
? - self.storage:數據存儲(經驗回放)
? - self.writer:Tensorboard 日志
? - self.eval_callbacks:評估回調
? - self.episode_env_tensors:環境統計

1.3.1 setup:模仿學習中"學生-教師"雙網絡架構的實現

該方法主要

  1. 網絡初始化:創建學生網絡(需要訓練的actor)和教師網絡(提供專家示范的gt_actor)
  2. 優化器設置:為學生網絡配置Adam優化器
  3. 存儲初始化:設置用于存儲軌跡數據的RolloutStorage,注冊各種數據類型的存儲鍵
  4. 統計跟蹤:初始化用于跟蹤訓練過程中各種統計信息的變量和緩沖區
    即初始化 RolloutStorage,注冊觀測、動作、專家動作等數據字段

具體而言

  1. 方法開始時通過日志記錄初始化過程,然后創建兩個關鍵的神經網絡組件
    第一個是學習者網絡 (`self.actor`),它是需要被訓練的策略網絡,使用當前配置中的網絡架構參數進行初始化
    def setup(self):logger.info("Setting up Dagger")  # 記錄開始設置DAgger算法的日志信息# 從配置中提取演員網絡的配置字典actor_network_dict = dict(actor=self.config.network_dict['actor'])
    第二個是專家網絡 (`self.gt_actor`),它代表"ground truth"或教師策略,不僅使用專門的網絡配置,還通過 `teacher_actor_network_load_dict` 加載預訓練的權重
        # 從配置中提取教師演員網絡的配置字典teacher_actor_network_dict = dict(actor=self.config.network_dict['teacher_actor'])# 從配置中提取教師演員網絡的加載配置字典teacher_actor_network_load_dict = dict(actor=self.config.network_load_dict['teacher_actor'])
    最終,分別創建學生actor、教師gt_actor
       # 創建學生演員網絡(PPO演員,固定標準差)self.actor = PPOActorFixSigma(self.algo_obs_dim_dict, actor_network_dict, {}, self.num_act)# 創建教師演員網絡(ground truth演員,用于提供專家示范)self.gt_actor = PPOActorFixSigma(self.algo_obs_dim_dict, teacher_actor_network_dict, teacher_actor_network_load_dict, self.num_act)
    這種設計使得專家網絡能夠在訓練過程中為學習者提供高質量的示范動作
  2. 網絡初始化完成后,代碼將兩個網絡都移動到指定的計算設備上,確保訓練過程中的計算效率
        # 將學生演員網絡移動到指定設備(CPU或GPU)self.actor.to(self.device)# 將教師演員網絡移動到指定設備(CPU或GPU)self.gt_actor.to(self.device)
  3. 接下來,為學習者網絡創建 Adam 優化器,這是訓練過程中唯一需要更新參數的網絡,而專家網絡的參數保持固定
        # 創建Adam優化器來訓練學生演員網絡self.optimizer = optim.Adam(self.actor.parameters(), lr=self.learning_rate)logger.info(f"Setting up Storage")  # 記錄開始設置存儲的日志信息
  4. 然后,方法初始化了一個專門的存儲系統 `RolloutStorage`,用于管理訓練數據的收集和批處理
    存儲系統的設計特別體現了 DAgger 算法的數據需求

    首先,它為所有觀測變量分配存儲空間
        # 創建回合存儲對象,用于存儲軌跡數據self.storage = RolloutStorage(self.env.num_envs, self.num_steps_per_env)## Register obs keys  # 注冊觀測鍵# 遍歷算法觀測維度字典,為每個觀測類型注冊存儲鍵for obs_key, obs_dim in self.algo_obs_dim_dict.items():self.storage.register_key(obs_key, shape=(obs_dim,), dtype=torch.float)
    然后注冊兩套完整的動作相關數據:
    \rightarrow? 一套用于學習者的動作、動作概率、動作均值和方差
        # 注冊學生演員的動作存儲鍵self.storage.register_key('actions', shape=(self.num_act,), dtype=torch.float)# 注冊學生演員的動作對數概率存儲鍵self.storage.register_key('actions_log_prob', shape=(1,), dtype=torch.float)# 注冊學生演員的動作均值存儲鍵self.storage.register_key('action_mean', shape=(self.num_act,), dtype=torch.float)# 注冊學生演員的動作標準差存儲鍵self.storage.register_key('action_sigma', shape=(self.num_act,), dtype=torch.float)
    \rightarrow??另一套用于專家的對應數據(以 `gt_` 前綴標識)
        # 注冊教師演員的動作存儲鍵(ground truth actions)self.storage.register_key('gt_actions', shape=(self.num_act,), dtype=torch.float)# 注冊教師演員的動作對數概率存儲鍵self.storage.register_key('gt_actions_log_prob', shape=(1,), dtype=torch.float)# 注冊教師演員的動作均值存儲鍵self.storage.register_key('gt_action_mean', shape=(self.num_act,), dtype=torch.float)# 注冊教師演員的動作標準差存儲鍵self.storage.register_key('gt_action_sigma', shape=(self.num_act,), dtype=torch.float)
    這種并行存儲機制使得算法能夠在每個時間步同時記錄學習者的行為和專家在相同狀態下的示范行為,為后續的監督學習提供配對的訓練數據
  5. 最后,方法初始化了訓練過程中的統計和監控組件
    這包括用于記錄回合信息的列表,以及使用雙端隊列實現的滑動窗口統計,可以跟蹤最近100個回合的獎勵和長度
        # 初始化回合信息列表,用于存儲每個回合的統計信息self.ep_infos = []# 創建獎勵緩沖區,最多存儲100個回合的獎勵(用于計算平均獎勵)self.rewbuffer = deque(maxlen=100)# 創建長度緩沖區,最多存儲100個回合的長度(用于計算平均長度)self.lenbuffer = deque(maxlen=100)
    同時,為每個并行環境分配了當前獎勵總和和回合長度的張量,支持多環境并行訓練的實時統計
        # 初始化當前獎勵累計張量,為每個環境跟蹤當前回合的獎勵總和self.cur_reward_sum = torch.zeros(self.env.num_envs, dtype=torch.float, device=self.device)# 初始化當前回合長度張量,為每個環境跟蹤當前回合的步數self.cur_episode_length = torch.zeros(self.env.num_envs, dtype=torch.float, device=self.device)

1.3.2 learn:訓練主循環,分為數據采集和訓練兩個階段

每次迭代:

  1. _rollout_step:用當前策略與環境交互,收集狀態、動作、專家動作等數據
  2. _training_step:用行為克隆損失(BC Loss)訓練 actor,使其輸出盡量接近專家動作
  3. 日志記錄與模型保存

具體而言

  1. 首先,如果設置了 `init_at_random_ep_len`,會將環境的 episode 長度緩沖區隨機初始化,以增加訓練多樣性
    def learn(self):# 如果需要在隨機的episode長度初始化,則對環境的episode長度緩沖區進行隨機賦值if self.init_at_random_ep_len:self.env.episode_length_buf = torch.randint_like(self.env.episode_length_buf, high=int(self.env.max_episode_length))
    接著,環境被重置,獲得初始觀測,并將所有觀測張量移動到指定設備(如 GPU),確保后續計算高效
        # 重置所有環境,獲取初始觀測obs_dict = self.env.reset_all()# 將所有觀測數據轉移到指定設備(如GPU或CPU)for obs_key in obs_dict.keys():obs_dict[obs_key] = obs_dict[obs_key].to(self.device)
  2. 隨后,模型被切換到訓練模式
        # 設置模型為訓練模式self._train_mode()
    主循環根據 `num_learning_iterations` 控制迭代次數,每次迭代都記錄起始時間
        # 獲取本次學習的迭代次數num_learning_iterations = self.num_learning_iterations# 計算總的迭代次數tot_iter = self.current_learning_iteration + num_learning_iterations# 遍歷每一次學習迭代# for it in track(range(self.current_learning_iteration, tot_iter), description="Learning Iterations"):for it in range(self.current_learning_iteration, tot_iter):# 記錄本次迭代的起始時間self.start_time = time.time()
    每一輪,首先通過 `_rollout_step` 與環境交互,采集一批新數據,并返回最新的觀測
            # 進行一次rollout,返回新的觀測(必須更新,否則一直用初始觀測)obs_dict = self._rollout_step(obs_dict)
    然后調用 `_training_step`,對采集到的數據進行行為克隆(BC)訓練,得到平均損失
            # 執行一次訓練步驟,返回平均BC損失mean_bc_loss = self._training_step()
    完成訓練后,記錄本輪訓練所用時間
            # 記錄本次迭代的結束時間self.stop_time = time.time()# 計算本次學習所用時間self.learn_time = self.stop_time - self.start_time
  3. 每輪結束后,會整理日志信息(如當前迭代數、采集和訓練時間、平均損失、獎勵等)
    并調用 `_post_epoch_logging` 進行可視化和記錄
    如果當前迭代數滿足保存間隔條件,還會保存模型檢查點
    每輪結束后,episode 相關信息會被清空,為下次迭代做準備
  4. 循環結束后,更新當前的學習迭代計數,并再次保存最終模型

整體來看,該方法實現了 DAgger 算法的標準訓練流程,包括數據采集、模型訓練、日志記錄和模型保存等關鍵步驟

1.3.3 _rollout_step

  1. 用 actor 采樣動作與環境交互,同時用 gt_actor 計算同一狀態下的專家動作
  2. 將觀測、動作、專家動作等信息存入 storage
  3. 統計獎勵、episode 長度等信息

1.3.4 _training_step

  1. 從 storage 生成小批量數據
  2. 計算行為克隆損失(BC Loss):均方誤差,目標是讓 actor 輸出接近專家動作
  3. 反向傳播并優化 actor 網絡

1.3.5 日志與評估

  1. _post_epoch_logging:詳細記錄訓練過程中的損失、獎勵、性能等信息,支持 Tensorboard 和 Rich 控制臺美化
  2. evaluate_policy:支持評估模式,結合回調機制靈活擴展評估邏輯

1.4 PPO:標準實現與運動操作專用PPO的實現

1.4.1 PPO的標準實現

這段代碼實現了 PPO(Proximal Policy Optimization)強化學習算法的主類。它負責整個算法的生命周期管理,包括初始化、模型和優化器的構建、數據存儲、訓練循環、模型保存與加載、評估流程等

1.4.1.1 __init__
  • 在 `__init__` 構造函數中,PPO 類會初始化環境、配置、日志記錄器(用于 TensorBoard)、以及一些用于統計和追蹤訓練過程的變量。它還會初始化獎勵和回合長度的緩沖區,用于后續的統計分析
  • `_init_config` 方法會從配置對象中提取所有關鍵超參數,包括環境數量、觀測和動作維度、學習率、損失系數、折扣因子等,確保算法的靈活性和可配置性
1.4.1.2 setup方法:調用 `_setup_models_and_optimizer` 和 `_setup_storage`

`setup` 方法會調用 `_setup_models_and_optimizer` 和 `_setup_storage`,分別初始化 actor/critic 網絡及其優化器,以及 rollout 數據存儲結構

  • 模型采用 PPOActor 和 PPOCritic 兩個神經網絡,優化器使用 Adam
  • 數據存儲結構會注冊所有需要追蹤的張量,包括觀測、動作、獎勵、優勢等
1.4.1.3?訓練主循環在 `learn` 方法

簡言之,訓練主循環在 `learn` 方法中實現。每次迭代會先進行 rollout(與環境交互收集數據),然后進行訓練步驟(mini-batch SGD),并記錄訓練過程中的各種統計信息。訓練過程中會定期保存模型和優化器狀態,便于斷點恢復

具體而言

  1. 首先,如果配置了 `init_at_random_ep_len`,會將環境的 episode 長度緩沖區隨機初始化,這有助于增加訓練的多樣性
    def learn(self):# 如果需要在隨機的episode長度初始化,則對環境的episode長度緩沖區進行隨機賦值if self.init_at_random_ep_len:self.env.episode_length_buf = torch.randint_like(self.env.episode_length_buf, high=int(self.env.max_episode_length))
  2. 接著,環境會被重置,獲得初始觀測
        # 重置所有環境,獲取初始觀測obs_dict = self.env.reset_all()
    并將所有觀測張量移動到指定的設備(如 GPU 或 CPU),以確保后續計算的高效性
        # 將每個觀測都轉移到指定的設備上(如GPU或CPU)for obs_key in obs_dict.keys():obs_dict[obs_key] = obs_dict[obs_key].to(self.device)
  3. 隨后,模型被切換到訓練模式(`self._train_mode()`),以啟用如 dropout、BN 等訓練相關的行為
        # 設置模型為訓練模式self._train_mode()
    主循環根據當前迭代次數和總訓練迭代數進行
        # 獲取本次學習的迭代次數num_learning_iterations = self.num_learning_iterations# 計算總的迭代次數(當前迭代數 + 本次要迭代的次數)tot_iter = self.current_learning_iteration + num_learning_iterations
    每次迭代包括以下步驟:
    1. 記錄當前時間,便于后續統計訓練耗時
        # 不使用track進度條,因為會和motion loading bar沖突# for it in track(range(self.current_learning_iteration, tot_iter), description="Learning Iterations"):for it in range(self.current_learning_iteration, tot_iter):# 記錄本次迭代的起始時間self.start_time = time.time()
    2. 調用 `_rollout_step` 與環境交互,收集一批數據(觀測、動作、獎勵等),并返回最新的觀測
            # 進行一次rollout,返回新的觀測(必須更新,否則一直用初始觀測)obs_dict = self._rollout_step(obs_dict)
    3. 調用 `_training_step`,對采集到的數據進行策略和價值網絡的訓練
            # 進行一次訓練步驟,返回損失字典loss_dict = self._training_step()
    4. 記錄本次迭代的耗時,并將相關統計信息(如損失、采集時間、獎勵等)打包到 `log_dict`,用于日志記錄和可視化
            # 記錄本次迭代的結束時間self.stop_time = time.time()# 計算本次學習所用時間self.learn_time = self.stop_time - self.start_time# 日志信息字典,包含當前迭代、損失、采集和學習時間、獎勵等log_dict = {'it': it,'loss_dict': loss_dict,'collection_time': self.collection_time,'learn_time': self.learn_time,'ep_infos': self.ep_infos,'rewbuffer': self.rewbuffer,'lenbuffer': self.lenbuffer,'num_learning_iterations': num_learning_iterations}# 執行日志記錄self._post_epoch_logging(log_dict)
    5. 每隔 `save_interval` 次迭代保存一次模型和優化器狀態,便于斷點恢復
            # 每隔save_interval步保存一次模型if it % self.save_interval == 0:self.save(os.path.join(self.log_dir, 'model_{}.pt'.format(it)))
    6. 清空本輪的 episode 信息緩存,為下輪訓練做準備
            # 清空本輪收集的episode信息self.ep_infos.clear()
  4. 循環結束后,更新當前的訓練迭代計數,并再次保存一次模型,確保所有訓練進度都被持久化
        # 累加當前學習迭代次數self.current_learning_iteration += num_learning_iterations# 保存最終模型self.save(os.path.join(self.log_dir, 'model_{}.pt'.format(self.current_learning_iteration)))
1.4.1.4?rollout (采樣和數據收集)的實現:_rollout_step與_actor_rollout_step

rollout 過程由 `_rollout_step` 實現——_rollout_step又會調用_actor_rollout_step,負責與環境交互、收集數據、處理獎勵和終止信息,并將數據寫入存儲

具體而言,

  1. 它首先在 `torch.inference_mode()` 上下文中運行,這樣可以避免梯度計算,提高推理效率。方法內部通過循環 `self.num_steps_per_env` 次,每次循環代表智能體在環境中的一步
    def _rollout_step(self, obs_dict):# 在推理模式下,不計算梯度,提升效率with torch.inference_mode():# 遍歷每個環境步數for i in range(self.num_steps_per_env):# 初始化策略狀態字典policy_state_dict = {}
  2. 在每一步中
    首先通過 `_actor_rollout_step` 計算當前觀測下的動作及相關策略信息
                # 通過actor獲取動作及相關策略信息,存入policy_state_dictpolicy_state_dict = self._actor_rollout_step(obs_dict, policy_state_dict)
    并通過 `_critic_eval_step` 評估當前狀態的價值
                # 通過critic評估當前狀態的價值values = self._critic_eval_step(obs_dict).detach()# 將價值信息加入策略狀態字典policy_state_dict["values"] = values
    所有與觀測和策略相關的數據都會被存儲到 `self.storage`,以便后續訓練使用
                # 將觀測信息存入rollout存儲for obs_key in obs_dict.keys():self.storage.update_key(obs_key, obs_dict[obs_key])# 將策略相關信息存入rollout存儲for obs_ in policy_state_dict.keys():self.storage.update_key(obs_, policy_state_dict[obs_])
    隨后,智能體根據動作與環境交互,獲得新的觀測、獎勵、終止信號和額外信息
                # 獲取當前動作actions = policy_state_dict["actions"]# 構造actor_state字典actor_state = {}actor_state["actions"] = actions# 與環境交互,獲得新的觀測、獎勵、done標志和額外信息obs_dict, rewards, dones, infos = self.env.step(actor_state)
    并將這些數據轉移到設備上(如 GPU)
                # 將新的觀測轉移到指定設備for obs_key in obs_dict.keys():obs_dict[obs_key] = obs_dict[obs_key].to(self.device)# 獎勵和done也轉移到設備rewards, dones = rewards.to(self.device), dones.to(self.device)
  3. 獎勵會根據是否有超時(`time_outs`)進行修正,確保獎勵的準確性
    每一步的數據(獎勵、終止信號等)都會被存儲,并通過 `self._process_env_step` 處理智能體和價值網絡的重置
    若啟用了日志記錄,還會統計每個 episode 的獎勵和長度,并在 episode 結束時重置相關統計量
  4. 循環結束后,方法會記錄采集數據所用的時間,并調用 `_compute_returns` 計算每一步的回報(returns)和優勢(advantages)
            # 為訓練準備數據,計算returns和advantagesreturns, advantages = self._compute_returns(last_obs_dict=obs_dict,policy_state_dict=dict(values=self.storage.query_key('values'), dones=self.storage.query_key('dones'), rewards=self.storage.query_key('rewards')))
    這些數據隨后批量存儲到 `self.storage`,為后續的策略優化做準備
  5. 最終,方法返回最新的觀測字典
        # 返回最新的觀測字典return obs_dict
1.4.1.5? GAE(廣義優勢估計):在 `_compute_returns` 方法中實現

優勢和回報的計算采用 GAE(廣義優勢估計),在 `_compute_returns` 方法中實現,能夠有效降低策略梯度的方差

  1. 首先,方法會用當前環境的最后一個觀測(`last_obs_dict`)通過價值網絡(critic)計算最后狀態的價值 `last_values`
    def _compute_returns(self, last_obs_dict, policy_state_dict):"""計算每一步的回報(returns)和優勢(advantages),用于PPO訓練使用廣義優勢估計(GAE)來降低策略梯度的方差。"""# 用critic網絡評估最后一個觀測的狀態價值,并去除梯度last_values = self.critic.evaluate(last_obs_dict["critic_obs"]).detach()# 初始化優勢為0advantage = 0# 從策略狀態字典中獲取values、dones和rewardsvalues = policy_state_dict['values']dones = policy_state_dict['dones']rewards = policy_state_dict['rewards']
    并將所有相關張量(values、dones、rewards、last_values)移動到指定設備(如 GPU)
        # 將所有張量轉移到指定設備(如GPU)last_values = last_values.to(self.device)values = values.to(self.device)dones = dones.to(self.device)rewards = rewards.to(self.device)
  2. 隨后,初始化一個與 values 形狀相同的 returns 張量,用于存儲每一步的回報
        # 初始化returns張量,形狀與values相同returns = torch.zeros_like(values)
  3. 為方便大家更好的理解,我直接引用此文《ChatGPT技術原理解析:從RL之PPO算法、RLHF到GPT4、instructGPT》的中的「3.3 針對「對話序列/獎勵值序列/values/DSC優勢函數/returns」的一個完整示例」內容 給大家解釋說明下
    首先,GAE的計算公式為?

    且如此文所說,考慮到有

    故,實際計算的時候,為減少計算量,可以先計算A_7,再分別計算A_6、A_5、A_4、A_3、A_2、A_1

    其次,再回顧一下TD誤差的含義:δ_1?=?r1?+ γV_old(2)?- V_old(1)
    比如對于上式:實際獲得的即時獎勵 r1 加上折扣后的未來獎勵預測 γV_old(2),再減去我們原先預測的當前時間步的獎勵 V_old(1),這就是后者的預測與前者實際經驗之間的差距


    SO,核心計算在一個反向循環中完成(從最后一步到第一步)。對于每一步,方法會判斷下一個狀態是否為終止狀態(`next_is_not_terminal`),并據此計算 TD 殘差(delta)

        # 獲取步數num_steps = returns.shape[0]# 反向遍歷每一步,計算GAE優勢和回報for step in reversed(range(num_steps)):# 最后一步的next_values用last_values,其余用下一步的valuesif step == num_steps - 1:next_values = last_valueselse:next_values = values[step + 1]# 判斷下一步是否為終止狀態next_is_not_terminal = 1.0 - dones[step].float()# 計算TD殘差delta = rewards[step] + next_is_not_terminal * self.gamma * next_values - values[step]
    優勢(advantage)通過遞歸方式累加,結合了當前步的 TD 殘差和未來步的優勢,體現了 GAE 的思想
            # 遞歸計算GAE優勢advantage = delta + next_is_not_terminal * self.gamma * self.lam * advantage
    每一步的回報則等于當前優勢加上當前狀態的價值,即,故有
            # 當前步的回報等于優勢加上當前價值returns[step] = advantage + values[step]
  4. 最后,方法計算優勢(returns - values)
        # 計算優勢(returns - values),并進行標準化advantages = returns - values
    并進行標準化處理(減去均值,除以標準差),以便后續訓練時數值更穩定
        advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)# 返回回報和標準化后的優勢
  5. 最終返回每一步的回報和標準化后的優勢。這個過程確保了 PPO 算法在更新策略時能夠利用高質量、低方差的優勢估計
        return returns, advantages
1.4.1.6?_training_step

訓練步驟 `_training_step` 會從存儲中生成 mini-batch,依次進行策略和價值網絡的更新

1.4.1.7?_update_ppo

PPO 的核心損失計算在 `_update_ppo` 方法中實現,包括代理損失、價值損失和熵損失,并支持自適應 KL 散度調整學習率

1.4.1.8 其他

此外,類還實現了模型的保存與加載、評估流程(包括回調機制)、以及詳細的日志記錄和可視化

1.4.2?運動操作專用PPO

// 待更

1.5 ASAP的核心創新模塊:增量學習與Delta動力學模型

1.5.1 機器人運控train_delta_a.py:基于PPO的擴展(支持雙策略)

本文件定義了一個名為 `PPO DeltaA` 的類,是基于 PPO(Proximal Policy Optimization)算法的擴展,主要用于機器人運動控制任務,支持“雙策略”機制: ?

  1. 主策略:當前正在訓練的 PPO 策略 ?
  2. 參考策略:從 checkpoint 加載的、凍結參數的預訓練策略(可用于對比、輔助、模仿等)

其主要依賴包括

  • `torch`、`torch.nn`、`torch.optim`:PyTorch 深度學習框架
  • humanoidverse 相關模塊:自定義的環境、網絡結構、工具等
  • `hydra`、`omegaconf`:配置管理
  • `loguru`、`rich`:日志和美觀的終端輸出
  • 其他:`os`、`time`、`deque`、`statistics` 等標準庫

如此文《ASAP——讓宇樹G1后仰跳投且跳舞:仿真中重現現實軌跡,然后通過增量動作模型預測仿真與現實的差距,最終縮小差距以對齊》所說

  1. 工作流程如圖2(b)所示

  2. PPO 用于訓練增量動作策略\pi_{\theta}^{\Delta},學習修正后的\Delta a_{t}以匹配仿真與真實世界
1.5.1.1 初始化(`__init__`)
  1. -繼承自 PPO 基類,初始化環境、配置、日志目錄、設備等
    如果配置中指定了 `policy_checkpoint`,則:
    自動查找并加載 checkpoint 對應的 config.yaml 配置文件
    合并 `eval_overrides` 配置(如果有)
  2. 預處理配置
            pre_process_config(policy_config)  # 對配置進行預處理
  3. 使用 hydra 的 `instantiate` 動態實例化參考策略(`self.loaded_policy`),并調用其 `setup()` 方法
    加載 checkpoint 權重到參考策略
  4. 設置參考策略為評估模式(凍結參數,不參與訓練)
    禁用參考策略所有參數的梯度(`param.requires_grad = False`)
    獲取參考策略的推理函數 `eval_policy`
1.5.1.2 rollout 步驟(`_rollout_step`):采集一段軌跡(rollout),供后續訓練使用

這段 `_rollout_step` 方法負責在強化學習訓練中收集一批(batch)環境交互數據。它的主要流程是在不計算梯度的情況下(`torch.inference_mode()`),循環執行 `num_steps_per_env` 次,每次代表環境中的一步

簡言之,每步流程:

  1. 主策略推理,得到動作(actions)和值(values)
  2. 參考策略推理,得到 `actions_closed_loop`
  3. 將主策略和參考策略的動作都傳入環境 `env.step(actor_state)`
  4. 收集環境返回的觀測、獎勵、done、info 等
  5. 更新存儲器(RolloutStorage),包括獎勵、done、values 等
  6. 統計回合獎勵、長度等信息
  7. 采集完一段后,計算 returns 和 advantages,供 PPO 算法訓練

具體而言,在每一步中

  1. 首先,主策略推理,得到動作(actions)和值(values)
    通過 `_actor_rollout_step` 計算當前策略的動作,并通過 `_critic_eval_step` 計算當前狀態的價值(value),這些信息被存儲在 `policy_state_dict` 中
        def _rollout_step(self, obs_dict):  # 采集一段rollout軌跡with torch.inference_mode():  # 關閉梯度計算,加速推理for i in range(self.num_steps_per_env):  # 遍歷每個環境步# 計算動作和值# 策略狀態字典policy_state_dict = {}  # 主策略推理policy_state_dict = self._actor_rollout_step(obs_dict, policy_state_dict)  # 評估當前狀態的價值values = self._critic_eval_step(obs_dict).detach()  # 存儲valuepolicy_state_dict["values"] = values
  2. 隨后,所有觀測和策略相關的數據都會被存入 `self.storage`,用于后續訓練
                    ## 存儲觀測for obs_key in obs_dict.keys():# 存儲每個觀測self.storage.update_key(obs_key, obs_dict[obs_key])  for obs_ in policy_state_dict.keys():# 存儲策略相關數據self.storage.update_key(obs_, policy_state_dict[obs_])  # 獲取主策略動作actions = policy_state_dict["actions"]  # 構造actor狀態actor_state = {}  # 主策略動作actor_state["actions"] = actions
  3. 接下來,參考策略推理,得到 `actions_closed_loop`
    方法還會用一個“參考策略”(`self.loaded_policy`,通常是預訓練或專家策略)對 `closed_loop_actor_obs` 進行推理,得到 `actions_closed_loop`
                    # 參考策略推理policy_output = self.loaded_policy.eval_policy(obs_dict['closed_loop_actor_obs']).detach()  
    并將其與主策略的動作一起組成 `actor_state`,用于環境的下一步模擬
                    # 存儲參考策略動作actor_state["actions_closed_loop"] = policy_output  
  4. 環境執行一步后,新的觀測、獎勵、終止標志和信息被返回。所有張量會被轉移到正確的設備(如 GPU),獎勵和終止信息也會被存儲
                    # 與環境交互,獲得新觀測、獎勵、done等obs_dict, rewards, dones, infos = self.env.step(actor_state)  for obs_key in obs_dict.keys():# 移動到指定設備obs_dict[obs_key] = obs_dict[obs_key].to(self.device)  # 獎勵和done也移動到設備rewards, dones = rewards.to(self.device), dones.to(self.device)  # 記錄環境統計信息self.episode_env_tensors.add(infos["to_log"])  # 獎勵擴展維度rewards_stored = rewards.clone().unsqueeze(1)  
    若出現超時(`time_outs`),獎勵會做相應調整
                    # 如果有超時信息if 'time_outs' in infos:  # 修正獎勵rewards_stored += self.gamma * policy_state_dict['values'] * infos['time_outs'].unsqueeze(1).to(self.device)  # 檢查獎勵維度assert len(rewards_stored.shape) == 2  self.storage.update_key('rewards', rewards_stored)    # 存儲獎勵self.storage.update_key('dones', dones.unsqueeze(1))  # 存儲doneself.storage.increment_step()                         # 存儲步數+1
    每一步還會調用 `_process_env_step` 進行額外處理
                    self._process_env_step(rewards, dones, infos)  # 處理環境步
    
  5. 如果設置了日志目錄,還會記錄每個 episode 的獎勵和長度,便于后續統計和分析
  6. 循環結束后,方法會統計采集數據所用的時間,并調用 `_compute_returns` 計算每一步的回報(returns)和優勢(advantages)
                # 計算回報和優勢returns, advantages = self._compute_returns(last_obs_dict=obs_dict,policy_state_dict=dict(values=self.storage.query_key('values'), dones=self.storage.query_key('dones'), rewards=self.storage.query_key('rewards')))  
    這些數據會批量更新到存儲中,為后續的策略優化做準備
                self.storage.batch_update_data('returns', returns)        # 存儲回報self.storage.batch_update_data('advantages', advantages)  # 存儲優勢
  7. 最后返回最新的觀測字典
            return obs_dict          # 返回最新觀測

整個過程實現了數據采集、存儲和預處理的自動化,是 PPO 及其變體算法訓練流程的核心部分。

    1.5.1.3 評估前處理(`_pre_eval_env_step`)

    簡言之

    1. 用于評估階段,每步都讓主策略和參考策略分別推理動作
    2. 更新 `actor_state`,包含主策略動作和參考策略動作
    3. 支持回調機制(`eval_callbacks`),可擴展評估邏輯

    1.5.2?Delta動力學模型

    // 待更

    1.6 控制算法:解耦合控制、力感知控制、模型預測路徑積分控制

    // 待更

    第二部分 環境模塊 envs/

    BaseTask (envs/base_task/base_task.py):
    ├── BaseSimulator → 物理模擬器接口
    ├── terrain → 地形生成
    ├── robot配置 → 機器人參數
    ├── obs配置 → 觀察空間定義
    └── rewards配置 → 獎勵函數設計具體任務繼承關系:
    BaseTask
    ├── LocomotionTask (locomotion/)
    ├── MotionTrackingTask (motion_tracking/) 
    └── LeggedBaseTask (legged_base_task/)

    **基礎任務架構:**
    - **BaseTask**: 所有RL任務的基類,處理模擬器初始化、環境設置、觀察空間定義
    - **base_task/**: 包含基礎任務實現和配置

    **專門任務類型:**
    - **locomotion/**: 運動控制任務,實現機器人行走、跑步等基礎運動
    - **motion_tracking/**: 運動追蹤任務,實現對人類動作的模仿和跟蹤
    - **legged_base_task/**: 腿式機器人基礎任務
    - **env_utils/**: 環境工具函數,包含地形生成、物理參數設置等

    第三部分 模擬器模塊 simulator/

    支持多種物理引擎:
    - **IsaacGym**: NVIDIA的GPU加速物理模擬器
    - **IsaacSim**: 基于Omniverse的仿真平臺
    - **Genesis**: 新興的高性能物理模擬器
    - **base_simulator/**: 模擬器基類,提供統一接口

    第四部分 配置系統 config/

    采用Hydra配置管理,結構化組織:
    - **base.yaml/base_eval.yaml**: 基礎配置
    - **algo/**: 算法特定配置
    - **env/**: 環境配置
    - **robot/**: 機器人配置 (如G1機器人的29自由度配置)
    - **rewards/**: 獎勵函數配置
    - **obs/**: 觀察空間配置
    - **terrain/**: 地形配置

    第五部分 工具模塊 utils/

    **通用工具:**
    - **common.py**: 通用函數
    - **math.py**: 數學工具函數
    - **torch_utils.py**: PyTorch相關工具
    - **config_utils.py**: 配置處理工具
    - **motion_lib/**: 動作庫,存儲和處理人類動作數據

    第六部分 數據模塊data/

    - **motions/**: 存儲人類動作數據
    - **robots/**: 機器人模型文件和配置

    第七部分 Isaac工具isaac_utils/

    專門為Isaac模擬器提供的數學和旋轉工具:
    - **maths.py**: 數學計算函數
    - **rotations.py**: 旋轉變換工具

    // 待更

    本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
    如若轉載,請注明出處:http://www.pswp.cn/web/84602.shtml
    繁體地址,請注明出處:http://hk.pswp.cn/web/84602.shtml
    英文地址,請注明出處:http://en.pswp.cn/web/84602.shtml

    如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

    相關文章

    計算機網絡學習筆記:應用層概述、動態主機配置協議DHCP

    文章目錄 一、應用層概述1.1、C/S架構1.2、P2P架構 二、動態主機配置協議DHCP2.1、DHCP發現報文2.2、DHCP提供報文2.3、DHCP請求報文2.4、DHCP確認報文2.5、DHCP的續約與終止 總結 一、應用層概述 應用層位于計算機網絡結構的最上層,用于解決應用進程的交互以實現特…

    為服務器SSH登錄增加2FA驗證

    安裝NTP模塊并設置時區 安裝NTP模塊 一般的服務器NTP服務默認是不安裝的,需要安裝NTP模塊【7】并啟用。 運行以下指令檢查你的NTP模塊是否已啟用,已啟用則忽略安裝NTP模塊的內容 timedatectl 如果你的返回內容和以下圖片一樣,則表示NTP未…

    AI大模型提示詞工程研究報告:長度與效果的辯證分析

    一、核心問題:提示詞長度與模型性能的平衡 核心矛盾:提示詞長度增加 → 信息豐富度↑ & 準確性↑ ? 計算成本↑ & 響應延遲↑ 二、詳細機制分析 (一)長提示詞的優勢(實證數據支持) 案例類型短提…

    HttpServletResponse源碼解析

    Java Servlet API 中 HttpServletResponse 接口的源碼,這是 Java Web 開發中非常核心的一個接口,用于向客戶端(通常是瀏覽器)發送 HTTP 響應。 public interface HttpServletResponse extends ServletResponse {int SC_CONTINUE …

    AI基礎概念

    目錄 1、ASR和STT區別 2、流式輸出 定義 原理 應用場景 優點 缺點 3、Ollama 4、mindspore和deepseek r1 v3 5、DeepSeek R1/V3 用的哪個底層AI框架 6、HAI-LLM比tensorflow、pytorch還強么 1. 核心優勢對比 2. 性能表現 3. 適用場景 總結 7、openai用的什么底層…

    ubuntu20.04速騰聚創airy驅動調試

    1.下載相關資料 下載包括:速騰airy產品手冊.pdf、RSView(用于顯示激光雷達數據)、3d數模文件、 RS-LiDAR-16用戶手冊 以下鏈接進行下載 https://www.robosense.cn/resources 2.連接線路后通過Wireshark抓包后進行本地IP配置 2.1按照線路連…

    Redis的大key和熱key如何解決

    文章目錄 Redis大Key一、什么是Redis大Key二、大Key的產生原因三、大Key的影響四、大Key的解決方案1. 檢測大Key2. 解決方案(1) 數據拆分(2) 使用壓縮算法(3) 使用合適的數據結構(4) 設置合理的過期時間(5) 合理清理(6) 配置優化 五、預防措施總結 Redis熱key一、熱Key問題的本…

    恒溫晶振與溫補晶振的區別

    在電子設備領域,晶振如同精準的“心臟起搏器”,為電路提供穩定的時鐘信號。恒溫晶振(OCXO)和溫補晶振(TCXO)作為兩類重要的晶體振蕩器,在不同的應用場景中發揮著關鍵作用,它們的區別…

    基于SpringBoot的在線考試智能監控系統設計與實現

    目錄 一.🦁前言二.🦁開源代碼與組件使用情況說明三.🦁核心功能1. ?算法設計2. ?Java開發語言3. ?Vue.js框架4. ?部署項目 四.🦁演示效果1. 管理員模塊1.1 用戶管理 2. 教師模塊2.1 考試管理2.2 瀏覽試題列表2.3 添加試題2.4 成…

    0基礎學Python系列【16】自動化郵件發送的終極教程:Python庫smtplib與email詳解

    大家好,歡迎來到Python學習的第二站!?? Python自帶了一些超好用的模塊,可以讓你不必從頭寫代碼就能實現很多功能。比如數學計算、文件操作、網絡通信等。花姐會挑選常用的一些模塊來講解,確保你能在實際項目中用到。?? 本章要學什么? 接下來花姐會深入淺出的講解下面…

    環衛車輛定位與監管:安心聯車輛監控管理平臺--科技賦能城市環境衛生管理

    一、 引言 城市環境衛生是城市文明的重要標志,也是城市管理的重要內容。隨著城市化進程的加快,環衛作業范圍不斷擴大,環衛車輛數量不斷增加,傳統的管理模式已難以滿足現代化城市管理的需求。為提高環衛作業效率,加強環…

    GIS 數據質檢:驗證 Geometry 有效性

    前言 在GIS開發中,數據的幾何有效性直接影響分析結果的準確性。無效的幾何(如自相交、空洞或坐標錯誤)可能導致空間計算失敗或輸出偏差。無論是Shapefile、GeoJSON還是數據庫中的空間數據,幾何質檢都是數據處理中不可忽視的關鍵步…

    AI大模型學習之基礎數學:高斯分布-AI大模型概率統計的基石

    🧑 博主簡介:CSDN博客專家、CSDN平臺優質創作者,高級開發工程師,數學專業,10年以上C/C, C#, Java等多種編程語言開發經驗,擁有高級工程師證書;擅長C/C、C#等開發語言,熟悉Java常用開…

    HarmonyOS性能優化——耗時操作減少

    耗時操作減少 在應用開發中,避免主線程執行冗余和耗時操作至關重要。這可以降低主線程負載,提升UI響應速度。 避免主線程冗余操作 冗余操作是不必要的、重復執行且對程序功能無實質性貢獻的操作。這些操作浪費計算資源,降低程序運行效率&a…

    emscripten 編譯 wasm 版本的 openssl

    搭建emscripten環境【參考:https://emscripten.org/docs/getting_started/downloads.html】 下載openssl解壓復制到emsdk目錄 依次執行下列命令: cd emsdk #激活emsdk source ./emsdk_env.shcd opensslemconfigure ./Configure linux-x32 -no-asm -sta…

    uniapp 實戰新聞頁面(一)

    新聞系統 一、 創建項目 創建個人中心 page.json 配置 tabar "tabBar": {"color":"#666","selectedColor": "#31C27C","list": [{"text": "首頁","pagePath": "pages/inde…

    JAVA鎖機制:對象鎖與類鎖

    JAVA鎖機制:對象鎖與類鎖 在多線程編程中,合理使用鎖機制是保證數據一致性和線程安全的關鍵。本文將通過示例詳細講解 Java 中的對象鎖和類鎖的原理、用法及區別。 一、未加鎖的并發問題 先看一段未加鎖的代碼: public class Synchronize…

    maxcomputer 和 hologres中的EXTERNAL TABLE 和 FOREIGN TABLE

    在阿里云的大數據和實時數倉產品中,MaxCompute 和 Hologres 都支持類似于 EXTERNAL TABLE 和 FOREIGN TABLE 的機制,但它們的實現和語義有所不同。 下面分別說明: ?? 一、MaxCompute 中的 EXTERNAL TABLE 和 FOREIGN TABLE 1. EXTERNAL T…

    穩定幣:從支付工具到金融基礎設施的技術演進與全球競爭新格局

    引言:穩定幣的崛起與金融體系重構 2025年6月,全球穩定幣市值突破2500億美元歷史大關,單年鏈上交易額高達35萬億美元——這一數字已超越Visa和萬事達卡交易總和。這一里程碑事件標志著穩定幣已從加密貨幣市場的邊緣實驗,蛻變為重構…

    用 HTML、CSS 和 jQuery 打造多頁輸入框驗證功能

    多頁輸入框驗證功能總結:使用 HTML、CSS 和 jQuery 實現 一、多頁表單驗證的核心概念與應用場景 多頁輸入框驗證是指將復雜表單拆分為多個頁面或步驟,逐步引導用戶完成輸入,并在每一步對用戶輸入進行驗證的功能。這種設計具有以下優勢: 提升用戶體驗:避免長表單帶來的心…