From aa954dbfa71b70f724ba9a71fc7b5e94fb72386c Mon Sep 17 00:00:00 2001 From: pgoze-ensae Date: Mon, 8 Dec 2025 12:01:52 +0000 Subject: [PATCH] Reconcialiation script added --- .../__pycache__/load.cpython-313.pyc | Bin 0 -> 1377 bytes reconciliation/load.py | 19 ++ reconciliation/reconcile.py | 210 ++++++++++++++++++ 3 files changed, 229 insertions(+) create mode 100644 reconciliation/__pycache__/load.cpython-313.pyc create mode 100644 reconciliation/load.py create mode 100644 reconciliation/reconcile.py diff --git a/reconciliation/__pycache__/load.cpython-313.pyc b/reconciliation/__pycache__/load.cpython-313.pyc new file mode 100644 index 0000000000000000000000000000000000000000..df894ea8db823ab61a3482bc485823c8ae244e38 GIT binary patch literal 1377 zcma)5%}*Og6rcU@#~M4xaS0KQ#D?^MY3=2sq$Tu2*m0yl>Leaqt8lQE-LcuS-d)YC zp>WDEoTwsI$+5>&>VMEv(;jkMN2yv3zO|R$sC+=msWZlgBoazzwC~M(@ArOh=FMz3 zk%$2{@;_gi${_#EV_=kdqi;FsRXFZKE5xB07L zMgB;9A6x|=g311tOxE$bsDg2zvRTT1CXIu-a6Y0cR(l*LxJ+X2`W;s{EvKRSn@Bat zwpzDrqUw&R8m_aAu-}h0Ro_)j-Ph|_Zz4kD$T2Dee}>l?WSc3U3GHjslzu8wJPsS0aLg+dX++Dh^3+U=Wkgu7s|P%f^%7CAvehP!>Y zR9n5fQY>X;nlNk&v7%c$I&KguZJ}KnCdk0Zry<|nLeAa-nPVIAu=@@2vm2(7<-67l z9XG9pqZ`^%+w@F@Oxn;XM zhyTnILD!PP5kh5)J*P*iOA%}{dNuwC)W{s84gs8}pB)!$7 zVY6Q2I#lO%Xq1_~72(fW<%8bPHr*!DTxa)NOV@T>yrtdahc^6L;qFe+tPi@ zbu^n@cFx=7Y>&9mxIx%cKm~X6*-oDkUxq8h9BJ@4U%`Bn=cg@RM$Gm)Z_G{ z^b`1l(N0c13?EA%wkjNjPm#;dZ@yIS2bHx>YUJ_aqs1q!r`Ou4+`}kGmN_zh@w-og zp>!apj}tukcLlyWW&~w{CB6}VMJ}_0{4#ziKO;RGzElvUXEU+<7m;7ZSRob3gm99D z@h}rK?6D`V6Z{_c@IL$qx=aF`W`agPZMtU5MxSGqnd}z>B*ny$BnZMGP!7S+AxL%Q L3&IsV&iwxYRvtW! 
literal 0
HcmV?d00001

