【深度強化學習】MIP-DQN 實現案例(完整Python代碼)

目錄

  • MIP-DQN 算法概述
    • 建模基礎
    • 訓練階段(Training)
    • 部署階段(Online Execution)
    • DNN 網絡轉化為 MIP 表達式
    • 性能指標
  • 完整 Python 代碼實現
    • 主函數:random_generator_battery
    • 模型函數:MIP_DQN
      • 基礎/專用庫包安裝
      • 模型運行(完整Python代碼)
    • 參數設置函數:Parameters
  • 參考

本博客根據論文《Optimal energy system scheduling using a constraint-aware reinforcement learning algorithm》(2023)中提出的 MIP-DQN 算法,對其完整研究方法進行詳細介紹。

內容包括:輸入、輸出,狀態空間與動作空間定義,訓練過程,在線部署過程,以及如何將 DNN 轉換為 MIP 進行約束優化。

MIP-DQN 算法概述

MIP-DQN(Mixed-Integer Programming Deep Q-Network)是一種 值函數驅動的深度強化學習算法,與傳統 深度強化學習DRL 不同,它在 執行階段能夠嚴格滿足操作約束,例如功率平衡、爬坡限制等。

建模基礎

輸入(State 狀態空間):

每個時刻 ( t ) 的狀態 ( s_t ) 包括:

  • ( P^V_t ):當前時刻的光伏出力(向量)
  • ( P^L_t ):當前時刻的用戶負荷(向量)
  • ( P^G_{t-1} ):上一時刻 DG(分布式發電機)出力(向量)
  • ( SOC_t ):當前時刻儲能系統(ESS)電量狀態(向量)
s_t = (P^V_t, P^L_t, P^G_{t-1}, SOC_t)

在這里插入圖片描述


輸出(Action 動作空間):

每個時刻 ( t ) 的動作 ( a_t ) 包括:

  • ( P^G_{i,t} ):每個 DG 單元的出力(連續變量)
  • ( P^B_{j,t} ):每個 ESS 的充放電功率(連續變量)
a_t = (P^G_{i,t}, P^B_{j,t})

注意:系統與電網的交易功率 ( P^N_t ) 不由 DRL 控制,它由系統自動調整以維持功率平衡。

訓練階段(Training)

🎯 目標:
學習一個 Q 函數,估計在狀態 ( s_t ) 下采取動作 ( a_t ) 所帶來的期望回報。
在這里插入圖片描述

獎勵函數定義(Reward Function):

R(s_t, a_t) = -σ_1 × \text{運行成本} - σ_2 × \text{功率不平衡}
  • 運行成本 = DG 成本 + 與電網交易成本
  • 功率不平衡 ( ΔP_t ):公式如下:
ΔP_t = \left| \sum_i P^G_{i,t} + \sum_j P^B_{j,t} + \sum_m P^V_{m,t} + P^N_t - \sum_k P^L_{k,t} \right|

在這里插入圖片描述
訓練流程(見算法 1):

  1. 初始化 Q 網絡 ( Q_\theta(s, a) )、目標網絡 ( Q_{\theta_{target}} ),以及策略網絡 ( \pi_\omega )
  2. 采樣動作:從策略網絡 ( \pi_\omega(s) ) 加上噪聲(用于探索)
  3. 與環境交互:得到 ( (s_t, a_t, r_t, s_{t+1}) )
  4. 存入回放緩沖區 Replay Buffer
  5. 采樣 mini-batch 批量更新:
    • 更新 Q 網絡:使用目標網絡計算 target Q 值
    • 更新策略網絡:最大化 Q 網絡的期望值
  6. 定期軟更新目標網絡:( \theta_{target} = \tau \theta + (1 - \tau)\theta_{target} )
    在這里插入圖片描述

部署階段(Online Execution)

訓練完成后,丟棄策略網絡 ( \pi_\omega ),僅保留 Q 網絡 ( Q_\theta(s, a) ),并將其轉化為 帶約束的整數規劃問題(MIP) 進行決策。

步驟如下(見算法 2):

  1. 將訓練好的 DNN ( Q_\theta(s, a) ) 轉化為 MIP 表達形式(見下節)
  2. 添加操作約束:
    • 功率平衡約束(等式)
    • DG 出力上下限、爬坡約束
    • ESS 充放電限制、SOC 約束等
  3. 使用商業 MIP 求解器(如 Gurobi)求解:
\max_{a \in \mathcal{A}} Q_\theta(s, a) \quad \text{subject to all constraints}

在這里插入圖片描述

求解結果中的 ( a^* ) 即為當前狀態下最優可行動作!

在這里插入圖片描述

DNN 網絡轉化為 MIP 表達式

假設 Q 網絡結構為 多層前饋神經網絡(ReLU 激活),則每一層輸出滿足:

x^k = \text{ReLU}(W^{k-1} x^{k-1} + b^{k-1})

ReLU 可通過如下線性混合整數形式建模:

對于每一個神經元 ( x ):

x = \max(0, \hat{x}) \Rightarrow
\begin{cases}
x \geq 0 \\
x \geq \hat{x} \\
x \leq M z \\
x - \hat{x} \leq M (1 - z) \\
z \in \{0, 1\}
\end{cases}

