# Cleaning and merge functions

import pandas as pd

# S3 access: `fs`, `BUCKET` and `directory_path` must be defined at module
# level before the S3 loaders below are used (see the configuration sketch
# further down).


# Cleaning function
def cleaning_date(df, column_name):
    """
    Clean the given DataFrame column by converting its values to
    timezone-aware datetimes, parsed with the ISO 8601 format.

    Parameters:
    - df: DataFrame
        The DataFrame containing the column to clean.
    - column_name: str
        The name of the column to clean.

    Returns:
    - DataFrame
        The modified DataFrame with the cleaned column.
    """
    # format='ISO8601' requires pandas >= 2.0
    df[column_name] = pd.to_datetime(df[column_name], utc=True, format='ISO8601')
    return df


def preprocessing_customerplus(customerplus=None):
    customerplus_copy = customerplus.copy()

    # Convert date columns to datetime
    cleaning_date(customerplus_copy, 'first_buying_date')
    cleaning_date(customerplus_copy, 'last_visiting_date')

    # Variable selection: drop personal information and unused metadata
    customerplus_copy.drop(['lastname', 'firstname', 'email', 'civility', 'note', 'created_at',
                            'updated_at', 'deleted_at', 'extra', 'reference', 'extra_field',
                            'identifier', 'need_reload', 'preferred_category', 'preferred_supplier',
                            'preferred_formula', 'zipcode', 'last_visiting_date'],
                           axis=1, inplace=True)
    customerplus_copy.rename(columns={'id': 'customer_id'}, inplace=True)

    return customerplus_copy


def preprocessing_tickets_area(tickets=None, purchases=None, suppliers=None, type_ofs=None):
    # Tickets table (explicit copies avoid SettingWithCopyWarning on the slices)
    tickets = tickets[['id', 'purchase_id', 'product_id', 'is_from_subscription',
                       'type_of', 'supplier_id']].copy()
    tickets.rename(columns={'id': 'ticket_id'}, inplace=True)

    # Suppliers table
    suppliers = suppliers[['id', 'name']].copy()
    suppliers.rename(columns={'name': 'supplier_name'}, inplace=True)
    suppliers['supplier_name'] = suppliers['supplier_name'].fillna('')

    # Ticket types table
    type_ofs = type_ofs[['id', 'name', 'children']].copy()
    type_ofs.rename(columns={'name': 'type_of_ticket_name'}, inplace=True)

    # Purchases table
    # Clean the purchase date
    cleaning_date(purchases, 'purchase_date')
    # Variable selection
    purchases = purchases[['id', 'purchase_date', 'customer_id']]

    # Merges
    # Merge with suppliers
    ticket_information = pd.merge(tickets, suppliers, left_on='supplier_id',
                                  right_on='id', how='inner')
    ticket_information.drop(['supplier_id', 'id'], axis=1, inplace=True)

    # Merge with ticket types
    ticket_information = pd.merge(ticket_information, type_ofs, left_on='type_of',
                                  right_on='id', how='inner')
    ticket_information.drop(['type_of', 'id'], axis=1, inplace=True)

    # Merge with purchases
    ticket_information = pd.merge(ticket_information, purchases, left_on='purchase_id',
                                  right_on='id', how='inner')
    ticket_information.drop(['id'], axis=1, inplace=True)

    return ticket_information


def preprocessing_target_area(targets=None, target_types=None, customer_target_mappings=None):
    # targets cleaning
    targets = targets[['id', 'target_type_id', 'name']].copy()
    targets.rename(columns={'id': 'target_id', 'name': 'target_name'}, inplace=True)

    # target_types cleaning: the prefix turns 'id' into 'target_type_id'
    target_types = target_types[['id', 'is_import', 'name']].add_prefix('target_type_')

    # customer_target_mappings cleaning
    customer_target_mappings = customer_target_mappings[['id', 'customer_id', 'target_id']]

    # Merge targets and target_types
    targets_full = pd.merge(targets, target_types, on='target_type_id', how='inner')
    targets_full.drop(['target_type_id'], axis=1, inplace=True)

    # Merge with customer_target_mappings
    targets_full = pd.merge(customer_target_mappings, targets_full, on='target_id', how='inner')
    targets_full.drop(['target_id'], axis=1, inplace=True)

    return targets_full
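
# --- Usage sketch (illustrative) ---
# A minimal example of how the preprocessing helpers above chain together,
# assuming the raw extracts have already been loaded as DataFrames; the
# *_raw variable names below are hypothetical.
#
#   customerplus_clean = preprocessing_customerplus(customerplus=customerplus_raw)
#   ticket_info = preprocessing_tickets_area(tickets=tickets_raw,
#                                            purchases=purchases_raw,
#                                            suppliers=suppliers_raw,
#                                            type_ofs=type_ofs_raw)
#   targets_full = preprocessing_target_area(targets=targets_raw,
#                                            target_types=target_types_raw,
#                                            customer_target_mappings=customer_target_mappings_raw)
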
def preprocessing_campaigns_area(campaign_stats=None, campaigns=None):
    # campaign_stats cleaning
    campaign_stats = campaign_stats[['id', 'campaign_id', 'customer_id', 'opened_at',
                                     'sent_at', 'delivered_at']].copy()
    cleaning_date(campaign_stats, 'opened_at')
    cleaning_date(campaign_stats, 'sent_at')
    cleaning_date(campaign_stats, 'delivered_at')

    # campaigns cleaning: the prefix turns 'id' into 'campaign_id'
    campaigns = campaigns[['id', 'name', 'service_id', 'sent_at']].add_prefix('campaign_')
    cleaning_date(campaigns, 'campaign_sent_at')

    # Merge
    campaigns_full = pd.merge(campaign_stats, campaigns, on='campaign_id', how='left')
    campaigns_full.drop(['campaign_id'], axis=1, inplace=True)

    return campaigns_full


def display_databases(file_name):
    """
    Load a CSV file from S3 storage and return it as a DataFrame.
    """
    file_path = BUCKET + "/" + directory_path + "/" + file_name
    print("File path : ", file_path)
    with fs.open(file_path, mode="rb") as file_in:
        df = pd.read_csv(file_in, sep=",")
    print("Shape : ", df.shape)
    return df


def remove_horodates(df):
    """
    Remove timestamp columns such as created_at and updated_at.
    """
    df = df.drop(columns=["created_at", "updated_at"])
    return df


def order_columns_id(df):
    """
    Put all id columns at the beginning so the dataset is easier to read.
    """
    substring = 'id'
    id_columns = [col for col in df.columns if substring in col]
    remaining_col = [col for col in df.columns if substring not in col]
    new_order = id_columns + remaining_col
    return df[new_order]


def process_df_2(df):
    """
    Organize the DataFrame: drop timestamp columns and put id columns first.
    """
    df = remove_horodates(df)
    print("Number of columns : ", len(df.columns))
    df = order_columns_id(df)
    print("Columns : ", df.columns)
    return df


def load_dataset(name):
    """
    Load a CSV file and organize its columns.
    """
    df = display_databases(name)
    df = process_df_2(df)
    # drop na :
    # df = df.dropna(axis=1, thresh=len(df))
    # if the table has an identifier column, delete it
    if 'identifier' in df.columns:
        df = df.drop(columns='identifier')
    return df


def create_products_table():
    # First merge: products and categories
    print("first merge products and categories")
    products = load_dataset("1products.csv")
    categories = load_dataset("1categories.csv")

    # Drop useless columns
    products = products.drop(columns=['apply_price', 'extra_field', 'amount_consumption'])
    categories = categories.drop(columns=['extra_field', 'quota'])

    # Merge
    products_theme = products.merge(categories, how='left', left_on='category_id',
                                    right_on='id', suffixes=('_products', '_categories'))
    products_theme = products_theme.rename(columns={"name": "name_categories"})

    # Second merge: products_theme and type_of_categories
    print("second merge products_theme and type of categories")
    type_of_categories = load_dataset("1type_of_categories.csv")
    type_of_categories = type_of_categories.drop(columns='id')
    products_theme = products_theme.merge(type_of_categories, how='left', on='category_id')

    # Index cleaning
    products_theme = products_theme.drop(columns=['id_categories'])
    products_theme = order_columns_id(products_theme)
    return products_theme


def create_events_table():
    # First merge: events and seasons
    print("first merge events and seasons")
    events = load_dataset("1events.csv")
    seasons = load_dataset("1seasons.csv")

    # Drop useless columns
    events = events.drop(columns=['manual_added', 'is_display'])
    seasons = seasons.drop(columns=['start_date_time'])

    events_theme = events.merge(seasons, how='left', left_on='season_id', right_on='id',
                                suffixes=('_events', '_seasons'))

    # Second merge: events_theme and event_types
    print("second merge events_theme and event_types")
    event_types = load_dataset("1event_types.csv")
    event_types = event_types.drop(columns=['fidelity_delay'])
    events_theme = events_theme.merge(event_types, how='left', left_on='event_type_id',
                                      right_on='id', suffixes=('_events', '_event_type'))
    events_theme = events_theme.rename(columns={"name": "name_event_types"})
    events_theme = events_theme.drop(columns='id')

    # Third merge: events_theme and facilities
    print("third merge events_theme and facilities")
    facilities = load_dataset("1facilities.csv")
    facilities = facilities.drop(columns=['fixed_capacity'])
    events_theme = events_theme.merge(facilities, how='left', left_on='facility_id',
                                      right_on='id', suffixes=('_events', '_facility'))
    events_theme = events_theme.rename(columns={"name": "name_facilities",
                                                "id_events": "event_id"})
    events_theme = events_theme.drop(columns='id')

    # Index cleaning
    events_theme = events_theme.drop(columns=['id_seasons'])
    events_theme = order_columns_id(events_theme)
    return events_theme
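
# --- Configuration sketch (illustrative, names are placeholders) ---
# One way to provide the S3 handles that display_databases and load_dataset
# rely on; the bucket and directory below are assumptions, not the project's
# real paths.
#
#   import s3fs
#   fs = s3fs.S3FileSystem(anon=False)
#   BUCKET = "my-project-bucket"
#   directory_path = "raw_data"
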
def create_representations_table():
    representations = load_dataset("1representations.csv")
    representations = representations.drop(columns=['serial', 'open', 'satisfaction', 'is_display',
                                                    'expected_filling', 'max_filling', 'extra_field',
                                                    'start_date_time', 'end_date_time', 'name',
                                                    'representation_type_id'])

    representations_capacity = load_dataset("1representation_category_capacities.csv")
    representations_capacity = representations_capacity.drop(columns=['expected_filling',
                                                                      'max_filling'])

    representations_theme = representations.merge(representations_capacity, how='left',
                                                  left_on='id', right_on='representation_id',
                                                  suffixes=('_representation', '_representation_cap'))

    # Index cleaning
    representations_theme = representations_theme.drop(columns=["id_representation"])
    representations_theme = order_columns_id(representations_theme)
    return representations_theme


def uniform_product_df(products_theme=None, representation_theme=None, events_theme=None):
    """
    Return the uniform product dataset built from the three theme tables.
    """
    print("Products theme columns : ", products_theme.columns)
    print("\n Representation theme columns : ", representation_theme.columns)
    print("\n Events theme columns : ", events_theme.columns)

    products_global = products_theme.merge(representation_theme, how='left',
                                           on=["representation_id", "category_id"])
    products_global = products_global.merge(events_theme, how='left', on='event_id',
                                            suffixes=("_representation", "_event"))
    products_global = order_columns_id(products_global)

    # Remove useless columns
    products_global = products_global.drop(columns=['type_of_id'])
    # 'name_events', 'name_seasons', 'name_categories'
    return products_global
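
# --- End-to-end sketch (illustrative) ---
# A minimal example of how the theme tables combine into the uniform product
# dataset; it assumes the S3 configuration above is in place so load_dataset
# can read the CSV extracts.
if __name__ == "__main__":
    products_theme = create_products_table()
    events_theme = create_events_table()
    representation_theme = create_representations_table()
    products_global = uniform_product_df(products_theme=products_theme,
                                         representation_theme=representation_theme,
                                         events_theme=events_theme)
    print("Uniform product dataset shape : ", products_global.shape)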