""" feature_engineering.py ─────────────────────── Construction du dataset de features pour la modélisation prédictive. Ce module assemble trois familles de features : [A] Features comportementales client (depuis stocks/AUM) - Encours actuel et lags (1m, 3m, 6m) - Croissance de l'AUM sur différentes fenêtres - Concentration du portefeuille client (part du fonds dans son total) [B] Features de performance absolue (depuis weekly_perf) - Rendements 6Mo et 1Yr du fonds détenu - Percentile Morningstar (rang brut dans la catégorie) [C] Features de performance relative (depuis relative_performance.py) - Spread vs médiane des vrais peers - Rang dans le groupe de peers restreint - Momentum de rang, ratio d'outperformance - Dummies top/bottom quartile (lien avec la relation convexe de Sirri & Tufano) Variable cible : flux_net_proxy = ΔAum(t → t+1) → À remplacer par les flux transactionnels bruts dès que disponibles. Usage : from peers_loader import PeersLoader from relative_performance import RelativePerformanceCalculator from feature_engineering import FeatureBuilder loader = PeersLoader("peers/").load() calc = RelativePerformanceCalculator(loader) builder = FeatureBuilder(loader, calc) dataset = builder.build( stocks_df = stocks, perf_df = weekly_perf, target_lag = 1 # prédire les flux à t+1 mois ) """ import numpy as np import pandas as pd from typing import Optional # ── Constantes ──────────────────────────────────────────────────────────────── # Périodes de performance à inclure comme features PERF_PERIODS_TO_USE = ["6MoRet", "1YrRet"] # Lags AUM (en mois) AUM_LAGS = [1, 3, 6] # Fenêtre de tolérance pour le merge_asof (en jours) MERGE_ASOF_TOLERANCE_DAYS = 35 # ── Classe principale ───────────────────────────────────────────────────────── class FeatureBuilder: """ Construit le dataset de modélisation en assemblant les trois familles de features. Paramètres ---------- loader : PeersLoader (déjà chargé) rel_calc : RelativePerformanceCalculator """ def __init__(self, loader, rel_calc): self.loader = loader self.rel_calc = rel_calc # ── Point d'entrée principal ────────────────────────────────────────────── def build(self, stocks_df: pd.DataFrame, perf_df: pd.DataFrame, target_lag: int = 1, perf_periods: list[str] | None = None, verbose: bool = True) -> pd.DataFrame: """ Construit le dataset final avec features et variable cible. Paramètres ---------- stocks_df : AUM mensuels (equity_stocks_full.csv ou similaire) perf_df : performances hebdomadaires (weekly_perf_full.csv) target_lag : horizon de prédiction en mois (1 = flux du mois suivant) perf_periods: périodes de perf à utiliser (défaut : PERF_PERIODS_TO_USE) Retourne -------- DataFrame avec une ligne par (compte × fonds × mois) contenant toutes les features et la variable cible. """ if perf_periods is None: perf_periods = [p for p in PERF_PERIODS_TO_USE if p in perf_df["perfPeriod"].unique()] if not perf_periods: # Fallback : utiliser toutes les périodes disponibles perf_periods = perf_df["perfPeriod"].unique().tolist() if verbose: print("── FeatureBuilder ──────────────────────────────────────") print(f"Périodes de performance utilisées : {perf_periods}") # Étape 1 : résolution ISIN dans perf via peers perf_df = self._resolve_perf_isin(perf_df) # Étape 2 : calcul des métriques relatives if verbose: print("\nCalcul des performances relatives...") rel_df = self.rel_calc.compute(perf_df, perf_periods=perf_periods, verbose=verbose) # Étape 3 : features client (AUM) if verbose: print("\nConstruction des features AUM...") stocks_feat = self._build_aum_features(stocks_df) # Étape 4 : features de performance absolue if verbose: print("\nJointure des performances absolues...") perf_abs = self._build_absolute_perf_features(perf_df, perf_periods) # Étape 5 : jointure AUM × perf absolue dataset = self._merge_aum_and_perf(stocks_feat, perf_abs, verbose) # Étape 6 : jointure avec les métriques relatives if not rel_df.empty: if verbose: print("\nJointure des métriques relatives...") dataset = self._merge_relative_perf(dataset, rel_df, verbose) # Étape 7 : variable cible if verbose: print(f"\nConstruction de la variable cible (lag = {target_lag} mois)...") dataset = self._build_target(dataset, lag=target_lag) # Étape 8 : nettoyage final dataset = self._final_cleanup(dataset, verbose) if verbose: self._print_dataset_summary(dataset) return dataset # ── Étape 1 : résolution ISIN dans perf ────────────────────────────────── def _resolve_perf_isin(self, perf_df: pd.DataFrame) -> pd.DataFrame: """Ajoute la colonne 'isin' dans perf_df via PeersLoader.""" perf_df = perf_df.copy() perf_df["Date"] = pd.to_datetime(perf_df["Date"]) perf_df["isin"] = perf_df["shareClass_name"].apply( self.loader.resolve_shareclass_name ) # Ajouter la stratégie Carmignac si disponible isin_to_strategy = dict( zip(self.loader.carmignac_df["ISIN"], self.loader.carmignac_df["carmignac_strategy"]) ) perf_df["carmignac_strategy"] = perf_df["isin"].map(isin_to_strategy) return perf_df # ── Étape 2 : features AUM ──────────────────────────────────────────────── def _build_aum_features(self, stocks_df: pd.DataFrame) -> pd.DataFrame: """ Construit les features comportementales depuis les snapshots AUM mensuels. Features produites (par compte × fonds × date) : aum_t : encours à t aum_lag{n} : encours à t-n mois (n ∈ AUM_LAGS) aum_growth_{n}m : croissance relative sur n mois aum_share_wallet : part du fonds dans le portefeuille total du compte """ df = stocks_df.copy() df["Centralisation Date"] = pd.to_datetime(df["Centralisation Date"]) # Tri pour les lags df = df.sort_values(["Registrar Account - ID", "Product - Isin", "Centralisation Date"]) grp = df.groupby(["Registrar Account - ID", "Product - Isin"]) # Lags AUM for lag in AUM_LAGS: df[f"aum_lag{lag}"] = grp["Value - AUM €"].shift(lag) # Croissances for lag in AUM_LAGS: df[f"aum_growth_{lag}m"] = ( (df["Value - AUM €"] - df[f"aum_lag{lag}"]) / (df[f"aum_lag{lag}"].abs() + 1.0) ) # Part dans le portefeuille total du compte (concentration) total_aum_by_account = ( df.groupby(["Registrar Account - ID", "Centralisation Date"])["Value - AUM €"] .transform("sum") ) df["aum_share_wallet"] = df["Value - AUM €"] / (total_aum_by_account + 1.0) # Renommage pour clarté df = df.rename(columns={"Value - AUM €": "aum_t"}) # Colonnes à conserver keep = ( ["Registrar Account - ID", "Product - Isin", "Centralisation Date", "Registrar Account - Region", "RegistrarAccount - Country", "Product - Asset Type", "Product - Strategy", "Product - Fund", "aum_t", "aum_share_wallet"] + [f"aum_lag{lag}" for lag in AUM_LAGS] + [f"aum_growth_{lag}m" for lag in AUM_LAGS] ) keep = [c for c in keep if c in df.columns] return df[keep] # ── Étape 3 : features de performance absolue ───────────────────────────── def _build_absolute_perf_features(self, perf_df: pd.DataFrame, perf_periods: list[str]) -> pd.DataFrame: """ Pivote weekly_perf pour obtenir une ligne par (isin, date) avec une colonne par (période × métrique). Colonnes produites : perf_return_6MoRet, perf_pct_1YrRet, etc. """ relevant = perf_df[perf_df["perfPeriod"].isin(perf_periods)].copy() if relevant.empty: return pd.DataFrame(columns=["isin", "Date"]) pivoted = relevant.pivot_table( index=["isin", "Date"], columns="perfPeriod", values=["return", "percentile"], aggfunc="mean" ) # Aplatir les colonnes multi-index pivoted.columns = [ f"perf_{metric}_{period}" for metric, period in pivoted.columns ] return pivoted.reset_index() # ── Étape 4 : jointure AUM × perf absolue ──────────────────────────────── def _merge_aum_and_perf(self, stocks_feat: pd.DataFrame, perf_abs: pd.DataFrame, verbose: bool) -> pd.DataFrame: """ merge_asof temporel : pour chaque snapshot mensuel AUM, trouve la performance hebdomadaire la plus récente ≤ date snapshot. """ if perf_abs.empty: if verbose: print(" ⚠ Aucune performance absolue à joindre.") return stocks_feat merged_parts = [] for isin_val in stocks_feat["Product - Isin"].unique(): s = stocks_feat[stocks_feat["Product - Isin"] == isin_val].sort_values( "Centralisation Date") p = perf_abs[perf_abs["isin"] == isin_val].sort_values("Date") if p.empty: merged_parts.append(s) continue m = pd.merge_asof( s, p, left_on="Centralisation Date", right_on="Date", direction="backward", tolerance=pd.Timedelta(f"{MERGE_ASOF_TOLERANCE_DAYS}d") ) merged_parts.append(m) result = pd.concat(merged_parts, ignore_index=True) perf_cols_joined = [c for c in result.columns if c.startswith("perf_")] if verbose: n_matched = result[perf_cols_joined[0]].notna().sum() if perf_cols_joined else 0 print(f" {n_matched}/{len(result)} lignes avec performance jointe " f"({len(perf_cols_joined)} colonnes)") return result # ── Étape 5 : jointure métriques relatives ──────────────────────────────── def _merge_relative_perf(self, dataset: pd.DataFrame, rel_df: pd.DataFrame, verbose: bool) -> pd.DataFrame: """ Joint les métriques relatives sur (carmignac_strategy, date). Stratégie de jointure : - La stratégie Carmignac d'un compte est déduite depuis Product - Strategy (nom court) ou ISIN via peers_loader. - merge_asof temporel avec tolérance ±35j. """ # Récupérer la stratégie Carmignac depuis l'ISIN isin_to_strategy = dict( zip(self.loader.carmignac_df["ISIN"], self.loader.carmignac_df["carmignac_strategy"]) ) dataset["carmignac_strategy"] = dataset["Product - Isin"].map(isin_to_strategy) # Agréger rel_df sur toutes périodes (moyenne par stratégie × date) rel_cols = [c for c in rel_df.columns if c.startswith("rel_")] if not rel_cols: return dataset rel_agg = (rel_df .groupby(["carmignac_strategy", "Date"])[rel_cols] .mean() .reset_index() .sort_values(["carmignac_strategy", "Date"])) merged_parts = [] for strat in dataset["carmignac_strategy"].dropna().unique(): d_strat = dataset[dataset["carmignac_strategy"] == strat].sort_values( "Centralisation Date") r_strat = rel_agg[rel_agg["carmignac_strategy"] == strat].sort_values("Date") if r_strat.empty: merged_parts.append(d_strat) continue m = pd.merge_asof( d_strat, r_strat, left_on="Centralisation Date", right_on="Date", direction="backward", tolerance=pd.Timedelta(f"{MERGE_ASOF_TOLERANCE_DAYS}d") ) merged_parts.append(m) # Ajouter les comptes sans stratégie identifiée no_strat = dataset[dataset["carmignac_strategy"].isna()] if not no_strat.empty: merged_parts.append(no_strat) result = pd.concat(merged_parts, ignore_index=True) if verbose: n_rel = result[rel_cols[0]].notna().sum() if rel_cols else 0 print(f" {n_rel}/{len(result)} lignes avec métriques relatives jointes " f"({len(rel_cols)} colonnes)") return result # ── Étape 6 : variable cible ────────────────────────────────────────────── def _build_target(self, dataset: pd.DataFrame, lag: int) -> pd.DataFrame: """ Construit la variable cible : ΔAum(t → t+lag). flux_net_proxy = aum(t+lag) - aum(t) Note : Dans un contexte de production, remplacer par : flux_net = sum(souscriptions[t:t+lag]) - sum(rachats[t:t+lag]) depuis le fichier de transactions quotidiennes. """ dataset = dataset.sort_values( ["Registrar Account - ID", "Product - Isin", "Centralisation Date"] ) grp = dataset.groupby(["Registrar Account - ID", "Product - Isin"]) dataset["aum_next"] = grp["aum_t"].shift(-lag) dataset["flux_net_proxy"] = dataset["aum_next"] - dataset["aum_t"] # Feature supplémentaire : flux relatif (normalisé par l'AUM) dataset["flux_net_relative"] = ( dataset["flux_net_proxy"] / (dataset["aum_t"].abs() + 1.0) ) return dataset # ── Étape 7 : nettoyage final ───────────────────────────────────────────── def _final_cleanup(self, dataset: pd.DataFrame, verbose: bool) -> pd.DataFrame: """ Supprime les doublons de colonnes, retire les lignes sans cible, log les taux de remplissage. """ # Supprimer les colonnes en double (artefacts du merge) dataset = dataset.loc[:, ~dataset.columns.duplicated()] # Retirer les lignes sans variable cible n_before = len(dataset) dataset = dataset.dropna(subset=["flux_net_proxy"]) n_after = len(dataset) if verbose and n_before > n_after: print(f" Lignes supprimées (cible manquante) : {n_before - n_after}") return dataset.reset_index(drop=True) # ── Résumé ──────────────────────────────────────────────────────────────── def _print_dataset_summary(self, dataset: pd.DataFrame): feature_cols = self.get_feature_columns(dataset) print("\n── Dataset final ───────────────────────────────────────") print(f"Lignes : {len(dataset):,}") print(f"Colonnes totales : {len(dataset.columns)}") print(f"Features : {len(feature_cols)}") print(f"\nTaux de remplissage des features :") families = { "AUM": [c for c in feature_cols if c.startswith("aum_")], "Perf abs": [c for c in feature_cols if c.startswith("perf_")], "Perf rel": [c for c in feature_cols if c.startswith("rel_")], } for family, cols in families.items(): if cols: fill_rates = dataset[cols].notna().mean() avg_fill = fill_rates.mean() print(f" {family:<12} ({len(cols):2d} cols) : {avg_fill:.1%} en moyenne") print(f"\nVariable cible 'flux_net_proxy' :") t = dataset["flux_net_proxy"] print(f" Médiane : {t.median():+,.0f} €") print(f" Std : {t.std():,.0f} €") print(f" % positif (souscription nette) : {(t > 0).mean():.1%}") print(f" % négatif (rachat net) : {(t < 0).mean():.1%}") print("─────────────────────────────────────────────────────────") # ── API publique ────────────────────────────────────────────────────────── @staticmethod def get_feature_columns(dataset: pd.DataFrame) -> list[str]: """Retourne la liste des colonnes de features (exclut les métadonnées et la cible).""" exclude = { "Registrar Account - ID", "Product - Isin", "Centralisation Date", "Product - Fund", "Product - Strategy", "Product - Asset Type", "Registrar Account - Region", "RegistrarAccount - Country", "carmignac_strategy", "isin", "Date", "aum_next", "flux_net_proxy", "flux_net_relative", "shareClass_name", } return [c for c in dataset.columns if c not in exclude and dataset[c].dtype in [np.float64, np.int64, float, int]] @staticmethod def get_target_column() -> str: return "flux_net_proxy" # ── Usage autonome ──────────────────────────────────────────────────────────── if __name__ == "__main__": import sys from peers_loader import PeersLoader from relative_performance import RelativePerformanceCalculator peers_dir = sys.argv[1] if len(sys.argv) > 1 else "." stocks_path = sys.argv[2] if len(sys.argv) > 2 else "equity_stocks_head.csv" perf_path = sys.argv[3] if len(sys.argv) > 3 else "weekly_perf_head.csv" loader = PeersLoader(peers_dir=peers_dir).load() calc = RelativePerformanceCalculator(loader) builder = FeatureBuilder(loader, calc) stocks_df = pd.read_csv(stocks_path) perf_df = pd.read_csv(perf_path) dataset = builder.build(stocks_df, perf_df) print("\nAperçu du dataset :") feature_cols = FeatureBuilder.get_feature_columns(dataset) print(dataset[feature_cols].describe().round(3).to_string()) dataset.to_csv("dataset_features.csv", index=False) print("\nDataset sauvegardé dans dataset_features.csv")