"""
Carmignac Data Challenge — Broken Months Diagnostics
=====================================================
Detects months where the aggregate stock-flow equation is violated
at the ISIN level (across all accounts):
Σ_r Q_{r,s}(t) - Σ_r Q_{r,s}(t-1) ≠ Σ_r F_{r,s}(t-1→t)
The residual is the "missing flow":
missing_{s}(t) = [Q_agg(t) - Q_agg(t-1)] - F_agg(t)
This is a market-level check, independent of individual account identity.
It captures:
- Genuinely missing flow records
- End-of-month accounting lags (transactions dated at boundary)
- Corporate actions (dividends, splits) not reflected in flows
Outputs
-------
carmignac_broken_months.csv — machine-readable, loaded by carmignac_repair.py
carmignac_diagnostics.html — interactive HTML report
Usage
-----
python carmignac_diagnostics.py
python carmignac_diagnostics.py \\
--out carmignac_broken_months.csv \\
--html carmignac_diagnostics.html \\
--alpha 0.02 \\
--lag 3
(The AUM and flows inputs are read from S3 by load_data(); there are no
--aum / --flows command-line options.)
"""
import argparse
import json
import os
import sys
import numpy as np
import pandas as pd
from collections import defaultdict
import s3fs
# ─────────────────────────────────────────────────────────────
# 1. LOAD
# ─────────────────────────────────────────────────────────────
def load_data():
    """
    Read the AUM and flows extracts from the project's S3-compatible store.

    Credentials are taken from the AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY
    and AWS_SESSION_TOKEN environment variables; a missing variable raises
    KeyError. Both files are semicolon-separated CSVs.

    Returns
    -------
    (aum, flows) : tuple of DataFrames whose 'Centralisation Date' column
        has been converted with pd.to_datetime.
    """
    endpoint = 'https://'+'minio-simple.lab.groupe-genes.fr'
    credentials = {
        "key": os.environ["AWS_ACCESS_KEY_ID"],
        "secret": os.environ["AWS_SECRET_ACCESS_KEY"],
        "token": os.environ["AWS_SESSION_TOKEN"],
    }
    fs = s3fs.S3FileSystem(client_kwargs={'endpoint_url': endpoint}, **credentials)

    def read_semicolon_csv(remote_path):
        # Binary mode: pandas handles the decoding itself.
        with fs.open(remote_path, 'rb') as handle:
            return pd.read_csv(handle, sep=";")

    flows = read_semicolon_csv('projet-bdc-data//carmignac/Flows ENSAE V2 -20251105.csv')
    aum = read_semicolon_csv('projet-bdc-data//carmignac/AUM ENSAE V2 -20251105.csv')

    # NOTE(review): dates are parsed with pandas' default inference; if the
    # extracts use day-first strings this needs an explicit format — confirm.
    for frame in (aum, flows):
        frame['Centralisation Date'] = pd.to_datetime(frame['Centralisation Date'])
    return aum, flows