其中 ( z ) 是二進制變量,( M ) 是足夠大的常數。

性能指標

DRL 算法性能評估指標:

  • 運行成本:目標函數(越小越好)
  • 功率不平衡(ΔP):是否滿足功率平衡(越小越好)
  • 算法運行時間:是否滿足實時性要求

完整 Python 代碼實現

所用工具:

  • PyTorch:訓練 Q 網絡和策略網絡
  • OMLT(Optimization and Machine Learning Toolkit):將 PyTorch 模型轉為 MIP 形式
  • Gurobi / CPLEX / CBC:商業混合整數規劃求解器
  • Pyomo:建模數學規劃問題
  • OpenAI Gym / 自定義環境:訓練環境

主函數:random_generator_battery

此段代碼構建了一個 能源管理環境 ESSEnv,用于模擬以下多個能源設備的運行:

  • 🔋 電池系統(Battery)
  • ? 三臺分布式發電機(DG1、DG2、DG3)
  • 🌞 光伏發電(PV)
  • 🔌 電網供電(Grid)
  • 📈 能源價格、負荷、電量數據(通過 DataManager 管理)

該環境繼承自 gym.Env,兼容 OpenAI Gym 強化學習 API,支持 reset()step()render() 等接口,用于訓練強化學習智能體。

模型函數:MIP_DQN

基礎/專用庫包安裝

安裝基礎依賴:

conda install numpy pandas matplotlib scikit-learn -y
conda install pytorch torchvision -c pytorch -y

安裝 Pyomo(用于建模 MIP):

conda install -c conda-forge pyomo -y

安裝 Gurobi(用于求解 MIP)

??注意: Gurobi 是商業軟件,需注冊并獲取 Academic License(免費用于學術用途)

官網-Gurobi Optimization。Gurobi的安裝及 license 獲取可參考我的另一博客-。

1. 安裝 Gurobi

conda install -c gurobi gurobi -y

2. 設置 license(首次使用)

grbgetkey <your-license-key>grbgetkeyex: grbgetkey ae36ac20-16e6-acd2-f242-4da6e765fa0a

然后按提示操作。若你已有 gurobi.lic 文件,請放在 ~/.gurobi/ 或 C:\gurobi\ 目錄下。
在這里插入圖片描述


安裝 OMLT(Optimization and Machine Learning Toolkit,優化與機器學習集成工具)

pip install omltconda install omlt

注意:OMLT 依賴 pyomo 和 onnx,會自動安裝。


安裝 ONNX(用于神經網絡轉模型)

pip install onnx onnxruntime

說明,onnx 是模型格式,onnxruntime 是運行時推理引擎


安裝 Weights & Biases(可選,日志可視化工具)
wandb 是一個輕量級第三方 Python 包,不依賴底層 C/C++ 庫,pip 安裝非常穩定,適合用于 Conda 環境中。

官網-wandb,進去官網后注冊即可獲取個人API。

下載命令如下:

pip install wandb

輸入以下命令,按照指示輸入個人API即可。

wandb login

模型運行(完整Python代碼)

