diff --git a/notebooks/aum_flows_analysis.ipynb b/notebooks/aum_flows_analysis.ipynb
index a7bdf72..39db03f 100644
--- a/notebooks/aum_flows_analysis.ipynb
+++ b/notebooks/aum_flows_analysis.ipynb
@@ -11,15 +11,29 @@
},
{
"cell_type": "code",
- "execution_count": 2,
+ "execution_count": 1,
"metadata": {},
"outputs": [
+ {
+ "name": "stderr",
+ "output_type": "stream",
+ "text": [
+ "Matplotlib is building the font cache; this may take a moment.\n"
+ ]
+ },
{
"name": "stdout",
"output_type": "stream",
"text": [
- "Requirement already satisfied: openpyxl in /opt/python/lib/python3.13/site-packages (3.1.5)\n",
- "Requirement already satisfied: et-xmlfile in /opt/python/lib/python3.13/site-packages (from openpyxl) (2.0.0)\n"
+ "Collecting openpyxl\n",
+ " Downloading openpyxl-3.1.5-py2.py3-none-any.whl.metadata (2.5 kB)\n",
+ "Collecting et-xmlfile (from openpyxl)\n",
+ " Downloading et_xmlfile-2.0.0-py3-none-any.whl.metadata (2.7 kB)\n",
+ "Downloading openpyxl-3.1.5-py2.py3-none-any.whl (250 kB)\n",
+ "Downloading et_xmlfile-2.0.0-py3-none-any.whl (18 kB)\n",
+ "Installing collected packages: et-xmlfile, openpyxl\n",
+ "\u001b[2K \u001b[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\u001b[0m \u001b[32m2/2\u001b[0m [openpyxl]1/2\u001b[0m [openpyxl]\n",
+ "\u001b[1A\u001b[2KSuccessfully installed et-xmlfile-2.0.0 openpyxl-3.1.5\n"
]
}
],
@@ -37,7 +51,7 @@
},
{
"cell_type": "code",
- "execution_count": 3,
+ "execution_count": null,
"metadata": {},
"outputs": [],
"source": [
@@ -50,21 +64,21 @@
},
{
"cell_type": "code",
- "execution_count": 8,
+ "execution_count": null,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
- "/tmp/ipykernel_94182/3768862044.py:5: DtypeWarning: Columns (0,1,2,3) have mixed types. Specify dtype option on import or set low_memory=False.\n",
- " stocks = pd.read_csv(f, sep=\";\")\n"
+ "/tmp/ipykernel_3944/2679329308.py:2: DtypeWarning: Columns (0,1,2,3) have mixed types. Specify dtype option on import or set low_memory=False.\n",
+ " flows = pd.read_csv(f, sep=\";\")\n"
]
}
],
"source": [
- "#with fs.open('projet-bdc-data//carmignac/Flows ENSAE V2 -20251105.csv', 'rb') as f:\n",
- " #flows = pd.read_csv(f, sep=\";\")\n",
+ "with fs.open('projet-bdc-data//carmignac/Flows ENSAE V2 -20251105.csv', 'rb') as f:\n",
+ " flows = pd.read_csv(f, sep=\";\")\n",
"\n",
"with fs.open('projet-bdc-data//carmignac/AUM ENSAE V2 -20251105.csv', 'rb') as f:\n",
" stocks = pd.read_csv(f, sep=\";\")\n",
@@ -466,7 +480,7 @@
},
{
"cell_type": "code",
- "execution_count": 5,
+ "execution_count": 7,
"metadata": {},
"outputs": [
{
@@ -478,13 +492,23 @@
}
],
"source": [
- "stocks[\"Centralisation Date\"] = pd.to_datetime(stocks[\"Centralisation Date\"], errors=\"coerce\")\n",
+ "flows[\"Centralisation Date\"] = pd.to_datetime(flows[\"Centralisation Date\"], errors=\"coerce\")\n",
"#flows[\"Centralisation Date\"] = pd.to_datetime(flows[\"Centralisation Date\"], errors=\"coerce\")\n",
"#nav[\"NavDate\"] = pd.to_datetime(nav[\"NavDate\"], format=\"%d/%m/%Y\", errors=\"coerce\")\n",
"\n",
"print(\"Date conversion done.\")"
]
},
+ {
+ "cell_type": "code",
+ "execution_count": 8,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "flows_head = flows.head(100)\n",
+ "flows_head.to_csv(\"flows_head.csv\")"
+ ]
+ },
{
"cell_type": "code",
"execution_count": 16,
diff --git a/repair_challenge/carmignac repair.py b/repair_challenge/carmignac repair.py
deleted file mode 100644
index f8dd91c..0000000
--- a/repair_challenge/carmignac repair.py
+++ /dev/null
@@ -1,1037 +0,0 @@
-"""
-Carmignac Data Challenge — Pipeline Results Analysis
-=====================================================
-Analyses the CSV outputs produced by carmignac_repair.py:
- - carmignac_scores.csv (post-surgery score history)
- - carmignac_mapping.csv (reg_id mapping history)
- - carmignac_surgery_log.csv (surgery operations)
-
-Produces a self-contained HTML report with interactive charts.
-
-Usage:
- python carmignac_analysis.py
- python carmignac_analysis.py --scores path/to/scores.csv \
- --mapping path/to/mapping.csv \
- --surgery path/to/surgery_log.csv \
- --out report.html
-"""
-
-import argparse
-import json
-import os
-import sys
-import numpy as np
-import pandas as pd
-
-# ─────────────────────────────────────────────────────────────
-# 1. LOAD & VALIDATE
-# ─────────────────────────────────────────────────────────────
-
-def load_outputs(scores_path, mapping_path, surgery_path):
- scores = pd.read_csv(scores_path, parse_dates=["date"])
- mapping = pd.read_csv(mapping_path, parse_dates=["date"])
- surgery = pd.read_csv(surgery_path, parse_dates=["date"])
-
- # Normalise dtypes
- scores["reg_id"] = scores["reg_id"].astype(str)
- mapping["reg_orig"] = mapping["reg_orig"].astype(str)
- mapping["reg_used"] = mapping["reg_used"].astype(str)
- mapping["changed"] = mapping["changed"].astype(bool)
- surgery["reg_orig"] = surgery["reg_orig"].astype(str)
- surgery["reg_from"] = surgery["reg_from"].astype(str)
- surgery["reg_to"] = surgery["reg_to"].astype(str)
-
- return scores, mapping, surgery
-
-
-# ─────────────────────────────────────────────────────────────
-# 2. COMPUTE ANALYTICS
-# ─────────────────────────────────────────────────────────────
-
-def compute_analytics(scores, mapping, surgery):
- dates = sorted(scores["date"].unique())
-
- # ── 2.1 Sum of scores per date (post-surgery) ──────────────
- sum_post = (scores.groupby("date")["score"]
- .sum()
- .reindex(dates)
- .rename("sum_post"))
-
- # ── 2.2 Reconstruct pre-surgery (counterfactual) ───────────
- # Without surgery, every reg_id that had a hard break would score 0
- # from that date backwards. We propagate the surgery "gain" as a
- # cumulative deficit going back in time.
- gain_by_date = surgery.groupby("date")["gain_vs_no_surgery"].sum()
- # cumulative deficit = sum of gains for all surgeries at or after date t
- cumulative_deficit = pd.Series(0.0, index=dates)
- for d in dates:
- cumulative_deficit[d] = gain_by_date[gain_by_date.index >= d].sum()
- sum_pre = (sum_post - cumulative_deficit).clip(lower=0).rename("sum_pre")
-
- timeline = pd.DataFrame({"sum_post": sum_post, "sum_pre": sum_pre})
- timeline.index = pd.to_datetime(timeline.index)
- timeline["recovery_pct"] = np.where(
- sum_pre < sum_post,
- (sum_post - sum_pre) / sum_post.clip(lower=1e-9) * 100,
- 0.0,
- )
-
- # ── 2.3 Per-date surgery stats ─────────────────────────────
- surgery_stats = (
- surgery.groupby("date")
- .agg(
- n_surgeries = ("reg_orig", "count"),
- total_gain = ("gain_vs_no_surgery", "sum"),
- avg_gain = ("gain_vs_no_surgery", "mean"),
- avg_jaccard = ("jaccard_composite", "mean"),
- avg_score_before = ("score_before", "mean"),
- avg_score_after = ("score_after", "mean"),
- )
- .reindex(dates, fill_value=0)
- )
-
- # ── 2.4 Score distribution over time ───────────────────────
- # Wide format: rows=dates, cols=reg_ids
- pivot = scores.pivot_table(index="date", columns="reg_id",
- values="score", aggfunc="last")
- pivot = pivot.reindex(dates)
- pivot.index = pd.to_datetime(pivot.index)
-
- # ── 2.5 Mapping churn ──────────────────────────────────────
- # For each date, how many reg_ids are remapped (not using their original code)?
- churn = (mapping.groupby("date")["changed"]
- .sum()
- .reindex(dates, fill_value=0)
- .rename("n_remapped"))
-
- # ── 2.6 Score entropy (distribution spread) ────────────────
- def entropy(row):
- p = row.dropna()
- p = p[p > 0]
- if len(p) == 0:
- return np.nan
- p = p / p.sum()
- return -(p * np.log(p)).sum()
-
- timeline["entropy"] = pivot.apply(entropy, axis=1).values
-
- # ── 2.7 Individual score trajectories ──────────────────────
- # Identify which reg_ids were ever remapped
- ever_remapped = set(mapping.loc[mapping["changed"], "reg_orig"].unique())
-
- # ── 2.8 Surgery detail table ───────────────────────────────
- surgery_detail = surgery.copy()
- surgery_detail["gain_pct_of_score"] = (
- surgery_detail["gain_vs_no_surgery"]
- / surgery_detail["score_before"].clip(lower=1e-9) * 100
- ).round(2)
-
- return {
- "timeline": timeline,
- "surgery_stats": surgery_stats,
- "pivot": pivot,
- "churn": churn,
- "ever_remapped": ever_remapped,
- "surgery_detail": surgery_detail,
- "dates": [d.strftime("%Y-%m-%d") for d in dates],
- }
-
-
-# ─────────────────────────────────────────────────────────────
-# 3. PRINT CONSOLE SUMMARY
-# ─────────────────────────────────────────────────────────────
-
-def print_summary(analytics, surgery):
- tl = analytics["timeline"]
- ss = analytics["surgery_stats"]
-
- print("\n" + "=" * 65)
- print(" CARMIGNAC PIPELINE — RESULTS SUMMARY")
- print("=" * 65)
-
- print(f"\n Date range : {tl.index.min().date()} → {tl.index.max().date()}")
- print(f" Total months : {len(tl)}")
- print(f" Reg IDs : {analytics['pivot'].shape[1]}")
-
- print(f"\n ── Score (Σ) ──────────────────────────────────────────")
- print(f" At t_ref (latest) : {tl['sum_post'].iloc[-1]:.6f}")
- print(f" At t_min (earliest): {tl['sum_post'].iloc[0]:.6f}")
- print(f" Min (post-surgery) : {tl['sum_post'].min():.6f} "
- f"({tl['sum_post'].idxmin().date()})")
- print(f" Min (pre-surgery) : {tl['sum_pre'].min():.6f} "
- f"({tl['sum_pre'].idxmin().date()})")
- print(f" Max recovery (pct) : {tl['recovery_pct'].max():.2f}%")
-
- print(f"\n ── Surgeries ─────────────────────────────────────────")
- if len(surgery) == 0:
- print(" No surgeries performed.")
- else:
- print(f" Total operations : {len(surgery)}")
- print(f" Total score gained : {surgery['gain_vs_no_surgery'].sum():.6f}")
- print(f" Avg Jaccard : {surgery['jaccard_composite'].mean():.4f}")
- print(f" Avg gain / surgery : {surgery['gain_vs_no_surgery'].mean():.6f}")
- print()
- print(f" {'Date':12s} {'Reg orig':12s} {'From':15s} {'To':15s} "
- f"{'Jaccard':>8s} {'Gain':>10s}")
- print(" " + "-" * 78)
- for _, row in surgery.sort_values("date").iterrows():
- print(f" {str(row['date'].date()):12s} {row['reg_orig']:12s} "
- f"{row['reg_from']:15s} {row['reg_to']:15s} "
- f"{row['jaccard_composite']:8.4f} {row['gain_vs_no_surgery']:10.6f}")
-
- print(f"\n ── Mapping churn ─────────────────────────────────────")
- ch = analytics["churn"]
- print(f" Max remapped at one date : {int(ch.max())} ({ch.idxmax().date() if ch.max()>0 else 'N/A'})")
- print(f" Reg IDs ever remapped : {len(analytics['ever_remapped'])}")
-
- print(f"\n ── Score entropy (distribution spread) ───────────────")
- ent = analytics["timeline"]["entropy"]
- print(f" Mean entropy : {ent.mean():.4f}")
- print(f" Std entropy : {ent.std():.4f}")
- print()
-
-
-# ─────────────────────────────────────────────────────────────
-# 4. BUILD HTML REPORT
-# ─────────────────────────────────────────────────────────────
-
-def build_html(analytics, surgery, scores, mapping):
- tl = analytics["timeline"]
- ss = analytics["surgery_stats"]
- piv = analytics["pivot"]
- ch = analytics["churn"]
- dates_str = analytics["dates"]
-
- # ── helpers to serialise for JS ─────────────────────────────
- def jf(arr, decimals=6):
- return json.dumps([round(float(v), decimals) if not np.isnan(v) else None
- for v in arr])
-
- def js(arr):
- return json.dumps(list(arr))
-
- # ── colour palette ───────────────────────────────────────────
- REG_COLORS = [
- "#2563eb","#16a34a","#dc2626","#d97706","#7c3aed",
- "#0891b2","#db2777","#65a30d","#ea580c","#6366f1",
- "#059669","#b45309","#9333ea","#0284c7","#e11d48",
- ]
-
- # ── 4.1 Surgery sparkline data ──────────────────────────────
- surg_dates = [d.strftime("%Y-%m-%d") for d in ss.index]
- n_surg = jf(ss["n_surgeries"].values, 0)
- total_gain = jf(ss["total_gain"].values)
- avg_gain = jf(ss["avg_gain"].values)
- avg_jaccard = jf(ss["avg_jaccard"].values)
-
- # ── 4.2 Individual trajectories ────────────────────────────
- reg_ids = list(piv.columns)
- traj_datasets = []
- for idx, rid in enumerate(reg_ids):
- col = analytics["ever_remapped"]
- dashed = rid in col
- traj_datasets.append({
- "label": rid,
- "data": [round(float(v), 6) if not np.isnan(v) else None
- for v in piv[rid].values],
- "borderColor": REG_COLORS[idx % len(REG_COLORS)],
- "backgroundColor": REG_COLORS[idx % len(REG_COLORS)] + "22",
- "borderWidth": 2 if not dashed else 2,
- "borderDash": [] if not dashed else [6, 3],
- "pointRadius": 0,
- "tension": 0.3,
- "fill": False,
- })
-
- traj_json = json.dumps(traj_datasets)
-
- # ── 4.3 Surgery detail table rows ──────────────────────────
- sd = analytics["surgery_detail"].sort_values("date")
- surg_rows_html = ""
- if len(sd) == 0:
- surg_rows_html = "
| No surgeries performed |
"
- else:
- for _, r in sd.iterrows():
- gain_class = "gain-high" if r["gain_vs_no_surgery"] > 0.05 else "gain-low"
- surg_rows_html += f"""
-
- | {r['date'].date()} |
- {r['reg_orig']} |
- {r['reg_from']} |
- → |
- {r['reg_to']} |
- {r['jaccard_composite']:.4f} |
- +{r['gain_vs_no_surgery']:.6f} |
- {r['gain_pct_of_score']:.1f}% |
-
"""
-
- # ── 4.4 Top accounts table ──────────────────────────────────
- last_date = piv.index.max()
- top_accounts = piv.loc[last_date].dropna().sort_values(ascending=False)
- top_rows_html = ""
- for rank, (rid, sc) in enumerate(top_accounts.items(), 1):
- remapped = "✓" if rid in analytics["ever_remapped"] else ""
- bar_w = int(sc / top_accounts.max() * 100)
- color = REG_COLORS[(rank - 1) % len(REG_COLORS)]
- top_rows_html += f"""
-
- | #{rank} |
- {rid} |
- {sc:.6f} |
-
-
- |
- {remapped} |
-
"""
-
- # ─────────────────────────────────────────────────────────────
- # HTML TEMPLATE
- # ─────────────────────────────────────────────────────────────
- html = f"""
-
-
-
-
-Carmignac Pipeline — Analysis Report
-
-
-
-
-
-
-
-
-
-
-
- Σ score at t_ref
- {tl['sum_post'].iloc[-1]:.4f}
- post-surgery
-
-
- Σ score at t_min
- {tl['sum_post'].iloc[0]:.4f}
- post-surgery
-
-
- Max recovery
- {tl['recovery_pct'].max():.1f}%
- score rescued by surgery
-
-
- Total surgeries
- {len(surgery)}
- operations performed
-
-
- Reg IDs universe
- {piv.shape[1]}
- at reference date
-
-
- Ever remapped
- {len(analytics['ever_remapped'])}
- reg IDs w/ code change
-
-
-
-
-
-
-
01 · Score Integrity Over Time
-
-
-
-
-
-
-
-
02 · Individual Score Trajectories
-
-
-
-
03 · Surgery Operations
-
-
-
-
-
-
04 · Surgery Detail Log
-
-
-
-
- {'
No surgeries were performed on this dataset.
' if len(surgery) == 0 else f"""
-
-
-
- | Date |
- Reg orig |
- Code from |
- |
- Code to |
- Jaccard |
- Score gain |
- % of score |
-
-
- {surg_rows_html}
-
"""}
-
-
-
-
05 · Score Ranking at t_ref
-
-
-
-
-
-
-
- | Rank |
- Registrar ID |
- Score (weight) |
- Relative size |
- Remapped |
-
-
- {top_rows_html}
-
-
-
-
-
-
-
-
-
-
-
-"""
-
- return html
-
-
-# ─────────────────────────────────────────────────────────────
-# 5. MAIN
-# ─────────────────────────────────────────────────────────────
-
-def main():
- parser = argparse.ArgumentParser(description="Carmignac pipeline results analyser")
- parser.add_argument("--scores", default="repair_results/carmignac_scores.csv")
- parser.add_argument("--mapping", default="repair_results/carmignac_mapping.csv")
- parser.add_argument("--surgery", default="repair_results/carmignac_surgery_log.csv")
- parser.add_argument("--out", default="repair_results/carmignac_report.html")
- args = parser.parse_args()
-
- # Resolve paths relative to this script's directory if files not found
- base = os.path.dirname(os.path.abspath(__file__))
- def resolve(p):
- if os.path.exists(p):
- return p
- alt = os.path.join(base, p)
- if os.path.exists(alt):
- return alt
- sys.exit(f"[ERROR] File not found: {p}")
-
- scores_path = resolve(args.scores)
- mapping_path = resolve(args.mapping)
- surgery_path = resolve(args.surgery)
-
- print(f"[Load] scores : {scores_path}")
- print(f"[Load] mapping : {mapping_path}")
- print(f"[Load] surgery : {surgery_path}")
-
- scores, mapping, surgery = load_outputs(scores_path, mapping_path, surgery_path)
- analytics = compute_analytics(scores, mapping, surgery)
-
- print_summary(analytics, surgery)
-
- html = build_html(analytics, surgery, scores, mapping)
-
- out_path = args.out
- with open(out_path, "w", encoding="utf-8") as f:
- f.write(html)
- print(f"\n[Report] Written to → {out_path}")
-
-
-if __name__ == "__main__":
- main()
diff --git a/repair_challenge/carmignac_repair.py b/repair_challenge/carmignac_repair.py
new file mode 100644
index 0000000..6fab02f
--- /dev/null
+++ b/repair_challenge/carmignac_repair.py
@@ -0,0 +1,679 @@
+"""
+Carmignac Data Challenge — Registrar ID Repair Pipeline
+=========================================================
+Étape 1 : Filtrage & univers de référence à t=31/10/2025
+Étape 2 : Score de cohérence temporelle (propagation vers le passé)
+Étape 3 : Chirurgie de code (matching 1-to-1)
+"""
+
+import pandas as pd
+import numpy as np
+from collections import defaultdict
+import os
+import s3fs
+
+# ─────────────────────────────────────────────
+# PARAMÈTRES
+# ─────────────────────────────────────────────
+ALPHA = 0.03 # reconciliation tolerance: 3% of the holding at t
+MIN_AUM_EUR = 5e6 # step-1 filter threshold — set to 0 when running on the small test extracts
+MIN_JACCARD = 0.3 # minimum portfolio similarity required to attempt a surgery
+SCORE_DROP_THRESHOLD = 0.1 # a score drop above 10% makes the account a surgery candidate
+
+# Labels matched against both reg_id and region when building the reference universe.
+EXCLUDE_REGISTRAR = ["Off Distribution", "Private Clients"]
+
+# ─────────────────────────────────────────────
+# 1. CHARGEMENT
+# ─────────────────────────────────────────────
+def load_data(aum_path, flows_path):
+    """
+    Read the AUM and flows CSV extracts from the S3 bucket.
+
+    NOTE(review): `aum_path` and `flows_path` are accepted but never read —
+    both object keys are hard-coded below. Confirm whether callers expect
+    these arguments to be honoured.
+
+    Credentials are taken from the AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY
+    and AWS_SESSION_TOKEN environment variables (KeyError if one is unset).
+
+    Returns:
+        (aum, flows) : two DataFrames with short column names, the
+        centralisation date parsed to datetime and `reg_id` cast to str.
+    """
+    env = os.environ
+    fs = s3fs.S3FileSystem(
+        client_kwargs={'endpoint_url': 'https://'+'minio-simple.lab.groupe-genes.fr'},
+        key=env["AWS_ACCESS_KEY_ID"],
+        secret=env["AWS_SECRET_ACCESS_KEY"],
+        token=env["AWS_SESSION_TOKEN"],
+    )
+
+    def _read_extract(object_key):
+        # Both extracts are semicolon-separated CSV files.
+        with fs.open(object_key, 'rb') as handle:
+            return pd.read_csv(handle, sep=";")
+
+    flows = _read_extract('projet-bdc-data//carmignac/Flows ENSAE V2 -20251105.csv')
+    aum = _read_extract('projet-bdc-data//carmignac/AUM ENSAE V2 -20251105.csv')
+
+    for frame in (aum, flows):
+        frame['Centralisation Date'] = pd.to_datetime(frame['Centralisation Date'])
+
+    # Short names
+    aum_names = {
+        'Registrar Account - ID': 'reg_id',
+        'Product - Isin': 'isin',
+        'Centralisation Date': 'date',
+        'Quantity - AUM': 'qty_aum',
+        'Value - AUM €': 'val_eur',
+        'Registrar Account - Region': 'region',
+    }
+    flows_names = {
+        'Registrar Account - ID': 'reg_id',
+        'Product - Isin': 'isin',
+        'Centralisation Date': 'date',
+        'Quantity - NetFlows': 'qty_net',
+        'Value € - NetFlows': 'val_net_eur',
+    }
+    aum = aum.rename(columns=aum_names)
+    flows = flows.rename(columns=flows_names)
+
+    aum['reg_id'] = aum['reg_id'].astype(str)
+    flows['reg_id'] = flows['reg_id'].astype(str)
+
+    return aum, flows
+
+# ─────────────────────────────────────────────
+# 2. ÉTAPE 1 — Univers de référence à T_REF
+# ─────────────────────────────────────────────
+def build_reference_universe(aum, t_ref=None):
+    """
+    Build the reference universe at t_ref (latest AUM date by default).
+
+    Rows whose reg_id — or region, when that column exists — is listed in
+    EXCLUDE_REGISTRAR are dropped first. A reg_id is retained when its total
+    val_eur at t_ref is at least MIN_AUM_EUR.
+
+    Returns a 4-tuple:
+        - aum_ref  : rows at t_ref (reg_id, isin, qty_aum, val_eur)
+        - weights  : {reg_id: share of the retained total val_eur}
+        - universe : set of retained reg_ids
+        - t_ref    : the reference date actually used
+    """
+    if t_ref is None:
+        t_ref = aum['date'].max()
+
+    print(f"\n[Étape 1] Date de référence : {t_ref.date()}")
+
+    # Exclusion labels may appear either as the registrar id or as the region.
+    excluded = aum['reg_id'].isin(EXCLUDE_REGISTRAR)
+    if 'region' in aum.columns:
+        excluded = excluded | aum['region'].isin(EXCLUDE_REGISTRAR)
+    kept = aum.loc[~excluded].copy()
+
+    # Snapshot at the reference date.
+    snapshot_cols = ['reg_id', 'isin', 'qty_aum', 'val_eur']
+    aum_ref = kept.loc[kept['date'] == t_ref, snapshot_cols].copy()
+
+    # Total value per registrar at t_ref, then the size filter.
+    aum_by_reg = aum_ref.groupby('reg_id')['val_eur'].sum().rename('total_eur')
+    retained = aum_by_reg[aum_by_reg >= MIN_AUM_EUR]
+    universe = set(retained.index)
+
+    total_eur_universe = retained.sum()
+    total_eur_all = aum_by_reg.sum()
+    if total_eur_all > 0:
+        coverage = total_eur_universe / total_eur_all
+    else:
+        coverage = 0
+
+    print(f" Registrar IDs à t_ref : {len(aum_by_reg)}")
+    print(f" Dont >= {MIN_AUM_EUR/1e6:.0f}M€ : {len(universe)}")
+    print(f" Couverture encours : {coverage:.1%}")
+
+    # Initial weights, i.e. the scores at t_ref.
+    weights = (retained / total_eur_universe).to_dict()
+
+    return aum_ref, weights, universe, t_ref
+
+# ─────────────────────────────────────────────
+# 3. PANEL AUM MENSUEL (forward-fill)
+# ─────────────────────────────────────────────
+def build_monthly_panel(aum, universe, t_ref):
+    """
+    Build a month-end panel of AUM quantities, forward-filled over time.
+
+    Every reg_id found in `aum` is included, not only those in `universe`
+    (the argument is not used here): historical codes outside the reference
+    universe are needed later as surgery candidates.
+
+    Returns:
+        - panel      : DataFrame indexed by month-end, columns (reg_id, isin)
+        - all_months : month-ends from the first AUM date up to t_ref
+    """
+    # Month-end grid from the earliest observation up to the reference date.
+    month_ends = pd.date_range(start=aum['date'].min(), end=t_ref, freq='ME')
+
+    # Sort so that 'last' below picks the latest row, then cut at t_ref.
+    ordered = aum.sort_values(['reg_id', 'isin', 'date'])
+    history = ordered[ordered['date'] <= t_ref]
+
+    wide = history.pivot_table(
+        index='date', columns=['reg_id', 'isin'], values='qty_aum', aggfunc='last'
+    )
+
+    # Align on the month-end grid; a month without an observation carries the
+    # previous quantity forward. Months before a pair's first appearance stay
+    # NaN (no backward fill).
+    panel = wide.reindex(month_ends).ffill()
+
+    print(f"\n[Panel mensuel] {len(month_ends)} mois, {panel.shape[1]} (reg_id, isin) paires")
+
+    return panel, month_ends
+
+# ─────────────────────────────────────────────
+# 4. FLOWS AGRÉGÉS PAR MOIS
+# ─────────────────────────────────────────────
+def aggregate_flows_monthly(flows, all_months):
+    """
+    Aggregate intra-month flows over each window ]month_end(t-1), month_end(t)].
+
+    Each transaction is assigned to the first month-end that is >= its date,
+    then qty_net is summed per (month_end, reg_id, isin). Transactions dated
+    after the last month-end are discarded.
+
+    Parameters:
+        flows      : DataFrame with columns date, reg_id, isin, qty_net
+        all_months : DatetimeIndex of month-ends, sorted ascending (required
+                     by the binary search below)
+
+    Returns:
+        DataFrame with columns date, reg_id, isin, qty_net_month.
+    """
+    last_month = all_months[-1]
+    flows_f = flows[flows['date'] <= last_month].copy()
+
+    # One vectorised binary search instead of a boolean mask per row:
+    # side='left' returns the position of the first month-end >= date.
+    # Every remaining date is <= last_month, so the position is always valid.
+    positions = all_months.searchsorted(pd.DatetimeIndex(flows_f['date']), side='left')
+    flows_f['month_end'] = all_months[positions]
+
+    # Aggregation
+    monthly_flows = (
+        flows_f.groupby(['month_end', 'reg_id', 'isin'])['qty_net']
+        .sum()
+        .reset_index()
+    )
+    monthly_flows.columns = ['date', 'reg_id', 'isin', 'qty_net_month']
+
+    print(f"\n[Flows mensuels] {len(monthly_flows)} enregistrements (reg_id, isin, mois)")
+
+    return monthly_flows
+
+# ─────────────────────────────────────────────
+# 5. ÉTAPE 2 — Score de cohérence temporelle
+# ─────────────────────────────────────────────
+def compute_reconciliation_error(qty_t_minus1, qty_t, net_flow, alpha=ALPHA):
+    """
+    Normalised reconciliation error for one (reg_id, isin, month).
+
+    Expected : qty_t_minus1 + net_flow ≈ qty_t
+    Error    : |qty_t_minus1 + net_flow - qty_t| / max(|qty_t|, |qty_t_minus1|)
+
+    Returns:
+        - err_ratio : relative error (0 = perfect reconciliation)
+        - is_break  : True when err_ratio > alpha
+    """
+    expected_qty = qty_t_minus1 + net_flow
+    # The 1e-9 floor keeps the division defined when both quantities are zero.
+    scale = max(abs(qty_t), abs(qty_t_minus1), 1e-9)
+    ratio = abs(expected_qty - qty_t) / scale
+    return ratio, ratio > alpha
+
+def score_propagation(panel, monthly_flows, weights, universe, all_months):
+    """
+    Propagate scores backwards in time, from the last month to the first.
+
+    Walking the months from most recent to oldest, each reg_id whose score is
+    still non-zero has its holdings reconciled ISIN by ISIN between t_prev and
+    t_curr (qty_prev + net flow ≈ qty_curr). The per-ISIN relative errors are
+    averaged with |qty at t_curr| as weight and the score is multiplied by
+    (1 - min(avg_err, 1)).
+
+    An ISIN with a quantity at t_curr but none at t_prev counts as an error
+    of 1.0. A reg_id absent from the panel gets score 0 and error 1.0.
+
+    Parameters:
+        panel         : DataFrame indexed by month-end, columns (reg_id, isin)
+        monthly_flows : DataFrame with columns date, reg_id, isin, qty_net_month
+        weights       : {reg_id: score at the last month}
+        universe      : iterable of reg_ids to follow
+        all_months    : ordered month-ends; the last one is the starting point
+
+    Returns:
+        - scores_history : {date: {reg_id: score}}
+        - errors_history : {date: {reg_id: weighted error}}
+        - mapping        : {reg_id: reg_id} — identity; no remapping is done
+          in this function
+    """
+    # Initialisation: scores at the reference month.
+    scores = dict(weights)
+    scores_history = {all_months[-1]: dict(scores)}
+    errors_history = {}
+
+    # Current mapping (identity).
+    mapping = {r: r for r in universe}
+
+    # Flows indexed for direct lookup.
+    flows_idx = monthly_flows.set_index(['date', 'reg_id', 'isin'])['qty_net_month']
+
+    # Built once: the set of reg_ids present in the panel does not change.
+    regs_in_panel = set(panel.columns.get_level_values(0))
+
+    # Walk back in time.
+    for i in range(len(all_months) - 2, -1, -1):
+        t_prev = all_months[i]
+        t_curr = all_months[i + 1]
+
+        errors_at_t = {}
+        new_scores = {}
+
+        for reg_orig, reg_curr in mapping.items():
+            score_curr = scores.get(reg_orig, 0)
+            if score_curr == 0:
+                new_scores[reg_orig] = 0
+                continue
+
+            if reg_curr not in regs_in_panel:
+                # reg_curr is nowhere in the panel → total break
+                new_scores[reg_orig] = 0
+                errors_at_t[reg_orig] = 1.0
+                continue
+
+            # Slice the registrar's columns once instead of once per ISIN.
+            reg_frame = panel[reg_curr]
+
+            total_aum_t = 0
+            weighted_err = 0
+
+            for isin in reg_frame.columns.tolist():
+                series = reg_frame[isin]
+                qty_t = series.get(t_curr, np.nan)
+                qty_t_prev = series.get(t_prev, np.nan)
+
+                if pd.isna(qty_t):
+                    continue
+
+                if pd.isna(qty_t_prev):
+                    # Held at t_curr but not at t_prev → break on this ISIN,
+                    # counted as a maximal error weighted by its quantity.
+                    weight_isin = abs(qty_t)
+                    weighted_err += 1.0 * weight_isin
+                    total_aum_t += weight_isin
+                    continue
+
+                if qty_t == 0 and qty_t_prev == 0:
+                    continue
+
+                # Aggregated flow over ]t_prev, t_curr]; no record means no flow.
+                try:
+                    net_flow = flows_idx.loc[(t_curr, reg_curr, isin)]
+                except KeyError:
+                    net_flow = 0.0
+
+                err_ratio, _ = compute_reconciliation_error(
+                    qty_t_prev, qty_t, net_flow, alpha=ALPHA
+                )
+
+                # Weight by the quantity held at t_curr.
+                weight_isin = abs(qty_t)
+                weighted_err += err_ratio * weight_isin
+                total_aum_t += weight_isin
+
+            avg_err = weighted_err / total_aum_t if total_aum_t > 0 else 0.0
+            errors_at_t[reg_orig] = avg_err
+
+            # score(t-1) = score(t) * (1 - weighted error), error capped at 1.
+            degradation = min(avg_err, 1.0)
+            new_scores[reg_orig] = score_curr * (1.0 - degradation)
+
+        scores = new_scores
+        scores_history[t_prev] = dict(scores)
+        errors_history[t_prev] = dict(errors_at_t)
+
+        total_score = sum(scores.values())
+        print(f" {t_prev.date()} | Σ scores = {total_score:.4f} | "
+              f"Comptes actifs = {sum(1 for v in scores.values() if v > 0)}")
+
+    return scores_history, errors_history, mapping
+
+# ─────────────────────────────────────────────
+# 6. ÉTAPE 3 — Chirurgie de code
+# ─────────────────────────────────────────────
+def jaccard_isin(set_a, set_b):
+    """Jaccard coefficient of two ISIN sets; 0.0 when either set is empty."""
+    if not (set_a and set_b):
+        return 0.0
+    n_union = len(set_a | set_b)
+    if n_union <= 0:
+        return 0.0
+    return len(set_a & set_b) / n_union
+
+def find_best_candidate(reg_orig, reg_curr, t_prev, t_curr,
+                        panel, flows_idx, all_regs_at_t_prev, mapping_inv):
+    """
+    Find the best historical code to continue `reg_curr` backwards in time.
+
+    A candidate j qualifies when:
+        - j is not in `mapping_inv` (codes already in use) and j != reg_curr
+        - j holds at least one non-zero ISIN at t_prev
+        - the Jaccard similarity between j's ISINs at t_prev and reg_curr's
+          ISINs at t_curr is at least MIN_JACCARD
+
+    Candidates are ranked by jaccard * (1 - min(avg_err, 1)), where avg_err
+    is the reconciliation error over the shared ISINs, weighted by the
+    quantity held by reg_curr at t_curr.
+
+    `reg_orig` is accepted for signature symmetry and is not used.
+
+    Returns (best_candidate, best_composite), or (None, 0.0) when nothing
+    qualifies.
+    """
+    # Built once: these do not depend on the candidate being examined.
+    regs_in_panel = set(panel.columns.get_level_values(0))
+    if reg_curr not in regs_in_panel:
+        return None, 0.0
+
+    # Non-zero holdings of the target account at t_curr.
+    curr_frame = panel[reg_curr]
+    row_curr = curr_frame.loc[t_curr]
+    isin_curr = set(curr_frame.columns[row_curr.notna() & (row_curr != 0)].tolist())
+
+    if not isin_curr:
+        return None, 0.0
+
+    # Every per-registrar slice shares the panel's index.
+    has_t_prev = t_prev in panel.index
+
+    best_candidate = None
+    best_composite = 0.0
+
+    for j in all_regs_at_t_prev:
+        # Skip codes already in use, the account itself, and unknown codes.
+        if j in mapping_inv or j == reg_curr or j not in regs_in_panel:
+            continue
+        if not has_t_prev:
+            continue
+
+        # Non-zero holdings of j at t_prev.
+        col_j = panel[j]
+        row_j = col_j.loc[t_prev]
+        isin_j = set(col_j.columns[row_j.notna() & (row_j != 0)].tolist())
+
+        if not isin_j:
+            continue
+
+        jac = jaccard_isin(isin_curr, isin_j)
+        if jac < MIN_JACCARD:
+            continue
+
+        # Reconciliation error over the shared ISINs. Both quantities are
+        # non-NaN by construction of isin_curr / isin_j.
+        total_aum = 0
+        weighted_err = 0
+
+        for isin in isin_curr & isin_j:
+            qty_t = row_curr[isin]
+            qty_t_prev = row_j[isin]
+
+            # Flow recorded under the candidate code; no record means no flow.
+            try:
+                net_flow = flows_idx.loc[(t_curr, j, isin)]
+            except KeyError:
+                net_flow = 0.0
+
+            err_ratio, _ = compute_reconciliation_error(qty_t_prev, qty_t, net_flow)
+            weight_isin = abs(qty_t)
+            weighted_err += err_ratio * weight_isin
+            total_aum += weight_isin
+
+        avg_err = weighted_err / total_aum if total_aum > 0 else 1.0
+
+        composite = jac * (1.0 - min(avg_err, 1.0))
+
+        if composite > best_composite:
+            best_composite = composite
+            best_candidate = j
+
+    return best_candidate, best_composite
+
+
+def _recompute_score_with_candidate(reg_orig, candidate, t_prev, t_curr,
+                                    panel, flows_idx, score_curr):
+    """
+    Score obtained at t_prev if `candidate` replaces `reg_orig` before t_curr.
+
+    For every ISIN that `reg_orig` holds with a non-zero quantity at t_curr,
+    the quantity is reconciled against the candidate's quantity at t_prev
+    plus the candidate's flow at t_curr. An ISIN the candidate does not hold
+    at t_prev counts as an error of 1.0. Errors are averaged with
+    |qty at t_curr| as weight.
+
+    Returns score_curr * (1 - min(avg_err, 1)); a candidate absent from the
+    panel, or an account with nothing to reconcile, yields a zero score.
+    """
+    regs_in_panel = set(panel.columns.get_level_values(0))
+    if candidate not in regs_in_panel:
+        return score_curr * 0  # unknown candidate
+
+    total_aum = 0
+    weighted_err = 0
+
+    if reg_orig in regs_in_panel:
+        # Slice both accounts once instead of once per ISIN.
+        orig_frame = panel[reg_orig]
+        cand_frame = panel[candidate]
+        cand_isins = cand_frame.columns
+
+        for isin in orig_frame.columns.tolist():
+            qty_t = orig_frame[isin].get(t_curr, np.nan)
+            if pd.isna(qty_t) or qty_t == 0:
+                continue
+
+            if isin in cand_isins:
+                qty_t_prev = cand_frame[isin].get(t_prev, np.nan)
+            else:
+                qty_t_prev = np.nan
+
+            if pd.isna(qty_t_prev):
+                # Nothing to reconcile against: maximal error.
+                err_ratio = 1.0
+            else:
+                # Flow recorded under the candidate code; no record means no flow.
+                try:
+                    net_flow = flows_idx.loc[(t_curr, candidate, isin)]
+                except KeyError:
+                    net_flow = 0.0
+                err_ratio, _ = compute_reconciliation_error(qty_t_prev, qty_t, net_flow)
+
+            weight_isin = abs(qty_t)
+            weighted_err += err_ratio * weight_isin
+            total_aum += weight_isin
+
+    avg_err = weighted_err / total_aum if total_aum > 0 else 1.0
+    return score_curr * (1.0 - min(avg_err, 1.0))
+
+
def _surgery_active_isins(panel, regs):
    """Index, for each registrar ID, the ISINs it actively holds at each date.

    A position counts as active when its panel value is neither missing nor
    zero. Dates without any active position are omitted.

    Returns a nested dict ``{reg_id: {date: set(isin)}}``.
    """
    index = {}
    for reg in regs:
        holdings = panel[reg]
        # One vectorised mask per registrar instead of two .loc lookups per date.
        active_mask = (holdings.notna() & (holdings != 0)).to_numpy()
        isins = holdings.columns
        per_date = {}
        for date, row_mask in zip(holdings.index, active_mask):
            if row_mask.any():
                per_date[date] = set(isins[row_mask].tolist())
        index[reg] = per_date
    return index


def _surgery_best_candidate(reg_curr, score_curr, baseline_score, t_prev, t_curr,
                            candidates, reg_isin_at_date, panel, flows_idx):
    """Pick the replacement registrar ID that best explains ``reg_curr`` at ``t_prev``.

    A candidate is only retained when its recomputed score strictly beats
    ``baseline_score`` (the score obtained without any surgery).

    Returns ``(best_candidate, best_score_after, best_composite)``;
    ``best_candidate`` is ``None`` when no candidate improves on the baseline.
    """
    best_candidate = None
    best_score_after = baseline_score
    best_composite = 0.0

    # ISINs held by the current code at t_curr drive the overlap pre-filter.
    isin_curr = reg_isin_at_date.get(reg_curr, {}).get(t_curr, set())
    if not isin_curr:
        return best_candidate, best_score_after, best_composite

    for cand in candidates:
        # Cheap pre-filter: require a minimal Jaccard overlap of ISIN sets
        # before paying for the full score recomputation.
        isin_cand = reg_isin_at_date.get(cand, {}).get(t_prev, set())
        shared = len(isin_curr & isin_cand)
        if shared == 0:
            continue
        jac = shared / len(isin_curr | isin_cand)
        if jac < MIN_JACCARD:
            continue

        score_after = _recompute_score_with_candidate(
            reg_curr, cand, t_prev, t_curr, panel, flows_idx, score_curr
        )
        composite = jac * (score_after / score_curr) if score_curr > 0 else 0

        if score_after > best_score_after:
            best_score_after = score_after
            best_candidate = cand
            best_composite = composite

    return best_candidate, best_score_after, best_composite


def run_surgery_pass(scores_history, errors_history, panel, monthly_flows,
                     weights, universe, all_months):
    """
    Second pass: walk the months backwards and, wherever an account's score
    drops sharply, try a "code surgery" (swap the registrar ID for another
    one present in the panel) and recompute the score.

    For every pair of consecutive months (t_prev, t_curr), processed from the
    most recent to the oldest:
      - the no-surgery score is ``score_curr * (1 - min(err, 1))`` where
        ``err`` comes from ``errors_history[t_prev][reg_orig]``;
      - when the relative drop exceeds ``SCORE_DROP_THRESHOLD``, every
        registrar ID not currently mapped is evaluated as a replacement,
        pre-filtered on ISIN overlap (``MIN_JACCARD``);
      - the candidate with the highest recomputed score is adopted only if it
        strictly beats the no-surgery score.

    Parameters:
      - scores_history : unused here; kept for interface compatibility
      - errors_history : {date → {reg_orig → error ratio}} from the first pass
      - panel          : DataFrame indexed by date whose columns are a
                         two-level index (reg_id, isin)
      - monthly_flows  : DataFrame with columns 'date', 'reg_id', 'isin',
                         'qty_net_month'
      - weights        : {reg_id → score} at the reference (last) month
      - universe       : iterable of reference registrar IDs
      - all_months     : chronologically ordered sequence of dates (non-empty)

    Returns a 4-tuple:
      - mapping_history          : {date → {reg_orig → reg_used}}
      - surgery_log              : list of dicts, one per surgery performed
      - scores                   : {reg_orig → score} at the last processed
                                   (i.e. earliest) month
      - scores_history_corrected : {date → {reg_orig → score}} after surgery
    """
    flows_idx = monthly_flows.set_index(['date', 'reg_id', 'isin'])['qty_net_month']

    # Every reg_id present in the panel (universe + historical codes).
    all_regs_in_panel = set(panel.columns.get_level_values(0))

    # {reg_id → {date → set(isin)}}, used for the fast overlap pre-filter.
    reg_isin_at_date = _surgery_active_isins(panel, all_regs_in_panel)

    # Current mapping reg_orig → reg_used, and its inverse.
    mapping = {r: r for r in universe}
    mapping_inv = {r: r for r in universe}

    surgery_log = []
    mapping_history = {all_months[-1]: dict(mapping)}
    scores_history_corrected = {all_months[-1]: dict(weights)}

    # Running scores, initialised at the reference month.
    scores = dict(weights)

    for i in range(len(all_months) - 2, -1, -1):
        t_prev = all_months[i]
        t_curr = all_months[i + 1]
        errors_prev = errors_history.get(t_prev, {})

        new_scores = {}
        new_mapping = {}
        n_surgeries = 0

        for reg_orig, reg_curr in mapping.items():
            score_curr = scores.get(reg_orig, 0)

            if score_curr == 0:
                # Nothing left to propagate for this account.
                new_scores[reg_orig] = 0
                new_mapping[reg_orig] = reg_curr
                continue

            # No-surgery error from the first pass.
            # NOTE(review): the error is looked up by reg_orig even after the
            # account has been remapped to another code — confirm intended.
            err = errors_prev.get(reg_orig, 0.0)
            score_prev_no_surgery = score_curr * (1.0 - min(err, 1.0))
            drop_ratio = 1.0 - (score_prev_no_surgery / score_curr) if score_curr > 0 else 0

            best_candidate = None
            best_score_after = score_prev_no_surgery
            best_composite = 0.0

            if drop_ratio > SCORE_DROP_THRESHOLD:
                # Candidates: codes in the panel that are not already mapped.
                available = all_regs_in_panel - set(mapping_inv) - {reg_curr}
                best_candidate, best_score_after, best_composite = _surgery_best_candidate(
                    reg_curr, score_curr, score_prev_no_surgery, t_prev, t_curr,
                    available, reg_isin_at_date, panel, flows_idx
                )

            # Explicit None check: a registrar ID may itself be falsy
            # (e.g. 0 or ""), which a truthiness test would wrongly discard.
            if best_candidate is None:
                new_mapping[reg_orig] = reg_curr
                new_scores[reg_orig] = score_prev_no_surgery
                continue

            surgery_log.append({
                'date': t_prev,
                'reg_orig': reg_orig,
                'reg_from': reg_curr,
                'reg_to': best_candidate,
                'jaccard_composite': round(best_composite, 4),
                'score_before': round(score_curr, 6),
                'score_after': round(best_score_after, 6),
                'drop_without_surgery': round(drop_ratio, 4),
                'gain_vs_no_surgery': round(best_score_after - score_prev_no_surgery, 6),
            })
            n_surgeries += 1
            print(f" 🔧 CHIRURGIE {t_prev.date()} | {reg_orig} : "
                  f"{reg_curr} → {best_candidate} "
                  f"(composite={best_composite:.3f}, "
                  f"score {score_curr:.4f}→{best_score_after:.4f})")

            # Update the mapping: release the old code, claim the new one so
            # it cannot be picked again for another account this month.
            mapping_inv.pop(reg_curr, None)
            mapping_inv[best_candidate] = reg_orig
            new_mapping[reg_orig] = best_candidate
            new_scores[reg_orig] = best_score_after

        mapping = new_mapping
        mapping_inv = {v: k for k, v in mapping.items()}
        scores = new_scores
        mapping_history[t_prev] = dict(mapping)
        scores_history_corrected[t_prev] = dict(scores)

        total_score = sum(s for s in scores.values() if not np.isnan(s))
        print(f" {t_prev.date()} | Σ scores = {total_score:.4f} | "
              f"Chirurgies = {n_surgeries}")

    return mapping_history, surgery_log, scores, scores_history_corrected
+
+# ─────────────────────────────────────────────
+# 7. EXPORT RÉSULTATS
+# ─────────────────────────────────────────────
def export_results(scores_history, mapping_history, surgery_log, all_months, out_prefix="carmignac"):
    """Export the key results as CSV files under ``repair_results/``.

    Files written (relative to the current working directory):
      - ``repair_results/{out_prefix}_scores.csv``      : one row per (date, reg_id)
      - ``repair_results/{out_prefix}_mapping.csv``     : one row per (date, reg_orig)
      - ``repair_results/{out_prefix}_surgery_log.csv`` : only when surgeries exist

    Parameters:
      - scores_history  : {date → {reg_id → score}}
      - mapping_history : {date → {reg_orig → reg_used}}
      - surgery_log     : list of dicts describing each surgery (needs a 'date' key)
      - all_months      : unused here; kept for interface compatibility
      - out_prefix      : file-name prefix for the exported CSVs

    Returns ``(df_scores, df_mapping)``, the two DataFrames that were written.
    """
    import os  # local import: keeps the block self-contained

    # DataFrame.to_csv does not create missing parent directories, so make
    # sure the output folder exists before writing anything.
    os.makedirs("repair_results", exist_ok=True)

    # Score history, sorted by date then by descending score.
    score_rows = [
        {'date': date, 'reg_id': reg, 'score': score}
        for date, per_reg in scores_history.items()
        for reg, score in per_reg.items()
    ]
    if score_rows:
        df_scores = pd.DataFrame(score_rows)
    else:
        df_scores = pd.DataFrame(columns=['date', 'reg_id', 'score'])
    if not df_scores.empty:
        df_scores = df_scores.sort_values(['date', 'score'], ascending=[True, False])
    df_scores.to_csv(f"repair_results/{out_prefix}_scores.csv", index=False)

    # Mapping history; 'changed' flags accounts whose code was swapped.
    mapping_rows = [
        {'date': date, 'reg_orig': reg_orig, 'reg_used': reg_used,
         'changed': reg_orig != reg_used}
        for date, per_reg in mapping_history.items()
        for reg_orig, reg_used in per_reg.items()
    ]
    if mapping_rows:
        df_mapping = pd.DataFrame(mapping_rows)
    else:
        df_mapping = pd.DataFrame(columns=['date', 'reg_orig', 'reg_used', 'changed'])
    if not df_mapping.empty:
        df_mapping = df_mapping.sort_values(['date', 'reg_orig'])
    df_mapping.to_csv(f"repair_results/{out_prefix}_mapping.csv", index=False)

    # Surgery log (written only when at least one surgery happened).
    if surgery_log:
        df_surgery = pd.DataFrame(surgery_log).sort_values('date')
        df_surgery.to_csv(f"repair_results/{out_prefix}_surgery_log.csv", index=False)
        print(f"\n[Export] {len(surgery_log)} opérations de chirurgie sauvegardées.")
    else:
        print("\n[Export] Aucune chirurgie effectuée sur ce subset.")

    print(f"[Export] Scores → {out_prefix}_scores.csv")
    print(f"[Export] Mapping → {out_prefix}_mapping.csv")

    return df_scores, df_mapping
+
+# ─────────────────────────────────────────────
+# 8. PIPELINE PRINCIPAL
+# ─────────────────────────────────────────────
def run_pipeline(aum_path, flows_path):
    """Run the whole registrar-ID repair pipeline and return its artefacts.

    Steps, in order: load the AUM and flows data, build the reference
    universe, build the monthly panel, aggregate flows per month, propagate
    scores without surgery, run the surgery pass, export the results and
    print a short summary.

    Returns ``(df_scores, df_mapping, surgery_log, scores_history_corrected,
    mapping_history)``.
    """
    banner = "=" * 60

    def _sum_ignoring_nan(values):
        # NaN is the only value that differs from itself.
        return sum(v for v in values if v == v)

    print(banner)
    print("CARMIGNAC — Pipeline de réparation des Registrar IDs")
    print(banner)

    # Loading
    aum, flows = load_data(aum_path, flows_path)

    # Step 1 — reference universe
    aum_ref, weights, universe, t_ref = build_reference_universe(aum)

    print("\n Top 5 comptes par poids :")
    heaviest_first = sorted(weights.items(), key=lambda item: -item[1])
    for reg, w in heaviest_first[:5]:
        print(f" {reg} : {w:.4f} ({w*100:.2f}%)")

    # Monthly panel, then flows aggregated on the same monthly grid
    panel, all_months = build_monthly_panel(aum, universe, t_ref)
    monthly_flows = aggregate_flows_monthly(flows, all_months)

    # Step 2 — consistency score (no surgery)
    print("\n[Étape 2] Propagation des scores (sans chirurgie)...")
    propagation = score_propagation(panel, monthly_flows, weights, universe, all_months)
    scores_history, errors_history, _ = propagation

    # Step 3 — surgery
    print("\n[Étape 3] Passe de chirurgie...")
    surgery_outcome = run_surgery_pass(
        scores_history, errors_history, panel, monthly_flows,
        weights, universe, all_months
    )
    mapping_history, surgery_log, final_scores, scores_history_corrected = surgery_outcome

    # Export — the corrected (post-surgery) scores are the reference
    print("\n[Export des résultats...]")
    df_scores, df_mapping = export_results(
        scores_history_corrected, mapping_history, surgery_log, all_months
    )

    # Final summary
    first_month = all_months[0]
    last_month = all_months[-1]
    score_by_date = {}
    for when, per_reg in scores_history_corrected.items():
        score_by_date[when] = _sum_ignoring_nan(per_reg.values())

    print("\n" + banner)
    print("RÉSUMÉ FINAL")
    print(banner)
    print(f" Dates couvertes : {first_month.date()} → {last_month.date()}")
    print(f" Comptes dans l'univers : {len(universe)}")
    print(f" Chirurgies effectuées : {len(surgery_log)}")
    print(f" Σ scores à t_ref : {score_by_date[t_ref]:.4f}")
    print(f" Σ scores à t_min : {score_by_date[first_month]:.4f}")

    return df_scores, df_mapping, surgery_log, scores_history_corrected, mapping_history
+
+
if __name__ == "__main__":
    # Script entry point: run the full pipeline on the S3-hosted extracts and
    # keep the resulting artefacts as module-level names.
    (
        df_scores,
        df_mapping,
        surgery_log,
        scores_history,
        mapping_history,
    ) = run_pipeline(
        aum_path="s3://projet-bdc-data/carmignac/AUM ENSAE V2 -20251105.csv",
        flows_path="s3://projet-bdc-data/carmignac/Flows ENSAE V2 -20251105.csv",
    )
diff --git a/repair_challenge/repair_results/carmignac_report.html b/repair_challenge/repair_results/carmignac_report_0.1.html
similarity index 100%
rename from repair_challenge/repair_results/carmignac_report.html
rename to repair_challenge/repair_results/carmignac_report_0.1.html