From 7f285a54dc1eb4ffbbd66e2d5f1c83a11f313540 Mon Sep 17 00:00:00 2001 From: pgoze-ensae Date: Thu, 26 Mar 2026 23:28:24 +0000 Subject: [PATCH] added months flagging --- repair_challenge/carmignac_diagnostics.py | 684 ++++++++++++++++++++++ 1 file changed, 684 insertions(+) create mode 100644 repair_challenge/carmignac_diagnostics.py diff --git a/repair_challenge/carmignac_diagnostics.py b/repair_challenge/carmignac_diagnostics.py new file mode 100644 index 0000000..c36df67 --- /dev/null +++ b/repair_challenge/carmignac_diagnostics.py @@ -0,0 +1,684 @@ +""" +Carmignac Data Challenge — Broken Months Diagnostics +===================================================== +Detects months where the aggregate stock-flow equation is violated +at the ISIN level (across all accounts): + + Σ_r Q_{r,s}(t) - Σ_r Q_{r,s}(t-1) ≠ Σ_r F_{r,s}(t-1→t) + +The residual is the "missing flow": + missing_{s}(t) = [Q_agg(t) - Q_agg(t-1)] - F_agg(t) + +This is a market-level check, independent of individual account identity. +It captures: + - Genuinely missing flow records + - End-of-month accounting lags (transactions dated at boundary) + - Corporate actions (dividends, splits) not reflected in flows + +Outputs +------- + carmignac_broken_months.csv — machine-readable, loaded by carmignac_repair.py + carmignac_diagnostics.html — interactive HTML report + +Usage +----- + python carmignac_diagnostics.py + python carmignac_diagnostics.py \\ + --aum raw_AUM.csv \\ + --flows raw_flows.csv \\ + --out carmignac_broken_months.csv \\ + --html carmignac_diagnostics.html \\ + --alpha 0.02 +""" + +import argparse +import json +import os +import sys + +from collections import defaultdict +import s3fs + +import numpy as np +import pandas as pd + + +# ───────────────────────────────────────────────────────────── +# 1. LOAD +# ───────────────────────────────────────────────────────────── +def load_data(): + fs = s3fs.S3FileSystem( + client_kwargs={'endpoint_url': 'https://'+'minio-simple.lab.groupe-genes.fr'}, + key = os.environ["AWS_ACCESS_KEY_ID"], + secret = os.environ["AWS_SECRET_ACCESS_KEY"], + token = os.environ["AWS_SESSION_TOKEN"]) + + with fs.open('projet-bdc-data//carmignac/Flows ENSAE V2 -20251105.csv', 'rb') as f: + flows = pd.read_csv(f, sep=";") + + with fs.open('projet-bdc-data//carmignac/AUM ENSAE V2 -20251105.csv', 'rb') as f: + aum = pd.read_csv(f, sep=";") + + aum['Centralisation Date'] = pd.to_datetime(aum['Centralisation Date']) + flows['Centralisation Date'] = pd.to_datetime(flows['Centralisation Date']) + + return aum, flows + +# ───────────────────────────────────────────────────────────── +# 2. AGGREGATE AND DETECT BROKEN MONTHS +# ───────────────────────────────────────────────────────────── + +def detect_broken_months(aum, flows, alpha=0.1, lag_days=3): + """ + For each (isin, month-end t), compute: + - Q_agg(t) : total shares held across all accounts + - Q_agg(t-1) : idem previous month (forward-filled) + - F_agg(t) : total net flows recorded in ]EOM(t-1), EOM(t)] + - missing(t) : [Q_agg(t) - Q_agg(t-1)] - F_agg(t) + - missing_pct : |missing| / max(Q_agg(t), Q_agg(t-1)) + + A month is flagged as "broken" when missing_pct > alpha. + + Additionally, a month is flagged as a potential "lag" when: + - It is broken with the standard window + - But would NOT be broken if flows dated within lag_days of EOM + are shifted to the adjacent month + + Parameters + ---------- + alpha : tolerance threshold (same as ALPHA in carmignac_repair.py) + lag_days : number of boundary days to test for accounting lag + + Returns + ------- + df_broken : DataFrame with all (isin, date) pairs where missing_pct > alpha + df_all : Full DataFrame including non-broken months (for plotting) + """ + # Monthly calendar + t_min = aum["Centralisation Date"].min() + t_max = aum["Centralisation Date"].max() + all_months = pd.date_range(t_min, t_max, freq="ME") + + # ── Aggregate AUM per (isin, month-end) ────────────────────── + aum_agg = ( + aum.groupby(["Product - Isin", "Centralisation Date"])["Quantity - AUM"] + .sum() + .reset_index() + .rename(columns={"Product - Isin": "isin", + "Centralisation Date": "date", + "Quantity - AUM": "qty_agg"}) + ) + # Forward-fill sparse panel + aum_pivot = aum_agg.pivot(index="date", columns="isin", values="qty_agg") + aum_pivot = aum_pivot.reindex(all_months).ffill() + + # ── Aggregate flows per (isin, month-end) — standard window ── + def bucket_flows(flows_df, months, lower_offset=0, upper_offset=0): + """Aggregate flows with optional boundary extension (in days).""" + fc = flows_df.copy() + + def assign_month(d): + # Extended window: ]EOM(t-1) - lower_offset, EOM(t) + upper_offset] + for m in months: + eom_prev = m - pd.offsets.MonthEnd(1) + lo = eom_prev - pd.Timedelta(days=lower_offset) + hi = m + pd.Timedelta(days=upper_offset) + if lo < d <= hi: + return m + return pd.NaT + + fc["month_end"] = fc["Centralisation Date"].apply(assign_month) + fc = fc.dropna(subset=["month_end"]) + agg = (fc.groupby(["Product - Isin", "month_end"])["Quantity - NetFlows"] + .sum() + .reset_index() + .rename(columns={"Product - Isin": "isin", + "month_end": "date", + "Quantity - NetFlows": "flow_agg"})) + return agg + + flows_std = bucket_flows(flows, all_months) + flows_lag = bucket_flows(flows, all_months, + lower_offset=lag_days, + upper_offset=lag_days) + + def flows_to_pivot(df, months): + piv = df.pivot(index="date", columns="isin", values="flow_agg") + return piv.reindex(months).fillna(0.0) + + fpiv_std = flows_to_pivot(flows_std, all_months) + fpiv_lag = flows_to_pivot(flows_lag, all_months) + + # ── Compute residuals ───────────────────────────────────────── + rows = [] + isins = aum_pivot.columns.tolist() + + for i in range(1, len(all_months)): + t_curr = all_months[i] + t_prev = all_months[i - 1] + + for isin in isins: + q_curr = aum_pivot[isin].get(t_curr, np.nan) if isin in aum_pivot.columns else np.nan + q_prev = aum_pivot[isin].get(t_prev, np.nan) if isin in aum_pivot.columns else np.nan + + if pd.isna(q_curr) or pd.isna(q_prev): + continue + + delta = q_curr - q_prev + denom = max(abs(q_curr), abs(q_prev), 1e-9) + + # Standard window + f_std = fpiv_std[isin].get(t_curr, 0.0) if isin in fpiv_std.columns else 0.0 + missing_std = delta - f_std + pct_std = abs(missing_std) / denom + + # Extended lag window + f_lag = fpiv_lag[isin].get(t_curr, 0.0) if isin in fpiv_lag.columns else 0.0 + missing_lag = delta - f_lag + pct_lag = abs(missing_lag) / denom + + broken_std = pct_std > alpha + broken_lag = pct_lag > alpha + + # A "lag" month: broken with standard, NOT broken with extended window + is_lag = broken_std and (not broken_lag) + + rows.append({ + "date": t_curr, + "isin": isin, + "q_agg_prev": round(q_prev, 3), + "q_agg_curr": round(q_curr, 3), + "delta_aum": round(delta, 3), + "flow_agg": round(f_std, 3), + "missing_flow": round(missing_std, 3), + "missing_pct": round(pct_std, 6), + "broken": broken_std, + "is_lag": is_lag, + }) + + df_all = pd.DataFrame(rows) + df_broken = df_all[df_all["broken"]].sort_values("missing_pct", ascending=False) + return df_broken, df_all + + +# ───────────────────────────────────────────────────────────── +# 3. PRINT SUMMARY +# ───────────────────────────────────────────────────────────── + +def print_summary(df_broken, df_all, alpha): + total = len(df_all) + n_broken = len(df_broken) + n_lag = df_broken["is_lag"].sum() + + print("\n" + "=" * 60) + print(" CARMIGNAC — Broken Months Diagnostics") + print("=" * 60) + print(f" (isin, month) pairs examined : {total}") + print(f" Broken (missing_pct > {alpha:.0%}) : {n_broken} " + f"({n_broken/total*100:.1f}%)") + print(f" Of which likely lag : {n_lag}") + print(f" Of which genuine gap : {n_broken - n_lag}") + + if n_broken: + print("\n Top 10 by missing_pct:") + cols = ["date", "isin", "missing_flow", "missing_pct", "is_lag"] + print(df_broken[cols].head(10).to_string(index=False)) + + # Monthly breakdown + by_month = (df_broken.groupby("date") + .agg(n_broken=("isin", "count"), + total_missing=("missing_flow", lambda x: x.abs().sum())) + .sort_values("n_broken", ascending=False) + .head(5)) + if len(by_month): + print("\n Most affected months:") + print(by_month.to_string()) + print() + + +# ───────────────────────────────────────────────────────────── +# 4. BUILD HTML REPORT +# ───────────────────────────────────────────────────────────── + +def build_html(df_broken, df_all, alpha): + # ── JS-ready data ──────────────────────────────────────────── + # Timeline: n_broken and total_missing per month + tl = (df_all[df_all["broken"]] + .groupby("date") + .agg(n_broken=("isin", "count"), + total_missing=("missing_flow", lambda x: x.abs().sum()), + n_lag=("is_lag", "sum")) + .reindex(df_all["date"].sort_values().unique()) + .fillna(0)) + tl.index = pd.to_datetime(tl.index) + dates_str = json.dumps([d.strftime("%Y-%m-%d") for d in tl.index]) + + def jf(arr, dec=4): + return json.dumps([round(float(v), dec) if not np.isnan(v) else None for v in arr]) + + n_broken_js = jf(tl["n_broken"].values, 0) + total_miss_js = jf(tl["total_missing"].values) + n_lag_js = jf(tl["n_lag"].values, 0) + + # Per-ISIN summary + isin_sum = (df_broken.groupby("isin") + .agg(n_months=("date", "count"), + avg_pct=("missing_pct", "mean"), + total_abs=("missing_flow", lambda x: x.abs().sum())) + .sort_values("total_abs", ascending=False)) + + ISIN_COLORS = [ + "#2563eb","#16a34a","#dc2626","#d97706","#7c3aed", + "#0891b2","#db2777","#65a30d","#ea580c","#6366f1", + ] + + # Per-ISIN missing_pct timeseries for the top 5 ISINs + top_isins = isin_sum.head(5).index.tolist() + all_dates = sorted(df_all["date"].unique()) + isin_ts_datasets = [] + for idx, isin in enumerate(top_isins): + sub = df_all[df_all["isin"] == isin].set_index("date")["missing_pct"].reindex(all_dates).fillna(0) + isin_ts_datasets.append({ + "label": isin, + "data": [round(float(v) * 100, 3) for v in sub.values], + "borderColor": ISIN_COLORS[idx % len(ISIN_COLORS)], + "backgroundColor": ISIN_COLORS[idx % len(ISIN_COLORS)] + "22", + "borderWidth": 2, + "pointRadius": 0, + "tension": 0.3, + "fill": False, + }) + isin_ts_json = json.dumps(isin_ts_datasets) + all_dates_str = json.dumps([d.strftime("%Y-%m-%d") if hasattr(d, 'strftime') + else str(d)[:10] for d in all_dates]) + + # Detail table rows + detail_rows = "" + for _, r in df_broken.head(200).iterrows(): + lag_badge = 'lag' if r["is_lag"] else "" + pct_class = "pct-high" if r["missing_pct"] > 0.1 else "pct-med" + detail_rows += f""" + + {r['date'].strftime('%Y-%m-%d') if hasattr(r['date'], 'strftime') else str(r['date'])[:10]} + {r['isin']} + {r['q_agg_prev']:,.1f} + {r['q_agg_curr']:,.1f} + {r['flow_agg']:,.1f} + {r['missing_flow']:+,.1f} + {r['missing_pct']*100:.2f}% + {lag_badge} + """ + + # ISIN summary table + isin_rows = "" + for isin, row in isin_sum.iterrows(): + isin_rows += f""" + + {isin} + {int(row['n_months'])} + {row['avg_pct']*100:.2f}% + {row['total_abs']:,.1f} + """ + + # KPIs + total = len(df_all) + n_broken_kpi = len(df_broken) + n_lag_kpi = int(df_broken["is_lag"].sum()) + n_genuine = n_broken_kpi - n_lag_kpi + max_pct = df_broken["missing_pct"].max() * 100 if len(df_broken) else 0 + n_isins = df_broken["isin"].nunique() + + no_broken_msg = "" + if n_broken_kpi == 0: + no_broken_msg = '
✓ No broken months detected at this threshold.
' + + html = f""" + + + + +Carmignac — Broken Months Diagnostics + + + + + +
+
Carmignac × ENSAE · Data Challenge 2025
+