import pickle
import torch
import os
import numpy as np
import numpy.random as rd
import pandas as pd
import pyomo.environ as pyo
import pyomo.kernel as pmo
from omlt import OmltBlock
from gurobipy import *
from omlt.neuralnet import NetworkDefinition, FullSpaceNNFormulation,ReluBigMFormulation
from omlt.io.onnx import write_onnx_model_with_bounds,load_onnx_neural_network_with_bounds
import tempfile
import torch.onnx
import torch.nn as nn
from copy import deepcopy
import wandb
from random_generator_battery import ESSEnv## define net
# 經驗回放緩沖區 ReplayBuffer:用于存儲 agent 與環境交互的軌跡,以供后續訓練使用。
class ReplayBuffer:def __init__(self, max_len, state_dim, action_dim, gpu_id=0):self.now_len = 0self.next_idx = 0self.if_full = Falseself.max_len = max_lenself.data_type = torch.float32self.action_dim = action_dimself.device = torch.device(f"cuda:{gpu_id}" if (torch.cuda.is_available() and (gpu_id >= 0)) else "cpu")other_dim = 1 + 1 + self.action_dimself.buf_other = torch.empty(size=(max_len, other_dim), dtype=self.data_type, device=self.device)if isinstance(state_dim, int):  # state is pixelself.buf_state = torch.empty((max_len, state_dim), dtype=torch.float32, device=self.device)elif isinstance(state_dim, tuple):self.buf_state = torch.empty((max_len, *state_dim), dtype=torch.uint8, device=self.device)else:raise ValueError('state_dim')# extend_buffer():添加新數據def extend_buffer(self, state, other):  # CPU array to CPU arraysize = len(other)next_idx = self.next_idx + sizeif next_idx > self.max_len:self.buf_state[self.next_idx:self.max_len] = state[:self.max_len - self.next_idx]self.buf_other[self.next_idx:self.max_len] = other[:self.max_len - self.next_idx]self.if_full = Truenext_idx = next_idx - self.max_lenself.buf_state[0:next_idx] = state[-next_idx:]self.buf_other[0:next_idx] = other[-next_idx:]else:self.buf_state[self.next_idx:next_idx] = stateself.buf_other[self.next_idx:next_idx] = otherself.next_idx = next_idx# sample_batch():采樣 mini-batch(訓練用)def sample_batch(self, batch_size) -> tuple:indices = rd.randint(self.now_len - 1, size=batch_size)r_m_a = self.buf_other[indices]return (r_m_a[:, 0:1],r_m_a[:, 1:2],r_m_a[:, 2:],self.buf_state[indices],self.buf_state[indices + 1])def update_now_len(self):self.now_len = self.max_len if self.if_full else self.next_idxclass Arguments:def __init__(self, agent=None, env=None):self.agent = agent  # Deep Reinforcement Learning algorithmself.env = env  # the environment for trainingself.cwd = None  # current work directory. None means set automaticallyself.if_remove = False  # remove the cwd folder? (True, False, None:ask me)self.visible_gpu = '0,1,2,3'  # for example: os.environ['CUDA_VISIBLE_DEVICES'] = '0, 2,'self.worker_num = 2  # rollout workers number pre GPU (adjust it to get high GPU usage)self.num_threads = 8  # cpu_num for evaluate model, torch.set_num_threads(self.num_threads)self.if_per_or_gae = False'''Arguments for training'''self.num_episode=3000self.gamma = 0.995  # discount factor of future rewardsself.learning_rate = 1e-4  # 2 ** -14 ~= 6e-5self.soft_update_tau = 1e-2  # 2 ** -8 ~= 5e-3self.net_dim = 64  # the network width 256self.batch_size = 256  # num of transitions sampled from replay buffer.self.repeat_times = 2 ** 3  # repeatedly update network to keep critic's loss smallself.target_step = 1000 # collect target_step experiences , then update network, 1024self.max_memo = 50000  # capacity of replay buffer## arguments for controlling explorationself.explorate_decay=0.99self.explorate_min=0.3'''Arguments for evaluate'''self.random_seed_list=[1234,2234,3234,4234,5234]# self.random_seed_list=[2234]self.run_name='MIP_DQN_experiments''''Arguments for save'''self.train=Trueself.save_network=Truedef init_before_training(self, if_main):if self.cwd is None:agent_name = self.agent.__class__.__name__self.cwd = f'./{agent_name}/{self.run_name}'if if_main:import shutil  # remove history according to bool(if_remove)if self.if_remove is None:self.if_remove = bool(input(f"| PRESS 'y' to REMOVE: {self.cwd}? ") == 'y')elif self.if_remove:shutil.rmtree(self.cwd, ignore_errors=True)print(f"| Remove cwd: {self.cwd}")os.makedirs(self.cwd, exist_ok=True)np.random.seed(self.random_seed)torch.manual_seed(self.random_seed)torch.set_num_threads(self.num_threads)torch.set_default_dtype(torch.float32)os.environ['CUDA_VISIBLE_DEVICES'] = str(self.visible_gpu)# control how many GPU is used  # 模型定義 Actor 網絡(策略網絡)
class Actor(nn.Module):def __init__(self,mid_dim,state_dim,action_dim):super().__init__()self.net=nn.Sequential(nn.Linear(state_dim,mid_dim),nn.ReLU(),nn.Linear(mid_dim,mid_dim),nn.ReLU(),nn.Linear(mid_dim,mid_dim),nn.ReLU(),nn.Linear(mid_dim,action_dim))def forward(self,state):return self.net(state).tanh()# make the data from -1 to 1# 用于探索時加入噪聲def get_action(self,state,action_std):#action=self.net(state).tanh()noise=(torch.randn_like(action)*action_std).clamp(-0.5,0.5)#return (action+noise).clamp(-1.0,1.0)# 模型定義 CriticQ 網絡(雙 Q 網絡)
class CriticQ(nn.Module):def __init__(self,mid_dim,state_dim,action_dim):super().__init__()self.net_head=nn.Sequential(nn.Linear(state_dim+action_dim,mid_dim),nn.ReLU(),nn.Linear(mid_dim,mid_dim),nn.ReLU())self.net_q1=nn.Sequential(nn.Linear(mid_dim,mid_dim),nn.ReLU(),nn.Linear(mid_dim,1))# we get q1 valueself.net_q2=nn.Sequential(nn.Linear(mid_dim,mid_dim),nn.ReLU(),nn.Linear(mid_dim,1))# we get q2 valuedef forward(self,value):mid=self.net_head(value)return self.net_q1(mid)def get_q1_q2(self,value):mid=self.net_head(value)return self.net_q1(mid),self.net_q2(mid)class AgentBase:def __init__(self):self.state = Noneself.device = Noneself.action_dim = Noneself.if_off_policy = Noneself.explore_noise = Noneself.trajectory_list = Noneself.explore_rate = 1.0self.criterion = torch.nn.SmoothL1Loss()def init(self, net_dim, state_dim, action_dim, learning_rate=1e-4, _if_per_or_gae=False, gpu_id=0):self.device = torch.device(f"cuda:{gpu_id}" if (torch.cuda.is_available() and (gpu_id >= 0)) else "cpu")self.action_dim = action_dimself.cri = self.ClassCri(net_dim, state_dim, action_dim).to(self.device)self.act = self.ClassAct(net_dim, state_dim, action_dim).to(self.device) if self.ClassAct else self.criself.cri_target = deepcopy(self.cri) if self.if_use_cri_target else self.criself.act_target = deepcopy(self.act) if self.if_use_act_target else self.actself.cri_optim = torch.optim.Adam(self.cri.parameters(), learning_rate)self.act_optim = torch.optim.Adam(self.act.parameters(),learning_rate) if self.ClassAct else self.cridel self.ClassCri, self.ClassActdef select_action(self, state) -> np.ndarray:states = torch.as_tensor((state,), dtype=torch.float32, device=self.device)action = self.act(states)[0]if rd.rand()<self.explore_rate:action = (action + torch.randn_like(action) * self.explore_noise).clamp(-1, 1)return action.detach().cpu().numpy()def explore_env(self, env, target_step):trajectory = list()state = self.statefor _ in range(target_step):action = self.select_action(state)state, next_state, reward, done, = env.step(action)trajectory.append((state, (reward, done, *action)))state = env.reset() if done else next_stateself.state = statereturn trajectory@staticmethoddef optim_update(optimizer, objective):optimizer.zero_grad()objective.backward()optimizer.step()@staticmethoddef soft_update(target_net, current_net, tau):for tar, cur in zip(target_net.parameters(), current_net.parameters()):tar.data.copy_(cur.data * tau + tar.data * (1.0 - tau))def save_or_load_agent(self, cwd, if_save):def load_torch_file(model_or_optim, _path):state_dict = torch.load(_path, map_location=lambda storage, loc: storage)model_or_optim.load_state_dict(state_dict)name_obj_list = [('actor', self.act), ('act_target', self.act_target), ('act_optim', self.act_optim),('critic', self.cri), ('cri_target', self.cri_target), ('cri_optim', self.cri_optim), ]name_obj_list = [(name, obj) for name, obj in name_obj_list if obj is not None]if if_save:for name, obj in name_obj_list:save_path = f"{cwd}/{name}.pth"torch.save(obj.state_dict(), save_path)else:for name, obj in name_obj_list:save_path = f"{cwd}/{name}.pth"load_torch_file(obj, save_path) if os.path.isfile(save_path) else Nonedef _update_exploration_rate(self,explorate_decay,explore_rate_min):self.explore_rate = max(self.explore_rate * explorate_decay, explore_rate_min)'''this function is used to update the explorate probability when select action'''# 模型定義 AgentMIPDQN 算法(繼承基礎類 AgentBase)
class AgentMIPDQN(AgentBase):def __init__(self):super().__init__()self.explore_noise = 0.5  # standard deviation of exploration noiseself.policy_noise = 0.2  # standard deviation of policy noiseself.update_freq = 2  # delay update frequencyself.if_use_cri_target = self.if_use_act_target = Trueself.ClassCri = CriticQself.ClassAct = Actor# update_net():更新策略網絡和雙 Q 網絡def update_net(self, buffer, batch_size, repeat_times, soft_update_tau) -> tuple:buffer.update_now_len()obj_critic = obj_actor = Nonefor update_c in range(int(buffer.now_len / batch_size * repeat_times)):# we update too much time?obj_critic, state = self.get_obj_critic(buffer, batch_size)self.optim_update(self.cri_optim, obj_critic)action_pg = self.act(state)  # policy gradientobj_actor = -self.cri_target(torch.cat((state, action_pg),dim=-1)).mean()  # use cri_target instead of cri for stable trainingself.optim_update(self.act_optim, obj_actor)if update_c % self.update_freq == 0:  # delay updateself.soft_update(self.cri_target, self.cri, soft_update_tau)self.soft_update(self.act_target, self.act, soft_update_tau)return obj_critic.item() / 2, obj_actor.item()# get_obj_critic():獲取目標 Q 值并計算 critic lossdef get_obj_critic(self, buffer, batch_size) -> (torch.Tensor, torch.Tensor):with torch.no_grad():reward, mask, action, state, next_s = buffer.sample_batch(batch_size)next_a = self.act_target.get_action(next_s, self.policy_noise)  # policy noise,next_q = torch.min(*self.cri_target.get_q1_q2(torch.cat((next_s, next_a),dim=-1)))  # twin criticsq_label = reward + mask * next_qq1, q2 = self.cri.get_q1_q2(torch.cat((state, action),dim=-1))obj_critic = self.criterion(q1, q_label) + self.criterion(q2, q_label)  # twin criticsreturn obj_critic, statedef update_buffer(_trajectory):ten_state = torch.as_tensor([item[0] for item in _trajectory], dtype=torch.float32)ary_other = torch.as_tensor([item[1] for item in _trajectory])ary_other[:, 0] = ary_other[:, 0]   # ten_rewardary_other[:, 1] = (1.0 - ary_other[:, 1]) * gamma  # ten_mask = (1.0 - ary_done) * gammabuffer.extend_buffer(ten_state, ary_other)_steps = ten_state.shape[0]_r_exp = ary_other[:, 0].mean()  # other = (reward, mask, action)return _steps, _r_expdef get_episode_return(env, act, device):'''get information of one episode during the training'''episode_return = 0.0  # sum of rewards in an episodeepisode_unbalance=0.0episode_operation_cost=0.0state = env.reset()for i in range(24):s_tensor = torch.as_tensor((state,), device=device)a_tensor = act(s_tensor)action = a_tensor.detach().cpu().numpy()[0]  # not need detach(), because with torch.no_grad() outsidestate, next_state, reward, done,= env.step(action)state=next_stateepisode_return += rewardepisode_unbalance+=env.real_unbalanceepisode_operation_cost+=env.operation_costif done:breakreturn episode_return,episode_unbalance,episode_operation_cost# 網絡導出與 MIP 調用模塊 Actor_MIP
# 用于將訓練好的 Q 網絡轉換成 ONNX + OMLT + Pyomo 表達形式,并結合 Gurobi 求解最優動作
class Actor_MIP:'''this actor is used to get the best action and Q function, the only input should be batch tensor state, action, and network, while the output should bebatch tensor max_action, batch tensor max_Q'''def __init__(self,scaled_parameters,batch_size,net,state_dim,action_dim,env,constrain_on=False):self.batch_size = batch_sizeself.net = netself.state_dim = state_dimself.action_dim =action_dimself.env = envself.constrain_on=constrain_onself.scaled_parameters=scaled_parametersdef get_input_bounds(self,input_batch_state):batch_size = self.batch_sizebatch_input_bounds = []lbs_states = input_batch_state.detach().numpy()ubs_states = lbs_statesfor i in range(batch_size):input_bounds = {}for j in range(self.action_dim + self.state_dim):if j < self.state_dim:input_bounds[j] = (float(lbs_states[i][j]), float(ubs_states[i][j]))else:input_bounds[j] = (float(-1), float(1))batch_input_bounds.append(input_bounds)return batch_input_boundsdef predict_best_action(self, state):state=state.detach().cpu().numpy()v1 = torch.zeros((1, self.state_dim+self.action_dim), dtype=torch.float32)'''this function is used to get the best action based on current net'''model = self.net.to('cpu')input_bounds = {}lb_state = stateub_state = statefor i in range(self.action_dim + self.state_dim):if i < self.state_dim:input_bounds[i] = (float(lb_state[0][i]), float(ub_state[0][i]))else:input_bounds[i] = (float(-1), float(1))with tempfile.NamedTemporaryFile(suffix='.onnx', delete=False) as f:# export neural network to ONNXtorch.onnx.export(model,v1,f,input_names=['state_action'],output_names=['Q_value'],dynamic_axes={'state_action': {0: 'batch_size'},'Q_value': {0: 'batch_size'}})# write ONNX model and its bounds using OMLTwrite_onnx_model_with_bounds(f.name, None, input_bounds)# load the network definition from the ONNX modelnetwork_definition = load_onnx_neural_network_with_bounds(f.name)# global optimalityformulation = ReluBigMFormulation(network_definition)m = pyo.ConcreteModel()m.nn = OmltBlock()m.nn.build_formulation(formulation)'''# we are now building the surrogate model between action and state'''# constrain for battery,if self.constrain_on:m.power_balance_con1 = pyo.Constraint(expr=((-m.nn.inputs[7] * self.scaled_parameters[0])+\((m.nn.inputs[8] * self.scaled_parameters[1])+m.nn.inputs[4]*self.scaled_parameters[5]) +\((m.nn.inputs[9] * self.scaled_parameters[2])+m.nn.inputs[5]*self.scaled_parameters[6]) +\((m.nn.inputs[10] * self.scaled_parameters[3])+m.nn.inputs[6]*self.scaled_parameters[7])>=\m.nn.inputs[3] *self.scaled_parameters[4]-self.env.grid.exchange_ability))m.power_balance_con2 = pyo.Constraint(expr=((-m.nn.inputs[7] * self.scaled_parameters[0])+\(m.nn.inputs[8] * self.scaled_parameters[1]+m.nn.inputs[4]*self.scaled_parameters[5]) +\(m.nn.inputs[9] * self.scaled_parameters[2]+m.nn.inputs[5]*self.scaled_parameters[6]) +\(m.nn.inputs[10] * self.scaled_parameters[3]+m.nn.inputs[6]*self.scaled_parameters[7])<=\m.nn.inputs[3] *self.scaled_parameters[4]+self.env.grid.exchange_ability))m.obj = pyo.Objective(expr=(m.nn.outputs[0]), sense=pyo.maximize)pyo.SolverFactory('gurobi').solve(m, tee=False)best_input = pyo.value(m.nn.inputs[:])best_action = (best_input[self.state_dim::])return best_action# define test function
if __name__ == '__main__':args = Arguments()'''here record real unbalance'''reward_record = {'episode': [], 'steps': [], 'mean_episode_reward': [], 'unbalance': [],'episode_operation_cost': []}loss_record = {'episode': [], 'steps': [], 'critic_loss': [], 'actor_loss': [], 'entropy_loss': []}args.visible_gpu = '2'for seed in args.random_seed_list:args.random_seed = seed# set different seedargs.agent = AgentMIPDQN()agent_name = f'{args.agent.__class__.__name__}'args.agent.cri_target = Trueargs.env = ESSEnv()args.init_before_training(if_main=True)'''init agent and environment'''agent = args.agentenv = args.envagent.init(args.net_dim, env.state_space.shape[0], env.action_space.shape[0], args.learning_rate,args.if_per_or_gae)'''init replay buffer'''buffer = ReplayBuffer(max_len=args.max_memo, state_dim=env.state_space.shape[0],action_dim=env.action_space.shape[0])'''start training'''cwd = args.cwdgamma = args.gammabatch_size = args.batch_size  # how much data should be used to update nettarget_step = args.target_step  # how manysteps of one episode should stoprepeat_times = args.repeat_times  # how many times should update for one batch size datasoft_update_tau = args.soft_update_tauagent.state = env.reset()'''collect data and train and update network'''num_episode = args.num_episodeargs.train=Trueargs.save_network=True# 自動記錄每集的 reward、loss、unbalance 等wandb.init(project='MIP_DQN_experiments',name=args.run_name,settings=wandb.Settings(start_method="fork"))wandb.config = {"epochs": num_episode,"batch_size": batch_size}wandb.define_metric('custom_step')if args.train:collect_data = Truewhile collect_data:print(f'buffer:{buffer.now_len}')with torch.no_grad():trajectory = agent.explore_env(env, target_step)steps, r_exp = update_buffer(trajectory)buffer.update_now_len()if buffer.now_len >= 10000:collect_data = Falsefor i_episode in range(num_episode):critic_loss, actor_loss = agent.update_net(buffer, batch_size, repeat_times, soft_update_tau)wandb.log({'critic loss':critic_loss,'custom_step':i_episode})wandb.log({'actor loss': actor_loss,'custom_step':i_episode})loss_record['critic_loss'].append(critic_loss)loss_record['actor_loss'].append(actor_loss)with torch.no_grad():episode_reward, episode_unbalance, episode_operation_cost = get_episode_return(env, agent.act,agent.device)wandb.log({'mean_episode_reward': episode_reward,'custom_step':i_episode})wandb.log({'unbalance':episode_unbalance,'custom_step':i_episode})wandb.log({'episode_operation_cost':episode_operation_cost,'custom_step':i_episode})reward_record['mean_episode_reward'].append(episode_reward)reward_record['unbalance'].append(episode_unbalance)reward_record['episode_operation_cost'].append(episode_operation_cost)print(f'curren epsiode is {i_episode}, reward:{episode_reward},unbalance:{episode_unbalance},buffer_length: {buffer.now_len}')if i_episode % 10 == 0:# target_stepwith torch.no_grad():agent._update_exploration_rate(args.explorate_decay,args.explorate_min)trajectory = agent.explore_env(env, target_step)steps, r_exp = update_buffer(trajectory)wandb.finish()if args.update_training_data:loss_record_path = f'{args.cwd}/loss_data.pkl'reward_record_path = f'{args.cwd}/reward_data.pkl'with open(loss_record_path, 'wb') as tf:pickle.dump(loss_record, tf)with open(reward_record_path, 'wb') as tf:pickle.dump(reward_record, tf)act_save_path = f'{args.cwd}/actor.pth'cri_save_path = f'{args.cwd}/critic.pth'print('training data have been saved')if args.save_network:# 模型保存與結果存儲torch.save(agent.act.state_dict(), act_save_path)torch.save(agent.cri.state_dict(), cri_save_path)print('training finished and actor and critic parameters have been saved')

