paco-dev #2
BIN
reconciliation/__pycache__/load.cpython-313.pyc
Normal file
BIN
reconciliation/__pycache__/load.cpython-313.pyc
Normal file
Binary file not shown.
19
reconciliation/load.py
Normal file
19
reconciliation/load.py
Normal file
|
|
@ -0,0 +1,19 @@
|
|||
import os
|
||||
import s3fs
|
||||
import pandas as pd
|
||||
|
||||
def load():
|
||||
"""Loading the csv fils and converting them to dataframes"""
|
||||
fs = s3fs.S3FileSystem(
|
||||
client_kwargs={'endpoint_url': 'https://'+'minio-simple.lab.groupe-genes.fr'},
|
||||
key = os.environ["AWS_ACCESS_KEY_ID"],
|
||||
secret = os.environ["AWS_SECRET_ACCESS_KEY"],
|
||||
token = os.environ["AWS_SESSION_TOKEN"])
|
||||
|
||||
with fs.open('s3://projet-bdc-data/carmignac/AUM ENSAE V2 -20251105.csv', 'rb') as aum:
|
||||
df_aum = pd.read_csv(aum, sep=";")
|
||||
|
||||
with fs.open('projet-bdc-data//carmignac/Flows ENSAE V2 -20251105.csv', 'rb') as flows:
|
||||
df_flows = pd.read_csv(flows, sep=";")
|
||||
|
||||
return df_aum, df_flows
|
||||
210
reconciliation/reconcile.py
Normal file
210
reconciliation/reconcile.py
Normal file
|
|
@ -0,0 +1,210 @@
|
|||
import pandas as pd
|
||||
import numpy as np
|
||||
from sklearn.metrics.pairwise import cosine_similarity
|
||||
from datetime import datetime, timedelta
|
||||
from load import load
|
||||
|
||||
class AssetReconciler:
|
||||
|
||||
def __init__(self, df_aum, df_flows):
|
||||
"""
|
||||
Initialize with the raw AUM and Flows dataframes.
|
||||
"""
|
||||
self.df_aum = df_aum.copy()
|
||||
self.df_flows = df_flows.copy()
|
||||
|
||||
# Basic cleaning
|
||||
print("Parsing dates...")
|
||||
self.df_aum['Centralisation Date'] = pd.to_datetime(self.df_aum['Centralisation Date'], errors='coerce')
|
||||
self.df_flows['Centralisation Date'] = pd.to_datetime(self.df_flows['Centralisation Date'], errors='coerce')
|
||||
|
||||
# Standardize column names for internal use
|
||||
self.col_id = 'Registrar Account - ID'
|
||||
self.col_isin = 'Product - Isin'
|
||||
self.col_qty_aum = 'Quantity - AUM'
|
||||
self.col_qty_flow = 'Quantity - NetFlows'
|
||||
self.col_block = 'Registrar Account - Country'
|
||||
|
||||
def _get_portfolio_matrix(self, df, date_filter):
|
||||
"""
|
||||
Pivots data to create a matrix: Index=AccountID, Columns=ISINs, Values=Quantity
|
||||
"""
|
||||
subset = df[df['Centralisation Date'] == date_filter]
|
||||
if subset.empty:
|
||||
print(f"Warning: No data found for date {date_filter}")
|
||||
return pd.DataFrame()
|
||||
|
||||
# Pivot: Rows are Clients, Columns are ISINs
|
||||
matrix = subset.pivot_table(
|
||||
index=self.col_id,
|
||||
columns=self.col_isin,
|
||||
values=self.col_qty_aum,
|
||||
aggfunc='sum'
|
||||
).fillna(0)
|
||||
|
||||
return matrix
|
||||
|
||||
def _get_aggregated_flows(self, start_date, end_date):
|
||||
"""
|
||||
Sums up net flows per Account/ISIN between two dates.
|
||||
"""
|
||||
mask = (self.df_flows['Centralisation Date'] > start_date) & (self.df_flows['Centralisation Date'] <= end_date)
|
||||
subset = self.df_flows[mask]
|
||||
|
||||
if subset.empty:
|
||||
return pd.DataFrame()
|
||||
|
||||
flow_matrix = subset.pivot_table(
|
||||
index=self.col_id,
|
||||
columns=self.col_isin,
|
||||
values=self.col_qty_flow,
|
||||
aggfunc='sum'
|
||||
).fillna(0)
|
||||
|
||||
return flow_matrix
|
||||
|
||||
def match_accounts(self, date_past, date_current, similarity_threshold=0.95, magnitude_tolerance=0.2):
|
||||
"""
|
||||
Main logic to link Past Accounts to Current Accounts.
|
||||
"""
|
||||
print(f"--- Running Matching Algorithm ---")
|
||||
print(f"Comparing State: {date_past.date()} -> {date_current.date()}")
|
||||
|
||||
# 1. Get AUM Snapshots
|
||||
mat_past = self._get_portfolio_matrix(self.df_aum, date_past)
|
||||
mat_curr = self._get_portfolio_matrix(self.df_aum, date_current)
|
||||
|
||||
if mat_past.empty or mat_curr.empty:
|
||||
return pd.DataFrame()
|
||||
|
||||
# 2. Get Flows
|
||||
mat_flows = self._get_aggregated_flows(date_past, date_current)
|
||||
|
||||
# 3. Blocking Strategy
|
||||
# We only compare accounts if they belong to the same Country (or other stable attribute)
|
||||
# To do this efficiently, we create a mapping of ID -> Country
|
||||
past_countries = self.df_aum[self.df_aum['Centralisation Date'] == date_past].set_index(self.col_id)[self.col_block].to_dict()
|
||||
|
||||
results = []
|
||||
|
||||
# Iterate through unique countries to reduce matrix size (Blocking)
|
||||
unique_countries = set(past_countries.values())
|
||||
|
||||
for country in unique_countries:
|
||||
# Filter matrices for this block
|
||||
# Identify IDs in this country
|
||||
ids_in_country_past = [i for i in mat_past.index if past_countries.get(i) == country]
|
||||
|
||||
# Note: For Current IDs, we need to fetch their country too.
|
||||
curr_countries = self.df_aum[self.df_aum['Centralisation Date'] == date_current].set_index(self.col_id)[self.col_block].to_dict()
|
||||
ids_in_country_curr = [i for i in mat_curr.index if curr_countries.get(i) == country]
|
||||
|
||||
if not ids_in_country_past or not ids_in_country_curr:
|
||||
continue
|
||||
|
||||
# Slice the matrices
|
||||
block_past = mat_past.loc[ids_in_country_past]
|
||||
block_curr = mat_curr.loc[ids_in_country_curr]
|
||||
|
||||
# Align Flows to this block
|
||||
block_flows = pd.DataFrame(0, index=block_past.index, columns=block_past.columns)
|
||||
if not mat_flows.empty:
|
||||
# Only use flows for IDs that exist in the past block
|
||||
common_ids = block_past.index.intersection(mat_flows.index)
|
||||
common_isins = block_past.columns.intersection(mat_flows.columns)
|
||||
if not common_ids.empty:
|
||||
block_flows.loc[common_ids, common_isins] = mat_flows.loc[common_ids, common_isins]
|
||||
|
||||
# 4. Reconstruction: Expected State = Past + Flows
|
||||
expected_state = block_past.add(block_flows, fill_value=0)
|
||||
|
||||
# 5. Align Dimensions (Union of ISINs)
|
||||
all_isins = list(set(expected_state.columns) | set(block_curr.columns))
|
||||
vec_expected = expected_state.reindex(columns=all_isins, fill_value=0)
|
||||
vec_actual = block_curr.reindex(columns=all_isins, fill_value=0)
|
||||
|
||||
# 6. Calculate Cosine Similarity
|
||||
# Result is a matrix: Rows=PastIDs, Cols=CurrentIDs
|
||||
sim_matrix = cosine_similarity(vec_expected, vec_actual)
|
||||
|
||||
# 7. Find Best Matches
|
||||
for idx, past_id in enumerate(vec_expected.index):
|
||||
# Find best score in the row
|
||||
best_idx = np.argmax(sim_matrix[idx])
|
||||
best_score = sim_matrix[idx][best_idx]
|
||||
curr_id = vec_actual.index[best_idx]
|
||||
|
||||
# 8. Magnitude Check (Euclidean safeguard)
|
||||
# Ensure we don't match a small retail client to a huge institutional one
|
||||
total_shares_exp = vec_expected.loc[past_id].sum()
|
||||
total_shares_act = vec_actual.loc[curr_id].sum()
|
||||
|
||||
# Calculate ratio (handle div by zero)
|
||||
if total_shares_act == 0:
|
||||
ratio = 0
|
||||
else:
|
||||
ratio = total_shares_exp / total_shares_act
|
||||
|
||||
# Check if ratio is within tolerance (e.g. 0.8 to 1.2)
|
||||
is_magnitude_ok = (1 - magnitude_tolerance) <= ratio <= (1 + magnitude_tolerance)
|
||||
|
||||
match_status = "Unmatched"
|
||||
if best_score >= similarity_threshold and is_magnitude_ok:
|
||||
match_status = "High Confidence Match"
|
||||
elif past_id == curr_id:
|
||||
match_status = "Same ID (Retained)"
|
||||
|
||||
results.append({
|
||||
'Past_ID': past_id,
|
||||
'Predicted_Current_ID': curr_id,
|
||||
'Similarity_Score': round(best_score, 4),
|
||||
'Magnitude_Ratio': round(ratio, 4),
|
||||
'Match_Status': match_status,
|
||||
'Country': country
|
||||
})
|
||||
|
||||
return pd.DataFrame(results)
|
||||
|
||||
# ==========================================
|
||||
# MAIN EXECUTION
|
||||
# ==========================================
|
||||
if __name__ == "__main__":
|
||||
# 1. Load Data
|
||||
df_aum, df_flows = load()
|
||||
|
||||
# 2. Initialize Logic
|
||||
reconciler = AssetReconciler(df_aum, df_flows)
|
||||
|
||||
# 3. AUTO-DETECT DATES
|
||||
available_dates = sorted(reconciler.df_aum['Centralisation Date'].unique())
|
||||
|
||||
if len(available_dates) < 2:
|
||||
print("\nERROR: Not enough dates found in data to perform comparison.")
|
||||
print(f"Dates found: {[str(d) for d in available_dates]}")
|
||||
else:
|
||||
# Automatically pick the First and Last date found in the file
|
||||
date_past = available_dates[0] # First date
|
||||
date_current = available_dates[-1] # Last date
|
||||
|
||||
print(f"\nAuto-Detected Analysis Period:")
|
||||
print(f"Start (Past): {date_past}")
|
||||
print(f"End (Current): {date_current}")
|
||||
|
||||
# 4. Run Linkage
|
||||
results = reconciler.match_accounts(date_past, date_current)
|
||||
|
||||
# 5. Calculate KPI
|
||||
if not results.empty:
|
||||
total_past = len(results)
|
||||
matches = len(results[results['Match_Status'].isin(['High Confidence Match', 'Same ID (Retained)'])])
|
||||
kpi_percentage = (matches / total_past) * 100
|
||||
|
||||
print("\n" + "="*40)
|
||||
print("FINAL RESULTS")
|
||||
print("="*40)
|
||||
print(results[['Past_ID', 'Predicted_Current_ID', 'Similarity_Score', 'Magnitude_Ratio', 'Match_Status']])
|
||||
print("-" * 40)
|
||||
print(f"KPI: {kpi_percentage:.1f}% of Past Client Codes successfully linked to Current Codes.")
|
||||
print("="*40)
|
||||
else:
|
||||
print("No matches found.")
|
||||
Loading…
Reference in New Issue
Block a user