diff --git a/reconciliation/load.py b/reconciliation/load.py
new file mode 100644
--- /dev/null
+++ b/reconciliation/load.py
@@ -0,0 +1,40 @@
+"""Load the AUM and Flows CSV extracts from the project's S3 bucket."""
+import os
+
+import pandas as pd
+import s3fs
+
+S3_ENDPOINT_URL = 'https://minio-simple.lab.groupe-genes.fr'
+AUM_PATH = 's3://projet-bdc-data/carmignac/AUM ENSAE V2 -20251105.csv'
+FLOWS_PATH = 's3://projet-bdc-data/carmignac/Flows ENSAE V2 -20251105.csv'
+
+
+def load():
+    """Read the AUM and Flows CSV files from S3 into dataframes.
+
+    Credentials come from the ``AWS_ACCESS_KEY_ID`` and
+    ``AWS_SECRET_ACCESS_KEY`` environment variables; a ``KeyError`` is raised
+    when either is unset. ``AWS_SESSION_TOKEN`` is optional.
+
+    Returns:
+        tuple: ``(df_aum, df_flows)``, both read with ``;`` as separator.
+    """
+    fs = s3fs.S3FileSystem(
+        client_kwargs={'endpoint_url': S3_ENDPOINT_URL},
+        key=os.environ["AWS_ACCESS_KEY_ID"],
+        secret=os.environ["AWS_SECRET_ACCESS_KEY"],
+        # Long-lived access keys have no session token, so a missing
+        # variable must not abort the load.
+        token=os.environ.get("AWS_SESSION_TOKEN"),
+    )
+
+    with fs.open(AUM_PATH, 'rb') as aum:
+        df_aum = pd.read_csv(aum, sep=";")
+
+    # NOTE(review): this path used to be 'projet-bdc-data//carmignac/...';
+    # the doubled slash looked accidental and was normalised to match the
+    # AUM path -- confirm the object key on the bucket.
+    with fs.open(FLOWS_PATH, 'rb') as flows:
+        df_flows = pd.read_csv(flows, sep=";")
+
+    return df_aum, df_flows
diff --git a/reconciliation/reconcile.py b/reconciliation/reconcile.py
new file mode 100644
--- /dev/null
+++ b/reconciliation/reconcile.py
@@ -0,0 +1,248 @@
+"""Link account IDs across two AUM snapshots using holdings and net flows."""
+import pandas as pd
+import numpy as np
+from sklearn.metrics.pairwise import cosine_similarity
+from datetime import datetime, timedelta
+from load import load
+
+DATE_COL = 'Centralisation Date'
+RESULT_COLUMNS = [
+    'Past_ID',
+    'Predicted_Current_ID',
+    'Similarity_Score',
+    'Magnitude_Ratio',
+    'Match_Status',
+    'Country',
+]
+LINKED_STATUSES = ['High Confidence Match', 'Same ID (Retained)']
+
+
+class AssetReconciler:
+    """Match accounts of a past AUM snapshot to accounts of a later one."""
+
+    def __init__(self, df_aum, df_flows):
+        """Copy the raw AUM and Flows dataframes and parse their dates.
+
+        Dates that cannot be parsed become ``NaT`` (``errors='coerce'``).
+        """
+        self.df_aum = df_aum.copy()
+        self.df_flows = df_flows.copy()
+
+        print("Parsing dates...")
+        self.df_aum[DATE_COL] = pd.to_datetime(self.df_aum[DATE_COL], errors='coerce')
+        self.df_flows[DATE_COL] = pd.to_datetime(self.df_flows[DATE_COL], errors='coerce')
+
+        # Column names used by the matching logic
+        self.col_id = 'Registrar Account - ID'
+        self.col_isin = 'Product - Isin'
+        self.col_qty_aum = 'Quantity - AUM'
+        self.col_qty_flow = 'Quantity - NetFlows'
+        self.col_block = 'Registrar Account - Country'
+
+    def _pivot_quantities(self, subset, value_col):
+        """Return an account x ISIN matrix of summed ``value_col`` (gaps -> 0)."""
+        return subset.pivot_table(
+            index=self.col_id,
+            columns=self.col_isin,
+            values=value_col,
+            aggfunc='sum',
+        ).fillna(0)
+
+    def _get_portfolio_matrix(self, df, date_filter):
+        """Return the holdings matrix of ``df`` for one centralisation date.
+
+        Rows are account IDs, columns are ISINs, values are summed AUM
+        quantities. An empty dataframe is returned (with a warning printed)
+        when ``df`` has no row for ``date_filter``.
+        """
+        subset = df[df[DATE_COL] == date_filter]
+        if subset.empty:
+            print(f"Warning: No data found for date {date_filter}")
+            return pd.DataFrame()
+        return self._pivot_quantities(subset, self.col_qty_aum)
+
+    def _get_aggregated_flows(self, start_date, end_date):
+        """Return net flows per account/ISIN summed over ``(start_date, end_date]``.
+
+        An empty dataframe is returned when no flow falls in that window.
+        """
+        dates = self.df_flows[DATE_COL]
+        subset = self.df_flows[(dates > start_date) & (dates <= end_date)]
+        if subset.empty:
+            return pd.DataFrame()
+        return self._pivot_quantities(subset, self.col_qty_flow)
+
+    def _ids_by_country(self, account_ids, snapshot_date):
+        """Group ``account_ids`` by their country in the AUM snapshot of ``snapshot_date``.
+
+        Accounts without a country are left out: they cannot be blocked.
+        """
+        snapshot = self.df_aum[self.df_aum[DATE_COL] == snapshot_date]
+        country_of = snapshot.set_index(self.col_id)[self.col_block].to_dict()
+        groups = {}
+        for account_id in account_ids:
+            country = country_of.get(account_id)
+            if pd.isna(country):
+                continue
+            groups.setdefault(country, []).append(account_id)
+        return groups
+
+    @staticmethod
+    def _expected_state(block_past, mat_flows):
+        """Return ``block_past`` plus the net flows of the same accounts."""
+        if mat_flows.empty:
+            return block_past
+        block_flows = mat_flows.reindex(index=block_past.index, fill_value=0)
+        # Flows into ISINs that were not held at the past date are kept on
+        # purpose: a position opened during the period is part of the
+        # expected state. Columns without any movement are dropped.
+        block_flows = block_flows.loc[:, (block_flows != 0).any(axis=0)]
+        return block_past.add(block_flows, fill_value=0)
+
+    def match_accounts(self, date_past, date_current, similarity_threshold=0.95, magnitude_tolerance=0.2):
+        """Link each account of ``date_past`` to its best candidate at ``date_current``.
+
+        Within each country, the expected holdings (past holdings + net
+        flows) of every past account are compared with the actual holdings
+        of every current account by cosine similarity. The best candidate is
+        a "High Confidence Match" when its score reaches
+        ``similarity_threshold`` and the ratio of total quantities lies
+        within ``1 +/- magnitude_tolerance``.
+
+        Args:
+            date_past: Date of the reference snapshot (anything
+                ``pd.Timestamp`` accepts).
+            date_current: Date of the snapshot to match against.
+            similarity_threshold: Minimum cosine similarity for a match.
+            magnitude_tolerance: Allowed relative gap between total expected
+                and total actual quantities.
+
+        Returns:
+            pd.DataFrame: One row per past account with the columns listed
+            in ``RESULT_COLUMNS``; empty when a snapshot has no data.
+        """
+        # numpy.datetime64 values (returned by ``Series.unique()`` on some
+        # pandas versions) have no ``.date()`` method, so normalise first.
+        date_past = pd.Timestamp(date_past)
+        date_current = pd.Timestamp(date_current)
+
+        print("--- Running Matching Algorithm ---")
+        print(f"Comparing State: {date_past.date()} -> {date_current.date()}")
+
+        # 1. AUM snapshots
+        mat_past = self._get_portfolio_matrix(self.df_aum, date_past)
+        mat_curr = self._get_portfolio_matrix(self.df_aum, date_current)
+        if mat_past.empty or mat_curr.empty:
+            return pd.DataFrame(columns=RESULT_COLUMNS)
+
+        # 2. Net flows between the two dates
+        mat_flows = self._get_aggregated_flows(date_past, date_current)
+
+        # 3. Blocking: only accounts of the same country are compared
+        past_by_country = self._ids_by_country(mat_past.index, date_past)
+        curr_by_country = self._ids_by_country(mat_curr.index, date_current)
+
+        results = []
+        for country, past_ids in past_by_country.items():
+            curr_ids = curr_by_country.get(country)
+            if not curr_ids:
+                continue
+
+            block_past = mat_past.loc[past_ids]
+            block_curr = mat_curr.loc[curr_ids]
+
+            # 4. Reconstruction: expected state = past + flows
+            expected_state = self._expected_state(block_past, mat_flows)
+
+            # 5. Same ISIN columns, in the same order, on both sides
+            all_isins = expected_state.columns.union(block_curr.columns)
+            vec_expected = expected_state.reindex(columns=all_isins, fill_value=0)
+            vec_actual = block_curr.reindex(columns=all_isins, fill_value=0)
+
+            # 6. Cosine similarity: rows = past IDs, columns = current IDs
+            sim_matrix = cosine_similarity(vec_expected, vec_actual)
+
+            # 7. Best candidate per past account
+            best_cols = sim_matrix.argmax(axis=1)
+            totals_exp = vec_expected.sum(axis=1).to_numpy()
+            totals_act = vec_actual.sum(axis=1).to_numpy()
+
+            for row, past_id in enumerate(vec_expected.index):
+                col = best_cols[row]
+                best_score = float(sim_matrix[row, col])
+                curr_id = vec_actual.index[col]
+
+                # 8. Magnitude check: cosine similarity ignores scale, so a
+                # small account could otherwise match a much larger one.
+                if totals_act[col] == 0:
+                    ratio = 0
+                else:
+                    ratio = float(totals_exp[row] / totals_act[col])
+                is_magnitude_ok = (1 - magnitude_tolerance) <= ratio <= (1 + magnitude_tolerance)
+
+                if best_score >= similarity_threshold and is_magnitude_ok:
+                    match_status = "High Confidence Match"
+                elif past_id == curr_id:
+                    match_status = "Same ID (Retained)"
+                else:
+                    match_status = "Unmatched"
+
+                results.append({
+                    'Past_ID': past_id,
+                    'Predicted_Current_ID': curr_id,
+                    'Similarity_Score': round(best_score, 4),
+                    'Magnitude_Ratio': round(ratio, 4),
+                    'Match_Status': match_status,
+                    'Country': country,
+                })
+
+        return pd.DataFrame(results, columns=RESULT_COLUMNS)
+
+
+def main():
+    """Load the data, reconcile the first and last dates, print the KPI."""
+    # 1. Load data
+    df_aum, df_flows = load()
+
+    # 2. Initialise logic
+    reconciler = AssetReconciler(df_aum, df_flows)
+
+    # 3. Auto-detect dates. NaT (unparsable dates) is dropped so that it can
+    # never be picked as a period boundary.
+    available_dates = sorted(reconciler.df_aum[DATE_COL].dropna().unique())
+
+    if len(available_dates) < 2:
+        print("\nERROR: Not enough dates found in data to perform comparison.")
+        print(f"Dates found: {[str(d) for d in available_dates]}")
+        return
+
+    # First and last date found in the file
+    date_past = available_dates[0]
+    date_current = available_dates[-1]
+
+    print("\nAuto-Detected Analysis Period:")
+    print(f"Start (Past): {date_past}")
+    print(f"End (Current): {date_current}")
+
+    # 4. Run linkage
+    results = reconciler.match_accounts(date_past, date_current)
+
+    # 5. KPI: share of past accounts linked to a current account
+    if results.empty:
+        print("No matches found.")
+        return
+
+    matches = results['Match_Status'].isin(LINKED_STATUSES).sum()
+    kpi_percentage = (matches / len(results)) * 100
+
+    print("\n" + "="*40)
+    print("FINAL RESULTS")
+    print("="*40)
+    print(results[['Past_ID', 'Predicted_Current_ID', 'Similarity_Score', 'Magnitude_Ratio', 'Match_Status']])
+    print("-" * 40)
+    print(f"KPI: {kpi_percentage:.1f}% of Past Client Codes successfully linked to Current Codes.")
+    print("="*40)
+
+
+if __name__ == "__main__":
+    main()