""" Broken Months Diagnostics ===================================================== Detects months where the aggregate stock-flow equation is violated at the ISIN level (across all accounts) The residual is the "missing flow": missing_{s}(t) = [Q_agg(t) - Q_agg(t-1)] - F_agg(t) This is a market-level check, independent of individual account identity. It captures: - Genuinely missing flow records - End-of-month accounting lags (transactions dated at boundary) - Corporate actions (dividends, splits) not reflected in flows Outputs ------- carmignac_broken_months.csv — machine-readable, loaded by carmignac_repair.py carmignac_diagnostics.html — interactive HTML report Usage ----- python carmignac_diagnostics.py python carmignac_diagnostics.py \\ --aum raw_AUM.csv \\ --flows raw_flows.csv \\ --out carmignac_broken_months.csv \\ --html carmignac_diagnostics.html \\ --alpha 0.02 """ import argparse import os import sys import numpy as np import pandas as pd from helpers import build_html_diagnostics, load_data_diagnostics # ───────────────────────────────────────────────────────────── # AGGREGATE AND DETECT BROKEN MONTHS # ───────────────────────────────────────────────────────────── def detect_broken_months(aum, flows, alpha=0.02, lag_days=3): """ For each (isin, month-end t), compute: - Q_agg(t) : total shares held across all accounts - Q_agg(t-1) : idem previous month (forward-filled) - F_agg(t) : total net flows recorded in ]EOM(t-1), EOM(t)] - missing(t) : [Q_agg(t) - Q_agg(t-1)] - F_agg(t) - missing_pct : |missing| / max(Q_agg(t), Q_agg(t-1)) A month is flagged as "broken" when missing_pct > alpha. Additionally, a month is flagged as a potential "lag" when: - It is broken with the standard window - But would NOT be broken if flows dated within lag_days of EOM are shifted to the adjacent month Parameters : alpha : tolerance threshold (same as ALPHA in carmignac_repair.py) lag_days : number of boundary days to test for accounting lag Returns : df_broken : DataFrame with all (isin, date) pairs where missing_pct > alpha df_all : Full DataFrame including non-broken months (for plotting) """ # Monthly calendar t_min = aum["Centralisation Date"].min() t_max = aum["Centralisation Date"].max() all_months = pd.date_range(t_min, t_max, freq="ME") # ── Aggregate AUM per (isin, month-end) ────────────────────── aum_agg = ( aum.groupby(["Product - Isin", "Centralisation Date"])["Quantity - AUM"] .sum() .reset_index() .rename( columns={ "Product - Isin": "isin", "Centralisation Date": "date", "Quantity - AUM": "qty_agg", } ) ) # Forward-fill sparse panel aum_pivot = aum_agg.pivot(index="date", columns="isin", values="qty_agg") aum_pivot = aum_pivot.reindex(all_months).ffill() # ── Aggregate flows per (isin, month-end) — standard window ── def bucket_flows(flows_df, months, lower_offset=0, upper_offset=0): """Aggregate flows with optional boundary extension (in days).""" fc = flows_df.copy() def assign_month(d): # Extended window: ]EOM(t-1) - lower_offset, EOM(t) + upper_offset] for m in months: eom_prev = m - pd.offsets.MonthEnd(1) lo = eom_prev - pd.Timedelta(days=lower_offset) hi = m + pd.Timedelta(days=upper_offset) if lo < d <= hi: return m return pd.NaT fc["month_end"] = fc["Centralisation Date"].apply(assign_month) fc = fc.dropna(subset=["month_end"]) agg = ( fc.groupby(["Product - Isin", "month_end"])["Quantity - NetFlows"] .sum() .reset_index() .rename( columns={ "Product - Isin": "isin", "month_end": "date", "Quantity - NetFlows": "flow_agg", } ) ) return agg flows_std = bucket_flows(flows, all_months) flows_lag = bucket_flows( flows, all_months, lower_offset=lag_days, upper_offset=lag_days ) def flows_to_pivot(df, months): piv = df.pivot(index="date", columns="isin", values="flow_agg") return piv.reindex(months).fillna(0.0) fpiv_std = flows_to_pivot(flows_std, all_months) fpiv_lag = flows_to_pivot(flows_lag, all_months) # ── Compute residuals ───────────────────────────────────────── rows = [] isins = aum_pivot.columns.tolist() for i in range(1, len(all_months)): t_curr = all_months[i] t_prev = all_months[i - 1] for isin in isins: q_curr = ( aum_pivot[isin].get(t_curr, np.nan) if isin in aum_pivot.columns else np.nan ) q_prev = ( aum_pivot[isin].get(t_prev, np.nan) if isin in aum_pivot.columns else np.nan ) if pd.isna(q_curr) or pd.isna(q_prev): continue delta = q_curr - q_prev # Standard window f_std = fpiv_std[isin].get(t_curr, 0.0) if isin in fpiv_std.columns else 0.0 missing_std = delta - f_std # Extended lag window f_lag = fpiv_lag[isin].get(t_curr, 0.0) if isin in fpiv_lag.columns else 0.0 missing_lag = delta - f_lag # ── Denominator choice ──────────────────────────────── # Normalise by the size of the *movement* (max of delta_AUM # and recorded flow), not by the stock level. This avoids # astronomically large percentages when a position is tiny # but the missing flow is a normal-sized number. # # Interpretation: "what fraction of the expected movement # is unaccounted for?" # # A minimum absolute threshold (min_abs_shares) suppresses # noise from residual micro-positions (rounding artefacts). min_abs_shares = 1.0 # ignore positions smaller than 1 share movement = max(abs(delta), abs(f_std), min_abs_shares) denom_std = movement movement_lag = max(abs(delta), abs(f_lag), min_abs_shares) denom_lag = movement_lag pct_std = abs(missing_std) / denom_std pct_lag = abs(missing_lag) / denom_lag broken_std = pct_std > alpha broken_lag = pct_lag > alpha # A "lag" month: broken with standard, NOT broken with extended window is_lag = broken_std and (not broken_lag) rows.append( { "date": t_curr, "isin": isin, "q_agg_prev": round(q_prev, 3), "q_agg_curr": round(q_curr, 3), "delta_aum": round(delta, 3), "flow_agg": round(f_std, 3), "missing_flow": round(missing_std, 3), "missing_pct": round(pct_std, 6), "broken": broken_std, "is_lag": is_lag, } ) df_all = pd.DataFrame(rows) df_broken = df_all[df_all["broken"]].sort_values("missing_pct", ascending=False) return df_broken, df_all # ───────────────────────────────────────────────────────────── # AGGREGATE (CROSS-ISIN) BROKEN MONTHS # ───────────────────────────────────────────────────────────── def detect_aggregate_broken_months(aum, flows, alpha=0.02, lag_days=3): """ Same stock-flow check as detect_broken_months, but aggregated across ALL ISINs for each month: Q_total(t) - Q_total(t-1) != F_total(t) where Q_total(t) = sum over all (reg_id, isin) of Q_{r,s}(t). This catches months where the global portfolio is incoherent even if every individual ISIN is fine (e.g. cross-ISIN netting errors), and provides a cleaner high-level view. Returns : df_agg : DataFrame indexed by month with columns: q_total_prev, q_total_curr, delta_aum, flow_total, missing_flow, missing_pct, broken, is_lag """ t_min = aum["Centralisation Date"].min() t_max = aum["Centralisation Date"].max() all_months = pd.date_range(t_min, t_max, freq="ME") # ── Total AUM per month (all ISIN, all accounts) ───────────── aum_monthly = ( aum.groupby("Centralisation Date")["Quantity - AUM"] .sum() .reindex(all_months) .ffill() .rename("q_total") ) # ── Bucket flows helper (reuse same window logic) ───────────── def bucket_total_flows(flows_df, months, lower_offset=0, upper_offset=0): fc = flows_df.copy() def assign_month(d): for m in months: eom_prev = m - pd.offsets.MonthEnd(1) lo = eom_prev - pd.Timedelta(days=lower_offset) hi = m + pd.Timedelta(days=upper_offset) if lo < d <= hi: return m return pd.NaT fc["month_end"] = fc["Centralisation Date"].apply(assign_month) fc = fc.dropna(subset=["month_end"]) return ( fc.groupby("month_end")["Quantity - NetFlows"] .sum() .reindex(months) .fillna(0.0) ) flow_std = bucket_total_flows(flows, all_months) flow_lag = bucket_total_flows( flows, all_months, lower_offset=lag_days, upper_offset=lag_days ) # ── Compute residuals ───────────────────────────────────────── rows = [] min_abs_shares = 1.0 for i in range(1, len(all_months)): t_curr = all_months[i] t_prev = all_months[i - 1] q_curr = aum_monthly.get(t_curr, np.nan) q_prev = aum_monthly.get(t_prev, np.nan) if pd.isna(q_curr) or pd.isna(q_prev): continue delta = q_curr - q_prev f_std = flow_std.get(t_curr, 0.0) f_lag = flow_lag.get(t_curr, 0.0) miss_std = delta - f_std miss_lag = delta - f_lag movement_std = max(abs(delta), abs(f_std), min_abs_shares) movement_lag = max(abs(delta), abs(f_lag), min_abs_shares) pct_std = abs(miss_std) / movement_std pct_lag = abs(miss_lag) / movement_lag broken_std = pct_std > alpha broken_lag = pct_lag > alpha is_lag = broken_std and (not broken_lag) rows.append( { "date": t_curr, "q_total_prev": round(q_prev, 3), "q_total_curr": round(q_curr, 3), "delta_aum": round(delta, 3), "flow_total": round(f_std, 3), "missing_flow": round(miss_std, 3), "missing_pct": round(pct_std, 6), "broken": broken_std, "is_lag": is_lag, } ) df_agg = pd.DataFrame(rows) return df_agg # ───────────────────────────────────────────────────────────── # ERROR ACCOUNT # ───────────────────────────────────────────────────────────── def build_error_account(aum, flows, lag_days=3): """ Builds a synthetic "error account" that absorbs the stock-flow residuals that cannot be explained by recorded flows. Construction (backwards from t_ref): Stock_error(t_ref) = 0 (by definition) Stock_error(t-1) = Stock_error(t) - Residual(t) where Residual(t) = [Σ_r Q_{r,s}(t) - Σ_r Q_{r,s}(t-1)] - Σ_r F_{r,s}(t) for each ISIN s independently. By construction, adding this error account to the AUM restores the stock-flow equality at every (isin, month). Also computes an aggregated error account (summed over all ISINs). Returns ------- df_err_isin : DataFrame with columns (date, isin, residual, stock_error, stock_error_pct) where stock_error_pct = stock_error / max(total_isin_aum, 1) df_err_agg : DataFrame with columns (date, residual_agg, stock_error_agg, stock_error_agg_pct) """ t_min = aum["Centralisation Date"].min() t_max = aum["Centralisation Date"].max() all_months = pd.date_range(t_min, t_max, freq="ME") # ── ISIN-level AUM panel (forward-filled) ──────────────────── aum_agg = ( aum.groupby(["Product - Isin", "Centralisation Date"])["Quantity - AUM"] .sum() .reset_index() .rename( columns={ "Product - Isin": "isin", "Centralisation Date": "date", "Quantity - AUM": "qty", } ) ) aum_pivot = aum_agg.pivot(index="date", columns="isin", values="qty") aum_pivot = aum_pivot.reindex(all_months).ffill() # ── ISIN-level flow aggregation (standard window) ───────────── def bucket_isin_flows(flows_df, months): fc = flows_df.copy() def assign_month(d): for m in months: eom_prev = m - pd.offsets.MonthEnd(1) if eom_prev < d <= m: return m return pd.NaT fc["month_end"] = fc["Centralisation Date"].apply(assign_month) fc = fc.dropna(subset=["month_end"]) return ( fc.groupby(["Product - Isin", "month_end"])["Quantity - NetFlows"] .sum() .unstack("Product - Isin") .reindex(months) .fillna(0.0) ) flow_pivot = bucket_isin_flows(flows, all_months) # ── Compute residuals per (isin, month) ─────────────────────── isins = aum_pivot.columns.tolist() # residual[t] = delta_AUM[t] - flow[t] residuals = {} # {isin: Series indexed by month} for isin in isins: res_series = {} for i in range(1, len(all_months)): t_curr = all_months[i] t_prev = all_months[i - 1] q_curr = aum_pivot[isin].get(t_curr, np.nan) q_prev = aum_pivot[isin].get(t_prev, np.nan) if pd.isna(q_curr) or pd.isna(q_prev): continue delta = q_curr - q_prev f = flow_pivot[isin].get(t_curr, 0.0) if isin in flow_pivot.columns else 0.0 res_series[t_curr] = delta - f residuals[isin] = pd.Series(res_series) # ── Build error stock backwards from t_ref ──────────────────── t_ref = all_months[-1] rows_isin = [] for isin in isins: res = residuals[isin] # Maximum AUM for this ISIN (for normalisation) max_aum = aum_pivot[isin].max() if pd.isna(max_aum) or max_aum < 1: max_aum = 1.0 # Propagate backwards: stock(t_ref) = 0 stock = 0.0 # Build dict keyed by date stock_by_date = {t_ref: 0.0} for i in range(len(all_months) - 2, -1, -1): t_curr = all_months[i + 1] t_prev = all_months[i] r = res.get(t_curr, 0.0) stock = stock - r stock_by_date[t_prev] = stock for t in all_months: s = stock_by_date.get(t, np.nan) r = res.get(t, 0.0) rows_isin.append( { "date": t, "isin": isin, "residual": round(r, 4), "stock_error": round(s, 4) if not pd.isna(s) else np.nan, "stock_error_pct": round(abs(s) / max_aum * 100, 4) if not pd.isna(s) else np.nan, } ) df_err_isin = pd.DataFrame(rows_isin).sort_values(["date", "isin"]) # ── Aggregated error account ────────────────────────────────── # Total AUM across all ISINs at each month total_aum_by_month = aum_pivot.sum(axis=1) max_total_aum = total_aum_by_month.max() if pd.isna(max_total_aum) or max_total_aum < 1: max_total_aum = 1.0 # Aggregate residual = sum of ISIN residuals agg_res = {} for i in range(1, len(all_months)): t_curr = all_months[i] total_r = sum(residuals[isin].get(t_curr, 0.0) for isin in isins) agg_res[t_curr] = total_r agg_stock = 0.0 agg_stock_by_date = {t_ref: 0.0} for i in range(len(all_months) - 2, -1, -1): t_curr = all_months[i + 1] t_prev = all_months[i] agg_stock = agg_stock - agg_res.get(t_curr, 0.0) agg_stock_by_date[t_prev] = agg_stock rows_agg = [] for t in all_months: s = agg_stock_by_date.get(t, np.nan) r = agg_res.get(t, 0.0) rows_agg.append( { "date": t, "residual_agg": round(r, 4), "stock_error_agg": round(s, 4) if not pd.isna(s) else np.nan, "stock_error_agg_pct": round(abs(s) / max_total_aum * 100, 4) if not pd.isna(s) else np.nan, } ) df_err_agg = pd.DataFrame(rows_agg).sort_values("date") return df_err_isin, df_err_agg # ───────────────────────────────────────────────────────────── # PRINT SUMMARY # ───────────────────────────────────────────────────────────── def print_summary(df_broken, df_all, alpha): total = len(df_all) n_broken = len(df_broken) n_lag = df_broken["is_lag"].sum() print("\n" + "=" * 60) print(" CARMIGNAC — Broken Months Diagnostics") print("=" * 60) print(f" (isin, month) pairs examined : {total}") print( f" Broken (missing_pct > {alpha:.0%}) : {n_broken} " f"({n_broken / total * 100:.1f}%)" ) print(f" Of which likely lag : {n_lag}") print(f" Of which genuine gap : {n_broken - n_lag}") if n_broken: print("\n Top 10 by missing_pct:") cols = ["date", "isin", "missing_flow", "missing_pct", "is_lag"] print(df_broken[cols].head(10).to_string(index=False)) # Monthly breakdown by_month = ( df_broken.groupby("date") .agg( n_broken=("isin", "count"), total_missing=("missing_flow", lambda x: x.abs().sum()), ) .sort_values("n_broken", ascending=False) .head(5) ) if len(by_month): print("\n Most affected months:") print(by_month.to_string()) print() # ───────────────────────────────────────────────────────────── # MAIN # ───────────────────────────────────────────────────────────── def main(): parser = argparse.ArgumentParser( description="Detect broken months in Carmignac AUM/Flows data" ) parser.add_argument( "--out", default="carmignac_broken_months.csv", help="Machine-readable output (loaded by carmignac_repair.py)", ) parser.add_argument("--html", default="carmignac_diagnostics.html") parser.add_argument( "--alpha", type=float, default=0.05, help="Tolerance threshold (default 0.05 = 5%%)", ) parser.add_argument( "--lag", type=int, default=3, help="Boundary days to test for accounting lag (default 3)", ) args = parser.parse_args() def resolve(p): if os.path.exists(p): return p alt = os.path.join(os.path.dirname(os.path.abspath(__file__)), p) if os.path.exists(alt): return alt sys.exit(f"[ERROR] File not found: {p}") print("[Load] AUM") print("[Load] Flows") aum, flows = load_data_diagnostics() print( f"\n[Detect] Running broken-month detection (α={args.alpha:.1%}, lag=±{args.lag}d)..." ) df_broken, df_all = detect_broken_months( aum, flows, alpha=args.alpha, lag_days=args.lag ) df_agg = detect_aggregate_broken_months( aum, flows, alpha=args.alpha, lag_days=args.lag ) print("\n[Error account] Building error account...") df_err_isin, df_err_agg = build_error_account(aum, flows, lag_days=args.lag) print_summary(df_broken, df_all, args.alpha) n_agg_broken = int(df_agg["broken"].sum()) print( f" Aggregate broken months : {n_agg_broken} " f"(of which lags: {int(df_agg['is_lag'].sum())})" ) max_err = float(df_err_agg["stock_error_agg"].abs().max()) print( f" Max aggregate error stock : {max_err:,.1f} shares " f"({float(df_err_agg['stock_error_agg_pct'].max()):.3f}% of total AUM)" ) # CSV output — this is what carmignac_repair.py loads if len(df_broken): df_broken.to_csv(args.out, index=False) print(f"[Export] Broken months CSV → {args.out}") else: pd.DataFrame(columns=["date", "isin", "missing_pct", "is_lag"]).to_csv( args.out, index=False ) print(f"[Export] No broken months — empty CSV → {args.out}") # Error account CSV err_out = args.out.replace("broken_months", "error_account") df_err_isin.to_csv(err_out, index=False) err_agg_out = err_out.replace("error_account", "error_account_agg") df_err_agg.to_csv(err_agg_out, index=False) print(f"[Export] Error account (ISIN) → {err_out}") print(f"[Export] Error account (agg) → {err_agg_out}") html = build_html_diagnostics( df_broken, df_all, df_agg, df_err_isin, df_err_agg, args.alpha ) with open(args.html, "w", encoding="utf-8") as f: f.write(html) print(f"[Export] HTML report → {args.html}") if __name__ == "__main__": main()