Broken Months Diagnostics

+
+ Aggregate stock-flow equation check · ISIN level · threshold α = {alpha:.1%} +
+
+ +
+
+ (ISIN, month) pairs + {total:,} + examined +
+
+ Broken months + {n_broken_kpi:,} + {n_broken_kpi/total*100:.1f}% of pairs +
+
+ Likely lags + {n_lag_kpi} + resolved by ±{3}d window +
+
+ Genuine gaps + {n_genuine} + unresolved by lag fix +
+
+ ISINs affected + {n_isins} + distinct ISINs +
+
+ Max missing % + {max_pct:.1f}% + worst single (isin, month) +
+
+ +
+ + + +
+
+ Broken (isin, month) pairs per month + Stacked: genuine gaps (red) vs likely accounting lags (amber) +
+
+
+
+
+ +
+
+
+ Total absolute missing flow per month + Sum of |missing flow| across all broken ISINs +
+
+
+
+
+ +
+
+ Missing % — top 5 ISINs over time + |missing| / max(Q(t), Q(t-1)) per ISIN +
+
+
+
+
+
+ + + +
+
+ ISIN summary — most affected +
+
+ {'
No broken months detected.
' if n_broken_kpi == 0 else f""" + + + + + + {isin_rows} +
ISINBroken monthsAvg missing %Total |missing| (shares)
"""} +
+
+ + + +
+
+ All broken (isin, month) pairs + + lag = likely resolved by extending flow window ±3 days + +
+
Threshold α = {alpha:.1%} · showing up to 200 rows
+
+ {'
✓ No broken months detected at this threshold.
' if n_broken_kpi == 0 else f""" + + + + + + + + {detail_rows} +
DateISINQ(t-1)Q(t)Net flowMissingMissing %
"""} +
+
+ +
+ + + + +""" + return html + + +# ───────────────────────────────────────────────────────────── +# 5. MAIN +# ───────────────────────────────────────────────────────────── + +def main(): + parser = argparse.ArgumentParser( + description="Detect broken months in Carmignac AUM/Flows data" + ) + parser.add_argument("--out", default="carmignac_broken_months.csv", + help="Machine-readable output (loaded by carmignac_repair.py)") + parser.add_argument("--html", default="carmignac_diagnostics.html") + parser.add_argument("--alpha", type=float, default=0.02, + help="Tolerance threshold (default 0.02 = 2%%)") + parser.add_argument("--lag", type=int, default=3, + help="Boundary days to test for accounting lag (default 3)") + args = parser.parse_args() + + def resolve(p): + if os.path.exists(p): + return p + alt = os.path.join(os.path.dirname(os.path.abspath(__file__)), p) + if os.path.exists(alt): + return alt + sys.exit(f"[ERROR] File not found: {p}") + + print("[Load] AUM") + print("[Load] Flows") + aum, flows = load_data() + + print(f"\n[Detect] Running broken-month detection (α={args.alpha:.1%}, lag=±{args.lag}d)...") + df_broken, df_all = detect_broken_months(aum, flows, alpha=args.alpha, lag_days=args.lag) + + print_summary(df_broken, df_all, args.alpha) + + # CSV output — this is what carmignac_repair.py will load + if len(df_broken): + df_broken.to_csv(args.out, index=False) + print(f"[Export] Broken months CSV → {args.out}") + else: + # Write empty file so repair pipeline can always try to load it + pd.DataFrame(columns=["date", "isin", "missing_pct", "is_lag"]).to_csv(args.out, index=False) + print(f"[Export] No broken months — empty CSV → {args.out}") + + html = build_html(df_broken, df_all, args.alpha) + with open(args.html, "w", encoding="utf-8") as f: + f.write(html) + print(f"[Export] HTML report → {args.html}") + + +if __name__ == "__main__": + main()