Project_Carmignac/src/predictive_model.py

423 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
predictive_model.py
────────────────────
Modélisation prédictive des flux nets avec walk-forward validation.
Ce module est intentionnellement séparé du feature engineering :
il prend en entrée le dataset produit par FeatureBuilder et se concentre
sur l'entraînement, la validation et l'interprétation des modèles.
Modèles implémentés :
- Baseline : prédiction zéro (benchmark naïf)
- Ridge : régression linéaire régularisée (interprétable)
- RandomForest : non-linéaire, robuste aux outliers
- GradientBoosting : état de l'art sur données tabulaires
Validation : walk-forward expanding window (pas de data leakage).
Usage :
from feature_engineering import FeatureBuilder
from predictive_model import WalkForwardModel
feature_cols = FeatureBuilder.get_feature_columns(dataset)
model = WalkForwardModel()
results = model.fit_evaluate(dataset, feature_cols)
model.plot_results(output_path="results.png")
"""
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
from sklearn.linear_model import Ridge
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_absolute_error, r2_score
from sklearn.inspection import permutation_importance
import warnings
warnings.filterwarnings("ignore")
# ── Constants ─────────────────────────────────────────────────────────────────
# Colour palette cycled through by the plots.
COLORS = [
    "#1f4e79",
    "#2e75b6",
    "#70ad47",
    "#ed7d31",
    "#a50026",
]

# Registry of the evaluated models. For each display name:
#   cls    -> estimator class to instantiate
#   kwargs -> keyword arguments passed to the estimator constructor
#   scale  -> whether the features are standardised before fitting
MODEL_CONFIGS = {
    "Ridge": dict(
        cls=Ridge,
        kwargs=dict(alpha=1.0),
        scale=True,  # the linear model is fitted on standardised features
    ),
    "Random Forest": dict(
        cls=RandomForestRegressor,
        kwargs=dict(
            n_estimators=200,
            max_depth=6,
            min_samples_leaf=3,
            random_state=42,
            n_jobs=-1,
        ),
        scale=False,
    ),
    "Gradient Boosting": dict(
        cls=GradientBoostingRegressor,
        kwargs=dict(
            n_estimators=200,
            max_depth=4,
            learning_rate=0.05,
            subsample=0.8,
            random_state=42,
        ),
        scale=False,
    ),
}
# ── Classe principale ─────────────────────────────────────────────────────────
class WalkForwardModel:
    """
    Train and evaluate several regressors with walk-forward validation.

    Every entry of the module-level ``MODEL_CONFIGS`` is scored with an
    expanding window: for each test date, the model is fitted on the rows of
    all earlier dates and scored on the rows of that date. A naive
    "always predict zero" baseline is scored on the same folds.

    Parameters
    ----------
    date_col : name of the date column in the dataset
    target_col : name of the target column
    min_train_frac : minimum fraction of the distinct dates used for training
        (default 0.4); at least 3 dates are always used.

    State populated by ``fit_evaluate``
    -----------------------------------
    results_df_ : one row per (model, test date) with MAE / R² / fold sizes
    importances_ : ``{"gini": Series, "permutation": Series}`` (Random Forest)
    final_models_ : ``{model name: (estimator, scaler or None, feature list)}``
    """

    # Column layout of ``results_df_``; fixed so that an evaluation with zero
    # scored folds still yields a DataFrame that can be grouped by "model".
    _RESULT_COLUMNS = ["test_date", "model", "mae", "r2", "n_test", "n_train"]

    def __init__(self,
                 date_col: str = "Centralisation Date",
                 target_col: str = "flux_net_proxy",
                 min_train_frac: float = 0.4):
        self.date_col = date_col
        self.target_col = target_col
        self.min_train_frac = min_train_frac
        # Post-fit storage
        self.results_df_ = pd.DataFrame()
        self.importances_ = {}
        self.final_models_ = {}

    # ── Walk-forward ──────────────────────────────────────────────────────────
    def fit_evaluate(self,
                     dataset: pd.DataFrame,
                     feature_cols: list[str],
                     verbose: bool = True) -> pd.DataFrame:
        """
        Run the expanding-window walk-forward evaluation for every model.

        Parameters
        ----------
        dataset : input rows; must contain ``date_col`` and ``target_col``
        feature_cols : requested feature columns; those absent from
            ``dataset`` are ignored
        verbose : print progress and a summary table

        Returns
        -------
        DataFrame with one row per (model, test date); empty when there are
        not enough distinct dates or when no fold could be scored.

        Raises
        ------
        ValueError : if the target column or every feature column is missing.
        """
        # Drop the state of any previous run so that the attributes always
        # agree with what this call returns.
        self.results_df_ = pd.DataFrame()
        self.importances_ = {}
        self.final_models_ = {}

        dataset = dataset.copy()
        dataset[self.date_col] = pd.to_datetime(dataset[self.date_col])
        dates_sorted = sorted(dataset[self.date_col].unique())
        n_dates = len(dates_sorted)
        min_train = max(3, int(n_dates * self.min_train_frac))
        if verbose:
            print(f"Walk-forward : {n_dates} dates | min train = {min_train}")
        if n_dates <= min_train:
            print("⚠ Pas assez de dates pour le walk-forward. "
                  "Augmenter la taille du dataset.")
            return pd.DataFrame()

        # Fail early with an explicit message instead of an opaque
        # scikit-learn error raised deep inside the loop.
        if self.target_col not in dataset.columns:
            raise ValueError(
                f"Colonne cible '{self.target_col}' absente du dataset.")
        usable = [c for c in feature_cols if c in dataset.columns]
        if not usable:
            raise ValueError(
                "Aucune des features demandées n'est présente dans le dataset.")
        if verbose and len(usable) < len(feature_cols):
            ignored = [c for c in feature_cols if c not in dataset.columns]
            print(f"⚠ Features absentes du dataset ignorées : {ignored}")

        records = []
        for test_idx in range(min_train, n_dates):
            train_dates = dates_sorted[:test_idx]
            test_date = dates_sorted[test_idx]
            train = dataset[dataset[self.date_col].isin(train_dates)]
            test = dataset[dataset[self.date_col] == test_date]
            X_train, y_train = self._prepare(train, usable, fit=True)
            X_test, y_test = self._prepare(test, usable, fit=False)
            # Folds with a constant test target are skipped.
            if len(X_test) == 0 or y_test.std() == 0:
                continue

            # Baseline: always predict zero.
            zeros = np.zeros(len(y_test))
            records.append({
                "test_date": test_date,
                "model": "Baseline (zéro)",
                "mae": mean_absolute_error(y_test, zeros),
                "r2": r2_score(y_test, zeros),
                "n_test": len(y_test),
                "n_train": len(X_train),
            })

            for model_name, cfg in MODEL_CONFIGS.items():
                model = cfg["cls"](**cfg["kwargs"])
                # The scaler is fitted on the training fold only.
                scaler = StandardScaler() if cfg["scale"] else None
                if scaler is not None:
                    X_tr = scaler.fit_transform(X_train)
                    X_te = scaler.transform(X_test)
                else:
                    X_tr, X_te = X_train, X_test
                model.fit(X_tr, y_train)
                preds = model.predict(X_te)
                records.append({
                    "test_date": test_date,
                    "model": model_name,
                    "mae": mean_absolute_error(y_test, preds),
                    "r2": r2_score(y_test, preds),
                    "n_test": len(y_test),
                    "n_train": len(X_train),
                })

        self.results_df_ = pd.DataFrame(records, columns=self._RESULT_COLUMNS)
        if verbose:
            self._print_results_summary()
        # Fit the final models on the whole dataset.
        self._fit_final_models(dataset, feature_cols)
        return self.results_df_

    # ── Final models (for feature importance) ─────────────────────────────────
    def _fit_final_models(self, dataset: pd.DataFrame, feature_cols: list[str]):
        """
        Fit every model on the full dataset and compute feature importances.

        Populates ``final_models_`` and, when a "Random Forest" entry exists,
        ``importances_["gini"]`` and ``importances_["permutation"]``.
        """
        # Only the columns actually present end up in X, so this exact list
        # must be used both to label the importances and to rebuild X later
        # in ``predict``.
        used_cols = [c for c in feature_cols if c in dataset.columns]
        X, y = self._prepare(dataset, used_cols, fit=True)
        if len(X) == 0:
            return
        for model_name, cfg in MODEL_CONFIGS.items():
            model = cfg["cls"](**cfg["kwargs"])
            scaler = StandardScaler() if cfg["scale"] else None
            X_fit = scaler.fit_transform(X) if scaler is not None else X
            model.fit(X_fit, y)
            self.final_models_[model_name] = (model, scaler, used_cols)

        # Feature importance: Random Forest (impurity-based) + permutation.
        rf_model, _, _ = self.final_models_.get("Random Forest", (None, None, None))
        if rf_model is not None:
            self.importances_["gini"] = pd.Series(
                rf_model.feature_importances_, index=used_cols
            ).sort_values(ascending=False)
            # NOTE: computed on the same rows the model was fitted on, and
            # with the estimator's default scorer (no ``scoring`` is passed).
            perm = permutation_importance(
                rf_model, X, y, n_repeats=10, random_state=42, n_jobs=-1
            )
            self.importances_["permutation"] = pd.Series(
                perm.importances_mean, index=used_cols
            ).sort_values(ascending=False)

    # ── Prediction ────────────────────────────────────────────────────────────
    def predict(self, new_data: pd.DataFrame,
                model_name: str = "Random Forest") -> np.ndarray:
        """
        Predict the target on new rows with one of the final models.

        Raises
        ------
        ValueError : if ``model_name`` has not been fitted, or if ``new_data``
            lacks a feature column the model was fitted on.
        """
        if model_name not in self.final_models_:
            raise ValueError(f"Modèle '{model_name}' non disponible. "
                             f"Disponibles : {list(self.final_models_.keys())}")
        model, scaler, feature_cols = self.final_models_[model_name]
        # ``_prepare`` silently drops absent columns, which would hand the
        # model a matrix of the wrong width; refuse explicitly instead.
        missing = [c for c in feature_cols if c not in new_data.columns]
        if missing:
            raise ValueError(
                f"Features manquantes dans les nouvelles données : {missing}")
        X, _ = self._prepare(new_data, feature_cols, fit=False)
        X_pred = scaler.transform(X) if scaler is not None else X
        return model.predict(X_pred)

    # ── Visualisation ─────────────────────────────────────────────────────────
    @staticmethod
    def _feature_color(name: str) -> str:
        """Return the bar colour of a feature, chosen from its name prefix."""
        if name.startswith("rel_"):
            return "#70ad47"
        if name.startswith("perf_"):
            return "#ed7d31"
        return "#1f4e79"

    def plot_results(self, output_path: str = "model_results.png"):
        """
        Save a five-panel figure of the walk-forward results to ``output_path``.

        When no results are available, a schematic of the validation scheme is
        saved instead.
        """
        if self.results_df_.empty:
            print("⚠ Aucun résultat à visualiser (walk-forward non exécuté).")
            self._plot_schema(output_path)
            return

        results = self.results_df_
        model_names = results["model"].unique().tolist()
        # One colour per model, shared by every panel. Sorted order matches
        # the iteration order of ``groupby("model")`` used below.
        color_of = {
            name: COLORS[i % len(COLORS)]
            for i, name in enumerate(sorted(model_names))
        }

        fig = plt.figure(figsize=(16, 14))
        fig.patch.set_facecolor("white")
        gs = gridspec.GridSpec(3, 2, figure=fig, hspace=0.45, wspace=0.35)

        # ── [A] MAE per model and date ────────────────────────────────────────
        ax1 = fig.add_subplot(gs[0, :])
        for model_name, grp in results.groupby("model"):
            style = "--" if "Baseline" in model_name else "-"
            ax1.plot(grp["test_date"], grp["mae"],
                     label=model_name, linewidth=1.8,
                     color=color_of[model_name], linestyle=style)
        ax1.set_title("Walk-Forward Validation — MAE par modèle",
                      fontsize=13, fontweight="bold")
        ax1.set_ylabel("MAE (€)")
        ax1.legend(fontsize=9)
        ax1.tick_params(axis="x", rotation=20)

        # ── [B] R² per model (baseline omitted) ───────────────────────────────
        ax2 = fig.add_subplot(gs[1, 0])
        for model_name, grp in results.groupby("model"):
            if "Baseline" in model_name:
                continue
            # Values are clipped to [-1, 1] to keep the axis readable.
            ax2.plot(grp["test_date"], grp["r2"].clip(-1, 1),
                     label=model_name, linewidth=1.5,
                     color=color_of[model_name])
        ax2.axhline(0, color="black", linestyle="--", linewidth=1, alpha=0.5)
        ax2.set_title("R² par modèle", fontsize=12, fontweight="bold")
        ax2.set_ylabel("R² (borné à [-1, 1])")
        ax2.legend(fontsize=9)
        ax2.tick_params(axis="x", rotation=20)

        # ── [C] Aggregated MAE (box plots) ────────────────────────────────────
        ax3 = fig.add_subplot(gs[1, 1])
        mae_by_model = [
            results.loc[results["model"] == m, "mae"].dropna().values
            for m in model_names
        ]
        bp = ax3.boxplot(mae_by_model, patch_artist=True,
                         medianprops=dict(color="black", linewidth=2))
        # Labels are set separately: the ``labels`` keyword of ``boxplot`` is
        # deprecated in recent matplotlib releases.
        ax3.set_xticklabels(model_names)
        for patch, name in zip(bp["boxes"], model_names):
            patch.set_facecolor(color_of[name])
            patch.set_alpha(0.7)
        ax3.set_title("Distribution de MAE (tous folds)",
                      fontsize=12, fontweight="bold")
        ax3.set_ylabel("MAE (€)")
        ax3.tick_params(axis="x", rotation=20)

        # ── [D] Feature importance (Gini) ─────────────────────────────────────
        ax4 = fig.add_subplot(gs[2, 0])
        if "gini" in self.importances_:
            imp = self.importances_["gini"].head(15)
            colors_imp = [self._feature_color(c) for c in imp.index]
            # Reversed so that the most important feature is drawn on top.
            ax4.barh(imp.index[::-1], imp.values[::-1], color=colors_imp[::-1])
            ax4.set_title("Importance (Gini) — Top 15 features",
                          fontsize=12, fontweight="bold")
            ax4.set_xlabel("Importance relative")
            from matplotlib.patches import Patch
            legend_elements = [
                Patch(color="#70ad47", label="Perf relative (peers)"),
                Patch(color="#ed7d31", label="Perf absolue"),
                Patch(color="#1f4e79", label="Comportement AUM"),
            ]
            ax4.legend(handles=legend_elements, fontsize=8, loc="lower right")
        else:
            ax4.axis("off")
            ax4.text(0.5, 0.5, "Importance des variables\nnon disponible",
                     ha="center", va="center", fontsize=12)

        # ── [E] Permutation importance ────────────────────────────────────────
        ax5 = fig.add_subplot(gs[2, 1])
        if "permutation" in self.importances_:
            pimp = self.importances_["permutation"].head(15)
            pimp = pimp[pimp > 0]  # keep only features with a positive effect
            colors_pimp = [self._feature_color(c) for c in pimp.index]
            ax5.barh(pimp.index[::-1], pimp.values[::-1], color=colors_pimp[::-1])
            ax5.set_title("Permutation Importance — Top 15",
                          fontsize=12, fontweight="bold")
            # The importances come from the estimator's default scorer
            # (R² for a regressor), not from MAE.
            ax5.set_xlabel("Baisse moyenne du R² (permutation)")
        else:
            ax5.axis("off")

        fig.suptitle("Carmignac × ENSAE — Résultats du modèle prédictif",
                     fontsize=14, fontweight="bold", y=1.01)
        fig.savefig(output_path, dpi=150, bbox_inches="tight", facecolor="white")
        plt.close(fig)
        print(f"✅ Graphiques sauvegardés : {output_path}")

    def _plot_schema(self, output_path: str):
        """Save a text schematic of the pipeline when results are unavailable."""
        fig, ax = plt.subplots(figsize=(12, 6))
        fig.patch.set_facecolor("white")
        ax.axis("off")
        ax.set_xlim(0, 10)
        ax.set_ylim(0, 5)
        schema = (
            "WALK-FORWARD VALIDATION — SCHÉMA\n\n"
            " t₁ t₂ t₃ t₄ t₅ t₆ t₇ ...\n"
            " ─────────────────────────────────\n"
            " TRAIN ████████ │TEST│\n"
            " TRAIN ███████████│TEST│\n"
            " TRAIN ██████████████│TEST│\n"
            " ...\n\n"
            "Principe :\n"
            " → Expanding window : la fenêtre de train s'agrandit à chaque fold\n"
            " → Test = 1 date future (mois suivant)\n"
            " → Aucune information future dans le train → pas de data leakage\n\n"
            "Métriques calculées à chaque fold :\n"
            " → MAE (Mean Absolute Error) en € de flux\n"
            " → R² (coefficient de détermination)\n\n"
            "Relancer avec les données complètes pour obtenir les résultats réels."
        )
        ax.text(0.5, 4.8, schema, va="top", fontsize=11, fontfamily="monospace",
                bbox=dict(boxstyle="round", facecolor="#eaf2fb", alpha=0.8))
        ax.set_title("Modèle prédictif — En attente de données complètes",
                     fontsize=13, fontweight="bold")
        fig.savefig(output_path, dpi=150, bbox_inches="tight", facecolor="white")
        plt.close(fig)
        print(f"✅ Schéma sauvegardé : {output_path}")

    # ── Internal helpers ──────────────────────────────────────────────────────
    def _prepare(self, df: pd.DataFrame,
                 feature_cols: list[str],
                 fit: bool) -> tuple[np.ndarray, np.ndarray]:
        """
        Extract the feature matrix and target vector from ``df``.

        Feature columns absent from ``df`` are skipped and missing feature
        values are replaced by 0. ``y`` is an empty array when the target
        column is absent. ``fit`` is accepted for interface compatibility and
        is not used.
        """
        available = [c for c in feature_cols if c in df.columns]
        X = df[available].fillna(0).values
        if self.target_col in df.columns:
            y = df[self.target_col].values
        else:
            y = np.array([])
        return X, y

    def _print_results_summary(self):
        """Print per-model median/mean MAE, median R² and fold count."""
        if self.results_df_.empty:
            # Nothing was scored; grouping a column-less frame would raise.
            print("⚠ Aucun fold évalué : pas de résumé à afficher.")
            return
        print("\n── Résultats walk-forward (médiane sur tous les folds) ──")
        summary = (
            self.results_df_
            .groupby("model")
            .agg(
                MAE_median=("mae", "median"),
                MAE_mean=("mae", "mean"),
                R2_median=("r2", "median"),
                n_folds=("mae", "count"),
            )
            .round(4)
            .sort_values("MAE_median")
        )
        print(summary.to_string())
        print("─────────────────────────────────────────────────────────")

    # ── Public API ────────────────────────────────────────────────────────────
    def get_best_model(self) -> str:
        """
        Return the name of the model with the lowest median MAE.

        The baseline is excluded. Falls back to "Random Forest" when no
        results are available.
        """
        if self.results_df_.empty:
            return "Random Forest"
        candidates = self.results_df_[self.results_df_["model"] != "Baseline (zéro)"]
        return candidates.groupby("model")["mae"].median().idxmin()

    def get_top_features(self, n: int = 10,
                         method: str = "permutation") -> list[str]:
        """
        Return the ``n`` most important feature names.

        If ``method`` is unavailable, the first available importance measure
        is used instead; an empty list is returned when there is none.
        """
        if method not in self.importances_:
            method = next(iter(self.importances_), None)
        if method is None:
            return []
        return self.importances_[method].head(n).index.tolist()
# ── Usage autonome ────────────────────────────────────────────────────────────
def _main() -> None:
    """
    Command-line entry point.

    Reads the feature dataset from the CSV path given as first argument
    (default "dataset_features.csv"), runs the walk-forward evaluation,
    saves the result figure and prints the best model and top features.
    """
    import sys
    from feature_engineering import FeatureBuilder

    cli_args = sys.argv[1:]
    dataset_path = cli_args[0] if cli_args else "dataset_features.csv"

    dataset = pd.read_csv(dataset_path)
    feature_cols = FeatureBuilder.get_feature_columns(dataset)
    print(f"Dataset : {dataset.shape} | {len(feature_cols)} features")

    model = WalkForwardModel()
    results = model.fit_evaluate(dataset, feature_cols)
    model.plot_results("model_results.png")

    # Nothing to report when the walk-forward produced no rows.
    if results.empty:
        return
    print(f"\nMeilleur modèle : {model.get_best_model()}")
    print(f"Top features : {model.get_top_features(5)}")


if __name__ == "__main__":
    _main()