From bb2f24563c334d6bc248617bd588d1b73147de29 Mon Sep 17 00:00:00 2001 From: pgoze-ensae Date: Fri, 20 Mar 2026 08:21:54 +0000 Subject: [PATCH] debug --- notebooks/aum_flows_analysis.ipynb | 46 +- repair_challenge/carmignac repair.py | 1037 ----------------- repair_challenge/carmignac_repair.py | 679 +++++++++++ ..._report.html => carmignac_report_0.1.html} | 0 4 files changed, 714 insertions(+), 1048 deletions(-) delete mode 100644 repair_challenge/carmignac repair.py create mode 100644 repair_challenge/carmignac_repair.py rename repair_challenge/repair_results/{carmignac_report.html => carmignac_report_0.1.html} (100%) diff --git a/notebooks/aum_flows_analysis.ipynb b/notebooks/aum_flows_analysis.ipynb index a7bdf72..39db03f 100644 --- a/notebooks/aum_flows_analysis.ipynb +++ b/notebooks/aum_flows_analysis.ipynb @@ -11,15 +11,29 @@ }, { "cell_type": "code", - "execution_count": 2, + "execution_count": 1, "metadata": {}, "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "Matplotlib is building the font cache; this may take a moment.\n" + ] + }, { "name": "stdout", "output_type": "stream", "text": [ - "Requirement already satisfied: openpyxl in /opt/python/lib/python3.13/site-packages (3.1.5)\n", - "Requirement already satisfied: et-xmlfile in /opt/python/lib/python3.13/site-packages (from openpyxl) (2.0.0)\n" + "Collecting openpyxl\n", + " Downloading openpyxl-3.1.5-py2.py3-none-any.whl.metadata (2.5 kB)\n", + "Collecting et-xmlfile (from openpyxl)\n", + " Downloading et_xmlfile-2.0.0-py3-none-any.whl.metadata (2.7 kB)\n", + "Downloading openpyxl-3.1.5-py2.py3-none-any.whl (250 kB)\n", + "Downloading et_xmlfile-2.0.0-py3-none-any.whl (18 kB)\n", + "Installing collected packages: et-xmlfile, openpyxl\n", + "\u001b[2K \u001b[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\u001b[0m \u001b[32m2/2\u001b[0m [openpyxl]1/2\u001b[0m [openpyxl]\n", + "\u001b[1A\u001b[2KSuccessfully installed et-xmlfile-2.0.0 openpyxl-3.1.5\n" ] } ], @@ -37,7 +51,7 @@ 
}, { "cell_type": "code", - "execution_count": 3, + "execution_count": null, "metadata": {}, "outputs": [], "source": [ @@ -50,21 +64,21 @@ }, { "cell_type": "code", - "execution_count": 8, + "execution_count": null, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ - "/tmp/ipykernel_94182/3768862044.py:5: DtypeWarning: Columns (0,1,2,3) have mixed types. Specify dtype option on import or set low_memory=False.\n", - " stocks = pd.read_csv(f, sep=\";\")\n" + "/tmp/ipykernel_3944/2679329308.py:2: DtypeWarning: Columns (0,1,2,3) have mixed types. Specify dtype option on import or set low_memory=False.\n", + " flows = pd.read_csv(f, sep=\";\")\n" ] } ], "source": [ - "#with fs.open('projet-bdc-data//carmignac/Flows ENSAE V2 -20251105.csv', 'rb') as f:\n", - " #flows = pd.read_csv(f, sep=\";\")\n", + "with fs.open('projet-bdc-data//carmignac/Flows ENSAE V2 -20251105.csv', 'rb') as f:\n", + " flows = pd.read_csv(f, sep=\";\")\n", "\n", "with fs.open('projet-bdc-data//carmignac/AUM ENSAE V2 -20251105.csv', 'rb') as f:\n", " stocks = pd.read_csv(f, sep=\";\")\n", @@ -466,7 +480,7 @@ }, { "cell_type": "code", - "execution_count": 5, + "execution_count": 7, "metadata": {}, "outputs": [ { @@ -478,13 +492,23 @@ } ], "source": [ - "stocks[\"Centralisation Date\"] = pd.to_datetime(stocks[\"Centralisation Date\"], errors=\"coerce\")\n", + "flows[\"Centralisation Date\"] = pd.to_datetime(flows[\"Centralisation Date\"], errors=\"coerce\")\n", "#flows[\"Centralisation Date\"] = pd.to_datetime(flows[\"Centralisation Date\"], errors=\"coerce\")\n", "#nav[\"NavDate\"] = pd.to_datetime(nav[\"NavDate\"], format=\"%d/%m/%Y\", errors=\"coerce\")\n", "\n", "print(\"Date conversion done.\")" ] }, + { + "cell_type": "code", + "execution_count": 8, + "metadata": {}, + "outputs": [], + "source": [ + "flows_head = flows.head(100)\n", + "flows_head.to_csv(\"flows_head.csv\")" + ] + }, { "cell_type": "code", "execution_count": 16, diff --git 
a/repair_challenge/carmignac repair.py b/repair_challenge/carmignac repair.py deleted file mode 100644 index f8dd91c..0000000 --- a/repair_challenge/carmignac repair.py +++ /dev/null @@ -1,1037 +0,0 @@ -""" -Carmignac Data Challenge — Pipeline Results Analysis -===================================================== -Analyses the CSV outputs produced by carmignac_repair.py: - - carmignac_scores.csv (post-surgery score history) - - carmignac_mapping.csv (reg_id mapping history) - - carmignac_surgery_log.csv (surgery operations) - -Produces a self-contained HTML report with interactive charts. - -Usage: - python carmignac_analysis.py - python carmignac_analysis.py --scores path/to/scores.csv \ - --mapping path/to/mapping.csv \ - --surgery path/to/surgery_log.csv \ - --out report.html -""" - -import argparse -import json -import os -import sys -import numpy as np -import pandas as pd - -# ───────────────────────────────────────────────────────────── -# 1. LOAD & VALIDATE -# ───────────────────────────────────────────────────────────── - -def load_outputs(scores_path, mapping_path, surgery_path): - scores = pd.read_csv(scores_path, parse_dates=["date"]) - mapping = pd.read_csv(mapping_path, parse_dates=["date"]) - surgery = pd.read_csv(surgery_path, parse_dates=["date"]) - - # Normalise dtypes - scores["reg_id"] = scores["reg_id"].astype(str) - mapping["reg_orig"] = mapping["reg_orig"].astype(str) - mapping["reg_used"] = mapping["reg_used"].astype(str) - mapping["changed"] = mapping["changed"].astype(bool) - surgery["reg_orig"] = surgery["reg_orig"].astype(str) - surgery["reg_from"] = surgery["reg_from"].astype(str) - surgery["reg_to"] = surgery["reg_to"].astype(str) - - return scores, mapping, surgery - - -# ───────────────────────────────────────────────────────────── -# 2. 
COMPUTE ANALYTICS -# ───────────────────────────────────────────────────────────── - -def compute_analytics(scores, mapping, surgery): - dates = sorted(scores["date"].unique()) - - # ── 2.1 Sum of scores per date (post-surgery) ────────────── - sum_post = (scores.groupby("date")["score"] - .sum() - .reindex(dates) - .rename("sum_post")) - - # ── 2.2 Reconstruct pre-surgery (counterfactual) ─────────── - # Without surgery, every reg_id that had a hard break would score 0 - # from that date backwards. We propagate the surgery "gain" as a - # cumulative deficit going back in time. - gain_by_date = surgery.groupby("date")["gain_vs_no_surgery"].sum() - # cumulative deficit = sum of gains for all surgeries at or after date t - cumulative_deficit = pd.Series(0.0, index=dates) - for d in dates: - cumulative_deficit[d] = gain_by_date[gain_by_date.index >= d].sum() - sum_pre = (sum_post - cumulative_deficit).clip(lower=0).rename("sum_pre") - - timeline = pd.DataFrame({"sum_post": sum_post, "sum_pre": sum_pre}) - timeline.index = pd.to_datetime(timeline.index) - timeline["recovery_pct"] = np.where( - sum_pre < sum_post, - (sum_post - sum_pre) / sum_post.clip(lower=1e-9) * 100, - 0.0, - ) - - # ── 2.3 Per-date surgery stats ───────────────────────────── - surgery_stats = ( - surgery.groupby("date") - .agg( - n_surgeries = ("reg_orig", "count"), - total_gain = ("gain_vs_no_surgery", "sum"), - avg_gain = ("gain_vs_no_surgery", "mean"), - avg_jaccard = ("jaccard_composite", "mean"), - avg_score_before = ("score_before", "mean"), - avg_score_after = ("score_after", "mean"), - ) - .reindex(dates, fill_value=0) - ) - - # ── 2.4 Score distribution over time ─────────────────────── - # Wide format: rows=dates, cols=reg_ids - pivot = scores.pivot_table(index="date", columns="reg_id", - values="score", aggfunc="last") - pivot = pivot.reindex(dates) - pivot.index = pd.to_datetime(pivot.index) - - # ── 2.5 Mapping churn ────────────────────────────────────── - # For each date, how many 
reg_ids are remapped (not using their original code)? - churn = (mapping.groupby("date")["changed"] - .sum() - .reindex(dates, fill_value=0) - .rename("n_remapped")) - - # ── 2.6 Score entropy (distribution spread) ──────────────── - def entropy(row): - p = row.dropna() - p = p[p > 0] - if len(p) == 0: - return np.nan - p = p / p.sum() - return -(p * np.log(p)).sum() - - timeline["entropy"] = pivot.apply(entropy, axis=1).values - - # ── 2.7 Individual score trajectories ────────────────────── - # Identify which reg_ids were ever remapped - ever_remapped = set(mapping.loc[mapping["changed"], "reg_orig"].unique()) - - # ── 2.8 Surgery detail table ─────────────────────────────── - surgery_detail = surgery.copy() - surgery_detail["gain_pct_of_score"] = ( - surgery_detail["gain_vs_no_surgery"] - / surgery_detail["score_before"].clip(lower=1e-9) * 100 - ).round(2) - - return { - "timeline": timeline, - "surgery_stats": surgery_stats, - "pivot": pivot, - "churn": churn, - "ever_remapped": ever_remapped, - "surgery_detail": surgery_detail, - "dates": [d.strftime("%Y-%m-%d") for d in dates], - } - - -# ───────────────────────────────────────────────────────────── -# 3. 
PRINT CONSOLE SUMMARY -# ───────────────────────────────────────────────────────────── - -def print_summary(analytics, surgery): - tl = analytics["timeline"] - ss = analytics["surgery_stats"] - - print("\n" + "=" * 65) - print(" CARMIGNAC PIPELINE — RESULTS SUMMARY") - print("=" * 65) - - print(f"\n Date range : {tl.index.min().date()} → {tl.index.max().date()}") - print(f" Total months : {len(tl)}") - print(f" Reg IDs : {analytics['pivot'].shape[1]}") - - print(f"\n ── Score (Σ) ──────────────────────────────────────────") - print(f" At t_ref (latest) : {tl['sum_post'].iloc[-1]:.6f}") - print(f" At t_min (earliest): {tl['sum_post'].iloc[0]:.6f}") - print(f" Min (post-surgery) : {tl['sum_post'].min():.6f} " - f"({tl['sum_post'].idxmin().date()})") - print(f" Min (pre-surgery) : {tl['sum_pre'].min():.6f} " - f"({tl['sum_pre'].idxmin().date()})") - print(f" Max recovery (pct) : {tl['recovery_pct'].max():.2f}%") - - print(f"\n ── Surgeries ─────────────────────────────────────────") - if len(surgery) == 0: - print(" No surgeries performed.") - else: - print(f" Total operations : {len(surgery)}") - print(f" Total score gained : {surgery['gain_vs_no_surgery'].sum():.6f}") - print(f" Avg Jaccard : {surgery['jaccard_composite'].mean():.4f}") - print(f" Avg gain / surgery : {surgery['gain_vs_no_surgery'].mean():.6f}") - print() - print(f" {'Date':12s} {'Reg orig':12s} {'From':15s} {'To':15s} " - f"{'Jaccard':>8s} {'Gain':>10s}") - print(" " + "-" * 78) - for _, row in surgery.sort_values("date").iterrows(): - print(f" {str(row['date'].date()):12s} {row['reg_orig']:12s} " - f"{row['reg_from']:15s} {row['reg_to']:15s} " - f"{row['jaccard_composite']:8.4f} {row['gain_vs_no_surgery']:10.6f}") - - print(f"\n ── Mapping churn ─────────────────────────────────────") - ch = analytics["churn"] - print(f" Max remapped at one date : {int(ch.max())} ({ch.idxmax().date() if ch.max()>0 else 'N/A'})") - print(f" Reg IDs ever remapped : {len(analytics['ever_remapped'])}") - - print(f"\n 
── Score entropy (distribution spread) ───────────────") - ent = analytics["timeline"]["entropy"] - print(f" Mean entropy : {ent.mean():.4f}") - print(f" Std entropy : {ent.std():.4f}") - print() - - -# ───────────────────────────────────────────────────────────── -# 4. BUILD HTML REPORT -# ───────────────────────────────────────────────────────────── - -def build_html(analytics, surgery, scores, mapping): - tl = analytics["timeline"] - ss = analytics["surgery_stats"] - piv = analytics["pivot"] - ch = analytics["churn"] - dates_str = analytics["dates"] - - # ── helpers to serialise for JS ───────────────────────────── - def jf(arr, decimals=6): - return json.dumps([round(float(v), decimals) if not np.isnan(v) else None - for v in arr]) - - def js(arr): - return json.dumps(list(arr)) - - # ── colour palette ─────────────────────────────────────────── - REG_COLORS = [ - "#2563eb","#16a34a","#dc2626","#d97706","#7c3aed", - "#0891b2","#db2777","#65a30d","#ea580c","#6366f1", - "#059669","#b45309","#9333ea","#0284c7","#e11d48", - ] - - # ── 4.1 Surgery sparkline data ────────────────────────────── - surg_dates = [d.strftime("%Y-%m-%d") for d in ss.index] - n_surg = jf(ss["n_surgeries"].values, 0) - total_gain = jf(ss["total_gain"].values) - avg_gain = jf(ss["avg_gain"].values) - avg_jaccard = jf(ss["avg_jaccard"].values) - - # ── 4.2 Individual trajectories ──────────────────────────── - reg_ids = list(piv.columns) - traj_datasets = [] - for idx, rid in enumerate(reg_ids): - col = analytics["ever_remapped"] - dashed = rid in col - traj_datasets.append({ - "label": rid, - "data": [round(float(v), 6) if not np.isnan(v) else None - for v in piv[rid].values], - "borderColor": REG_COLORS[idx % len(REG_COLORS)], - "backgroundColor": REG_COLORS[idx % len(REG_COLORS)] + "22", - "borderWidth": 2 if not dashed else 2, - "borderDash": [] if not dashed else [6, 3], - "pointRadius": 0, - "tension": 0.3, - "fill": False, - }) - - traj_json = json.dumps(traj_datasets) - - # ── 4.3 
Surgery detail table rows ────────────────────────── - sd = analytics["surgery_detail"].sort_values("date") - surg_rows_html = "" - if len(sd) == 0: - surg_rows_html = "No surgeries performed" - else: - for _, r in sd.iterrows(): - gain_class = "gain-high" if r["gain_vs_no_surgery"] > 0.05 else "gain-low" - surg_rows_html += f""" - - {r['date'].date()} - {r['reg_orig']} - {r['reg_from']} - → - {r['reg_to']} - {r['jaccard_composite']:.4f} - +{r['gain_vs_no_surgery']:.6f} - {r['gain_pct_of_score']:.1f}% - """ - - # ── 4.4 Top accounts table ────────────────────────────────── - last_date = piv.index.max() - top_accounts = piv.loc[last_date].dropna().sort_values(ascending=False) - top_rows_html = "" - for rank, (rid, sc) in enumerate(top_accounts.items(), 1): - remapped = "✓" if rid in analytics["ever_remapped"] else "" - bar_w = int(sc / top_accounts.max() * 100) - color = REG_COLORS[(rank - 1) % len(REG_COLORS)] - top_rows_html += f""" - - #{rank} - {rid} - {sc:.6f} - -
- - {remapped} - """ - - # ───────────────────────────────────────────────────────────── - # HTML TEMPLATE - # ───────────────────────────────────────────────────────────── - html = f""" - - - - -Carmignac Pipeline — Analysis Report - - - - - - -
-
Carmignac × ENSAE · Data Challenge 2025
-

Pipeline Results — Analysis Report

-
Registrar ID repair · Score propagation · Surgery audit
-
- - -
-
- Σ score at t_ref - {tl['sum_post'].iloc[-1]:.4f} - post-surgery -
-
- Σ score at t_min - {tl['sum_post'].iloc[0]:.4f} - post-surgery -
-
- Max recovery - {tl['recovery_pct'].max():.1f}% - score rescued by surgery -
-
- Total surgeries - {len(surgery)} - operations performed -
-
- Reg IDs universe - {piv.shape[1]} - at reference date -
-
- Ever remapped - {len(analytics['ever_remapped'])} - reg IDs w/ code change -
-
- - -
- - - - -
-
- Sum of scores — pre vs post surgery - - Post-surgery (solid) shows the corrected score after code repairs. - Pre-surgery (dashed) is the counterfactual without any remapping. - Gap = score rescued. - -
-
-
- -
-
-
- - -
-
-
- Score recovered by surgery - Difference post − pre at each date -
-
-
- -
-
-
- -
-
- Portfolio concentration (entropy) - Shannon entropy of score distribution — higher = more spread -
-
-
- -
-
-
-
- - - -
-
- Score per Registrar Account — full history - - Dashed lines = accounts that were remapped at some point (surgery applied). - Solid lines = stable codes throughout. - -
-
-
- -
-
-
- - - -
-
-
- Surgeries per time step - Number of code remappings performed at each month -
-
-
- -
-
-
- -
-
- Score gain per surgery - Average gain in Σ score from surgery at each month -
-
-
- -
-
-
-
- -
-
- Jaccard similarity of surgery matches - - Composite Jaccard score of the matched code pair — closer to 1.0 = stronger portfolio overlap. - Low values may indicate uncertain matches. - -
-
-
- -
-
-
- - - -
-
- All surgery operations -
-
- {'
No surgeries were performed on this dataset.
' if len(surgery) == 0 else f""" - - - - - - - - - - - - - - {surg_rows_html} -
DateReg origCode fromCode toJaccardScore gain% of score
"""} -
-
- - - -
-
- Accounts ranked by weight at reference date - ✓ in last column = account was remapped at some point in history -
-
- - - - - - - - - - - {top_rows_html} -
RankRegistrar IDScore (weight)Relative sizeRemapped
-
-
- -
- - - - - - -""" - - return html - - -# ───────────────────────────────────────────────────────────── -# 5. MAIN -# ───────────────────────────────────────────────────────────── - -def main(): - parser = argparse.ArgumentParser(description="Carmignac pipeline results analyser") - parser.add_argument("--scores", default="repair_results/carmignac_scores.csv") - parser.add_argument("--mapping", default="repair_results/carmignac_mapping.csv") - parser.add_argument("--surgery", default="repair_results/carmignac_surgery_log.csv") - parser.add_argument("--out", default="repair_results/carmignac_report.html") - args = parser.parse_args() - - # Resolve paths relative to this script's directory if files not found - base = os.path.dirname(os.path.abspath(__file__)) - def resolve(p): - if os.path.exists(p): - return p - alt = os.path.join(base, p) - if os.path.exists(alt): - return alt - sys.exit(f"[ERROR] File not found: {p}") - - scores_path = resolve(args.scores) - mapping_path = resolve(args.mapping) - surgery_path = resolve(args.surgery) - - print(f"[Load] scores : {scores_path}") - print(f"[Load] mapping : {mapping_path}") - print(f"[Load] surgery : {surgery_path}") - - scores, mapping, surgery = load_outputs(scores_path, mapping_path, surgery_path) - analytics = compute_analytics(scores, mapping, surgery) - - print_summary(analytics, surgery) - - html = build_html(analytics, surgery, scores, mapping) - - out_path = args.out - with open(out_path, "w", encoding="utf-8") as f: - f.write(html) - print(f"\n[Report] Written to → {out_path}") - - -if __name__ == "__main__": - main() diff --git a/repair_challenge/carmignac_repair.py b/repair_challenge/carmignac_repair.py new file mode 100644 index 0000000..6fab02f --- /dev/null +++ b/repair_challenge/carmignac_repair.py @@ -0,0 +1,679 @@ +""" +Carmignac Data Challenge — Registrar ID Repair Pipeline +========================================================= +Étape 1 : Filtrage & univers de référence à t=31/10/2025 +Étape 2 : 
Score de cohérence temporelle (propagation vers le passé) +Étape 3 : Chirurgie de code (matching 1-to-1) +""" + +import pandas as pd +import numpy as np +from collections import defaultdict +import os +import s3fs + +# ───────────────────────────────────────────── +# PARAMÈTRES +# ───────────────────────────────────────────── +ALPHA = 0.03 # tolérance réconciliation : 3% du stock à t +MIN_AUM_EUR = 5e6 # seuil filtrage étape 1 — 0 pour les heads de test +MIN_JACCARD = 0.3 # seuil minimal similarité portefeuille pour chirurgie +SCORE_DROP_THRESHOLD = 0.1 # si score chute de >10% → candidat chirurgie + +EXCLUDE_REGISTRAR = ["Off Distribution", "Private Clients"] + +# ───────────────────────────────────────────── +# 1. CHARGEMENT +# ───────────────────────────────────────────── +def load_data(aum_path, flows_path): + fs = s3fs.S3FileSystem( + client_kwargs={'endpoint_url': 'https://'+'minio-simple.lab.groupe-genes.fr'}, + key = os.environ["AWS_ACCESS_KEY_ID"], + secret = os.environ["AWS_SECRET_ACCESS_KEY"], + token = os.environ["AWS_SESSION_TOKEN"]) + + with fs.open('projet-bdc-data//carmignac/Flows ENSAE V2 -20251105.csv', 'rb') as f: + flows = pd.read_csv(f, sep=";") + + with fs.open('projet-bdc-data//carmignac/AUM ENSAE V2 -20251105.csv', 'rb') as f: + aum = pd.read_csv(f, sep=";") + + aum['Centralisation Date'] = pd.to_datetime(aum['Centralisation Date']) + flows['Centralisation Date'] = pd.to_datetime(flows['Centralisation Date']) + + # Noms courts + aum = aum.rename(columns={ + 'Registrar Account - ID': 'reg_id', + 'Product - Isin': 'isin', + 'Centralisation Date': 'date', + 'Quantity - AUM': 'qty_aum', + 'Value - AUM €': 'val_eur', + 'Registrar Account - Region': 'region', + }) + flows = flows.rename(columns={ + 'Registrar Account - ID': 'reg_id', + 'Product - Isin': 'isin', + 'Centralisation Date': 'date', + 'Quantity - NetFlows': 'qty_net', + 'Value € - NetFlows': 'val_net_eur', + }) + + aum['reg_id'] = aum['reg_id'].astype(str) + flows['reg_id'] = 
flows['reg_id'].astype(str) + + return aum, flows + +# ───────────────────────────────────────────── +# 2. ÉTAPE 1 — Univers de référence à T_REF +# ───────────────────────────────────────────── +def build_reference_universe(aum, t_ref=None): + """ + Construit l'univers de référence à t_ref (dernière date par défaut). + Retourne : + - aum_ref : AUM à t_ref pour chaque (reg_id, isin) + - weights : poids normalisé par reg_id + - universe : ensemble des reg_id retenus (>= MIN_AUM_EUR) + """ + if t_ref is None: + t_ref = aum['date'].max() + + print(f"\n[Étape 1] Date de référence : {t_ref.date()}") + + # Exclure Off Distribution / Private Clients (sur région ou nom) + mask_excl = aum['reg_id'].isin(EXCLUDE_REGISTRAR) + if 'region' in aum.columns: + mask_excl |= aum['region'].isin(EXCLUDE_REGISTRAR) + aum_clean = aum[~mask_excl].copy() + + # AUM à t_ref + aum_ref = aum_clean[aum_clean['date'] == t_ref][['reg_id', 'isin', 'qty_aum', 'val_eur']].copy() + + # AUM total par reg_id à t_ref + aum_by_reg = aum_ref.groupby('reg_id')['val_eur'].sum().rename('total_eur') + + # Filtrage >= MIN_AUM_EUR + universe = set(aum_by_reg[aum_by_reg >= MIN_AUM_EUR].index) + + total_eur_universe = aum_by_reg[aum_by_reg.index.isin(universe)].sum() + total_eur_all = aum_by_reg.sum() + coverage = total_eur_universe / total_eur_all if total_eur_all > 0 else 0 + + print(f" Registrar IDs à t_ref : {len(aum_by_reg)}") + print(f" Dont >= {MIN_AUM_EUR/1e6:.0f}M€ : {len(universe)}") + print(f" Couverture encours : {coverage:.1%}") + + # Poids initiaux (scores à t_ref) + weights = (aum_by_reg[aum_by_reg.index.isin(universe)] / total_eur_universe).to_dict() + + return aum_ref, weights, universe, t_ref + +# ───────────────────────────────────────────── +# 3. 
PANEL AUM MENSUEL (forward-fill) +# ───────────────────────────────────────────── +def build_monthly_panel(aum, universe, t_ref): + """ + Construit un panel mensuel complet (forward-fill des quantités AUM) + pour TOUS les reg_ids présents dans l'historique AUM — y compris les codes + historiques hors univers de référence, nécessaires pour la chirurgie. + """ + # Toutes les fin de mois entre la première date et t_ref + date_min = aum['date'].min() + all_months = pd.date_range(start=date_min, end=t_ref, freq='ME') + + # Pivot : (reg_id, isin) → série temporelle de qty_aum + aum_sorted = aum.sort_values(['reg_id', 'isin', 'date']) + + # On ne garde que les lignes jusqu'à t_ref + aum_sorted = aum_sorted[aum_sorted['date'] <= t_ref] + + # Multi-index pivot + panel = aum_sorted.pivot_table( + index='date', columns=['reg_id', 'isin'], values='qty_aum', aggfunc='last' + ) + + # Réindexer sur toutes les fins de mois + panel = panel.reindex(all_months) + + # Forward-fill : si pas de mouvement, la quantité reste la même + panel = panel.ffill() + + # Backward-fill initial pour les comptes qui démarrent après la première date + # (on ne remonte pas avant leur première apparition → on garde NaN) + + print(f"\n[Panel mensuel] {len(all_months)} mois, {panel.shape[1]} (reg_id, isin) paires") + + return panel, all_months + +# ───────────────────────────────────────────── +# 4. FLOWS AGRÉGÉS PAR MOIS +# ───────────────────────────────────────────── +def aggregate_flows_monthly(flows, all_months): + """ + Agrège les flows infra-mensuels sur chaque fenêtre ]fin_mois(t-1), fin_mois(t)]. + Retourne un DataFrame indexé par (fin_mois, reg_id, isin). 
+ """ + flows_f = flows[flows['date'] <= all_months[-1]].copy() + + # Associer chaque transaction à la fin de mois correspondante + # = la première fin de mois >= date de transaction + flows_f['month_end'] = flows_f['date'].apply( + lambda d: all_months[all_months >= d][0] if any(all_months >= d) else pd.NaT + ) + flows_f = flows_f.dropna(subset=['month_end']) + + # Agrégation + monthly_flows = flows_f.groupby(['month_end', 'reg_id', 'isin'])['qty_net'].sum() + monthly_flows = monthly_flows.reset_index() + monthly_flows.columns = ['date', 'reg_id', 'isin', 'qty_net_month'] + + print(f"\n[Flows mensuels] {len(monthly_flows)} enregistrements (reg_id, isin, mois)") + + return monthly_flows + +# ───────────────────────────────────────────── +# 5. ÉTAPE 2 — Score de cohérence temporelle +# ───────────────────────────────────────────── +def compute_reconciliation_error(qty_t_minus1, qty_t, net_flow, alpha=ALPHA): + """ + Calcule l'erreur de réconciliation normalisée pour un (reg_id, isin, mois). + + Attendu : qty_t_minus1 + net_flow ≈ qty_t + Erreur : |qty_t_minus1 + net_flow - qty_t| / max(|qty_t|, |qty_t_minus1|) + + Retourne : + - err_ratio : erreur relative (0 = parfait) + - is_break : True si err_ratio > alpha + """ + denom = max(abs(qty_t), abs(qty_t_minus1), 1e-9) + err = abs(qty_t_minus1 + net_flow - qty_t) + err_ratio = err / denom + return err_ratio, err_ratio > alpha + +def score_propagation(panel, monthly_flows, weights, universe, all_months): + """ + Propage les scores de t_ref vers t=0 (passé). 
+ + À chaque mois t (en remontant), pour chaque reg_id dans l'univers courant : + - Calculer l'erreur de réconciliation pondérée par ISIN + - Dégrader le score proportionnellement + + Retourne : + - scores_history : dict {date → {reg_id → score}} + - errors_history : dict {date → {reg_id → err_pondérée}} + - mapping : dict {reg_id_original → reg_id_courant} (après chirurgie) + """ + # Initialisation + scores = dict(weights) # scores à t_ref + scores_history = {all_months[-1]: dict(scores)} + errors_history = {} + + # Mapping actuel (identité au départ) + mapping = {r: r for r in universe} + + # Flows indexés pour accès rapide + flows_idx = monthly_flows.set_index(['date', 'reg_id', 'isin'])['qty_net_month'] + + # Remonter dans le temps + for i in range(len(all_months) - 2, -1, -1): + t_prev = all_months[i] + t_curr = all_months[i + 1] + + errors_at_t = {} + new_scores = {} + + for reg_orig, reg_curr in mapping.items(): + score_curr = scores.get(reg_orig, 0) + if score_curr == 0: + new_scores[reg_orig] = 0 + continue + + # ISIN détenus par ce reg à t_curr (après mapping) + if reg_curr in panel.columns.get_level_values(0): + isin_list = panel[reg_curr].columns.tolist() + else: + # reg_curr n'existe pas du tout dans le panel → rupture totale + new_scores[reg_orig] = 0 + errors_at_t[reg_orig] = 1.0 + continue + + total_aum_t = 0 + weighted_err = 0 + valid_isin_count = 0 + all_nan_at_prev = True # détecte si le compte n'existait pas à t_prev + + for isin in isin_list: + qty_t = panel[reg_curr][isin].get(t_curr, np.nan) + qty_t_prev = panel[reg_curr][isin].get(t_prev, np.nan) + + if pd.isna(qty_t): + continue + + if not pd.isna(qty_t_prev): + all_nan_at_prev = False + + if pd.isna(qty_t_prev): + # ISIN existait à t_curr mais pas à t_prev → rupture sur cet ISIN + # On le traite comme une erreur maximale pondérée par son AUM + weight_isin = abs(qty_t) + weighted_err += 1.0 * weight_isin + total_aum_t += weight_isin + valid_isin_count += 1 + continue + + if qty_t == 0 and 
qty_t_prev == 0: + continue + # Flow agrégé sur ]t_prev, t_curr] + try: + net_flow = flows_idx.loc[(t_curr, reg_curr, isin)] + except KeyError: + net_flow = 0.0 + + err_ratio, is_break = compute_reconciliation_error( + qty_t_prev, qty_t, net_flow, alpha=ALPHA + ) + + # Pondération par AUM à t_curr + weight_isin = abs(qty_t) + weighted_err += err_ratio * weight_isin + total_aum_t += weight_isin + valid_isin_count += 1 + + if total_aum_t > 0 and valid_isin_count > 0: + avg_err = weighted_err / total_aum_t + else: + avg_err = 0.0 + + errors_at_t[reg_orig] = avg_err + + # Dégradation du score : score(t-1) = score(t) * (1 - err_pondérée) + # Clippée entre 0 et score_curr + degradation = min(avg_err, 1.0) + new_scores[reg_orig] = score_curr * (1.0 - degradation) + + scores = new_scores + scores_history[t_prev] = dict(scores) + errors_history[t_prev] = dict(errors_at_t) + + total_score = sum(scores.values()) + print(f" {t_prev.date()} | Σ scores = {total_score:.4f} | " + f"Comptes actifs = {sum(1 for v in scores.values() if v > 0)}") + + return scores_history, errors_history, mapping + +# ───────────────────────────────────────────── +# 6. 
# STEP 3 — Code surgery
# ─────────────────────────────────────────────
def jaccard_isin(set_a, set_b):
    """Return the Jaccard coefficient between two sets of ISINs.

    Returns 0.0 when either set is empty, otherwise |A ∩ B| / |A ∪ B|.
    """
    if not set_a or not set_b:
        return 0.0
    union = len(set_a | set_b)
    return len(set_a & set_b) / union if union > 0 else 0.0


def _active_isins(frame, date):
    """Return the set of columns of *frame* that are non-null and non-zero at *date*.

    An empty set is returned when *date* is not a label of ``frame.index``.
    """
    if date not in frame.index:
        return set()
    row = frame.loc[date]
    mask = (row.notna() & (row != 0)).to_numpy()
    return set(frame.columns[mask].tolist())


def _net_flow(flows_idx, date, reg, isin):
    """Look up the net flow for (date, reg, isin); a missing key counts as 0.0."""
    try:
        return flows_idx.loc[(date, reg, isin)]
    except KeyError:
        return 0.0


def find_best_candidate(reg_orig, reg_curr, t_prev, t_curr,
                        panel, flows_idx, all_regs_at_t_prev, mapping_inv):
    """Search for the best replacement code for an account whose score dropped.

    A candidate ``j`` taken from ``all_regs_at_t_prev`` is eligible when:
      - it is not already a key of ``mapping_inv`` (already mapped);
      - it is not ``reg_curr`` itself;
      - its set of active ISINs at ``t_prev`` has a Jaccard similarity of at
        least ``MIN_JACCARD`` with the active ISINs of ``reg_curr`` at ``t_curr``.

    Eligible candidates are ranked by ``jaccard * (1 - min(avg_err, 1))`` where
    ``avg_err`` is the reconciliation error over the common ISINs, weighted by
    ``abs(quantity at t_curr)``.

    Parameters
    ----------
    reg_orig : unused here; kept so the signature stays stable for callers.
    reg_curr : code currently used for the account (level-0 label of ``panel``).
    t_prev, t_curr : index labels of ``panel`` for the two consecutive dates.
    panel : DataFrame whose columns have two levels, (reg_id, isin).
    flows_idx : Series looked up with ``(date, reg_id, isin)`` keys.
    all_regs_at_t_prev : iterable of candidate codes.
    mapping_inv : container of codes that are already mapped.

    Returns
    -------
    (best_candidate, best_composite), or ``(None, 0.0)`` when nothing qualifies
    (including when ``reg_curr`` is absent from the panel or has no active ISIN
    at ``t_curr``).
    """
    # Built once: the original rebuilt this level-0 index on every iteration.
    panel_regs = set(panel.columns.get_level_values(0))
    if reg_curr not in panel_regs:
        return None, 0.0

    frame_curr = panel[reg_curr]
    # Guarded lookup: a t_curr missing from the index used to raise KeyError.
    isin_curr = _active_isins(frame_curr, t_curr)
    if not isin_curr:
        return None, 0.0

    best_candidate = None
    best_composite = 0.0

    for j in all_regs_at_t_prev:
        # Skip codes already mapped, the account itself, and unknown codes.
        if j in mapping_inv or j == reg_curr or j not in panel_regs:
            continue

        frame_j = panel[j]
        isin_j = _active_isins(frame_j, t_prev)
        if not isin_j:
            continue

        jac = jaccard_isin(isin_curr, isin_j)
        if jac < MIN_JACCARD:
            continue

        # Reconciliation error restricted to the ISINs held on both sides.
        total_aum = 0
        weighted_err = 0
        for isin in isin_curr & isin_j:
            qty_t = frame_curr[isin].get(t_curr, np.nan)
            qty_t_prev = frame_j[isin].get(t_prev, np.nan)
            if pd.isna(qty_t) or pd.isna(qty_t_prev):
                continue

            net_flow = _net_flow(flows_idx, t_curr, j, isin)
            err_ratio, _ = compute_reconciliation_error(qty_t_prev, qty_t, net_flow)
            weight_isin = abs(qty_t)
            weighted_err += err_ratio * weight_isin
            total_aum += weight_isin

        # No usable ISIN at all is treated as a total mismatch.
        avg_err = weighted_err / total_aum if total_aum > 0 else 1.0
        composite = jac * (1.0 - min(avg_err, 1.0))

        if composite > best_composite:
            best_composite = composite
            best_candidate = j

    return best_candidate, best_composite


def _recompute_score_with_candidate(reg_orig, candidate, t_prev, t_curr,
                                    panel, flows_idx, score_curr):
    """Recompute the score at ``t_prev`` if ``candidate`` carries the account.

    For every ISIN that ``reg_orig`` holds (non-null, non-zero) at ``t_curr``,
    the quantity held by ``candidate`` at ``t_prev`` is reconciled against it
    using the flows recorded for ``(t_curr, candidate, isin)``. An ISIN with no
    quantity on the candidate side counts as a full error (1.0). Errors are
    averaged with weights ``abs(quantity at t_curr)``.

    Returns ``score_curr * (1 - min(avg_err, 1))``, and 0.0 when either code is
    absent from the panel.
    """
    panel_regs = panel.columns.get_level_values(0)
    if candidate not in panel_regs or reg_orig not in panel_regs:
        return 0.0

    # Slice once instead of re-slicing the panel for every ISIN.
    frame_curr = panel[reg_orig]
    frame_cand = panel[candidate]
    cand_isins = set(frame_cand.columns)

    total_aum = 0
    weighted_err = 0

    for isin in frame_curr.columns:
        qty_t = frame_curr[isin].get(t_curr, np.nan)
        if pd.isna(qty_t) or qty_t == 0:
            continue

        if isin in cand_isins:
            qty_t_prev = frame_cand[isin].get(t_prev, np.nan)
        else:
            qty_t_prev = np.nan

        if pd.isna(qty_t_prev):
            err_ratio = 1.0
        else:
            # The flow is only needed when there is something to reconcile.
            net_flow = _net_flow(flows_idx, t_curr, candidate, isin)
            err_ratio, _ = compute_reconciliation_error(qty_t_prev, qty_t, net_flow)

        weight_isin = abs(qty_t)
        weighted_err += err_ratio * weight_isin
        total_aum += weight_isin

    avg_err = weighted_err / total_aum if total_aum > 0 else 1.0
    return score_curr * (1.0 - min(avg_err, 1.0))


def run_surgery_pass(scores_history, errors_history, panel, monthly_flows,
                     weights, universe, all_months):
    """Second pass: walk the months backwards and repair broken registrar codes.

    Starting from the last month (scores = ``weights``, identity mapping), each
    step moves from ``t_curr`` to the preceding month ``t_prev``. When the
    score of an account would drop by more than ``SCORE_DROP_THRESHOLD``, every
    unmapped code of the panel whose ISIN set at ``t_prev`` overlaps enough
    (Jaccard >= ``MIN_JACCARD``) is tried, and the one giving the highest
    recomputed score replaces the current code if it beats the no-surgery score.

    Parameters
    ----------
    scores_history : unused by this pass; kept for signature compatibility.
    errors_history : ``{date: {reg_orig: error}}``; a missing entry counts as 0.0.
        NOTE(review): assumed to hold step-2 errors computed on the original
        code — confirm against ``score_propagation``.
    panel : DataFrame whose columns have two levels, (reg_id, isin).
    monthly_flows : DataFrame with columns ``date``, ``reg_id``, ``isin`` and
        ``qty_net_month``.
    weights : ``{reg_id: score}`` at the last month.
    universe : iterable of the original registrar ids to track.
    all_months : ordered sequence of dates; its elements must expose ``.date()``.

    Returns
    -------
    mapping_history : ``{date: {reg_orig: reg_used}}``
    surgery_log : list of dicts, one per surgery performed
    scores : scores at the earliest month processed
    scores_history_corrected : ``{date: {reg_orig: score}}``
    """
    flows_idx = monthly_flows.set_index(['date', 'reg_id', 'isin'])['qty_net_month']

    # Every code present in the panel (level 0 of the columns).
    all_regs_in_panel = set(panel.columns.get_level_values(0))
    # Fixed iteration order so that ties between candidates do not depend on
    # set ordering (string hashing is randomised between interpreter runs).
    ordered_regs = sorted(all_regs_in_panel, key=str)

    # Pre-computation for the fast pre-filter: {reg_id: {date: set(isin)}},
    # keeping only dates where the code holds at least one ISIN.
    reg_isin_at_date = {}
    for reg in all_regs_in_panel:
        col = panel[reg]
        active_mask = (col.notna() & (col != 0)).to_numpy()
        per_date = {}
        for date, row_mask in zip(col.index, active_mask):
            if row_mask.any():
                per_date[date] = set(col.columns[row_mask].tolist())
        reg_isin_at_date[reg] = per_date

    # Current mapping reg_orig -> reg_used, and the codes currently in use.
    mapping = {r: r for r in universe}
    mapping_inv = {r: r for r in universe}

    surgery_log = []
    mapping_history = {all_months[-1]: dict(mapping)}
    scores_history_corrected = {all_months[-1]: dict(weights)}

    # Running scores, initialised at the last month.
    scores = dict(weights)

    for i in range(len(all_months) - 2, -1, -1):
        t_prev = all_months[i]
        t_curr = all_months[i + 1]

        new_scores = {}
        new_mapping = {}
        n_surgeries = 0

        for reg_orig, reg_curr in mapping.items():
            score_curr = scores.get(reg_orig, 0)

            if score_curr == 0:
                new_scores[reg_orig] = 0
                new_mapping[reg_orig] = reg_curr
                continue

            if reg_curr == reg_orig:
                # Still on the original code: reuse the error from step 2.
                err = errors_history.get(t_prev, {}).get(reg_orig, 0.0)
                score_prev_no_surgery = score_curr * (1.0 - min(err, 1.0))
            else:
                # The account was remapped by an earlier surgery, so the step-2
                # error (computed for the original code) no longer applies:
                # evaluate continuity on the code actually in use.
                score_prev_no_surgery = _recompute_score_with_candidate(
                    reg_curr, reg_curr, t_prev, t_curr, panel, flows_idx, score_curr
                )

            if score_curr > 0:
                drop_ratio = 1.0 - (score_prev_no_surgery / score_curr)
            else:
                drop_ratio = 0

            best_candidate = None
            best_score_after = score_prev_no_surgery  # baseline = no surgery
            best_composite = 0.0

            if drop_ratio > SCORE_DROP_THRESHOLD:
                isin_curr = reg_isin_at_date.get(reg_curr, {}).get(t_curr, set())

                # Without any ISIN at t_curr no candidate can pass the filter.
                if isin_curr:
                    for j in ordered_regs:
                        # Only codes that are not in use by another account.
                        if j == reg_curr or j in mapping_inv:
                            continue

                        # Fast pre-filter: minimal ISIN overlap.
                        isin_j = reg_isin_at_date[j].get(t_prev)
                        if not isin_j:
                            continue
                        inter = len(isin_curr & isin_j)
                        if inter == 0:
                            continue
                        jac = inter / len(isin_curr | isin_j)
                        if jac < MIN_JACCARD:
                            continue

                        score_after = _recompute_score_with_candidate(
                            reg_curr, j, t_prev, t_curr, panel, flows_idx, score_curr
                        )
                        composite = jac * (score_after / score_curr) if score_curr > 0 else 0

                        if score_after > best_score_after:
                            best_score_after = score_after
                            best_candidate = j
                            best_composite = composite

            # `is not None` so that a falsy id (0, "") is not discarded.
            if best_candidate is not None:
                surgery_log.append({
                    'date': t_prev,
                    'reg_orig': reg_orig,
                    'reg_from': reg_curr,
                    'reg_to': best_candidate,
                    'jaccard_composite': round(best_composite, 4),
                    'score_before': round(score_curr, 6),
                    'score_after': round(best_score_after, 6),
                    'drop_without_surgery': round(drop_ratio, 4),
                    'gain_vs_no_surgery': round(best_score_after - score_prev_no_surgery, 6),
                })
                print(f" 🔧 CHIRURGIE {t_prev.date()} | {reg_orig} : "
                      f"{reg_curr} → {best_candidate} "
                      f"(composite={best_composite:.3f}, "
                      f"score {score_curr:.4f}→{best_score_after:.4f})")

                # Release the old code and reserve the new one immediately so
                # that no other account can pick it during the same month.
                mapping_inv.pop(reg_curr, None)
                mapping_inv[best_candidate] = reg_orig
                new_mapping[reg_orig] = best_candidate
                n_surgeries += 1
            else:
                new_mapping[reg_orig] = reg_curr

            # Equals score_prev_no_surgery when no candidate was selected.
            new_scores[reg_orig] = best_score_after

        mapping = new_mapping
        mapping_inv = {v: k for k, v in mapping.items()}
        scores = new_scores
        mapping_history[t_prev] = dict(mapping)
        scores_history_corrected[t_prev] = dict(scores)

        total_score = sum(s for s in scores.values() if not np.isnan(s))
        print(f" {t_prev.date()} | Σ scores = {total_score:.4f} | "
              f"Chirurgies = {n_surgeries}")

    return mapping_history, surgery_log, scores, scores_history_corrected


# ─────────────────────────────────────────────
# 7. RESULTS EXPORT
# ─────────────────────────────────────────────
def export_results(scores_history, mapping_history, surgery_log, all_months,
                   out_prefix="carmignac", out_dir="repair_results"):
    """Export the key results as CSV files and return the two main frames.

    Files written into ``out_dir`` (created if missing):
      - ``{out_prefix}_scores.csv``      one row per (date, reg_id)
      - ``{out_prefix}_mapping.csv``     one row per (date, reg_orig)
      - ``{out_prefix}_surgery_log.csv`` only when ``surgery_log`` is non-empty

    Parameters
    ----------
    scores_history : ``{date: {reg_id: score}}``
    mapping_history : ``{date: {reg_orig: reg_used}}``
    surgery_log : list of dicts, each with at least a ``date`` key.
    all_months : unused; kept for signature compatibility.
    out_prefix : file name prefix.
    out_dir : local output directory (default ``"repair_results"``).

    Returns
    -------
    (df_scores, df_mapping)
    """
    import os  # local import: the file-level import block is outside this span

    # The original wrote into the directory without creating it, so the first
    # to_csv failed on a fresh checkout.
    os.makedirs(out_dir, exist_ok=True)

    # Score history
    score_cols = ['date', 'reg_id', 'score']
    score_rows = [
        {'date': date, 'reg_id': reg, 'score': score}
        for date, sc in scores_history.items()
        for reg, score in sc.items()
    ]
    df_scores = pd.DataFrame(score_rows, columns=score_cols)
    if not df_scores.empty:
        df_scores = df_scores.sort_values(['date', 'score'], ascending=[True, False])
    df_scores.to_csv(os.path.join(out_dir, f"{out_prefix}_scores.csv"), index=False)

    # Mapping history
    mapping_cols = ['date', 'reg_orig', 'reg_used', 'changed']
    mapping_rows = [
        {'date': date, 'reg_orig': reg_orig, 'reg_used': reg_used,
         'changed': reg_orig != reg_used}
        for date, mp in mapping_history.items()
        for reg_orig, reg_used in mp.items()
    ]
    df_mapping = pd.DataFrame(mapping_rows, columns=mapping_cols)
    if not df_mapping.empty:
        df_mapping = df_mapping.sort_values(['date', 'reg_orig'])
    df_mapping.to_csv(os.path.join(out_dir, f"{out_prefix}_mapping.csv"), index=False)

    # Surgery log
    if surgery_log:
        df_surgery = pd.DataFrame(surgery_log).sort_values('date')
        df_surgery.to_csv(os.path.join(out_dir, f"{out_prefix}_surgery_log.csv"), index=False)
        print(f"\n[Export] {len(surgery_log)} opérations de chirurgie sauvegardées.")
    else:
        print("\n[Export] Aucune chirurgie effectuée sur ce subset.")

    print(f"[Export] Scores → {out_prefix}_scores.csv")
    print(f"[Export] Mapping → {out_prefix}_mapping.csv")

    return df_scores, df_mapping


# ─────────────────────────────────────────────
# 8. MAIN PIPELINE
# ─────────────────────────────────────────────
def run_pipeline(aum_path, flows_path):
    """Run the whole registrar-id repair pipeline and export its results.

    Chains ``load_data``, ``build_reference_universe``, ``build_monthly_panel``,
    ``aggregate_flows_monthly``, ``score_propagation``, ``run_surgery_pass`` and
    ``export_results``, printing progress along the way.

    Returns
    -------
    (df_scores, df_mapping, surgery_log, scores_history_corrected, mapping_history)
    """
    print("=" * 60)
    print("CARMIGNAC — Pipeline de réparation des Registrar IDs")
    print("=" * 60)

    # Loading
    aum, flows = load_data(aum_path, flows_path)

    # Step 1 — reference universe
    aum_ref, weights, universe, t_ref = build_reference_universe(aum)

    print(f"\n Top 5 comptes par poids :")
    for reg, w in sorted(weights.items(), key=lambda x: -x[1])[:5]:
        print(f" {reg} : {w:.4f} ({w*100:.2f}%)")

    # Monthly panel
    panel, all_months = build_monthly_panel(aum, universe, t_ref)

    # Flows aggregated per month
    monthly_flows = aggregate_flows_monthly(flows, all_months)

    # Step 2 — consistency score (no surgery)
    print("\n[Étape 2] Propagation des scores (sans chirurgie)...")
    scores_history, errors_history, _ = score_propagation(
        panel, monthly_flows, weights, universe, all_months
    )

    # Step 3 — surgery
    print("\n[Étape 3] Passe de chirurgie...")
    mapping_history, surgery_log, final_scores, scores_history_corrected = run_surgery_pass(
        scores_history, errors_history, panel, monthly_flows,
        weights, universe, all_months
    )

    # Export — the corrected (post-surgery) scores are the reference
    print("\n[Export des résultats...]")
    df_scores, df_mapping = export_results(
        scores_history_corrected, mapping_history, surgery_log, all_months
    )

    # Final summary
    print("\n" + "=" * 60)
    print("RÉSUMÉ FINAL")
    print("=" * 60)
    print(f" Dates couvertes : {all_months[0].date()} → {all_months[-1].date()}")
    print(f" Comptes dans l'univers : {len(universe)}")
    print(f" Chirurgies effectuées : {len(surgery_log)}")
    # `s == s` is False only for NaN, which is therefore left out of the sums.
    score_by_date = {d: sum(s for s in sc.values() if s == s)
                     for d, sc in scores_history_corrected.items()}
    print(f" Σ scores à t_ref : {score_by_date[t_ref]:.4f}")
    print(f" Σ scores à t_min : {score_by_date[all_months[0]]:.4f}")

    return df_scores, df_mapping, surgery_log, scores_history_corrected, mapping_history


if __name__ == "__main__":
    df_scores, df_mapping, surgery_log, scores_history, mapping_history = run_pipeline(
        "s3://projet-bdc-data/carmignac/AUM ENSAE V2 -20251105.csv",
        "s3://projet-bdc-data/carmignac/Flows ENSAE V2 -20251105.csv"
    )

# Patch metadata that followed the script in the original text (not Python):
# diff --git a/repair_challenge/repair_results/carmignac_report.html b/repair_challenge/repair_results/carmignac_report_0.1.html
# similarity index 100%
# rename from repair_challenge/repair_results/carmignac_report.html
# rename to repair_challenge/repair_results/carmignac_report_0.1.html