branching
This commit is contained in:
parent
4d1e08274e
commit
aac1fe43ea
388
repair_challenge/carmignac_branch.py
Normal file
388
repair_challenge/carmignac_branch.py
Normal file
|
|
@ -0,0 +1,388 @@
|
|||
"""
|
||||
Carmignac Data Challenge — AUM Branching / Repair
|
||||
==================================================
|
||||
Takes as input:
|
||||
- The original AUM file (pre-repair)
|
||||
- The mapping CSV produced by carmignac_repair.py
|
||||
- (Optionally) the surgery log, for audit annotation
|
||||
|
||||
Produces:
|
||||
- A repaired AUM file where every Registrar Account ID is replaced
|
||||
by its canonical identity (reg_orig) as determined by the pipeline.
|
||||
|
||||
Core logic
|
||||
----------
|
||||
The mapping table encodes, for every (date, reg_orig) pair, which
|
||||
physical code (reg_used) was actually present in the data at that date.
|
||||
|
||||
reg_orig = the stable canonical identity (output label)
|
||||
reg_used = the code that appeared in the raw data at that date
|
||||
|
||||
For rows where reg_used != reg_orig (changed=True), the raw code is a
|
||||
historical alias that the surgery pass identified as belonging to
|
||||
reg_orig. The repair simply relabels those rows.
|
||||
|
||||
For accounts not in the repair universe (below the AUM threshold, or
|
||||
excluded categories), rows are passed through unchanged.
|
||||
|
||||
Self-mapped surgeries (reg_from == reg_to in the surgery log) do not
|
||||
require any relabelling — they signal a data quality issue on that
|
||||
month, not a code change.
|
||||
|
||||
Usage
|
||||
-----
|
||||
python carmignac_branch.py # default paths
|
||||
python carmignac_branch.py \\
|
||||
--aum raw_AUM.csv \\
|
||||
--mapping carmignac_mapping.csv \\
|
||||
--surgery carmignac_surgery_log.csv \\
|
||||
--out AUM_repaired.csv
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import os
|
||||
import sys
|
||||
import s3fs
|
||||
import pandas as pd
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────
|
||||
# 1. LOAD
|
||||
# ─────────────────────────────────────────────────────────────
|
||||
|
||||
def load_inputs(mapping_path, surgery_path):
|
||||
fs = s3fs.S3FileSystem(
|
||||
client_kwargs={'endpoint_url': 'https://'+'minio-simple.lab.groupe-genes.fr'},
|
||||
key = os.environ["AWS_ACCESS_KEY_ID"],
|
||||
secret = os.environ["AWS_SECRET_ACCESS_KEY"],
|
||||
token = os.environ["AWS_SESSION_TOKEN"])
|
||||
|
||||
with fs.open('s3://projet-bdc-data/carmignac/AUM ENSAE V2 -20251105.csv', 'rb') as f:
|
||||
aum = pd.read_csv(f, sep=";")
|
||||
|
||||
mapping = pd.read_csv(mapping_path, parse_dates=["date"])
|
||||
surgery = pd.read_csv(surgery_path, parse_dates=["date"]) if surgery_path else pd.DataFrame()
|
||||
|
||||
# Normalise ID columns to string
|
||||
aum["Registrar Account - ID"] = aum["Registrar Account - ID"].astype(str)
|
||||
mapping["reg_orig"] = mapping["reg_orig"].astype(str)
|
||||
mapping["reg_used"] = mapping["reg_used"].astype(str)
|
||||
if not surgery.empty:
|
||||
surgery["reg_orig"] = surgery["reg_orig"].astype(str)
|
||||
surgery["reg_from"] = surgery["reg_from"].astype(str)
|
||||
surgery["reg_to"] = surgery["reg_to"].astype(str)
|
||||
|
||||
return aum, mapping, surgery
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────
|
||||
# 2. BUILD RENAME LOOKUP
|
||||
# ─────────────────────────────────────────────────────────────
|
||||
|
||||
def build_rename_lookup(mapping):
|
||||
"""
|
||||
Returns a dict {(date, reg_used) -> reg_orig}
|
||||
restricted to rows where reg_used != reg_orig (actual changes).
|
||||
|
||||
For self-mapped surgeries or stable accounts, no entry is needed.
|
||||
"""
|
||||
changed = mapping[mapping["changed"] & (mapping["reg_orig"] != mapping["reg_used"])]
|
||||
|
||||
lookup = {}
|
||||
for _, row in changed.iterrows():
|
||||
key = (row["date"], row["reg_used"])
|
||||
if key in lookup and lookup[key] != row["reg_orig"]:
|
||||
# Conflict: two different reg_origs claim the same (date, reg_used)
|
||||
# Should never happen with a well-formed mapping, but warn if it does
|
||||
print(f" [WARN] Conflicting mapping at {row['date'].date()} "
|
||||
f"reg_used={row['reg_used']}: "
|
||||
f"{lookup[key]} vs {row['reg_orig']} — keeping first")
|
||||
else:
|
||||
lookup[key] = row["reg_orig"]
|
||||
|
||||
return lookup
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────
|
||||
# 3. APPLY BRANCHING
|
||||
# ─────────────────────────────────────────────────────────────
|
||||
|
||||
def apply_branching(aum, lookup):
|
||||
"""
|
||||
Renames Registrar Account - ID in the AUM dataframe according to
|
||||
the lookup {(date, reg_used) -> reg_orig}.
|
||||
|
||||
Rows not in the lookup are left untouched.
|
||||
|
||||
Returns:
|
||||
- repaired : the full AUM dataframe with corrected IDs
|
||||
- audit : a subset showing only the renamed rows, with both
|
||||
the original and canonical IDs for verification
|
||||
"""
|
||||
aum = aum.copy()
|
||||
aum["Centralisation Date"] = pd.to_datetime(aum["Centralisation Date"])
|
||||
aum["_date_key"] = aum["Centralisation Date"]
|
||||
aum["_reg_key"] = aum["Registrar Account - ID"].astype(str)
|
||||
|
||||
# Vectorised lookup via merge
|
||||
lookup_df = pd.DataFrame(
|
||||
[(d, reg_used, reg_orig) for (d, reg_used), reg_orig in lookup.items()],
|
||||
columns=["_date_key", "_reg_key", "_canonical_id"]
|
||||
)
|
||||
|
||||
merged = aum.merge(lookup_df, on=["_date_key", "_reg_key"], how="left")
|
||||
|
||||
# Audit: rows that were actually renamed
|
||||
renamed_mask = merged["_canonical_id"].notna()
|
||||
audit = merged[renamed_mask].copy()
|
||||
audit["original_reg_id"] = audit["_reg_key"]
|
||||
audit["canonical_reg_id"] = audit["_canonical_id"]
|
||||
audit = audit[["Centralisation Date", "original_reg_id", "canonical_reg_id",
|
||||
"Product - Isin", "Quantity - AUM", "Value - AUM €"]]
|
||||
|
||||
# Apply rename
|
||||
merged.loc[renamed_mask, "Registrar Account - ID"] = merged.loc[renamed_mask, "_canonical_id"]
|
||||
|
||||
# Drop helper columns
|
||||
repaired = merged.drop(columns=["_date_key", "_reg_key", "_canonical_id"])
|
||||
|
||||
return repaired, audit
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────
|
||||
# 4. CONSISTENCY CHECK
|
||||
# ─────────────────────────────────────────────────────────────
|
||||
|
||||
def consistency_check(original, repaired, mapping, surgery):
|
||||
"""
|
||||
Sanity checks after branching:
|
||||
|
||||
1. Row count preserved
|
||||
2. No reg_used alias remains in the repaired file (for changed entries)
|
||||
3. For each (reg_orig, isin, date) there is at most one row
|
||||
(branching should not create duplicates)
|
||||
4. Summary of surgery operations applied
|
||||
"""
|
||||
print("\n[Consistency checks]")
|
||||
|
||||
# 1. Row count
|
||||
if len(original) == len(repaired):
|
||||
print(f" ✓ Row count preserved : {len(repaired)}")
|
||||
else:
|
||||
print(f" ✗ Row count changed : {len(original)} → {len(repaired)}")
|
||||
|
||||
# 2. Aliases eliminated
|
||||
changed = mapping[mapping["changed"] & (mapping["reg_orig"] != mapping["reg_used"])]
|
||||
aliases = set(changed["reg_used"].unique())
|
||||
still_present = set(repaired["Registrar Account - ID"].astype(str)) & aliases
|
||||
if not still_present:
|
||||
print(f" ✓ All {len(aliases)} aliased code(s) successfully relabelled")
|
||||
else:
|
||||
print(f" ✗ {len(still_present)} aliased code(s) still present: {still_present}")
|
||||
|
||||
# 3. Duplicates
|
||||
key_cols = ["Registrar Account - ID", "Product - Isin", "Centralisation Date"]
|
||||
dup_count = repaired.duplicated(subset=key_cols).sum()
|
||||
if dup_count == 0:
|
||||
print(f" ✓ No duplicate (reg_id, isin, date) keys")
|
||||
else:
|
||||
print(f" ✗ {dup_count} duplicate (reg_id, isin, date) rows found — inspect manually")
|
||||
print(repaired[repaired.duplicated(subset=key_cols, keep=False)]
|
||||
[key_cols + ["Quantity - AUM"]].head(10).to_string(index=False))
|
||||
|
||||
# 4. Surgery summary
|
||||
if not surgery.empty:
|
||||
print(f"\n[Surgery operations applied]")
|
||||
for _, op in surgery.sort_values("date").iterrows():
|
||||
self_map = " [self-map — data quality flag, no rename]" \
|
||||
if op["reg_from"] == op["reg_to"] else ""
|
||||
print(f" {op['date'].date()} | {op['reg_orig']} : "
|
||||
f"{op['reg_from']} → {op['reg_to']}"
|
||||
f" (Jaccard={op['jaccard_composite']:.4f}, "
|
||||
f"gain={op['gain_vs_no_surgery']:.6f}){self_map}")
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────
|
||||
# 5. EXPORT PATHS (branched accounts only)
|
||||
# ─────────────────────────────────────────────────────────────
|
||||
|
||||
def export_paths(aum, mapping, surgery, repaired):
|
||||
"""
|
||||
Builds a condensed AUM file for ALL accounts in the repair universe
|
||||
(i.e. every reg_orig present in the mapping).
|
||||
|
||||
- Stable accounts (no surgery): single leg where reg_used == reg_orig
|
||||
throughout, pulled directly from the repaired AUM.
|
||||
- Branched accounts (at least one genuine surgery): multiple legs,
|
||||
reg_used shows which physical code was active at each date.
|
||||
|
||||
The output makes every account's full path explicit:
|
||||
|
||||
reg_orig | reg_used | date | isin | qty_aum | ...
|
||||
─────────┼───────────────┼────────────┼──────┼─────────┼───
|
||||
REG_001 | REG_001 | 2020-01-31 | ... | ... | <- stable
|
||||
REG_002 | REG_002_OLD | 2020-01-31 | ... | ... | <- leg 1
|
||||
REG_002 | REG_002 | 2022-07-31 | ... | ... | <- leg 2
|
||||
|
||||
Self-mapped surgeries (reg_from == reg_to) are noted in the summary
|
||||
but do not add extra legs — the account kept its code.
|
||||
|
||||
Returns the paths DataFrame (never None if mapping is non-empty).
|
||||
"""
|
||||
# All canonical accounts in the universe
|
||||
all_accounts = sorted(mapping["reg_orig"].astype(str).unique())
|
||||
|
||||
# Branched accounts (genuine code changes only)
|
||||
branched_accounts = set()
|
||||
if not surgery.empty:
|
||||
genuine = surgery[surgery["reg_from"] != surgery["reg_to"]]
|
||||
branched_accounts = set(genuine["reg_orig"].astype(str).unique())
|
||||
|
||||
print(f"\n[Paths] {len(all_accounts)} account(s) in universe, "
|
||||
f"{len(branched_accounts)} branched: "
|
||||
f"{sorted(branched_accounts) or 'none'}")
|
||||
|
||||
# Build (date, reg_orig) → reg_used lookup from mapping
|
||||
map_df = mapping[["date", "reg_orig", "reg_used"]].copy()
|
||||
map_df["date"] = pd.to_datetime(map_df["date"])
|
||||
map_df["reg_orig"] = map_df["reg_orig"].astype(str)
|
||||
map_df["reg_used"] = map_df["reg_used"].astype(str)
|
||||
map_df = map_df.rename(columns={"date": "_date_key", "reg_orig": "_reg_key"})
|
||||
|
||||
# Pull all universe rows from the repaired AUM
|
||||
aum_universe = repaired[
|
||||
repaired["Registrar Account - ID"].astype(str).isin(all_accounts)
|
||||
].copy()
|
||||
aum_universe["Centralisation Date"] = pd.to_datetime(aum_universe["Centralisation Date"])
|
||||
aum_universe["_date_key"] = aum_universe["Centralisation Date"]
|
||||
aum_universe["_reg_key"] = aum_universe["Registrar Account - ID"].astype(str)
|
||||
|
||||
# Join reg_used from mapping
|
||||
paths = aum_universe.merge(
|
||||
map_df[["_date_key", "_reg_key", "reg_used"]],
|
||||
on=["_date_key", "_reg_key"],
|
||||
how="left",
|
||||
).drop(columns=["_date_key", "_reg_key"])
|
||||
|
||||
# For stable accounts, mapping may not cover every AUM date (e.g. sparse
|
||||
# months) — fall back to reg_orig (= Registrar Account - ID) for those.
|
||||
paths["reg_used"] = paths["reg_used"].fillna(
|
||||
paths["Registrar Account - ID"].astype(str)
|
||||
)
|
||||
|
||||
# Rename canonical column
|
||||
paths = paths.rename(columns={"Registrar Account - ID": "reg_orig"})
|
||||
|
||||
# Column order
|
||||
other_cols = [c for c in paths.columns if c not in ("reg_orig", "reg_used")]
|
||||
paths = paths[["reg_orig", "reg_used"] + other_cols]
|
||||
paths = paths.sort_values(["reg_orig", "Centralisation Date", "Product - Isin"])
|
||||
paths = paths.reset_index(drop=True)
|
||||
|
||||
# Summary
|
||||
for acc in all_accounts:
|
||||
sub = paths[paths["reg_orig"] == acc]
|
||||
legs = list(sub["reg_used"].unique())
|
||||
tag = " [branched]" if acc in branched_accounts else " [stable]"
|
||||
print(f" {acc}: {len(sub)} rows, legs = {legs}{tag}")
|
||||
|
||||
return paths
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────
|
||||
# 6. MAIN
|
||||
# ─────────────────────────────────────────────────────────────
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Apply Carmignac repair mapping to the raw AUM file"
|
||||
)
|
||||
parser.add_argument("--mapping", default="repair_results/carmignac_mapping.csv",
|
||||
help="Path to mapping CSV from carmignac_repair.py")
|
||||
parser.add_argument("--surgery", default="repair_results/carmignac_surgery_log.csv",
|
||||
help="Path to surgery log CSV (optional, for audit)")
|
||||
parser.add_argument("--out", default="AUM_repaired.csv",
|
||||
help="Output path for repaired AUM CSV")
|
||||
parser.add_argument("--audit", default="AUM_repair_audit.csv",
|
||||
help="Output path for audit CSV (renamed rows only)")
|
||||
parser.add_argument("--paths", default="AUM_paths.csv",
|
||||
help="Output path for condensed paths CSV (branched accounts only)")
|
||||
args = parser.parse_args()
|
||||
|
||||
def resolve(p, required=True):
|
||||
if os.path.exists(p):
|
||||
return p
|
||||
alt = os.path.join(os.path.dirname(os.path.abspath(__file__)), p)
|
||||
if os.path.exists(alt):
|
||||
return alt
|
||||
if required:
|
||||
sys.exit(f"[ERROR] File not found: {p}")
|
||||
return None
|
||||
|
||||
mapping_path = resolve(args.mapping)
|
||||
surgery_path = resolve(args.surgery, required=False)
|
||||
|
||||
print("=" * 60)
|
||||
print("CARMIGNAC — AUM Branching / Repair")
|
||||
print("=" * 60)
|
||||
print(f" Mapping : {mapping_path}")
|
||||
print(f" Surgery : {surgery_path or '(not provided)'}")
|
||||
|
||||
# Load
|
||||
aum, mapping, surgery = load_inputs(mapping_path, surgery_path)
|
||||
print(f"\n Raw AUM rows : {len(aum)}")
|
||||
print(f" Mapping rows : {len(mapping)}")
|
||||
print(f" Mapping changed rows : {mapping['changed'].sum()}")
|
||||
print(f" Surgery operations : {len(surgery)}")
|
||||
|
||||
# Build lookup
|
||||
lookup = build_rename_lookup(mapping)
|
||||
print(f"\n Rename operations : {len(lookup)}")
|
||||
if lookup:
|
||||
sample = list(lookup.items())[:3]
|
||||
for (d, used), orig in sample:
|
||||
print(f" ({d.date()}, {used}) → {orig}")
|
||||
if len(lookup) > 3:
|
||||
print(f" ... and {len(lookup) - 3} more")
|
||||
|
||||
# Apply
|
||||
repaired, audit = apply_branching(aum, lookup)
|
||||
print(f"\n Rows renamed : {len(audit)}")
|
||||
|
||||
# Checks
|
||||
consistency_check(aum, repaired, mapping, surgery)
|
||||
|
||||
# Save
|
||||
out_dir = os.path.dirname(os.path.abspath(args.out))
|
||||
os.makedirs(out_dir, exist_ok=True)
|
||||
|
||||
repaired.to_csv(args.out, index=False)
|
||||
print(f"\n ✓ Repaired AUM → {args.out}")
|
||||
|
||||
if len(audit) > 0:
|
||||
audit.to_csv(args.audit, index=False)
|
||||
print(f" ✓ Audit log → {args.audit}")
|
||||
else:
|
||||
print(f"(No rows renamed — audit log not written)")
|
||||
|
||||
# Paths: condensed AUM for branched accounts
|
||||
df_paths = export_paths(aum, mapping, surgery, repaired)
|
||||
if df_paths is not None:
|
||||
df_paths.to_csv(args.paths, index=False)
|
||||
print(f" ✓ Paths file → {args.paths}")
|
||||
|
||||
# Print renamed reg_ids summary
|
||||
if len(audit) > 0:
|
||||
print("\n[Renamed identifiers]")
|
||||
summary = (audit.groupby(["original_reg_id", "canonical_reg_id"])
|
||||
.size()
|
||||
.reset_index(name="n_rows"))
|
||||
for _, row in summary.iterrows():
|
||||
print(f" {row['original_reg_id']:20s} → {row['canonical_reg_id']:20s} "
|
||||
f"({row['n_rows']} rows)")
|
||||
|
||||
print("\nDone.")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
|
@ -16,7 +16,7 @@ import s3fs
|
|||
# PARAMÈTRES
|
||||
# ─────────────────────────────────────────────
|
||||
ALPHA = 0.03 # tolérance réconciliation : 5% du stock à t
|
||||
MIN_AUM_EUR = 0 # seuil filtrage étape 1 — 0 pour les heads de test, 5e6 en prod
|
||||
MIN_AUM_EUR = 5e6 # seuil filtrage étape 1 — 0 pour les heads de test, 5e6 en prod
|
||||
MIN_JACCARD = 0.3 # seuil minimal similarité portefeuille pour chirurgie
|
||||
SCORE_DROP_THRESHOLD = 0.1 # si score chute de >10% → candidat chirurgie
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user