diff --git a/0_1_Input_cleaning.py b/0_1_Input_cleaning.py
new file mode 100644
index 0000000..a1b7110
--- /dev/null
+++ b/0_1_Input_cleaning.py
@@ -0,0 +1,58 @@
+# Business Data Challenge - Team 1
+
+import pandas as pd
+import numpy as np
+import os
+import s3fs
+import re
+import warnings
+
+# Create filesystem object
+S3_ENDPOINT_URL = "https://" + os.environ["AWS_S3_ENDPOINT"]
+fs = s3fs.S3FileSystem(client_kwargs={'endpoint_url': S3_ENDPOINT_URL})
+
+# Import cleaning and merge functions
+exec(open('0_Cleaning_and_merge_functions.py').read())
+
+# Output folder
+BUCKET_OUT = "projet-bdc2324-team1"
+
+# Ignore warning
+warnings.filterwarnings('ignore')
+
+
+def export_dataset(df, output_name):
+    print('Exportation of dataset :', output_name)
+    FILE_PATH_OUT_S3 = BUCKET_OUT + "/" + output_name
+    with fs.open(FILE_PATH_OUT_S3, 'w') as file_out:
+        df.to_csv(file_out, index = False)
+
+## 1 - Cleaning of the datasets
+for tenant_id in ("1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13", "14", "101"):
+    # Cleaning customerplus
+    df1_customerplus_clean = preprocessing_customerplus(directory_path = tenant_id)
+
+    ## Exportation
+    export_dataset(df = df1_customerplus_clean, output_name = "0_Input/Company_"+ tenant_id +"/customerplus_cleaned.csv")
+
+    # Cleaning target area
+    df1_target_information = preprocessing_target_area(directory_path = tenant_id)
+    ## Exportation
+    export_dataset(df = df1_target_information, output_name = "0_Input/Company_"+ tenant_id +"/target_information.csv")
+
+    # Cleaning campaign area
+    df1_campaigns_information = preprocessing_campaigns_area(directory_path = tenant_id)
+    ## Exportation
+    export_dataset(df = df1_campaigns_information, output_name = "0_Input/Company_"+ tenant_id +"/campaigns_information.csv")
+
+    ## Exportation
+    # export_dataset(df = df1_campaigns_information, output_name = "0_Temp/Company 1 - Campaigns dataset clean.csv")
+
+    # Cleaning product area
+    df1_products_purchased_reduced = uniform_product_df(directory_path = tenant_id)
+    ## Exportation
+    export_dataset(df = df1_products_purchased_reduced, output_name = "0_Input/Company_"+ tenant_id +"/products_purchased_reduced.csv")
+    #Exportation
+    # export_dataset(df = df1_products_purchased_reduced, output_name = "0_Temp/Company 1 - Purchases.csv")
+
+    print("\n ------------------------------------------------------------------ \n --------------------- END CLEANING COMPANY " + tenant_id + " --------------------- \n ------------------------------------------------------------------")
\ No newline at end of file
diff --git a/0_Cleaning_and_merge.py b/0_2_Dataset_construction.py
similarity index 51%
rename from 0_Cleaning_and_merge.py
rename to 0_2_Dataset_construction.py
index 860cec1..e91a78a 100644
--- a/0_Cleaning_and_merge.py
+++ b/0_2_Dataset_construction.py
@@ -7,22 +7,35 @@ import s3fs
 import re
 import warnings
 
+# Create filesystem object
+S3_ENDPOINT_URL = "https://" + os.environ["AWS_S3_ENDPOINT"]
+fs = s3fs.S3FileSystem(client_kwargs={'endpoint_url': S3_ENDPOINT_URL})
+
+
 # Import cleaning and merge functions
-exec(open('BDC-team-1/0_KPI_functions.py').read())
-## 2 - Construction of KPIs on a given period
+exec(open('0_KPI_functions.py').read())
 
-def explanatory_variables(min_date, max_date, df_campaigns_information = df1_campaigns_information, df_products_purchased_reduced = df1_products_purchased_reduced, df_customerplus_clean = df1_customerplus_clean):
+# Ignore warning
+warnings.filterwarnings('ignore')
 
+def dataset_construction(min_date, end_features_date, max_date, directory_path):
+
+    # Import customerplus
+    df_customerplus_clean = display_databases(directory_path, file_name = "customerplus_cleaned")
+    df_campaigns_information = display_databases(directory_path, file_name = "campaigns_information", datetime_col = ['opened_at', 'sent_at', 'campaign_sent_at'])
+    df_products_purchased_reduced = display_databases(directory_path, file_name = "products_purchased_reduced", datetime_col = ['purchase_date'])
+
     # Filtre de cohérence pour la mise en pratique de notre méthode
     max_date = pd.to_datetime(max_date, utc = True, format = 'ISO8601')
+    end_features_date = pd.to_datetime(end_features_date, utc = True, format = 'ISO8601')
     min_date = pd.to_datetime(min_date, utc = True, format = 'ISO8601')
 
     #Filtre de la base df_campaigns_information
-    df_campaigns_information = df_campaigns_information[(df_campaigns_information['sent_at'] <= max_date) & (df_campaigns_information['sent_at'] >= min_date)]
-    df_campaigns_information['opened_at'][df_campaigns_information['opened_at'] >= max_date] = np.datetime64('NaT')
-
+    df_campaigns_information = df_campaigns_information[(df_campaigns_information['sent_at'] <= end_features_date) & (df_campaigns_information['sent_at'] >= min_date)]
+    df_campaigns_information.loc[df_campaigns_information['opened_at'] >= end_features_date, 'opened_at'] = np.datetime64('NaT')
+
     #Filtre de la base df_products_purchased_reduced
-    df_products_purchased_reduced = df_products_purchased_reduced[(df_products_purchased_reduced['purchase_date'] <= max_date) & (df_products_purchased_reduced['purchase_date'] >= min_date)]
+    df_products_purchased_reduced = df_products_purchased_reduced[(df_products_purchased_reduced['purchase_date'] <= end_features_date) & (df_products_purchased_reduced['purchase_date'] >= min_date)]
 
     print("Data filtering : SUCCESS")
 
@@ -63,23 +76,24 @@ def explanatory_variables(min_date, max_date, df_campaigns_information = df1_cam
     df_customer_product[['nb_tickets', 'nb_purchases', 'total_amount', 'nb_suppliers', 'vente_internet_max', 'nb_tickets_internet']] = df_customer_product[['nb_tickets', 'nb_purchases', 'total_amount', 'nb_suppliers', 'vente_internet_max', 'nb_tickets_internet']].fillna(0)
 
     print("Explanatory variable construction : SUCCESS")
-
-    return df_customer_product
 
-# Fonction pour créer les variables expliquée
-def explained_variable(min_date, max_date, df_products_purchased_reduced = df1_products_purchased_reduced):
-
-    # Filtrer la base d'achat
-    df_products_purchased_reduced = df_products_purchased_reduced[(df_products_purchased_reduced['purchase_date'] <= max_date) & (df_products_purchased_reduced['purchase_date'] > min_date)]
+    # 2. Construction of the explained variable
+    df_products_purchased_to_predict = df_products_purchased_reduced[(df_products_purchased_reduced['purchase_date'] <= max_date) & (df_products_purchased_reduced['purchase_date'] > end_features_date)]
 
     # Indicatrice d'achat
-    df_products_purchased_reduced['y_has_purchased'] = 1
+    df_products_purchased_to_predict['y_has_purchased'] = 1
 
-    y = df_products_purchased_reduced[['customer_id', 'event_type_id', 'y_has_purchased']].drop_duplicates()
+    y = df_products_purchased_to_predict[['customer_id', 'y_has_purchased']].drop_duplicates()
 
     print("Explained variable construction : SUCCESS")
 
-    return y
+    # 3. Merge between explained and explanatory variables
+    dataset = pd.merge(df_customer_product, y, on = ['customer_id'], how = 'left')
+
+    # 0 if there is no purchase
+    dataset['y_has_purchased'] = dataset['y_has_purchased'].fillna(0)
+
+    return dataset
 
 ## Exportation
@@ -87,36 +101,28 @@ def explained_variable(min_date, max_date, df_products_purchased_reduced = df1_p
 BUCKET_OUT = "projet-bdc2324-team1/1_Output/Logistique Regression databases - First approach"
 
 # Dataset test
-X_test = explanatory_variables(min_date = "2021-08-01", max_date = "2023-08-01", df_campaigns_information = df1_campaigns_information, df_products_purchased_reduced = df1_products_purchased_reduced, df_customerplus_clean = df1_customerplus_clean)
+dataset_test = dataset_construction(min_date = "2021-08-01", end_features_date = "2023-08-01", max_date = "2023-11-01", directory_path = "1")
 
-y_test = explained_variable(min_date = "2023-08-01", max_date = "2023-11-01", df_products_purchased_reduced = df1_products_purchased_reduced)
+# # Exportation
+# FILE_KEY_OUT_S3 = "dataset_test.csv"
+# FILE_PATH_OUT_S3 = BUCKET_OUT + "/" + FILE_KEY_OUT_S3
 
-dataset_test = pd.merge(X_test, y_test, on = ['customer_id', 'event_type_id'], how = 'left')
+# with fs.open(FILE_PATH_OUT_S3, 'w') as file_out:
+#     dataset_test.to_csv(file_out, index = False)
 
-# Exportation
-FILE_KEY_OUT_S3 = "dataset_test.csv"
-FILE_PATH_OUT_S3 = BUCKET_OUT + "/" + FILE_KEY_OUT_S3
-
-with fs.open(FILE_PATH_OUT_S3, 'w') as file_out:
-    dataset_test.to_csv(file_out, index = False)
-
-print("Exportation dataset test : SUCCESS")
+# print("Exportation dataset test : SUCCESS")
 
 # Dataset train
-X_train = explanatory_variables(min_date = "2021-05-01", max_date = "2023-05-01", df_campaigns_information = df1_campaigns_information, df_products_purchased_reduced = df1_products_purchased_reduced, df_customerplus_clean = df1_customerplus_clean)
-
-y_train = explained_variable(min_date = "2023-05-01", max_date = "2023-08-01", df_products_purchased_reduced = df1_products_purchased_reduced)
-
-dataset_train = pd.merge(X_train, y_train, on = ['customer_id', 'event_type_id'], how = 'left')
+dataset_train = dataset_construction(min_date = "2021-05-01", end_features_date = "2023-05-01", max_date = "2023-08-01", directory_path = "1")
 
 # Exportation
-FILE_KEY_OUT_S3 = "dataset_train.csv"
-FILE_PATH_OUT_S3 = BUCKET_OUT + "/" + FILE_KEY_OUT_S3
+# FILE_KEY_OUT_S3 = "dataset_train.csv"
+# FILE_PATH_OUT_S3 = BUCKET_OUT + "/" + FILE_KEY_OUT_S3
 
-with fs.open(FILE_PATH_OUT_S3, 'w') as file_out:
-    dataset_train.to_csv(file_out, index = False)
+# with fs.open(FILE_PATH_OUT_S3, 'w') as file_out:
+#     dataset_train.to_csv(file_out, index = False)
 
-print("Exportation dataset train : SUCCESS")
+# print("Exportation dataset train : SUCCESS")
 
 print("FIN DE LA GENERATION DES DATASETS : SUCCESS")
diff --git a/0_Input_cleaning.py b/0_Input_cleaning.py
deleted file mode 100644
index 151c41a..0000000
--- a/0_Input_cleaning.py
+++ /dev/null
@@ -1,56 +0,0 @@
-# Business Data Challenge - Team 1
-
-import pandas as pd
-import numpy as np
-import os
-import s3fs
-import re
-import warnings
-
-# Create filesystem object
-S3_ENDPOINT_URL = "https://" + os.environ["AWS_S3_ENDPOINT"]
-fs = s3fs.S3FileSystem(client_kwargs={'endpoint_url': S3_ENDPOINT_URL})
-
-# Import cleaning and merge functions
-exec(open('0_Cleaning_and_merge_functions.py').read())
-
-# Output folder
-BUCKET_OUT = "projet-bdc2324-team1"
-
-# Ignore warning
-warnings.filterwarnings('ignore')
-
-
-def export_dataset(df, output_name):
-    print('Exportation of temporary dataset :', output_name)
-    FILE_PATH_OUT_S3 = BUCKET_OUT + "/" + output_name
-    with fs.open(FILE_PATH_OUT_S3, 'w') as file_out:
-        df.to_csv(file_out, index = False)
-
-## 1 - Cleaning of the datasets
-
-# Cleaning customerplus
-df1_customerplus_clean = preprocessing_customerplus(directory_path = "1")
-
-## Exportation
-export_dataset(df = df1_customerplus_clean, output_name = "0_Input/Company_1/customerplus_cleaned.csv")
-
-# Cleaning target area
-df1_target_information = preprocessing_target_area(directory_path = "1")
-## Exportation
-export_dataset(df = df1_campaigns_information, output_name = "0_Input/Company_1/Campaigns dataset clean.csv")
-
-# Cleaning campaign area
-df1_campaigns_information = preprocessing_campaigns_area(directory_path = "1")
-## Exportation
-export_dataset(df = df1_campaigns_information, output_name = "0_Input/Company_1/Campaigns dataset clean.csv")
-
-## Exportation
-export_dataset(df = df1_campaigns_information, output_name = "0_Temp/Company 1 - Campaigns dataset clean.csv")
-
-# Cleaning product area
-df1_products_purchased_reduced = uniform_product_df(directory_path = "1")
-## Exportation
-export_dataset(df = df1_campaigns_information, output_name = "0_Input/Company_1/Campaigns dataset clean.csv")
-#Exportation
-export_dataset(df = df1_products_purchased_reduced, output_name = "0_Temp/Company 1 - Purchases.csv")
diff --git a/0_KPI_functions.py b/0_KPI_functions.py
index 59e1b07..e054e02 100644
--- a/0_KPI_functions.py
+++ b/0_KPI_functions.py
@@ -1,6 +1,20 @@
 # Function de construction de KPI
 
+def custom_date_parser(date_string):
+    return pd.to_datetime(date_string, utc = True, format = 'ISO8601')
+
+def display_databases(directory_path, file_name, datetime_col = None):
+    """
+    This function returns the file from s3 storage
+    """
+    file_path = "projet-bdc2324-team1" + "/0_Input/Company_" + directory_path + "/" + file_name + ".csv"
+    print("File path : ", file_path)
+    with fs.open(file_path, mode="rb") as file_in:
+        df = pd.read_csv(file_in, sep=",", parse_dates = datetime_col, date_parser=custom_date_parser)
+    return df
+
 def campaigns_kpi_function(campaigns_information = None):
+
     # Nombre de campagnes de mails
     nb_campaigns = campaigns_information[['customer_id', 'campaign_name']].groupby('customer_id').count().reset_index()
     nb_campaigns.rename(columns = {'campaign_name' : 'nb_campaigns'}, inplace = True)
@@ -29,23 +43,23 @@ def campaigns_kpi_function(campaigns_information = None):
 def tickets_kpi_function(tickets_information = None):
 
     tickets_information_copy = tickets_information.copy()
-    
+
     # Dummy : Canal de vente en ligne
     liste_mots = ['en ligne', 'internet', 'web', 'net', 'vad', 'online'] # vad = vente à distance
     tickets_information_copy['vente_internet'] = tickets_information_copy['supplier_name'].str.contains('|'.join(liste_mots), case=False).astype(int)
 
     # Proportion de vente en ligne
-    prop_vente_internet = tickets_information_copy[tickets_information_copy['vente_internet'] == 1].groupby(['customer_id', 'event_type_id'])['ticket_id'].count().reset_index()
+    prop_vente_internet = tickets_information_copy[tickets_information_copy['vente_internet'] == 1].groupby(['customer_id'])['ticket_id'].count().reset_index()
     prop_vente_internet.rename(columns = {'ticket_id' : 'nb_tickets_internet'}, inplace = True)
 
     # Average amount
-    avg_amount = (tickets_information_copy.groupby(["event_type_id", 'name_event_types'])
-                  .agg({"amount" : "mean"}).reset_index()
-                  .rename(columns = {'amount' : 'avg_amount'}))
+    # avg_amount = (tickets_information_copy.groupby(["event_type_id", 'name_event_types'])
+    #               .agg({"amount" : "mean"}).reset_index()
+    #               .rename(columns = {'amount' : 'avg_amount'}))
 
-    tickets_kpi = (tickets_information_copy[['event_type_id', 'customer_id', 'purchase_id' ,'ticket_id','supplier_name', 'purchase_date', 'amount', 'vente_internet']]
-                   .groupby(['customer_id', 'event_type_id'])
+    tickets_kpi = (tickets_information_copy[['customer_id', 'purchase_id' ,'ticket_id','supplier_name', 'purchase_date', 'amount', 'vente_internet']]
+                   .groupby(['customer_id'])
                    .agg({'ticket_id': 'count',
                          'purchase_id' : 'nunique',
                          'amount' : 'sum',
@@ -61,8 +75,7 @@ def tickets_kpi_function(tickets_information = None):
                  'purchase_id_nunique' : 'nb_purchases',
                  'amount_sum' : 'total_amount',
                  'supplier_name_nunique' : 'nb_suppliers',
-                 'customer_id_' : 'customer_id',
-                 'event_type_id_' : 'event_type_id'}, inplace = True)
+                 'customer_id_' : 'customer_id'}, inplace = True)
 
     tickets_kpi['time_between_purchase'] = tickets_kpi['purchase_date_max'] - tickets_kpi['purchase_date_min']
     tickets_kpi['time_between_purchase'] = tickets_kpi['time_between_purchase'] / np.timedelta64(1, 'D') # En nombre de jours
@@ -73,10 +86,10 @@ def tickets_kpi_function(tickets_information = None):
 
     tickets_kpi['purchase_date_min'] = (max_date - tickets_kpi['purchase_date_min']) / np.timedelta64(1, 'D')
 
-    tickets_kpi = tickets_kpi.merge(prop_vente_internet, on = ['customer_id', 'event_type_id'], how = 'left')
+    tickets_kpi = tickets_kpi.merge(prop_vente_internet, on = ['customer_id'], how = 'left')
     tickets_kpi['nb_tickets_internet'] = tickets_kpi['nb_tickets_internet'].fillna(0)
 
-    tickets_kpi = tickets_kpi.merge(avg_amount, how='left', on= 'event_type_id')
+    # tickets_kpi = tickets_kpi.merge(avg_amount, how='left', on= 'event_type_id')
 
     return tickets_kpi
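
Note on the windowing introduced by dataset_construction: explanatory variables are built only from campaigns and purchases observed between min_date and end_features_date, while y_has_purchased flags a purchase in the later window (end_features_date, max_date]. Below is a minimal sketch of that split on made-up customer ids and dates; only the filtering and merge pattern comes from the patch.

import pandas as pd

# Illustrative train-style dates (same convention as dataset_construction)
min_date = pd.to_datetime("2021-05-01", utc=True)
end_features_date = pd.to_datetime("2023-05-01", utc=True)
max_date = pd.to_datetime("2023-08-01", utc=True)

# Hypothetical purchase history
purchases = pd.DataFrame({
    "customer_id": ["a", "a", "b", "c"],
    "purchase_date": pd.to_datetime(
        ["2022-01-10", "2023-06-15", "2022-06-01", "2021-03-01"], utc=True),
})

# Features: only events observed up to end_features_date
features_window = purchases[(purchases["purchase_date"] >= min_date)
                            & (purchases["purchase_date"] <= end_features_date)]

# Label: did the customer purchase in (end_features_date, max_date]?
label_window = purchases[(purchases["purchase_date"] > end_features_date)
                         & (purchases["purchase_date"] <= max_date)]
y = label_window[["customer_id"]].drop_duplicates().assign(y_has_purchased=1)

# Left-merge onto the feature rows; customers with no purchase in the target window get 0
X = features_window[["customer_id"]].drop_duplicates()
dataset = X.merge(y, on="customer_id", how="left")
dataset["y_has_purchased"] = dataset["y_has_purchased"].fillna(0)
print(dataset)  # "a" -> 1.0, "b" -> 0.0; "c" never enters the feature window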
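
The S3 round trip used by export_dataset and display_databases follows one pattern in both directions: open a key on the bucket with s3fs and hand the file object to pandas. A minimal sketch, assuming AWS_S3_ENDPOINT and S3 credentials are available in the environment as the scripts expect; the DataFrame content and the exact key are illustrative.

import os
import pandas as pd
import s3fs

S3_ENDPOINT_URL = "https://" + os.environ["AWS_S3_ENDPOINT"]
fs = s3fs.S3FileSystem(client_kwargs={"endpoint_url": S3_ENDPOINT_URL})

# Write: open the key in text mode and let pandas stream the CSV into it
df = pd.DataFrame({"customer_id": [1, 2], "nb_tickets": [3, 0]})  # made-up data
out_path = "projet-bdc2324-team1/0_Input/Company_1/customerplus_cleaned.csv"
with fs.open(out_path, "w") as file_out:
    df.to_csv(file_out, index=False)

# Read it back: open in binary mode, optionally parsing date columns as display_databases does
with fs.open(out_path, mode="rb") as file_in:
    df_back = pd.read_csv(file_in, sep=",")
print(df_back.head())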
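
The body of tickets_kpi_function is only partially visible in these hunks, but the renamed columns ('ticket_id_count', 'customer_id_') imply a per-customer groupby with mixed aggregations whose MultiIndex column names are then flattened with '_'. The sketch below is a guess at that pattern on toy data, not the project's exact code.

import numpy as np
import pandas as pd

# Hypothetical ticket lines, mirroring the columns selected in tickets_kpi_function
tickets = pd.DataFrame({
    "customer_id": [1, 1, 2],
    "purchase_id": [10, 10, 20],
    "ticket_id": [100, 101, 200],
    "supplier_name": ["guichet", "site internet", "guichet"],
    "purchase_date": pd.to_datetime(["2023-01-05", "2023-03-20", "2023-02-01"], utc=True),
    "amount": [25.0, 30.0, 15.0],
})
tickets["vente_internet"] = tickets["supplier_name"].str.contains("internet|en ligne|web", case=False).astype(int)

# Mixed aggregations produce MultiIndex columns (e.g. ('ticket_id', 'count')),
# which are flattened with '_' before being renamed to the KPI names
kpi = (tickets.groupby("customer_id")
       .agg({"ticket_id": "count",
             "purchase_id": "nunique",
             "amount": "sum",
             "supplier_name": "nunique",
             "vente_internet": "max",
             "purchase_date": ["min", "max"]})
       .reset_index())
kpi.columns = ["_".join(col).rstrip("_") for col in kpi.columns]
kpi = kpi.rename(columns={"ticket_id_count": "nb_tickets",
                          "purchase_id_nunique": "nb_purchases",
                          "amount_sum": "total_amount",
                          "supplier_name_nunique": "nb_suppliers"})
kpi["time_between_purchase"] = (kpi["purchase_date_max"] - kpi["purchase_date_min"]) / np.timedelta64(1, "D")
print(kpi)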