diff --git a/repair_challenge/carmignac_repair.py b/repair_challenge/carmignac_repair.py new file mode 100644 index 0000000..873ef47 --- /dev/null +++ b/repair_challenge/carmignac_repair.py @@ -0,0 +1,682 @@ +""" +Carmignac Data Challenge — Registrar ID Repair Pipeline +========================================================= +Étape 1 : Filtrage & univers de référence à t=31/10/2025 +Étape 2 : Score de cohérence temporelle (propagation vers le passé) +Étape 3 : Chirurgie de code (matching 1-to-1) +""" + +import pandas as pd +import numpy as np +from collections import defaultdict +import os +import s3fs + +# ───────────────────────────────────────────── +# PARAMÈTRES +# ───────────────────────────────────────────── +ALPHA = 0.01 # tolérance réconciliation : 1% du stock à t +MIN_AUM_EUR = 5e6 # seuil filtrage étape 1 +MIN_JACCARD = 0.3 # seuil minimal similarité portefeuille pour chirurgie +SCORE_DROP_THRESHOLD = 0.5 # si score chute de >50% → candidat chirurgie + +EXCLUDE_REGISTRAR = ["Off Distribution", "Private Clients"] + +# ───────────────────────────────────────────── +# 1. CHARGEMENT +# ───────────────────────────────────────────── +def connect_s3fs(): + fs = s3fs.S3FileSystem( + client_kwargs={'endpoint_url': 'https://'+'minio-simple.lab.groupe-genes.fr'}, + key = os.environ["AWS_ACCESS_KEY_ID"], + secret = os.environ["AWS_SECRET_ACCESS_KEY"], + token = os.environ["AWS_SESSION_TOKEN"]) + return fs + +def load_data(aum_path, flows_path): + fs = connect_s3fs() + + with fs.open(aum_path, 'rb') as aum_raw: + aum = pd.read_csv(aum_raw, sep=";") + + with fs.open(flows_path, 'rb') as flows_raw: + flows = pd.read_csv(flows_raw, sep=";") + + aum['Centralisation Date'] = pd.to_datetime(aum['Centralisation Date']) + flows['Centralisation Date'] = pd.to_datetime(flows['Centralisation Date']) + + aum = aum.rename(columns={ + 'Registrar Account - ID': 'reg_id', + 'Product - Isin': 'isin', + 'Centralisation Date': 'date', + 'Quantity - AUM': 'qty_aum', + 'Value - AUM €': 'val_eur', + 'Registrar Account - Region': 'region', + }) + flows = flows.rename(columns={ + 'Registrar Account - ID': 'reg_id', + 'Product - Isin': 'isin', + 'Centralisation Date': 'date', + 'Quantity - NetFlows': 'qty_net', + 'Value € - NetFlows': 'val_net_eur', + }) + + aum['reg_id'] = aum['reg_id'].astype(str) + flows['reg_id'] = flows['reg_id'].astype(str) + + return aum, flows + +# ───────────────────────────────────────────── +# 2. ÉTAPE 1 — Univers de référence à T_REF +# ───────────────────────────────────────────── +def build_reference_universe(aum, t_ref=None): + """ + Construit l'univers de référence à t_ref (dernière date par défaut). + Retourne : + - aum_ref : AUM à t_ref pour chaque (reg_id, isin) + - weights : poids normalisé par reg_id + - universe : ensemble des reg_id retenus (>= MIN_AUM_EUR) + """ + if t_ref is None: + t_ref = aum['date'].max() + + print(f"\n[Étape 1] Date de référence : {t_ref.date()}") + + # Exclure Off Distribution / Private Clients (sur région ou nom) + mask_excl = aum['reg_id'].isin(EXCLUDE_REGISTRAR) + if 'region' in aum.columns: + mask_excl |= aum['region'].isin(EXCLUDE_REGISTRAR) + aum_clean = aum[~mask_excl].copy() + + # AUM à t_ref + aum_ref = aum_clean[aum_clean['date'] == t_ref][['reg_id', 'isin', 'qty_aum', 'val_eur']].copy() + + # AUM total par reg_id à t_ref + aum_by_reg = aum_ref.groupby('reg_id')['val_eur'].sum().rename('total_eur') + + # Filtrage >= MIN_AUM_EUR + universe = set(aum_by_reg[aum_by_reg >= MIN_AUM_EUR].index) + + total_eur_universe = aum_by_reg[aum_by_reg.index.isin(universe)].sum() + total_eur_all = aum_by_reg.sum() + coverage = total_eur_universe / total_eur_all if total_eur_all > 0 else 0 + + print(f" Registrar IDs à t_ref : {len(aum_by_reg)}") + print(f" Dont >= {MIN_AUM_EUR/1e6:.0f}M€ : {len(universe)}") + print(f" Couverture encours : {coverage:.1%}") + + # Poids initiaux (scores à t_ref) + weights = (aum_by_reg[aum_by_reg.index.isin(universe)] / total_eur_universe).to_dict() + + return aum_ref, weights, universe, t_ref + +# ───────────────────────────────────────────── +# 3. PANEL AUM MENSUEL (forward-fill) +# ───────────────────────────────────────────── +def build_monthly_panel(aum, universe, t_ref): + """ + Construit un panel mensuel complet (forward-fill des quantités AUM) + pour TOUS les reg_ids présents dans l'historique AUM — y compris les codes + historiques hors univers de référence, nécessaires pour la chirurgie. + """ + # Toutes les fin de mois entre la première date et t_ref + date_min = aum['date'].min() + all_months = pd.date_range(start=date_min, end=t_ref, freq='ME') + + # Pivot : (reg_id, isin) → série temporelle de qty_aum + aum_sorted = aum.sort_values(['reg_id', 'isin', 'date']) + + # On ne garde que les lignes jusqu'à t_ref + aum_sorted = aum_sorted[aum_sorted['date'] <= t_ref] + + # Multi-index pivot + panel = aum_sorted.pivot_table( + index='date', columns=['reg_id', 'isin'], values='qty_aum', aggfunc='last' + ) + + # Réindexer sur toutes les fins de mois + panel = panel.reindex(all_months) + + # Forward-fill : si pas de mouvement, la quantité reste la même + panel = panel.ffill() + + # Backward-fill initial pour les comptes qui démarrent après la première date + # (on ne remonte pas avant leur première apparition → on garde NaN) + + print(f"\n[Panel mensuel] {len(all_months)} mois, {panel.shape[1]} (reg_id, isin) paires") + + return panel, all_months + +# ───────────────────────────────────────────── +# 4. FLOWS AGRÉGÉS PAR MOIS +# ───────────────────────────────────────────── +def aggregate_flows_monthly(flows, all_months): + """ + Agrège les flows infra-mensuels sur chaque fenêtre ]fin_mois(t-1), fin_mois(t)]. + Retourne un DataFrame indexé par (fin_mois, reg_id, isin). + """ + flows_f = flows[flows['date'] <= all_months[-1]].copy() + + # Associer chaque transaction à la fin de mois correspondante + # = la première fin de mois >= date de transaction + flows_f['month_end'] = flows_f['date'].apply( + lambda d: all_months[all_months >= d][0] if any(all_months >= d) else pd.NaT + ) + flows_f = flows_f.dropna(subset=['month_end']) + + # Agrégation + monthly_flows = flows_f.groupby(['month_end', 'reg_id', 'isin'])['qty_net'].sum() + monthly_flows = monthly_flows.reset_index() + monthly_flows.columns = ['date', 'reg_id', 'isin', 'qty_net_month'] + + print(f"\n[Flows mensuels] {len(monthly_flows)} enregistrements (reg_id, isin, mois)") + + return monthly_flows + +# ───────────────────────────────────────────── +# 5. ÉTAPE 2 — Score de cohérence temporelle +# ───────────────────────────────────────────── +def compute_reconciliation_error(qty_t_minus1, qty_t, net_flow, alpha=ALPHA): + """ + Calcule l'erreur de réconciliation normalisée pour un (reg_id, isin, mois). + + Attendu : qty_t_minus1 + net_flow ≈ qty_t + Erreur : |qty_t_minus1 + net_flow - qty_t| / max(|qty_t|, |qty_t_minus1|) + + Retourne : + - err_ratio : erreur relative (0 = parfait) + - is_break : True si err_ratio > alpha + """ + denom = max(abs(qty_t), abs(qty_t_minus1), 1e-9) + err = abs(qty_t_minus1 + net_flow - qty_t) + err_ratio = err / denom + return err_ratio, err_ratio > alpha + +def score_propagation(panel, monthly_flows, weights, universe, all_months): + """ + Propage les scores de t_ref vers t=0 (passé). + + À chaque mois t (en remontant), pour chaque reg_id dans l'univers courant : + - Calculer l'erreur de réconciliation pondérée par ISIN + - Dégrader le score proportionnellement + + Retourne : + - scores_history : dict {date → {reg_id → score}} + - errors_history : dict {date → {reg_id → err_pondérée}} + - mapping : dict {reg_id_original → reg_id_courant} (après chirurgie) + """ + # Initialisation + scores = dict(weights) # scores à t_ref + scores_history = {all_months[-1]: dict(scores)} + errors_history = {} + + # Mapping actuel (identité au départ) + mapping = {r: r for r in universe} + + # Flows indexés pour accès rapide + flows_idx = monthly_flows.set_index(['date', 'reg_id', 'isin'])['qty_net_month'] + + # Remonter dans le temps + for i in range(len(all_months) - 2, -1, -1): + t_prev = all_months[i] + t_curr = all_months[i + 1] + + errors_at_t = {} + new_scores = {} + + for reg_orig, reg_curr in mapping.items(): + score_curr = scores.get(reg_orig, 0) + if score_curr == 0: + new_scores[reg_orig] = 0 + continue + + # ISIN détenus par ce reg à t_curr (après mapping) + if reg_curr in panel.columns.get_level_values(0): + isin_list = panel[reg_curr].columns.tolist() + else: + # reg_curr n'existe pas du tout dans le panel → rupture totale + new_scores[reg_orig] = 0 + errors_at_t[reg_orig] = 1.0 + continue + + total_aum_t = 0 + weighted_err = 0 + valid_isin_count = 0 + all_nan_at_prev = True # détecte si le compte n'existait pas à t_prev + + for isin in isin_list: + qty_t = panel[reg_curr][isin].get(t_curr, np.nan) + qty_t_prev = panel[reg_curr][isin].get(t_prev, np.nan) + + if pd.isna(qty_t): + continue + + if not pd.isna(qty_t_prev): + all_nan_at_prev = False + + if pd.isna(qty_t_prev): + # ISIN existait à t_curr mais pas à t_prev → rupture sur cet ISIN + # On le traite comme une erreur maximale pondérée par son AUM + weight_isin = abs(qty_t) + weighted_err += 1.0 * weight_isin + total_aum_t += weight_isin + valid_isin_count += 1 + continue + + if qty_t == 0 and qty_t_prev == 0: + continue + # Flow agrégé sur ]t_prev, t_curr] + try: + net_flow = flows_idx.loc[(t_curr, reg_curr, isin)] + except KeyError: + net_flow = 0.0 + + err_ratio, is_break = compute_reconciliation_error( + qty_t_prev, qty_t, net_flow, alpha=ALPHA + ) + + # Pondération par AUM à t_curr + weight_isin = abs(qty_t) + weighted_err += err_ratio * weight_isin + total_aum_t += weight_isin + valid_isin_count += 1 + + if total_aum_t > 0 and valid_isin_count > 0: + avg_err = weighted_err / total_aum_t + else: + avg_err = 0.0 + + errors_at_t[reg_orig] = avg_err + + # Dégradation du score : score(t-1) = score(t) * (1 - err_pondérée) + # Clippée entre 0 et score_curr + degradation = min(avg_err, 1.0) + new_scores[reg_orig] = score_curr * (1.0 - degradation) + + scores = new_scores + scores_history[t_prev] = dict(scores) + errors_history[t_prev] = dict(errors_at_t) + + total_score = sum(scores.values()) + print(f" {t_prev.date()} | Σ scores = {total_score:.4f} | " + f"Comptes actifs = {sum(1 for v in scores.values() if v > 0)}") + + return scores_history, errors_history, mapping + +# ───────────────────────────────────────────── +# 6. ÉTAPE 3 — Chirurgie de code +# ───────────────────────────────────────────── +def jaccard_isin(set_a, set_b): + """Coefficient de Jaccard entre deux ensembles d'ISIN.""" + if not set_a or not set_b: + return 0.0 + inter = len(set_a & set_b) + union = len(set_a | set_b) + return inter / union if union > 0 else 0.0 + +def find_best_candidate(reg_orig, reg_curr, t_prev, t_curr, + panel, flows_idx, all_regs_at_t_prev, mapping_inv): + """ + Pour un reg_id dont le score a fortement chuté, cherche le meilleur + candidat j à t_prev tel que : + - j n'est pas déjà mappé à un autre compte original + - Le portefeuille ISIN de j à t_prev est similaire à celui de reg_curr à t_curr + - La réconciliation est bonne + + Retourne (best_candidate, best_score_composite) ou (None, 0) + """ + # ISIN du compte cible à t_curr + if reg_curr not in panel.columns.get_level_values(0): + return None, 0.0 + + isin_curr = set(panel[reg_curr].columns[ + panel[reg_curr].loc[t_curr].notna() & (panel[reg_curr].loc[t_curr] != 0) + ].tolist()) + + if not isin_curr: + return None, 0.0 + + best_candidate = None + best_composite = 0.0 + + for j in all_regs_at_t_prev: + # Ne pas réutiliser un code déjà mappé + if j in mapping_inv: + continue + # Ne pas mapper sur soi-même si déjà présent + if j == reg_curr: + continue + + if j not in panel.columns.get_level_values(0): + continue + + # ISIN de j à t_prev + col_j = panel[j] + isin_j = set(col_j.columns[ + col_j.loc[t_prev].notna() & (col_j.loc[t_prev] != 0) + ].tolist()) if t_prev in col_j.index else set() + + if not isin_j: + continue + + jac = jaccard_isin(isin_curr, isin_j) + if jac < MIN_JACCARD: + continue + + # Erreur de réconciliation pour les ISIN communs + common_isin = isin_curr & isin_j + total_aum = 0 + weighted_err = 0 + + for isin in common_isin: + qty_t = panel[reg_curr][isin].get(t_curr, np.nan) if isin in panel[reg_curr].columns else np.nan + qty_t_prev = panel[j][isin].get(t_prev, np.nan) if isin in panel[j].columns else np.nan + + if pd.isna(qty_t) or pd.isna(qty_t_prev): + continue + + try: + net_flow = flows_idx.loc[(t_curr, j, isin)] + except KeyError: + net_flow = 0.0 + + err_ratio, _ = compute_reconciliation_error(qty_t_prev, qty_t, net_flow) + weight_isin = abs(qty_t) + weighted_err += err_ratio * weight_isin + total_aum += weight_isin + + avg_err = weighted_err / total_aum if total_aum > 0 else 1.0 + + composite = jac * (1.0 - min(avg_err, 1.0)) + + if composite > best_composite: + best_composite = composite + best_candidate = j + + return best_candidate, best_composite + + +def _recompute_score_with_candidate(reg_orig, candidate, t_prev, t_curr, + panel, flows_idx, score_curr): + """ + Recalcule proprement l'erreur de réconciliation pour un candidat donné, + et retourne le score après chirurgie. + """ + if candidate not in panel.columns.get_level_values(0): + return score_curr * 0 # candidat inexistant + + isin_list_cand = panel[candidate].columns.tolist() + isin_list_curr = panel[reg_orig].columns.tolist() if reg_orig in panel.columns.get_level_values(0) else [] + + total_aum = 0 + weighted_err = 0 + + for isin in isin_list_curr: + qty_t = panel[reg_orig][isin].get(t_curr, np.nan) if isin in panel[reg_orig].columns else np.nan + if pd.isna(qty_t) or qty_t == 0: + continue + + qty_t_prev = panel[candidate][isin].get(t_prev, np.nan) if isin in panel[candidate].columns else np.nan + + try: + net_flow = flows_idx.loc[(t_curr, candidate, isin)] + except KeyError: + net_flow = 0.0 + + if pd.isna(qty_t_prev): + err_ratio = 1.0 + else: + err_ratio, _ = compute_reconciliation_error(qty_t_prev, qty_t, net_flow) + + weight_isin = abs(qty_t) + weighted_err += err_ratio * weight_isin + total_aum += weight_isin + + avg_err = weighted_err / total_aum if total_aum > 0 else 1.0 + return score_curr * (1.0 - min(avg_err, 1.0)) + + +def run_surgery_pass(scores_history, errors_history, panel, monthly_flows, + weights, universe, all_months): + """ + Deuxième passe : pour chaque mois avec des ruptures fortes, + tente une chirurgie de code et recalcule les scores. + + Corrections par rapport à la passe naïve : + - Après chirurgie, le score est recalculé proprement (pas juste composite) + - Le mapping propagé en arrière utilise le bon code à chaque étape + - Pré-filtre ISIN pour performance sur grand dataset + + Retourne : + - mapping_history : {date → {reg_orig → reg_used}} + - surgery_log : liste des opérations effectuées + - scores_final : scores au dernier mois + """ + flows_idx = monthly_flows.set_index(['date', 'reg_id', 'isin'])['qty_net_month'] + + # Tous les reg_ids présents dans le panel (univers + codes historiques) + all_regs_in_panel = set(panel.columns.get_level_values(0)) + + # Pré-calcul : ensemble d'ISIN par reg_id à chaque date (pour pré-filtre rapide) + # {reg_id → {date → set(isin)}} + reg_isin_at_date = {} + for reg in all_regs_in_panel: + reg_isin_at_date[reg] = {} + col = panel[reg] + for date in col.index: + active = set(col.columns[(col.loc[date].notna()) & (col.loc[date] != 0)].tolist()) + if active: + reg_isin_at_date[reg][date] = active + + # Mapping courant : reg_orig → reg_used + mapping = {r: r for r in universe} + mapping_inv = {r: r for r in universe} + + surgery_log = [] + mapping_history = {all_months[-1]: dict(mapping)} + scores_history_corrected = {all_months[-1]: dict(weights)} + + # Scores courants (initialisés à t_ref) + scores = dict(weights) + + for i in range(len(all_months) - 2, -1, -1): + t_prev = all_months[i] + t_curr = all_months[i + 1] + + new_scores = {} + new_mapping = {} + + for reg_orig in list(mapping.keys()): + reg_curr = mapping[reg_orig] + score_curr = scores.get(reg_orig, 0) + + if score_curr == 0: + new_scores[reg_orig] = 0 + new_mapping[reg_orig] = reg_curr + continue + + # Erreur sans chirurgie (depuis étape 2) + err = errors_history.get(t_prev, {}).get(reg_orig, 0.0) + score_prev_no_surgery = score_curr * (1.0 - min(err, 1.0)) + drop_ratio = 1.0 - (score_prev_no_surgery / score_curr) if score_curr > 0 else 0 + + if drop_ratio > SCORE_DROP_THRESHOLD: + # ── ISIN du compte courant à t_curr (pour pré-filtre) ── + isin_curr = reg_isin_at_date.get(reg_curr, {}).get(t_curr, set()) + + # ── Candidats disponibles (non déjà mappés) ── + available = all_regs_in_panel - set(mapping_inv.keys()) - {reg_curr} + + best_candidate = None + best_score_after = score_prev_no_surgery # baseline = pas de chirurgie + best_composite = 0.0 + + for j in available: + # Pré-filtre rapide : overlap ISIN minimal + isin_j = reg_isin_at_date.get(j, {}).get(t_prev, set()) + if not isin_curr or not isin_j: + continue + inter = len(isin_curr & isin_j) + if inter == 0: + continue + jac = inter / len(isin_curr | isin_j) + if jac < MIN_JACCARD: + continue + + # Score après chirurgie avec ce candidat + score_after = _recompute_score_with_candidate( + reg_curr, j, t_prev, t_curr, panel, flows_idx, score_curr + ) + composite = jac * (score_after / score_curr) if score_curr > 0 else 0 + + if score_after > best_score_after: + best_score_after = score_after + best_candidate = j + best_composite = composite + + if best_candidate: + surgery_log.append({ + 'date': t_prev, + 'reg_orig': reg_orig, + 'reg_from': reg_curr, + 'reg_to': best_candidate, + 'jaccard_composite': round(best_composite, 4), + 'score_before': round(score_curr, 6), + 'score_after': round(best_score_after, 6), + 'drop_without_surgery': round(drop_ratio, 4), + 'gain_vs_no_surgery': round(best_score_after - score_prev_no_surgery, 6), + }) + print(f" 🔧 CHIRURGIE {t_prev.date()} | {reg_orig} : " + f"{reg_curr} → {best_candidate} " + f"(composite={best_composite:.3f}, " + f"score {score_curr:.4f}→{best_score_after:.4f})") + + # Mise à jour mapping + if reg_curr in mapping_inv: + del mapping_inv[reg_curr] + mapping_inv[best_candidate] = reg_orig + new_mapping[reg_orig] = best_candidate + new_scores[reg_orig] = best_score_after + else: + new_mapping[reg_orig] = reg_curr + new_scores[reg_orig] = score_prev_no_surgery + else: + new_mapping[reg_orig] = reg_curr + new_scores[reg_orig] = score_prev_no_surgery + + mapping = new_mapping + mapping_inv = {v: k for k, v in mapping.items()} + scores = new_scores + mapping_history[t_prev] = dict(mapping) + scores_history_corrected[t_prev] = dict(scores) + + total_score = sum(s for s in scores.values() if not np.isnan(s)) + n_surgeries = sum(1 for op in surgery_log if op['date'] == t_prev) + print(f" {t_prev.date()} | Σ scores = {total_score:.4f} | " + f"Chirurgies = {n_surgeries}") + + return mapping_history, surgery_log, scores, scores_history_corrected + +# ───────────────────────────────────────────── +# 7. EXPORT RÉSULTATS +# ───────────────────────────────────────────── +def export_results(scores_history, mapping_history, surgery_log, all_months, out_prefix="carmignac"): + """Exporte les résultats clés en CSV.""" + + # Score history + rows = [] + for date, sc in scores_history.items(): + for reg, score in sc.items(): + rows.append({'date': date, 'reg_id': reg, 'score': score}) + df_scores = pd.DataFrame(rows) if rows else pd.DataFrame(columns=['date', 'reg_id', 'score']) + if not df_scores.empty: + df_scores = df_scores.sort_values(['date', 'score'], ascending=[True, False]) + df_scores.to_csv(f"repair_results/{out_prefix}_scores.csv", index=False) + + # Mapping history + rows_m = [] + for date, mp in mapping_history.items(): + for reg_orig, reg_used in mp.items(): + rows_m.append({'date': date, 'reg_orig': reg_orig, 'reg_used': reg_used, + 'changed': reg_orig != reg_used}) + df_mapping = pd.DataFrame(rows_m) if rows_m else pd.DataFrame(columns=['date', 'reg_orig', 'reg_used', 'changed']) + if not df_mapping.empty: + df_mapping = df_mapping.sort_values(['date', 'reg_orig']) + df_mapping.to_csv(f"repair_results/{out_prefix}_mapping.csv", index=False) + + # Surgery log + if surgery_log: + df_surgery = pd.DataFrame(surgery_log).sort_values('date') + df_surgery.to_csv(f"repair_results/{out_prefix}_surgery_log.csv", index=False) + print(f"\n[Export] {len(surgery_log)} opérations de chirurgie sauvegardées.") + else: + print("\n[Export] Aucune chirurgie effectuée sur ce subset.") + + print(f"[Export] Scores → {out_prefix}_scores.csv") + print(f"[Export] Mapping → {out_prefix}_mapping.csv") + + return df_scores, df_mapping + +# ───────────────────────────────────────────── +# 8. PIPELINE PRINCIPAL +# ───────────────────────────────────────────── +def run_pipeline(aum_path, flows_path): + print("=" * 60) + print("CARMIGNAC — Pipeline de réparation des Registrar IDs") + print("=" * 60) + + # Chargement + aum, flows = load_data(aum_path, flows_path) + + # Étape 1 — Univers de référence + aum_ref, weights, universe, t_ref = build_reference_universe(aum) + + print(f"\n Top 5 comptes par poids :") + for reg, w in sorted(weights.items(), key=lambda x: -x[1])[:5]: + print(f" {reg} : {w:.4f} ({w*100:.2f}%)") + + # Panel mensuel + panel, all_months = build_monthly_panel(aum, universe, t_ref) + + # Flows mensuels agrégés + monthly_flows = aggregate_flows_monthly(flows, all_months) + + # Étape 2 — Score de cohérence (sans chirurgie) + print("\n[Étape 2] Propagation des scores (sans chirurgie)...") + scores_history, errors_history, _ = score_propagation( + panel, monthly_flows, weights, universe, all_months + ) + + # Étape 3 — Chirurgie + print("\n[Étape 3] Passe de chirurgie...") + mapping_history, surgery_log, final_scores, scores_history_corrected = run_surgery_pass( + scores_history, errors_history, panel, monthly_flows, + weights, universe, all_months + ) + + # Export — on utilise les scores corrigés (post-chirurgie) comme référence + print("\n[Export des résultats...]") + df_scores, df_mapping = export_results( + scores_history_corrected, mapping_history, surgery_log, all_months + ) + + # Résumé final + print("\n" + "=" * 60) + print("RÉSUMÉ FINAL") + print("=" * 60) + print(f" Dates couvertes : {all_months[0].date()} → {all_months[-1].date()}") + print(f" Comptes dans l'univers : {len(universe)}") + print(f" Chirurgies effectuées : {len(surgery_log)}") + score_by_date = {d: sum(s for s in sc.values() if s == s) + for d, sc in scores_history_corrected.items()} + print(f" Σ scores à t_ref : {score_by_date[t_ref]:.4f}") + print(f" Σ scores à t_min : {score_by_date[all_months[0]]:.4f}") + + return df_scores, df_mapping, surgery_log, scores_history_corrected, mapping_history + + +if __name__ == "__main__": + df_scores, df_mapping, surgery_log, scores_history, mapping_history = run_pipeline( + "s3://projet-bdc-carmignac-g3/stock_repaired.csv", + "projet-bdc-data//carmignac/Flows ENSAE V2 -20251105.csv" + )