Project_Carmignac/detection_rupture.py

153 lines
5.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import pandas as pd
def detect_ruptures(df, epsilon=0.05):
    """Detect AUM that vanishes from one account and reappears in another.

    Every account (a distinct combination of the key columns) is expanded to
    every distinct 'Centralisation Date' found in ``df``; account/date pairs
    with no data count as an AUM of 0.  A rupture is reported when, on the
    same date and with identical agreement / company / ultimate parent /
    region / country, one account drops to 0 while another account rises
    from 0 by a matching amount.

    Nothing is ever reported on the first date, because the variation of the
    first observation of each account is defined as 0.

    Args:
        df: DataFrame holding the key columns listed in ``key_cols`` plus
            'Centralisation Date' and 'Value - AUM €'.  It is not modified.
        epsilon: Maximum accepted gap between the amount lost and the amount
            gained, relative to the mean of their absolute values.

    Returns:
        pd.DataFrame with columns ['date', 'old_account', 'new_account',
        'value'], where 'value' is the AUM increase of the new account.
    """
    date_col = 'Centralisation Date'
    aum_col = 'Value - AUM €'
    id_col = 'Registrar Account - ID'
    account_no = '_account_no'

    # Columns that together identify an account.
    key_cols = [
        'Agreement - Code',
        'Company - Id',
        'Company - Ultimate Parent Id',
        'Registrar Account - Region',
        'RegistrarAccount - Country',
        id_col,
    ]

    # Work on a copy so the caller's frame is left untouched.
    df_temp = df.copy()
    df_temp[date_col] = pd.to_datetime(df_temp[date_col])

    # Distinct dates, in chronological order.
    full_dates = (
        pd.Series(df_temp[date_col].unique())
        .sort_values()
        .reset_index(drop=True)
    )

    # One row per account, numbered with an integer surrogate key.
    # Grouping directly on the key columns would silently discard every
    # account that has a missing value in one of them (groupby drops NaN
    # keys), whereas merge matches missing keys with each other.
    accounts = df_temp[key_cols].drop_duplicates().reset_index(drop=True)
    accounts = accounts.assign(**{account_no: range(len(accounts))})

    # All account x date combinations.
    full_index = accounts.merge(
        pd.DataFrame({date_col: full_dates}),
        how='cross',
    )

    # Total AUM per account and date.
    numbered = df_temp[key_cols + [date_col, aum_col]].merge(
        accounts, on=key_cols, how='left'
    )
    df_agg = (
        numbered.groupby([account_no, date_col])[aum_col]
        .sum()
        .reset_index()
    )

    # Attach the totals to the full grid; gaps become 0.
    df_full = full_index.merge(df_agg, on=[account_no, date_col], how='left')
    df_full[aum_col] = df_full[aum_col].fillna(0)
    df_full = df_full.sort_values(key_cols + [date_col])

    # Variation and previous value within each account (0 on the first date).
    aum_by_account = df_full.groupby(account_no)[aum_col]
    aum_diff = aum_by_account.diff().fillna(0)
    prev_value = aum_by_account.shift(1).fillna(0)
    df_full = df_full.assign(AUM_diff=aum_diff, prev_value=prev_value)
    df_full = df_full.drop(columns=account_no)

    # Accounts that lose everything / accounts that start from nothing.
    df_zero = df_full[(df_full['AUM_diff'] < 0) & (df_full[aum_col] == 0)]
    df_from_zero = df_full[
        (df_full['AUM_diff'] > 0) & (df_full['prev_value'] == 0)
    ]

    # Pair them on the date and on every key column except the account ID.
    merge_cols = [date_col] + [col for col in key_cols if col != id_col]
    ruptures = pd.merge(
        df_zero, df_from_zero, on=merge_cols, suffixes=('_old', '_new')
    )

    # Symmetric relative gap between the amount lost (< 0) and gained (> 0).
    lost = ruptures['AUM_diff_old']
    gained = ruptures['AUM_diff_new']
    diff_rel = (lost + gained).abs() / ((lost.abs() + gained.abs()) / 2)
    ruptures = ruptures[diff_rel <= epsilon]

    # Final layout.
    return ruptures[
        [date_col, id_col + '_old', id_col + '_new', 'AUM_diff_new']
    ].rename(
        columns={
            date_col: 'date',
            id_col + '_old': 'old_account',
            id_col + '_new': 'new_account',
            'AUM_diff_new': 'value',
        }
    )
