目錄
基于約束的方法 (Constraint-based)
基于評分的方法 (Score-based)
基于函數因果模型的方法 (Functional Causal Models)
基于梯度的方法 (Gradient-based)
因果發現是機器學習中一個重要的研究方向,它旨在從觀測數據中推斷變量之間的因果關系
基于約束的方法 (Constraint-based)
- 核心思想:利用條件獨立性檢驗來推斷因果結構
- 代表算法:PC算法、FCI算法、RFCI算法
## 因果發現算法
# 1. 基于約束的方法——PC算法
from pgmpy.estimators import PC
import pandas as pd
import numpy as np# 生成數據
data = pd.DataFrame(np.random.randn(1000, 3), columns=['X', 'Y', 'Z'])# PC算法
est = PC(data)
model = est.estimate(variant="orig", alpha=0.05)
print("因果邊:", model.edges())
基于評分的方法 (Score-based)
- 核心思想:定義評分函數評估圖結構,搜索得分最高的圖
- 代表算法:GES算法、FGES算法
# 2. 基于評分的方法——GES算法
import numpy as np
import pandas as pd
import networkx as nx
import matplotlib.pyplot as plt
from itertools import permutations
from sklearn.linear_model import LinearRegressionnp.random.seed(42)
X = np.random.normal(size=1000)
Y = 0.5 * X + np.random.normal(size=1000)
Z = 0.3 * Y + np.random.normal(size=1000)
data = pd.DataFrame({'X': X, 'Y': Y, 'Z': Z})variables = data.columns.tolist()
print("變量列表:", variables) # ['X', 'Y', 'Z']def compute_bic(data, parents_dict):"""計算 BIC 評分(線性高斯模型)"""score = 0for node in data.columns:X = data[parents_dict[node]] if parents_dict[node] else np.zeros((len(data), 1))y = data[node]model = LinearRegression().fit(X, y)residuals = y - model.predict(X)n = len(data)k = len(parents_dict[node])sigma2 = np.var(residuals)score += -n * np.log(sigma2) / 2 - k * np.log(n) / 2return score# 窮舉所有可能的 DAG(檢查 edges 生成)
best_score = -np.inf
best_graph = Noneall_edges = list(permutations(variables, 2))
print("所有可能的邊組合:", all_edges) # [('X', 'Y'), ('X', 'Z'), ('Y', 'X'), ('Y', 'Z'), ('Z', 'X'), ('Z', 'Y')]for u, v in all_edges:graph = {node: [] for node in variables}graph[v].append(u) # 添加邊 u -> vscore = compute_bic(data, graph)if score > best_score:best_score = scorebest_graph = graph# 可視化
G = nx.DiGraph()
for node in best_graph:for parent in best_graph[node]:G.add_edge(parent, node)nx.draw(G, with_labels=True, node_color='lightgreen')
plt.title("GES Algorithm (Simplified)")
plt.savefig('ges_graph.png')
plt.show()
基于函數因果模型的方法 (Functional Causal Models)
- 核心思想:假設數據生成過程的函數形式
- 代表算法:LiNGAM、ANM(加性噪聲模型)、PNL(后非線性模型)
# 3. 基于函數因果模型的方法——LiNGAM
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import networkx as nx
from itertools import permutationsnp.random.seed(42)
X = np.random.normal(size=1000)
Y = 0.5 * X + np.random.normal(size=1000)
Z = 0.3 * Y + np.random.normal(size=1000)
data = pd.DataFrame({'X': X, 'Y': Y, 'Z': Z})data = (data - data.mean()) / data.std()try:# 嘗試使用 causallearn 的 LiNGAMfrom causallearn.search.FCMBased.lingam import DirectLiNGAMmodel = DirectLiNGAM()model.fit(data)adj_matrix = model.adjacency_matrix_print("LiNGAM 鄰接矩陣:\n", adj_matrix)# [[0. 0.41099056 0.]# [0. 0. 0.]# [0. 0.31428138 0.]]except ImportError:print("未找到 causallearn,改用簡化版 LiNGAM(基于回歸殘差)")# 簡化版 LiNGAM(基于殘差獨立性檢驗)adj_matrix = np.zeros((3, 3)) # 初始化鄰接矩陣variables = data.columns.tolist()for i, target in enumerate(variables):predictors = [var for var in variables if var != target]X_pred = data[predictors].valuesy_target = data[target].values# 多元線性回歸coef = np.linalg.lstsq(X_pred, y_target, rcond=None)[0]residuals = y_target - X_pred.dot(coef)# 檢查殘差與預測變量的獨立性(簡化版:相關系數)for j, predictor in enumerate(predictors):corr = np.corrcoef(residuals, data[predictor])[0, 1]if abs(corr) < 0.05: # 閾值可調整adj_matrix[variables.index(predictor), i] = 1 # predictor -> target# 可視化
G = nx.DiGraph(adj_matrix)
pos = nx.spring_layout(G)
nx.draw(G, pos, with_labels=True, node_size=1000, arrowsize=20,node_color='lightgreen', edge_color='gray')
plt.title('LiNGAM Causal Graph (Direct)' if 'model' in locals() else 'Simplified LiNGAM')
plt.show()
基于梯度的方法 (Gradient-based)
- 核心思想:使用神經網絡和梯度下降學習因果結構
- 代表算法:DAG-GNN、NOTEARS
# 4. 基于梯度的方法——NOTEARS
import numpy as np
import networkx as nx
import matplotlib.pyplot as plt# 生成數據
np.random.seed(42)
X = np.random.normal(size=1000)
Y = 0.5 * X + np.random.normal(size=1000)
Z = 0.3 * Y + np.random.normal(size=1000)
data = np.column_stack([X, Y, Z])
data = (data - data.mean(axis=0)) / data.std(axis=0) # 標準化# 自定義簡化版NOTEARS(梯度下降優化)
def notears_simple(X, lambda1=0.1, max_iter=100):n_features = X.shape[1]W = np.zeros((n_features, n_features)) # 初始化鄰接矩陣for _ in range(max_iter):# 計算梯度(最小二乘損失 + 無環約束)grad = -X.T @ (X - X @ W) / len(X) + lambda1 * np.sign(W)# 投影梯度更新(確保無環)W -= 0.01 * grad # 學習率W = np.clip(W, -1, 1) # 限制權重范圍return (W != 0).astype(int) # 二值化鄰接矩陣# 運行并可視化
W_est = notears_simple(data)
labels = ['X', 'Y', 'Z']
G = nx.DiGraph(W_est, labels=labels)
nx.draw(G, with_labels=True, node_color='lightblue', arrowsize=20)
plt.title('Simplified NOTEARS Result')
plt.show()