# ─────────────────────────────────────────────────────────────
# 2. AGGREGATE AND DETECT BROKEN MONTHS
# ─────────────────────────────────────────────────────────────
def detect_broken_months(aum, flows, alpha=0.02, lag_days=3):
"""
For each (isin, month-end t), compute:
- Q_agg(t) : total shares held across all accounts
- Q_agg(t-1) : idem previous month (forward-filled)
- F_agg(t) : total net flows recorded in ]EOM(t-1), EOM(t)]
- missing(t) : [Q_agg(t) - Q_agg(t-1)] - F_agg(t)
- missing_pct : |missing| / max(Q_agg(t), Q_agg(t-1))
A month is flagged as "broken" when missing_pct > alpha.
Additionally, a month is flagged as a potential "lag" when:
- It is broken with the standard window
- But would NOT be broken if flows dated within lag_days of EOM
are shifted to the adjacent month
Parameters
----------
alpha : tolerance threshold (same as ALPHA in carmignac_repair.py)
lag_days : number of boundary days to test for accounting lag
Returns
-------
df_broken : DataFrame with all (isin, date) pairs where missing_pct > alpha
df_all : Full DataFrame including non-broken months (for plotting)
"""
# Monthly calendar
t_min = aum["Centralisation Date"].min()
t_max = aum["Centralisation Date"].max()
all_months = pd.date_range(t_min, t_max, freq="ME")
# ── Aggregate AUM per (isin, month-end) ──────────────────────
aum_agg = (
aum.groupby(["Product - Isin", "Centralisation Date"])["Quantity - AUM"]
.sum()
.reset_index()
.rename(columns={"Product - Isin": "isin",
"Centralisation Date": "date",
"Quantity - AUM": "qty_agg"})
)
# Forward-fill sparse panel
aum_pivot = aum_agg.pivot(index="date", columns="isin", values="qty_agg")
aum_pivot = aum_pivot.reindex(all_months).ffill()
# ── Aggregate flows per (isin, month-end) — standard window ──
def bucket_flows(flows_df, months, lower_offset=0, upper_offset=0):
"""Aggregate flows with optional boundary extension (in days)."""
fc = flows_df.copy()
def assign_month(d):
# Extended window: ]EOM(t-1) - lower_offset, EOM(t) + upper_offset]
for m in months:
eom_prev = m - pd.offsets.MonthEnd(1)
lo = eom_prev - pd.Timedelta(days=lower_offset)
hi = m + pd.Timedelta(days=upper_offset)
if lo < d <= hi:
return m
return pd.NaT
fc["month_end"] = fc["Centralisation Date"].apply(assign_month)
fc = fc.dropna(subset=["month_end"])
agg = (fc.groupby(["Product - Isin", "month_end"])["Quantity - NetFlows"]
.sum()
.reset_index()
.rename(columns={"Product - Isin": "isin",
"month_end": "date",
"Quantity - NetFlows": "flow_agg"}))
return agg
flows_std = bucket_flows(flows, all_months)
flows_lag = bucket_flows(flows, all_months,
lower_offset=lag_days,
upper_offset=lag_days)
def flows_to_pivot(df, months):
piv = df.pivot(index="date", columns="isin", values="flow_agg")
return piv.reindex(months).fillna(0.0)
fpiv_std = flows_to_pivot(flows_std, all_months)
fpiv_lag = flows_to_pivot(flows_lag, all_months)
# ── Compute residuals ─────────────────────────────────────────
rows = []
isins = aum_pivot.columns.tolist()
for i in range(1, len(all_months)):
t_curr = all_months[i]
t_prev = all_months[i - 1]
for isin in isins:
q_curr = aum_pivot[isin].get(t_curr, np.nan) if isin in aum_pivot.columns else np.nan
q_prev = aum_pivot[isin].get(t_prev, np.nan) if isin in aum_pivot.columns else np.nan
if pd.isna(q_curr) or pd.isna(q_prev):
continue
delta = q_curr - q_prev
# Standard window
f_std = fpiv_std[isin].get(t_curr, 0.0) if isin in fpiv_std.columns else 0.0
missing_std = delta - f_std
# Extended lag window
f_lag = fpiv_lag[isin].get(t_curr, 0.0) if isin in fpiv_lag.columns else 0.0
missing_lag = delta - f_lag
# ── Denominator choice ────────────────────────────────
# Normalise by the size of the *movement* (max of delta_AUM
# and recorded flow), not by the stock level. This avoids
# astronomically large percentages when a position is tiny
# but the missing flow is a normal-sized number.
#
# Interpretation: "what fraction of the expected movement
# is unaccounted for?" 100% = the entire movement is missing.
#
# A minimum absolute threshold (min_abs_shares) suppresses
# noise from residual micro-positions (rounding artefacts).
min_abs_shares = 1.0 # ignore positions smaller than 1 share
movement = max(abs(delta), abs(f_std), min_abs_shares)
denom_std = movement
movement_lag = max(abs(delta), abs(f_lag), min_abs_shares)
denom_lag = movement_lag
pct_std = abs(missing_std) / denom_std
pct_lag = abs(missing_lag) / denom_lag
broken_std = pct_std > alpha
broken_lag = pct_lag > alpha
# A "lag" month: broken with standard, NOT broken with extended window
is_lag = broken_std and (not broken_lag)
rows.append({
"date": t_curr,
"isin": isin,
"q_agg_prev": round(q_prev, 3),
"q_agg_curr": round(q_curr, 3),
"delta_aum": round(delta, 3),
"flow_agg": round(f_std, 3),
"missing_flow": round(missing_std, 3),
"missing_pct": round(pct_std, 6),
"broken": broken_std,
"is_lag": is_lag,
})
df_all = pd.DataFrame(rows)
df_broken = df_all[df_all["broken"]].sort_values("missing_pct", ascending=False)
return df_broken, df_all
# ─────────────────────────────────────────────────────────────
# 2b. AGGREGATE (CROSS-ISIN) BROKEN MONTHS
# ─────────────────────────────────────────────────────────────
def detect_aggregate_broken_months(aum, flows, alpha=0.02, lag_days=3):
"""
Same stock-flow check as detect_broken_months, but aggregated
across ALL ISINs for each month:
Q_total(t) - Q_total(t-1) != F_total(t)
where Q_total(t) = sum over all (reg_id, isin) of Q_{r,s}(t).
This catches months where the global portfolio is incoherent even
if every individual ISIN is fine (e.g. cross-ISIN netting errors),
and provides a cleaner high-level view.
Returns
-------
df_agg : DataFrame indexed by month with columns:
q_total_prev, q_total_curr, delta_aum, flow_total,
missing_flow, missing_pct, broken, is_lag
"""
t_min = aum["Centralisation Date"].min()
t_max = aum["Centralisation Date"].max()
all_months = pd.date_range(t_min, t_max, freq="ME")
# ── Total AUM per month (all ISIN, all accounts) ─────────────
aum_monthly = (
aum.groupby("Centralisation Date")["Quantity - AUM"]
.sum()
.reindex(all_months)
.ffill()
.rename("q_total")
)
# ── Bucket flows helper (reuse same window logic) ─────────────
def bucket_total_flows(flows_df, months, lower_offset=0, upper_offset=0):
fc = flows_df.copy()
def assign_month(d):
for m in months:
eom_prev = m - pd.offsets.MonthEnd(1)
lo = eom_prev - pd.Timedelta(days=lower_offset)
hi = m + pd.Timedelta(days=upper_offset)
if lo < d <= hi:
return m
return pd.NaT
fc["month_end"] = fc["Centralisation Date"].apply(assign_month)
fc = fc.dropna(subset=["month_end"])
return (fc.groupby("month_end")["Quantity - NetFlows"]
.sum()
.reindex(months)
.fillna(0.0))
flow_std = bucket_total_flows(flows, all_months)
flow_lag = bucket_total_flows(flows, all_months,
lower_offset=lag_days, upper_offset=lag_days)
# ── Compute residuals ─────────────────────────────────────────
rows = []
min_abs_shares = 1.0
for i in range(1, len(all_months)):
t_curr = all_months[i]
t_prev = all_months[i - 1]
q_curr = aum_monthly.get(t_curr, np.nan)
q_prev = aum_monthly.get(t_prev, np.nan)
if pd.isna(q_curr) or pd.isna(q_prev):
continue
delta = q_curr - q_prev
f_std = flow_std.get(t_curr, 0.0)
f_lag = flow_lag.get(t_curr, 0.0)
miss_std = delta - f_std
miss_lag = delta - f_lag
movement_std = max(abs(delta), abs(f_std), min_abs_shares)
movement_lag = max(abs(delta), abs(f_lag), min_abs_shares)
pct_std = abs(miss_std) / movement_std
pct_lag = abs(miss_lag) / movement_lag
broken_std = pct_std > alpha
broken_lag = pct_lag > alpha
is_lag = broken_std and (not broken_lag)
rows.append({
"date": t_curr,
"q_total_prev": round(q_prev, 3),
"q_total_curr": round(q_curr, 3),
"delta_aum": round(delta, 3),
"flow_total": round(f_std, 3),
"missing_flow": round(miss_std, 3),
"missing_pct": round(pct_std, 6),
"broken": broken_std,
"is_lag": is_lag,
})
df_agg = pd.DataFrame(rows)
return df_agg
# ─────────────────────────────────────────────────────────────
# 2c. ERROR ACCOUNT
# ─────────────────────────────────────────────────────────────
def build_error_account(aum, flows, lag_days=3):
"""
Builds a synthetic "error account" that absorbs the stock-flow
residuals that cannot be explained by recorded flows.
Construction (backwards from t_ref):
Stock_error(t_ref) = 0 (by definition)
Stock_error(t-1) = Stock_error(t) - Residual(t)
where Residual(t) = [Σ_r Q_{r,s}(t) - Σ_r Q_{r,s}(t-1)] - Σ_r F_{r,s}(t)
for each ISIN s independently.
By construction, adding this error account to the AUM restores the
stock-flow equality at every (isin, month).
Also computes an aggregated error account (summed over all ISINs).
Returns
-------
df_err_isin : DataFrame with columns
(date, isin, residual, stock_error, stock_error_pct)
where stock_error_pct = stock_error / max(total_isin_aum, 1)
df_err_agg : DataFrame with columns
(date, residual_agg, stock_error_agg, stock_error_agg_pct)
"""
t_min = aum["Centralisation Date"].min()
t_max = aum["Centralisation Date"].max()
all_months = pd.date_range(t_min, t_max, freq="ME")
# ── ISIN-level AUM panel (forward-filled) ────────────────────
aum_agg = (
aum.groupby(["Product - Isin", "Centralisation Date"])["Quantity - AUM"]
.sum()
.reset_index()
.rename(columns={"Product - Isin": "isin",
"Centralisation Date": "date",
"Quantity - AUM": "qty"})
)
aum_pivot = aum_agg.pivot(index="date", columns="isin", values="qty")
aum_pivot = aum_pivot.reindex(all_months).ffill()
# ── ISIN-level flow aggregation (standard window) ─────────────
def bucket_isin_flows(flows_df, months):
fc = flows_df.copy()
def assign_month(d):
for m in months:
eom_prev = m - pd.offsets.MonthEnd(1)
if eom_prev < d <= m:
return m
return pd.NaT
fc["month_end"] = fc["Centralisation Date"].apply(assign_month)
fc = fc.dropna(subset=["month_end"])
return (fc.groupby(["Product - Isin", "month_end"])["Quantity - NetFlows"]
.sum()
.unstack("Product - Isin")
.reindex(months)
.fillna(0.0))
flow_pivot = bucket_isin_flows(flows, all_months)
# ── Compute residuals per (isin, month) ───────────────────────
isins = aum_pivot.columns.tolist()
# residual[t] = delta_AUM[t] - flow[t]
residuals = {} # {isin: Series indexed by month}
for isin in isins:
res_series = {}
for i in range(1, len(all_months)):
t_curr = all_months[i]
t_prev = all_months[i - 1]
q_curr = aum_pivot[isin].get(t_curr, np.nan)
q_prev = aum_pivot[isin].get(t_prev, np.nan)
if pd.isna(q_curr) or pd.isna(q_prev):
continue
delta = q_curr - q_prev
f = (flow_pivot[isin].get(t_curr, 0.0)
if isin in flow_pivot.columns else 0.0)
res_series[t_curr] = delta - f
residuals[isin] = pd.Series(res_series)
# ── Build error stock backwards from t_ref ────────────────────
t_ref = all_months[-1]
rows_isin = []
for isin in isins:
res = residuals[isin]
# Maximum AUM for this ISIN (for normalisation)
max_aum = aum_pivot[isin].max()
if pd.isna(max_aum) or max_aum < 1:
max_aum = 1.0
# Propagate backwards: stock(t_ref) = 0
stock = 0.0
# Build dict keyed by date
stock_by_date = {t_ref: 0.0}
for i in range(len(all_months) - 2, -1, -1):
t_curr = all_months[i + 1]
t_prev = all_months[i]
r = res.get(t_curr, 0.0)
stock = stock - r
stock_by_date[t_prev] = stock
for t in all_months:
s = stock_by_date.get(t, np.nan)
r = res.get(t, 0.0)
rows_isin.append({
"date": t,
"isin": isin,
"residual": round(r, 4),
"stock_error": round(s, 4) if not pd.isna(s) else np.nan,
"stock_error_pct": round(abs(s) / max_aum * 100, 4)
if not pd.isna(s) else np.nan,
})
df_err_isin = pd.DataFrame(rows_isin).sort_values(["date", "isin"])
# ── Aggregated error account ──────────────────────────────────
# Total AUM across all ISINs at each month
total_aum_by_month = aum_pivot.sum(axis=1)
max_total_aum = total_aum_by_month.max()
if pd.isna(max_total_aum) or max_total_aum < 1:
max_total_aum = 1.0
# Aggregate residual = sum of ISIN residuals
agg_res = {}
for i in range(1, len(all_months)):
t_curr = all_months[i]
total_r = sum(residuals[isin].get(t_curr, 0.0) for isin in isins)
agg_res[t_curr] = total_r
agg_stock = 0.0
agg_stock_by_date = {t_ref: 0.0}
for i in range(len(all_months) - 2, -1, -1):
t_curr = all_months[i + 1]
t_prev = all_months[i]
agg_stock = agg_stock - agg_res.get(t_curr, 0.0)
agg_stock_by_date[t_prev] = agg_stock
rows_agg = []
for t in all_months:
s = agg_stock_by_date.get(t, np.nan)
r = agg_res.get(t, 0.0)
rows_agg.append({
"date": t,
"residual_agg": round(r, 4),
"stock_error_agg": round(s, 4) if not pd.isna(s) else np.nan,
"stock_error_agg_pct": round(abs(s) / max_total_aum * 100, 4)
if not pd.isna(s) else np.nan,
})
df_err_agg = pd.DataFrame(rows_agg).sort_values("date")
return df_err_isin, df_err_agg
# ─────────────────────────────────────────────────────────────
# 3. PRINT SUMMARY
# ─────────────────────────────────────────────────────────────
def print_summary(df_broken, df_all, alpha):
    """
    Print a console summary of the ISIN-level broken-month detection.

    Parameters
    ----------
    df_broken : broken (isin, month) rows; must carry the columns date,
        isin, missing_flow, missing_pct and is_lag when non-empty.
    df_all : every examined (isin, month) row; only its length is used.
    alpha : tolerance threshold, shown as a percentage.

    Empty inputs are reported as zero counts (0.0%) instead of raising.
    """
    total = len(df_all)
    n_broken = len(df_broken)
    # Only touch the columns when there are rows: an empty result may be
    # a frame without any columns.
    n_lag = int(df_broken["is_lag"].sum()) if n_broken else 0
    broken_share = n_broken / total * 100 if total else 0.0

    print("\n" + "=" * 60)
    print(" CARMIGNAC — Broken Months Diagnostics")
    print("=" * 60)
    print(f" (isin, month) pairs examined : {total}")
    print(f" Broken (missing_pct > {alpha:.0%}) : {n_broken} "
          f"({broken_share:.1f}%)")
    print(f" Of which likely lag : {n_lag}")
    print(f" Of which genuine gap : {n_broken - n_lag}")

    if n_broken:
        print("\n Top 10 by missing_pct:")
        cols = ["date", "isin", "missing_flow", "missing_pct", "is_lag"]
        print(df_broken[cols].head(10).to_string(index=False))

        # Monthly breakdown: the five months with the most broken ISINs
        by_month = (df_broken.groupby("date")
                    .agg(n_broken=("isin", "count"),
                         total_missing=("missing_flow", lambda x: x.abs().sum()))
                    .sort_values("n_broken", ascending=False)
                    .head(5))
        if len(by_month):
            print("\n Most affected months:")
            print(by_month.to_string())
    print()
