
sklearn Pipeline and Data Preprocessing Templates

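Pipeline is one of the most worthwhile sklearn habits to build: it guards against leakage, simplifies tuning, and can be deployed as is. This page provides copy-and-paste templates, from preprocessing mixed-type data, through GridSearch integration, to saving and loading.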


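Why use a Pipeline

Three reasons, each of which matters:

  1. Leakage prevention: when you call fit on the training set, every preprocessing step is fitted on the training data only, and the fitted transform is then applied to validation/test data. Test information cannot contaminate training because steps were run in the wrong order.
  2. Deployment consistency: a trained Pipeline is a single object. At deployment you only call pipeline.predict(X_new); there is no need to reproduce the preprocessing steps by hand.
  3. Clean tuning: GridSearchCV(pipeline, ...) can tune preprocessing parameters and model hyperparameters together, and preprocessing is re-fitted inside every CV fold, so the reported metrics can be trusted.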
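
The leakage point in the list above can be checked directly. The following is a minimal sketch on synthetic data (the dataset and variable names are assumptions made for illustration): after fitting on the training split, the scaler inside the Pipeline holds the training-set mean, not the mean of the full dataset.

```python
import numpy as np
from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

# Synthetic data, used only for illustration
X, y = make_classification(n_samples=200, n_features=5, random_state=0)
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.25, random_state=0
)

pipe = Pipeline([
    ('scaler', StandardScaler()),
    ('clf', LogisticRegression(max_iter=1000)),
])
pipe.fit(X_train, y_train)  # every step is fitted on the training split only

scaler_mean = pipe.named_steps['scaler'].mean_
uses_train_only = np.allclose(scaler_mean, X_train.mean(axis=0))
matches_full_data = np.allclose(scaler_mean, X.mean(axis=0))
print(uses_train_only, matches_full_data)
```

The leaky alternative would be calling StandardScaler().fit(X) on the full data before splitting, which lets test rows influence the scaling statistics.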

Basic Pipeline

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression

pipe = Pipeline([
    ('scaler', StandardScaler()),
    ('clf', LogisticRegression(max_iter=1000, random_state=42))
])

pipe.fit(X_train, y_train)
pipe.score(X_test, y_test)         # accuracy
pipe.predict(X_test)               # predicted classes
pipe.predict_proba(X_test)[:, 1]   # predicted probability of the positive class

Every step of a Pipeline except the last must be a transformer (implement fit and transform); the last step is usually the model.
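
As a small illustration of the rule above, here is a sketch of what happens when the steps are in the wrong order. It assumes a recent sklearn release, where this mistake is reported as a TypeError; the toy data and names are made up for the example.

```python
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

# Toy data, for illustration only
X = [[0.0], [1.0], [2.0], [3.0]]
y = [0, 0, 1, 1]

error_type = None
try:
    # Wrong order: LogisticRegression has no transform(),
    # so it cannot serve as an intermediate step.
    bad_pipe = Pipeline([
        ('clf', LogisticRegression()),
        ('scaler', StandardScaler()),
    ])
    bad_pipe.fit(X, y)
except TypeError as exc:
    error_type = type(exc).__name__

print(error_type)
```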


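ColumnTransformer: handling mixed-type data

Real-world data is almost always mixed-type (numeric + categorical). ColumnTransformer routes different columns through different preprocessing pipelines.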

import pandas as pd
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder, OrdinalEncoder
from sklearn.impute import SimpleImputer
from sklearn.ensemble import GradientBoostingClassifier

# --- Define column types ---
num_cols = ['age', 'income', 'tenure']
cat_onehot_cols = ['city', 'product_type']          # low-cardinality categories → OneHot
cat_ordinal_cols = ['education']                    # ordered categories → Ordinal

# --- Per-type pipelines ---
num_pipe = Pipeline([
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler',  StandardScaler()),
])

cat_onehot_pipe = Pipeline([
    ('imputer', SimpleImputer(strategy='most_frequent')),
    ('encoder', OneHotEncoder(handle_unknown='ignore', sparse_output=False)),
])

cat_ordinal_pipe = Pipeline([
    ('imputer', SimpleImputer(strategy='most_frequent')),
    ('encoder', OrdinalEncoder(
        categories=[['High School', 'Bachelor', 'Master', 'PhD']],
        handle_unknown='use_encoded_value',
        unknown_value=-1
    )),
])

# --- Combine ---
preprocessor = ColumnTransformer([
    ('num',         num_pipe,          num_cols),
    ('cat_onehot',  cat_onehot_pipe,   cat_onehot_cols),
    ('cat_ordinal', cat_ordinal_pipe,  cat_ordinal_cols),
], remainder='drop')  # drop all other columns; use 'passthrough' to keep them as-is

# --- Full Pipeline ---
full_pipe = Pipeline([
    ('preprocessor', preprocessor),
    ('clf', GradientBoostingClassifier(random_state=42)),
])

full_pipe.fit(X_train, y_train)

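Common Transformers at a glance

| Transformer | When to use | Key parameters |
|---|---|---|
| SimpleImputer | Filling missing values | strategy: mean / median / most_frequent / constant |
| StandardScaler | Linear models, SVM, KNN, neural networks | - |
| MinMaxScaler | Features must lie in [0, 1] | feature_range |
| RobustScaler | Data with many outliers | quantile_range |
| OneHotEncoder | Low-cardinality nominal categories (< ~20) | handle_unknown='ignore' (safe in production) |
| OrdinalEncoder | Ordered categories, tree models | categories (specify the order explicitly) |
| TargetEncoder | High-cardinality categories (sklearn ≥ 1.3) | smooth; built-in cross-fitting to limit leakage |
| PowerTransformer | Making numeric features closer to normal | method='yeo-johnson' (supports negative values) |
| PolynomialFeatures | Adding interaction and polynomial terms | degree, interaction_only |
| SelectKBest / SelectFromModel | Feature selection | Place after the preprocessor and before the model |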
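
The placement noted in the last row can be sketched as follows. This is an illustrative example on synthetic data, with k=3 and f_classif chosen arbitrarily: the selector sits between preprocessing and the model, so the model only sees the selected features.

```python
from sklearn.datasets import make_classification
from sklearn.feature_selection import SelectKBest, f_classif
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

# Synthetic data, for illustration only
X, y = make_classification(
    n_samples=200, n_features=10, n_informative=3, random_state=0
)

select_pipe = Pipeline([
    ('scaler', StandardScaler()),                        # preprocessing
    ('select', SelectKBest(score_func=f_classif, k=3)),  # feature selection
    ('clf', LogisticRegression(max_iter=1000)),          # model
])
select_pipe.fit(X, y)

kept_mask = select_pipe.named_steps['select'].get_support()
n_features_seen_by_model = select_pipe.named_steps['clf'].coef_.shape[1]
print(kept_mask.sum(), n_features_seen_by_model)
```

Because selection happens inside the Pipeline, it is re-run on each training fold during cross-validation, in line with the leakage argument made earlier.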

Custom Transformers

When the built-in transformers are not enough, inherit from BaseEstimator + TransformerMixin:

from sklearn.base import BaseEstimator, TransformerMixin
import numpy as np

class LogTransformer(BaseEstimator, TransformerMixin):
    """Apply log1p to the given DataFrame columns (Pipeline-compatible fit/transform)."""

    def __init__(self, cols=None):
        self.cols = cols  # None = all columns

    def fit(self, X, y=None):
        return self  # stateless: nothing to learn, just return self

    def transform(self, X):
        X = X.copy()  # avoid mutating the caller's DataFrame
        cols = self.cols if self.cols is not None else X.columns.tolist()
        X[cols] = np.log1p(X[cols])
        return X

Put it into a Pipeline:

pipe = Pipeline([
    ('log', LogTransformer(cols=['income', 'tenure'])),
    ('scaler', StandardScaler()),
    ('clf', LogisticRegression()),
])
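
LogTransformer is stateless. For contrast, here is a sketch of a stateful custom transformer that learns something in fit. QuantileClipper is a hypothetical class written for this example, not part of sklearn; the convention it follows (learned attributes end with an underscore, constructor arguments are stored unchanged) is what lets clone and get_params work.

```python
import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin, clone


class QuantileClipper(BaseEstimator, TransformerMixin):
    """Hypothetical example: clip each column to quantiles learned during fit."""

    def __init__(self, lower=0.01, upper=0.99):
        # Store constructor arguments as-is so get_params/clone work
        self.lower = lower
        self.upper = upper

    def fit(self, X, y=None):
        X = np.asarray(X, dtype=float)
        # Learned state uses a trailing underscore by convention
        self.low_ = np.quantile(X, self.lower, axis=0)
        self.high_ = np.quantile(X, self.upper, axis=0)
        return self

    def transform(self, X):
        X = np.asarray(X, dtype=float)
        return np.clip(X, self.low_, self.high_)


clipper = QuantileClipper(lower=0.1, upper=0.9)
train = np.arange(101, dtype=float).reshape(-1, 1)  # values 0..100
clipper.fit(train)

clipped = clipper.transform(np.array([[-5.0], [50.0], [500.0]]))
fresh = clone(clipper)  # same parameters, no fitted state
print(clipped.ravel(), fresh.get_params())
```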

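FunctionTransformer: lightweight one-off transforms

When a new class is overkill, wrap a function with FunctionTransformer:

import numpy as np
from sklearn.preprocessing import FunctionTransformer

# validate=True checks the input and converts it to a 2D NumPy array
log_transformer = FunctionTransformer(np.log1p, validate=True)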
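
A short usage sketch, with made-up input values: passing inverse_func as well makes the transform reversible, which can be handy when results need to be mapped back to the original scale.

```python
import numpy as np
from sklearn.preprocessing import FunctionTransformer

# log1p forward, expm1 backward
log_tf = FunctionTransformer(np.log1p, inverse_func=np.expm1, validate=True)

X = np.array([[0.0, 10.0], [1.0, 100.0], [2.0, 1000.0]])  # illustrative values
X_log = log_tf.fit_transform(X)
X_back = log_tf.inverse_transform(X_log)

print(np.allclose(X_back, X))
```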

Pipeline + GridSearchCV

Hyperparameter names take the form step_name__param_name (double underscore):

from sklearn.metrics import roc_auc_score
from sklearn.model_selection import GridSearchCV, StratifiedKFold

param_grid = {
    'preprocessor__num__imputer__strategy': ['mean', 'median'],
    'clf__n_estimators': [100, 300],
    'clf__max_depth': [3, 5, None],
    'clf__learning_rate': [0.05, 0.1],
}

cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
grid_search = GridSearchCV(
    full_pipe,
    param_grid,
    cv=cv,
    scoring='roc_auc',
    n_jobs=-1,
    verbose=1,
)
grid_search.fit(X_train, y_train)

print("Best params:", grid_search.best_params_)
print("Best CV AUC:", grid_search.best_score_)
print("Test AUC:", roc_auc_score(y_test, grid_search.predict_proba(X_test)[:, 1]))

n_jobs=-1 runs in parallel on all CPU cores; verbose=1 prints progress.
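
When the nested names get long, get_params() lists every tunable name, which avoids typos in the grid. The sketch below rebuilds a reduced version of the pipeline (the column names and demo_pipe are assumptions for the example) and looks up the imputer keys.

```python
from sklearn.compose import ColumnTransformer
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

num_pipe = Pipeline([
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', StandardScaler()),
])
preprocessor = ColumnTransformer([('num', num_pipe, ['age', 'income'])])
demo_pipe = Pipeline([
    ('preprocessor', preprocessor),
    ('clf', GradientBoostingClassifier(random_state=42)),
])

# All valid parameter names, including nested ones
tunable = sorted(demo_pipe.get_params().keys())
imputer_keys = [n for n in tunable if n.startswith('preprocessor__num__imputer__')]
print(imputer_keys)

# The same names work with set_params
demo_pipe.set_params(preprocessor__num__imputer__strategy='mean')
```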

RandomizedSearchCV: an alternative when the hyperparameter space is large

from sklearn.model_selection import RandomizedSearchCV
from scipy.stats import randint, uniform

param_dist = {
    'clf__n_estimators': randint(100, 500),
    'clf__max_depth': [3, 5, 7, None],
    'clf__learning_rate': uniform(0.01, 0.2),
    'clf__subsample': uniform(0.6, 0.4),
}

rand_search = RandomizedSearchCV(
    full_pipe,
    param_dist,
    n_iter=50,
    cv=cv,
    scoring='roc_auc',
    n_jobs=-1,
    random_state=42,
)
rand_search.fit(X_train, y_train)
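
One detail worth checking in the distributions above: scipy's uniform(loc, scale) samples from [loc, loc + scale], not [low, high], and randint(low, high) excludes high. The sketch below draws samples to confirm the ranges; the sample size and seed are arbitrary.

```python
from scipy.stats import randint, uniform

# uniform(loc, scale) covers [loc, loc + scale]
lr_samples = uniform(0.01, 0.2).rvs(size=1000, random_state=0)    # 0.01 .. 0.21
sub_samples = uniform(0.6, 0.4).rvs(size=1000, random_state=0)    # 0.6 .. 1.0

# randint(low, high) draws integers with high excluded
n_est_samples = randint(100, 500).rvs(size=1000, random_state=0)  # 100 .. 499

print(lr_samples.min(), lr_samples.max())
print(sub_samples.min(), sub_samples.max())
print(n_est_samples.min(), n_est_samples.max())
```

This matters for subsample in particular, which must stay within (0, 1]: uniform(0.6, 0.4) respects that bound, while uniform(0.6, 1.0) would not.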

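Inspecting Pipeline structure and feature names

# Display the Pipeline structure (rendered as a diagram in Jupyter)
from sklearn import set_config
set_config(display='diagram')
full_pipe

# Get the output feature names after preprocessing (including the one-hot columns);
# the pipeline must be fitted first
feature_names = full_pipe.named_steps['preprocessor'].get_feature_names_out()

# Access individual steps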
preprocessor = full_pipe.named_steps['preprocessor']
model = full_pipe.named_steps['clf']

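Saving and loading

import joblib

# Save (includes all fitted preprocessing state + model weights)
joblib.dump(full_pipe, 'model_pipeline.joblib')

# Load and predict (a single call is enough)
loaded_pipe = joblib.load('model_pipeline.joblib')
predictions = loaded_pipe.predict(X_new)

⚠️ Prefer joblib over plain pickle: joblib stores objects that hold large numpy arrays more efficiently. Keep versions aligned: the sklearn version used for loading should be the same as, or compatible with, the one used for saving, otherwise you may get a warning or even an error.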
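
A round-trip check is a cheap way to confirm that the saved file reproduces the original predictions. The sketch below uses synthetic data and a temporary directory; the file name demo_pipeline.joblib is a placeholder.

```python
import os
import tempfile

import joblib
import numpy as np
from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

# Synthetic data, for illustration only
X, y = make_classification(n_samples=100, n_features=4, random_state=0)
pipe = Pipeline([
    ('scaler', StandardScaler()),
    ('clf', LogisticRegression(max_iter=1000)),
]).fit(X, y)

with tempfile.TemporaryDirectory() as tmp_dir:
    path = os.path.join(tmp_dir, 'demo_pipeline.joblib')  # placeholder file name
    joblib.dump(pipe, path)
    restored = joblib.load(path)

# The restored pipeline should behave exactly like the original
same_predictions = np.array_equal(pipe.predict(X), restored.predict(X))
print(same_predictions)
```

Since joblib files are pickle-based, loading one can execute arbitrary code, so only load files from a source you trust.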


Complete copy-and-paste template

import pandas as pd
import numpy as np
import joblib
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import (
    train_test_split, StratifiedKFold, RandomizedSearchCV, cross_val_score
)
from sklearn.metrics import roc_auc_score, average_precision_score, classification_report
from scipy.stats import randint, uniform

# ── 1. Split the data ──────────────────────────────────────
# Assumes X (a pandas DataFrame) and y (binary labels) are already loaded.
X_temp, X_test, y_temp, y_test = train_test_split(
    X, y, test_size=0.15, stratify=y, random_state=42
)
# 0.18 of the remaining 85% is about 15% of the full data (roughly 70/15/15).
X_train, X_val, y_train, y_val = train_test_split(
    X_temp, y_temp, test_size=0.18, stratify=y_temp, random_state=42
)

# ── 2. Define columns ──────────────────────────────────────
num_cols = X.select_dtypes(include='number').columns.tolist()
cat_cols = X.select_dtypes(include='object').columns.tolist()

# ── 3. Build the preprocessor ──────────────────────────────
num_pipe = Pipeline([
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler',  StandardScaler()),
])
cat_pipe = Pipeline([
    ('imputer', SimpleImputer(strategy='most_frequent')),
    ('encoder', OneHotEncoder(handle_unknown='ignore', sparse_output=False)),
])
preprocessor = ColumnTransformer([
    ('num', num_pipe, num_cols),
    ('cat', cat_pipe, cat_cols),
])

# ── 4. Full Pipeline ───────────────────────────────────────
pipe = Pipeline([
    ('preprocessor', preprocessor),
    ('clf', GradientBoostingClassifier(random_state=42)),
])

# ── 5. Hyperparameter search ───────────────────────────────
param_dist = {
    'clf__n_estimators': randint(100, 400),
    'clf__max_depth': [3, 5, 7],
    'clf__learning_rate': uniform(0.05, 0.15),
    'clf__subsample': uniform(0.7, 0.3),
}
cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
search = RandomizedSearchCV(pipe, param_dist, n_iter=40, cv=cv,
                             scoring='roc_auc', n_jobs=-1, random_state=42)
search.fit(X_train, y_train)
best_pipe = search.best_estimator_

# ── 6. Validation-set evaluation ───────────────────────────
val_prob = best_pipe.predict_proba(X_val)[:, 1]
print(f"Val AUC-ROC : {roc_auc_score(y_val, val_prob):.4f}")
print(f"Val PR-AUC  : {average_precision_score(y_val, val_prob):.4f}")

# ── 7. Final test-set evaluation (touch it only once) ──────
test_prob = best_pipe.predict_proba(X_test)[:, 1]
print(f"Test AUC-ROC: {roc_auc_score(y_test, test_prob):.4f}")
print(classification_report(y_test, best_pipe.predict(X_test)))

# ── 8. Save ────────────────────────────────────────────────
joblib.dump(best_pipe, 'model_pipeline.joblib')
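
The template assumes X and y already exist. For a quick smoke test, a synthetic dataset along the following lines could be used; every column name, distribution, and the label rule here is invented for illustration and should be replaced with real data.

```python
import numpy as np
import pandas as pd

rng = np.random.default_rng(42)
n = 500

# Hypothetical mixed-type features: two numeric, two categorical
X = pd.DataFrame({
    'age': rng.integers(18, 80, size=n).astype('float64'),
    'income': rng.normal(50_000, 15_000, size=n),
    'city': rng.choice(['Taipei', 'Tainan', 'Kaohsiung'], size=n),
    'product_type': rng.choice(['basic', 'premium'], size=n),
})

# Inject missing values so the imputers have something to do
missing_rows = rng.choice(n, size=25, replace=False)
X.loc[missing_rows, 'income'] = np.nan

# Arbitrary binary label: noisy threshold on age
y = pd.Series(
    (X['age'] + rng.normal(0, 15, size=n) > 50).astype(int), name='target'
)
print(X.shape, y.mean())
```

Defining X and y this way before step 1 lets the whole template run end to end, though the resulting scores say nothing about a real problem.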
