225 lines
9.9 KiB
Python
225 lines
9.9 KiB
Python
import pandas as pd
|
|
import statsmodels.api as sm
|
|
import numpy as np
|
|
|
|
def compute_static_features(flows_df, aum_df):
    """Build one row of descriptive features per registrar account.

    Args:
        flows_df: Transaction-level frame. The columns read here are
            'Registrar Account - ID', 'Value € - Subscription',
            'Value € - Redemption', 'Value € - NetFlows', 'Agreement - Code',
            'Centralisation Date' and 'Product - Asset Type'.
        aum_df: AUM observations with 'Registrar Account - ID' and
            'Value - AUM €'.

    Returns:
        DataFrame indexed by account ID holding the flow aggregates
        (``total_subs``, ``total_reds``, ``net_flow_vol``, ``txn_count``,
        ``tenure_days``, ``buy_sell_ratio``), one ``pct_flow_<asset type>``
        column per asset type seen among positive subscriptions, and
        ``avg_aum`` / ``aum_volatility``. Accounts present only in
        ``aum_df`` are kept (outer join); every missing value is set to 0.
    """
    account_col = 'Registrar Account - ID'
    subs_col = 'Value € - Subscription'
    aum_col = 'Value - AUM €'

    def _span_in_days(dates):
        # Days between an account's first and last centralisation date.
        return (dates.max() - dates.min()).days

    # --- Flow dynamics: one aggregate row per account ---
    per_account = flows_df.groupby(account_col).agg(
        total_subs=(subs_col, 'sum'),
        total_reds=('Value € - Redemption', 'sum'),
        net_flow_vol=('Value € - NetFlows', 'sum'),
        txn_count=('Agreement - Code', 'count'),
        tenure_days=('Centralisation Date', _span_in_days),
    )

    # Buy/sell ratio: the +1.0 keeps the denominator non-zero, and the
    # result is bounded to [-1, 1].
    gross_volume = per_account['total_subs'].abs() + per_account['total_reds'].abs()
    net_volume = per_account['total_subs'] - per_account['total_reds']
    per_account['buy_sell_ratio'] = (net_volume / (gross_volume + 1.0)).clip(-1, 1)

    # --- Product preferences: share of subscribed value per asset type ---
    subscriptions = flows_df[flows_df[subs_col] > 0]
    subscribed_by_asset = (
        subscriptions
        .groupby([account_col, 'Product - Asset Type'])[subs_col]
        .sum()
        .unstack(fill_value=0)
    )
    subscribed_total = subscribed_by_asset.sum(axis=1)
    asset_shares = subscribed_by_asset.div(subscribed_total + 1.0, axis=0)
    asset_shares = asset_shares.add_prefix('pct_flow_')

    # --- AUM level and dispersion ---
    aum_summary = aum_df.groupby(account_col).agg(
        avg_aum=(aum_col, 'mean'),
        aum_volatility=(aum_col, 'std'),
    )

    combined = per_account.join(asset_shares).join(aum_summary, how='outer')
    return combined.fillna(0)
|
|
|
|
_ACCOUNT_COL = 'Registrar Account - ID'
_AUM_COL = 'Value - AUM €'
_FLOW_DATE_COL = 'Centralisation Date'
_FLOW_QTY_COL = 'Quantity - NetFlows'
_YIELD_COL = 'Yld to Maturity'
_BOND_RETURN_COL = 'Total Return % 1-wk-LOC'
_SHOCK_COLS = ['Rate_Spike', 'Rate_Drop', 'Bond_Rally', 'Bond_Crash']

# Resample (offset) alias -> period alias. Newer pandas spells the
# month/quarter/year-end offsets 'ME'/'QE'/'YE', while periods are still
# 'M'/'Q'/'Y'; to_period() does not accept the offset spelling.
# NOTE(review): only the bare aliases are translated; multiples such as
# '2ME' or anchored forms such as 'QE-DEC' are passed through unchanged.
_PERIOD_ALIASES = {'ME': 'M', 'QE': 'Q', 'YE': 'Y'}