# ─────────────────────────────────────────────────────────────
# 4. BUILD HTML REPORT
# ─────────────────────────────────────────────────────────────
def build_html(df_broken, df_all, df_agg, df_err_isin, df_err_agg, alpha):
"""
Assemble the diagnostics report and return it as a single string.

Parameters
----------
df_broken, df_all : outputs of detect_broken_months
df_agg : output of detect_aggregate_broken_months
df_err_isin, df_err_agg : outputs of build_error_account
alpha : tolerance threshold, shown in the detail-log caption

Returns
-------
html : the report text built from the f-string template at the end.

NOTE(review): in this copy of the file the string literals below no longer
contain any markup and several quoted literals span physical lines, so the
template looks like it was damaged by a text extraction step — restore it
from the original before relying on this function.
"""
# ── JS-ready data ────────────────────────────────────────────
# Timeline: n_broken and total_missing per month
tl = (df_all[df_all["broken"]]
.groupby("date")
.agg(n_broken=("isin", "count"),
total_missing=("missing_flow", lambda x: x.abs().sum()),
n_lag=("is_lag", "sum"))
.reindex(df_all["date"].sort_values().unique())
.fillna(0))
tl.index = pd.to_datetime(tl.index)
dates_str = json.dumps([d.strftime("%Y-%m-%d") for d in tl.index])
# JSON-encode a numeric array, rounding to `dec` decimals; NaN becomes null.
def jf(arr, dec=4):
return json.dumps([round(float(v), dec) if not np.isnan(v) else None for v in arr])
# Colour palette, cycled by dataset index for the per-ISIN series.
ISIN_COLORS = [
"#2563eb","#16a34a","#dc2626","#d97706","#7c3aed",
"#0891b2","#db2777","#65a30d","#ea580c","#6366f1",
]
# NOTE(review): the *_js / *_str / *_json values built in this function are
# not referenced by the template as it appears below — presumably they fed
# chart scripts that are missing from this copy; verify.
n_broken_js = jf(tl["n_broken"].values, 0)
total_miss_js = jf(tl["total_missing"].values)
n_lag_js = jf(tl["n_lag"].values, 0)
# Aggregate (cross-ISIN) JS data
agg_dates_str = json.dumps([d.strftime("%Y-%m-%d") for d in pd.to_datetime(df_agg["date"])])
agg_delta_js = jf(df_agg["delta_aum"].values)
agg_flow_js = jf(df_agg["flow_total"].values)
agg_missing_js = jf(df_agg["missing_flow"].values)
agg_pct_js = jf((df_agg["missing_pct"] * 100).values)
# Aggregate KPIs
n_agg_broken = int(df_agg["broken"].sum())
n_agg_lag = int(df_agg["is_lag"].sum())
n_agg_genuine = n_agg_broken - n_agg_lag
max_agg_pct = float(df_agg["missing_pct"].max() * 100) if len(df_agg) else 0
# Aggregate detail table rows (one per broken aggregate month)
agg_rows = []
for _, r in df_agg[df_agg["broken"]].iterrows():
lb = 'lag' if r["is_lag"] else ""
pc = "pct-high" if r["missing_pct"] > 0.1 else "pct-med"
ds = r["date"].strftime("%Y-%m-%d") if hasattr(r["date"], "strftime") else str(r["date"])[:10]
mc = "miss-neg" if r["missing_flow"] < 0 else "miss-pos"
agg_rows.append(
f'
| {ds} | '
f'{r["q_total_prev"]:,.1f} | '
f'{r["q_total_curr"]:,.1f} | '
f'{r["flow_total"]:,.1f} | '
f'{r["missing_flow"]:+,.1f} | '
f'{r["missing_pct"]*100:.2f}% | '
f'{lb} |
'
)
agg_detail_rows = "".join(agg_rows) if agg_rows else (
'| ✓ No broken months at aggregate level |
'
)
# ── Error account JS data ────────────────────────────────────
err_dates_str = json.dumps([d.strftime("%Y-%m-%d") for d in pd.to_datetime(df_err_agg["date"])])
err_agg_stock_js = jf(df_err_agg["stock_error_agg"].values)
err_agg_res_js = jf(df_err_agg["residual_agg"].values)
err_agg_pct_js = jf(df_err_agg["stock_error_agg_pct"].values)
# Top 5 ISINs by max absolute stock error
top_err_isins = (
df_err_isin.groupby("isin")["stock_error"]
.apply(lambda x: x.abs().max())
.nlargest(5).index.tolist()
)
all_err_dates = sorted(df_err_isin["date"].unique())
err_isin_datasets = []
for idx, isin in enumerate(top_err_isins):
sub = (df_err_isin[df_err_isin["isin"] == isin]
.set_index("date")["stock_error"]
.reindex(all_err_dates))
err_isin_datasets.append({
"label": isin,
"data": [round(float(v), 3) if not pd.isna(v) else None for v in sub.values],
"borderColor": ISIN_COLORS[idx % len(ISIN_COLORS)],
"backgroundColor": ISIN_COLORS[idx % len(ISIN_COLORS)] + "22",
"borderWidth": 1.5, "pointRadius": 0, "tension": 0.3, "fill": False,
})
err_isin_ts_json = json.dumps(err_isin_datasets)
err_isin_dates_str = json.dumps([d.strftime("%Y-%m-%d") if hasattr(d, "strftime")
else str(d)[:10] for d in all_err_dates])
# Error account KPIs
max_agg_stock_err = float(df_err_agg["stock_error_agg"].abs().max())
max_agg_stock_pct = float(df_err_agg["stock_error_agg_pct"].max())
# Stationarity proxy: std / mean_abs (lower = more stationary)
# The 1e-9 floor avoids a division by zero when the error stock is flat.
agg_std = float(df_err_agg["stock_error_agg"].std())
agg_mean = float(df_err_agg["stock_error_agg"].abs().mean())
stationarity = round(agg_std / max(agg_mean, 1e-9), 3)
# Error account ISIN detail table: the 200 rows with the largest
# absolute cumulative stock error
err_worst = (df_err_isin.assign(abs_stock=df_err_isin["stock_error"].abs())
.sort_values("abs_stock", ascending=False)
.head(200))
err_isin_rows = []
for _, r in err_worst.iterrows():
ds = r["date"].strftime("%Y-%m-%d") if hasattr(r["date"], "strftime") else str(r["date"])[:10]
sc = "miss-neg" if r["stock_error"] < 0 else "miss-pos"
rc = "miss-neg" if r["residual"] < 0 else "miss-pos"
pch = "pct-high" if r["stock_error_pct"] > 5 else ("pct-med" if r["stock_error_pct"] > 1 else "")
err_isin_rows.append(
f'| {ds} | '
f'{r["isin"]} | '
f'{r["residual"]:+,.2f} | '
f'{r["stock_error"]:+,.2f} | '
f'{r["stock_error_pct"]:.3f}% |
'
)
err_isin_detail = "".join(err_isin_rows) if err_isin_rows else (
'| ✓ Error account is flat (no residuals) |
'
)
# Per-ISIN summary, ordered by total absolute missing flow
isin_sum = (df_broken.groupby("isin")
.agg(n_months=("date", "count"),
avg_pct=("missing_pct", "mean"),
total_abs=("missing_flow", lambda x: x.abs().sum()))
.sort_values("total_abs", ascending=False))
# Per-ISIN missing_pct timeseries for the top 5 ISINs
top_isins = isin_sum.head(5).index.tolist()
all_dates = sorted(df_all["date"].unique())
isin_ts_datasets = []
for idx, isin in enumerate(top_isins):
sub = df_all[df_all["isin"] == isin].set_index("date")["missing_pct"].reindex(all_dates).fillna(0)
isin_ts_datasets.append({
"label": isin,
"data": [round(float(v) * 100, 3) for v in sub.values],
"borderColor": ISIN_COLORS[idx % len(ISIN_COLORS)],
"backgroundColor": ISIN_COLORS[idx % len(ISIN_COLORS)] + "22",
"borderWidth": 2,
"pointRadius": 0,
"tension": 0.3,
"fill": False,
})
isin_ts_json = json.dumps(isin_ts_datasets)
all_dates_str = json.dumps([d.strftime("%Y-%m-%d") if hasattr(d, 'strftime')
else str(d)[:10] for d in all_dates])
# Detail table rows (first 200 rows of df_broken, in the order received)
detail_rows = ""
for _, r in df_broken.head(200).iterrows():
lag_badge = 'lag' if r["is_lag"] else ""
pct_class = "pct-high" if r["missing_pct"] > 0.1 else "pct-med"
detail_rows += f"""
| {r['date'].strftime('%Y-%m-%d') if hasattr(r['date'], 'strftime') else str(r['date'])[:10]} |
{r['isin']} |
{r['q_agg_prev']:,.1f} |
{r['q_agg_curr']:,.1f} |
{r['flow_agg']:,.1f} |
{r['missing_flow']:+,.1f} |
{r['missing_pct']*100:.2f}% |
{lag_badge} |
"""
# ISIN summary table
isin_rows = ""
for isin, row in isin_sum.iterrows():
isin_rows += f"""
| {isin} |
{int(row['n_months'])} |
{row['avg_pct']*100:.2f}% |
{row['total_abs']:,.1f} |
"""
# KPIs
total = len(df_all)
n_broken_kpi = len(df_broken)
n_lag_kpi = int(df_broken["is_lag"].sum())
n_genuine = n_broken_kpi - n_lag_kpi
max_pct = df_broken["missing_pct"].max() * 100 if len(df_broken) else 0
n_isins = df_broken["isin"].nunique()
no_broken_msg = ""
if n_broken_kpi == 0:
no_broken_msg = '✓ No broken months detected at this threshold.
'
# NOTE(review): the template below evaluates n_broken_kpi/total, which raises
# ZeroDivisionError when df_all is empty, and prints a literal {3} for the
# lag window rather than the lag value the detection actually ran with.
# ISIN values and other data are interpolated without any escaping.
html = f"""
Carmignac — Broken Months Diagnostics
(ISIN, month) pairs
{total:,}
examined
Broken months
{n_broken_kpi:,}
{n_broken_kpi/total*100:.1f}% of pairs
Likely lags
{n_lag_kpi}
resolved by ±{3}d window
Genuine gaps
{n_genuine}
unresolved by lag fix
ISINs affected
{n_isins}
distinct ISINs
Max missing %
{max_pct:.1f}%
worst single (isin, month)
00 · Error account — cumulative residuals
Max |stock error|
{max_agg_stock_err:,.1f} shares
Max % of total AUM
{max_agg_stock_pct:.3f}%
Stationarity (σ/μ)
{stationarity:.3f}
lower = more stationary
| Date | ISIN |
Monthly residual |
Cumulative stock |
% of max AUM |
{err_isin_detail}
01 · Aggregate view — all ISINs combined
| Date |
Σ Q(t−1) | Σ Q(t) |
Σ Flow | Missing |
Missing % | |
{agg_detail_rows}
01 · Timeline — per ISIN
02 · By ISIN
{'
No broken months detected.
' if n_broken_kpi == 0 else f"""
| ISIN | Broken months |
Avg missing % | Total |missing| (shares) |
{isin_rows}
"""}
03 · Detail log
Threshold α = {alpha:.1%} · showing up to 200 rows
{'
✓ No broken months detected at this threshold.
' if n_broken_kpi == 0 else f"""
| Date | ISIN |
Q(t-1) | Q(t) |
Net flow | Missing |
Missing % of movement | |
{detail_rows}
"""}
"""
return html
# ─────────────────────────────────────────────────────────────
# 5. MAIN
# ─────────────────────────────────────────────────────────────
def _derive_output_path(path, old_tag, new_tag):
    """Return a sibling of *path*: swap old_tag for new_tag, else suffix the stem."""
    if old_tag in path:
        return path.replace(old_tag, new_tag)
    # Without the tag a plain replace would return *path* unchanged and the
    # next export would overwrite the previous file.
    root, ext = os.path.splitext(path)
    return f"{root}_{new_tag}{ext}"