參數設置函數:Parameters

1、電池參數(battery_parameters)

battery_parameters = {'capacity': 500,           # 電池總容量(kWh)'max_charge': 100,         # 最大充電功率(kW)'max_discharge': 100,      # 最大放電功率(kW)'efficiency': 0.9,         # 充放電效率(90%)'degradation': 0,          # 電池退化成本(€/kW,未啟用)'max_soc': 0.8,            # 最大SOC(80%)'min_soc': 0.2,            # 最小SOC(20%)'initial_capacity': 0.2    # 初始SOC(20%)
}

參數解釋:

參數名含義用途
capacity電池的總電量容量(單位 kWh)用于計算 SOC 的絕對值
max_charge每小時最大充電功率(kW)限制動作空間中充電方向的動作上限
max_discharge每小時最大放電功率(kW)限制動作空間中放電方向的動作下限
efficiency充/放電的能量轉換效率影響 SOC 更新公式,通常 < 1
degradation每單位放電引起的電池退化成本可選項,未啟用
max_socSOC 最大值(占比)建模約束,防止過充(如 0.8×500 kWh)
min_socSOC 最小值(占比)建模約束,防止過放
initial_capacity初始時刻的 SOC(占比)環境初始化狀態使用

在 MIP-DQN 中的作用:

  • 在狀態空間中,SOC 是環境的一個組成變量;
  • 在動作空間中,充放電功率是 agent 決策的一部分;
  • 在約束中,必須滿足:
    • min_soc × capacity ≤ SOC_t ≤ max_soc × capacity
    • -max_discharge ≤ P_battery_t ≤ max_charge
    • SOC_{t+1} = SOC_t + η × P_battery_t × Δt / capacity

