BDC-team-1/utils_sales_forecast.py

326 lines
12 KiB
Python

# imports
import pandas as pd
from pandas import DataFrame
import numpy as np
import os
import s3fs
import matplotlib.pyplot as plt
from scipy.optimize import fsolve
import pickle
import warnings
import io
# functions
def load_train_test(type_of_activity):
    """
    Read the train and test modelling datasets of one type of activity from S3 storage.

    Both CSV files are opened through the `fs` filesystem object, which this
    function expects to find in the enclosing (module) scope.

    Args:
    - type_of_activity (str): name of the sub-folder holding the datasets.
    Returns:
    DataFrame: Training dataset (content of Train_set.csv).
    DataFrame: Test dataset (content of Test_set.csv).
    """
    base_path = f"projet-bdc2324-team1/1_Temp/1_0_Modelling_Datasets/{type_of_activity}"

    def read_split(csv_name):
        # Open in binary mode and let pandas handle the decoding.
        with fs.open(base_path + "/" + csv_name, mode="rb") as file_in:
            return pd.read_csv(file_in, sep=",")

    # Tuple items are evaluated left to right: the train set is read first.
    return read_split("Train_set.csv"), read_split("Test_set.csv")
def features_target_split(dataset_train, dataset_test):
    """
    Splits the train and test datasets into model inputs and target.

    No column selection is applied to the inputs: X_train and X_test are the
    very same DataFrame objects that were passed in (target column included).
    Only the target is extracted, as a one-column DataFrame.

    Args:
    - dataset_train (DataFrame): Training dataset, must contain 'y_has_purchased'.
    - dataset_test (DataFrame): Test dataset, must contain 'y_has_purchased'.
    Returns:
    DataFrame: Inputs of the training dataset (dataset_train itself).
    DataFrame: Inputs of the test dataset (dataset_test itself).
    DataFrame: Target variable of the training dataset (single column).
    DataFrame: Target variable of the test dataset (single column).
    Raises:
    KeyError: if 'y_has_purchased' is missing from one of the datasets.
    """
    # A list selector keeps the targets as DataFrames rather than Series.
    target_columns = ['y_has_purchased']
    y_train = dataset_train[target_columns]
    y_test = dataset_test[target_columns]
    return dataset_train, dataset_test, y_train, y_test
def load_model(type_of_activity, model):
    """
    Fetch a pickled model from S3 storage and deserialize it.

    The pickle is read through the `fs` filesystem object, which this function
    expects to find in the enclosing (module) scope.

    Args:
    - type_of_activity (str): sub-folder of the modelling results.
    - model (str): model name, used both as folder name and as pickle file name.
    Returns:
    Model: the object stored in '<model>.pkl' (per the original documentation,
    a machine learning model pre-trained with a scikit learn pipeline).
    """
    model_dir = f"projet-bdc2324-team1/2_Output/2_1_Modeling_results/standard/{type_of_activity}/{model}/"
    pickle_path = model_dir + (model + '.pkl')
    with fs.open(pickle_path, mode="rb") as pickle_file:
        serialized_model = pickle_file.read()
    # NOTE(review): unpickling runs arbitrary code from the file; only load
    # pickles written by a trusted process.
    return pickle.loads(serialized_model)
def df_segment(df, y, model):
    """
    Score customers with a fitted model and assign each of them to one of 4 segments.

    The columns "has_purchased", "has_purchased_estim", "score" and "quartile"
    are added to `df` itself (no copy is made) and that same object is returned.
    Segments are fixed score bands: '1' below 0.25, '2' below 0.5, '3' below
    0.75 and '4' otherwise.

    Args:
    - df (DataFrame): DataFrame to be segmented, passed as-is to the model.
    - y (Series): True target variable.
    - model (Model): Fitted model exposing predict and predict_proba.
    Returns:
    DataFrame: `df` completed with true values, predictions, scores and segments.
    """
    # Both predictions are computed before any column is added to df.
    predicted_labels = model.predict(df)
    positive_class_proba = model.predict_proba(df)[:, 1]

    segmented = df
    segmented["has_purchased"] = y
    segmented["has_purchased_estim"] = predicted_labels
    segmented["score"] = positive_class_proba

    scores = segmented["score"].to_numpy()
    # The first matching band wins, so the bands can be listed by upper bound only.
    upper_bounds = [scores < 0.25, scores < 0.5, scores < 0.75]
    segmented["quartile"] = np.select(upper_bounds, ['1', '2', '3'], default='4')
    return segmented
def odd_ratio(score):
    """
    Convert a probability-like score into its odds, score / (1 - score).

    Args:
    - score (Union[float, int])
    Returns:
    float: Odd ratio value.
    """
    complement = 1 - score
    return score / complement