def check_isin_continuity(df, rupture, tol=0.05):
    """Check, ISIN by ISIN, that holdings carry over from old to new account.

    For each rupture, the old account's positions at its latest date strictly
    before the rupture date are compared with the new account's positions at
    the rupture date.  Ruptures whose old account has no earlier date in
    ``df`` are skipped.

    Args:
        df: DataFrame with columns 'Registrar Account - ID',
            'Centralisation Date', 'Product - Isin' and 'Quantity - AUM'.
            It is not modified.
        rupture: DataFrame with columns ['date', 'old_account',
            'new_account', 'value'].  It is not modified.
        tol: Maximum accepted absolute value of 'relative_change'.

    Returns:
        pd.DataFrame with one row per rupture and ISIN and the columns
        ['date', 'old_account', 'new_account', 'isin', 'old_value',
        'new_value', 'relative_change', 'check'].  These columns are present
        even when there is no row.  'relative_change' is
        ``(new_value - old_value)`` divided by the old account's TOTAL
        quantity on the previous date (not by the ISIN's own quantity); it is
        None, and 'check' is True, when the ISIN was absent from (or summed
        to 0 in) the old account.
    """
    account_col = 'Registrar Account - ID'
    isin_col = 'Product - Isin'
    qty_col = 'Quantity - AUM'
    result_cols = [
        'date', 'old_account', 'new_account', 'isin',
        'old_value', 'new_value', 'relative_change', 'check',
    ]

    # Parse dates into local Series rather than writing them back into the
    # caller's frames.
    dates = pd.to_datetime(df['Centralisation Date'])
    account_ids = df[account_col]
    rupture_dates = pd.to_datetime(rupture['date'])

    # Dates available for each account, used to find the previous date.
    dates_by_account = {
        account: pd.DatetimeIndex(values)
        for account, values in dates.groupby(account_ids).unique().items()
    }

    records = []
    for date, old_account, new_account in zip(
        rupture_dates, rupture['old_account'], rupture['new_account']
    ):
        # Latest date of the old account strictly before the rupture.
        known_dates = dates_by_account.get(old_account)
        if known_dates is None:
            continue
        past_dates = known_dates[known_dates < date]
        if past_dates.empty:
            continue
        prev_date = past_dates.max()

        # Old account on its previous date, new account on the rupture date.
        df_old = df[(account_ids == old_account) & (dates == prev_date)]
        df_new = df[(account_ids == new_account) & (dates == date)]

        # Denominator shared by every ISIN of this rupture.
        old_total = df_old[qty_col].sum()

        # Every ISIN held on either side, in a reproducible order.
        isins = sorted(set(df_old[isin_col]).union(df_new[isin_col]), key=str)
        for isin in isins:
            old_val = df_old.loc[df_old[isin_col] == isin, qty_col].sum()
            new_val = df_new.loc[df_new[isin_col] == isin, qty_col].sum()
            if old_val == 0:
                # Nothing to compare against: accepted without a ratio.
                rel_change = None
                check = True
            else:
                rel_change = (new_val - old_val) / old_total
                check = bool(abs(rel_change) <= tol)
            records.append({
                'date': date,
                'old_account': old_account,
                'new_account': new_account,
                'isin': isin,
                'old_value': old_val,
                'new_value': new_val,
                'relative_change': rel_change,
                'check': check,
            })

    return pd.DataFrame(records, columns=result_cols)