import pandas as pd import statsmodels.api as sm def compute_static_features(flows_df, aum_df): """Generates descriptive features from Flows and AUM.""" # --- 1. Flow Dynamics --- flow_stats = flows_df.groupby('Registrar Account - ID').agg( total_subs=('Value € - Subscription', 'sum'), total_reds=('Value € - Redemption', 'sum'), net_flow_vol=('Value € - NetFlows', 'sum'), txn_count=('Agreement - Code', 'count'), # Tenure: Days between first and last activity tenure_days=('Centralisation Date', lambda x: (x.max() - x.min()).days) ) # Flow Ratio: -1 (Pure Seller) to +1 (Pure Buyer) flow_stats['buy_sell_ratio'] = (flow_stats['total_subs'] - flow_stats['total_reds']) / \ (flow_stats['total_subs'] + flow_stats['total_reds'] + 1e-6) # --- 2. Product Preferences --- # Calculate % of flows going to each Asset Type asset_pivot = flows_df.groupby(['Registrar Account - ID', 'Product - Asset Type'])['Value € - Subscription'].sum().unstack(fill_value=0) asset_pct = asset_pivot.div(asset_pivot.sum(axis=1) + 1e-6, axis=0).add_prefix('pct_flow_') # --- 3. AUM Stats --- aum_stats = aum_df.groupby('Registrar Account - ID').agg( avg_aum=('Value - AUM €', 'mean'), aum_volatility=('Value - AUM €', 'std') ) # Merge features = flow_stats.join(asset_pct).join(aum_stats, how='outer').fillna(0) return features def compute_market_sensitivities(flows_df, rates_df, gov_df, freq='M'): """ Computes Beta sensitivity to Rates and Gov Bonds. Freq: 'M' (Monthly) recommended for long history. """ # 1. Prepare Market Factors # Resample Rates (Take last value of period) rates_res = rates_df.set_index('Date').resample(freq)['Yld to Maturity'].last() delta_rates = rates_res.diff().rename('Delta_Rate') # Resample Gov Bonds (Using 'EG04' 7-10Y Euro Gov as proxy) gov_target = gov_df[gov_df['Bond/Index'] == 'EG04'].set_index('Date') gov_target = gov_target[~gov_target.index.duplicated(keep='first')] # Dedup # Calculate return over period gov_res = gov_target['Total Return % 1-wk-LOC'].resample(freq).apply(lambda x: (1 + x/100).prod() - 1) gov_res = gov_res.rename('Bond_Return') market_factors = pd.concat([delta_rates, gov_res], axis=1).dropna() # 2. Prepare Client Flows (Aggregated by same frequency) flows_df['Period'] = flows_df['Centralisation Date'].dt.to_period(freq).dt.to_timestamp() client_betas = [] # Only analyze clients with sufficient activity (>5 transactions) active_clients = flows_df['Registrar Account - ID'].value_counts() active_clients = active_clients[active_clients >= 5].index for client in active_clients: c_flows = flows_df[flows_df['Registrar Account - ID'] == client] c_ts = c_flows.groupby('Period')['Quantity - NetFlows'].sum() merged = pd.concat([c_ts, market_factors], axis=1, join='inner') if len(merged) >= 5: Y = merged['Quantity - NetFlows'] X = merged[['Delta_Rate', 'Bond_Return']] X = sm.add_constant(X) try: model = sm.OLS(Y, X).fit() client_betas.append({ 'Registrar Account - ID': client, 'beta_rate': model.params.get('Delta_Rate', 0), 'beta_bond': model.params.get('Bond_Return', 0), 'r_squared': model.rsquared }) except: continue if not client_betas: return pd.DataFrame(columns=['Registrar Account - ID', 'beta_rate', 'beta_bond', 'r_squared']) return pd.DataFrame(client_betas).set_index('Registrar Account - ID')