This post is a walkthrough of the open-source notebook VotingClassifier Home Credit from the Home Credit - Credit Risk Model Stability competition on Kaggle. The original notebook is available at VotingClassifier Home Credit (kaggle.com).
%%writefile script.py
import sys
from pathlib import Path
import subprocess
import os
import gc
from glob import glob

import numpy as np
import pandas as pd
import polars as pl
from datetime import datetime
import seaborn as sns
import matplotlib.pyplot as plt
import joblib
import warnings
warnings.filterwarnings('ignore')

ROOT = '/kaggle/input/home-credit-credit-risk-model-stability'

from sklearn.model_selection import TimeSeriesSplit, GroupKFold, StratifiedGroupKFold
from sklearn.base import BaseEstimator, RegressorMixin
from sklearn.metrics import roc_auc_score
import lightgbm as lgb


class Pipeline:

    def set_table_dtypes(df):
        # Cast columns according to the competition's naming convention:
        # P/A = amounts, M = categorical strings, D = dates.
        for col in df.columns:
            if col in ["case_id", "WEEK_NUM", "num_group1", "num_group2"]:
                df = df.with_columns(pl.col(col).cast(pl.Int64))
            elif col in ["date_decision"]:
                df = df.with_columns(pl.col(col).cast(pl.Date))
            elif col[-1] in ("P", "A"):
                df = df.with_columns(pl.col(col).cast(pl.Float64))
            elif col[-1] in ("M",):
                df = df.with_columns(pl.col(col).cast(pl.String))
            elif col[-1] in ("D",):
                df = df.with_columns(pl.col(col).cast(pl.Date))
        return df

    def handle_dates(df):
        # Replace every date (D) column with its offset from date_decision,
        # expressed as a whole number of days, then drop the raw columns.
        for col in df.columns:
            if col[-1] in ("D",):
                df = df.with_columns(pl.col(col) - pl.col("date_decision"))
                df = df.with_columns(pl.col(col).dt.total_days())
        df = df.drop("date_decision", "MONTH")
        return df

    def filter_cols(df):
        # Drop string columns that are constant or have too many distinct values.
        for col in df.columns:
            if (col not in ["target", "case_id", "WEEK_NUM"]) & (df[col].dtype == pl.String):
                freq = df[col].n_unique()
                if (freq == 1) | (freq > 200):
                    df = df.drop(col)
        return df


class Aggregator:
    # Please add or subtract features yourself, be aware that too many features will take up too much space.

    def num_expr(df):
        cols = [col for col in df.columns if col[-1] in ("P", "A")]
        expr_max = [pl.max(col).alias(f"max_{col}") for col in cols]
        return expr_max

    def date_expr(df):
        cols = [col for col in df.columns if col[-1] in ("D",)]
        expr_max = [pl.max(col).alias(f"max_{col}") for col in cols]
        return expr_max

    def str_expr(df):
        cols = [col for col in df.columns if col[-1] in ("M",)]
        expr_max = [pl.max(col).alias(f"max_{col}") for col in cols]
        return expr_max

    def other_expr(df):
        cols = [col for col in df.columns if col[-1] in ("T", "L")]
        expr_max = [pl.max(col).alias(f"max_{col}") for col in cols]
        return expr_max

    def count_expr(df):
        cols = [col for col in df.columns if "num_group" in col]
        expr_max = [pl.max(col).alias(f"max_{col}") for col in cols]
        return expr_max

    def get_exprs(df):
        exprs = Aggregator.num_expr(df) + \
                Aggregator.date_expr(df) + \
                Aggregator.str_expr(df) + \
                Aggregator.other_expr(df) + \
                Aggregator.count_expr(df)
        return exprs


def read_file(path, depth=None):
    df = pl.read_parquet(path)
    df = df.pipe(Pipeline.set_table_dtypes)
    if depth in [1, 2]:
        df = df.group_by("case_id").agg(Aggregator.get_exprs(df))
    return df


def read_files(regex_path, depth=None):
    chunks = []
    for path in glob(str(regex_path)):
        df = pl.read_parquet(path)
        df = df.pipe(Pipeline.set_table_dtypes)
        if depth in [1, 2]:
            df = df.group_by("case_id").agg(Aggregator.get_exprs(df))
        chunks.append(df)
    df = pl.concat(chunks, how="vertical_relaxed")
    df = df.unique(subset=["case_id"])
    return df


def feature_eng(df_base, depth_0, depth_1, depth_2):
    df_base = (
        df_base
        .with_columns(
            month_decision = pl.col("date_decision").dt.month(),
            weekday_decision = pl.col("date_decision").dt.weekday(),
        )
    )
    for i, df in enumerate(depth_0 + depth_1 + depth_2):
        df_base = df_base.join(df, how="left", on="case_id", suffix=f"_{i}")
    df_base = df_base.pipe(Pipeline.handle_dates)
    return df_base


def to_pandas(df_data, cat_cols=None):
    df_data = df_data.to_pandas()
    if cat_cols is None:
        cat_cols = list(df_data.select_dtypes("object").columns)
    df_data[cat_cols] = df_data[cat_cols].astype("category")
    return df_data, cat_cols


def reduce_mem_usage(df):
    """Iterate through all the columns of a dataframe and modify the data type
    to reduce memory usage.
"""start_mem = df.memory_usage().sum() / 1024**2for col in df.columns:col_type = df[col].dtypeif str(col_type)=="category":continueif col_type != object:c_min = df[col].min()c_max = df[col].max()if str(col_type)[:3] == 'int':if c_min > np.iinfo(np.int8).min and c_max < np.iinfo(np.int8).max:df[col] = df[col].astype(np.int8)elif c_min > np.iinfo(np.int16).min and c_max < np.iinfo(np.int16).max:df[col] = df[col].astype(np.int16)elif c_min > np.iinfo(np.int32).min and c_max < np.iinfo(np.int32).max:df[col] = df[col].astype(np.int32)elif c_min > np.iinfo(np.int64).min and c_max < np.iinfo(np.int64).max:df[col] = df[col].astype(np.int64) else:if c_min > np.finfo(np.float16).min and c_max < np.finfo(np.float16).max:df[col] = df[col].astype(np.float16)elif c_min > np.finfo(np.float32).min and c_max < np.finfo(np.float32).max:df[col] = df[col].astype(np.float32)else:df[col] = df[col].astype(np.float64)else:continueend_mem = df.memory_usage().sum() / 1024**2 return dfROOT = Path("/kaggle/input/home-credit-credit-risk-model-stability")TRAIN_DIR = ROOT / "parquet_files" / "train"
TEST_DIR = ROOT / "parquet_files" / "test"

data_store = {
    "df_base": read_file(TRAIN_DIR / "train_base.parquet"),
    "depth_0": [
        read_file(TRAIN_DIR / "train_static_cb_0.parquet"),
        read_files(TRAIN_DIR / "train_static_0_*.parquet"),
    ],
    "depth_1": [
        read_files(TRAIN_DIR / "train_applprev_1_*.parquet", 1),
        read_file(TRAIN_DIR / "train_tax_registry_a_1.parquet", 1),
        read_file(TRAIN_DIR / "train_tax_registry_b_1.parquet", 1),
        read_file(TRAIN_DIR / "train_tax_registry_c_1.parquet", 1),
        read_files(TRAIN_DIR / "train_credit_bureau_a_1_*.parquet", 1),
        read_file(TRAIN_DIR / "train_credit_bureau_b_1.parquet", 1),
        read_file(TRAIN_DIR / "train_other_1.parquet", 1),
        read_file(TRAIN_DIR / "train_person_1.parquet", 1),
        read_file(TRAIN_DIR / "train_deposit_1.parquet", 1),
        read_file(TRAIN_DIR / "train_debitcard_1.parquet", 1),
    ],
    "depth_2": [
        read_file(TRAIN_DIR / "train_credit_bureau_b_2.parquet", 2),
    ],
}

df_train = feature_eng(**data_store)
del data_store
gc.collect()
df_train = df_train.pipe(Pipeline.filter_cols)
df_train, cat_cols = to_pandas(df_train)
df_train = reduce_mem_usage(df_train)
nums = df_train.select_dtypes(exclude='category').columns
from itertools import combinations, permutations
nans_df = df_train[nums].isna()
nans_groups={}
for col in nums:
    cur_group = nans_df[col].sum()
    try:
        nans_groups[cur_group].append(col)
    except:
        nans_groups[cur_group] = [col]
del nans_df; x = gc.collect()


def reduce_group(grps):
    # Keep one representative column per correlated group:
    # the one with the largest number of unique values.
    use = []
    for g in grps:
        mx = 0; vx = g[0]
        for gg in g:
            n = df_train[gg].nunique()
            if n > mx:
                mx = n
                vx = gg
        use.append(vx)
    return use


def group_columns_by_correlation(matrix, threshold=0.8):
    # Greedily group columns whose pairwise correlation is at least `threshold`.
    correlation_matrix = matrix.corr()
    groups = []
    remaining_cols = list(matrix.columns)
    while remaining_cols:
        col = remaining_cols.pop(0)
        group = [col]
        correlated_cols = [col]
        for c in remaining_cols:
            if correlation_matrix.loc[col, c] >= threshold:
                group.append(c)
                correlated_cols.append(c)
        groups.append(group)
        remaining_cols = [c for c in remaining_cols if c not in correlated_cols]
    return groups


uses = []
for k, v in nans_groups.items():
    if len(v) > 1:
        Vs = nans_groups[k]
        grps = group_columns_by_correlation(df_train[Vs], threshold=0.8)
        use = reduce_group(grps)
        uses = uses + use
    else:
        uses = uses + v
df_train = df_train[uses]

data_store = {
    "df_base": read_file(TEST_DIR / "test_base.parquet"),
    "depth_0": [
        read_file(TEST_DIR / "test_static_cb_0.parquet"),
        read_files(TEST_DIR / "test_static_0_*.parquet"),
    ],
    "depth_1": [
        read_files(TEST_DIR / "test_applprev_1_*.parquet", 1),
        read_file(TEST_DIR / "test_tax_registry_a_1.parquet", 1),
        read_file(TEST_DIR / "test_tax_registry_b_1.parquet", 1),
        read_file(TEST_DIR / "test_tax_registry_c_1.parquet", 1),
        read_files(TEST_DIR / "test_credit_bureau_a_1_*.parquet", 1),
        read_file(TEST_DIR / "test_credit_bureau_b_1.parquet", 1),
        read_file(TEST_DIR / "test_other_1.parquet", 1),
        read_file(TEST_DIR / "test_person_1.parquet", 1),
        read_file(TEST_DIR / "test_deposit_1.parquet", 1),
        read_file(TEST_DIR / "test_debitcard_1.parquet", 1),
    ],
    "depth_2": [
        read_file(TEST_DIR / "test_credit_bureau_b_2.parquet", 2),
    ],
}

df_test = feature_eng(**data_store)
del data_store
gc.collect()
df_test = df_test.select([col for col in df_train.columns if col != "target"])
df_test, cat_cols = to_pandas(df_test)
df_test = reduce_mem_usage(df_test)
gc.collect()

# Label train rows 0 and test rows 1, then stack the two frames so they can
# be told apart downstream.
df_train['target'] = 0
df_test['target'] = 1
df_train = pd.concat([df_train, df_test])
df_train = reduce_mem_usage(df_train)

y = df_train["target"]
df_train = df_train.drop(columns=["target", "case_id", "WEEK_NUM"])

joblib.dump((df_train, y, df_test), 'data.pkl')
Importing the required libraries: the script begins by importing a number of Python libraries, including NumPy, Pandas, and Polars for data processing, as well as Seaborn and Matplotlib for visualization.
Setting the warning filter: warnings.filterwarnings('ignore') suppresses warning messages, a common practice when working with large datasets.
Defining the data path: the ROOT variable points to the directory that contains the competition's input data.
Defining the Pipeline class: this class contains several static methods for setting column data types, handling date columns, and filtering columns.
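As a rough illustration of the suffix convention (the column names and values below are invented; Pipeline is the class defined in the script above):

import polars as pl
from datetime import date

# Invented columns whose name suffixes follow the competition convention:
# A/P -> amounts (Float64), M -> categorical strings, D -> dates.
toy = pl.DataFrame({
    "case_id": [1, 2],
    "credamount_770A": ["10000.5", "2500.0"],   # amount stored as text
    "education_927M": ["HIGHER", "SECONDARY"],  # categorical code
    "birth_259D": [date(1990, 1, 1), date(1985, 6, 15)],
})
print(Pipeline.set_table_dtypes(toy).schema)
# case_id: Int64, credamount_770A: Float64, education_927M: String, birth_259D: Date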
Defining the Aggregator class: this class contains several static methods for aggregating data, such as computing the per-case maximum of each column.
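A minimal sketch of what this aggregation does to a depth-1 table (toy data, invented column name):

import polars as pl

# A depth-1 table has several rows per case_id; the Aggregator expressions
# collapse it to one row per case_id by taking the max of each column.
depth1 = pl.DataFrame({
    "case_id":       [1, 1, 2],
    "num_group1":    [0, 1, 0],
    "pmtamount_36A": [120.0, 80.0, 50.0],
})
agg = depth1.group_by("case_id").agg(Aggregator.get_exprs(depth1))
print(agg.sort("case_id"))
# case_id=1 -> max_pmtamount_36A=120.0, max_num_group1=1
# case_id=2 -> max_pmtamount_36A=50.0,  max_num_group1=0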
Defining the data-reading functions: the read_file and read_files functions read the Parquet files and convert them into Polars DataFrames.
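Tables split into several chunk files go through the glob pattern in read_files, single-file tables through read_file; passing depth=1 or depth=2 triggers the per-case aggregation. A hypothetical usage sketch (paths as in the script above):

# Depth-0 tables are later joined as-is; depth-1/2 tables are max-aggregated
# to one row per case_id before joining.
static   = read_files(TRAIN_DIR / "train_static_0_*.parquet")       # several chunks, depth 0
applprev = read_files(TRAIN_DIR / "train_applprev_1_*.parquet", 1)  # several chunks, depth 1
person   = read_file(TRAIN_DIR / "train_person_1.parquet", 1)       # single file, depth 1
print(static.shape, applprev.shape, person.shape)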
Feature engineering: the feature_eng function adds new features such as the month and weekday of the decision date, and joins all auxiliary tables onto the base table.
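A toy sketch of the two date features, assuming a recent Polars where dt.weekday() runs from 1 (Monday) to 7 (Sunday):

import polars as pl
from datetime import date

base = pl.DataFrame({
    "case_id": [1, 2],
    "date_decision": [date(2020, 1, 6), date(2020, 3, 15)],
})
base = base.with_columns(
    month_decision = pl.col("date_decision").dt.month(),
    weekday_decision = pl.col("date_decision").dt.weekday(),
)
print(base)
# 2020-01-06 -> month 1, weekday 1 (Monday)
# 2020-03-15 -> month 3, weekday 7 (Sunday)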
Converting to a Pandas DataFrame: the to_pandas function converts a Polars DataFrame into a Pandas DataFrame and casts object (string) columns to the category dtype, which also reduces memory usage.
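A small sketch of the conversion (toy frame; to_pandas is the helper defined above):

import polars as pl

pl_df = pl.DataFrame({
    "case_id": [1, 2],
    "education_927M": ["HIGHER", "SECONDARY"],
})
pd_df, cat_cols = to_pandas(pl_df)
print(cat_cols)      # ['education_927M']
print(pd_df.dtypes)  # case_id: int64, education_927M: category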
Memory optimization: the reduce_mem_usage function shrinks the DataFrame's memory footprint by downcasting each column to the smallest numeric type that can hold its values.
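For example, an int64 column whose values fit into int8 is downcast, and float64 columns may become float16 (at the cost of some precision). A toy sketch using the function defined above:

import numpy as np
import pandas as pd

df = pd.DataFrame({
    "small_int": np.arange(100, dtype=np.int64),            # values 0..99 fit into int8
    "small_float": np.random.rand(100).astype(np.float64),  # values in [0, 1) fit into float16
})
before = df.memory_usage().sum() / 1024
df = reduce_mem_usage(df)
after = df.memory_usage().sum() / 1024
print(df.dtypes)  # small_int -> int8, small_float -> float16
print(f"{before:.1f} KB -> {after:.1f} KB")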
Reading and processing the training data: the code reads the training files, applies the feature engineering, and optimizes memory usage. It then analyzes the missing-value patterns to decide which columns are useful and filters the columns accordingly.
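Concretely, columns are bucketed by how many missing values they have; within each bucket, highly correlated columns are then pruned (next step). A toy sketch of the bucketing (using dict.setdefault instead of the try/except in the script; the result is the same):

import numpy as np
import pandas as pd

# col_a and col_b share the same missing-value count, so they land in the
# same bucket; col_c ends up alone.
toy = pd.DataFrame({
    "col_a": [1.0, np.nan, 3.0, np.nan],
    "col_b": [10.0, np.nan, 30.0, np.nan],
    "col_c": [1.0, 2.0, np.nan, 4.0],
})
nans = toy.isna()
groups = {}
for col in toy.columns:
    groups.setdefault(int(nans[col].sum()), []).append(col)
print(groups)  # {2: ['col_a', 'col_b'], 1: ['col_c']}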
Grouping columns by correlation: the group_columns_by_correlation function groups columns based on their pairwise correlation.
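A toy sketch on an invented frame: two nearly identical columns fall into one group, and reduce_group would then keep only the representative with the most unique values:

import numpy as np
import pandas as pd

rng = np.random.default_rng(0)
x = rng.normal(size=200)
toy = pd.DataFrame({
    "a": x,
    "b": 2 * x + rng.normal(scale=0.01, size=200),  # almost a copy of "a"
    "c": rng.normal(size=200),                      # unrelated noise
})
print(group_columns_by_correlation(toy, threshold=0.8))
# [['a', 'b'], ['c']]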
Reading, processing, and saving the test data: similarly, the test files are read, the same feature engineering is applied, and memory usage is optimized. The target variable is then set (0 for training rows, 1 for test rows) and the training and test data are concatenated. Finally, joblib.dump saves the processed training data, the target variable, and the test data to a single file.
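The saved pickle is presumably loaded back in the follow-up training script, along these lines (a sketch, assuming the same working directory):

import joblib

# Restore the preprocessed frames produced by script.py.
df_train, y, df_test = joblib.load('data.pkl')
print(df_train.shape, y.shape, df_test.shape)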