2026-03-20 09:21:54 +01:00
|
|
|
"""
|
|
|
|
|
Carmignac Data Challenge — Registrar ID Repair Pipeline
=========================================================
Step 1: Filtering & reference universe at t = 31/10/2025
Step 2: Temporal consistency score (propagation towards the past)
Step 3: Code surgery (1-to-1 matching)
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
import pandas as pd
|
|
|
|
|
import numpy as np
|
|
|
|
|
from collections import defaultdict
|
|
|
|
|
import os
|
|
|
|
|
import s3fs
|
|
|
|
|
|
|
|
|
|
# ─────────────────────────────────────────────
# PARAMETERS
# ─────────────────────────────────────────────
|
2026-03-20 09:40:02 +01:00
|
|
|
# Reconciliation tolerance: 3% of the stock at t (relative error above which a
# month is flagged as a break).
ALPHA = 0.03
# Step-1 filtering threshold on total AUM (EUR) — 0 for test heads, 5e6 in production.
MIN_AUM_EUR = 0
# Minimum Jaccard similarity between ISIN portfolios for a surgery candidate.
MIN_JACCARD = 0.3
# A relative score drop of more than 10% makes the account a surgery candidate.
SCORE_DROP_THRESHOLD = 0.1

# Values excluded from the reference universe (matched against reg_id and region).
EXCLUDE_REGISTRAR = ["Off Distribution", "Private Clients"]
|
|
|
|
|
|
|
|
|
|
# ─────────────────────────────────────────────
# 1. LOADING
# ─────────────────────────────────────────────
|
2026-03-20 09:40:02 +01:00
|
|
|
def load_data():
    """
    Load the AUM and flows CSV extracts from the S3-compatible store.

    Credentials are read from the AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY and
    AWS_SESSION_TOKEN environment variables (a missing variable raises
    KeyError). Both files are ';'-separated.

    Returns:
        (aum, flows): two DataFrames with short column names
        ('reg_id', 'isin', 'date', ...), 'date' parsed as datetime and
        'reg_id' cast to str.
    """
    fs = s3fs.S3FileSystem(
        client_kwargs={'endpoint_url': 'https://'+'minio-simple.lab.groupe-genes.fr'},
        key=os.environ["AWS_ACCESS_KEY_ID"],
        secret=os.environ["AWS_SECRET_ACCESS_KEY"],
        token=os.environ["AWS_SESSION_TOKEN"],
    )

    with fs.open('projet-bdc-data//carmignac/Flows ENSAE V2 -20251105.csv', 'rb') as f:
        flows = pd.read_csv(f, sep=";")

    with fs.open('projet-bdc-data//carmignac/AUM ENSAE V2 -20251105.csv', 'rb') as f:
        aum = pd.read_csv(f, sep=";")

    # NOTE(review): dates are parsed with pandas' default inference; if the
    # extract uses day-first dates this needs dayfirst=True — confirm on the file.
    aum['Centralisation Date'] = pd.to_datetime(aum['Centralisation Date'])
    flows['Centralisation Date'] = pd.to_datetime(flows['Centralisation Date'])

    # Short names used by the rest of the pipeline
    aum = aum.rename(columns={
        'Registrar Account - ID': 'reg_id',
        'Product - Isin': 'isin',
        'Centralisation Date': 'date',
        'Quantity - AUM': 'qty_aum',
        'Value - AUM €': 'val_eur',
        'Registrar Account - Region': 'region',
    })
    flows = flows.rename(columns={
        'Registrar Account - ID': 'reg_id',
        'Product - Isin': 'isin',
        'Centralisation Date': 'date',
        'Quantity - NetFlows': 'qty_net',
        'Value € - NetFlows': 'val_net_eur',
    })

    # Identifiers are compared as strings everywhere downstream
    aum['reg_id'] = aum['reg_id'].astype(str)
    flows['reg_id'] = flows['reg_id'].astype(str)

    return aum, flows
|
|
|
|
|
|
|
|
|
|
# ─────────────────────────────────────────────
# 2. STEP 1 — Reference universe at T_REF
# ─────────────────────────────────────────────
|
|
|
|
|
def build_reference_universe(aum, t_ref=None):
    """
    Build the reference universe at t_ref (latest AUM date by default).

    Rows whose reg_id or region is listed in EXCLUDE_REGISTRAR are dropped
    before anything is computed.

    Args:
        aum: AUM DataFrame with columns 'reg_id', 'isin', 'date', 'qty_aum',
            'val_eur' and optionally 'region'.
        t_ref: reference date; anything accepted by pd.Timestamp. Defaults to
            the maximum of aum['date'].

    Returns:
        (aum_ref, weights, universe, t_ref) where
        - aum_ref  : AUM rows at t_ref, one per (reg_id, isin)
        - weights  : dict reg_id -> share of the universe's total val_eur
        - universe : set of reg_id whose total val_eur at t_ref is >= MIN_AUM_EUR
        - t_ref    : the reference date actually used
    """
    if t_ref is None:
        t_ref = aum['date'].max()
    else:
        # Accept strings / datetime.date as well as Timestamps
        t_ref = pd.Timestamp(t_ref)

    print(f"\n[Étape 1] Date de référence : {t_ref.date()}")

    # Exclude Off Distribution / Private Clients (matched on id or on region)
    mask_excl = aum['reg_id'].isin(EXCLUDE_REGISTRAR)
    if 'region' in aum.columns:
        mask_excl |= aum['region'].isin(EXCLUDE_REGISTRAR)
    aum_clean = aum[~mask_excl].copy()

    # AUM at t_ref
    aum_ref = aum_clean[aum_clean['date'] == t_ref][['reg_id', 'isin', 'qty_aum', 'val_eur']].copy()

    # Total AUM per reg_id at t_ref
    aum_by_reg = aum_ref.groupby('reg_id')['val_eur'].sum().rename('total_eur')

    # Keep accounts at or above the AUM threshold
    universe = set(aum_by_reg[aum_by_reg >= MIN_AUM_EUR].index)

    in_universe = aum_by_reg[aum_by_reg.index.isin(universe)]
    total_eur_universe = in_universe.sum()
    total_eur_all = aum_by_reg.sum()
    coverage = total_eur_universe / total_eur_all if total_eur_all > 0 else 0

    print(f" Registrar IDs à t_ref : {len(aum_by_reg)}")
    print(f" Dont >= {MIN_AUM_EUR/1e6:.0f}M€ : {len(universe)}")
    print(f" Couverture encours : {coverage:.1%}")

    # Initial weights (these are the scores at t_ref)
    weights = (in_universe / total_eur_universe).to_dict()

    return aum_ref, weights, universe, t_ref
|
|
|
|
|
|
|
|
|
|
# ─────────────────────────────────────────────
# 3. MONTHLY AUM PANEL (forward-fill)
# ─────────────────────────────────────────────
|
|
|
|
|
def build_monthly_panel(aum, universe, t_ref):
    """
    Build a complete month-end panel of AUM quantities (forward-filled).

    The panel covers ALL reg_ids present in the AUM history up to t_ref —
    including historical codes outside the reference universe, which the
    surgery step needs as candidates. `universe` is accepted for interface
    symmetry and is not used for filtering.

    Args:
        aum: AUM DataFrame with columns 'reg_id', 'isin', 'date', 'qty_aum'.
        universe: reference universe (unused here).
        t_ref: reference date; last date kept in the panel.

    Returns:
        (panel, all_months) where panel is indexed by month-end with
        (reg_id, isin) MultiIndex columns, and all_months is the
        DatetimeIndex of month-ends between the first AUM date and t_ref.
        A pair stays NaN before its first observation (no backward fill).
    """
    # Every month-end between the first observation and t_ref
    date_min = aum['date'].min()
    all_months = pd.date_range(start=date_min, end=t_ref, freq=pd.offsets.MonthEnd())

    # Keep only the history up to t_ref, ordered so that 'last' is the latest row
    aum_sorted = aum.sort_values(['reg_id', 'isin', 'date'])
    aum_sorted = aum_sorted[aum_sorted['date'] <= t_ref]

    # (reg_id, isin) -> time series of qty_aum
    panel = aum_sorted.pivot_table(
        index='date', columns=['reg_id', 'isin'], values='qty_aum', aggfunc='last'
    )

    # Forward-fill over observed dates AND month-ends, then keep the month-ends:
    # an observation dated inside a month is carried to that month's end
    # instead of being dropped by the reindex.
    panel = panel.reindex(panel.index.union(all_months)).ffill().reindex(all_months)

    print(f"\n[Panel mensuel] {len(all_months)} mois, {panel.shape[1]} (reg_id, isin) paires")

    return panel, all_months
|
|
|
|
|
|
|
|
|
|
# ─────────────────────────────────────────────
# 4. FLOWS AGGREGATED BY MONTH
# ─────────────────────────────────────────────
|
|
|
|
|
def aggregate_flows_monthly(flows, all_months):
    """
    Aggregate intra-month flows onto the month-end grid.

    Each transaction is attached to the first month-end that is >= its date,
    i.e. the window ]month_end(t-1), month_end(t)]. Transactions dated after
    the last month-end are ignored; transactions dated before the first
    month-end are attached to the first one.

    Args:
        flows: DataFrame with columns 'date', 'reg_id', 'isin', 'qty_net'.
        all_months: sorted month-end dates (DatetimeIndex or convertible).

    Returns:
        DataFrame with columns ['date', 'reg_id', 'isin', 'qty_net_month'],
        one row per (month-end, reg_id, isin) that has at least one flow.
    """
    month_ends = pd.DatetimeIndex(all_months)
    flows_f = flows[flows['date'] <= month_ends[-1]].copy()

    # Vectorised lookup of the first month-end >= transaction date. The filter
    # above guarantees every remaining date has one, so no position is out of range.
    positions = month_ends.searchsorted(flows_f['date'].to_numpy(), side='left')
    flows_f['month_end'] = month_ends[positions]

    # Aggregation
    monthly_flows = (
        flows_f.groupby(['month_end', 'reg_id', 'isin'])['qty_net'].sum().reset_index()
    )
    monthly_flows.columns = ['date', 'reg_id', 'isin', 'qty_net_month']

    print(f"\n[Flows mensuels] {len(monthly_flows)} enregistrements (reg_id, isin, mois)")

    return monthly_flows
|
|
|
|
|
|
|
|
|
|
# ─────────────────────────────────────────────
# 5. STEP 2 — Temporal consistency score
# ─────────────────────────────────────────────
|
|
|
|
|
def compute_reconciliation_error(qty_t_minus1, qty_t, net_flow, alpha=ALPHA):
    """
    Compute the normalised reconciliation error for one (reg_id, isin, month).

    Expected : qty_t_minus1 + net_flow ≈ qty_t
    Error    : |qty_t_minus1 + net_flow - qty_t| / max(|qty_t|, |qty_t_minus1|, 1e-9)

    The 1e-9 floor only prevents a division by zero when both stocks are 0.

    Args:
        qty_t_minus1: quantity held at the previous month-end.
        qty_t: quantity held at the current month-end.
        net_flow: net quantity flow over the month.
        alpha: relative tolerance above which the month is a break.

    Returns:
        (err_ratio, is_break):
        - err_ratio : relative error (0 = perfect reconciliation)
        - is_break  : True if err_ratio > alpha
    """
    denom = max(abs(qty_t), abs(qty_t_minus1), 1e-9)
    err = abs(qty_t_minus1 + net_flow - qty_t)
    err_ratio = err / denom
    return err_ratio, err_ratio > alpha
|
|
|
|
|
|
|
|
|
|
def score_propagation(panel, monthly_flows, weights, universe, all_months):
    """
    Propagate the scores from t_ref back to the first month (no surgery).

    For each month t (walking backwards) and each reg_id of the universe:
      - compute the reconciliation error of every ISIN, weighted by |qty| at t
      - degrade the score: score(t-1) = score(t) * (1 - min(weighted_err, 1))

    An ISIN held at t but unknown (NaN) at t-1 counts as a full error (1.0).
    ISINs that are NaN at t, or 0 at both dates, are ignored. An account with
    no weighted ISIN at t gets an error of 0 (score carried over unchanged).
    Accounts whose score is already 0 are skipped and get no error entry.

    Args:
        panel: month-end panel with (reg_id, isin) MultiIndex columns.
        monthly_flows: DataFrame ['date', 'reg_id', 'isin', 'qty_net_month'].
        weights: dict reg_id -> score at t_ref.
        universe: iterable of reg_id to follow.
        all_months: ordered month-ends; the last one is t_ref.

    Returns:
        (scores_history, errors_history, mapping):
        - scores_history : dict {date -> {reg_id -> score}}
        - errors_history : dict {date -> {reg_id -> weighted error}}
        - mapping        : dict {reg_id -> reg_id}; identity, this step never
                           substitutes codes
    """
    # Initialisation
    scores = dict(weights)  # scores at t_ref
    scores_history = {all_months[-1]: dict(scores)}
    errors_history = {}

    # Current mapping (identity; kept for symmetry with the surgery pass)
    mapping = {r: r for r in universe}

    # Flows indexed for direct lookup
    flows_idx = monthly_flows.set_index(['date', 'reg_id', 'isin'])['qty_net_month']

    # Built once: membership test used for every account and every month
    regs_in_panel = set(panel.columns.get_level_values(0))

    # Walk back in time
    for i in range(len(all_months) - 2, -1, -1):
        t_prev = all_months[i]
        t_curr = all_months[i + 1]

        errors_at_t = {}
        new_scores = {}

        for reg_orig, reg_curr in mapping.items():
            score_curr = scores.get(reg_orig, 0)
            if score_curr == 0:
                new_scores[reg_orig] = 0
                continue

            if reg_curr not in regs_in_panel:
                # The code does not exist in the panel at all -> total break
                new_scores[reg_orig] = 0
                errors_at_t[reg_orig] = 1.0
                continue

            holdings = panel[reg_curr]  # columns = ISINs ever held by this code
            total_aum_t = 0
            weighted_err = 0

            for isin in holdings.columns.tolist():
                series = holdings[isin]
                qty_t = series.get(t_curr, np.nan)
                qty_t_prev = series.get(t_prev, np.nan)

                if pd.isna(qty_t):
                    continue

                # Weight of the ISIN = |quantity| at t_curr
                weight_isin = abs(qty_t)

                if pd.isna(qty_t_prev):
                    # Held at t_curr but unknown at t_prev -> maximal error
                    weighted_err += 1.0 * weight_isin
                    total_aum_t += weight_isin
                    continue

                if qty_t == 0 and qty_t_prev == 0:
                    continue

                # Aggregated flow over ]t_prev, t_curr]
                try:
                    net_flow = flows_idx.loc[(t_curr, reg_curr, isin)]
                except KeyError:
                    net_flow = 0.0

                err_ratio, _ = compute_reconciliation_error(
                    qty_t_prev, qty_t, net_flow, alpha=ALPHA
                )

                weighted_err += err_ratio * weight_isin
                total_aum_t += weight_isin

            avg_err = weighted_err / total_aum_t if total_aum_t > 0 else 0.0
            errors_at_t[reg_orig] = avg_err

            # score(t-1) = score(t) * (1 - weighted error), error capped at 1
            degradation = min(avg_err, 1.0)
            new_scores[reg_orig] = score_curr * (1.0 - degradation)

        scores = new_scores
        scores_history[t_prev] = dict(scores)
        errors_history[t_prev] = dict(errors_at_t)

        total_score = sum(scores.values())
        print(f" {t_prev.date()} | Σ scores = {total_score:.4f} | "
              f"Comptes actifs = {sum(1 for v in scores.values() if v > 0)}")

    return scores_history, errors_history, mapping
|
|
|
|
|
|
|
|
|
|
# ─────────────────────────────────────────────
# 6. STEP 3 — Code surgery
# ─────────────────────────────────────────────
|
|
|
|
|
def jaccard_isin(set_a, set_b):
    """
    Return the Jaccard coefficient |A ∩ B| / |A ∪ B| of two ISIN collections.

    Any iterable is accepted (duplicates are ignored). Returns 0.0 when
    either side is empty or None.
    """
    a = set(set_a or ())
    b = set(set_b or ())
    if not a or not b:
        return 0.0
    return len(a & b) / len(a | b)
|
|
|
|
|
|
|
|
|
|
def find_best_candidate(reg_orig, reg_curr, t_prev, t_curr,
                        panel, flows_idx, all_regs_at_t_prev, mapping_inv):
    """
    Find the best replacement code at t_prev for an account whose score dropped.

    A candidate j is eligible when:
      - j is not already mapped (not a key of mapping_inv) and j != reg_curr
      - the ISINs held (non-NaN, non-zero) by j at t_prev have a Jaccard
        similarity >= MIN_JACCARD with those held by reg_curr at t_curr
    Eligible candidates are ranked by
        composite = jaccard * (1 - min(weighted reconciliation error, 1))
    where the error is computed on the common ISINs, from j's quantity at
    t_prev, j's flow for month t_curr and reg_curr's quantity at t_curr.

    Args:
        reg_orig: original account id (not used by the search itself).
        reg_curr: code currently used for the account at t_curr.
        t_prev, t_curr: consecutive month-ends.
        panel: month-end panel with (reg_id, isin) MultiIndex columns.
        flows_idx: Series indexed by (date, reg_id, isin).
        all_regs_at_t_prev: iterable of candidate codes.
        mapping_inv: container of codes that are already taken.

    Returns:
        (best_candidate, best_composite), or (None, 0.0) when nothing fits.
    """
    regs_in_panel = set(panel.columns.get_level_values(0))

    # Nothing to compare if the account or either date is missing from the panel
    if reg_curr not in regs_in_panel:
        return None, 0.0
    if t_curr not in panel.index or t_prev not in panel.index:
        return None, 0.0

    # ISINs actually held by the target account at t_curr
    row_curr = panel[reg_curr].loc[t_curr]
    isin_curr = set(row_curr.index[(row_curr.notna() & (row_curr != 0)).to_numpy()])
    if not isin_curr:
        return None, 0.0

    best_candidate = None
    best_composite = 0.0

    for j in all_regs_at_t_prev:
        # Skip codes already taken, the account's own code, and unknown codes
        if j in mapping_inv or j == reg_curr or j not in regs_in_panel:
            continue

        # ISINs held by j at t_prev
        row_j = panel[j].loc[t_prev]
        isin_j = set(row_j.index[(row_j.notna() & (row_j != 0)).to_numpy()])
        if not isin_j:
            continue

        jac = jaccard_isin(isin_curr, isin_j)
        if jac < MIN_JACCARD:
            continue

        # Reconciliation error over the common ISINs (both quantities are
        # non-NaN there by construction of the two sets)
        total_aum = 0
        weighted_err = 0
        for isin in isin_curr & isin_j:
            qty_t = row_curr.at[isin]
            qty_t_prev = row_j.at[isin]

            try:
                net_flow = flows_idx.loc[(t_curr, j, isin)]
            except KeyError:
                net_flow = 0.0

            err_ratio, _ = compute_reconciliation_error(qty_t_prev, qty_t, net_flow)
            weight_isin = abs(qty_t)
            weighted_err += err_ratio * weight_isin
            total_aum += weight_isin

        avg_err = weighted_err / total_aum if total_aum > 0 else 1.0
        composite = jac * (1.0 - min(avg_err, 1.0))

        if composite > best_composite:
            best_composite = composite
            best_candidate = j

    return best_candidate, best_composite
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _recompute_score_with_candidate(reg_orig, candidate, t_prev, t_curr,
                                    panel, flows_idx, score_curr):
    """
    Return the score at t_prev if `candidate` is used as the account's code there.

    For every ISIN held (non-NaN, non-zero) under `reg_orig` at t_curr, the
    quantity at t_curr is reconciled against the candidate's quantity at
    t_prev plus the candidate's flow for month t_curr. An ISIN the candidate
    does not hold at t_prev counts as a full error (1.0). Errors are weighted
    by |quantity| at t_curr.

    Args:
        reg_orig: code whose holdings at t_curr are the target.
        candidate: code whose holdings at t_prev are tested.
        t_prev, t_curr: consecutive month-ends.
        panel: month-end panel with (reg_id, isin) MultiIndex columns.
        flows_idx: Series indexed by (date, reg_id, isin).
        score_curr: score of the account at t_curr.

    Returns:
        score_curr * (1 - min(weighted error, 1)); 0 when the candidate is
        not in the panel or when the target holds nothing at t_curr.
    """
    regs_in_panel = panel.columns.get_level_values(0)

    if candidate not in regs_in_panel:
        return score_curr * 0  # candidate does not exist

    # Slice each code once instead of once per ISIN
    holdings_cand = panel[candidate]
    holdings_curr = panel[reg_orig] if reg_orig in regs_in_panel else None
    isin_list_curr = holdings_curr.columns.tolist() if holdings_curr is not None else []

    total_aum = 0
    weighted_err = 0

    for isin in isin_list_curr:
        qty_t = holdings_curr[isin].get(t_curr, np.nan)
        if pd.isna(qty_t) or qty_t == 0:
            continue

        if isin in holdings_cand.columns:
            qty_t_prev = holdings_cand[isin].get(t_prev, np.nan)
        else:
            qty_t_prev = np.nan

        if pd.isna(qty_t_prev):
            # Candidate does not hold this ISIN at t_prev -> full error
            err_ratio = 1.0
        else:
            try:
                net_flow = flows_idx.loc[(t_curr, candidate, isin)]
            except KeyError:
                net_flow = 0.0
            err_ratio, _ = compute_reconciliation_error(qty_t_prev, qty_t, net_flow)

        weight_isin = abs(qty_t)
        weighted_err += err_ratio * weight_isin
        total_aum += weight_isin

    avg_err = weighted_err / total_aum if total_aum > 0 else 1.0
    return score_curr * (1.0 - min(avg_err, 1.0))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _baseline_reconciliation_error(reg, t_prev, t_curr, panel, flows_idx):
    """Weighted reconciliation error of code `reg` between t_prev and t_curr, using the step-2 rules."""
    if reg not in panel.columns.get_level_values(0):
        return 1.0

    holdings = panel[reg]
    weighted_err = 0.0
    total_weight = 0.0

    for isin in holdings.columns:
        series = holdings[isin]
        qty_t = series.get(t_curr, np.nan)
        if pd.isna(qty_t):
            continue

        weight = abs(qty_t)
        qty_prev = series.get(t_prev, np.nan)
        if pd.isna(qty_prev):
            # Held at t_curr, unknown at t_prev -> full error on this ISIN
            weighted_err += weight
            total_weight += weight
            continue

        if qty_t == 0 and qty_prev == 0:
            continue

        try:
            net_flow = flows_idx.loc[(t_curr, reg, isin)]
        except KeyError:
            net_flow = 0.0

        err_ratio, _ = compute_reconciliation_error(qty_prev, qty_t, net_flow)
        weighted_err += err_ratio * weight
        total_weight += weight

    return weighted_err / total_weight if total_weight > 0 else 0.0


def run_surgery_pass(scores_history, errors_history, panel, monthly_flows,
                     weights, universe, all_months):
    """
    Second pass: walk back in time and, for each account whose score would
    drop by more than SCORE_DROP_THRESHOLD in a month, try to substitute
    another code at t_prev ("surgery") and recompute the score.

    Rules applied:
      - candidates are panel codes not currently mapped to another account,
        plus the account's current code itself (self-mapping)
      - a candidate must share ISINs with the account (Jaccard >= MIN_JACCARD
        between holdings at t_curr and the candidate's holdings at t_prev)
      - the candidate giving the highest recomputed score wins, provided it
        beats the no-surgery score
      - the no-surgery error comes from `errors_history` while the account
        still uses its original code; once a code has been substituted (or
        when no step-2 entry exists), it is recomputed for the code in use,
        because the step-2 errors describe the original code only

    Args:
        scores_history: step-2 scores (accepted for interface compatibility;
            not read here).
        errors_history: step-2 errors, dict {date -> {reg_id -> error}}.
        panel: month-end panel with (reg_id, isin) MultiIndex columns.
        monthly_flows: DataFrame ['date', 'reg_id', 'isin', 'qty_net_month'].
        weights: dict reg_id -> score at t_ref.
        universe: iterable of reg_id to follow.
        all_months: ordered month-ends; the last one is t_ref.

    Returns:
        (mapping_history, surgery_log, scores, scores_history_corrected):
        - mapping_history          : {date -> {reg_orig -> code used}}
        - surgery_log              : list of dicts, one per surgery
        - scores                   : scores at the earliest month
        - scores_history_corrected : {date -> {reg_orig -> score}}
    """
    flows_idx = monthly_flows.set_index(['date', 'reg_id', 'isin'])['qty_net_month']

    # Every code present in the panel (universe + historical codes)
    all_regs_in_panel = set(panel.columns.get_level_values(0))

    # Pre-compute the set of held ISINs (non-NaN, non-zero) per code and date:
    # {reg_id -> {date -> set(isin)}}; dates with no holdings are omitted.
    held = panel.notna() & (panel != 0)
    reg_isin_at_date = {}
    for reg in all_regs_in_panel:
        held_reg = held[reg]
        isins = held_reg.columns
        per_date = {}
        for date, flags in held_reg.iterrows():
            active = set(isins[flags.to_numpy()])
            if active:
                per_date[date] = active
        reg_isin_at_date[reg] = per_date

    # Current mapping reg_orig -> code used, and the set of taken codes.
    # Sorted so that the processing order does not depend on set hashing.
    mapping = {r: r for r in sorted(universe, key=str)}
    mapping_inv = {r: r for r in mapping}

    surgery_log = []
    mapping_history = {all_months[-1]: dict(mapping)}
    scores_history_corrected = {all_months[-1]: dict(weights)}

    # Current scores (initialised at t_ref)
    scores = dict(weights)

    for i in range(len(all_months) - 2, -1, -1):
        t_prev = all_months[i]
        t_curr = all_months[i + 1]

        new_scores = {}
        new_mapping = {}
        step2_errors = errors_history.get(t_prev, {})

        for reg_orig in list(mapping.keys()):
            reg_curr = mapping[reg_orig]
            score_curr = scores.get(reg_orig, 0)

            if score_curr == 0:
                new_scores[reg_orig] = 0
                new_mapping[reg_orig] = reg_curr
                continue

            # Error without surgery. The step-2 value is only valid for the
            # original code; otherwise recompute it for the code in use.
            err = step2_errors.get(reg_orig) if reg_curr == reg_orig else None
            if err is None:
                err = _baseline_reconciliation_error(
                    reg_curr, t_prev, t_curr, panel, flows_idx
                )
            score_prev_no_surgery = score_curr * (1.0 - min(err, 1.0))
            drop_ratio = 1.0 - (score_prev_no_surgery / score_curr) if score_curr > 0 else 0

            if drop_ratio <= SCORE_DROP_THRESHOLD:
                new_mapping[reg_orig] = reg_curr
                new_scores[reg_orig] = score_prev_no_surgery
                continue

            # ── ISINs held by the current code at t_curr (pre-filter) ──
            isin_curr = reg_isin_at_date.get(reg_curr, {}).get(t_curr, set())

            # ── Available candidates ──
            # Codes already mapped to another account are excluded, but
            # reg_curr itself stays a valid candidate (self-mapping: the
            # account already existed under this code at t-1, dormant or
            # partial).
            available = (all_regs_in_panel - set(mapping_inv.keys())) | {reg_curr}

            best_candidate = None
            best_score_after = score_prev_no_surgery  # baseline = no surgery
            best_composite = 0.0

            # Sorted so that ties are broken the same way on every run
            for j in sorted(available, key=str):
                # Fast pre-filter: minimal ISIN overlap
                isin_j = reg_isin_at_date.get(j, {}).get(t_prev, set())
                if not isin_curr or not isin_j:
                    continue
                inter = len(isin_curr & isin_j)
                if inter == 0:
                    continue
                jac = inter / len(isin_curr | isin_j)
                if jac < MIN_JACCARD:
                    continue

                # Score after surgery with this candidate
                score_after = _recompute_score_with_candidate(
                    reg_curr, j, t_prev, t_curr, panel, flows_idx, score_curr
                )
                composite = jac * (score_after / score_curr) if score_curr > 0 else 0

                if score_after > best_score_after:
                    best_score_after = score_after
                    best_candidate = j
                    best_composite = composite

            if best_candidate is None:
                new_mapping[reg_orig] = reg_curr
                new_scores[reg_orig] = score_prev_no_surgery
                continue

            surgery_log.append({
                'date': t_prev,
                'reg_orig': reg_orig,
                'reg_from': reg_curr,
                'reg_to': best_candidate,
                'jaccard_composite': round(best_composite, 4),
                'score_before': round(score_curr, 6),
                'score_after': round(best_score_after, 6),
                'drop_without_surgery': round(drop_ratio, 4),
                'gain_vs_no_surgery': round(best_score_after - score_prev_no_surgery, 6),
            })
            print(f" 🔧 CHIRURGIE {t_prev.date()} | {reg_orig} : "
                  f"{reg_curr} → {best_candidate} "
                  f"(composite={best_composite:.3f}, "
                  f"score {score_curr:.4f}→{best_score_after:.4f})")

            # Mapping update. On self-mapping (best_candidate == reg_curr)
            # the code does not change, so mapping_inv is left untouched and
            # only the score is updated.
            if best_candidate != reg_curr:
                if reg_curr in mapping_inv:
                    del mapping_inv[reg_curr]
                mapping_inv[best_candidate] = reg_orig

            new_mapping[reg_orig] = best_candidate
            new_scores[reg_orig] = best_score_after

        mapping = new_mapping
        mapping_inv = {v: k for k, v in mapping.items()}
        scores = new_scores
        mapping_history[t_prev] = dict(mapping)
        scores_history_corrected[t_prev] = dict(scores)

        total_score = sum(s for s in scores.values() if not np.isnan(s))
        n_surgeries = sum(1 for op in surgery_log if op['date'] == t_prev)
        print(f" {t_prev.date()} | Σ scores = {total_score:.4f} | "
              f"Chirurgies = {n_surgeries}")

    return mapping_history, surgery_log, scores, scores_history_corrected
|
|
|
|
|
|
|
|
|
|
# ─────────────────────────────────────────────
# 7. RESULTS EXPORT
# ─────────────────────────────────────────────
|
|
|
|
|
def export_results(scores_history, mapping_history, surgery_log, all_months, out_prefix="carmignac"):
    """
    Export the key results as CSV files under ./repair_results/.

    Files written (the directory is created if needed):
      - {out_prefix}_scores.csv       : one row per (date, reg_id) with its score
      - {out_prefix}_mapping.csv      : one row per (date, reg_orig) with the
                                        code used and a 'changed' flag
      - {out_prefix}_surgery_log.csv  : only when surgery_log is not empty

    Args:
        scores_history: dict {date -> {reg_id -> score}}.
        mapping_history: dict {date -> {reg_orig -> code used}}.
        surgery_log: list of dicts describing each surgery.
        all_months: month-end grid (accepted for interface compatibility;
            not read here).
        out_prefix: file name prefix.

    Returns:
        (df_scores, df_mapping): the two DataFrames that were written.
    """
    # to_csv does not create missing parent directories
    os.makedirs("repair_results", exist_ok=True)

    # Score history, long format
    rows = [
        {'date': date, 'reg_id': reg, 'score': score}
        for date, sc in scores_history.items()
        for reg, score in sc.items()
    ]
    df_scores = pd.DataFrame(rows) if rows else pd.DataFrame(columns=['date', 'reg_id', 'score'])
    if not df_scores.empty:
        df_scores = df_scores.sort_values(['date', 'score'], ascending=[True, False])
    df_scores.to_csv(f"repair_results/{out_prefix}_scores.csv", index=False)

    # Mapping history, long format
    rows_m = [
        {'date': date, 'reg_orig': reg_orig, 'reg_used': reg_used,
         'changed': reg_orig != reg_used}
        for date, mp in mapping_history.items()
        for reg_orig, reg_used in mp.items()
    ]
    df_mapping = pd.DataFrame(rows_m) if rows_m else pd.DataFrame(columns=['date', 'reg_orig', 'reg_used', 'changed'])
    if not df_mapping.empty:
        df_mapping = df_mapping.sort_values(['date', 'reg_orig'])
    df_mapping.to_csv(f"repair_results/{out_prefix}_mapping.csv", index=False)

    # Surgery log
    if surgery_log:
        df_surgery = pd.DataFrame(surgery_log).sort_values('date')
        df_surgery.to_csv(f"repair_results/{out_prefix}_surgery_log.csv", index=False)
        print(f"\n[Export] {len(surgery_log)} opérations de chirurgie sauvegardées.")
    else:
        print("\n[Export] Aucune chirurgie effectuée sur ce subset.")

    print(f"[Export] Scores → {out_prefix}_scores.csv")
    print(f"[Export] Mapping → {out_prefix}_mapping.csv")

    return df_scores, df_mapping
|
|
|
|
|
|
|
|
|
|
# ─────────────────────────────────────────────
# 8. MAIN PIPELINE
# ─────────────────────────────────────────────
|
2026-03-20 09:40:02 +01:00
|
|
|
def run_pipeline():
    """
    Run the whole repair pipeline: load, build the reference universe, build
    the monthly panel and flows, propagate scores (step 2), run the surgery
    pass (step 3), export the results and print a summary.

    Returns:
        (df_scores, df_mapping, surgery_log, scores_history_corrected,
        mapping_history)
    """
    print("=" * 60)
    print("CARMIGNAC — Pipeline de réparation des Registrar IDs")
    print("=" * 60)

    # Loading
    aum, flows = load_data()

    # Step 1 — reference universe
    aum_ref, weights, universe, t_ref = build_reference_universe(aum)

    print("\n Top 5 comptes par poids :")
    for reg, w in sorted(weights.items(), key=lambda x: -x[1])[:5]:
        print(f" {reg} : {w:.4f} ({w*100:.2f}%)")

    # Monthly panel
    panel, all_months = build_monthly_panel(aum, universe, t_ref)

    # Aggregated monthly flows
    monthly_flows = aggregate_flows_monthly(flows, all_months)

    # Step 2 — consistency score (no surgery)
    print("\n[Étape 2] Propagation des scores (sans chirurgie)...")
    scores_history, errors_history, _ = score_propagation(
        panel, monthly_flows, weights, universe, all_months
    )

    # Step 3 — surgery
    print("\n[Étape 3] Passe de chirurgie...")
    mapping_history, surgery_log, final_scores, scores_history_corrected = run_surgery_pass(
        scores_history, errors_history, panel, monthly_flows,
        weights, universe, all_months
    )

    # Export — the corrected (post-surgery) scores are the reference
    print("\n[Export des résultats...]")
    df_scores, df_mapping = export_results(
        scores_history_corrected, mapping_history, surgery_log, all_months
    )

    # Final summary
    print("\n" + "=" * 60)
    print("RÉSUMÉ FINAL")
    print("=" * 60)
    print(f" Dates couvertes : {all_months[0].date()} → {all_months[-1].date()}")
    print(f" Comptes dans l'univers : {len(universe)}")
    print(f" Chirurgies effectuées : {len(surgery_log)}")
    # `s == s` filters out NaN scores
    score_by_date = {d: sum(s for s in sc.values() if s == s)
                     for d, sc in scores_history_corrected.items()}
    # Histories are keyed by the month-end grid, so the reference entry is
    # all_months[-1]; indexing by t_ref fails when t_ref is not a month-end.
    print(f" Σ scores à t_ref : {score_by_date[all_months[-1]]:.4f}")
    print(f" Σ scores à t_min : {score_by_date[all_months[0]]:.4f}")

    return df_scores, df_mapping, surgery_log, scores_history_corrected, mapping_history
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Script entry point: run the full pipeline and keep the results in module scope.
if __name__ == "__main__":
    df_scores, df_mapping, surgery_log, scores_history, mapping_history = run_pipeline()
|