def adjust_score_1(score):
    """
    Adjust scores by replacing ones with the highest value different from 1.
    Allows to compute odd ratios then (a score of exactly 1 would divide by zero).

    Args:
    - score (List[Union[float, int]]): scores, any array-like accepted by numpy.
    Returns:
    np.ndarray: Adjusted score values (a new array, the input is not modified).
    Raises:
    ValueError: if the input is not empty and every score is equal to 1, since
    there is then no replacement value available.
    """
    scores = np.asarray(score)
    # Nothing to adjust: return an empty array instead of failing on max().
    if scores.size == 0:
        return scores.copy()

    is_one = scores == 1
    if is_one.all():
        raise ValueError("all scores are equal to 1: no second highest value to replace them with")

    second_best_score = scores[~is_one].max()
    return np.where(is_one, second_best_score, scores)
def adjusted_score(odd_ratio, bias):
    """
    Turn an odd ratio back into a score after applying a bias:
    odd_ratio / (bias + odd_ratio).

    Args:
    - odd_ratio (Union[float, int])
    - bias (Union[float, int])
    Returns:
    float: Adjusted score value.
    """
    denominator = bias + odd_ratio
    return odd_ratio / denominator
def find_bias(odd_ratios, y_objective, initial_guess=10):
    """
    Find the bias needed to adjust scores so that their sum is equal to the total number of purchases observed.

    The bias is the root, found with scipy's fsolve, of
    sum(odd_ratio / (bias + odd_ratio)) - y_objective.

    Args:
    - odd_ratios (List[float]): List of odd ratios associated to the scores that have to be adjusted.
    - y_objective (Union[float, int]): Objective value => total number of purchases.
    - initial_guess (Union[float, int], optional): Initial guess for the bias. Default is 10 (bias is approximately 6 for sports, 10 for music and 22 for museums)
    Returns:
    float: Estimated bias value.
    """
    # Convert once: the solver evaluates the gap many times.
    odds = np.asarray(odd_ratios, dtype=float)

    def purchases_gap(bias):
        # Same formula as adjusted_score, applied to the whole array at once.
        return np.sum(odds / (bias + odds)) - y_objective

    bias_estimated = fsolve(purchases_gap, x0=initial_guess)
    return bias_estimated[0]
def plot_hist_scores(df, score, score_adjusted, type_of_activity):
    """
    Draw, on a new matplotlib figure, the overlaid histograms of the scores and adjusted scores.

    The figure is created but not displayed: this function does not call plt.show().

    Args:
    - df (DataFrame): DataFrame containing score data.
    - score (str): Name of the column in df representing the original scores.
    - score_adjusted (str): Name of the column in df representing the adjusted scores.
    - type_of_activity (str) : type of activity of the companies considered, used in the title.
    Returns:
    None
    """
    plt.figure()
    # Original scores first, adjusted scores on top; both semi-transparent.
    for column, legend_label in ((score, "score"), (score_adjusted, "adjusted score")):
        plt.hist(df[column], label=legend_label, alpha=0.6)
    plt.legend()
    plt.xlabel("probability of a future purchase")
    plt.ylabel("count")
    plt.title(f"Comparison between score and adjusted score for {type_of_activity} companies")
def project_tickets_CA(df, nb_purchases, nb_tickets, total_amount, score_adjusted, duration_ref, duration_projection):
    """
    Project tickets sold and total amount based on the adjusted scores and the duration of periods of study / projection.

    The columns "nb_tickets_projected", "avg_ticket_price", "total_amount_corrected",
    "total_amount_projected", "nb_tickets_expected", "total_amount_expected" and
    "pace_purchase" are added to `df` itself (no copy is made) and that same
    object is returned.

    Args:
    - df (DataFrame): DataFrame containing information about past sales.
    - nb_purchases (str) : Name of the column in df representing the number of purchases.
    - nb_tickets (str): Name of the column in df representing the number of tickets.
    - total_amount (str): Name of the column in df representing the total amount.
    - score_adjusted (str): Name of the column in df representing the adjusted score.
    - duration_ref (int or float): Duration of the period of reference for the construction of the variables X.
    - duration_projection (int or float): Duration of the period of projection of sales / revenue.
    Returns:
    DataFrame: DataFrame completed with sales and total amount projections.
    """
    duration_ratio = duration_ref / duration_projection
    df_output = df

    # project number of tickets : at least 1 ticket purchased if the customer purchased
    # (values that are not strictly above 1, missing values included, become 1)
    scaled_tickets = df_output[nb_tickets] / duration_ratio
    df_output["nb_tickets_projected"] = scaled_tickets.where(scaled_tickets > 1, 1)

    # project amount : if the customer buys a ticket, we expect the amount to be at least the average price of tickets
    # for customers purchasing exactly one ticket
    single_ticket_amounts = df_output.loc[df_output[nb_tickets] == 1, total_amount]
    if not single_ticket_amounts.empty:
        avg_price = single_ticket_amounts.mean()
    else:
        avg_price = df_output[total_amount].mean()

    # we compute the avg price of ticket for each customer
    # (not finite when the customer bought no ticket; handled below)
    df_output["avg_ticket_price"] = df_output[total_amount] / df_output[nb_tickets]

    # correct negatives total amounts
    df_output["total_amount_corrected"] = np.where(
        df_output[total_amount] < 0,
        avg_price * df_output[nb_tickets],
        df_output[total_amount],
    )

    df_output["total_amount_projected"] = np.where(
        # if no ticket bought in the past, we take the average price
        df_output[nb_tickets] == 0,
        avg_price,
        np.where(
            # if avg prices of tickets are negative, we recompute the expected amount based on the avg price
            # of a ticket observed on the whole population (read from df_output, not from an outer variable)
            df_output["avg_ticket_price"] < 0,
            avg_price * df_output["nb_tickets_projected"],
            # else, the amount projected is the average price of tickets bought by the customer * nb tickets projected
            df_output["avg_ticket_price"] * df_output["nb_tickets_projected"],
        ),
    )

    df_output["nb_tickets_expected"] = df_output[score_adjusted] * df_output["nb_tickets_projected"]
    df_output["total_amount_expected"] = df_output[score_adjusted] * df_output["total_amount_projected"]

    # average time between two purchases; undefined (NaN) for customers without any purchase
    df_output["pace_purchase"] = (duration_ref / df_output[nb_purchases]).replace(np.inf, np.nan)
    return df_output