def _period_alias(freq):
    """Return the alias to hand to ``to_period`` for a resample ``freq``."""
    if isinstance(freq, str):
        return _PERIOD_ALIASES.get(freq, freq)
    return freq


def _build_market_factors(rates_df, gov_df, freq):
    """Return per-period 'Delta_Rate' and 'Bond_Return', indexed by period string.

    Works on copies, so the caller's ``rates_df`` / ``gov_df`` are untouched.
    """
    rates = rates_df[['Date', _YIELD_COL]].copy()
    rates[_YIELD_COL] = pd.to_numeric(rates[_YIELD_COL], errors='coerce')
    # Change in the last observed yield of each period.
    delta_rates = rates.set_index('Date').resample(freq)[_YIELD_COL].last().diff()

    gov = gov_df.loc[gov_df['Bond/Index'] == 'EG04', ['Date', _BOND_RETURN_COL]].copy()
    gov[_BOND_RETURN_COL] = pd.to_numeric(gov[_BOND_RETURN_COL], errors='coerce')
    gov = gov.set_index('Date')
    gov = gov[~gov.index.duplicated(keep='first')]
    # Compound the percentage returns that fall inside each period.
    bond_returns = gov[_BOND_RETURN_COL].resample(freq).apply(
        lambda pct: (1 + pct / 100).prod() - 1
    )

    market_df = pd.concat(
        [delta_rates.rename('Delta_Rate'), bond_returns.rename('Bond_Return')],
        axis=1,
    ).dropna()

    # A string period key makes the join with client flows independent of
    # timestamp alignment inside the period.
    market_df['Period_Str'] = market_df.index.to_period(_period_alias(freq)).astype(str)
    return market_df.set_index('Period_Str')


def _mean_aum_by_client(aum_df):
    """Return the mean numeric AUM per account without modifying ``aum_df``."""
    aum = aum_df[[_ACCOUNT_COL, _AUM_COL]].copy()
    aum[_AUM_COL] = pd.to_numeric(aum[_AUM_COL], errors='coerce')
    return aum.groupby(_ACCOUNT_COL)[_AUM_COL].mean()


def _eligible_clients(flows_df, mean_aum, min_txns, min_aum):
    """Return accounts with mean AUM > ``min_aum`` and >= ``min_txns`` flow rows."""
    funded = mean_aum[mean_aum > min_aum].index
    txn_counts = flows_df[_ACCOUNT_COL].value_counts()
    active = txn_counts[txn_counts >= min_txns].index
    return active[active.isin(funded)]


def _client_panels(flows_df, market_df, mean_aum, eligible, freq, min_periods):
    """Yield ``(client, Y, merged)`` for every eligible client that can be modelled.

    ``merged`` is the client's per-period net-flow quantity inner-joined with
    ``market_df``; ``Y`` is that quantity divided by the client's mean AUM.
    Clients with a non-finite or zero mean AUM, or with fewer than
    ``min_periods`` joined periods, are skipped.
    """
    # Filter once and group, instead of scanning the whole frame per client.
    flows = flows_df.loc[
        flows_df[_ACCOUNT_COL].isin(eligible),
        [_ACCOUNT_COL, _FLOW_DATE_COL, _FLOW_QTY_COL],
    ].copy()
    flows['Period_Str'] = flows[_FLOW_DATE_COL].dt.to_period(_period_alias(freq)).astype(str)
    flows[_FLOW_QTY_COL] = pd.to_numeric(flows[_FLOW_QTY_COL], errors='coerce')

    for client, client_rows in flows.groupby(_ACCOUNT_COL, sort=False, observed=True):
        avg_wealth = mean_aum.loc[client]
        if not np.isfinite(avg_wealth) or avg_wealth == 0:
            continue

        period_flows = client_rows.groupby('Period_Str')[_FLOW_QTY_COL].sum()
        merged = pd.merge(
            period_flows, market_df, left_index=True, right_index=True, how='inner'
        )
        if len(merged) < min_periods:
            continue

        yield client, merged[_FLOW_QTY_COL] / avg_wealth, merged


