2025.3.2周報
- 一、文獻閱讀
- 題目信息
- 摘要
- Abstract
- 創新點
- 網絡架構
- 實驗
- 結論
- 不足以及展望
一、文獻閱讀
題目信息
- 題目: Physics-Informed Neural Networks of the Saint-Venant Equations for Downscaling a Large-Scale River Model
- 期刊: Water Resources Research
- 作者: Dongyu Feng, Zeli Tan, QiZhi He
- 發表時間: 2023/12/11
- 文章鏈接: https://arxiv.org/pdf/2210.03240v1
摘要
沿海地區人口增長,人類面臨颶風和洪水等自然災害風險增大。氣候變化加劇極端風暴潮、降水及海平面上升,使洪水風險進一步提升,研究潮汐河流動力學對減輕洪災風險至關重要。按照傳統的方法,大規模河流模型是氣候變化研究的重要工具,但在模擬局部洪水過程時存在不足。其物理可解釋性和網格分辨率低,無法解析洪水泛濫的詳細信息;統計和動力降尺度方法,但在河流建模中應用有限。傳統線性插值降尺度方法無法解決網格單元內空間變化的水流問題。為解決以上問題,本文提出基于物理信息神經網絡的機器學習框架,用于模擬大尺度河流模型在沿海地區的亞網格尺度水流。首先展示PINN能同化各類觀測并求解一維圣維南方程,通過多個合成案例研究評估其性能,結果表明PINN在水深模擬上精度良好。針對風暴潮和潮汐引發的洪水波傳播,提出基于傅里葉特征嵌入的神經網絡架構。此外,研究表明PINN降尺度能通過同化觀測數據產生更合理的亞網格解,優于簡單線性插值,為改進大尺度模型模擬精細尺度沿海過程能力提供了有前景的途徑。
Abstract
Population growth in coastal regions has heightened human exposure to natural disasters such as hurricanes and floods. Climate change exacerbates the risks of extreme storm surges, precipitation, and sea-level rise, further amplifying flood hazards. Investigating tidal river dynamics is critical for mitigating these risks. Traditionally, large-scale river models have served as essential tools in climate change research. However, they exhibit limitations in simulating localized flood processes due to their low physical interpretability and coarse grid resolution, which hinder the detailed resolution of flood inundation dynamics. Statistical and dynamical downscaling methods, while useful, have seen limited application in river modeling. Conventional linear interpolation downscaling approaches fail to address spatial variations in water flow within grid cells. To overcome these challenges, this study proposes a machine learning framework based on Physics-Informed Neural Networks (PINNs) to simulate subgrid-scale water flows in large-scale river models for coastal regions. We first demonstrate that PINNs can assimilate diverse observational data and solve the one-dimensional Saint-Venant equations. Performance evaluation through multiple synthetic case studies reveals that PINNs achieve high accuracy in water depth simulations. For flood wave propagation induced by storm surges and tides, we introduce a neural network architecture incorporating Fourier feature embedding. Furthermore, the study shows that PINN-based downscaling, by assimilating observational data, generates more physically consistent subgrid solutions compared to simple linear interpolation. This approach offers a promising pathway for enhancing the capability of large-scale models to simulate fine-scale coastal processes.
創新點
-
作者通過基于PINN的數據同化降尺度方法,解決大尺度河流模型在海岸地區子網格尺度河流動力學的變異性問題。該方法能融合多種觀測數據,無需修改數值算法或細化網格分辨率,且不受網格限制,為降尺度研究提供新途徑。
降尺度是指從一個粗糙的、低分辨率的數據或模型結果,推導出更精細的、高分辨率的結果的過程。簡單來說就是把模糊的大圖變成清晰的小圖。
-
作者提出了傅里葉特征嵌入的方法,在輸入層前將坐標𝑥和𝑡映射到高維傅里葉特征空間。論文特別針對時間坐標𝑡,以適應潮汐的周期性。下面對其進行說明:
傅里葉變換的核心思想是: 任何周期性的信號都可以分解成一堆不同頻率的正弦波和余弦波。因為正弦和余弦是周期函數,可以描述有規律的波動。
其數學表達如下:
γ ( ν ) = [ cos ? ( 2 π B ν ) sin ? ( 2 π B ν ) ] , ν = [ x , t ] T , B ~ N ( 0 , s ) \gamma(\nu)=\left[\begin{array}{c}\cos (2 \pi B \nu) \\\sin (2 \pi B \nu)\end{array}\right], \quad \nu=[x, t]^{T}, \quad B \sim N(0, s) γ(ν)=[cos(2πBν)sin(2πBν)?],ν=[x,t]T,B~N(0,s)
其中:
ν = [ x , t ] T \nu=[x, t]^{T} ν=[x,t]T輸入是空間坐標 𝑥和時間 𝑡,這里是一個二維向量;
B是一個隨機矩陣,里面的值從高斯分布 𝑁(0, 𝑠)中抽取,𝑠 是標準差;
cos ? ( 2 π B ν ) \cos (2 \pi B \nu) cos(2πBν)與 sin ? ( 2 π B ν ) \sin (2 \pi B \nu) sin(2πBν)則是對輸入進行正弦和余弦變換。
論文中作者通過這個特性將傅里葉特征嵌入到時間中,將來描述潮汐(因為潮汐具有周期性),具體表示為:
γ t ( i ) ( t ) = [ cos ? ( 2 π B t ( i ) t ) sin ? ( 2 π B t ( i ) t ) ] , for? i = 1 , 2 , … \begin{array}{l}\gamma_{t}^{(i)}(t)=\left[\begin{array}{c}\cos \left(2 \pi \boldsymbol{B}_{t}^{(i)} t\right) \\\sin \left(2 \pi \boldsymbol{B}_{t}^{(i)} t\right)\end{array}\right], \quad \text { for } i=1,2, \ldots\\\end{array} γt(i)?(t)= ?cos(2πBt(i)?t)sin(2πBt(i)?t)? ?,?for?i=1,2,…?
網絡架構
如圖(a)所示,在本片論文中PINN框架由一個深度神經網絡與SVE結合的架構。
在輸出結果在作者還強調數據同化過程(所謂數據同化,就是讓模型預測值不斷接近真實值的一個過程,而不是某一個步驟。比如,在算Loss中就算是一個數據同化的步驟)。
下面我們來具體分析一下這個模型的架構:
輸入:空間坐標 𝑥 和時間 𝑡 。
結構: 在圖示中NN的架構為2個隱藏層和每層4個神經元,其中隱藏層使用激活函數 𝜎
其中𝜎為: tanh ? = e x ? e ? x e x + e ? x \tanh =\frac{e^{x}-e^{-x}}{e^{x}+e^{-x}} tanh=ex+e?xex?e?x?。
輸出:預測值 𝑢 流速和?水深,由橙色節點表示。此外,輸入數據來源于數值解、現場測站數據、衛星提供的水深快照,這些數據通過損失函數約束神經網絡,確保預測接近真實值。
物理約束:
然后通過自動微分計算偏導數并代入SVE檢查殘差。
損失函數: 損失函數定義為
J ( θ ) = J f ( θ ) + ∑ j w j J j ( θ ) J(\theta)=J_{f}(\theta)+\sum_{j} w_{j} J_{j}(\theta) J(θ)=Jf?(θ)+j∑?wj?Jj?(θ)
1、 J f ( θ ) J_{f}(\theta) Jf?(θ):SVE殘差損失。其中 J f ( θ ) = 1 N f ∑ i = 1 N f ∣ r f 2 ( x i , t i , θ ) ∣ J_f(\theta) = \frac{1}{N_f} \sum_{i=1}^{N_f} \left| r_f^2(x^i, t^i, \theta) \right| Jf?(θ)=Nf?1?∑i=1Nf?? ?rf2?(xi,ti,θ) ?
N f N_f Nf?為配置點; r f r_f rf?是SVE的殘差表示; θ \theta θ是神經網絡的參數; x i , t i x^i, t^i xi,ti是第 𝑖個配置點的空間坐標和時間坐標。
2、 ∑ j w j J j ( θ ) \sum_{j} w_{j} J_{j}(\theta) ∑j?wj?Jj?(θ):邊界條件、初始條件損失、觀測數據的誤差等。
具體如下:
邊界條件損失 J B C h J_{BC_h} JBCh??、 J B C u J_{BC_u} JBCu??:
邊界條件損失,分別衡量流速 𝑢和水深 ?在邊界上的預測值與已知邊界條件𝑢和?的偏差。值越小,說明邊界符合要求。
其中:
J B C u ( θ ) = 1 N B C u ∑ i = 1 N B C u ∣ u ^ ( x i , t i , θ ) ? u ( x i , t i ) ∣ 2 J_{BC_u}(\theta) = \frac{1}{N_{BC_u}} \sum_{i=1}^{N_{BC_u}} \left| \hat{u}(x^i, t^i, \theta) - u(x^i, t^i) \right|^2 JBCu??(θ)=NBCu??1?∑i=1NBCu??? ?u^(xi,ti,θ)?u(xi,ti) ?2
J B C h ( θ ) = 1 N B C h ∑ i = 1 N B C h ∣ h ^ ( x i , t i , θ ) ? h ( x i , t i ) ∣ 2 , x ∈ Ω B C , t ∈ [ 0 , T ] J_{BC_h}(\theta) = \frac{1}{N_{BC_h}} \sum_{i=1}^{N_{BC_h}} \left| \hat{h}(x^i, t^i, \theta) - h(x^i, t^i) \right|^2, \quad x \in \Omega_{BC}, t \in [0, T] JBCh??(θ)=NBCh??1?∑i=1NBCh??? ?h^(xi,ti,θ)?h(xi,ti) ?2,x∈ΩBC?,t∈[0,T]
N B C u N_{BC_u} NBCu??與 N B C h N_{BC_h} NBCh??是邊界條件點的總數; ∑ i = 1 N B C h \sum_{i=1}^{N_{BC_h}} ∑i=1NBCh???與 ∑ i = 1 N B C u \sum_{i=1}^{N_{BC_u}} ∑i=1NBCu???對邊界上的所有點求和。
初始條件損失 J S u J_{S_u} JSu??和快照損失 J S h J_{S_h} JSh??:
初始條件或快照損失,分別衡量 u ^ \hat{u} u^流速和 h ^ \hat{h} h^水深在初始時間 t=0 或特定快照時刻的預測值與已知值的偏差。
J S u ( θ ) = 1 N S u ∑ i = 1 N S u ∣ u ^ ( x i , 0 , θ ) ? u ( x i , 0 ) ∣ 2 J_{S_u}(\theta) = \frac{1}{N_{S_u}} \sum_{i=1}^{N_{S_u}} \left| \hat{u}(x^i, 0, \theta) - u(x^i, 0) \right|^2 JSu??(θ)=NSu??1?∑i=1NSu??? ?u^(xi,0,θ)?u(xi,0) ?2
J S h ( θ ) = 1 N S h ∑ i = 1 N S h ∣ h ^ ( x i , 0 , θ ) ? h ( x i , 0 ) ∣ 2 , x ∈ Ω S J_{S_h}(\theta) = \frac{1}{N_{S_h}} \sum_{i=1}^{N_{S_h}} \left| \hat{h}(x^i, 0, \theta) - h(x^i, 0) \right|^2, \quad x \in \Omega_S JSh??(θ)=NSh??1?∑i=1NSh??? ?h^(xi,0,θ)?h(xi,0) ?2,x∈ΩS?
其中 N S u N_{S_u} NSu??與 N S h N_{S_h} NSh??是初始條件或快照點的總數; x ∈ Ω S \quad x \in \Omega_S x∈ΩS?定義了河道距離 x x x的范圍。
快照條件是指在特定時間點上對系統狀態。如,水深 ?或流速u的已知值或觀測值。此外,快照條件可以是t=0,也可以是后續時間點的觀測值,提供額外的時空約束。
觀測數據損失 J o b s u J_{obs_u} Jobsu??與 J o b s h J_{obs_h} Jobsh??
J o b s u ( θ ) = 1 N o b s u ∑ i = 1 N o b s u ∣ u ^ ( x i , t i , θ ) ? u ( x i , t i ) ∣ 2 J_{obs_u}(\theta) = \frac{1}{N_{obs_u}} \sum_{i=1}^{N_{obs_u}} \left| \hat{u}(x^i, t^i, \theta) - u(x^i, t^i) \right|^2 Jobsu??(θ)=Nobsu??1?∑i=1Nobsu??? ?u^(xi,ti,θ)?u(xi,ti) ?2
J o b s h ( θ ) = 1 N o b s h ∑ i = 1 N o b s h ∣ h ^ ( x i , t i , θ ) ? h ( x i , t i ) ∣ 2 , x ∈ Ω o b s , t ∈ [ 0 , T ] J_{obs_h}(\theta) = \frac{1}{N_{obs_h}} \sum_{i=1}^{N_{obs_h}} \left| \hat{h}(x^i, t^i, \theta) - h(x^i, t^i) \right|^2, \quad x \in \Omega_{obs}, t \in [0, T] Jobsh??(θ)=Nobsh??1?∑i=1Nobsh??? ?h^(xi,ti,θ)?h(xi,ti) ?2,x∈Ωobs?,t∈[0,T]
其中 N o b s u N_{obs_u} Nobsu??與 N o b s h N_{obs_h} Nobsh??觀測點數據總數;在 x ∈ Ω o b s , t ∈ [ 0 , T ] \quad x \in \Omega_{obs}, t \in [0, T] x∈Ωobs?,t∈[0,T]中, Ω o b s \Omega_{obs} Ωobs?為河道范圍,T為模擬周期。
圖(b)為一個大尺度網格單元內的子網格降尺度過程,包括時間步和空間位置,其目的是在粗網格Δ𝑥內生成更詳細的水深和流速解。其中:
時間從 t n t_n tn?到 t n + 1 t_{n+1} tn+1?,Δt為時間步長。
空間從 x i x_i xi?到 x x + 1 x_{x+1} xx+1?, x o b s 1 x_{obs}^1 xobs1?與 x o b s 2 x_{obs}^2 xobs2?為檢測站獲取的信息。
實驗
本文通過六個實驗,研究基于PINN的數據同化降尺度方法,以解決大型河流模型在沿海地區河流動力學的亞網格變異性問題。其中,實驗1和2分析的是洪水平原,測試PINN求解SVE能力;實驗3和4分析的是開闊河道,驗證動態水流模擬;實驗5和6分析的潮汐河流,引入了傅里葉嵌入來評估風暴潮和潮汐影響下的降尺度效果。
實驗結果及分析如下:
- 在實驗1和2中,PINN在解決簡化的圣維南方程(SVE)時展現出較高準確性,能很好地模擬洪水在洪泛平原的傳播,其洪水波傳播模式和淹沒前沿形狀與解析解或RK4解接近,相對L2誤差和RMSE較小。這表明在有足夠觀測數據和地形信息時,PINN可用于計算亞網格尺度的洪泛平原淹沒情況,提供更詳細的淹沒地圖。
- 在實驗3和4中,PINN解與HEC - RAS參考解在水深方面吻合良好,僅在沿河道剖面的上游端附近有較小模擬偏差,L2誤差和RMSE也表明其性能良好。在案例4中,編碼傅里葉特征的PINN解能夠捕捉到下游潮汐引起的周期性變化,相比標準架構的PINN解,其?h和RMSE更小。但由于潮汐和邊界條件產生的高振蕩行為,案例4的預測準確性比案例3差,意味著PINN訓練需要更多物理約束和觀測數據。
- 在實驗5和6中,與線性插值相比,PINN在解析河道地形和亞網格尺度的可變流態方面表現更優。線性插值在無原位數據區域的降尺度解較差,無法再現時空模式和沿河道剖面,而PINN解在整個亞網格區域具有更清晰的流動動力學,特別是在亞臨界流占主導且水深較大的中間部分,誤差遠小于線性插值解。
在案例6中,盡管兩種降尺度方法隨著原位數據增加性能都有所提升,但PINN解更準確且對觀測數據依賴性小。線性插值在缺乏數據的上游區域高估潮汐影響,而PINN能合理捕捉潮汐相位和振幅,不過在潮汐傳播尖端附近因流量 - 潮汐相互作用存在較大偏差。總體而言,PINN降尺度在可變流態條件下優于線性插值。
此外,實驗6結果表明,同化觀測數據的可用性對PINN解的準確性至關重要,有限的配置點下,狀態變量時空變化增加會降低PINN性能,但PINN在降尺度一維河道流時對觀測的依賴小于線性插值。提出的傅里葉特征嵌入時間t足以編碼潮汐物理特征和處理邊界條件的周期性,但當沿河道流變化頻繁時,PINN性能會下降,因為傅里葉變換難以直接編碼空間變量,且高頻與低頻過程的非線性相互作用在觀測有限時難以很好再現。不過,隨著同化觀測數據量增加,PINN準確性會提高。
代碼如下:
import numpy as np
# 導入插值函數
from scipy.interpolate import interp1d
# 導入 Matplotlib 用于繪圖
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
from mpl_toolkits.axes_grid1 import make_axes_locatable
import h5py
from datetime import datetime
import time
# 導入拉丁超立方采樣工具
from pyDOE import lhs
import tensorflow as tf
import pickle
import os
import warnings# 忽略警告信息以保持輸出清潔
warnings.filterwarnings("ignore")
# 設置 TensorFlow 隨機種子為 1234 確保可重復性
tf.set_random_seed(1234)
# 設置 NumPy 隨機種子為 1234 確保可重復性
np.random.seed(1234)# 定義 SVE 模型類,替代缺失模塊
class SVE:DTYPE = tf.float32 # 定義 TensorFlow 數據類型為 float32def __init__(self, X_h_IC, X_u_BC, X_h_BC, X_u_obs, X_h_obs, X_f, h_IC, u_BC, h_BC, u_obs, h_obs,layers, lb, ub, S, b, X_star, u_star, h_star, lr=5e-4, ExistModel=0, uhDir='',wDir='', wmffDir='', useObs=True, use_ff=False):"""初始化 PINN 模型用于 SVE。參數:X_h_IC, X_u_BC, X_h_BC, X_u_obs, X_h_obs, X_f: 輸入坐標 (x, t)h_IC, u_BC, h_BC, u_obs, h_obs: 初始條件、邊界條件和觀測值layers: 神經網絡架構lb, ub: 域邊界S: 渠道坡度b: 渠道寬度X_star, u_star, h_star: 測試數據lr: 學習率ExistModel: 加載已有模型標志 (0: 新建, 1/2: 加載)uhDir, wDir, wmffDir: 權重保存/加載路徑useObs: 是否使用觀測數據use_ff: 是否使用傅里葉特征嵌入"""self.count = 0 # 初始化回調計數器,用于跟蹤訓練迭代次數self.lb, self.ub = lb, ub # 設置輸入數據的下界和上界self.S = tf.constant(S, dtype=self.DTYPE) if isinstance(S, (int, float)) else S # 將坡度轉換為 TensorFlow 張量self.b = tf.constant(b, dtype=self.DTYPE) # 將渠道寬度轉換為 TensorFlow 張量self.useObs, self.use_ff = useObs, use_ff # 設置是否使用觀測數據和傅里葉特征的標志self.X_star, self.u_star, self.h_star = X_star, u_star, h_star # 存儲測試數據 (x, t, u, h)self.beta = 0.9 # 自適應權重更新因子self.adaptive_constants = { # 初始化自適應權重字典'bcs_u': np.array(1.0), 'bcs_h': np.array(1.0), # 邊界條件權重'ics_h': np.array(1.0), 'obs_u': np.array(1.0), # 初始條件和觀測權重'obs_h': np.array(1.0)}# 準備輸入數據self.x_h_IC, self.t_h_IC = X_h_IC[:, 0:1], X_h_IC[:, 1:2] # 提取初始條件 x 和 t 坐標self.x_u_BC, self.t_u_BC = X_u_BC[:, 0:1], X_u_BC[:, 1:2] # 提取流速邊界條件 x 和 t 坐標self.x_h_BC, self.t_h_BC = X_h_BC[:, 0:1], X_h_BC[:, 1:2] # 提取水深邊界條件 x 和 t 坐標self.x_u_obs, self.t_u_obs = X_u_obs[:, 0:1], X_u_obs[:, 1:2] if useObs else (None, None) # 提取流速觀測數據坐標self.x_h_obs, self.t_h_obs = X_h_obs[:, 0:1], X_h_obs[:, 1:2] if useObs else (None, None) # 提取水深觀測數據坐標self.x_f, self.t_f = X_f[:, 0:1], X_f[:, 1:2] # 提取內部點 x 和 t 坐標self.h_IC, self.u_BC, self.h_BC = h_IC, u_BC, h_BC # 存儲初始和邊界條件值self.u_obs, self.h_obs = u_obs, h_obs if useObs else (None, None) # 存儲觀測值self.layers = layers # 存儲神經網絡層結構# 初始化權重和偏置if ExistModel == 0:self.weights, self.biases = self.initialize_NN(layers, use_ff) # 新建網絡參數else:self.weights, self.biases = self.load_NN(uhDir, layers) # 加載已有模型參數# 創建 TensorFlow 會話self.sess = tf.Session(config=tf.ConfigProto(allow_soft_placement=True, log_device_placement=True)) # 初始化會話,支持軟放置和設備日志self.placeholders = self._define_placeholders() # 定義輸入數據的占位符self._build_model() # 構建神經網絡模型self._define_loss_and_optimizer() # 定義損失函數和優化器def _define_placeholders(self):"""定義 TensorFlow 占位符用于輸入數據。"""placeholders = {'x_h_IC': tf.placeholder(self.DTYPE, [None, 1]), # 占位符:初始條件 x 坐標't_h_IC': tf.placeholder(self.DTYPE, [None, 1]), # 占位符:初始條件 t 坐標'h_IC': tf.placeholder(self.DTYPE, [None, 1]), # 占位符:初始水深值'x_u_BC': tf.placeholder(self.DTYPE, [None, 1]), # 占位符:流速邊界 x 坐標't_u_BC': tf.placeholder(self.DTYPE, [None, 1]), # 占位符:流速邊界 t 坐標'u_BC': tf.placeholder(self.DTYPE, [None, 1]), # 占位符:邊界流速值'x_h_BC': tf.placeholder(self.DTYPE, [None, 1]), # 占位符:水深邊界 x 坐標't_h_BC': tf.placeholder(self.DTYPE, [None, 1]), # 占位符:水深邊界 t 坐標'h_BC': tf.placeholder(self.DTYPE, [None, 1]), # 占位符:邊界水深值'x_u_obs': tf.placeholder(self.DTYPE, [None, 1]) if self.useObs else None, # 占位符:流速觀測 x 坐標(若使用觀測)'t_u_obs': tf.placeholder(self.DTYPE, [None, 1]) if self.useObs else None, # 占位符:流速觀測 t 坐標(若使用觀測)'u_obs': tf.placeholder(self.DTYPE, [None, 1]) if self.useObs else None, # 占位符:觀測流速值(若使用觀測)'x_h_obs': tf.placeholder(self.DTYPE, [None, 1]) if self.useObs else None, # 占位符:水深觀測 x 坐標(若使用觀測)'t_h_obs': tf.placeholder(self.DTYPE, [None, 1]) if self.useObs else None, # 占位符:水深觀測 t 坐標(若使用觀測)'h_obs': tf.placeholder(self.DTYPE, [None, 1]) if self.useObs else None, # 占位符:觀測水深值(若使用觀測)'x_f': tf.placeholder(self.DTYPE, [None, 1]), # 占位符:內部點 x 坐標't_f': tf.placeholder(self.DTYPE, [None, 1]), # 占位符:內部點 t 坐標'adaptive_bcs_u': tf.placeholder(tf.float32, [1]) if ExistModel == 0 else None, # 占位符:流速邊界自適應權重(新建模型時使用)'adaptive_bcs_h': tf.placeholder(tf.float32, [1]) if ExistModel == 0 else None, # 占位符:水深邊界自適應權重(新建模型時使用)'adaptive_ics_h': tf.placeholder(tf.float32, [1]) if ExistModel == 0 else None, # 占位符:初始條件自適應權重(新建模型時使用)'adaptive_obs_u': tf.placeholder(tf.float32, [1]) if ExistModel == 0 and self.useObs else None, # 占位符:流速觀測自適應權重(新建模型且使用觀測時使用)'adaptive_obs_h': tf.placeholder(tf.float32, [1]) if ExistModel == 0 and self.useObs else None # 占位符:水深觀測自適應權重(新建模型且使用觀測時使用)}return placeholders # 返回占位符字典def _build_model(self):"""構建神經網絡模型,包括預測和殘差計算。"""# 歸一化輸入數據到 [-1, 1]X = 2.0 * (tf.concat([self.placeholders['x_f'], self.placeholders['t_f']], 1) - self.lb) / (self.ub - self.lb) - 1.0self.u_pred, self.h_pred = self.neural_net(X, self.weights, self.biases) # 預測流速和水深self.u_ic_pred, self.h_ic_pred = self.neural_net(2.0 * (tf.concat([self.placeholders['x_h_IC'], self.placeholders['t_h_IC']], 1) - self.lb) / (self.ub - self.lb) - 1.0,self.weights, self.biases) # 初始條件預測self.u_bc_pred, self.h_bc_pred = self.neural_net(2.0 * (tf.concat([self.placeholders['x_u_BC'], self.placeholders['t_u_BC']], 1) - self.lb) / (self.ub - self.lb) - 1.0,self.weights, self.biases) # 邊界條件預測if self.useObs:self.u_obs_pred, self.h_obs_pred = self.neural_net(2.0 * (tf.concat([self.placeholders['x_u_obs'], self.placeholders['t_u_obs']], 1) - self.lb) / (self.ub - self.lb) - 1.0,self.weights, self.biases) # 觀測數據預測self.eq1_pred, self.eq2_pred = self._compute_residuals() # 計算殘差def _compute_residuals(self):"""計算 SVE 的連續性和動量殘差。"""u_t = tf.gradients(self.u_pred, self.placeholders['t_f'])[0] # 流速時間導數u_x = tf.gradients(self.u_pred, self.placeholders['x_f'])[0] # 流速空間導數h_t = tf.gradients(self.h_pred, self.placeholders['t_f'])[0] # 水深時間導數h_x = tf.gradients(self.h_pred, self.placeholders['x_f'])[0] # 水深空間導數eq1 = self.fun_r_mass(self.u_pred, h_t, h_x) # 連續性殘差eq2 = self.fun_r_momentum(self.u_pred, self.h_pred, u_t, u_x, h_x) # 動量殘差return eq1, eq2 # 返回殘差def fun_r_mass(self, u, h_t, h_x):"""計算連續性方程殘差:?h/?t + u ?h/?x = 0。"""return h_t + u * h_x # 返回連續性殘差def fun_r_momentum(self, u, h, u_t, u_x, h_x):"""計算動量方程殘差:?u/?t + u ?u/?x + g ?h/?x + g (n2|u|u / R^(4/3) - S) = 0。"""n = 0.015 # Manning 粗糙度系數h = tf.clip_by_value(h, clip_value_min=1e-4, clip_value_max=50) # 限制水深范圍R = self.b * h / (2 * h + self.b) # 液壓半徑return u_t + u * u_x + 9.81 * h_x + 9.81 * (n * n * tf.abs(u) * u / tf.pow(tf.square(R), 2.0 / 3.0) - self.S) # 返回動量殘差def _define_loss_and_optimizer(self):"""定義損失函數和優化器。"""self.loss_f_c = tf.reduce_mean(tf.square(self.eq1_pred)) # 計算連續性損失的均方誤差self.loss_f_m = tf.reduce_mean(tf.square(self.eq2_pred)) # 計算動量損失的均方誤差self.loss_f = self.loss_f_c + self.loss_f_m # 總物理損失self.loss_bc_u = tf.reduce_mean(tf.square(self.placeholders['u_BC'] - self.u_bc_pred)) # 流速邊界損失self.loss_bc_h = tf.reduce_mean(tf.square(self.placeholders['h_BC'] - self.h_bc_pred)) # 水深邊界損失self.loss_bcs = self.loss_bc_u + self.loss_bc_h # 總邊界損失self.loss_ic_h = tf.reduce_mean(tf.square(self.placeholders['h_IC'] - self.h_ic_pred)) # 初始條件損失self.loss = self.loss_f + self.loss_bcs + self.loss_ic_h # 總損失if self.useObs:self.loss_obs_u = tf.reduce_mean(tf.square(self.placeholders['u_obs'] - self.u_obs_pred)) # 流速觀測損失self.loss_obs_h = tf.reduce_mean(tf.square(self.placeholders['h_obs'] - self.h_obs_pred)) # 水深觀測損失self.loss += self.loss_obs_u + self.loss_obs_h # 加入觀測損失# 定義 L-BFGS-B 優化器self.optimizer = tf.contrib.opt.ScipyOptimizerInterface(self.loss,method='L-BFGS-B',options={'maxiter': 50000, # 最大迭代次數'maxfun': 50000, # 最大函數評估次數'maxcor': 50, # 最大校正向量數'maxls': 50, # 最大線搜索步數'ftol': 1.0e-10, # 函數值容差'gtol': 0.000001}) # 梯度容差self.global_step = tf.Variable(0, trainable=False) # 全局步數,訓練時遞增self.learning_rate = tf.train.exponential_decay(lr, self.global_step, 5000, 0.9, staircase=False) # 學習率衰減self.optimizer_adam = tf.train.AdamOptimizer(learning_rate=self.learning_rate) # Adam 優化器self.train_op_adam = self.optimizer_adam.minimize(self.loss, global_step=self.global_step) # Adam 優化操作def train(self, num_epochs, tf_dict):"""訓練模型,使用 Adam 優化器。"""for it in range(num_epochs): # 循環指定次數start_time = time.time() # 記錄訓練開始時間self.sess.run(self.train_op_adam, feed_dict=tf_dict) # 執行 Adam 優化一步if it % 10 == 0: # 每 10 步打印一次elapsed = time.time() - start_time # 計算單步耗時loss_value = self.sess.run(self.loss, feed_dict=tf_dict) # 獲取當前損失learning_rate = self.sess.run(self.learning_rate) # 獲取當前學習率print('Iteration: %d, Loss: %.3e, Time: %.2f, Learning Rate: %.3e' % (it, loss_value, elapsed, learning_rate)) # 打印信息u_pred, h_pred = self.predict(self.X_star[:, 0:1], self.X_star[:, 1:2]) # 預測結果error_u = np.linalg.norm(self.u_star - u_pred, 2) / np.linalg.norm(self.u_star, 2) # 流速 L2 誤差error_h = np.linalg.norm(self.h_star - h_pred, 2) / np.linalg.norm(self.h_star, 2) # 水深 L2 誤差print('Error u: %.3e, Error h: %.3e' % (error_u, error_h)) # 打印誤差def predict(self, x_star, t_star):"""預測流速和水深。"""tf_dict = {self.placeholders['x_f']: x_star, # 輸入 x 坐標self.placeholders['t_f']: t_star, # 輸入 t 坐標self.placeholders['x_u_tf']: x_star, # 流速預測 x 坐標self.placeholders['t_u_tf']: t_star, # 流速預測 t 坐標self.placeholders['x_h_tf']: x_star, # 水深預測 x 坐標self.placeholders['t_h_tf']: t_star # 水深預測 t 坐標}u_pred, h_pred = self.sess.run([self.u_pred, self.h_pred], feed_dict=tf_dict) # 獲取預測值return u_pred, h_pred # 返回流速和水深預測def initialize_NN(self, layers, use_ff):"""初始化神經網絡權重和偏置,使用 Xavier 初始化。"""weights, biases = [], [] # 初始化權重和偏置列表num_layers = len(layers) # 獲取層數if use_ff: # 如果使用傅里葉特征,調整輸入層layers = [2 * layers[0]] + layers[1:]for l in range(num_layers - 1): # 遍歷每一層W = self.xavier_init([layers[l], layers[l + 1]]) # 初始化權重b = tf.Variable(tf.zeros([1, layers[l + 1]], dtype=self.DTYPE), dtype=self.DTYPE) # 初始化偏置weights.append(W) # 添加權重biases.append(b) # 添加偏置return weights, biases # 返回權重和偏置def xavier_init(self, size):"""Xavier 初始化權重。"""in_dim, out_dim = size # 獲取輸入和輸出維度xavier_stddev = np.sqrt(2 / (in_dim + out_dim)) # 計算 Xavier 標準差return tf.Variable(tf.truncated_normal(size, stddev=xavier_stddev), dtype=self.DTYPE) # 返回初始化權重def neural_net(self, X, weights, biases):"""定義前饋神經網絡。"""H = X # 輸入數據for l in range(len(weights) - 1): # 遍歷隱藏層W, b = weights[l], biases[l] # 獲取當前層的權重和偏置H = tf.tanh(tf.add(tf.matmul(H, W), b)) # 應用激活函數 tanhW, b = weights[-1], biases[-1] # 獲取輸出層的權重和偏置Y = tf.add(tf.matmul(H, W), b) # 計算輸出return Y[:, 0:1], Y[:, 1:2] # 返回流速 u 和水深 hdef add_noise(self, signal):"""添加高斯噪聲到信號。"""target_snr_db = 500 # 目標信噪比 (dB)sig_avg_db = 10 * np.log10(np.mean(signal)) # 信號平均功率 (dB)noise_avg_db = sig_avg_db - target_snr_db # 噪聲平均功率 (dB)noise_avg = 10 ** (noise_avg_db / 10) # 噪聲平均功率 (W)noise = np.random.normal(0, np.sqrt(noise_avg), len(signal)) # 生成白噪聲return signal + noise # 返回含噪聲的信號# 主程序
if __name__ == "__main__":# 定義英尺到米的轉換因子FACTOR = 0.3048# 定義每個 Case 的配置字典CASES = {1: {'file': 'case1/MixedFlow.p02.hdf', 'dx': 30, 'dt': 30, 'n': 0.005, 'u': 1.0},2: {'file': 'case2/MixedFlow.p02.hdf', 'dx': 20, 'dt': 30, 'n': 0.025, 'u': 1.0, 'S': 1e-3},3: {'file': 'case3/MixedFlow.p02.hdf', 'obs': [16, 32]},4: {'file': 'case4/MixedFlow.p02.hdf', 'obs': [16, 32], 'use_ff': True},5: {'file': 'case5/MixedFlow.p02.hdf', 'obs': [118, 134]},6: {'file': 'case6/MixedFlow.p02.hdf', 'obs': [[118, 134], [25, 118, 134], [25, 50, 118, 134], [25, 50, 100, 118, 134]]}}# 定義默認網絡層結構LAYERS = [2] + 4 * [1 * 64] + [2]for case_id in range(1, 7): # 循環運行每個 Caseprint(f"\nRunning Case {case_id}...") # 打印當前 Case 編號case = CASES[case_id] # 獲取當前 Case 配置hdf_file = f'HEC-RAS/{case["file"]}' if case_id > 2 else None # 根據 Case 類型設置 HDF 文件路徑use_ff = case.get('use_ff', False) # 獲取是否使用傅里葉特征obs_indices = case.get('obs', [[int(1200 / case.get('dx', 30)), int(2400 / case.get('dx', 30))]]) # 獲取觀測點索引# 數據準備if hdf_file: # 如果是 Case 3-6,使用 HDF5 數據hf = h5py.File(hdf_file, 'r') # 打開 HDF5 文件attrs = hf['Geometry']['Cross Sections']['Attributes'][:] # 讀取幾何屬性staid, eles, reach_len = [], [], [] # 初始化列表for attr in attrs: # 遍歷屬性staid.append(attr[2].decode('utf-8')) # 提取站點 IDeles.append(attr[14]) # 提取高程reach_len.append(attr[6]) # 提取河段長度coor = np.cumsum(np.array(reach_len[:-1])) # 計算累積坐標coor = [0] + coor.tolist() # 添加起始點coor = coor[::-1] # 反轉坐標eles = np.array(eles) # 轉換為數組slope = np.gradient(eles, coor) # 計算坡度water_surface = hf['Results']['Unsteady']['Output']["/Results/Unsteady/Output"]['Output Blocks']['Base Output']["Unsteady Time Series"]["Cross Sections"]['Water Surface'][1:] # 讀取水面高程velocity_total = hf['Results']['Unsteady']['Output']["/Results/Unsteady/Output"]['Output Blocks']['Base Output']["Unsteady Time Series"]["Cross Sections"]['Velocity Total'][1:] # 讀取總流速timestamp = hf['Results']['Unsteady']['Output']["/Results/Unsteady/Output"]['Output Blocks']['Base Output']["Unsteady Time Series"]['Time Date Stamp'][1:] # 讀取時間戳time_model = [datetime.strptime(t.decode('utf-8'), '%d%b%Y %H:%M:%S') for t in timestamp] # 轉換時間格式water_depth = water_surface - eles[None, :] # 計算水深Nt, Nx = water_depth.shape # 獲取時間和空間維度t = np.arange(Nt)[:, None] # 生成時間數組x = np.array(coor[::-1])[:, None] # 生成空間數組u_exact, h_exact = velocity_total, water_depth # 提取真實流速和水深else: # Case 1-2,使用模擬數據dx, dt = case['dx'], case['dt'] # 獲取空間和時間步長n, u, S = case['n'], case['u'], case.get('S', 0) # 獲取 Manning 系數、流速和坡度t = np.arange(0, 3600 + dt, dt) # 生成時間序列x = np.arange(0, 3600 + dx, dx) # 生成空間序列Nt, Nx = len(t), len(x) # 獲取維度if case_id == 1: # Case 1:解析解h_exact = np.zeros((Nt, Nx)) # 初始化水深數組for i in range(Nt): # 遍歷時間xb = u * t[i] # 計算邊界位置indb = np.argwhere(np.abs(x - xb) == np.abs(x - xb).min())[0][0] # 找到最近點if x[indb] > xb: # 調整索引indb -= 1h_exact[i, :indb + 1] = np.power(-7 / 3 * n * n * u * u * (x[:indb + 1] - u * t[i]), 3 / 7) # 計算解析解elif case_id == 2: # Case 2:RK4 解h_exact = np.zeros((Nt, Nx)) # 初始化水深數組h_exact[0, 0] = 4 * np.sin(2 * np.pi / (4 * 3600) * t[0]) # 初始邊界條件for i in range(1, Nt): # 遍歷時間xb = u * t[i] # 計算邊界位置indb = np.argwhere(np.abs(x - xb) == np.abs(x - xb).min())[0][0] # 找到最近點if x[indb] > xb: # 調整索引indb -= 1x_ = x[:indb + 1] # 截取空間范圍y0 = 4 * np.sin(2 * np.pi / (4 * 3600) * t[i]) # 邊界水深h_ = [y0] # 初始值for j in range(1, len(x_)): # RK4 迭代k1 = dx * (-S - n * n * u * u / np.power(np.power(h_[-1], 4), 1 / 3.))k2 = dx * (-S - n * n * u * u / np.power(np.power(h_[-1] + 0.5 * k1, 4), 1 / 3.))k3 = dx * (-S - n * n * u * u / np.power(np.power(h_[-1] + 0.5 * k2, 4), 1 / 3.))k4 = dx * (-S - n * n * u * u / np.power(np.power(h_[-1] + k3, 4), 1 / 3.))y = h_[-1] + (1.0 / 6.0) * (k1 + 2 * k2 + 2 * k3 + k4) # 更新水深if y >= 0: # 確保水深非負h_.append(y)else:breakh_exact[i, :len(h_)] = h_ # 存儲 RK4 結果u_exact = np.zeros_like(h_exact) + u # 假設恒定流速X, T = np.meshgrid(x, t) # 生成網格X_star = np.hstack((X.flatten()[:, None], T.flatten()[:, None])) # 展平坐標u_star, h_star = u_exact.flatten()[:, None], h_exact.flatten()[:, None] # 展平目標值lb, ub = X_star.min(0), X_star.max(0) # 獲取域邊界# 準備訓練數據tsteps = [0, Nt - 1] # 初始和結束時間步X_h_IC, h_IC = None, None # 初始化初始條件for i, tstep in enumerate(tsteps): # 遍歷時間步xx1_ = np.hstack((X[tstep:tstep + 1, :].T, T[tstep:tstep + 1, :].T)) # 提取坐標hh1_ = self.add_noise(h_exact[tstep:tstep + 1, :].T) # 添加噪聲if i == 0: # 首次賦值X_h_IC, h_IC = xx1_, hh1_else: # 追加數據X_h_IC, h_IC = np.vstack((X_h_IC, xx1_)), np.vstack((h_IC, hh1_))X_u_BC = np.vstack((np.hstack((X[:, 0:1], T[:, 0:1])), np.hstack((u * t, t)))) # 上下游邊界坐標X_h_BC = X_u_BC # 水深邊界坐標u_BC = np.vstack((u_exact[:, 0:1], h_exact[:, np.argmin(np.abs(x - u * t), axis=1)])) # 流速邊界值h_BC = np.vstack((h_exact[:, 0:1], h_exact[:, np.argmin(np.abs(x - u * t), axis=1)])) # 水深邊界值X_u_obs, X_h_obs, u_obs, h_obs = None, None, None, None # 初始化觀測數據if case_id > 2 or useObs: # 如果使用觀測數據t_obs_u, x_obs_u, u_obs = None, None, None # 初始化流速觀測t_obs_h, x_obs_h, h_obs = None, None, None # 初始化水深觀測for obs_idx in obs_indices[0] if case_id == 6 else obs_indices: # 遍歷觀測點t_obs_u = np.append(t_obs_u, t.flatten()) if 't_obs_u' in locals() else t.flatten() # 追加時間x_obs_u = np.append(x_obs_u, np.ones(Nt) * x[obs_idx]) if 'x_obs_u' in locals() else np.ones(Nt) * x[obs_idx] # 追加 x 坐標u_obs = np.append(u_obs, self.add_noise(u_exact[:, obs_idx])) if 'u_obs' in locals() else self.add_noise(u_exact[:, obs_idx]) # 追加流速t_obs_h = np.append(t_obs_h, t.flatten()) if 't_obs_h' in locals() else t.flatten() # 追加時間x_obs_h = np.append(x_obs_h, np.ones(Nt) * x[obs_idx]) if 'x_obs_h' in locals() else np.ones(Nt) * x[obs_idx] # 追加 x 坐標h_obs = np.append(h_obs, self.add_noise(h_exact[:, obs_idx])) if 'h_obs' in locals() else self.add_noise(h_exact[:, obs_idx]) # 追加水深X_u_obs, X_h_obs = np.vstack((x_obs_u, t_obs_u)).T, np.vstack((x_obs_h, t_obs_h)).T # 組合坐標u_obs, h_obs = u_obs[:, None], h_obs[:, None] # 轉換為列向量X_f_train = X_star # 內部訓練點slope = np.hstack([np.array(slope) for _ in range(Nt)])[:, None] if hdf_file else 0 # 擴展坡度數組# 模型訓練model = SVE(X_h_IC, X_u_BC, X_h_BC, X_u_obs, X_h_obs, X_f_train, h_IC, u_BC, h_BC, u_obs, h_obs,LAYERS if case_id > 2 else [2] + 3 * [1 * 32] + [1], lb, ub, slope, 10,X_star, u_star, h_star, useObs=True, use_ff=use_ff) # 創建模型實例tf_dict = {k: v for k, v in model.placeholders.items() if v is not None} # 創建字典,僅包含非空占位符tf_dict.update({ # 更新字典model.placeholders['x_h_IC']: X_h_IC,model.placeholders['t_h_IC']: X_h_IC[:, 1:2],model.placeholders['h_IC']: h_IC,model.placeholders['x_u_BC']: X_u_BC[:, 0:1],model.placeholders['t_u_BC']: X_u_BC[:, 1:2],model.placeholders['u_BC']: u_BC,model.placeholders['x_h_BC']: X_h_BC[:, 0:1],model.placeholders['t_h_BC']: X_h_BC[:, 1:2],model.placeholders['h_BC']: h_BC})if useObs: # 如果使用觀測數據tf_dict.update({model.placeholders['x_u_obs']: X_u_obs[:, 0:1],model.placeholders['t_u_obs']: X_u_obs[:, 1:2],model.placeholders['u_obs']: u_obs,model.placeholders['x_h_obs']: X_h_obs[:, 0:1],model.placeholders['t_h_obs']: X_h_obs[:, 1:2],model.placeholders['h_obs']: h_obs})tf_dict.update({model.placeholders['x_f']: X_f_train[:, 0:1], # 內部點 xmodel.placeholders['t_f']: X_f_train[:, 1:2]}) # 內部點 tmodel.train(10000, tf_dict) # 訓練模型 10000 輪# 預測和評估x_test, t_test = X_star[:Nt * Nx, 0:1], X_star[:Nt * Nx, 1:2] # 準備測試數據u_pred, h_pred = model.predict(x_test, t_test) # 預測結果u_pred, h_pred = u_pred.reshape(Nt, Nx), h_pred.reshape(Nt, Nx) # 重塑為原始形狀u_exact, h_exact = u_exact[:Nt], h_exact[:Nt] # 截取測試范圍error_h = np.linalg.norm(h_exact - h_pred, 2) / np.linalg.norm(h_exact, 2) # 計算 L2 誤差rmse_h = np.sqrt(((h_exact - h_pred) ** 2).mean()) # 計算 RMSEprint(f'Case {case_id} - Error h: {error_h:.3e}, RMSE h: {rmse_h:.3f} m') # 打印結果# 可視化hours = np.arange(Nt) / 120. if case_id > 2 else np.arange(Nt) * dt / 3600 # 轉換為小時x = x[::-1] * FACTOR if case_id > 2 else x # 反轉并轉換單位xx, tt = np.meshgrid(x, hours) # 生成網格h_pred *= FACTOR if case_id > 2 else 1 # 轉換單位h_exact *= FACTOR if case_id > 2 else 1 # 轉換單位eles = np.zeros_like(x) if case_id <= 2 else eles * FACTOR # 地形高程fig = plt.figure(figsize=(16.5, 15 if case_id == 6 else 8)) # 創建圖形對象gs = gridspec.GridSpec(3 if case_id <= 4 else 5, 4 if case_id == 6 else 3, hspace=0.08, wspace=0.15) # 設置網格levels = np.linspace(0, 5.4 if case_id > 2 else 0.6, 9 if case_id > 2 else 10) # 設置顏色條范圍ax0 = fig.add_subplot(gs[0, 1:3 if case_id <= 4 else 3:5]) # 添加第一個子圖cs = ax0.contourf(xx, tt, h_exact[:Nt], cmap='rainbow', levels=levels, alpha=0.8) # 繪制參考水深圖ax0.scatter(X_u_BC[:, 0] * FACTOR if case_id > 2 else X[:, 0:1][::3], # 繪制邊界條件點X_u_BC[:, 1] / 120. if case_id > 2 else T[:, 0:1][::3],marker='o', c='g', s=12, clip_on=False)ax0.scatter(x_obs_h * FACTOR if case_id > 2 else x_obs[::2], # 繪制觀測點t_obs_h / 120. if case_id > 2 else t_obs[::2],facecolors='none', edgecolors='k', marker='o', s=15, clip_on=False)ax0.scatter(X_h_IC[:, 0] * FACTOR if case_id > 2 else xx1_[:, 0][::3], # 繪制快照點X_h_IC[:, 1] / 120. if case_id > 2 else xx1_[:, 1][::3],marker='*', c='r', s=25, clip_on=False)ax0.set_ylabel('Time (h)') # 設置 y 軸標簽ax0.set_xticklabels([] if case_id <= 4 else ax0.get_xticklabels()) # 隱藏 x 軸標簽(若非最后一行)ax0.text(0.05, 0.9, f'(a) Ref', fontsize=16, transform=ax0.transAxes) # 添加子圖標簽divider = make_axes_locatable(ax0) # 創建顏色條布局cax = divider.append_axes("right", size="2%", pad=0.05) # 添加顏色條軸cb = fig.colorbar(cs, cax=cax, orientation='vertical') # 添加顏色條cb.ax.tick_params(labelsize=14) # 設置顏色條刻度字體大小cb.ax.yaxis.offsetText.set_fontsize(14) # 設置偏移文本字體大小cb.set_label('Water depth (m)', fontsize=14) # 設置顏色條標簽if case_id == 6: # Case 6 特殊處理,4 列 PINN 結果ax1, ax2, ax3, ax4 = [fig.add_subplot(gs[1, i * 2:(i + 1) * 2]) for i in range(4)] # 添加 4 個子圖for ax, h_pred_, label in zip([ax1, ax2, ax3, ax4], [h_pred2obs, h_pred3obs, h_pred4obs, h_pred5obs],['(b) PINN', '(c) PINN', '(d) PINN', '(e) PINN']):cs = ax.contourf(xx, tt, h_pred_[:Nt], cmap='rainbow', levels=levels, alpha=0.8) # 繪制 PINN 水深ax.set_xticklabels([]) # 隱藏 x 軸標簽ax.set_yticklabels([] if ax != ax1 else ax.get_yticklabels()) # 僅第一個子圖顯示 y 軸標簽ax.text(0.05, 0.9, label, fontsize=16, transform=ax.transAxes) # 添加子圖標簽divider = make_axes_locatable(ax4) # 創建顏色條布局cax = divider.append_axes("right", size="2%", pad=0.05) # 添加顏色條軸cb = fig.colorbar(cs, cax=cax, orientation='vertical') # 添加顏色條cb.ax.tick_params(labelsize=14) # 設置顏色條刻度字體大小cb.ax.yaxis.offsetText.set_fontsize(14) # 設置偏移文本字體大小cb.set_label('Water depth (m)', fontsize=14) # 設置顏色條標簽ax5, ax6, ax7, ax8 = [fig.add_subplot(gs[2, i * 2:(i + 1) * 2]) for i in range(4)] # 添加 4 個誤差子圖for ax, h_pred_, label in zip([ax5, ax6, ax7, ax8], [h_pred2obs, h_pred3obs, h_pred4obs, h_pred5obs],['(f) PINN-Ref', '(g) PINN-Ref', '(h) PINN-Ref', '(i) PINN-Ref']):cs = ax.contourf(xx, tt, h_pred_[:Nt] - h_exact[:Nt], cmap='bwr', levels=np.linspace(-0.5, 0.5, 11),alpha=0.8, extend='both') # 繪制 PINN 誤差ax.set_xticklabels([]) # 隱藏 x 軸標簽ax.set_yticklabels([] if ax != ax5 else ax.get_yticklabels()) # 僅第一個子圖顯示 y 軸標簽ax.text(0.05, 0.9, label, fontsize=16, transform=ax.transAxes) # 添加子圖標簽ax9, ax10, ax11, ax12 = [fig.add_subplot(gs[3, i * 2:(i + 1) * 2]) for i in range(4)] # 添加 4 個插值子圖for ax, h_interp_, label in zip([ax9, ax10, ax11, ax12], [h_interp2obs, h_interp3obs, h_interp4obs, h_interp5obs],['(j) Interpolation', '(k) Interpolation', '(l) Interpolation', '(m) Interpolation']):cs = ax.contourf(xx, tt, h_interp_[:Nt], cmap='rainbow', levels=levels, alpha=0.8) # 繪制插值水深ax.set_xticklabels([]) # 隱藏 x 軸標簽ax.set_yticklabels([] if ax != ax9 else ax.get_yticklabels()) # 僅第一個子圖顯示 y 軸標簽ax.text(0.05, 0.9, label, fontsize=16, transform=ax.transAxes) # 添加子圖標簽divider = make_axes_locatable(ax12) # 創建顏色條布局cax = divider.append_axes("right", size="2%", pad=0.05) # 添加顏色條軸cb = fig.colorbar(cs, cax=cax, orientation='vertical') # 添加顏色條cb.ax.tick_params(labelsize=14) # 設置顏色條刻度字體大小cb.ax.yaxis.offsetText.set_fontsize(14) # 設置偏移文本字體大小cb.set_label('Water depth (m)', fontsize=14) # 設置顏色條標簽ax13, ax14, ax15, ax16 = [fig.add_subplot(gs[4, i * 2:(i + 1) * 2]) for i in range(4)] # 添加 4 個插值誤差子圖for ax, h_interp_, label in zip([ax13, ax14, ax15, ax16], [h_interp2obs, h_interp3obs, h_interp4obs, h_interp5obs],['(n) Interpolation-Ref', '(o) Interpolation-Ref', '(p) Interpolation-Ref', '(q) Interpolation-Ref']):cs = ax.contourf(xx, tt, h_interp_[:Nt] - h_exact[:Nt], cmap='bwr', levels=np.linspace(-0.5, 0.5, 11),alpha=0.8, extend='both') # 繪制插值誤差ax.set_xlabel('Distance upstream (m)' if case_id > 2 else 'Distance (m)') # 設置 x 軸標簽ax.set_yticklabels([] if ax != ax13 else ax.get_yticklabels()) # 僅第一個子圖顯示 y 軸標簽ax.text(0.05, 0.9, label, fontsize=16, transform=ax.transAxes) # 添加子圖標簽divider = make_axes_locatable(ax16) # 創建顏色條布局cax = divider.append_axes("right", size="2%", pad=0.05) # 添加顏色條軸cb = fig.colorbar(cs, cax=cax, orientation='vertical') # 添加顏色條cb.ax.tick_params(labelsize=14) # 設置顏色條刻度字體大小cb.ax.yaxis.offsetText.set_fontsize(14) # 設置偏移文本字體大小cb.set_label('Error (m)', fontsize=14) # 設置顏色條標簽else: # Case 1-4 或 5 的標準布局ax1 = fig.add_subplot(gs[1, :2]) # 添加 PINN 子圖cs = ax1.contourf(xx, tt, h_pred[:Nt], cmap='rainbow', levels=levels, alpha=0.8) # 繪制 PINN 水深ax1.set_ylabel('Time (h)' if case_id > 2 else 'Time (s)') # 設置 y 軸標簽ax1.set_xticklabels([]) # 隱藏 x 軸標簽ax1.text(0.05, 0.9, '(b) PINN', fontsize=16, transform=ax1.transAxes) # 添加子圖標簽ax2 = fig.add_subplot(gs[1, 2:]) # 添加參考或插值子圖cs = ax2.contourf(xx, tt, h_exact[:Nt], cmap='rainbow', levels=levels, alpha=0.8) # 繪制參考或插值水深ax2.set_xticklabels([]) # 隱藏 x 軸標簽ax2.set_yticklabels([] if case_id <= 4 else ax2.get_yticklabels()) # 隱藏 y 軸標簽(若非 Case 5-6)ax2.text(0.05, 0.9, '(c) Interpolation' if case_id > 2 else '(c) Ref', fontsize=16, transform=ax2.transAxes) # 添加子圖標簽divider = make_axes_locatable(ax2) # 創建顏色條布局cax = divider.append_axes("right", size="2%", pad=0.05) # 添加顏色條軸cb = fig.colorbar(cs, cax=cax, orientation='vertical') # 添加顏色條cb.ax.tick_params(labelsize=14) # 設置顏色條刻度字體大小cb.ax.yaxis.offsetText.set_fontsize(14) # 設置偏移文本字體大小cb.set_label('Water depth (m)', fontsize=14) # 設置顏色條標簽ax3 = fig.add_subplot(gs[2, :2]) # 添加 PINN 誤差子圖cs = ax3.contourf(xx, tt, h_pred[:Nt] - h_exact[:Nt], cmap='bwr', levels=np.linspace(-0.5, 0.5, 11),alpha=0.8, extend='both') # 繪制 PINN 誤差ax3.set_xlabel('Distance upstream (m)' if case_id > 2 else 'Distance (m)') # 設置 x 軸標簽ax3.set_ylabel('Time (h)' if case_id > 2 else 'Time (s)') # 設置 y 軸標簽ax3.text(0.05, 0.9, '(d) PINN-Ref', fontsize=16, transform=ax3.transAxes) # 添加子圖標簽ax4 = fig.add_subplot(gs[2, 2:]) # 添加參考或插值誤差子圖cs = ax4.contourf(xx, tt, h_exact[:Nt] - h_exact[:Nt] if case_id <= 2 else h_exact[:Nt],cmap='bwr', levels=np.linspace(-0.5, 0.5, 11), alpha=0.8, extend='both') # 繪制誤差ax4.set_xlabel('Distance upstream (m)' if case_id > 2 else 'Distance (m)') # 設置 x 軸標簽ax4.set_yticklabels([] if case_id <= 4 else ax4.get_yticklabels()) # 隱藏 y 軸標簽(若非 Case 5-6)ax4.text(0.05, 0.9, '(e) Interpolation-Ref' if case_id > 2 else '(e) Ref-Ref', fontsize=16, transform=ax4.transAxes) # 添加子圖標簽divider = make_axes_locatable(ax4) # 創建顏色條布局cax = divider.append_axes("right", size="2%", pad=0.05) # 添加顏色條軸cb = fig.colorbar(cs, cax=cax, orientation='vertical') # 添加顏色條cb.ax.tick_params(labelsize=14) # 設置顏色條刻度字體大小cb.ax.yaxis.offsetText.set_fontsize(14) # 設置偏移文本字體大小cb.set_label('Error (m)', fontsize=14) # 設置顏色條標簽tlist = [20, 50, 100, 200] if case_id > 2 else [int(i * 60 / dt) for i in [20, 30, 40, 60]] # 選擇時間點fig, axes = plt.subplots(2, 2, figsize=(15, 6 if case_id <= 4 else 12), sharex=True, sharey=True) # 創建沿河圖axes = axes.ravel() # 展平軸數組for k in range(4): # 遍歷四個時間點axes[k].plot(x, h_exact[tlist[k]] + eles if case_id > 2 else h_exact[tlist[k]], 'ok', label='reference') # 繪制參考曲線axes[k].plot(x, h_pred[tlist[k]] + eles if case_id > 2 else h_pred[tlist[k]], '-r', linewidth=2, label='PINN') # 繪制 PINN 曲線if case_id > 2: # Case 3-6 添加插值曲線axes[k].plot(x, h_exact[tlist[k]] + eles, '-g', linewidth=2, label='Interpolation') # 繪制插值曲線axes[k].fill_between(x.flatten(), eles, color='0.7') # 填充地形axes[k].text(0.85, 0.85, f't={tlist[k]} h' if case_id > 2 else f't={tlist[k] * dt} s',fontsize=15, transform=axes[k].transAxes) # 添加時間標簽if k in [0, 2]: # 設置 y 軸標簽axes[k].set_ylabel('Water stage (m)')axes[k].set_ylim([0, 5 if case_id > 2 else 0.6]) # 設置 y 軸范圍axes[k].grid() # 添加網格if k in [2, 3]: # 設置 x 軸標簽axes[k].set_xlabel('Distance upstream (m)' if case_id > 2 else 'Distance (m)')if k == 0: # 添加圖例axes[k].legend(loc=2 if case_id > 2 else 4, prop={'size': 14})plt.tight_layout() # 調整布局plt.savefig(f'figures/case{case_id}/along_channel.pdf') # 保存沿河圖plt.close() # 關閉圖形fig.savefig(f'figures/case{case_id}/contour.pdf') # 保存輪廓圖plt.close() # 關閉圖形
結論
論文作者首次嘗試將PINN應用于大尺度河流模型降尺度至子網格中,該框架可解一維SVE方程、通過傅里葉變換編碼潮汐邊界周期性變化、同化多種數據類型并集成到大型河流模型。實驗證明PINN在洪水建模中有重要作用,為解決洪災問題應用提供了方法。
不足以及展望
作者在最后提出了論文中的不足:
- 測試案例相對簡單,僅應用恒定摩擦系數,未考慮河道幾何形狀影響。
- 現實中常見的數據問題如觀測不確定性、數據不一致和觀測缺失等也未充分探討,且PINN在解決河口非線性相互作用時精度有待提高。
作者對未來工作的展望如下:
未來可將PINN降尺度應用于更廣泛、更現實的流動條件。考慮可變摩擦系數、河道幾何形狀影響,開發更強大的抗噪聲算法處理數據不確定性。探索擴展PINN到二維領域,用于創建復雜流動區域的數字孿生,以解決當前大尺度模型無法解決的問題。