def summary_expected_CA(df, segment, nb_tickets_expected, total_amount_expected, total_amount, pace_purchase,
                        duration_ref=17, duration_projection=12):
    """
    Generate a summary of expected customer sales based on segments.

    The output has one row per segment with, in this order: the segment, "size",
    "size_perct", the summed expected tickets and expected amount,
    "revenue_recovered_perct", "share_future_revenue_perct" and "pace_purchase".

    Args:
    - df (DataFrame): DataFrame containing customer data.
    - segment (str): Name of the column in df representing customer segments.
    - nb_tickets_expected (str): Name of the column in df representing the expected number of tickets.
    - total_amount_expected (str): Name of the column in df representing the expected total amount.
    - total_amount (str): Name of the column in df representing the total amount.
    - pace_purchase (str) : Name of the column in df representing the average time between 2 purchases in months.
    - duration_ref (int or float): Duration of the period of reference for the construction of the variables X.
    - duration_projection (int or float): Duration of the period of projection of sales / revenue.
    Returns:
    DataFrame: Summary DataFrame containing expected customer sales metrics.
    """
    # Every per-segment aggregate comes from the same grouping, so the rows of
    # all the aggregates below are in the same segment order.
    by_segment = df.groupby(segment)

    # compute nb tickets estimated and total amount expected
    df_expected_CA = by_segment[[nb_tickets_expected, total_amount_expected]].sum().reset_index()

    # number of customers by segment
    df_expected_CA.insert(1, "size", by_segment.size().values)

    # size in percent of all customers
    df_expected_CA.insert(2, "size_perct", 100 * df_expected_CA["size"] / df_expected_CA["size"].sum())

    # compute share of CA recovered: expected amounts are rescaled to the
    # duration of the reference period before being compared to past amounts
    duration_ratio = duration_ref / duration_projection
    rescaled_expected_amount = 100 * duration_ratio * df_expected_CA[total_amount_expected]
    df_expected_CA["revenue_recovered_perct"] = rescaled_expected_amount / by_segment[total_amount].sum().values
    df_expected_CA["share_future_revenue_perct"] = rescaled_expected_amount / df[total_amount].sum()

    # Group means ignore missing values, so customers without a pace do not
    # weigh on the average; a segment with no pace at all gets NaN instead of
    # disappearing (which would break the row-by-row assignment).
    df_expected_CA["pace_purchase"] = by_segment[pace_purchase].mean().values
    return df_expected_CA
def save_file_s3_ca(File_name, type_of_activity):
    """
    Render the current matplotlib figure as a PNG, upload it to S3 storage and close the figure.

    The upload goes through the `fs` filesystem object, which this function
    expects to find in the enclosing (module) scope.

    Args:
    - File_name (str): prefix of the output file name.
    - type_of_activity (str): used both as output sub-folder and as file name suffix.
    """
    output_dir = f"projet-bdc2324-team1/2_Output/2_3_Sales_Forecast/{type_of_activity}/"
    s3_target = output_dir + File_name + type_of_activity + '.png'

    # Render in memory first so that only the finished image is sent to S3.
    png_buffer = io.BytesIO()
    plt.savefig(png_buffer, format='png', dpi=120)

    with fs.open(s3_target, 'wb') as s3_file:
        s3_file.write(png_buffer.getvalue())
    plt.close()