def _results_frame(rows, value_cols):
    """Build the result frame, indexed by account ID even when ``rows`` is empty."""
    return pd.DataFrame(rows, columns=[_ACCOUNT_COL] + list(value_cols)).set_index(_ACCOUNT_COL)


def compute_shock_sensitivities(flows_df, aum_df, rates_df, gov_df, freq='ME', *,
                                min_txns=250, min_aum=1000, min_periods=6):
    """Estimate each active client's flow response to market shock periods.

    For every eligible client an ordinary least squares model is fitted:
    per-period net-flow quantity (scaled by the client's mean AUM) on a
    constant plus shock dummies. A period is a 'Rate_Spike' / 'Rate_Drop'
    when the yield change is above / below its 75th / 25th percentile, and
    a 'Bond_Rally' / 'Bond_Crash' when the EG04 compounded return is above /
    below its 75th / 25th percentile. Dummies that never fire in a client's
    sample are dropped before fitting and reported with a beta of 0.

    None of the input frames is modified.

    Args:
        flows_df: Needs 'Registrar Account - ID', 'Centralisation Date'
            (datetime) and 'Quantity - NetFlows'.
        aum_df: Needs 'Registrar Account - ID' and 'Value - AUM €'.
        rates_df: Needs 'Date' and 'Yld to Maturity'.
        gov_df: Needs 'Date', 'Bond/Index' and 'Total Return % 1-wk-LOC';
            only rows with 'Bond/Index' == 'EG04' are used.
        freq: Resample alias for the modelling period. It is passed to
            ``resample`` as given and translated to the matching period
            alias for ``to_period``.
        min_txns: Minimum number of flow rows for a client to be modelled.
        min_aum: Mean AUM a client must exceed to be modelled.
        min_periods: Minimum number of periods shared with the market data.

    Returns:
        DataFrame indexed by 'Registrar Account - ID' with columns
        ``alpha_normal``, ``shock_r_squared``, ``beta_rate_spike``,
        ``beta_rate_drop``, ``beta_bond_rally``, ``beta_bond_crash``.
        Empty, with the same index name and columns, when no client
        could be modelled.
    """
    print(f"DEBUG: Computing Sensitivities (Threshold={min_txns})...")

    # --- 1. Market factors and shock flags ---
    market_df = _build_market_factors(rates_df, gov_df, freq)

    rate_q1 = market_df['Delta_Rate'].quantile(0.25)
    rate_q3 = market_df['Delta_Rate'].quantile(0.75)
    bond_q1 = market_df['Bond_Return'].quantile(0.25)
    bond_q3 = market_df['Bond_Return'].quantile(0.75)

    market_df['Rate_Spike'] = (market_df['Delta_Rate'] > rate_q3).astype(int)
    market_df['Rate_Drop'] = (market_df['Delta_Rate'] < rate_q1).astype(int)
    market_df['Bond_Rally'] = (market_df['Bond_Return'] > bond_q3).astype(int)
    market_df['Bond_Crash'] = (market_df['Bond_Return'] < bond_q1).astype(int)

    # --- 2. Funnel ---
    mean_aum = _mean_aum_by_client(aum_df)
    eligible = _eligible_clients(flows_df, mean_aum, min_txns, min_aum)
    print(f"Shock Model Funnel: {len(eligible)} clients eligible (Active >= {min_txns} txns).")

    # --- 3. Per-client regression ---
    rows = []
    failure_printed = False  # only the first problem is reported

    panels = _client_panels(flows_df, market_df, mean_aum, eligible, freq, min_periods)
    for client, Y, merged in panels:
        # Keep only shocks that occurred at least once in this client's sample.
        active_shocks = [col for col in _SHOCK_COLS if merged[col].sum() > 0]
        X = sm.add_constant(merged[active_shocks])

        if Y.isna().any() or X.isna().any().any():
            if not failure_printed:
                print(f"DEBUG CRASH: Client {client} has NaNs.")
                failure_printed = True
            continue

        try:
            model = sm.OLS(Y, X).fit()
        except Exception as exc:
            # Best effort: one client's failed fit must not abort the batch.
            if not failure_printed:
                print(f"DEBUG CRASH: {exc}")
                failure_printed = True
            continue

        row = {
            _ACCOUNT_COL: client,
            'alpha_normal': model.params.get('const', 0),
            'shock_r_squared': model.rsquared,
        }
        for col in _SHOCK_COLS:
            # Shocks dropped above have no coefficient; report them as 0.
            row[f'beta_{col.lower()}'] = model.params.get(col, 0)
        rows.append(row)

    print(f"DEBUG: Successfully modeled {len(rows)} clients.")

    return _results_frame(
        rows,
        ['alpha_normal', 'shock_r_squared']
        + [f'beta_{col.lower()}' for col in _SHOCK_COLS],
    )