def main():
    """
    Command-line entry point: load the data, run the ISIN-level and
    aggregate detections, build the error account, then export

    - the broken-months CSV (--out),
    - two error-account CSVs next to it (ISIN-level and aggregated),
    - the HTML report (--html).

    The error-account file names are derived from --out: "broken_months"
    is replaced by "error_account" / "error_account_agg"; when --out does
    not contain "broken_months" the tags are appended to the file stem so
    the three CSVs never share a path.
    """
    parser = argparse.ArgumentParser(
        description="Detect broken months in Carmignac AUM/Flows data"
    )
    parser.add_argument("--out", default="carmignac_broken_months.csv",
                        help="Machine-readable output (loaded by carmignac_repair.py)")
    parser.add_argument("--html", default="carmignac_diagnostics.html")
    parser.add_argument("--alpha", type=float, default=0.02,
                        help="Tolerance threshold (default 0.02 = 2%%)")
    parser.add_argument("--lag", type=int, default=3,
                        help="Boundary days to test for accounting lag (default 3)")
    args = parser.parse_args()

    print("[Load] AUM")
    print("[Load] Flows")
    aum, flows = load_data()

    print(f"\n[Detect] Running broken-month detection (α={args.alpha:.1%}, lag=±{args.lag}d)...")
    df_broken, df_all = detect_broken_months(aum, flows, alpha=args.alpha, lag_days=args.lag)
    df_agg = detect_aggregate_broken_months(aum, flows, alpha=args.alpha, lag_days=args.lag)

    print("\n[Error account] Building error account...")
    df_err_isin, df_err_agg = build_error_account(aum, flows, lag_days=args.lag)

    print_summary(df_broken, df_all, args.alpha)
    n_agg_broken = int(df_agg["broken"].sum())
    print(f" Aggregate broken months : {n_agg_broken} "
          f"(of which lags: {int(df_agg['is_lag'].sum())})")
    max_err = float(df_err_agg["stock_error_agg"].abs().max())
    print(f" Max aggregate error stock : {max_err:,.1f} shares "
          f"({float(df_err_agg['stock_error_agg_pct'].max()):.3f}% of total AUM)")

    # CSV output — this is what carmignac_repair.py will load
    if len(df_broken):
        df_broken.to_csv(args.out, index=False)
        print(f"[Export] Broken months CSV → {args.out}")
    else:
        # Keep the regular header when available so the file has the same
        # schema whether or not anything is broken.
        if len(df_broken.columns):
            empty = df_broken
        else:
            empty = pd.DataFrame(columns=["date", "isin", "missing_pct", "is_lag"])
        empty.to_csv(args.out, index=False)
        print(f"[Export] No broken months — empty CSV → {args.out}")

    # Error account CSVs
    err_out = _derive_output_path(args.out, "broken_months", "error_account")
    df_err_isin.to_csv(err_out, index=False)
    err_agg_out = _derive_output_path(err_out, "error_account", "error_account_agg")
    df_err_agg.to_csv(err_agg_out, index=False)
    print(f"[Export] Error account (ISIN) → {err_out}")
    print(f"[Export] Error account (agg) → {err_agg_out}")

    html = build_html(df_broken, df_all, df_agg, df_err_isin, df_err_agg, args.alpha)
    with open(args.html, "w", encoding="utf-8") as f:
        f.write(html)
    print(f"[Export] HTML report → {args.html}")
# Run the diagnostics pipeline only when this file is executed as a script,
# not when it is imported.
if __name__ == "__main__":
main()