目錄
- MIP-DQN 算法概述
- 建模基礎
- 訓練階段(Training)
- 部署階段(Online Execution)
- DNN 網絡轉化為 MIP 表達式
- 性能指標
- 完整 Python 代碼實現
- 主函數:random_generator_battery
- 模型函數:MIP_DQN
- 基礎/專用庫包安裝
- 模型運行(完整Python代碼)
- 參數設置函數:Parameters
- 參考
本博客根據論文《Optimal energy system scheduling using a constraint-aware reinforcement learning algorithm》(2023)中提出的 MIP-DQN 算法,對其完整研究方法進行詳細介紹。
內容包括:輸入、輸出,狀態空間與動作空間定義,訓練過程,在線部署過程,以及如何將 DNN 轉換為 MIP 進行約束優化。
MIP-DQN 算法概述
MIP-DQN(Mixed-Integer Programming Deep Q-Network)是一種 值函數驅動的深度強化學習算法,與傳統 深度強化學習DRL 不同,它在 執行階段能夠嚴格滿足操作約束,例如功率平衡、爬坡限制等。
建模基礎
輸入(State 狀態空間):
每個時刻 ( t ) 的狀態 ( s_t ) 包括:
- ( P^V_t ):當前時刻的光伏出力(向量)
- ( P^L_t ):當前時刻的用戶負荷(向量)
- ( P^G_{t-1} ):上一時刻 DG(分布式發電機)出力(向量)
- ( SOC_t ):當前時刻儲能系統(ESS)電量狀態(向量)
s_t = (P^V_t, P^L_t, P^G_{t-1}, SOC_t)
輸出(Action 動作空間):
每個時刻 ( t ) 的動作 ( a_t ) 包括:
- ( P^G_{i,t} ):每個 DG 單元的出力(連續變量)
- ( P^B_{j,t} ):每個 ESS 的充放電功率(連續變量)
a_t = (P^G_{i,t}, P^B_{j,t})
注意:系統與電網的交易功率 ( P^N_t ) 不由 DRL 控制,它由系統自動調整以維持功率平衡。
訓練階段(Training)
🎯 目標:
學習一個 Q 函數,估計在狀態 ( s_t ) 下采取動作 ( a_t ) 所帶來的期望回報。
獎勵函數定義(Reward Function):
R(s_t, a_t) = -σ_1 × \text{運行成本} - σ_2 × \text{功率不平衡}
- 運行成本 = DG 成本 + 與電網交易成本
- 功率不平衡 ( ΔP_t ):公式如下:
ΔP_t = \left| \sum_i P^G_{i,t} + \sum_j P^B_{j,t} + \sum_m P^V_{m,t} + P^N_t - \sum_k P^L_{k,t} \right|
訓練流程(見算法 1):
- 初始化 Q 網絡 ( Q_\theta(s, a) )、目標網絡 ( Q_{\theta_{target}} ),以及策略網絡 ( \pi_\omega )
- 采樣動作:從策略網絡 ( \pi_\omega(s) ) 加上噪聲(用于探索)
- 與環境交互:得到 ( (s_t, a_t, r_t, s_{t+1}) )
- 存入回放緩沖區 Replay Buffer
- 采樣 mini-batch 批量更新:
- 更新 Q 網絡:使用目標網絡計算 target Q 值
- 更新策略網絡:最大化 Q 網絡的期望值
- 定期軟更新目標網絡:( \theta_{target} = \tau \theta + (1 - \tau)\theta_{target} )
部署階段(Online Execution)
訓練完成后,丟棄策略網絡 ( \pi_\omega ),僅保留 Q 網絡 ( Q_\theta(s, a) ),并將其轉化為 帶約束的整數規劃問題(MIP) 進行決策。
步驟如下(見算法 2):
- 將訓練好的 DNN ( Q_\theta(s, a) ) 轉化為 MIP 表達形式(見下節)
- 添加操作約束:
- 功率平衡約束(等式)
- DG 出力上下限、爬坡約束
- ESS 充放電限制、SOC 約束等
- 使用商業 MIP 求解器(如 Gurobi)求解:
\max_{a \in \mathcal{A}} Q_\theta(s, a) \quad \text{subject to all constraints}
求解結果中的 ( a^* ) 即為當前狀態下最優可行動作!
DNN 網絡轉化為 MIP 表達式
假設 Q 網絡結構為 多層前饋神經網絡(ReLU 激活),則每一層輸出滿足:
x^k = \text{ReLU}(W^{k-1} x^{k-1} + b^{k-1})
ReLU 可通過如下線性混合整數形式建模:
對于每一個神經元 ( x ):
x = \max(0, \hat{x}) \Rightarrow
\begin{cases}
x \geq 0 \\
x \geq \hat{x} \\
x \leq M z \\
x - \hat{x} \leq M (1 - z) \\
z \in \{0, 1\}
\end{cases}
其中 ( z ) 是二進制變量,( M ) 是足夠大的常數。
性能指標
DRL 算法性能評估指標:
- 運行成本:目標函數(越小越好)
- 功率不平衡(ΔP):是否滿足功率平衡(越小越好)
- 算法運行時間:是否滿足實時性要求
完整 Python 代碼實現
所用工具:
- PyTorch:訓練 Q 網絡和策略網絡
- OMLT(Optimization and Machine Learning Toolkit):將 PyTorch 模型轉為 MIP 形式
- Gurobi / CPLEX / CBC:商業混合整數規劃求解器
- Pyomo:建模數學規劃問題
- OpenAI Gym / 自定義環境:訓練環境
主函數:random_generator_battery
此段代碼構建了一個 能源管理環境 ESSEnv,用于模擬以下多個能源設備的運行:
- 🔋 電池系統(Battery)
- ? 三臺分布式發電機(DG1、DG2、DG3)
- 🌞 光伏發電(PV)
- 🔌 電網供電(Grid)
- 📈 能源價格、負荷、電量數據(通過 DataManager 管理)
該環境繼承自 gym.Env
,兼容 OpenAI Gym 強化學習 API,支持 reset()
、step()
、render()
等接口,用于訓練強化學習智能體。
模型函數:MIP_DQN
基礎/專用庫包安裝
安裝基礎依賴:
conda install numpy pandas matplotlib scikit-learn -y
conda install pytorch torchvision -c pytorch -y
安裝 Pyomo(用于建模 MIP):
conda install -c conda-forge pyomo -y
安裝 Gurobi(用于求解 MIP)
??注意: Gurobi 是商業軟件,需注冊并獲取 Academic License(免費用于學術用途)
官網-Gurobi Optimization。Gurobi的安裝及 license 獲取可參考我的另一博客-。
1. 安裝 Gurobi
conda install -c gurobi gurobi -y
2. 設置 license(首次使用)
grbgetkey <your-license-key>grbgetkeyex: grbgetkey ae36ac20-16e6-acd2-f242-4da6e765fa0a
然后按提示操作。若你已有 gurobi.lic 文件,請放在 ~/.gurobi/ 或 C:\gurobi\ 目錄下。
安裝 OMLT(Optimization and Machine Learning Toolkit,優化與機器學習集成工具)
pip install omltconda install omlt
注意:OMLT 依賴 pyomo 和 onnx,會自動安裝。
安裝 ONNX(用于神經網絡轉模型)
pip install onnx onnxruntime
說明,onnx 是模型格式,onnxruntime 是運行時推理引擎
安裝 Weights & Biases(可選,日志可視化工具)
wandb 是一個輕量級第三方 Python 包,不依賴底層 C/C++ 庫,pip 安裝非常穩定,適合用于 Conda 環境中。
官網-wandb,進去官網后注冊即可獲取個人API。
下載命令如下:
pip install wandb
輸入以下命令,按照指示輸入個人API即可。
wandb login
模型運行(完整Python代碼)
import pickle
import torch
import os
import numpy as np
import numpy.random as rd
import pandas as pd
import pyomo.environ as pyo
import pyomo.kernel as pmo
from omlt import OmltBlock
from gurobipy import *
from omlt.neuralnet import NetworkDefinition, FullSpaceNNFormulation,ReluBigMFormulation
from omlt.io.onnx import write_onnx_model_with_bounds,load_onnx_neural_network_with_bounds
import tempfile
import torch.onnx
import torch.nn as nn
from copy import deepcopy
import wandb
from random_generator_battery import ESSEnv## define net
# 經驗回放緩沖區 ReplayBuffer:用于存儲 agent 與環境交互的軌跡,以供后續訓練使用。
class ReplayBuffer:def __init__(self, max_len, state_dim, action_dim, gpu_id=0):self.now_len = 0self.next_idx = 0self.if_full = Falseself.max_len = max_lenself.data_type = torch.float32self.action_dim = action_dimself.device = torch.device(f"cuda:{gpu_id}" if (torch.cuda.is_available() and (gpu_id >= 0)) else "cpu")other_dim = 1 + 1 + self.action_dimself.buf_other = torch.empty(size=(max_len, other_dim), dtype=self.data_type, device=self.device)if isinstance(state_dim, int): # state is pixelself.buf_state = torch.empty((max_len, state_dim), dtype=torch.float32, device=self.device)elif isinstance(state_dim, tuple):self.buf_state = torch.empty((max_len, *state_dim), dtype=torch.uint8, device=self.device)else:raise ValueError('state_dim')# extend_buffer():添加新數據def extend_buffer(self, state, other): # CPU array to CPU arraysize = len(other)next_idx = self.next_idx + sizeif next_idx > self.max_len:self.buf_state[self.next_idx:self.max_len] = state[:self.max_len - self.next_idx]self.buf_other[self.next_idx:self.max_len] = other[:self.max_len - self.next_idx]self.if_full = Truenext_idx = next_idx - self.max_lenself.buf_state[0:next_idx] = state[-next_idx:]self.buf_other[0:next_idx] = other[-next_idx:]else:self.buf_state[self.next_idx:next_idx] = stateself.buf_other[self.next_idx:next_idx] = otherself.next_idx = next_idx# sample_batch():采樣 mini-batch(訓練用)def sample_batch(self, batch_size) -> tuple:indices = rd.randint(self.now_len - 1, size=batch_size)r_m_a = self.buf_other[indices]return (r_m_a[:, 0:1],r_m_a[:, 1:2],r_m_a[:, 2:],self.buf_state[indices],self.buf_state[indices + 1])def update_now_len(self):self.now_len = self.max_len if self.if_full else self.next_idxclass Arguments:def __init__(self, agent=None, env=None):self.agent = agent # Deep Reinforcement Learning algorithmself.env = env # the environment for trainingself.cwd = None # current work directory. None means set automaticallyself.if_remove = False # remove the cwd folder? (True, False, None:ask me)self.visible_gpu = '0,1,2,3' # for example: os.environ['CUDA_VISIBLE_DEVICES'] = '0, 2,'self.worker_num = 2 # rollout workers number pre GPU (adjust it to get high GPU usage)self.num_threads = 8 # cpu_num for evaluate model, torch.set_num_threads(self.num_threads)self.if_per_or_gae = False'''Arguments for training'''self.num_episode=3000self.gamma = 0.995 # discount factor of future rewardsself.learning_rate = 1e-4 # 2 ** -14 ~= 6e-5self.soft_update_tau = 1e-2 # 2 ** -8 ~= 5e-3self.net_dim = 64 # the network width 256self.batch_size = 256 # num of transitions sampled from replay buffer.self.repeat_times = 2 ** 3 # repeatedly update network to keep critic's loss smallself.target_step = 1000 # collect target_step experiences , then update network, 1024self.max_memo = 50000 # capacity of replay buffer## arguments for controlling explorationself.explorate_decay=0.99self.explorate_min=0.3'''Arguments for evaluate'''self.random_seed_list=[1234,2234,3234,4234,5234]# self.random_seed_list=[2234]self.run_name='MIP_DQN_experiments''''Arguments for save'''self.train=Trueself.save_network=Truedef init_before_training(self, if_main):if self.cwd is None:agent_name = self.agent.__class__.__name__self.cwd = f'./{agent_name}/{self.run_name}'if if_main:import shutil # remove history according to bool(if_remove)if self.if_remove is None:self.if_remove = bool(input(f"| PRESS 'y' to REMOVE: {self.cwd}? ") == 'y')elif self.if_remove:shutil.rmtree(self.cwd, ignore_errors=True)print(f"| Remove cwd: {self.cwd}")os.makedirs(self.cwd, exist_ok=True)np.random.seed(self.random_seed)torch.manual_seed(self.random_seed)torch.set_num_threads(self.num_threads)torch.set_default_dtype(torch.float32)os.environ['CUDA_VISIBLE_DEVICES'] = str(self.visible_gpu)# control how many GPU is used # 模型定義 Actor 網絡(策略網絡)
class Actor(nn.Module):def __init__(self,mid_dim,state_dim,action_dim):super().__init__()self.net=nn.Sequential(nn.Linear(state_dim,mid_dim),nn.ReLU(),nn.Linear(mid_dim,mid_dim),nn.ReLU(),nn.Linear(mid_dim,mid_dim),nn.ReLU(),nn.Linear(mid_dim,action_dim))def forward(self,state):return self.net(state).tanh()# make the data from -1 to 1# 用于探索時加入噪聲def get_action(self,state,action_std):#action=self.net(state).tanh()noise=(torch.randn_like(action)*action_std).clamp(-0.5,0.5)#return (action+noise).clamp(-1.0,1.0)# 模型定義 CriticQ 網絡(雙 Q 網絡)
class CriticQ(nn.Module):def __init__(self,mid_dim,state_dim,action_dim):super().__init__()self.net_head=nn.Sequential(nn.Linear(state_dim+action_dim,mid_dim),nn.ReLU(),nn.Linear(mid_dim,mid_dim),nn.ReLU())self.net_q1=nn.Sequential(nn.Linear(mid_dim,mid_dim),nn.ReLU(),nn.Linear(mid_dim,1))# we get q1 valueself.net_q2=nn.Sequential(nn.Linear(mid_dim,mid_dim),nn.ReLU(),nn.Linear(mid_dim,1))# we get q2 valuedef forward(self,value):mid=self.net_head(value)return self.net_q1(mid)def get_q1_q2(self,value):mid=self.net_head(value)return self.net_q1(mid),self.net_q2(mid)class AgentBase:def __init__(self):self.state = Noneself.device = Noneself.action_dim = Noneself.if_off_policy = Noneself.explore_noise = Noneself.trajectory_list = Noneself.explore_rate = 1.0self.criterion = torch.nn.SmoothL1Loss()def init(self, net_dim, state_dim, action_dim, learning_rate=1e-4, _if_per_or_gae=False, gpu_id=0):self.device = torch.device(f"cuda:{gpu_id}" if (torch.cuda.is_available() and (gpu_id >= 0)) else "cpu")self.action_dim = action_dimself.cri = self.ClassCri(net_dim, state_dim, action_dim).to(self.device)self.act = self.ClassAct(net_dim, state_dim, action_dim).to(self.device) if self.ClassAct else self.criself.cri_target = deepcopy(self.cri) if self.if_use_cri_target else self.criself.act_target = deepcopy(self.act) if self.if_use_act_target else self.actself.cri_optim = torch.optim.Adam(self.cri.parameters(), learning_rate)self.act_optim = torch.optim.Adam(self.act.parameters(),learning_rate) if self.ClassAct else self.cridel self.ClassCri, self.ClassActdef select_action(self, state) -> np.ndarray:states = torch.as_tensor((state,), dtype=torch.float32, device=self.device)action = self.act(states)[0]if rd.rand()<self.explore_rate:action = (action + torch.randn_like(action) * self.explore_noise).clamp(-1, 1)return action.detach().cpu().numpy()def explore_env(self, env, target_step):trajectory = list()state = self.statefor _ in range(target_step):action = self.select_action(state)state, next_state, reward, done, = env.step(action)trajectory.append((state, (reward, done, *action)))state = env.reset() if done else next_stateself.state = statereturn trajectory@staticmethoddef optim_update(optimizer, objective):optimizer.zero_grad()objective.backward()optimizer.step()@staticmethoddef soft_update(target_net, current_net, tau):for tar, cur in zip(target_net.parameters(), current_net.parameters()):tar.data.copy_(cur.data * tau + tar.data * (1.0 - tau))def save_or_load_agent(self, cwd, if_save):def load_torch_file(model_or_optim, _path):state_dict = torch.load(_path, map_location=lambda storage, loc: storage)model_or_optim.load_state_dict(state_dict)name_obj_list = [('actor', self.act), ('act_target', self.act_target), ('act_optim', self.act_optim),('critic', self.cri), ('cri_target', self.cri_target), ('cri_optim', self.cri_optim), ]name_obj_list = [(name, obj) for name, obj in name_obj_list if obj is not None]if if_save:for name, obj in name_obj_list:save_path = f"{cwd}/{name}.pth"torch.save(obj.state_dict(), save_path)else:for name, obj in name_obj_list:save_path = f"{cwd}/{name}.pth"load_torch_file(obj, save_path) if os.path.isfile(save_path) else Nonedef _update_exploration_rate(self,explorate_decay,explore_rate_min):self.explore_rate = max(self.explore_rate * explorate_decay, explore_rate_min)'''this function is used to update the explorate probability when select action'''# 模型定義 AgentMIPDQN 算法(繼承基礎類 AgentBase)
class AgentMIPDQN(AgentBase):def __init__(self):super().__init__()self.explore_noise = 0.5 # standard deviation of exploration noiseself.policy_noise = 0.2 # standard deviation of policy noiseself.update_freq = 2 # delay update frequencyself.if_use_cri_target = self.if_use_act_target = Trueself.ClassCri = CriticQself.ClassAct = Actor# update_net():更新策略網絡和雙 Q 網絡def update_net(self, buffer, batch_size, repeat_times, soft_update_tau) -> tuple:buffer.update_now_len()obj_critic = obj_actor = Nonefor update_c in range(int(buffer.now_len / batch_size * repeat_times)):# we update too much time?obj_critic, state = self.get_obj_critic(buffer, batch_size)self.optim_update(self.cri_optim, obj_critic)action_pg = self.act(state) # policy gradientobj_actor = -self.cri_target(torch.cat((state, action_pg),dim=-1)).mean() # use cri_target instead of cri for stable trainingself.optim_update(self.act_optim, obj_actor)if update_c % self.update_freq == 0: # delay updateself.soft_update(self.cri_target, self.cri, soft_update_tau)self.soft_update(self.act_target, self.act, soft_update_tau)return obj_critic.item() / 2, obj_actor.item()# get_obj_critic():獲取目標 Q 值并計算 critic lossdef get_obj_critic(self, buffer, batch_size) -> (torch.Tensor, torch.Tensor):with torch.no_grad():reward, mask, action, state, next_s = buffer.sample_batch(batch_size)next_a = self.act_target.get_action(next_s, self.policy_noise) # policy noise,next_q = torch.min(*self.cri_target.get_q1_q2(torch.cat((next_s, next_a),dim=-1))) # twin criticsq_label = reward + mask * next_qq1, q2 = self.cri.get_q1_q2(torch.cat((state, action),dim=-1))obj_critic = self.criterion(q1, q_label) + self.criterion(q2, q_label) # twin criticsreturn obj_critic, statedef update_buffer(_trajectory):ten_state = torch.as_tensor([item[0] for item in _trajectory], dtype=torch.float32)ary_other = torch.as_tensor([item[1] for item in _trajectory])ary_other[:, 0] = ary_other[:, 0] # ten_rewardary_other[:, 1] = (1.0 - ary_other[:, 1]) * gamma # ten_mask = (1.0 - ary_done) * gammabuffer.extend_buffer(ten_state, ary_other)_steps = ten_state.shape[0]_r_exp = ary_other[:, 0].mean() # other = (reward, mask, action)return _steps, _r_expdef get_episode_return(env, act, device):'''get information of one episode during the training'''episode_return = 0.0 # sum of rewards in an episodeepisode_unbalance=0.0episode_operation_cost=0.0state = env.reset()for i in range(24):s_tensor = torch.as_tensor((state,), device=device)a_tensor = act(s_tensor)action = a_tensor.detach().cpu().numpy()[0] # not need detach(), because with torch.no_grad() outsidestate, next_state, reward, done,= env.step(action)state=next_stateepisode_return += rewardepisode_unbalance+=env.real_unbalanceepisode_operation_cost+=env.operation_costif done:breakreturn episode_return,episode_unbalance,episode_operation_cost# 網絡導出與 MIP 調用模塊 Actor_MIP
# 用于將訓練好的 Q 網絡轉換成 ONNX + OMLT + Pyomo 表達形式,并結合 Gurobi 求解最優動作
class Actor_MIP:'''this actor is used to get the best action and Q function, the only input should be batch tensor state, action, and network, while the output should bebatch tensor max_action, batch tensor max_Q'''def __init__(self,scaled_parameters,batch_size,net,state_dim,action_dim,env,constrain_on=False):self.batch_size = batch_sizeself.net = netself.state_dim = state_dimself.action_dim =action_dimself.env = envself.constrain_on=constrain_onself.scaled_parameters=scaled_parametersdef get_input_bounds(self,input_batch_state):batch_size = self.batch_sizebatch_input_bounds = []lbs_states = input_batch_state.detach().numpy()ubs_states = lbs_statesfor i in range(batch_size):input_bounds = {}for j in range(self.action_dim + self.state_dim):if j < self.state_dim:input_bounds[j] = (float(lbs_states[i][j]), float(ubs_states[i][j]))else:input_bounds[j] = (float(-1), float(1))batch_input_bounds.append(input_bounds)return batch_input_boundsdef predict_best_action(self, state):state=state.detach().cpu().numpy()v1 = torch.zeros((1, self.state_dim+self.action_dim), dtype=torch.float32)'''this function is used to get the best action based on current net'''model = self.net.to('cpu')input_bounds = {}lb_state = stateub_state = statefor i in range(self.action_dim + self.state_dim):if i < self.state_dim:input_bounds[i] = (float(lb_state[0][i]), float(ub_state[0][i]))else:input_bounds[i] = (float(-1), float(1))with tempfile.NamedTemporaryFile(suffix='.onnx', delete=False) as f:# export neural network to ONNXtorch.onnx.export(model,v1,f,input_names=['state_action'],output_names=['Q_value'],dynamic_axes={'state_action': {0: 'batch_size'},'Q_value': {0: 'batch_size'}})# write ONNX model and its bounds using OMLTwrite_onnx_model_with_bounds(f.name, None, input_bounds)# load the network definition from the ONNX modelnetwork_definition = load_onnx_neural_network_with_bounds(f.name)# global optimalityformulation = ReluBigMFormulation(network_definition)m = pyo.ConcreteModel()m.nn = OmltBlock()m.nn.build_formulation(formulation)'''# we are now building the surrogate model between action and state'''# constrain for battery,if self.constrain_on:m.power_balance_con1 = pyo.Constraint(expr=((-m.nn.inputs[7] * self.scaled_parameters[0])+\((m.nn.inputs[8] * self.scaled_parameters[1])+m.nn.inputs[4]*self.scaled_parameters[5]) +\((m.nn.inputs[9] * self.scaled_parameters[2])+m.nn.inputs[5]*self.scaled_parameters[6]) +\((m.nn.inputs[10] * self.scaled_parameters[3])+m.nn.inputs[6]*self.scaled_parameters[7])>=\m.nn.inputs[3] *self.scaled_parameters[4]-self.env.grid.exchange_ability))m.power_balance_con2 = pyo.Constraint(expr=((-m.nn.inputs[7] * self.scaled_parameters[0])+\(m.nn.inputs[8] * self.scaled_parameters[1]+m.nn.inputs[4]*self.scaled_parameters[5]) +\(m.nn.inputs[9] * self.scaled_parameters[2]+m.nn.inputs[5]*self.scaled_parameters[6]) +\(m.nn.inputs[10] * self.scaled_parameters[3]+m.nn.inputs[6]*self.scaled_parameters[7])<=\m.nn.inputs[3] *self.scaled_parameters[4]+self.env.grid.exchange_ability))m.obj = pyo.Objective(expr=(m.nn.outputs[0]), sense=pyo.maximize)pyo.SolverFactory('gurobi').solve(m, tee=False)best_input = pyo.value(m.nn.inputs[:])best_action = (best_input[self.state_dim::])return best_action# define test function
if __name__ == '__main__':args = Arguments()'''here record real unbalance'''reward_record = {'episode': [], 'steps': [], 'mean_episode_reward': [], 'unbalance': [],'episode_operation_cost': []}loss_record = {'episode': [], 'steps': [], 'critic_loss': [], 'actor_loss': [], 'entropy_loss': []}args.visible_gpu = '2'for seed in args.random_seed_list:args.random_seed = seed# set different seedargs.agent = AgentMIPDQN()agent_name = f'{args.agent.__class__.__name__}'args.agent.cri_target = Trueargs.env = ESSEnv()args.init_before_training(if_main=True)'''init agent and environment'''agent = args.agentenv = args.envagent.init(args.net_dim, env.state_space.shape[0], env.action_space.shape[0], args.learning_rate,args.if_per_or_gae)'''init replay buffer'''buffer = ReplayBuffer(max_len=args.max_memo, state_dim=env.state_space.shape[0],action_dim=env.action_space.shape[0])'''start training'''cwd = args.cwdgamma = args.gammabatch_size = args.batch_size # how much data should be used to update nettarget_step = args.target_step # how manysteps of one episode should stoprepeat_times = args.repeat_times # how many times should update for one batch size datasoft_update_tau = args.soft_update_tauagent.state = env.reset()'''collect data and train and update network'''num_episode = args.num_episodeargs.train=Trueargs.save_network=True# 自動記錄每集的 reward、loss、unbalance 等wandb.init(project='MIP_DQN_experiments',name=args.run_name,settings=wandb.Settings(start_method="fork"))wandb.config = {"epochs": num_episode,"batch_size": batch_size}wandb.define_metric('custom_step')if args.train:collect_data = Truewhile collect_data:print(f'buffer:{buffer.now_len}')with torch.no_grad():trajectory = agent.explore_env(env, target_step)steps, r_exp = update_buffer(trajectory)buffer.update_now_len()if buffer.now_len >= 10000:collect_data = Falsefor i_episode in range(num_episode):critic_loss, actor_loss = agent.update_net(buffer, batch_size, repeat_times, soft_update_tau)wandb.log({'critic loss':critic_loss,'custom_step':i_episode})wandb.log({'actor loss': actor_loss,'custom_step':i_episode})loss_record['critic_loss'].append(critic_loss)loss_record['actor_loss'].append(actor_loss)with torch.no_grad():episode_reward, episode_unbalance, episode_operation_cost = get_episode_return(env, agent.act,agent.device)wandb.log({'mean_episode_reward': episode_reward,'custom_step':i_episode})wandb.log({'unbalance':episode_unbalance,'custom_step':i_episode})wandb.log({'episode_operation_cost':episode_operation_cost,'custom_step':i_episode})reward_record['mean_episode_reward'].append(episode_reward)reward_record['unbalance'].append(episode_unbalance)reward_record['episode_operation_cost'].append(episode_operation_cost)print(f'curren epsiode is {i_episode}, reward:{episode_reward},unbalance:{episode_unbalance},buffer_length: {buffer.now_len}')if i_episode % 10 == 0:# target_stepwith torch.no_grad():agent._update_exploration_rate(args.explorate_decay,args.explorate_min)trajectory = agent.explore_env(env, target_step)steps, r_exp = update_buffer(trajectory)wandb.finish()if args.update_training_data:loss_record_path = f'{args.cwd}/loss_data.pkl'reward_record_path = f'{args.cwd}/reward_data.pkl'with open(loss_record_path, 'wb') as tf:pickle.dump(loss_record, tf)with open(reward_record_path, 'wb') as tf:pickle.dump(reward_record, tf)act_save_path = f'{args.cwd}/actor.pth'cri_save_path = f'{args.cwd}/critic.pth'print('training data have been saved')if args.save_network:# 模型保存與結果存儲torch.save(agent.act.state_dict(), act_save_path)torch.save(agent.cri.state_dict(), cri_save_path)print('training finished and actor and critic parameters have been saved')
參數設置函數:Parameters
1、電池參數(battery_parameters)
battery_parameters = {'capacity': 500, # 電池總容量(kWh)'max_charge': 100, # 最大充電功率(kW)'max_discharge': 100, # 最大放電功率(kW)'efficiency': 0.9, # 充放電效率(90%)'degradation': 0, # 電池退化成本(€/kW,未啟用)'max_soc': 0.8, # 最大SOC(80%)'min_soc': 0.2, # 最小SOC(20%)'initial_capacity': 0.2 # 初始SOC(20%)
}
參數解釋:
參數名 | 含義 | 用途 |
---|---|---|
capacity | 電池的總電量容量(單位 kWh) | 用于計算 SOC 的絕對值 |
max_charge | 每小時最大充電功率(kW) | 限制動作空間中充電方向的動作上限 |
max_discharge | 每小時最大放電功率(kW) | 限制動作空間中放電方向的動作下限 |
efficiency | 充/放電的能量轉換效率 | 影響 SOC 更新公式,通常 < 1 |
degradation | 每單位放電引起的電池退化成本 | 可選項,未啟用 |
max_soc | SOC 最大值(占比) | 建模約束,防止過充(如 0.8×500 kWh) |
min_soc | SOC 最小值(占比) | 建模約束,防止過放 |
initial_capacity | 初始時刻的 SOC(占比) | 環境初始化狀態使用 |
在 MIP-DQN 中的作用:
- 在狀態空間中,SOC 是環境的一個組成變量;
- 在動作空間中,充放電功率是 agent 決策的一部分;
- 在約束中,必須滿足:
min_soc × capacity ≤ SOC_t ≤ max_soc × capacity
-max_discharge ≤ P_battery_t ≤ max_charge
SOC_{t+1} = SOC_t + η × P_battery_t × Δt / capacity
2、發電機參數(dg_parameters
)
結構是一個字典嵌套字典,每一個 key (如 'gen_1'
)代表一個 DG 單元,value 是該 DG 的參數。
dg_parameters = {'gen_1': {...},'gen_2': {...},'gen_3': {...}
}
參數結構(以 gen_1
為例):
{'a': 0.0034, # 成本函數二次項系數'b': 3, # 成本函數一次項系數'c': 30, # 成本函數常數項'd': 0.03, # 熱電參數(未使用)'e': 4.2,'f': 0.031,'power_output_max': 150, # 最大出力(kW)'power_output_min': 0, # 最小出力(kW)'heat_output_max': None, # 若為熱電聯產系統使用(未啟用)'heat_output_min': None,'ramping_up': 100, # 每小時最大爬坡(上升)能力(kW/h)'ramping_down': 100, # 每小時最大爬坡(下降)能力(kW/h)'min_up': 2, # 最小連續開機時間(小時)'min_down': 1 # 最小連續關機時間(小時)
}
成本函數定義:
發電成本函數為二次型:
C_{DG}(P) = a × P^2 + b × P + c
以 gen_1
為例:
C_{DG_1}(P) = 0.0034 × P^2 + 3 × P + 30
這在論文中公式 (2) 中有體現。
約束相關參數:
參數名 | 含義 | 用途 |
---|---|---|
power_output_max/min | 發電出力上下限 | 控制動作空間邊界 |
ramping_up/down | 爬坡約束 | 限制連續兩個時刻出力差值 |
min_up/down | 連續開/關機約束 | 狀態轉換約束(若考慮啟停狀態) |
注意:最小啟停時間在本文中未顯式建模,若要考慮,需要引入二進制變量進行建模(Unit Commitment 問題)。