Project_Carmignac/repair_challenge/carmignac_branch.py
2026-03-20 11:27:37 +00:00

389 lines
16 KiB
Python

"""
Carmignac Data Challenge — AUM Branching / Repair
==================================================
Takes as input:
- The original AUM file (pre-repair)
- The mapping CSV produced by carmignac_repair.py
- (Optionally) the surgery log, for audit annotation
Produces:
- A repaired AUM file where every Registrar Account ID is replaced
by its canonical identity (reg_orig) as determined by the pipeline.
Core logic
----------
The mapping table encodes, for every (date, reg_orig) pair, which
physical code (reg_used) was actually present in the data at that date.
reg_orig = the stable canonical identity (output label)
reg_used = the code that appeared in the raw data at that date
For rows where reg_used != reg_orig (changed=True), the raw code is a
historical alias that the surgery pass identified as belonging to
reg_orig. The repair simply relabels those rows.
For accounts not in the repair universe (below the AUM threshold, or
excluded categories), rows are passed through unchanged.
Self-mapped surgeries (reg_from == reg_to in the surgery log) do not
require any relabelling — they signal a data quality issue on that
month, not a code change.
Usage
-----
python carmignac_branch.py # default paths
python carmignac_branch.py \\
--aum raw_AUM.csv \\
--mapping carmignac_mapping.csv \\
--surgery carmignac_surgery_log.csv \\
--out AUM_repaired.csv
"""
import argparse
import os
import sys
import s3fs
import pandas as pd
# ─────────────────────────────────────────────────────────────
# 1. LOAD
# ─────────────────────────────────────────────────────────────
def load_inputs(mapping_path, surgery_path):
fs = s3fs.S3FileSystem(
client_kwargs={'endpoint_url': 'https://'+'minio-simple.lab.groupe-genes.fr'},
key = os.environ["AWS_ACCESS_KEY_ID"],
secret = os.environ["AWS_SECRET_ACCESS_KEY"],
token = os.environ["AWS_SESSION_TOKEN"])
with fs.open('s3://projet-bdc-data/carmignac/AUM ENSAE V2 -20251105.csv', 'rb') as f:
aum = pd.read_csv(f, sep=";")
mapping = pd.read_csv(mapping_path, parse_dates=["date"])
surgery = pd.read_csv(surgery_path, parse_dates=["date"]) if surgery_path else pd.DataFrame()
# Normalise ID columns to string
aum["Registrar Account - ID"] = aum["Registrar Account - ID"].astype(str)
mapping["reg_orig"] = mapping["reg_orig"].astype(str)
mapping["reg_used"] = mapping["reg_used"].astype(str)
if not surgery.empty:
surgery["reg_orig"] = surgery["reg_orig"].astype(str)
surgery["reg_from"] = surgery["reg_from"].astype(str)
surgery["reg_to"] = surgery["reg_to"].astype(str)
return aum, mapping, surgery
# ─────────────────────────────────────────────────────────────
# 2. BUILD RENAME LOOKUP
# ─────────────────────────────────────────────────────────────
def build_rename_lookup(mapping):
"""
Returns a dict {(date, reg_used) -> reg_orig}
restricted to rows where reg_used != reg_orig (actual changes).
For self-mapped surgeries or stable accounts, no entry is needed.
"""
changed = mapping[mapping["changed"] & (mapping["reg_orig"] != mapping["reg_used"])]
lookup = {}
for _, row in changed.iterrows():
key = (row["date"], row["reg_used"])
if key in lookup and lookup[key] != row["reg_orig"]:
# Conflict: two different reg_origs claim the same (date, reg_used)
# Should never happen with a well-formed mapping, but warn if it does
print(f" [WARN] Conflicting mapping at {row['date'].date()} "
f"reg_used={row['reg_used']}: "
f"{lookup[key]} vs {row['reg_orig']} — keeping first")
else:
lookup[key] = row["reg_orig"]
return lookup
# ─────────────────────────────────────────────────────────────
# 3. APPLY BRANCHING
# ─────────────────────────────────────────────────────────────
def apply_branching(aum, lookup):
"""
Renames Registrar Account - ID in the AUM dataframe according to
the lookup {(date, reg_used) -> reg_orig}.
Rows not in the lookup are left untouched.
Returns:
- repaired : the full AUM dataframe with corrected IDs
- audit : a subset showing only the renamed rows, with both
the original and canonical IDs for verification
"""
aum = aum.copy()
aum["Centralisation Date"] = pd.to_datetime(aum["Centralisation Date"])
aum["_date_key"] = aum["Centralisation Date"]
aum["_reg_key"] = aum["Registrar Account - ID"].astype(str)
# Vectorised lookup via merge
lookup_df = pd.DataFrame(
[(d, reg_used, reg_orig) for (d, reg_used), reg_orig in lookup.items()],
columns=["_date_key", "_reg_key", "_canonical_id"]
)
merged = aum.merge(lookup_df, on=["_date_key", "_reg_key"], how="left")
# Audit: rows that were actually renamed
renamed_mask = merged["_canonical_id"].notna()
audit = merged[renamed_mask].copy()
audit["original_reg_id"] = audit["_reg_key"]
audit["canonical_reg_id"] = audit["_canonical_id"]
audit = audit[["Centralisation Date", "original_reg_id", "canonical_reg_id",
"Product - Isin", "Quantity - AUM", "Value - AUM €"]]
# Apply rename
merged.loc[renamed_mask, "Registrar Account - ID"] = merged.loc[renamed_mask, "_canonical_id"]
# Drop helper columns
repaired = merged.drop(columns=["_date_key", "_reg_key", "_canonical_id"])
return repaired, audit
# ─────────────────────────────────────────────────────────────
# 4. CONSISTENCY CHECK
# ─────────────────────────────────────────────────────────────
def consistency_check(original, repaired, mapping, surgery):
"""
Sanity checks after branching:
1. Row count preserved
2. No reg_used alias remains in the repaired file (for changed entries)
3. For each (reg_orig, isin, date) there is at most one row
(branching should not create duplicates)
4. Summary of surgery operations applied
"""
print("\n[Consistency checks]")
# 1. Row count
if len(original) == len(repaired):
print(f" ✓ Row count preserved : {len(repaired)}")
else:
print(f" ✗ Row count changed : {len(original)}{len(repaired)}")
# 2. Aliases eliminated
changed = mapping[mapping["changed"] & (mapping["reg_orig"] != mapping["reg_used"])]
aliases = set(changed["reg_used"].unique())
still_present = set(repaired["Registrar Account - ID"].astype(str)) & aliases
if not still_present:
print(f" ✓ All {len(aliases)} aliased code(s) successfully relabelled")
else:
print(f"{len(still_present)} aliased code(s) still present: {still_present}")
# 3. Duplicates
key_cols = ["Registrar Account - ID", "Product - Isin", "Centralisation Date"]
dup_count = repaired.duplicated(subset=key_cols).sum()
if dup_count == 0:
print(f" ✓ No duplicate (reg_id, isin, date) keys")
else:
print(f"{dup_count} duplicate (reg_id, isin, date) rows found — inspect manually")
print(repaired[repaired.duplicated(subset=key_cols, keep=False)]
[key_cols + ["Quantity - AUM"]].head(10).to_string(index=False))
# 4. Surgery summary
if not surgery.empty:
print(f"\n[Surgery operations applied]")
for _, op in surgery.sort_values("date").iterrows():
self_map = " [self-map — data quality flag, no rename]" \
if op["reg_from"] == op["reg_to"] else ""
print(f" {op['date'].date()} | {op['reg_orig']} : "
f"{op['reg_from']}{op['reg_to']}"
f" (Jaccard={op['jaccard_composite']:.4f}, "
f"gain={op['gain_vs_no_surgery']:.6f}){self_map}")
# ─────────────────────────────────────────────────────────────
# 5. EXPORT PATHS (branched accounts only)
# ─────────────────────────────────────────────────────────────
def export_paths(aum, mapping, surgery, repaired):
"""
Builds a condensed AUM file for ALL accounts in the repair universe
(i.e. every reg_orig present in the mapping).
- Stable accounts (no surgery): single leg where reg_used == reg_orig
throughout, pulled directly from the repaired AUM.
- Branched accounts (at least one genuine surgery): multiple legs,
reg_used shows which physical code was active at each date.
The output makes every account's full path explicit:
reg_orig | reg_used | date | isin | qty_aum | ...
─────────┼───────────────┼────────────┼──────┼─────────┼───
REG_001 | REG_001 | 2020-01-31 | ... | ... | <- stable
REG_002 | REG_002_OLD | 2020-01-31 | ... | ... | <- leg 1
REG_002 | REG_002 | 2022-07-31 | ... | ... | <- leg 2
Self-mapped surgeries (reg_from == reg_to) are noted in the summary
but do not add extra legs — the account kept its code.
Returns the paths DataFrame (never None if mapping is non-empty).
"""
# All canonical accounts in the universe
all_accounts = sorted(mapping["reg_orig"].astype(str).unique())
# Branched accounts (genuine code changes only)
branched_accounts = set()
if not surgery.empty:
genuine = surgery[surgery["reg_from"] != surgery["reg_to"]]
branched_accounts = set(genuine["reg_orig"].astype(str).unique())
print(f"\n[Paths] {len(all_accounts)} account(s) in universe, "
f"{len(branched_accounts)} branched: "
f"{sorted(branched_accounts) or 'none'}")
# Build (date, reg_orig) → reg_used lookup from mapping
map_df = mapping[["date", "reg_orig", "reg_used"]].copy()
map_df["date"] = pd.to_datetime(map_df["date"])
map_df["reg_orig"] = map_df["reg_orig"].astype(str)
map_df["reg_used"] = map_df["reg_used"].astype(str)
map_df = map_df.rename(columns={"date": "_date_key", "reg_orig": "_reg_key"})
# Pull all universe rows from the repaired AUM
aum_universe = repaired[
repaired["Registrar Account - ID"].astype(str).isin(all_accounts)
].copy()
aum_universe["Centralisation Date"] = pd.to_datetime(aum_universe["Centralisation Date"])
aum_universe["_date_key"] = aum_universe["Centralisation Date"]
aum_universe["_reg_key"] = aum_universe["Registrar Account - ID"].astype(str)
# Join reg_used from mapping
paths = aum_universe.merge(
map_df[["_date_key", "_reg_key", "reg_used"]],
on=["_date_key", "_reg_key"],
how="left",
).drop(columns=["_date_key", "_reg_key"])
# For stable accounts, mapping may not cover every AUM date (e.g. sparse
# months) — fall back to reg_orig (= Registrar Account - ID) for those.
paths["reg_used"] = paths["reg_used"].fillna(
paths["Registrar Account - ID"].astype(str)
)
# Rename canonical column
paths = paths.rename(columns={"Registrar Account - ID": "reg_orig"})
# Column order
other_cols = [c for c in paths.columns if c not in ("reg_orig", "reg_used")]
paths = paths[["reg_orig", "reg_used"] + other_cols]
paths = paths.sort_values(["reg_orig", "Centralisation Date", "Product - Isin"])
paths = paths.reset_index(drop=True)
# Summary
for acc in all_accounts:
sub = paths[paths["reg_orig"] == acc]
legs = list(sub["reg_used"].unique())
tag = " [branched]" if acc in branched_accounts else " [stable]"
print(f" {acc}: {len(sub)} rows, legs = {legs}{tag}")
return paths
# ─────────────────────────────────────────────────────────────
# 6. MAIN
# ─────────────────────────────────────────────────────────────
def main():
parser = argparse.ArgumentParser(
description="Apply Carmignac repair mapping to the raw AUM file"
)
parser.add_argument("--mapping", default="repair_results/carmignac_mapping.csv",
help="Path to mapping CSV from carmignac_repair.py")
parser.add_argument("--surgery", default="repair_results/carmignac_surgery_log.csv",
help="Path to surgery log CSV (optional, for audit)")
parser.add_argument("--out", default="AUM_repaired.csv",
help="Output path for repaired AUM CSV")
parser.add_argument("--audit", default="AUM_repair_audit.csv",
help="Output path for audit CSV (renamed rows only)")
parser.add_argument("--paths", default="AUM_paths.csv",
help="Output path for condensed paths CSV (branched accounts only)")
args = parser.parse_args()
def resolve(p, required=True):
if os.path.exists(p):
return p
alt = os.path.join(os.path.dirname(os.path.abspath(__file__)), p)
if os.path.exists(alt):
return alt
if required:
sys.exit(f"[ERROR] File not found: {p}")
return None
mapping_path = resolve(args.mapping)
surgery_path = resolve(args.surgery, required=False)
print("=" * 60)
print("CARMIGNAC — AUM Branching / Repair")
print("=" * 60)
print(f" Mapping : {mapping_path}")
print(f" Surgery : {surgery_path or '(not provided)'}")
# Load
aum, mapping, surgery = load_inputs(mapping_path, surgery_path)
print(f"\n Raw AUM rows : {len(aum)}")
print(f" Mapping rows : {len(mapping)}")
print(f" Mapping changed rows : {mapping['changed'].sum()}")
print(f" Surgery operations : {len(surgery)}")
# Build lookup
lookup = build_rename_lookup(mapping)
print(f"\n Rename operations : {len(lookup)}")
if lookup:
sample = list(lookup.items())[:3]
for (d, used), orig in sample:
print(f" ({d.date()}, {used}) → {orig}")
if len(lookup) > 3:
print(f" ... and {len(lookup) - 3} more")
# Apply
repaired, audit = apply_branching(aum, lookup)
print(f"\n Rows renamed : {len(audit)}")
# Checks
consistency_check(aum, repaired, mapping, surgery)
# Save
out_dir = os.path.dirname(os.path.abspath(args.out))
os.makedirs(out_dir, exist_ok=True)
repaired.to_csv(args.out, index=False)
print(f"\n ✓ Repaired AUM → {args.out}")
if len(audit) > 0:
audit.to_csv(args.audit, index=False)
print(f" ✓ Audit log → {args.audit}")
else:
print(f"(No rows renamed — audit log not written)")
# Paths: condensed AUM for branched accounts
df_paths = export_paths(aum, mapping, surgery, repaired)
if df_paths is not None:
df_paths.to_csv(args.paths, index=False)
print(f" ✓ Paths file → {args.paths}")
# Print renamed reg_ids summary
if len(audit) > 0:
print("\n[Renamed identifiers]")
summary = (audit.groupby(["original_reg_id", "canonical_reg_id"])
.size()
.reset_index(name="n_rows"))
for _, row in summary.iterrows():
print(f" {row['original_reg_id']:20s}{row['canonical_reg_id']:20s} "
f"({row['n_rows']} rows)")
print("\nDone.")
if __name__ == "__main__":
main()