#### Cleaning and merge functions ####

import pandas as pd
import s3fs

# S3 filesystem handle (assumed: credentials and endpoint are configured via the environment)
fs = s3fs.S3FileSystem()

BUCKET = "bdc2324-data"

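# Expected S3 layout, inferred from display_databases below (illustrative, not authoritative):
#   s3://bdc2324-data/<directory_path>/<directory_path><file_name>.csv
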
# 1. Basic cleaning functions

def cleaning_date(df, column_name):
    """
    Parse a datetime column as ISO 8601 with UTC timezone.
    """
    df[column_name] = pd.to_datetime(df[column_name], utc=True, format='ISO8601')
    return df

def display_databases(directory_path, file_name):
    """
    Load a CSV file from S3 storage and return it as a DataFrame.
    """
    file_path = BUCKET + "/" + directory_path + "/" + directory_path + file_name + ".csv"
    print("File path : ", file_path)
    with fs.open(file_path, mode="rb") as file_in:
        df = pd.read_csv(file_in, sep=",")

    print("Shape : ", df.shape)
    return df

def remove_horodates(df):
    """
    Remove timestamp (horodate) columns such as created_at and updated_at.
    """
    df = df.drop(columns=["created_at", "updated_at"])
    return df

def order_columns_id(df):
    """
    Move all id columns to the beginning so the dataset is easier to read.
    """
    substring = 'id'
    id_columns = [col for col in df.columns if substring in col]
    remaining_col = [col for col in df.columns if substring not in col]
    new_order = id_columns + remaining_col
    return df[new_order]

def process_df_2(df):
    """
    Organize a DataFrame: drop timestamp columns and put id columns first.
    """
    df = remove_horodates(df)
    print("Number of columns : ", len(df.columns))
    df = order_columns_id(df)
    print("Columns : ", df.columns)
    return df

def load_dataset(directory_path, name):
    """
    Load a CSV file from S3 and apply the basic cleaning steps.
    """
    df = display_databases(directory_path, file_name=name)
    df = process_df_2(df)

    # drop na :
    # df = df.dropna(axis=1, thresh=len(df))

    # if an identifier column is present, delete it
    if 'identifier' in df.columns:
        df = df.drop(columns='identifier')
    return df

# 2. Creation of cleaned and merged datasets

def preprocessing_customerplus(directory_path):

    customerplus_copy = load_dataset(directory_path, name="customersplus")

    # Convert to datetime format
    cleaning_date(customerplus_copy, 'first_buying_date')
    cleaning_date(customerplus_copy, 'last_visiting_date')

    # Variable selection
    customerplus_copy.drop(['lastname', 'firstname', 'birthdate', 'language', 'email', 'civility', 'note', 'extra', 'reference', 'extra_field', 'need_reload'], axis=1, inplace=True)  # 'preferred_category', 'preferred_supplier', 'preferred_formula', 'mcp_contact_id', 'last_visiting_date', 'deleted_at', 'last_buying_date', 'max_price', 'ticket_sum', 'average_price', 'average_purchase_delay', 'average_price_basket', 'average_ticket_basket', 'total_price', 'purchase_count', 'first_buying_date', 'fidelity'
    customerplus_copy.rename(columns={'id': 'customer_id'}, inplace=True)

    return customerplus_copy

def preprocessing_target_area(directory_path):

    # Datasets loading
    targets = load_dataset(directory_path, name="targets")
    target_types = load_dataset(directory_path, name="target_types")
    customer_target_mappings = load_dataset(directory_path, name="customer_target_mappings")

    # targets cleaning
    targets = targets[["id", "target_type_id", "name"]]
    targets.rename(columns={'id': 'target_id', 'name': 'target_name'}, inplace=True)

    # target_types cleaning
    target_types = target_types[["id", "is_import", "name"]].add_prefix("target_type_")

    # customer_target_mappings cleaning
    customer_target_mappings = customer_target_mappings[["id", "customer_id", "target_id"]]

    # Merge targets and target_types
    targets_full = pd.merge(targets, target_types, left_on='target_type_id', right_on='target_type_id', how='inner')
    targets_full.drop(['target_type_id'], axis=1, inplace=True)

    # Merge with customer_target_mappings
    targets_full = pd.merge(customer_target_mappings, targets_full, left_on='target_id', right_on='target_id', how='inner')
    targets_full.drop(['target_id'], axis=1, inplace=True)

    return targets_full

def preprocessing_campaigns_area(directory_path):

    # Datasets loading
    campaign_stats = load_dataset(directory_path, name="campaign_stats")
    campaigns = load_dataset(directory_path, name="campaigns")

    # campaign_stats cleaning
    campaign_stats = campaign_stats[["id", "campaign_id", "customer_id", "opened_at", "sent_at", "delivered_at"]]
    # cleaning_date(campaign_stats, 'opened_at')
    # cleaning_date(campaign_stats, 'sent_at')
    # cleaning_date(campaign_stats, 'delivered_at')

    # campaigns cleaning
    campaigns = campaigns[["id", "name", "service_id", "sent_at"]].add_prefix("campaign_")
    # cleaning_date(campaigns, 'campaign_sent_at')

    # Merge
    campaigns_full = pd.merge(campaign_stats, campaigns, on="campaign_id", how="left")
    campaigns_full.drop(['campaign_id'], axis=1, inplace=True)

    return campaigns_full

def preprocessing_tickets_area(directory_path):

    # Datasets loading
    tickets = load_dataset(directory_path, name="tickets")

    # Supplementary tickets dataset for tenant 101
    if directory_path == '101':
        tickets_1 = load_dataset(directory_path, name="tickets_1")

    purchases = load_dataset(directory_path, name="purchases")
    suppliers = load_dataset(directory_path, name="suppliers")
    # type_ofs = load_dataset(directory_path, name = "type_ofs")

    # Tickets table
    tickets = tickets[['id', 'purchase_id', 'product_id', 'is_from_subscription', 'type_of', 'supplier_id']]
    tickets.rename(columns={'id': 'ticket_id'}, inplace=True)

    if directory_path == '101':
        tickets_1 = tickets_1[['id', 'purchase_id', 'product_id', 'is_from_subscription', 'type_of', 'supplier_id']]
        tickets_1.rename(columns={'id': 'ticket_id'}, inplace=True)

    # Suppliers table
    suppliers = suppliers[['id', 'name']]
    suppliers.rename(columns={'name': 'supplier_name'}, inplace=True)
    suppliers['supplier_name'] = suppliers['supplier_name'].fillna('')

    # Ticket types table
    # type_ofs = type_ofs[['id', 'name', 'children']]
    # type_ofs.rename(columns = {'name' : 'type_of_ticket_name'}, inplace = True)

    # Purchases table
    # Purchase date cleaning
    # cleaning_date(purchases, 'purchase_date')

    # Variable selection
    purchases = purchases[['id', 'purchase_date', 'customer_id']]

    # Merges
    # Merge with suppliers
    ticket_information = pd.merge(tickets, suppliers, left_on='supplier_id', right_on='id', how='inner')
    ticket_information.drop(['supplier_id', 'id'], axis=1, inplace=True)

    # Merge with ticket types
    # ticket_information = pd.merge(ticket_information, type_ofs, left_on = 'type_of', right_on = 'id', how = 'inner')
    # ticket_information.drop(['type_of', 'id'], axis = 1, inplace=True)

    # Merge with purchases
    ticket_information = pd.merge(ticket_information, purchases, left_on='purchase_id', right_on='id', how='inner')
    ticket_information.drop(['id'], axis=1, inplace=True)

    if directory_path == '101':
        # Merge with suppliers
        ticket_information_1 = pd.merge(tickets_1, suppliers, left_on='supplier_id', right_on='id', how='inner')
        ticket_information_1.drop(['supplier_id', 'id'], axis=1, inplace=True)

        # Merge with purchases
        ticket_information_1 = pd.merge(ticket_information_1, purchases, left_on='purchase_id', right_on='id', how='inner')
        ticket_information_1.drop(['id'], axis=1, inplace=True)

        return ticket_information, ticket_information_1
    else:
        return ticket_information

def create_products_table(directory_path):

    # First merge: products and categories
    print("first merge products and categories")
    products = load_dataset(directory_path, name="products")
    categories = load_dataset(directory_path, name="categories")

    # Drop useless columns
    products = products.drop(columns=['apply_price', 'extra_field', 'amount_consumption'])
    categories = categories.drop(columns=['extra_field', 'quota'])

    # Merge
    products_theme = products.merge(categories, how='left', left_on='category_id', right_on='id', suffixes=('_products', '_categories'))
    products_theme = products_theme.rename(columns={"name": "name_categories"})

    # Second merge: products_theme and type of categories
    # print("Second merge products_theme and type of categories")
    # type_of_categories = load_dataset(directory_path, name = "type_of_categories")
    # type_of_categories = type_of_categories.drop(columns = 'id')
    # products_theme = products_theme.merge(type_of_categories, how = 'left', left_on = 'category_id',
    #                                       right_on = 'category_id')

    # Index cleaning
    products_theme = products_theme.drop(columns=['id_categories'])
    products_theme = order_columns_id(products_theme)
    return products_theme

def create_events_table(directory_path):

    # First merge: events and seasons
    print("first merge events and seasons : ")
    events = load_dataset(directory_path, name="events")
    seasons = load_dataset(directory_path, name="seasons")

    # Drop useless columns
    events = events.drop(columns=['manual_added', 'is_display'])
    seasons = seasons.drop(columns=['start_date_time'])

    events_theme = events.merge(seasons, how='left', left_on='season_id', right_on='id', suffixes=('_events', '_seasons'))

    # Second merge: events_theme and event_types
    print("Secondly merge events_theme and event_types : ")
    event_types = load_dataset(directory_path, name="event_types")
    event_types = event_types.drop(columns=['fidelity_delay'])

    events_theme = events_theme.merge(event_types, how='left', left_on='event_type_id', right_on='id', suffixes=('_events', '_event_type'))
    events_theme = events_theme.rename(columns={"name": "name_event_types"})
    events_theme = events_theme.drop(columns='id')

    # Third merge: events_theme and facilities
    print("thirdly merge events_theme and facilities : ")
    facilities = load_dataset(directory_path, name="facilities")
    facilities = facilities.drop(columns=['fixed_capacity'])

    events_theme = events_theme.merge(facilities, how='left', left_on='facility_id', right_on='id', suffixes=('_events', '_facility'))
    events_theme = events_theme.rename(columns={"name": "name_facilities", "id_events": "event_id"})
    events_theme = events_theme.drop(columns='id')

    # Index cleaning
    events_theme = events_theme.drop(columns=['id_seasons'])
    events_theme = order_columns_id(events_theme)
    return events_theme

def create_representations_table(directory_path):

    representations = load_dataset(directory_path, name="representations")
    representations = representations.drop(columns=['serial', 'satisfaction', 'is_display', 'expected_filling', 'max_filling', 'extra_field', 'name', 'representation_type_id'])  # 'start_date_time', 'end_date_time', 'open'

    representations_capacity = load_dataset(directory_path, name="representation_category_capacities")
    representations_capacity = representations_capacity.drop(columns=['expected_filling', 'max_filling'])

    representations_theme = representations.merge(representations_capacity, how='left', left_on='id', right_on='representation_id', suffixes=('_representation', '_representation_cap'))

    # Index cleaning
    representations_theme = representations_theme.drop(columns=["id_representation"])
    representations_theme = order_columns_id(representations_theme)
    return representations_theme

def uniform_product_df(directory_path):
    """
    This function returns the uniform product dataset.
    """
    products_theme = create_products_table(directory_path)
    representation_theme = create_representations_table(directory_path)
    events_theme = create_events_table(directory_path)

    if directory_path == '101':
        ticket_information, ticket_information_1 = preprocessing_tickets_area(directory_path)
    else:
        ticket_information = preprocessing_tickets_area(directory_path)

    print("Products theme columns : ", products_theme.columns)
    print("\n Representation theme columns : ", representation_theme.columns)
    print("\n Events theme columns : ", events_theme.columns)

    products_global = pd.merge(products_theme, representation_theme, how='left', on=["representation_id", "category_id"])

    products_global = pd.merge(products_global, events_theme, how='left', on='event_id', suffixes=("_representation", "_event"))

    products_purchased = pd.merge(ticket_information, products_global, left_on='product_id', right_on='id_products', how='inner')

    products_purchased_reduced = products_purchased[['ticket_id', 'customer_id', 'purchase_id', 'event_type_id', 'supplier_name', 'purchase_date', 'amount', 'is_full_price', 'name_event_types', 'name_facilities', 'name_categories', 'name_events', 'name_seasons', 'start_date_time', 'end_date_time', 'open']]  # 'type_of_ticket_name', 'children',

    if directory_path == '101':
        products_purchased_1 = pd.merge(ticket_information_1, products_global, left_on='product_id', right_on='id_products', how='inner')

        products_purchased_reduced_1 = products_purchased_1[['ticket_id', 'customer_id', 'purchase_id', 'event_type_id', 'supplier_name', 'purchase_date', 'amount', 'is_full_price', 'name_event_types', 'name_facilities', 'name_categories', 'name_events', 'name_seasons', 'start_date_time', 'end_date_time', 'open']]  # 'type_of_ticket_name', 'children',

        return products_purchased_reduced, products_purchased_reduced_1
    else:
        return products_purchased_reduced
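

# Usage sketch, not part of the original pipeline. Assumptions: tenant files live under
# BUCKET/<directory_path>/ as expected by display_databases, and "1" is a hypothetical
# tenant directory ('101' is the tenant whose tickets are split and returns two DataFrames).
if __name__ == "__main__":
    example_directory = "1"  # hypothetical tenant identifier

    customers = preprocessing_customerplus(example_directory)
    campaigns = preprocessing_campaigns_area(example_directory)
    targets = preprocessing_target_area(example_directory)
    products_purchased = uniform_product_df(example_directory)

    print("Customers : ", customers.shape)
    print("Campaigns : ", campaigns.shape)
    print("Targets : ", targets.shape)
    print("Products purchased : ", products_purchased.shape)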