{ "nbformat": 4, "nbformat_minor": 5, "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "name": "python", "version": "3.13.0" } }, "cells": [ { "cell_type": "markdown", "metadata": {}, "source": "# AUM–Flow Consistency Analysis\n\n5 sections:\n1. Imports & Data Loading\n2. Build Panel\n3. Rupture Detection (data-driven threshold)\n4. Level Shift Repair\n5. Figures & Diagnostics" }, { "cell_type": "markdown", "metadata": {}, "source": "## 0. Imports & Data Loading" }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": "import pandas as pd\nimport numpy as np\nimport matplotlib.pyplot as plt\nimport plotly.graph_objects as go\n\nstocks = pd.read_csv('stocks.csv', low_memory=False)\nflows = pd.read_csv('flows.csv', low_memory=False)\n\nstocks[\"Centralisation Date\"] = pd.to_datetime(stocks[\"Centralisation Date\"])\nflows[\"Centralisation Date\"] = pd.to_datetime(flows[\"Centralisation Date\"])\n\nprint(f\"Stocks: {stocks.shape}\")\nprint(f\"Flows: {flows.shape}\")" }, { "cell_type": "markdown", "metadata": {}, "source": "## 1. Build Panel (Account x ISIN x Date)\n\nMerge AUM and flows at the finest granularity. Missing flows are filled with 0. Lagged variables are computed to reconstruct the accounting identity Q(t) = Q(t-1) + F(t-1)." }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": "KEY = [\"Registrar Account - ID\", \"Product - Isin\", \"Centralisation Date\"]\nGROUP = [\"Registrar Account - ID\", \"Product - Isin\"]\n\nstocks_panel = stocks[KEY + [\"Quantity - AUM\"]].copy()\n\nflows_panel = (\n flows[KEY + [\"Quantity - NetFlows\"]]\n .groupby(KEY, as_index=False)[\"Quantity - NetFlows\"]\n .sum()\n)\n\ndf = stocks_panel.merge(flows_panel, on=KEY, how=\"left\")\ndf[\"Quantity - NetFlows\"] = df[\"Quantity - NetFlows\"].fillna(0)\ndf = df.sort_values(KEY)\n\ndf[\"prev_stock\"] = df.groupby(GROUP)[\"Quantity - AUM\"].shift(1)\ndf[\"prev_flows\"] = df.groupby(GROUP)[\"Quantity - NetFlows\"].shift(1).fillna(0)\ndf[\"expected_stock\"] = df[\"prev_stock\"] + df[\"prev_flows\"]\n\ndf[\"gap\"] = df[\"Quantity - AUM\"] - df[\"expected_stock\"]\ndf[\"gap_abs\"] = df[\"gap\"].abs()\ndf[\"gap_rel\"] = df[\"gap_abs\"] / df[\"prev_stock\"].abs().replace(0, np.nan)\n\nprint(f\"Panel size: {df.shape}\")\nprint(f\"Accounts: {df['Registrar Account - ID'].nunique():,}\")\nprint(f\"ISINs: {df['Product - Isin'].nunique():,}\")\nprint(f\"Dates: {df['Centralisation Date'].nunique():,}\")" }, { "cell_type": "markdown", "metadata": {}, "source": "## 2. Rupture Detection (data-driven threshold)\n\nThe detection threshold EPSILON is calibrated from the data:\n1. Trim the top 10% of relative gaps to avoid calibrating on resets\n2. Define EPSILON as the 99th percentile of the trimmed distribution\n\nA rupture is flagged when gap_rel > EPSILON, conditional on prev_stock > 0." }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": "# Calibration sample: valid observations with positive previous stock\nvalid_gaps = df.loc[\n df[\"gap_rel\"].notna() & (df[\"prev_stock\"] > 0),\n \"gap_rel\"\n]\n\n# Trim top 10% to avoid calibrating on structural resets\ngap_rel_trimmed = valid_gaps[valid_gaps <= valid_gaps.quantile(0.90)]\n\n# EPSILON = 99th percentile of the trimmed distribution\nEPSILON = gap_rel_trimmed.quantile(0.99)\nprint(f\"Detection threshold EPSILON: {EPSILON:.4%}\")\n\n# Flag ruptures\ndf[\"rupture_flag\"] = (\n df[\"prev_stock\"].notna()\n & (df[\"prev_stock\"] > 0)\n & (df[\"gap_rel\"] > EPSILON)\n)\n\n# Remove edge effect at last observation date\nlast_date = df[\"Centralisation Date\"].max()\ndf.loc[(df[\"rupture_flag\"]) & (df[\"Centralisation Date\"] == last_date), \"rupture_flag\"] = False\n\n# ISIN-level summary\nrupture_isin_summary = (\n df.groupby(GROUP)\n .agg(\n n_ruptures =(\"rupture_flag\", \"sum\"),\n total_obs =(\"rupture_flag\", \"count\"),\n rupture_ratio=(\"rupture_flag\", \"mean\"),\n max_gap_abs =(\"gap_abs\", \"max\"),\n max_gap_rel =(\"gap_rel\", \"max\")\n )\n .reset_index()\n)\n\n# Account-level summary\nrupture_account_summary = (\n df.groupby(\"Registrar Account - ID\")\n .agg(\n n_ruptures =(\"rupture_flag\", \"sum\"),\n total_obs =(\"rupture_flag\", \"count\"),\n rupture_ratio=(\"rupture_flag\", \"mean\"),\n max_gap_abs =(\"gap_abs\", \"max\"),\n max_gap_rel =(\"gap_rel\", \"max\")\n )\n .reset_index()\n)\n\ndf.to_csv(\"aum_flow_gaps.csv\", index=False)\nrupture_isin_summary.to_csv(\"rupture_isin_summary.csv\", index=False)\nrupture_account_summary.to_csv(\"rupture_summary.csv\", index=False)\n\nprint(f\"Total ruptures detected: {df['rupture_flag'].sum():,}\")\nprint(f\"Share of observations flagged: {df['rupture_flag'].mean():.2%}\")" }, { "cell_type": "markdown", "metadata": {}, "source": "## 3. Level Shift Repair\n\nDetect and correct persistent level shifts. A gap stable over MIN_PERSISTENCE consecutive periods is identified as an operational level shift and corrected by subtracting the shift from all subsequent AUM observations.\n\n**Parameters:**\n- — tolerance on gap stability\n- — minimum relative gap (5%)\n- — minimum consecutive stable periods" }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": "GAP_TOL = 1e-6\nREL_GAP_THR = 0.05\nMIN_PERSISTENCE = 3\n\ndef repair_group(g):\n \"\"\"Detect and correct the first persistent level shift in a (account, ISIN) trajectory.\"\"\"\n g = g.copy()\n obs = g[\"Quantity - AUM\"].values\n flows_ = g[\"Quantity - NetFlows\"].values\n corrected = obs.copy()\n\n expected = np.empty_like(obs)\n expected[0] = np.nan\n for t in range(1, len(obs)):\n expected[t] = corrected[t-1] + flows_[t-1]\n\n gap = obs - expected\n rel_gap = np.abs(gap) / np.maximum(np.abs(expected), 1.0)\n\n idx = None\n for i in range(1, len(obs) - MIN_PERSISTENCE):\n if (\n rel_gap[i] > REL_GAP_THR\n and np.all(np.abs(gap[i:i+MIN_PERSISTENCE] - gap[i]) < GAP_TOL)\n and np.all(np.abs(np.diff(flows_[i:i+MIN_PERSISTENCE])) < GAP_TOL)\n ):\n idx = i\n break\n\n if idx is None:\n return g\n\n shift = gap[idx]\n corrected[idx:] = obs[idx:] - shift\n g.loc[g.index[idx]:, \"repair_flag\"] = True\n\n expected_corr = np.empty_like(obs)\n expected_corr[0] = np.nan\n for t in range(1, len(obs)):\n expected_corr[t] = corrected[t-1] + flows_[t-1]\n\n g[\"corrected_aum\"] = corrected\n g[\"expected_stock_corr\"] = expected_corr\n return g\n\n\ndf_repair = df.copy().sort_values(KEY)\ndf_repair[\"corrected_aum\"] = df_repair[\"Quantity - AUM\"]\ndf_repair[\"expected_stock_corr\"] = df_repair[\"expected_stock\"]\ndf_repair[\"repair_flag\"] = False\n\ndf_repair = (\n df_repair\n .groupby(GROUP, group_keys=False)\n .apply(repair_group)\n)\n\ndf_repair[\"gap_before\"] = df_repair[\"Quantity - AUM\"] - df_repair[\"expected_stock\"]\ndf_repair[\"gap_after\"] = df_repair[\"corrected_aum\"] - df_repair[\"expected_stock_corr\"]\ndf_repair[\"rupture_before\"] = df_repair[\"gap_before\"].abs() > GAP_TOL\ndf_repair[\"rupture_after\"] = df_repair[\"gap_after\"].abs() > GAP_TOL\n\nprint(\"=== REPAIR SUMMARY ===\")\nprint(pd.DataFrame({\n \"Before repair\": [df_repair[\"rupture_before\"].sum()],\n \"After repair\": [df_repair[\"rupture_after\"].sum()],\n \"Repaired points\": [df_repair[\"repair_flag\"].sum()]\n}))\n\n# Save repaired panel\ndf_repair[[\n \"Registrar Account - ID\", \"Product - Isin\", \"Centralisation Date\",\n \"Quantity - AUM\", \"corrected_aum\", \"Quantity - NetFlows\",\n \"expected_stock\", \"expected_stock_corr\",\n \"gap_before\", \"gap_after\", \"repair_flag\"\n]].rename(columns={\n \"Quantity - AUM\": \"aum_raw\",\n \"corrected_aum\": \"aum_repaired\",\n \"Quantity - NetFlows\": \"flows\",\n \"expected_stock\": \"expected_aum_raw\",\n \"expected_stock_corr\": \"expected_aum_repaired\"\n}).to_csv(\"df_repaired.csv\", index=False)\n\n# Rebuild full stocks file with repaired AUM\nstocks_repaired = stocks.copy()\nstocks_repaired[\"Centralisation Date\"] = pd.to_datetime(stocks_repaired[\"Centralisation Date\"])\n\nrepair_map = df_repair[KEY + [\"corrected_aum\", \"repair_flag\"]].rename(\n columns={\"corrected_aum\": \"Quantity - AUM repaired\"}\n)\nstocks_repaired = stocks_repaired.merge(repair_map, on=KEY, how=\"left\")\nstocks_repaired[\"Quantity - AUM original\"] = stocks_repaired[\"Quantity - AUM\"]\nstocks_repaired[\"Quantity - AUM\"] = np.where(\n stocks_repaired[\"repair_flag\"] == True,\n stocks_repaired[\"Quantity - AUM repaired\"],\n stocks_repaired[\"Quantity - AUM\"]\n)\nstocks_repaired[\"nav_ccy\"] = stocks_repaired[\"Value - AUM CCY\"] / stocks_repaired[\"Quantity - AUM original\"]\nstocks_repaired[\"nav_eur\"] = stocks_repaired[\"Value - AUM €\"] / stocks_repaired[\"Quantity - AUM original\"]\nstocks_repaired[\"Value - AUM CCY\"] = stocks_repaired[\"Quantity - AUM\"] * stocks_repaired[\"nav_ccy\"]\nstocks_repaired[\"Value - AUM €\"] = stocks_repaired[\"Quantity - AUM\"] * stocks_repaired[\"nav_eur\"]\nstocks_repaired = stocks_repaired.drop(columns=[\"Quantity - AUM repaired\", \"Quantity - AUM original\", \"nav_ccy\", \"nav_eur\"])\n\nprint(f\"\nShare of repaired observations: {stocks_repaired['repair_flag'].mean():.4%}\")\nprint(f\"Number of repaired rows: {stocks_repaired['repair_flag'].sum():,}\")\nstocks_repaired.to_csv(\"AUM_repaired.csv\", index=False)" }, { "cell_type": "markdown", "metadata": {}, "source": "## 4. Figures" }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": "# ---- Figure A: Distribution of accounts by rupture intensity ----\n\nrs = rupture_account_summary.copy()\nbins = [0, 0.01, 0.10, 0.30, 1.01]\nlabels = [\"Clean / quasi-clean (<=1%)\", \"Moderate (1-10%)\", \"High (10-30%)\", \"Severe (>30%)\"]\nrs[\"rupture_class\"] = pd.cut(rs[\"rupture_ratio\"], bins=bins, labels=labels, include_lowest=True)\n\ndist = (rs[\"rupture_class\"].value_counts(normalize=True).sort_index() * 100).round(1)\n\nfig_a = go.Figure(data=[go.Pie(\n labels=dist.index, values=dist.values,\n hole=0.45, textinfo=\"percent\", hoverinfo=\"label+percent\"\n)])\nfig_a.update_layout(\n legend=dict(orientation=\"h\", yanchor=\"top\", y=-0.15, xanchor=\"center\", x=0.5),\n legend_title_text=\"Rupture ratio\"\n)\nfig_a.show()" }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": "# ---- Figure B: Rupture rate over time (active flows only) ----\n# Denominator restricted to observations with active flows\n# to avoid dilution by dormant positions.\n\nactive_obs = df[df[\"Quantity - NetFlows\"] != 0]\n\ntime_stats = (\n active_obs\n .groupby(\"Centralisation Date\")\n .agg(total_obs=(\"rupture_flag\", \"count\"), n_ruptures=(\"rupture_flag\", \"sum\"))\n .reset_index()\n)\ntime_stats[\"rupture_rate\"] = time_stats[\"n_ruptures\"] / time_stats[\"total_obs\"]\ntime_stats[\"rupture_rate_ma\"] = time_stats[\"rupture_rate\"].rolling(window=6, center=True).mean()\n\nplt.figure(figsize=(12, 5))\nplt.plot(time_stats[\"Centralisation Date\"], time_stats[\"rupture_rate\"] * 100,\n color=\"lightgray\", linewidth=1, alpha=0.6, label=\"Monthly rupture rate (active flows only)\")\nplt.plot(time_stats[\"Centralisation Date\"], time_stats[\"rupture_rate_ma\"] * 100,\n color=\"#1f77b4\", linewidth=2.5, label=\"6-month moving average\")\nplt.ylabel(\"Rupture rate (%)\")\nplt.xlabel(\"Date\")\nplt.grid(True, linestyle=\"--\", alpha=0.4)\nplt.legend(frameon=False)\nplt.tight_layout()\nplt.show()" }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": "# ---- Figure C: Example trajectories (top 5 worst) ----\n\ndef plot_isin_dynamics(df, account_id, isin, title_suffix=\"\"):\n sub = df[\n (df[\"Registrar Account - ID\"] == account_id) &\n (df[\"Product - Isin\"] == isin)\n ].sort_values(\"Centralisation Date\")\n\n if sub.empty:\n print(\"No data for this (account, ISIN).\")\n return\n\n fig, ax = plt.subplots(figsize=(10, 4))\n ax.plot(sub[\"Centralisation Date\"], sub[\"Quantity - AUM\"],\n label=\"Observed AUM\", linewidth=2, color=\"black\")\n ax.plot(sub[\"Centralisation Date\"], sub[\"expected_stock\"],\n label=\"Flow-implied AUM\", linestyle=\"--\", linewidth=2, color=\"grey\")\n rupt = sub[sub[\"rupture_flag\"]]\n ax.scatter(rupt[\"Centralisation Date\"], rupt[\"Quantity - AUM\"],\n color=\"red\", s=25, zorder=5, label=\"Discontinuity\")\n ax.set_title(f\"Account {account_id} — ISIN {isin} {title_suffix}\", fontsize=11)\n ax.set_ylabel(\"AUM (shares)\")\n ax.legend(loc=\"best\")\n plt.tight_layout()\n plt.show()\n\nworst = rupture_isin_summary.sort_values(\"rupture_ratio\", ascending=False).head(5)\nfor _, row in worst.iterrows():\n plot_isin_dynamics(df, row[\"Registrar Account - ID\"], row[\"Product - Isin\"])" }, { "cell_type": "markdown", "metadata": {}, "source": "## 5. Diagnostics" }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": "# Distribution of gap magnitudes (ruptures only)\n\ndf_r = df[df[\"rupture_flag\"]].copy()\nq90 = df_r[\"gap_abs\"].quantile(0.90)\nq99 = df_r[\"gap_abs\"].quantile(0.99)\n\nfig, axes = plt.subplots(1, 2, figsize=(14, 4))\naxes[0].hist(df_r[\"gap_abs\"], bins=100, log=True)\naxes[0].set_title(\"Distribution of absolute gaps (log scale)\")\naxes[0].set_xlabel(\"Absolute gap (shares)\")\naxes[0].set_ylabel(\"Frequency (log)\")\n\naxes[1].hist(df_r[\"gap_rel\"].dropna(), bins=100, log=True)\naxes[1].set_title(\"Distribution of relative gaps\")\naxes[1].set_xlabel(\"|gap| / prev_stock\")\naxes[1].set_ylabel(\"Frequency (log)\")\n\nplt.tight_layout()\nplt.show()\n\n# Gap classification\navg_gap_by_account = df_r.groupby(\"Registrar Account - ID\")[\"gap\"].mean().rename(\"avg_gap\")\ndf_r = df_r.merge(avg_gap_by_account, on=\"Registrar Account - ID\", how=\"left\")\n\ndef classify_gap(row):\n if row[\"gap_abs\"] >= q99: return \"reset\"\n if row[\"gap_abs\"] >= q90: return \"spike\"\n if abs(row[\"avg_gap\"]) > row[\"gap_abs\"]: return \"shift\"\n return \"micro\"\n\ndf_r[\"discontinuity_type\"] = df_r.apply(classify_gap, axis=1)\nprint(\"=== GAP CLASSIFICATION ===\")\nprint((df_r[\"discontinuity_type\"].value_counts(normalize=True) * 100).round(1))" } ] }