def compute_linear_sensitivities(flows_df, aum_df, rates_df, gov_df, freq='M', *,
                                 min_txns=250, min_aum=1000, min_periods=6):
    """Estimate each active client's linear flow sensitivity to rates and bonds.

    Fits, per eligible client, the ordinary least squares model
    ``Flow ~ Alpha + Beta_Rate * dRate + Beta_Bond * BondRet`` where ``Flow``
    is the per-period net-flow quantity scaled by the client's mean AUM.

    None of the input frames is modified.

    Args:
        flows_df: Needs 'Registrar Account - ID', 'Centralisation Date'
            (datetime) and 'Quantity - NetFlows'.
        aum_df: Needs 'Registrar Account - ID' and 'Value - AUM €'.
        rates_df: Needs 'Date' and 'Yld to Maturity'.
        gov_df: Needs 'Date', 'Bond/Index' and 'Total Return % 1-wk-LOC';
            only rows with 'Bond/Index' == 'EG04' are used.
        freq: Resample alias for the modelling period. It is passed to
            ``resample`` as given and translated to the matching period
            alias for ``to_period``.
        min_txns: Minimum number of flow rows for a client to be modelled.
        min_aum: Mean AUM a client must exceed to be modelled.
        min_periods: Minimum number of periods shared with the market data.

    Returns:
        DataFrame indexed by 'Registrar Account - ID' with columns
        ``alpha_linear``, ``beta_rate_linear``, ``beta_bond_linear`` and
        ``linear_r_squared``. Empty, with the same index name and columns,
        when no client could be modelled.
    """
    print("DEBUG: Computing Sensitivities (Linear Model)...")

    # 1. Market data
    market_df = _build_market_factors(rates_df, gov_df, freq)

    # 2. Funnel
    mean_aum = _mean_aum_by_client(aum_df)
    eligible = _eligible_clients(flows_df, mean_aum, min_txns, min_aum)
    print(f"Linear Model Funnel: {len(eligible)} clients eligible.")

    # 3. Per-client regression
    rows = []
    failure_printed = False  # only the first problem is reported

    panels = _client_panels(flows_df, market_df, mean_aum, eligible, freq, min_periods)
    for client, Y, merged in panels:
        X = sm.add_constant(merged[['Delta_Rate', 'Bond_Return']])

        try:
            model = sm.OLS(Y, X).fit()
        except Exception as exc:
            # Best effort: skip this client, but do not swallow
            # KeyboardInterrupt / SystemExit as a bare except would.
            if not failure_printed:
                print(f"DEBUG CRASH: {exc}")
                failure_printed = True
            continue

        rows.append({
            _ACCOUNT_COL: client,
            'alpha_linear': model.params.get('const', 0),
            'beta_rate_linear': model.params.get('Delta_Rate', 0),
            'beta_bond_linear': model.params.get('Bond_Return', 0),
            'linear_r_squared': model.rsquared,
        })

    return _results_frame(
        rows,
        ['alpha_linear', 'beta_rate_linear', 'beta_bond_linear', 'linear_r_squared'],
    )