簡述
「使用邏輯回歸暴力預測金融欺詐,并不斷增加特征維度持續測試」的做法,體現了一種逐步建模與迭代驗證的實驗思路,在金融欺詐檢測中非常有價值,本文作為一篇回顧性記錄了早年間公司給某行做反欺詐預測用到的技術和思路。百度原始論文目前搜不到了,當時這篇論文的要點就提到,通過特征增廣,只要是特征就用起來,使用最簡單的邏輯回歸模型來提高預測的精確度。在工程上會攢出驚人數量級的特征維度, 但是效果也是很明顯的
簡單說了有很明顯的以下意義
類別 | 意義與作用 |
---|---|
建模策略 | 用邏輯回歸打底,快速獲得欺詐檢測的性能基線 |
特征工程 | 逐步加入特征可以精準評估其增益,防止過擬合或噪聲引入 |
可解釋性 | 每一輪邏輯回歸結果都可以回溯和解釋,滿足金融合規要求 |
?
不斷增加特征維度:逐步特征注入的意義
“不斷增加特征維度并進行邏輯回歸測試”,這是一個典型的特征迭代探索過程(Feature Incremental Analysis),其意義包括:
1. 評估每個特征或一組特征的預測增益
通過每次只加入一列或一組新特征,觀察模型性能變化(如 Recall 提升情況),可明確:哪些特征有效,哪些冗余或帶噪聲
2. 幫助構建更輕量可解釋的模型
在金融場景下,可解釋性極重要。逐步加入有效特征,有助于合規:便于模型在監管審查時解釋每一步
?3. 支持工程實踐中的在線迭代開發
這種“增量構建 + 持續測試”的方式與業務需求匹配:可能后續有新特征,如設備類型、IP地址、商戶代碼,逐步測試可以確保每次上線的模型在當前特征下最優。
工程意義
持續測試不是純學術,而是貼近工程現實的:
工程需求 | 你這樣做的優勢 |
---|---|
模型部署快速迭代 | 每次新特征上線后,能快速評估是否要加入生產 |
資源限制 | 簡化邏輯回歸相比復雜模型更容易上線測試 |
數據動態更新 | 新交易行為、新維度不斷加入,適合增量測試 |
數據集Kaggle – Credit Card Fraud
-
鏈接:https://www.kaggle.com/datasets/mlg-ulb/creditcardfraud
-
規模:284,807 條交易數據,欺詐樣本比例:0.17%(極度不平衡)
-
特征:30 個字段,含匿名特征 V1~V28 + 時間、金額、標簽(Class)實際信用卡交易,匿名化處理,適合模型測試和不平衡分類研究
?
代碼實現
實現一個手寫的邏輯回歸模型來預測信用卡欺詐行為,使用creditcard.csv數據集,并逐步引入新特征來提高準確性。從頭實現邏輯回歸,包括數據預處理模型訓練和特征選擇。
一般思路
- 數據加載和預處理:
- 加載creditcard.csv數據集(假設包含特征如Time, V1-V28, Amount, Class)。
- 對特征進行標準化(StandardScaler手動實現),因為邏輯回歸對特征尺度敏感。
- 處理數據不平衡(信用卡欺詐數據集通常正負樣本極不平衡),使用下采樣或上采樣。
- 邏輯回歸模型:
- 實現sigmoid函數、損失函數(交叉熵損失)。
- 使用梯度下降進行優化。
- 手動實現預測和評估指標(準確率、精確率、召回率、F1分數)。
- 特征工程:
- 初始模型使用所有特征(V1-V28, Amount)。
- 逐步引入新特征,例如:Amount的衍生特征(對數轉換、標準化后的平方)。Time特征的周期性特征(如小時、分鐘)。V1-V28的交互項或多項式特征。
- 使用特征重要性(基于權重絕對值)選擇最有影響的特征。
- 迭代優化:
- 通過多次實驗,比較不同特征組合的性能。
- 記錄每次引入新特征后的模型性能(F1分數為主,因為數據不平衡)。
在欺詐預測里指標的含義
1. Accuracy(準確率)
整體上模型預測對了多少比例。
在欺詐檢測中的意義:
模型整體預測對的比例。
?? 由于欺詐比例極低(如僅 0.17%),高準確率常常是虛假的。比如:
-
你預測全部是“正常交易”,準確率也可以達到 99.8%。
-
但模型其實一個欺詐都沒識別出來。
?2. Precision(精確率)
預測為欺詐的交易中,實際真的是欺詐的比例。
在欺詐檢測中的意義:
模型命中的質量。預測欺詐是否精準?
-
高 precision:你標為“欺詐”的交易,大多數是真的欺詐。
-
低 precision:你誤報太多“正常交易”為欺詐(誤殺正常用戶)。
3. Recall(召回率)
所有真實欺詐中,被模型成功識別出來的比例。
在欺詐檢測中的意義:
模型有沒有漏掉真正的欺詐?
-
高 recall:幾乎所有欺詐交易都抓到了(不漏)
-
低 recall:很多欺詐交易模型沒抓到(漏檢)
4. F1-score(調和平均分)
Precision 和 Recall 的調和平均,綜合考慮“誤殺”和“漏檢”。
在欺詐檢測中的意義:
衡量模型綜合能力的關鍵指標。適合在不平衡數據下使用。
-
當你既不想漏掉欺詐(Recall),也不想誤報正常交易(Precision),就用 F1。
流程
[開始]|v
[初始化參數]- feature_subset_ratio = 0.5- learning_rate = 0.001- num_iterations = 3000- num_trials = 100- class_weights = {0: 1.0, 1: 10.0}- stages = [Initial, Stage 1, Stage 2, Stage 3]|v
[循環每個階段 (stage_name, stage)]|v
[初始化階段性能記錄]- metrics = []- last_feature_names = None- last_feature_importance = None|v
[循環每次試驗 (trial = 1 to num_trials)]|v
[加載和預處理數據]- random_seed = 42 + trial- 加載 creditcard.csv- 下采樣 (20% 非欺詐樣本)- 隨機選擇 50% 特征- 標準化特征- 返回 X_subset, y, selected_feature_names, original_X, 索引信息|v
[分割訓練/測試集]- 使用 random_seed 分割 80% 訓練集, 20% 測試集- X_train, X_test, y_train, y_test|v
[特征增廣]- 調用 add_features_stage(stage)- 階段0: 隨機特征子集- 階段1: + Amount_log, Amount_square- 階段2: + Time_hours- 階段3: + V1_V2_interaction, V3_V4_interaction, V1_square- 返回 X_new, feature_names, num_new_features|v
[初始化權重]- 如果 stage == 0 或 prev_weights 為空:init_weights = zeros(X_new.shape[1])- 否則:init_weights = [prev_weights, zeros(num_new_features)]|v
[訓練模型]- 調用 train_and_evaluate- 使用 gradient_descent (init_weights, learning_rate, num_iterations, class_weights)- 加權損失函數優化召回率- 返回 metrics (accuracy, precision, recall, f1), weights|v
[保存結果]- metrics.append(metrics)- 如果 trial == num_trials - 1:保存 last_feature_names, last_feature_importance- 更新 prev_weights = weights|v
[結束試驗循環]|v
[計算平均性能]- avg_metrics = mean(metrics)- 輸出 Accuracy, Precision, Recall, F1|v
[輸出特征重要性 (僅 Stage 3)]- 如果 stage == 3:創建 DataFrame(last_feature_names, last_feature_importance)按 Importance 降序輸出|v
[結束階段循環]|v
[結束]
代碼
實際經過多輪修改后的代碼
import numpy as np
import pandas as pd
import random# Sigmoid函數
def sigmoid(z):return 1 / (1 + np.exp(-np.clip(z, -500, 500)))# 加權交叉熵損失函數
def compute_weighted_loss(y, y_pred, class_weights):y_pred = np.clip(y_pred, 1e-15, 1 - 1e-15)weights = np.where(y == 1, class_weights[1], class_weights[0])return -np.mean(weights * (y * np.log(y_pred) + (1 - y) * np.log(1 - y_pred)))# 梯度下降(加權)
def gradient_descent(X, y, weights, learning_rate, num_iterations, class_weights):m = len(y)for _ in range(num_iterations):y_pred = sigmoid(np.dot(X, weights))weights_vector = np.where(y == 1, class_weights[1], class_weights[0])gradient = np.dot(X.T, weights_vector * (y_pred - y)) / mweights -= learning_rate * gradientreturn weights# 標準化特征
def standardize(X):mean = np.mean(X, axis=0)std = np.std(X, axis=0)std[std == 0] = 1 # 防止除以零return (X - mean) / std, mean, std# 評估指標
def evaluate(y_true, y_pred):y_pred_binary = (y_pred >= 0.5).astype(int)accuracy = np.mean(y_true == y_pred_binary)precision = np.sum((y_true == 1) & (y_pred_binary == 1)) / np.sum(y_pred_binary == 1) if np.sum(y_pred_binary == 1) > 0 else 0recall = np.sum((y_true == 1) & (y_pred_binary == 1)) / np.sum(y_true == 1) if np.sum(y_true == 1) > 0 else 0f1 = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0return accuracy, precision, recall, f1# 加載和預處理數據
def load_and_preprocess_data(filepath, downsample_ratio=0.2, feature_subset_ratio=0.5, random_seed=42):df = pd.read_csv(filepath)feature_names = df.columns.drop('Class').tolist()X = df.drop(columns=['Class']).valuesy = df['Class'].values# 下采樣負樣本random.seed(random_seed)fraud_indices = np.where(y == 1)[0]non_fraud_indices = np.where(y == 0)[0]sampled_non_fraud = random.sample(list(non_fraud_indices), int(len(non_fraud_indices) * downsample_ratio))selected_indices = np.concatenate([fraud_indices, sampled_non_fraud])np.random.seed(random_seed)np.random.shuffle(selected_indices)X = X[selected_indices]y = y[selected_indices]# 隨機選擇一部分特征num_features = X.shape[1]num_selected = int(num_features * feature_subset_ratio)selected_feature_indices = random.sample(range(num_features), num_selected)selected_feature_indices.sort()X_subset = X[:, selected_feature_indices]selected_feature_names = [feature_names[i] for i in selected_feature_indices]# 記錄Time、Amount、V1、V3、V4的列索引time_col = feature_names.index('Time') if 'Time' in feature_names else -1amount_col = feature_names.index('Amount') if 'Amount' in feature_names else -1v1_col = feature_names.index('V1') if 'V1' in feature_names else -1v3_col = feature_names.index('V3') if 'V3' in feature_names else -1v4_col = feature_names.index('V4') if 'V4' in feature_names else -1time_col_selected = selected_feature_indices.index(time_col) if time_col in selected_feature_indices else -1amount_col_selected = selected_feature_indices.index(amount_col) if amount_col in selected_feature_indices else -1v1_col_selected = selected_feature_indices.index(v1_col) if v1_col in selected_feature_indices else -1v3_col_selected = selected_feature_indices.index(v3_col) if v3_col in selected_feature_indices else -1v4_col_selected = selected_feature_indices.index(v4_col) if v4_col in selected_feature_indices else -1# 標準化X_subset, mean, std = standardize(X_subset)return X_subset, y, mean, std, selected_feature_names, X, selected_feature_indices, time_col, amount_col, v1_col, v3_col, v4_col, time_col_selected, amount_col_selected, v1_col_selected, v3_col_selected, v4_col_selected# 創建新特征(分階段)
def add_features_stage(X, feature_names, original_X, selected_feature_indices, time_col, amount_col, v1_col, v3_col,v4_col, time_col_selected, amount_col_selected, v1_col_selected, v3_col_selected,v4_col_selected, stage):X_new = X.copy()new_feature_names = feature_names.copy()num_new_features = 0if stage >= 1: # 階段1:Amount相關特征if amount_col_selected != -1:amount = original_X[:, amount_col]amount_log = np.log1p(np.abs(amount))X_new = np.column_stack([X_new, amount_log])new_feature_names.append('Amount_log')amount_square = amount ** 2X_new = np.column_stack([X_new, amount_square])new_feature_names.append('Amount_square')num_new_features += 2if stage >= 2: # 階段2:Time相關特征if time_col_selected != -1:time_hours = (original_X[:, time_col] % (24 * 3600)) / 3600X_new = np.column_stack([X_new, time_hours])new_feature_names.append('Time_hours')num_new_features += 1if stage >= 3: # 階段3:交互項和多項式特征v1_col_idx = feature_names.index('V1') if 'V1' in feature_names else -1v2_col_idx = feature_names.index('V2') if 'V2' in feature_names else -1if v1_col_idx != -1 and v2_col_idx != -1:v1_v2_interaction = original_X[:, feature_names.index('V1')] * original_X[:, feature_names.index('V2')]X_new = np.column_stack([X_new, v1_v2_interaction])new_feature_names.append('V1_V2_interaction')num_new_features += 1if v3_col_selected != -1 and v4_col_selected != -1:v3_v4_interaction = original_X[:, v3_col] * original_X[:, v4_col]X_new = np.column_stack([X_new, v3_v4_interaction])new_feature_names.append('V3_V4_interaction')num_new_features += 1if v1_col_selected != -1:v1_square = original_X[:, v1_col] ** 2X_new = np.column_stack([X_new, v1_square])new_feature_names.append('V1_square')num_new_features += 1return X_new, new_feature_names, num_new_features# 訓練和評估模型
def train_and_evaluate(X_train, X_test, y_train, y_test, learning_rate, num_iterations, class_weights, init_weights):weights = init_weightsweights = gradient_descent(X_train, y_train, weights, learning_rate, num_iterations, class_weights)y_pred = sigmoid(np.dot(X_test, weights))return evaluate(y_test, y_pred), weights# 主函數
def main():filepath = 'creditcard.csv'feature_subset_ratio = 0.5learning_rate = 0.001num_iterations = 3000num_trials = 100class_weights = {0: 1.0, 1: 10.0} # 欺詐樣本權重更高stages = [("Initial (Random Subset)", 0),("Stage 1: Amount Features", 1),("Stage 2: Time Features", 2),("Stage 3: Interaction & Polynomial Features", 3)]print(f"\n=== Feature Subset Ratio: {feature_subset_ratio}, Learning Rate: {learning_rate}, Iterations: {num_iterations} ===")prev_weights = None # 用于跨階段繼承權重for stage_name, stage in stages:print(f"\n--- {stage_name} ---")metrics = []last_feature_names = Nonelast_feature_importance = Nonefor trial in range(num_trials):print(f"\nTrial {trial + 1}")random_seed = 42 + trial# 加載數據X_subset, y, mean, std, selected_feature_names, original_X, selected_feature_indices, time_col, amount_col, v1_col, v3_col, v4_col, time_col_selected, amount_col_selected, v1_col_selected, v3_col_selected, v4_col_selected = load_and_preprocess_data(filepath, downsample_ratio=0.2, feature_subset_ratio=feature_subset_ratio, random_seed=random_seed)# 分割訓練和測試集np.random.seed(random_seed)indices = np.arange(len(y))np.random.shuffle(indices)train_size = int(0.8 * len(y))train_idx, test_idx = indices[:train_size], indices[train_size:]X_train, X_test = X_subset[train_idx], X_subset[test_idx]y_train, y_test = y[train_idx], y[test_idx]# 添加特征X_new, feature_names, num_new_features = add_features_stage(X_subset, selected_feature_names, original_X,selected_feature_indices, time_col, amount_col,v1_col, v3_col, v4_col, time_col_selected,amount_col_selected, v1_col_selected,v3_col_selected, v4_col_selected, stage)X_train_new, X_test_new = X_new[train_idx], X_new[test_idx]# 初始化權重(繼承上一階段權重)if stage == 0 or prev_weights is None:init_weights = np.zeros(X_new.shape[1])else:init_weights = np.zeros(X_new.shape[1])init_weights[:len(prev_weights)] = prev_weights# 訓練和評估result, weights = train_and_evaluate(X_train_new, X_test_new, y_train, y_test, learning_rate,num_iterations, class_weights, init_weights)metrics.append(result)print(f"Accuracy: {result[0]:.4f}, Precision: {result[1]:.4f}, Recall: {result[2]:.4f}, F1: {result[3]:.4f}")# 保存最后一次的特征重要性if trial == num_trials - 1:last_feature_names = feature_nameslast_feature_importance = np.abs(weights)# 計算平均性能avg_metrics = np.mean(metrics, axis=0)print(f"\nAverage Performance for {stage_name}:")print(f"Accuracy: {avg_metrics[0]:.4f}, Precision: {avg_metrics[1]:.4f}, Recall: {avg_metrics[2]:.4f}, F1: {avg_metrics[3]:.4f}")# 更新跨階段權重(使用最后一次試驗的權重)prev_weights = weights# 輸出特征重要性(僅最后階段)if stage == 3 and last_feature_names and last_feature_importance is not None:if len(last_feature_names) != len(last_feature_importance):print(f"Error: Length mismatch - feature_names: {len(last_feature_names)}, feature_importance: {len(last_feature_importance)}")returnimportance_df = pd.DataFrame({'Feature': last_feature_names, 'Importance': last_feature_importance})print(f"\nFeature Importance (Last Trial, Stage 3):")print(importance_df.sort_values(by='Importance', ascending=False))if __name__ == "__main__":main()
- 初始化參數:
- 設置超參數:feature_subset_ratio=0.5(隨機選擇50%特征),learning_rate=0.001,num_iterations=3000,num_trials=100,class_weights={0: 1.0, 1: 10.0}(欺詐樣本權重更高)。
- 定義特征增廣階段(stages):
- 階段0:初始隨機特征子集。
- 階段1:添加Amount_log、Amount_square。
- 階段2:添加Time_hours。
- 階段3:添加V1_V2_interaction、V3_V4_interaction、V1_square。
- 循環每個特征增廣階段:
- 對于每個階段(Initial, Stage 1, Stage 2, Stage 3):
- 初始化性能記錄:創建列表metrics存儲每次試驗的性能指標。
- 初始化特征重要性:保存最后一次試驗的特征名稱和權重。
- 對于每個階段(Initial, Stage 1, Stage 2, Stage 3):
訓練和評估
預期的結果是隨著特征增廣后,越來越多的特征的加入預測的召回率會更高, 能把更多的欺詐項目找出來,而且模型訓練過程并不需要改變模型結構
--- Initial (Random Subset) ---
...
Trial 100
Accuracy: 0.5832, Precision: 0.0209, Recall: 0.9508, F1: 0.0409...
--- Stage 1: Amount Features ---
...
Trial 100
Accuracy: 0.5832, Precision: 0.2692, Recall: 0.9808, F1: 0.4224...
--- Stage 2: Time Features ---
Accuracy: 0.9904, Precision: 0.4431, Recall: 0.8132, F1: 0.5736...
--- Stage 3: Interaction & Polynomial Features ---
Accuracy: 0.9908, Precision: 0.4934, Recall: 0.7212, F1: 0.5859
?
?
?