""" Carmignac Data Challenge — Registrar ID Repair Pipeline ========================================================= Étape 1 : Filtrage & univers de référence à t=31/10/2025 Étape 2 : Score de cohérence temporelle (propagation vers le passé) Étape 3 : Chirurgie de code (matching 1-to-1) """ import pandas as pd import numpy as np from collections import defaultdict import os import s3fs # ───────────────────────────────────────────── # PARAMÈTRES # ───────────────────────────────────────────── ALPHA = 0.03 # tolérance réconciliation : 3% du stock à t MIN_AUM_EUR = 5e6 # seuil filtrage étape 1 — 0 pour les heads de test MIN_JACCARD = 0.3 # seuil minimal similarité portefeuille pour chirurgie SCORE_DROP_THRESHOLD = 0.1 # si score chute de >10% → candidat chirurgie EXCLUDE_REGISTRAR = ["Off Distribution", "Private Clients"] # ───────────────────────────────────────────── # 1. CHARGEMENT # ───────────────────────────────────────────── def load_data(aum_path, flows_path): fs = s3fs.S3FileSystem( client_kwargs={'endpoint_url': 'https://'+'minio-simple.lab.groupe-genes.fr'}, key = os.environ["AWS_ACCESS_KEY_ID"], secret = os.environ["AWS_SECRET_ACCESS_KEY"], token = os.environ["AWS_SESSION_TOKEN"]) with fs.open('projet-bdc-data//carmignac/Flows ENSAE V2 -20251105.csv', 'rb') as f: flows = pd.read_csv(f, sep=";") with fs.open('projet-bdc-data//carmignac/AUM ENSAE V2 -20251105.csv', 'rb') as f: aum = pd.read_csv(f, sep=";") aum['Centralisation Date'] = pd.to_datetime(aum['Centralisation Date']) flows['Centralisation Date'] = pd.to_datetime(flows['Centralisation Date']) # Noms courts aum = aum.rename(columns={ 'Registrar Account - ID': 'reg_id', 'Product - Isin': 'isin', 'Centralisation Date': 'date', 'Quantity - AUM': 'qty_aum', 'Value - AUM €': 'val_eur', 'Registrar Account - Region': 'region', }) flows = flows.rename(columns={ 'Registrar Account - ID': 'reg_id', 'Product - Isin': 'isin', 'Centralisation Date': 'date', 'Quantity - NetFlows': 'qty_net', 'Value € - NetFlows': 'val_net_eur', }) aum['reg_id'] = aum['reg_id'].astype(str) flows['reg_id'] = flows['reg_id'].astype(str) return aum, flows # ───────────────────────────────────────────── # 2. ÉTAPE 1 — Univers de référence à T_REF # ───────────────────────────────────────────── def build_reference_universe(aum, t_ref=None): """ Construit l'univers de référence à t_ref (dernière date par défaut). Retourne : - aum_ref : AUM à t_ref pour chaque (reg_id, isin) - weights : poids normalisé par reg_id - universe : ensemble des reg_id retenus (>= MIN_AUM_EUR) """ if t_ref is None: t_ref = aum['date'].max() print(f"\n[Étape 1] Date de référence : {t_ref.date()}") # Exclure Off Distribution / Private Clients (sur région ou nom) mask_excl = aum['reg_id'].isin(EXCLUDE_REGISTRAR) if 'region' in aum.columns: mask_excl |= aum['region'].isin(EXCLUDE_REGISTRAR) aum_clean = aum[~mask_excl].copy() # AUM à t_ref aum_ref = aum_clean[aum_clean['date'] == t_ref][['reg_id', 'isin', 'qty_aum', 'val_eur']].copy() # AUM total par reg_id à t_ref aum_by_reg = aum_ref.groupby('reg_id')['val_eur'].sum().rename('total_eur') # Filtrage >= MIN_AUM_EUR universe = set(aum_by_reg[aum_by_reg >= MIN_AUM_EUR].index) total_eur_universe = aum_by_reg[aum_by_reg.index.isin(universe)].sum() total_eur_all = aum_by_reg.sum() coverage = total_eur_universe / total_eur_all if total_eur_all > 0 else 0 print(f" Registrar IDs à t_ref : {len(aum_by_reg)}") print(f" Dont >= {MIN_AUM_EUR/1e6:.0f}M€ : {len(universe)}") print(f" Couverture encours : {coverage:.1%}") # Poids initiaux (scores à t_ref) weights = (aum_by_reg[aum_by_reg.index.isin(universe)] / total_eur_universe).to_dict() return aum_ref, weights, universe, t_ref # ───────────────────────────────────────────── # 3. PANEL AUM MENSUEL (forward-fill) # ───────────────────────────────────────────── def build_monthly_panel(aum, universe, t_ref): """ Construit un panel mensuel complet (forward-fill des quantités AUM) pour TOUS les reg_ids présents dans l'historique AUM — y compris les codes historiques hors univers de référence, nécessaires pour la chirurgie. """ # Toutes les fin de mois entre la première date et t_ref date_min = aum['date'].min() all_months = pd.date_range(start=date_min, end=t_ref, freq='ME') # Pivot : (reg_id, isin) → série temporelle de qty_aum aum_sorted = aum.sort_values(['reg_id', 'isin', 'date']) # On ne garde que les lignes jusqu'à t_ref aum_sorted = aum_sorted[aum_sorted['date'] <= t_ref] # Multi-index pivot panel = aum_sorted.pivot_table( index='date', columns=['reg_id', 'isin'], values='qty_aum', aggfunc='last' ) # Réindexer sur toutes les fins de mois panel = panel.reindex(all_months) # Forward-fill : si pas de mouvement, la quantité reste la même panel = panel.ffill() # Backward-fill initial pour les comptes qui démarrent après la première date # (on ne remonte pas avant leur première apparition → on garde NaN) print(f"\n[Panel mensuel] {len(all_months)} mois, {panel.shape[1]} (reg_id, isin) paires") return panel, all_months # ───────────────────────────────────────────── # 4. FLOWS AGRÉGÉS PAR MOIS # ───────────────────────────────────────────── def aggregate_flows_monthly(flows, all_months): """ Agrège les flows infra-mensuels sur chaque fenêtre ]fin_mois(t-1), fin_mois(t)]. Retourne un DataFrame indexé par (fin_mois, reg_id, isin). """ flows_f = flows[flows['date'] <= all_months[-1]].copy() # Associer chaque transaction à la fin de mois correspondante # = la première fin de mois >= date de transaction flows_f['month_end'] = flows_f['date'].apply( lambda d: all_months[all_months >= d][0] if any(all_months >= d) else pd.NaT ) flows_f = flows_f.dropna(subset=['month_end']) # Agrégation monthly_flows = flows_f.groupby(['month_end', 'reg_id', 'isin'])['qty_net'].sum() monthly_flows = monthly_flows.reset_index() monthly_flows.columns = ['date', 'reg_id', 'isin', 'qty_net_month'] print(f"\n[Flows mensuels] {len(monthly_flows)} enregistrements (reg_id, isin, mois)") return monthly_flows # ───────────────────────────────────────────── # 5. ÉTAPE 2 — Score de cohérence temporelle # ───────────────────────────────────────────── def compute_reconciliation_error(qty_t_minus1, qty_t, net_flow, alpha=ALPHA): """ Calcule l'erreur de réconciliation normalisée pour un (reg_id, isin, mois). Attendu : qty_t_minus1 + net_flow ≈ qty_t Erreur : |qty_t_minus1 + net_flow - qty_t| / max(|qty_t|, |qty_t_minus1|) Retourne : - err_ratio : erreur relative (0 = parfait) - is_break : True si err_ratio > alpha """ denom = max(abs(qty_t), abs(qty_t_minus1), 1e-9) err = abs(qty_t_minus1 + net_flow - qty_t) err_ratio = err / denom return err_ratio, err_ratio > alpha def score_propagation(panel, monthly_flows, weights, universe, all_months): """ Propage les scores de t_ref vers t=0 (passé). À chaque mois t (en remontant), pour chaque reg_id dans l'univers courant : - Calculer l'erreur de réconciliation pondérée par ISIN - Dégrader le score proportionnellement Retourne : - scores_history : dict {date → {reg_id → score}} - errors_history : dict {date → {reg_id → err_pondérée}} - mapping : dict {reg_id_original → reg_id_courant} (après chirurgie) """ # Initialisation scores = dict(weights) # scores à t_ref scores_history = {all_months[-1]: dict(scores)} errors_history = {} # Mapping actuel (identité au départ) mapping = {r: r for r in universe} # Flows indexés pour accès rapide flows_idx = monthly_flows.set_index(['date', 'reg_id', 'isin'])['qty_net_month'] # Remonter dans le temps for i in range(len(all_months) - 2, -1, -1): t_prev = all_months[i] t_curr = all_months[i + 1] errors_at_t = {} new_scores = {} for reg_orig, reg_curr in mapping.items(): score_curr = scores.get(reg_orig, 0) if score_curr == 0: new_scores[reg_orig] = 0 continue # ISIN détenus par ce reg à t_curr (après mapping) if reg_curr in panel.columns.get_level_values(0): isin_list = panel[reg_curr].columns.tolist() else: # reg_curr n'existe pas du tout dans le panel → rupture totale new_scores[reg_orig] = 0 errors_at_t[reg_orig] = 1.0 continue total_aum_t = 0 weighted_err = 0 valid_isin_count = 0 all_nan_at_prev = True # détecte si le compte n'existait pas à t_prev for isin in isin_list: qty_t = panel[reg_curr][isin].get(t_curr, np.nan) qty_t_prev = panel[reg_curr][isin].get(t_prev, np.nan) if pd.isna(qty_t): continue if not pd.isna(qty_t_prev): all_nan_at_prev = False if pd.isna(qty_t_prev): # ISIN existait à t_curr mais pas à t_prev → rupture sur cet ISIN # On le traite comme une erreur maximale pondérée par son AUM weight_isin = abs(qty_t) weighted_err += 1.0 * weight_isin total_aum_t += weight_isin valid_isin_count += 1 continue if qty_t == 0 and qty_t_prev == 0: continue # Flow agrégé sur ]t_prev, t_curr] try: net_flow = flows_idx.loc[(t_curr, reg_curr, isin)] except KeyError: net_flow = 0.0 err_ratio, is_break = compute_reconciliation_error( qty_t_prev, qty_t, net_flow, alpha=ALPHA ) # Pondération par AUM à t_curr weight_isin = abs(qty_t) weighted_err += err_ratio * weight_isin total_aum_t += weight_isin valid_isin_count += 1 if total_aum_t > 0 and valid_isin_count > 0: avg_err = weighted_err / total_aum_t else: avg_err = 0.0 errors_at_t[reg_orig] = avg_err # Dégradation du score : score(t-1) = score(t) * (1 - err_pondérée) # Clippée entre 0 et score_curr degradation = min(avg_err, 1.0) new_scores[reg_orig] = score_curr * (1.0 - degradation) scores = new_scores scores_history[t_prev] = dict(scores) errors_history[t_prev] = dict(errors_at_t) total_score = sum(scores.values()) print(f" {t_prev.date()} | Σ scores = {total_score:.4f} | " f"Comptes actifs = {sum(1 for v in scores.values() if v > 0)}") return scores_history, errors_history, mapping # ───────────────────────────────────────────── # 6. ÉTAPE 3 — Chirurgie de code # ───────────────────────────────────────────── def jaccard_isin(set_a, set_b): """Coefficient de Jaccard entre deux ensembles d'ISIN.""" if not set_a or not set_b: return 0.0 inter = len(set_a & set_b) union = len(set_a | set_b) return inter / union if union > 0 else 0.0 def find_best_candidate(reg_orig, reg_curr, t_prev, t_curr, panel, flows_idx, all_regs_at_t_prev, mapping_inv): """ Pour un reg_id dont le score a fortement chuté, cherche le meilleur candidat j à t_prev tel que : - j n'est pas déjà mappé à un autre compte original - Le portefeuille ISIN de j à t_prev est similaire à celui de reg_curr à t_curr - La réconciliation est bonne Retourne (best_candidate, best_score_composite) ou (None, 0) """ # ISIN du compte cible à t_curr if reg_curr not in panel.columns.get_level_values(0): return None, 0.0 isin_curr = set(panel[reg_curr].columns[ panel[reg_curr].loc[t_curr].notna() & (panel[reg_curr].loc[t_curr] != 0) ].tolist()) if not isin_curr: return None, 0.0 best_candidate = None best_composite = 0.0 for j in all_regs_at_t_prev: # Ne pas réutiliser un code déjà mappé if j in mapping_inv: continue # Ne pas mapper sur soi-même si déjà présent if j == reg_curr: continue if j not in panel.columns.get_level_values(0): continue # ISIN de j à t_prev col_j = panel[j] isin_j = set(col_j.columns[ col_j.loc[t_prev].notna() & (col_j.loc[t_prev] != 0) ].tolist()) if t_prev in col_j.index else set() if not isin_j: continue jac = jaccard_isin(isin_curr, isin_j) if jac < MIN_JACCARD: continue # Erreur de réconciliation pour les ISIN communs common_isin = isin_curr & isin_j total_aum = 0 weighted_err = 0 for isin in common_isin: qty_t = panel[reg_curr][isin].get(t_curr, np.nan) if isin in panel[reg_curr].columns else np.nan qty_t_prev = panel[j][isin].get(t_prev, np.nan) if isin in panel[j].columns else np.nan if pd.isna(qty_t) or pd.isna(qty_t_prev): continue try: net_flow = flows_idx.loc[(t_curr, j, isin)] except KeyError: net_flow = 0.0 err_ratio, _ = compute_reconciliation_error(qty_t_prev, qty_t, net_flow) weight_isin = abs(qty_t) weighted_err += err_ratio * weight_isin total_aum += weight_isin avg_err = weighted_err / total_aum if total_aum > 0 else 1.0 composite = jac * (1.0 - min(avg_err, 1.0)) if composite > best_composite: best_composite = composite best_candidate = j return best_candidate, best_composite def _recompute_score_with_candidate(reg_orig, candidate, t_prev, t_curr, panel, flows_idx, score_curr): """ Recalcule proprement l'erreur de réconciliation pour un candidat donné, et retourne le score après chirurgie. """ if candidate not in panel.columns.get_level_values(0): return score_curr * 0 # candidat inexistant isin_list_cand = panel[candidate].columns.tolist() isin_list_curr = panel[reg_orig].columns.tolist() if reg_orig in panel.columns.get_level_values(0) else [] total_aum = 0 weighted_err = 0 for isin in isin_list_curr: qty_t = panel[reg_orig][isin].get(t_curr, np.nan) if isin in panel[reg_orig].columns else np.nan if pd.isna(qty_t) or qty_t == 0: continue qty_t_prev = panel[candidate][isin].get(t_prev, np.nan) if isin in panel[candidate].columns else np.nan try: net_flow = flows_idx.loc[(t_curr, candidate, isin)] except KeyError: net_flow = 0.0 if pd.isna(qty_t_prev): err_ratio = 1.0 else: err_ratio, _ = compute_reconciliation_error(qty_t_prev, qty_t, net_flow) weight_isin = abs(qty_t) weighted_err += err_ratio * weight_isin total_aum += weight_isin avg_err = weighted_err / total_aum if total_aum > 0 else 1.0 return score_curr * (1.0 - min(avg_err, 1.0)) def run_surgery_pass(scores_history, errors_history, panel, monthly_flows, weights, universe, all_months): """ Deuxième passe : pour chaque mois avec des ruptures fortes, tente une chirurgie de code et recalcule les scores. Corrections par rapport à la passe naïve : - Après chirurgie, le score est recalculé proprement (pas juste composite) - Le mapping propagé en arrière utilise le bon code à chaque étape - Pré-filtre ISIN pour performance sur grand dataset Retourne : - mapping_history : {date → {reg_orig → reg_used}} - surgery_log : liste des opérations effectuées - scores_final : scores au dernier mois """ flows_idx = monthly_flows.set_index(['date', 'reg_id', 'isin'])['qty_net_month'] # Tous les reg_ids présents dans le panel (univers + codes historiques) all_regs_in_panel = set(panel.columns.get_level_values(0)) # Pré-calcul : ensemble d'ISIN par reg_id à chaque date (pour pré-filtre rapide) # {reg_id → {date → set(isin)}} reg_isin_at_date = {} for reg in all_regs_in_panel: reg_isin_at_date[reg] = {} col = panel[reg] for date in col.index: active = set(col.columns[(col.loc[date].notna()) & (col.loc[date] != 0)].tolist()) if active: reg_isin_at_date[reg][date] = active # Mapping courant : reg_orig → reg_used mapping = {r: r for r in universe} mapping_inv = {r: r for r in universe} surgery_log = [] mapping_history = {all_months[-1]: dict(mapping)} scores_history_corrected = {all_months[-1]: dict(weights)} # Scores courants (initialisés à t_ref) scores = dict(weights) for i in range(len(all_months) - 2, -1, -1): t_prev = all_months[i] t_curr = all_months[i + 1] new_scores = {} new_mapping = {} for reg_orig in list(mapping.keys()): reg_curr = mapping[reg_orig] score_curr = scores.get(reg_orig, 0) if score_curr == 0: new_scores[reg_orig] = 0 new_mapping[reg_orig] = reg_curr continue # Erreur sans chirurgie (depuis étape 2) err = errors_history.get(t_prev, {}).get(reg_orig, 0.0) score_prev_no_surgery = score_curr * (1.0 - min(err, 1.0)) drop_ratio = 1.0 - (score_prev_no_surgery / score_curr) if score_curr > 0 else 0 if drop_ratio > SCORE_DROP_THRESHOLD: # ── ISIN du compte courant à t_curr (pour pré-filtre) ── isin_curr = reg_isin_at_date.get(reg_curr, {}).get(t_curr, set()) # ── Candidats disponibles (non déjà mappés) ── available = all_regs_in_panel - set(mapping_inv.keys()) - {reg_curr} best_candidate = None best_score_after = score_prev_no_surgery # baseline = pas de chirurgie best_composite = 0.0 for j in available: # Pré-filtre rapide : overlap ISIN minimal isin_j = reg_isin_at_date.get(j, {}).get(t_prev, set()) if not isin_curr or not isin_j: continue inter = len(isin_curr & isin_j) if inter == 0: continue jac = inter / len(isin_curr | isin_j) if jac < MIN_JACCARD: continue # Score après chirurgie avec ce candidat score_after = _recompute_score_with_candidate( reg_curr, j, t_prev, t_curr, panel, flows_idx, score_curr ) composite = jac * (score_after / score_curr) if score_curr > 0 else 0 if score_after > best_score_after: best_score_after = score_after best_candidate = j best_composite = composite if best_candidate: surgery_log.append({ 'date': t_prev, 'reg_orig': reg_orig, 'reg_from': reg_curr, 'reg_to': best_candidate, 'jaccard_composite': round(best_composite, 4), 'score_before': round(score_curr, 6), 'score_after': round(best_score_after, 6), 'drop_without_surgery': round(drop_ratio, 4), 'gain_vs_no_surgery': round(best_score_after - score_prev_no_surgery, 6), }) print(f" 🔧 CHIRURGIE {t_prev.date()} | {reg_orig} : " f"{reg_curr} → {best_candidate} " f"(composite={best_composite:.3f}, " f"score {score_curr:.4f}→{best_score_after:.4f})") # Mise à jour mapping if reg_curr in mapping_inv: del mapping_inv[reg_curr] mapping_inv[best_candidate] = reg_orig new_mapping[reg_orig] = best_candidate new_scores[reg_orig] = best_score_after else: new_mapping[reg_orig] = reg_curr new_scores[reg_orig] = score_prev_no_surgery else: new_mapping[reg_orig] = reg_curr new_scores[reg_orig] = score_prev_no_surgery mapping = new_mapping mapping_inv = {v: k for k, v in mapping.items()} scores = new_scores mapping_history[t_prev] = dict(mapping) scores_history_corrected[t_prev] = dict(scores) total_score = sum(s for s in scores.values() if not np.isnan(s)) n_surgeries = sum(1 for op in surgery_log if op['date'] == t_prev) print(f" {t_prev.date()} | Σ scores = {total_score:.4f} | " f"Chirurgies = {n_surgeries}") return mapping_history, surgery_log, scores, scores_history_corrected # ───────────────────────────────────────────── # 7. EXPORT RÉSULTATS # ───────────────────────────────────────────── def export_results(scores_history, mapping_history, surgery_log, all_months, out_prefix="carmignac"): """Exporte les résultats clés en CSV.""" # Score history rows = [] for date, sc in scores_history.items(): for reg, score in sc.items(): rows.append({'date': date, 'reg_id': reg, 'score': score}) df_scores = pd.DataFrame(rows) if rows else pd.DataFrame(columns=['date', 'reg_id', 'score']) if not df_scores.empty: df_scores = df_scores.sort_values(['date', 'score'], ascending=[True, False]) df_scores.to_csv(f"repair_results/{out_prefix}_scores.csv", index=False) # Mapping history rows_m = [] for date, mp in mapping_history.items(): for reg_orig, reg_used in mp.items(): rows_m.append({'date': date, 'reg_orig': reg_orig, 'reg_used': reg_used, 'changed': reg_orig != reg_used}) df_mapping = pd.DataFrame(rows_m) if rows_m else pd.DataFrame(columns=['date', 'reg_orig', 'reg_used', 'changed']) if not df_mapping.empty: df_mapping = df_mapping.sort_values(['date', 'reg_orig']) df_mapping.to_csv(f"repair_results/{out_prefix}_mapping.csv", index=False) # Surgery log if surgery_log: df_surgery = pd.DataFrame(surgery_log).sort_values('date') df_surgery.to_csv(f"repair_results/{out_prefix}_surgery_log.csv", index=False) print(f"\n[Export] {len(surgery_log)} opérations de chirurgie sauvegardées.") else: print("\n[Export] Aucune chirurgie effectuée sur ce subset.") print(f"[Export] Scores → {out_prefix}_scores.csv") print(f"[Export] Mapping → {out_prefix}_mapping.csv") return df_scores, df_mapping # ───────────────────────────────────────────── # 8. PIPELINE PRINCIPAL # ───────────────────────────────────────────── def run_pipeline(aum_path, flows_path): print("=" * 60) print("CARMIGNAC — Pipeline de réparation des Registrar IDs") print("=" * 60) # Chargement aum, flows = load_data(aum_path, flows_path) # Étape 1 — Univers de référence aum_ref, weights, universe, t_ref = build_reference_universe(aum) print(f"\n Top 5 comptes par poids :") for reg, w in sorted(weights.items(), key=lambda x: -x[1])[:5]: print(f" {reg} : {w:.4f} ({w*100:.2f}%)") # Panel mensuel panel, all_months = build_monthly_panel(aum, universe, t_ref) # Flows mensuels agrégés monthly_flows = aggregate_flows_monthly(flows, all_months) # Étape 2 — Score de cohérence (sans chirurgie) print("\n[Étape 2] Propagation des scores (sans chirurgie)...") scores_history, errors_history, _ = score_propagation( panel, monthly_flows, weights, universe, all_months ) # Étape 3 — Chirurgie print("\n[Étape 3] Passe de chirurgie...") mapping_history, surgery_log, final_scores, scores_history_corrected = run_surgery_pass( scores_history, errors_history, panel, monthly_flows, weights, universe, all_months ) # Export — on utilise les scores corrigés (post-chirurgie) comme référence print("\n[Export des résultats...]") df_scores, df_mapping = export_results( scores_history_corrected, mapping_history, surgery_log, all_months ) # Résumé final print("\n" + "=" * 60) print("RÉSUMÉ FINAL") print("=" * 60) print(f" Dates couvertes : {all_months[0].date()} → {all_months[-1].date()}") print(f" Comptes dans l'univers : {len(universe)}") print(f" Chirurgies effectuées : {len(surgery_log)}") score_by_date = {d: sum(s for s in sc.values() if s == s) for d, sc in scores_history_corrected.items()} print(f" Σ scores à t_ref : {score_by_date[t_ref]:.4f}") print(f" Σ scores à t_min : {score_by_date[all_months[0]]:.4f}") return df_scores, df_mapping, surgery_log, scores_history_corrected, mapping_history if __name__ == "__main__": df_scores, df_mapping, surgery_log, scores_history, mapping_history = run_pipeline( "s3://projet-bdc-data/carmignac/AUM ENSAE V2 -20251105.csv", "s3://projet-bdc-data/carmignac/Flows ENSAE V2 -20251105.csv" )