2、發電機參數(dg_parameters

結構是一個字典嵌套字典,每一個 key (如 'gen_1')代表一個 DG 單元,value 是該 DG 的參數。

dg_parameters = {'gen_1': {...},'gen_2': {...},'gen_3': {...}
}

參數結構(以 gen_1 為例):

{'a': 0.0034,         # 成本函數二次項系數'b': 3,              # 成本函數一次項系數'c': 30,             # 成本函數常數項'd': 0.03,           # 熱電參數(未使用)'e': 4.2,'f': 0.031,'power_output_max': 150,   # 最大出力(kW)'power_output_min': 0,     # 最小出力(kW)'heat_output_max': None,   # 若為熱電聯產系統使用(未啟用)'heat_output_min': None,'ramping_up': 100,         # 每小時最大爬坡(上升)能力(kW/h)'ramping_down': 100,       # 每小時最大爬坡(下降)能力(kW/h)'min_up': 2,               # 最小連續開機時間(小時)'min_down': 1              # 最小連續關機時間(小時)
}

成本函數定義:

發電成本函數為二次型:

C_{DG}(P) = a × P^2 + b × P + c

gen_1 為例:

C_{DG_1}(P) = 0.0034 × P^2 + 3 × P + 30

這在論文中公式 (2) 中有體現。

約束相關參數:

參數名含義用途
power_output_max/min發電出力上下限控制動作空間邊界
ramping_up/down爬坡約束限制連續兩個時刻出力差值
min_up/down連續開/關機約束狀態轉換約束(若考慮啟停狀態)

注意:最小啟停時間在本文中未顯式建模,若要考慮,需要引入二進制變量進行建模(Unit Commitment 問題)。

參考

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/89621.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/89621.shtml
英文地址,請注明出處:http://en.pswp.cn/web/89621.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

微信小程序 wx.request() 的封裝

基于微信小程序的wx.request()方法封裝下面是一個封裝方案&#xff0c;滿足您提出的所有要求&#xff1a;class HttpService {constructor() {this.baseUrl ; // 基礎URLthis.pendingRequests new Map(); // 請求緩存池this.interceptors {request: [],response: []};}// 設…

yolo8實時識別目標(和平精英敵人+骨骼關鍵點)

現在需要識別人物的肢體&#xff08;姿態/骨骼關鍵點&#xff09;&#xff0c;即所謂的「姿態估計&#xff08;pose estimation&#xff09;」&#xff0c;以下是一些主流、訓練好可直接使用的開源模型推薦&#xff0c;支持多人識別與骨骼關鍵點檢測&#xff0c;適合你后續用于…

MyBatis動態SQL全解析:五大核心標簽實戰指南

MyBatis動態SQL全解析&#xff1a;五大核心標簽實戰指南 一、動態SQL的價值&#xff1a;告別硬編碼時代 傳統SQL拼接的痛點 // 傳統方式需要手動拼接SQL字符串 StringBuilder sql new StringBuilder("SELECT * FROM orders WHERE 11"); if (status ! null) {sql.app…

線上 CPU 過高怎么排查

通過以下幾個命令解決1、top命令&#xff0c;找到 CPU 過高的pid(進程); ?編輯 2、根據pid(進程)找到CPU過高的線程id;top -H -p pid(進程)3、把線程id轉換16 進制的printf 0x%x\n 線程id4、導致CPU 飆升的線程異常信息&#xff0c;-A 30表示打印 30 行記錄jstack pid(進程id)…

Letter Combination of a Phone Number

IntroduceProblem Analysis (Using “258” as example) //2 a b c //5 j k l //8 t u vPossible letter combinations: a, j, t (no further options, this is one combination)a, j, u (no further options, another combination)a, j, v (another c…

【問題解決】npm包下載速度慢

問題描述&#xff1a; npm包下載速度慢 問題原因&#xff1a; 為什么下載 npm 包速度慢&#xff1f; 在使用npm下包的時候&#xff0c;默認從國外的https://regitry.npmjs.org/服務器進行下載。此時&#xff0c;網絡數據的傳輸需要經過漫長的海底光纜&#xff0c;因此下包速度…

Apache DolphinScheduler介紹與部署

目錄 一、軟件介紹 1、軟件概述 2、發展歷史 3、名詞解釋 4、模塊介紹 軟件部署 1、下載發布包 2、上傳與解壓 3、啟動 4、瀏覽器驗證 一、軟件介紹 1、軟件概述 Apache DolphinScheduler 是一個分布式易擴展的可視化DAG工作流任務調度開源系統。適用于企業級場景&…

Selenium 啟動的瀏覽器自動退出問題分析

當 Selenium 啟動的瀏覽器自動關閉時&#xff0c;通常是由于以下原因導致的&#xff1a;1. 腳本執行完畢原因&#xff1a;Selenium 腳本執行到末尾時&#xff0c;如果沒有保持瀏覽器打開的代碼&#xff08;如time.sleep()或循環&#xff09;&#xff0c;瀏覽器會自動關閉。解決…

rust實現的快捷補全到剪貼板的實用工具

最近在兼職項目中老是遇到這樣的場景&#xff1a; 在云服務器之間通過scp命令傳輸文件&#xff0c;密碼太長記不住(客戶服務器不方便ssh-copy-id)在服務器上使用mysql命令登錄修改數據&#xff0c;數據庫密碼太長記不住&#xff08;客戶設置的密碼&#xff0c;直接改掉哈&#…

信息系統風險的安全技術防范思路

針對信息系統風險的安全技術防范思路 降低風險&#xff0c;即提升了安全能力和水平 保護資產 加強信息系統軟硬件及數據安全保護&#xff1b;減少脆弱性 通過研發、部署、應用各環節來盡量減少或避免脆弱性&#xff1b;應對威脅 采取防御措施&#xff0c;實施攻防對抗。

Java項目:基于SSM框架實現的網盤管理系統【ssm+B/S架構+源碼+數據庫+畢業論文】

摘 要 網絡技術和計算機技術發展至今&#xff0c;已經擁有了深厚的理論基礎&#xff0c;并在現實中進行了充分運用&#xff0c;尤其是基于計算機運行的軟件更是受到各界的關注。加上現在人們已經步入信息時代&#xff0c;所以對于信息的宣傳和管理就很關鍵。因此文件信息的管理…

Echart 地圖放大縮小

文章目錄 常用方法 1. **開啟 `roam` 屬性** 2. **通過鼠標滾輪或手勢縮放** 3. **設置初始縮放比例** 4. **通過按鈕控制縮放** 5. **限制縮放范圍** 6. **監聽縮放和平移事件** 7. **結合 `dataZoom` 實現數據縮放** 總結 相關文章 在 ECharts 中,可以通過設置地圖的 roam …

針對VMware虛擬化環境遷移的復雜場景,我將從技術架構、遷移方案、代碼實現、可視化流程四個維度進行專業解析,并提供完整的解決方案框架。

針對VMware虛擬化環境遷移的復雜場景&#xff0c;我將從技術架構、遷移方案、代碼實現、可視化流程四個維度進行專業解析&#xff0c;并提供完整的解決方案框架。一、技術架構分析&#xff08;架構圖表格對比&#xff09;graph TDA[源環境] -->|vMotion| B[目標環境]A -->…

揭秘 AIGC 背后的技術:GPT、BERT 與 Transformer 模型的工作原理

一、引言AIGC 的崛起與重要性人工智能生成內容&#xff08;AIGC&#xff09;已經不再是未來的技術&#xff0c;它正以驚人的速度滲透到各行各業&#xff0c;重新定義了內容創作、媒體生產、甚至人類認知的邊界。從深度學習到大規模自然語言處理&#xff0c;AIGC 的崛起代表著一…

Compose筆記(三十五)--ModalBottomSheetLayout

這一節主要了解一下Compose中的ModalBottomSheetLayout&#xff0c;在Jetpack Compose開發中&#xff0c;ModalBottomSheetLayout是Material Design組件庫中用于實現模態底部面板的核心組件&#xff0c;其核心作用是通過聲明式API管理底部面板的顯示、隱藏及交互邏輯。API Moda…

AWS Partner: Accreditation (Technical)

AWS Partner: Accreditation &#xff08;Technical&#xff09;AWS 核心技術簡介云計算的優勢AWS 全球基礎設施核心技術&#xff1a;計算 Amazon Elastic Compute Cloud (Amazon EC2)存儲數據庫聯網安全性從服務到解決方案解決方案設計簡介遷移策略架構最佳實踐AWS Well-Archi…

【52】MFC入門到精通——(CComboBox)下拉框選項順序與初始化不一致,默認顯示項也不一致

文章目錄1 問題描述2 問題分析與解決上一講&#xff0c;我們實現了MFC串口助手初級版。 MFC入門到精通——MFC串口助手(一)—初級版&#xff08;串口設置、初始化、打開/關閉、狀態顯示&#xff09;,附源碼1 問題描述 程序運行后串口默認參數&#xff0c;與我們預期不完全一致…

Astro:前端性能革命!從原生 HTML 到 Astro + React 的升級指南

為什么程序員必須關注 Astro在網站性能和 SEO 日益關鍵的今天&#xff0c;靜態站點生成&#xff08;SSG&#xff09;再次成為焦點。Astro 作為一款專為內容驅動網站設計的現代前端框架&#xff0c;正引領一場輕盈革命。它強調服務器優先渲染&#xff0c;將頁面預先轉為純 HTML&…

格式轉換Total Excel Converter:20 種格式XLS XLSX 批量轉 PDFWord

各位辦公小能手們&#xff01;今天給大家介紹一款超厲害的軟件&#xff0c;叫Total Excel Converter&#xff0c;軟件下載地址安裝包 它可是專業的Excel文件格式轉換工具。你知道嗎&#xff0c;它能把Excel工作簿&#xff0c;像XLS、XLSX、XLSM這些格式&#xff0c;批量轉換成…

Thread,ThreadLocal,ThreadLocalMap 三者的關系, 以及在實際開發中的應用【AI記錄用】

在 Java 多線程編程中&#xff0c;Thread、ThreadLocal 和 ThreadLocalMap 是三個緊密相關的類&#xff0c;它們共同構成了 Java 中**線程本地變量&#xff08;Thread-Local Storage&#xff09;**機制的基礎。下面我將從 三者的關系、實現原理 以及 實際開發中的應用 三個方面…