diff --git a/repair_challenge/carmignac_branch.py b/repair_challenge/carmignac_branch.py new file mode 100644 index 0000000..64221d6 --- /dev/null +++ b/repair_challenge/carmignac_branch.py @@ -0,0 +1,388 @@ +""" +Carmignac Data Challenge — AUM Branching / Repair +================================================== +Takes as input: + - The original AUM file (pre-repair) + - The mapping CSV produced by carmignac_repair.py + - (Optionally) the surgery log, for audit annotation + +Produces: + - A repaired AUM file where every Registrar Account ID is replaced + by its canonical identity (reg_orig) as determined by the pipeline. + +Core logic +---------- +The mapping table encodes, for every (date, reg_orig) pair, which +physical code (reg_used) was actually present in the data at that date. + + reg_orig = the stable canonical identity (output label) + reg_used = the code that appeared in the raw data at that date + +For rows where reg_used != reg_orig (changed=True), the raw code is a +historical alias that the surgery pass identified as belonging to +reg_orig. The repair simply relabels those rows. + +For accounts not in the repair universe (below the AUM threshold, or +excluded categories), rows are passed through unchanged. + +Self-mapped surgeries (reg_from == reg_to in the surgery log) do not +require any relabelling — they signal a data quality issue on that +month, not a code change. + +Usage +----- + python carmignac_branch.py # default paths + python carmignac_branch.py \\ + --aum raw_AUM.csv \\ + --mapping carmignac_mapping.csv \\ + --surgery carmignac_surgery_log.csv \\ + --out AUM_repaired.csv +""" + +import argparse +import os +import sys +import s3fs +import pandas as pd + + +# ───────────────────────────────────────────────────────────── +# 1. LOAD +# ───────────────────────────────────────────────────────────── + +def load_inputs(mapping_path, surgery_path): + fs = s3fs.S3FileSystem( + client_kwargs={'endpoint_url': 'https://'+'minio-simple.lab.groupe-genes.fr'}, + key = os.environ["AWS_ACCESS_KEY_ID"], + secret = os.environ["AWS_SECRET_ACCESS_KEY"], + token = os.environ["AWS_SESSION_TOKEN"]) + + with fs.open('s3://projet-bdc-data/carmignac/AUM ENSAE V2 -20251105.csv', 'rb') as f: + aum = pd.read_csv(f, sep=";") + + mapping = pd.read_csv(mapping_path, parse_dates=["date"]) + surgery = pd.read_csv(surgery_path, parse_dates=["date"]) if surgery_path else pd.DataFrame() + + # Normalise ID columns to string + aum["Registrar Account - ID"] = aum["Registrar Account - ID"].astype(str) + mapping["reg_orig"] = mapping["reg_orig"].astype(str) + mapping["reg_used"] = mapping["reg_used"].astype(str) + if not surgery.empty: + surgery["reg_orig"] = surgery["reg_orig"].astype(str) + surgery["reg_from"] = surgery["reg_from"].astype(str) + surgery["reg_to"] = surgery["reg_to"].astype(str) + + return aum, mapping, surgery + + +# ───────────────────────────────────────────────────────────── +# 2. BUILD RENAME LOOKUP +# ───────────────────────────────────────────────────────────── + +def build_rename_lookup(mapping): + """ + Returns a dict {(date, reg_used) -> reg_orig} + restricted to rows where reg_used != reg_orig (actual changes). + + For self-mapped surgeries or stable accounts, no entry is needed. + """ + changed = mapping[mapping["changed"] & (mapping["reg_orig"] != mapping["reg_used"])] + + lookup = {} + for _, row in changed.iterrows(): + key = (row["date"], row["reg_used"]) + if key in lookup and lookup[key] != row["reg_orig"]: + # Conflict: two different reg_origs claim the same (date, reg_used) + # Should never happen with a well-formed mapping, but warn if it does + print(f" [WARN] Conflicting mapping at {row['date'].date()} " + f"reg_used={row['reg_used']}: " + f"{lookup[key]} vs {row['reg_orig']} — keeping first") + else: + lookup[key] = row["reg_orig"] + + return lookup + + +# ───────────────────────────────────────────────────────────── +# 3. APPLY BRANCHING +# ───────────────────────────────────────────────────────────── + +def apply_branching(aum, lookup): + """ + Renames Registrar Account - ID in the AUM dataframe according to + the lookup {(date, reg_used) -> reg_orig}. + + Rows not in the lookup are left untouched. + + Returns: + - repaired : the full AUM dataframe with corrected IDs + - audit : a subset showing only the renamed rows, with both + the original and canonical IDs for verification + """ + aum = aum.copy() + aum["Centralisation Date"] = pd.to_datetime(aum["Centralisation Date"]) + aum["_date_key"] = aum["Centralisation Date"] + aum["_reg_key"] = aum["Registrar Account - ID"].astype(str) + + # Vectorised lookup via merge + lookup_df = pd.DataFrame( + [(d, reg_used, reg_orig) for (d, reg_used), reg_orig in lookup.items()], + columns=["_date_key", "_reg_key", "_canonical_id"] + ) + + merged = aum.merge(lookup_df, on=["_date_key", "_reg_key"], how="left") + + # Audit: rows that were actually renamed + renamed_mask = merged["_canonical_id"].notna() + audit = merged[renamed_mask].copy() + audit["original_reg_id"] = audit["_reg_key"] + audit["canonical_reg_id"] = audit["_canonical_id"] + audit = audit[["Centralisation Date", "original_reg_id", "canonical_reg_id", + "Product - Isin", "Quantity - AUM", "Value - AUM €"]] + + # Apply rename + merged.loc[renamed_mask, "Registrar Account - ID"] = merged.loc[renamed_mask, "_canonical_id"] + + # Drop helper columns + repaired = merged.drop(columns=["_date_key", "_reg_key", "_canonical_id"]) + + return repaired, audit + + +# ───────────────────────────────────────────────────────────── +# 4. CONSISTENCY CHECK +# ───────────────────────────────────────────────────────────── + +def consistency_check(original, repaired, mapping, surgery): + """ + Sanity checks after branching: + + 1. Row count preserved + 2. No reg_used alias remains in the repaired file (for changed entries) + 3. For each (reg_orig, isin, date) there is at most one row + (branching should not create duplicates) + 4. Summary of surgery operations applied + """ + print("\n[Consistency checks]") + + # 1. Row count + if len(original) == len(repaired): + print(f" ✓ Row count preserved : {len(repaired)}") + else: + print(f" ✗ Row count changed : {len(original)} → {len(repaired)}") + + # 2. Aliases eliminated + changed = mapping[mapping["changed"] & (mapping["reg_orig"] != mapping["reg_used"])] + aliases = set(changed["reg_used"].unique()) + still_present = set(repaired["Registrar Account - ID"].astype(str)) & aliases + if not still_present: + print(f" ✓ All {len(aliases)} aliased code(s) successfully relabelled") + else: + print(f" ✗ {len(still_present)} aliased code(s) still present: {still_present}") + + # 3. Duplicates + key_cols = ["Registrar Account - ID", "Product - Isin", "Centralisation Date"] + dup_count = repaired.duplicated(subset=key_cols).sum() + if dup_count == 0: + print(f" ✓ No duplicate (reg_id, isin, date) keys") + else: + print(f" ✗ {dup_count} duplicate (reg_id, isin, date) rows found — inspect manually") + print(repaired[repaired.duplicated(subset=key_cols, keep=False)] + [key_cols + ["Quantity - AUM"]].head(10).to_string(index=False)) + + # 4. Surgery summary + if not surgery.empty: + print(f"\n[Surgery operations applied]") + for _, op in surgery.sort_values("date").iterrows(): + self_map = " [self-map — data quality flag, no rename]" \ + if op["reg_from"] == op["reg_to"] else "" + print(f" {op['date'].date()} | {op['reg_orig']} : " + f"{op['reg_from']} → {op['reg_to']}" + f" (Jaccard={op['jaccard_composite']:.4f}, " + f"gain={op['gain_vs_no_surgery']:.6f}){self_map}") + + +# ───────────────────────────────────────────────────────────── +# 5. EXPORT PATHS (branched accounts only) +# ───────────────────────────────────────────────────────────── + +def export_paths(aum, mapping, surgery, repaired): + """ + Builds a condensed AUM file for ALL accounts in the repair universe + (i.e. every reg_orig present in the mapping). + + - Stable accounts (no surgery): single leg where reg_used == reg_orig + throughout, pulled directly from the repaired AUM. + - Branched accounts (at least one genuine surgery): multiple legs, + reg_used shows which physical code was active at each date. + + The output makes every account's full path explicit: + + reg_orig | reg_used | date | isin | qty_aum | ... + ─────────┼───────────────┼────────────┼──────┼─────────┼─── + REG_001 | REG_001 | 2020-01-31 | ... | ... | <- stable + REG_002 | REG_002_OLD | 2020-01-31 | ... | ... | <- leg 1 + REG_002 | REG_002 | 2022-07-31 | ... | ... | <- leg 2 + + Self-mapped surgeries (reg_from == reg_to) are noted in the summary + but do not add extra legs — the account kept its code. + + Returns the paths DataFrame (never None if mapping is non-empty). + """ + # All canonical accounts in the universe + all_accounts = sorted(mapping["reg_orig"].astype(str).unique()) + + # Branched accounts (genuine code changes only) + branched_accounts = set() + if not surgery.empty: + genuine = surgery[surgery["reg_from"] != surgery["reg_to"]] + branched_accounts = set(genuine["reg_orig"].astype(str).unique()) + + print(f"\n[Paths] {len(all_accounts)} account(s) in universe, " + f"{len(branched_accounts)} branched: " + f"{sorted(branched_accounts) or 'none'}") + + # Build (date, reg_orig) → reg_used lookup from mapping + map_df = mapping[["date", "reg_orig", "reg_used"]].copy() + map_df["date"] = pd.to_datetime(map_df["date"]) + map_df["reg_orig"] = map_df["reg_orig"].astype(str) + map_df["reg_used"] = map_df["reg_used"].astype(str) + map_df = map_df.rename(columns={"date": "_date_key", "reg_orig": "_reg_key"}) + + # Pull all universe rows from the repaired AUM + aum_universe = repaired[ + repaired["Registrar Account - ID"].astype(str).isin(all_accounts) + ].copy() + aum_universe["Centralisation Date"] = pd.to_datetime(aum_universe["Centralisation Date"]) + aum_universe["_date_key"] = aum_universe["Centralisation Date"] + aum_universe["_reg_key"] = aum_universe["Registrar Account - ID"].astype(str) + + # Join reg_used from mapping + paths = aum_universe.merge( + map_df[["_date_key", "_reg_key", "reg_used"]], + on=["_date_key", "_reg_key"], + how="left", + ).drop(columns=["_date_key", "_reg_key"]) + + # For stable accounts, mapping may not cover every AUM date (e.g. sparse + # months) — fall back to reg_orig (= Registrar Account - ID) for those. + paths["reg_used"] = paths["reg_used"].fillna( + paths["Registrar Account - ID"].astype(str) + ) + + # Rename canonical column + paths = paths.rename(columns={"Registrar Account - ID": "reg_orig"}) + + # Column order + other_cols = [c for c in paths.columns if c not in ("reg_orig", "reg_used")] + paths = paths[["reg_orig", "reg_used"] + other_cols] + paths = paths.sort_values(["reg_orig", "Centralisation Date", "Product - Isin"]) + paths = paths.reset_index(drop=True) + + # Summary + for acc in all_accounts: + sub = paths[paths["reg_orig"] == acc] + legs = list(sub["reg_used"].unique()) + tag = " [branched]" if acc in branched_accounts else " [stable]" + print(f" {acc}: {len(sub)} rows, legs = {legs}{tag}") + + return paths + + +# ───────────────────────────────────────────────────────────── +# 6. MAIN +# ───────────────────────────────────────────────────────────── + +def main(): + parser = argparse.ArgumentParser( + description="Apply Carmignac repair mapping to the raw AUM file" + ) + parser.add_argument("--mapping", default="repair_results/carmignac_mapping.csv", + help="Path to mapping CSV from carmignac_repair.py") + parser.add_argument("--surgery", default="repair_results/carmignac_surgery_log.csv", + help="Path to surgery log CSV (optional, for audit)") + parser.add_argument("--out", default="AUM_repaired.csv", + help="Output path for repaired AUM CSV") + parser.add_argument("--audit", default="AUM_repair_audit.csv", + help="Output path for audit CSV (renamed rows only)") + parser.add_argument("--paths", default="AUM_paths.csv", + help="Output path for condensed paths CSV (branched accounts only)") + args = parser.parse_args() + + def resolve(p, required=True): + if os.path.exists(p): + return p + alt = os.path.join(os.path.dirname(os.path.abspath(__file__)), p) + if os.path.exists(alt): + return alt + if required: + sys.exit(f"[ERROR] File not found: {p}") + return None + + mapping_path = resolve(args.mapping) + surgery_path = resolve(args.surgery, required=False) + + print("=" * 60) + print("CARMIGNAC — AUM Branching / Repair") + print("=" * 60) + print(f" Mapping : {mapping_path}") + print(f" Surgery : {surgery_path or '(not provided)'}") + + # Load + aum, mapping, surgery = load_inputs(mapping_path, surgery_path) + print(f"\n Raw AUM rows : {len(aum)}") + print(f" Mapping rows : {len(mapping)}") + print(f" Mapping changed rows : {mapping['changed'].sum()}") + print(f" Surgery operations : {len(surgery)}") + + # Build lookup + lookup = build_rename_lookup(mapping) + print(f"\n Rename operations : {len(lookup)}") + if lookup: + sample = list(lookup.items())[:3] + for (d, used), orig in sample: + print(f" ({d.date()}, {used}) → {orig}") + if len(lookup) > 3: + print(f" ... and {len(lookup) - 3} more") + + # Apply + repaired, audit = apply_branching(aum, lookup) + print(f"\n Rows renamed : {len(audit)}") + + # Checks + consistency_check(aum, repaired, mapping, surgery) + + # Save + out_dir = os.path.dirname(os.path.abspath(args.out)) + os.makedirs(out_dir, exist_ok=True) + + repaired.to_csv(args.out, index=False) + print(f"\n ✓ Repaired AUM → {args.out}") + + if len(audit) > 0: + audit.to_csv(args.audit, index=False) + print(f" ✓ Audit log → {args.audit}") + else: + print(f"(No rows renamed — audit log not written)") + + # Paths: condensed AUM for branched accounts + df_paths = export_paths(aum, mapping, surgery, repaired) + if df_paths is not None: + df_paths.to_csv(args.paths, index=False) + print(f" ✓ Paths file → {args.paths}") + + # Print renamed reg_ids summary + if len(audit) > 0: + print("\n[Renamed identifiers]") + summary = (audit.groupby(["original_reg_id", "canonical_reg_id"]) + .size() + .reset_index(name="n_rows")) + for _, row in summary.iterrows(): + print(f" {row['original_reg_id']:20s} → {row['canonical_reg_id']:20s} " + f"({row['n_rows']} rows)") + + print("\nDone.") + + +if __name__ == "__main__": + main() diff --git a/repair_challenge/carmignac_repair.py b/repair_challenge/carmignac_repair.py index 60df85d..8dbed26 100644 --- a/repair_challenge/carmignac_repair.py +++ b/repair_challenge/carmignac_repair.py @@ -16,9 +16,9 @@ import s3fs # PARAMÈTRES # ───────────────────────────────────────────── ALPHA = 0.03 # tolérance réconciliation : 5% du stock à t -MIN_AUM_EUR = 0 # seuil filtrage étape 1 — 0 pour les heads de test, 5e6 en prod +MIN_AUM_EUR = 5e6 # seuil filtrage étape 1 — 0 pour les heads de test, 5e6 en prod MIN_JACCARD = 0.3 # seuil minimal similarité portefeuille pour chirurgie -SCORE_DROP_THRESHOLD = 0.1 # si score chute de >10% → candidat chirurgie +SCORE_DROP_THRESHOLD = 0.1 # si score chute de >10% → candidat chirurgie EXCLUDE_REGISTRAR = ["Off Distribution", "Private Clients"]