In questa pagina illustro il progetto Analysis Instacart Online Grocery realizzato per il corso di Modelli e Tecniche per BigData dell’Università della Calabria disponibile nelle repository del mio account GitHub.

Introduzione

Il mondo sta attualmente attraversando un’era digitale in cui l’accumulo e l’analisi di grandi quantità di dati sta giocando un ruolo sempre più importante nel prendere decisioni a livello aziendale e governativo. Questo rapporto mira a esplorare tecnologie avanzate per raccogliere, archiviare e analizzare i dati al fine di fornire informazioni preziose e supportare le decisioni di business.

L’e-commerce sta diventando sempre più prevalente nell’era attuale, con milioni di consumatori che fanno acquisti online ogni giorno. La capacità di raccogliere e analizzare i dati sulle vendite è fondamentale per comprendere il comportamento dei consumatori e per migliorare la strategia di marketing e vendita dell’azienda. Un po’ quello che avviene con il Data Mining ma con le opportune differenze che puoi individuare in questo approfondimento.

Per questo progetto è stata condotta un’analisi dettagliata delle vendite provenienti dai dataset forniti da Instacart, uno dei servizi di consegna alimentari più grandi negli Stati Uniti.

Instacart shopping

Instacart shopping (Fonte screen: instacart.com).

Si tratta di un’app che collabora con la maggior parte dei supermercati americani, negozi specializzati e addirittura farmacie. In sostanza, gli ordini vengono evasi e consegnati da un personal shopper, che:

  • preleva;
  • confeziona
  • consegna l’ordine entro il periodo di tempo stabilito.

Obiettivi del progetto

Sono state effettuate analisi di vario tipo, dal prodotto più venduto fino alla variazione degli acquisti dello stesso durante la settimana. È stata condotta un’analisi dettagliata degli ordini per ottenere eventuali risultati utili ai gestori del servizio per:

  • ottimizzare le risorse;
  • aumentare il fatturato con delle strategie di marketing mirato;
  • incrementare la soddisfazione dei propri utenti.

Basandosi su questi risultati si potrebbero attuare delle strategie di marketing così da enfatizzare l’acquisto dei prodotti più popolari, concentrandosi sulla promozione di essi.

I datasource sono stati forniti da Instacart stesso, disponibili sul sito Kaggle: https://www.kaggle.com/datasets/yasserh/instacart-online-grocery-basket-analysis-dataset. Vengono forniti più di 200mila users univoci, con un range dai 4 ai 100 ordini l’uno e più di 50mila prodotti.

Tecnologia scelta: Spark

La prima scelta progettuale fatta è stata quella della scelta dell’utilizzo di Spark combinato a Python (se cerchi un buon manuale per imparare Python clicca qui). Ci sono molti vantaggi nell’utilizzo di Apache Spark per l’elaborazione dei dati:

  • Scalabilità: Spark è progettato per lavorare su cluster di computer, rendendolo ideale per elaborare grandi quantità di dati in modo efficiente;
  • Velocità: Spark è molto più veloce rispetto ad altre soluzioni di elaborazione dei dati poiché utilizza la memoria del sistema invece di scrivere su disco rigido.

Sono stati fatti diversi test prima di iniziare con l’analisi dei dati vera e propria.

Il primo approccio utilizzato è stato quello di fare uso di Scala per la manipolazione dei dati, servendosi degli RDD per ottenere informazioni dai datasources. Questo approccio risultava molto lento e macchinoso, si è deciso così di passare all’utilizzo di Python e di SparkSQL, che garantiscono molta più velocità e facilità di scrittura delle query. In particolare, SparkSQL, rispetto all’utilizzo di Scala, garantisce:

  • Interfaccia SQL: Spark SQL offre un’interfaccia SQL standard, che rende più facile elaborare e analizzare i dati tabellari;
  • Ottimizzazione automatica: Spark SQL utilizza tecnologie di ottimizzazione automatica per ottenere prestazioni elevate durante l’elaborazione dei dati;
  • Supporto per molte fonti di dati: Spark SQL supporta molte fonti di dati, tra cui file, database, sistemi di gestione dei dati distribuiti e molto altro ancora.

Oltre a questo, Python ha reso facile anche la realizzazione di un’interfaccia grafica tramite cui gli utenti possono interagire con il sistema.

Implementazione del sistema

Vediamo, quindi, quali sono state le varie fasi che hanno portato alla realizzazione del sistema di analisi.

Software “Analysis Instacart Online Grocery”

homepage analysis instacart online grocery

Homepage Analysis Instacart Online Grocery.

Il software di analisi “Analysis Instacart Online Grocery” è stato realizzato interamente in Python con l’importazione delle principali librerie Tkinter e Matplotlib, mentre è stato usato Pyspark per il software di analisi. Esso permette all’utente di selezionare una delle query disponibili e di poter, eventualmente, inserire parametri qualora ce ne fosse il bisogno. Permette, inoltre, di consultare i risultati a video attraverso la visualizzazione tabellare dei risultati e, talvolta, anche attraverso l’utilizzo di grafici molto intuitivi e facili da leggere.

Lettura dei file .csv e download dei datasources

Come primo passo del progetto, dopo il download dei datasources, è stata innanzitutto inizializzata una sessione Spark e, in seguito, utilizzato il metodo spark.read.option(“header”,True).csv(“datasource.csv”) per poter leggere i file csv contenenti i dati e trasformarli in DataFrame.

spark = SparkSession.builder.appName(“Main”).config(“spark.driver.memory”,”15g”).getOrCreate()
#creazione DataFrame
dfOrdersDB = spark.read.option(“header”,True).csv(“orders.csv”)
dfOrderProductsPrior = spark.read.option(“header”,True).csv(“order_products__prior.csv”)
dfOrderProductsTrain = spark.read.option(“header”,True).csv(“order_products__train.csv”)
dfAisles = spark.read.option(“header”,True).csv(“aisles.csv”)
dfDepartments = spark.read.option(“header”,True).csv(“departments.csv”)
dfProducts = spark.read.option(“header”,True).csv(“products.csv”)

#creazione view
dfOrderProductsPrior.createOrReplaceTempView(“OrderProductsPrior”)
dfOrderProductsTrain.createOrReplaceTempView(“OrderProductsTrain”)
dfAisles.createOrReplaceTempView(“Aisles”)
dfDepartments.createOrReplaceTempView(“Departments”)
dfProducts.createOrReplaceTempView(“Products”)

Come si può vedere, sono stati letti i 5 file contenenti i dati. Essi, in particolare, sono:

  • Orders.csv che contiene tutti gli ordini effettuati nell’e-commerce con i seguenti attributi:
    • order_id;
    • user_id;
    • eval_set (set di valutazione che può essere prior o train);
    • order_number (numero dell’ordine del particolare user_id);
    • order_dow (valore intero tra 0 e 7 che indica il giorno della settimana in cui si è concluso l’ordine);
    • order_hour_of_day (valore intero tra 0 e 24 che indica l’ora del giorno in cui si è concluso l’ordine);
    • days_since_prior_order (valore che indica quanti giorni sono passati tra l’ultimo ordine e il penultimo del particolare user_id).
  • Order_products_train.csv che contiene tutti gli ordini più recenti di ogni utente, ovvero l’ultimo ordine effettuato. Gli attributi a disposizione sono:
    • order_id;
    • product_id;
    • add_to_cart_order (valore che indica la posizione di product_id all’interno dell’ordine);
    • reordered.
  • Order_products_prior.csv che contiene lo storico di tutti gli ordini effettuati da qualsiasi utente. I campi sono uguali a quelli appena visti in Order_products_train.csv.
  • Aisles.csv che contiene l’id di ogni corridoio e il rispettivo nome;
  • Deparments.csv che contiene l’id di ogni dipartimento e il rispettivo nome;
  • Products.csv che contiene tutte le informazioni riguardo a ogni specifico prodotto venduto, in particolare gli attributi di questo dataset sono:
    • product_id;
    • product_name;
    • aisle_id;
    • department_id.

Sono state create delle “view” così da poter utilizzare i dataframe creati all’interno della query in maniera più agevole.

riepilogo dei dataset

Riepilogo dei Dataset.

Pre-processing dei dati

La seconda fase pre-analisi è stata quella di effettuare un’attenta lettura dei dati e correzione degli stessi qualora venissero trovate delle incongruenze. Vengono esplicate di seguito tutte le azioni che sono state effettuate su di essi.

Rimozione ordini appartenenti al dataset test

La prima è stata trovata proprio nel fatto che, nei dati forniti, mancasse un dataset. In particolare, il file “Orders.csv” contiene un attributo di nome “eval_set” che indica a quale dataset di ordini appartiene lo specifico ordine preso in considerazione.

Tra questi appaiono valori come “prior” (indicante il fatto che l’ordine si trovasse nel dataset orders_products__prior) o “train” (indicante il fatto che l’ordine si trovasse nel dataset orders_products__train). Esiste, però, un terzo valore chiamato “test”. Si suppone che questo indichi l’appartenenza ad un file utilizzato come test, dataset che, per l’appunto, è mancante. Così sono stati rimossi tutti gli ordini che contengono il valore appena descritto nell’attributo “eval_set”. Si riporta di seguito la query utilizzata per la rimozione degli ordini “difettati”.

dfOrdersDB.createOrReplaceTempView(“OrdersDB”)
dfOrders = spark.sql(“SELECT * FROM OrdersDB WHERE eval_set != ‘test'”)
dfOrders.createOrReplaceTempView(“Orders”)

È stato precedentemente letto il dataset “Orders.csv” e salvato nella view “OrdersDB” e, in seguito, sono stati rimossi gli elementi appena descritti.

Controllo univocità delle chiavi

È stato, inoltre, fatto un controllo sulle chiavi, dove è stato effettuato un raggruppamento e un conteggio sulle stesse, ottenendo come risultato, per ogni attributo rappresentante la chiave di ogni dataset, un valore di “count” pari a 1. Questo permette di capire, quindi, che tutte le chiavi presenti sono univoche.

Unione order_prior e order_train

Altra analisi di pre-processing effettuata è stata quella di andare a creare un dataset unificato, per motivi di comodità di indagine, unendo “orders_products__prior” e “orders_products__train”. Ciò è stato fatto per quei casi in cui non serviva avere una differenza tra ultimi ordini effettuati e storico degli stessi.

dfOrderUnified = spark.sql(“SELECT * FROM OrderProductsPrior UNION ALL SELECT * FROM OrderProductsTrain”)
dfOrderUnified.createOrReplaceTempView(“OrderUnified”)

In particolare, viene fatta una UNION tra le due tabelle e salvate in una view dal nome “OrderUnified”.

Controllo prodotti inesistenti

def prodottiInesistentiInOrder():

r = spark.sql(“SELECT OrderUnified.product_id FROM OrderUnified WHERE OrderUnified.product_id NOT IN ( SELECT Products.product_id FROM Products)”).count()
print(“Prodotti presenti in qualche ordine ma non presenti in Products”)

Come si può vedere, è stata utilizzata una query SparkSQL, controllando che l’ID dello specifico prodotto non sia contenuto nel dataset dei prodotti. Se almeno un ID non è all’interno di quel file, allora qualsiasi ordine che conterrà quel file sarà vittima di pruning.

Rimozione duplicati

È stato, inoltre, fatto un controllo utilizzando due query, una che fa uso della “SELECT DISTINCT” e l’altra che sfrutta una “SELECT”. In questo modo, si può controllare se una selezione di qualsiasi dataset fatto con la “DISTINCT” (ovvero evitando le ripetizioni) coincida con una selezione fatta senza la suddetta funzione. Viene presentato un solo esempio a scopo informativo.

def senzaDuplicatiAisles():

r = spark.sql(“SELECT DISTINCT Aisles.aisle_id, Aisles.aisle ”
“FROM Aisles”).count()
print(“Aisles senza duplicati ” + str(r))

def conDuplicatiAisles():

r = spark.sql(“SELECT Aisles.aisle_id, Aisles.aisle ”
“FROM Aisles”).count()
print(“Aisles con duplicati ” + str(r))

Il risultato delle due query ha dato una risposta positiva, ovvero, tutti i dataset erano congrui alle aspettative, ovvero non erano presenti duplicati. Questo ha permesso di poter continuare l’analisi senza dover fare eventuali pruning sui datasources.

Risultati delle query di preprocessing riguardante il controllo dei duplicati

Risultati delle query di preprocessing riguardante il controllo dei duplicati.

Query di analisi dei datasources

La terza e ultima fase consiste nella creazione di query di analisi sui dataset pre-processati in precedenza. Come detto, sono state implementate in SparkSQL attraverso l’utilizzo di PySpark su linguaggio di programmazione Python.

Molte sono state create per cercare di scoprire quali sono le abitudini alimentari dei vari utenti, ponendo attenzione su tipi di analisi che potrebbero essere utilizzate dal negoziante per motivi di marketing, per una migliore gestione del magazzino o dei fornitori, per altri vari motivi esplicitati di volta in volta.

Le query in questione sono state classificate in base alla velocità di esecuzione delle stesse con le seguenti categorie:

  • Fast;
  • Medium;
  • Slow;
  • Very Slow.

Inoltre, è possibile fare un’ulteriore distinzione tra:

  • quelle che hanno una rappresentazione dei risultati con tabella e grafico (per dare maggiore risalto ai risultati ottenuti);
  • quelle con una rappresentazione solo attraverso tabella;
  • query di utilità nel caso in cui l’utente volesse analizzare qualche dettaglio con maggior precisione e una serie di query a supporto di quelle più complesse.

Query con tabella e grafico

  • ordiniPerOra();

Restituisce la quantità totale di ordini per ogni ora. Lo scopo di questa analisi è, quindi, rilevare gli orari in cui vengono fatti più ordini e permettere al venditore una migliore gestione del personale e del magazzino.

def ordiniPerOra():

return spark.sql(“SELECT order_hour_of_day, COUNT(*) AS n FROM Orders GROUP BY order_hour_of_day”).rdd

Da risultato, le 10 sono l’ora in cui vengono effettuati più ordini, mentre il minimo numero di ordini viene registrato alle 3.

ordini per ora

Ordini per ora.

  • ordiniPerGiorno();

Restituisce la quantità totale di ordini per ogni giorno. Come nel caso precedente, lo scopo di tale analisi è rilevare il giorno in cui vengono fatti più ordini e permettere al venditore una migliore gestione del personale e del magazzino.

def ordiniPerGiorno():

return spark.sql(“SELECT order_dow, COUNT(*) AS n FROM Orders GROUP BY order_dow”).rdd

Il giorno della settimana in cui viene registrato il picco massimo di ordini effettuati è il lunedì, mentre il venerdì si verifica il minimo settimanale.

Vendite per giorno

Vendite per giorno.

  • dipStessoCorridoioDiversiProd();

Restituisce il numero di prodotti diversi che ha uno stesso dipartimento all’interno di uno stesso corridoio.  I risultati ottenuti, possono essere sfruttati insieme a quelli di altre query per risultare utili al venditore.

def dipStessoCorridoioDiversiProd():

dfDepartAisle = spark.sql(“SELECT Products.department_id, Products.aisle_id, COUNT(*) AS number FROM Products GROUP BY Products.department_id, Products.aisle_id”)

dfDepartAisle.createOrReplaceTempView(“DepartAisle”)

return spark.sql(“SELECT DISTINCT Departments.department, Aisles.aisle, DepartAisle.number FROM Departments INNER JOIN DepartAisle ON Departments.department_id = DepartAisle.department_id INNER JOIN Aisles ON Aisles.aisle_id = DepartAisle.aisle_id “).rdd

Dai risultati è possibile notare che il corridoio “Missing” all’interno del dipartimento “Missing” è quello che ha il numero maggiore di prodotti diversi, sono esattamente 1258 prodotti diversi.

Numero prodotti diversi per stesso dipartimento e corridoio

Numero prodotti diversi per stesso dipartimento e corridoio.

  • aislesDepartmentsRiacquistati();

Restituisce il corridoio e il dipartimento in cui sono stati riacquistati più prodotti. Questo risultato, in combinazione con altre analisi (quali ad esempio quella dei prodotti più riacquistati) potrebbe dare suggerimenti utili al venditore sulla disposizione dei prodotti nei dipartimenti e nei corridoi.

def aislesDepartmentsRiacquistati():

prodottiRiordinati.createOrReplaceTempView(“ProdottiRiordinati”)

return spark.sql(“SELECT Products.aisle_id, Products.department_id, COUNT(ProdottiRiordinati.product_id) AS n FROM Products INNER JOIN ProdottiRiordinati ON ProdottiRiordinati.product_id = Products.product_id GROUP BY Products.aisle_id, Products.department_id”).rdd

L’analisi mette in evidenza due corridoi in particolare, ovvero i seguenti:

  • corridoio 24 del dipartimento 4 con 2726251 prodotti riacquistati
  • corridoio 83 del dipartimento 4 con 2123540 prodotti riacquistati

Entrambi si distaccano notevolmente per numero di prodotti riacquistati rispetto a tutti gli altri corridoi. Tutto ciò è ben visibile anche graficamente.

Numero prodotti diversi riacquistati per stesso corridoio e stesso dipartimento

Numero prodotti diversi riacquistati per stesso corridoio e stesso dipartimento.

  • venditePerCorridoio();

Restituisce il numero di vendite per ogni corridoio. Si tratta di una funzione interessante per il venditore in quanto permette di scoprire quali sono i corridoi che hanno venduto più prodotti in generale. Questo risultato può essere molto utile in fase di analisi, sia per motivi legati al marketing e sia per una migliore disposizione dei prodotti.

def venditePerCorridoio():

dfIdAisle_Quantita = spark.sql(“SELECT Products.aisle_id, COUNT(*) AS quantita FROM Products INNER JOIN OrderUnified ON OrderUnified.product_id = Products.product_id GROUP BY Products.aisle_id ORDER BY quantita DESC”)

dfIdAisle_Quantita.createOrReplaceTempView(“Aisle_Quantita”)

return spark.sql(“SELECT Aisles.aisle, Aisle_Quantita.quantita FROM Aisle_Quantita INNER JOIN Aisles ON Aisle_Quantita.aisle_id = Aisles.aisle_id”).rdd

Anche in questo caso l’analisi mette in evidenza due corridoi in particolare:

  • corridoio Fresh Fruits con 3792661 prodotti acquistati
  • corridoio Fresh Vegetables con 3568630 prodotti acquistati

Entrambi si distaccano notevolmente per numero di prodotti acquistati rispetto a tutti gli altri corridoi. Tutto ciò è ben visibile anche graficamente.

Vendite per corridoio

Vendite per corridoio.

  • top15ProdottiWeekend();

Restituisce i 15 prodotti più venduti nel fine settimana ordinati per valore decrescente, ovvero in cima sarà presente il prodotto che registra il numero maggiore di vendite e così via. Lo scopo di questa analisi, oltre a supportare il venditore, permettendogli una più efficiente gestione del magazzino e del personale nel fine settimana, offre informazioni generali sul comportamento dei clienti nei weekend. Il risultato potrebbe far intuire che nel fine settimana le abitudini alimentari delle persone cambiano rispetto ai giorni lavorativi, proprio per il fatto che solitamente non lavorano e quindi hanno più tempo a disposizione.

def top15ProdottiWeekend():

q1 = spark.sql(“SELECT order_id, order_dow FROM Orders WHERE order_dow = 5 OR order_dow = 6”)

q1.createOrReplaceTempView(“Q1”)

q2 = spark.sql(“SELECT product_id, COUNT(product_id) AS n FROM Q1 INNER JOIN OrderUnified ON Q1.order_id = OrderUnified.order_id GROUP BY product_id ORDER BY n”)

q2.createOrReplaceTempView(“Q2”)

return spark.sql(“SELECT Products.product_name, Q2.n FROM Q2 INNER JOIN Products ON Q2.product_id = Products.product_id ORDER BY Q2.n DESC LIMIT 20”).rdd

Dai risultati dell’analisi è possibile notare come nella top 3 dei prodotti più acquistati nel fine settimana (quindi da venerdì a domenica), vi siano i seguenti:

  • Banana
  • Bag of Organic Bananas
  • Organic Strawberries

I 15 prodotti più acquistati nel fine settimana sono esplicati in modo molto chiaro dal seguente grafico.

Top 15 prodotti weekend

Top 15 prodotti weekend.

  • top10ProdottiUtente(user_id);

Restituisce la lista dei 10 prodotti più acquistati da un utente specificato in input. Lo scopo di tale analisi potrebbe essere quello scoprire gli interessi di acquisto di un particolare utente per proporgli delle promozioni mirate sui prodotti più apprezzati.

def top10ProdottiUtente(user_id):

q1 = spark.sql(“SELECT user_id, order_id FROM Orders WHERE user_id = %a” %user_id) q1.createOrReplaceTempView(“q1”)

return spark.sql(“SELECT Products.product_name, COUNT(OrderUnified.product_id) AS n FROM q1 INNER JOIN OrderUnified ON q1.order_id = OrderUnified.order_id INNER JOIN Products ON Products.product_id = OrderUnified.product_id GROUP BY Products.product_name ORDER BY n DESC LIMIT 10”).rdd

A scopo di esempio è stata eseguita la query con l’utente che ha come “user_id” il numero “1”. Dal grafico a barre emerge che l’utente “1” ha acquistato con maggior quantità “soda, original beef jerky” and “pistachios rispetto ad altri prodotti.

Top 10 prodotti acquistati da un utente

Top 10 prodotti acquistati da un utente.

  • variazioneOrdiniTrainPrior();

Restituisce la variazione giornaliera delle vendite presenti in “Order_products_train” (ultimi ordini effettuati da ciascun utente) e di quelle presenti in “Order_products_prior” (storico degli ordini di tutti gli utenti). Poi vengono messe a confronto e mostrato a video il grafico rappresentante.

confronto variazione vendite giornaliere prodotti in train e prodotti in prior

Confronto variazione vendite giornaliere prodotti in train e prodotti in prior.

  • top7ProdottiCorrelati(product_name);

Restituisce i 7 prodotti più correlati ad uno specifico prodotto ricevuto in input. In altre parole, viene cercato quali sono i prodotti che vengono più spesso acquistati insieme allo specifico prodotto passato in input.

La ricerca avviene all’interno di tutti gli ordini effettuati da ogni utente e viene restituita una lista ordinata in maniera decrescente per numero di volte che prodotti “correlati” a quello desiderato sono stati acquistati. Questa funzione gioca un ruolo molto importante all’interno dell’analisi dei prodotti e delle abitudini alimentari dei clienti.

def top7ProdottiCorrelati(product_name):

ordiniProductName = spark.sql(“SELECT OrderUnified.order_id, OrderUnified.product_id FROM Products INNER JOIN OrderUnified ON Products.product_id = OrderUnified.product_id WHERE Products.product_name = %a” % product_name)

ordiniProductName.createOrReplaceTempView(“OrdiniProductName”)

prodottiCorrelati = spark.sql(“SELECT OrderUnified.product_id, COUNT(*) AS n FROM OrdiniProductName INNER JOIN OrderUnified ON OrdiniProductName.order_id = OrderUnified.order_id WHERE OrderUnified.product_id != OrdiniProductName.product_id GROUP BY OrderUnified.product_id ORDER BY n DESC LIMIT 7”)

prodottiCorrelati.createOrReplaceTempView(“ProdottiCorrelati”)

return spark.sql(“SELECT Products.product_name, ProdottiCorrelati.n FROM ProdottiCorrelati INNER JOIN Products ON Products.product_id = ProdottiCorrelati.product_id ORDER BY ProdottiCorrelati.n DESC”).rdd

A scopo di esempio è stata eseguita la query con il prodotto “Banana”. Ciò che possiamo scoprire dal grafico a torta è che i prodotti più correlati a “Banana” che sono stati acquistati fanno tutti parte del reparto della frutta. I più correlati ad essa sono “Organic Strawberries”, “Organic Avocado” e “Organic baby spinach.

7 prodotti più correlati a banana

7 prodotti più correlati a banana.

  • prodottiAcquistatiOra(product_name);

Restituisce la quantità di vendite effettuate per ogni ora del prodotto ricevuto in input.

Tale analisi supporta il venditore in quanto potrebbe gestire in modo migliore il rapporto con i fornitori di un determinato prodotto che viene più acquistato in specifiche fasce orarie.

def prodottoPiuAcquistato(product_name):

mostBrought = spark.sql(“SELECT product_id, COUNT(product_id) AS n FROM OrderUnified GROUP BY product_id ORDER BY n DESC LIMIT 1”)

mostBrought.createOrReplaceTempView(“MostBrought”)q1 = spark.sql(“SELECT Orders.order_id, OrderUnified.product_id, Orders.order_dow FROM Orders INNER JOIN OrderUnified ON Orders.order_id = OrderUnified.order_id “)

q1.createOrReplaceTempView(“q1”)

return spark.sql(“SELECT Products.product_name, q1.order_dow, COUNT(MostBrought.product_id) AS n FROM MostBrought INNER JOIN q1 ON MostBrought.product_id = q1.product_id INNER JOIN Products ON MostBrought.product_id = Products.product_id GROUP BY Products.product_name, q1.order_dow ORDER BY q1.order_dow”).rdd

Ovviamente anche per questo esempio è stato scelto il prodotto “Banana”. Ciò che possiamo imparare dal grafico è che l’ora in cui si vende maggiormente il prodotto scelto è alle 10 di mattina.

ordini per ora del prodotto banana

Ordini per ora del prodotto banana.

  • confrontoVendite3Prodotti();

Questo metodo confronta tre dati di vendite in contemporanea. In particolare, vengono utilizzati tre prodotti diversi (inseriti dall’utente) e la loro variazione di vendita giornaliera.

Vengono in seguito confrontati e messi in un grafico unico. Ciò che si può apprendere dal grafico è che “Taboule Salad” e “Chocolate Sandwich Cookies” hanno delle vendite molto alte e quasi paragonabili tra di loro, mentre “Vegan enchiladas” ha delle vendite molto più basse.

confronto vendite giornaliere di 3 prodotti

Confronto vendite giornaliere di 3 prodotti.

  • variazioneGiornalieraTopFlop();

Questa query viene utilizzata per confrontare il numero di vendite giornaliere tra il prodotto più venduto e il prodotto meno venduto, cosa resa evidente dal grafico sottostante.

confronto variazione vendite giornaliere prodotto maggiormente venduto e meno venduto

Confronto variazione vendite giornaliere prodotto maggiormente venduto e meno venduto.

Query con sola tabella

  • topClientiOrdini();

Con questa query si è cercato di scoprire quali sono i clienti che acquistano di più degli altri. A scopo di analisi questo potrebbe classificare tali clienti come “Fedeli”. La quantità degli ordini effettuati è inoltre ordinata in modo decrescente per risalire fin da subito al cliente più fedele.

def topClientiOrdini():

return spark.sql(“SELECT user_id, COUNT(*) AS n FROM Orders GROUP BY user_id ORDER BY n DESC”).rdd

  • ordiniPiuProdotti();

Il risultato di questa funzione è un elenco degli ordini visualizzabile in ordine decrescente per numero di prodotti nell’ordine. Query puramente analitica.

def ordiniPiuProdotti():

return spark.sql(“SELECT order_id, COUNT(*) AS n FROM OrderUnified ”
” GROUP BY order_id ORDER BY n DESC”).rdd

  • ordiniUtente(id_utente);

Restituisce tutti gli ordini di un utente che viene passato in input dall’utente stesso. Gli ordini saranno raggruppati per ogni utente, restituendo il numero di ordini per ognuno di essi. Esso sarà ordinato in maniera descrescente.

def ordiniUtente(id_utente):

return spark.sql(“SELECT user_id, order_id, order_number, order_dow, order_hour_of_day FROM Orders WHERE user_id = %a ” % id_utente).rdd

  • ordiniUtenteOra(id_utente, ora);

Restituisce tutti gli ordini di uno specifico utente in una precisa fasci oraria.

def ordiniUtenteOra(id_utente, ora):

return spark.sql(“SELECT user_id, order_id, order_number, order_dow,order_hour_of_day FROM Orders WHERE user_id = {} AND order_hour_of_day = {} “.format(id_utente, ora)).rdd

  • mediaDaysSincePriorOrder();

Restituisce la media totale, per ogni utente, del tempo intercorso tra l’ultimo ordine che è stato effettuato e il penultimo.

def mediaDaysSincePriorOrder():

q1 = spark.sql(“SELECT Orders.user_id, Orders.days_since_prior_order FROM OrderProductsTrain INNER JOIN Orders ON OrderProductsTrain.order_id = Orders.order_id”)
q1.createOrReplaceTempView(“Q1”)

return spark.sql(“SELECT AVG(Q1.days_since_prior_order) FROM Q1”).rdd

  • daysSincePriorOrderUtente(id_user);

Restituisce il tempo trascorso tra l’ultimo ordine effettuato da uno specifico utente passato in input e il penultimo.

def daysSincePriorOrderUtente(id_user):

return spark.sql(“SELECT DISTINCT Orders.days_since_prior_order FROM Orders INNER JOIN OrderProductsTrain ON Orders.order_id = OrderProductsTrain.order_id WHERE Orders.user_id = %a” % id_user).rdd

  • posizionePrioritaria();

Funzione che restituisce il prodotto che per primo viene aggiunto al carrello in un ordine. Questo potrebbe indicare che quel tipo di prodotto è un prodotto “prioritario”, ovvero di prima necessità, che il cliente tende a comprare o ricordare in maniera più facile.

def posizionePrioritaria():

q1 = spark.sql(“SELECT DISTINCT product_id, COUNT(product_id) AS n FROM OrderUnified WHERE add_to_cart_order = 1 GROUP BY product_id”)
q1.createOrReplaceTempView(“Q1”)

return spark.sql(“SELECT Q1.product_id, Products.product_name, Q1.n FROM Products INNER JOIN Q1 ON Products.product_id = Q1.product_id ORDER BY Q1.n DESC”).rdd

  • prodottiComuniPiuAcquistati();

Query che ci permette di analizzare e capire quali sono i prodotti più comuni che ciascun cliente ha acquistato. Viene, più specificatamente, fatto un controllo per ogni ordine, andando a confrontare quali sono i prodotti con lo stesso id che vengono acquistati per ogni transazione, così da capire quali sono gli alimenti acquistati con più facilità. Questo permetterà di capire dunque quali sono gli oggetti che possono essere messi in promozione con più facilità rispetto ad altri.

def prodottiComuniPiuAcquistati():

q1 = spark.sql(“SELECT DISTINCT OrderUnified.order_id, OrderUnified.product_id FROM OrderUnified”)
q1.createOrReplaceTempView(“Q1”)

q2 = spark.sql(“SELECT Q1.product_id, COUNT(Q1.product_id) AS n FROM Q1 GROUP BY Q1.product_id”)
q2.createOrReplaceTempView(“Q2”)

return spark.sql(“SELECT Products.product_name, Products.product_id, Q2.n FROM Q2 INNER JOIN Products ON Q2.product_id =  Products.product_id ORDER BY n DESC”)

  • topOraGiornoAcquistoUtente();

Restituisce per ogni utente, per ogni ora e per ogni giorno il numero di ordini effettuati.

def topOraGiornoAcquistoUtente():

return spark.sql(“SELECT user_id, order_dow, order_hour_of_day, COUNT(*) AS n FROM Orders GROUP BY user_id, order_dow, order_hour_of_day ORDER BY n DESC”).rdd

  • topProdottiComprati();

Il seguente metodo permette di scoprire quali sono i prodotti più acquistati. Questo potrebbe essere un dato molto utile al venditore in quanto permette di attuare determinate strategie di marketing che permettono di pubblicizzare al meglio i prodotti più ordinati.

def topProdottiComprati():

return spark.sql(“SELECT product_name, COUNT(*) AS n FROM OrderUnified INNER JOIN Products ON OrderUnified.product_id = Products.product_id GROUP BY Product_name ORDER BY n DESC”).rdd

  • posizione(posizione);

Restituisce i prodotti aggiunti al carrello nella specifica posizione indicata dall’input. I risultati sono ordinati in modo decrescente per numero di volte che un prodotto è stato aggiunto nella determinata posizione. Da questi risultati si potrebbe risalire ai prodotti che vengono considerati di importanza maggiore rispetto ad altri.

def posizione(posizione):

posizione = spark.sql(“SELECT product_id, COUNT(*) AS n FROM OrderUnified WHERE add_to_cart_order = %a GROUP BY product_id” % posizione)
posizione.createOrReplaceTempView(“Posizione”)

return spark.sql(“SELECT Products.product_name, Posizione.n FROM Posizione INNER JOIN Products ON Products.product_id = Posizione.product_id ORDER BY n DESC”).rdd

  • prodottiSoloInTrain();

Restituisce l’elenco dei prodotti che si trovano solo negli ultimi ordini e non si trovano, invece, negli ordini precedenti. Questo tipo di analisi potrebbe far intuire al venditore quali dei nuovi prodotti introdotti (il fatto che non siano presenti in ordini passati fa pensare che i prodotti risultanti siano stati messi in vendita da poco) siano più apprezzati dai clienti.

def prodottiSoloInTrain():

q1 = spark.sql(“SELECT OrderProductsTrain.product_id FROM OrderProductsTrain EXCEPT SELECT OrderProductsPrior.product_id FROM OrderProductsPrior”)
q1.createOrReplaceTempView(“Q1”)

q2 = spark.sql(“SELECT Q1.product_id, COUNT(*) AS n FROM Q1 GROUP BY Q1.product_id”)
q2.createOrReplaceTempView(“Q2”)

return spark.sql(“SELECT Products.product_name, Q2.n FROM Products INNER JOIN Q2 ON Q2.product_id = Products.product_id ORDER BY n DESC”)

  • prodottiInvenduti();

Restituisce i prodotti che non si trovano in nessun ordine. Non si può dare per scontato che tutti i prodotti presenti nel dataset Products siano stati venduti almeno una volta. Quindi, lo scopo di questa analisi è dimostrare la presenza o meno di prodotti mai venduti. Il venditore da tutto ciò può risparmiare su questo tipo di prodotti (visto che hanno un costo spaziale ed economico) e potrebbe, invece, investire su quelli più redditizi.

def prodottiInvenduti():

return spark.sql(“SELECT Products.product_id, Products.product_name FROM Products WHERE product_id NOT IN ( SELECT OrderUnified.product_id FROM OrderUnified)”).rdd

Dall’analisi in questione è risultato che i seguenti prodotti:

  • Protein Granola Apple Crisp con id 3630
  • Unpeeled Apricot Halves in Heavy Syrup con id 7045
  • Single Barrel Kentucky Straight Bourbon Whiskey con id 46625

Non sono mai stati venduti perchè non presenti in alcun ordine.

  • venditePerDipartimento()

Restituisce per ogni dipartimento il numero di prodotti venduti in essi. La query può tornare utile al venditore per una corretta e più efficiente gestione di ogni dipartimento.

def venditePerDipartimento():

q1 = spark.sql(“SELECT Products.department_id, COUNT(*) AS n FROM Products INNER JOIN OrderUnified ON OrderUnified.product_id = Products.product_id GROUP BY Products.department_id”)
q1.createOrReplaceTempView(“Q1”)

return spark.sql(“SELECT Departments.department, Q1.n FROM Departments INNER JOIN Q1 ON Departments.department_id = Q1.department_id”).rdd

  • topProdottiRiordinati();

Restituisce i prodotti che sono stati più riordinati. Chiaramente questa analisi risulta essere molto comoda per promuovere i prodotti più riacquistati in assoluto con strategie di vendita adeguate.

def topProdottiRiordinati():

q1 = spark.sql(“SELECT OrderUnified.product_id, COUNT(*) AS n FROM OrderUnified WHERE OrderUnified.reordered = 1 GROUP BY OrderUnified.product_id”)
q1.createOrReplaceTempView(“Q1”)

return spark.sql(“SELECT Products.product_name, Q1.product_id, Q1.n FROM Products INNER JOIN Q1 ON Products.product_id = Q1.product_id ORDER BY n DESC”).rdd

  • topProdottiRiordinatiPerGiorno();

Come la precedente analisi, ma in questo caso i risultati sono raggruppati per giorno della settimana.

def topProdottiRiordinatiPerGiorno():

q1 = spark.sql(“SELECT OrderUnified.product_id, Orders.order_dow, COUNT(*) AS n FROM Orders INNER JOIN OrderUnified ON Orders.order_id = OrderUnified.order_id WHERE OrderUnified.reordered = 1 GROUP BY OrderUnified.product_id, Orders.order_dow”)
q1.createOrReplaceTempView(“Q1”)

return spark.sql(“SELECT Products.product_name, Q1.product_id, Q1.order_dow, Q1.n FROM Products INNER JOIN Q1 ON Products.product_id = Q1.product_id ORDER BY n DESC”).rdd

Query di utilità

  • prodottiAisle(aisle);

Restituisce tutti i prodotti presenti in un determinato corridoio scelto dall’utente.

def prodottiAisle(aisle):

prodottiAisle = spark.sql(“SELECT Products.product_name, Aisles.aisle, Products.department_id FROM Products INNER JOIN Aisles ON Aisles.aisle_id = Products.aisle_id WHERE Aisles.aisle = %a” % aisle)
prodottiAisle.createOrReplaceTempView(“ProdottiAisles”)

return spark.sql(“SELECT ProdottiAisles.product_name, ProdottiAisles.aisle, Departments.department FROM ProdottiAisles INNER JOIN Departments ON Departments.department_id = ProdottiAisles.department_id “).rdd

  • prodottiAisleDep(aisle, department);

Restituisce tutti i prodotti presenti in uno specifico corridoio scelto dall’utente, appartenenti ad un determinato dipartimento anch’esso scelto dall’utente.

def prodottiAisleDep(aisle, department):

productAisels = spark.sql(“SELECT Products.product_id, Products.product_name, Aisles.aisle, Products.department_id FROM Products INNER JOIN Aisles ON Products.aisle_id = Aisles.aisle_id WHERE Aisles.aisle = %a” % aisle)
productAisels.createOrReplaceTempView(“ProductAisles”)

return spark.sql(“SELECT ProductAisles.product_name, ProductAisles.product_id, ProductAisles.aisle, Departments.department FROM ProductAisles INNER JOIN Departments ON ProductAisles.department_id = Departments.department_id WHERE Departments.department = %a” % department).rdd

  • prodottiDepartment(department);

Restituisce tutti i prodotti forniti da un determinato dipartimento scelto dall’utente.

def prodottiDepartment(department):

prodottiDep = spark.sql(“SELECT Products.product_name, Departments.department, Products.aisle_id FROM Products INNER JOIN Departments ON Departments.department_id = Products.department_id WHERE Departments.department = %a” % department)
prodottiDep.createOrReplaceTempView(“ProdottiDep”)

return spark.sql(“SELECT ProdottiDep.product_name, ProdottiDep.department, Aisles.aisle FROM ProdottiDep INNER JOIN Aisles ON Aisles.aisle_id = ProdottiDep.aisle_id “).rdd

  • prodottiAisleDep(aisle, department);

Restituisce tutti i prodotti appartenenti ad uno specifico dipartimento e ad uno specifico corridoio.

def prodottiAisleDep(aisle, department):

productAisels = spark.sql(“SELECT Products.product_id, Products.product_name, Aisles.aisle, Products.department_id FROM Products INNER JOIN Aisles ON Products.aisle_id = Aisles.aisle_id WHERE Aisles.aisle = %a” % aisle)
productAisels.createOrReplaceTempView(“ProductAisles”)

return spark.sql(“SELECT ProductAisles.product_name, ProductAisles.product_id, ProductAisles.aisle, Departments.department FROM ProductAisles INNER JOIN Departments ON ProductAisles.department_id = Departments.department_id WHERE Departments.department = %a” % department).rdd

  • ultimoOrdineUtente(id_utente);

Restituisce, dato un preciso utente, il suo ultimo ordine.

def ultimoOrdineUtente(id_utente):

ordine = spark.sql(“SELECT Orders.user_id, Orders.order_number, Orders.order_dow, Orders.order_hour_of_day, OrderProductsTrain.order_id, OrderProductsTrain.product_id, OrderProductsTrain.add_to_cart_order, OrderProductsTrain.reordered FROM Orders INNER JOIN OrderProductsTrain ON Orders.order_id = OrderProductsTrain.order_id “WHERE Orders.user_id = %a” % id_utente)
ordine.createOrReplaceTempView(“Ordine”)

return spark.sql(“SELECT Ordine.order_id, Ordine.order_number, Ordine.order_dow, Ordine.order_hour_of_day, Products.product_name, Ordine.add_to_cart_order, Ordine.reordered FROM Ordine INNER JOIN Products ON Products.product_id = Ordine.product_id”).rdd

  • numeroOrdine(user_id, order_id);

Restituisce il numero dell’ordine di uno specifico utente e di uno specifico ordine. Questo permette di capire se il cliente in questione è un cliente “fedele” oppure no, in quanto più ordini ha effettuato più esso sarà fedele.

def numeroOrdine(user_id, order_id):

return spark.sql(“SELECT order_number FROM Orders WHERE order_id = {} AND user_id =  {}.format(order_id, user_id)).rdd

  • prodottoAcquistato(id_product);

Restituisce il numero di volte che uno specifico prodotto è stato riacquistato. Questa funzione, in correlazione con altre, potrebbe essere cruciale nella scelta di marketing per eventuali promozioni, o potrebbe essere molto utile per fare confronti di vendite con altri prodotti.

def prodottoAcquistato(id_product):

return spark.sql(“SELECT Products.product_name, COUNT(*) AS n FROM Products INNER JOIN OrderUnified ON OrderUnified.product_id = Products.product_id WHERE Products.product_id = %a GROUP BY Products.product_name” % id_product).rdd

  • prodottoRiordinato(id_product);

Funzione che ci permette di conoscere, dato uno specifico prodotto, quante volte quel prodotto è stato riordinato. Questo potrebbe ispirare analisi sulla qualità dei vari prodotti, in quanto, si potrebbe pensare che più un prodotto viene riordinato, più questo è apprezzato per via del suo prezzo, della sua qualità, della sua importanza nella vita quotidiana ecc.

def prodottoRiordinato(id_product):

return spark.sql(“SELECT Products.product_name, COUNT(*) FROM Products INNER JOIN OrderUnified ON OrderUnified.product_id = Products.product_id WHERE Products.product_id = %a AND OrderUnified.reordered = 1 GROUP BY Products.product_name” % id_product).rdd

Query di supporto

  • prodottoPiuAcquistato();

Funzione che specifica quanto un prodotto è stato venduto durante tutta la settimana. Esso mostra la variazione giornaliera delle vendite di uno specifico prodotto. Questo ovviamente permette di capire quando il prodotto è venduto di più e di attuare strategie di marketing di conseguenza.

def prodottoPiuAcquistato():

mostBrought = spark.sql(“SELECT product_id, COUNT(product_id) AS n FROM OrderUnified GROUP BY product_id ORDER BY n DESC LIMIT 1”)
mostBrought.createOrReplaceTempView(“MostBrought”)

q1 = spark.sql(“SELECT Orders.order_id, OrderUnified.product_id, Orders.order_dow FROM Orders INNER JOIN OrderUnified ON Orders.order_id = OrderUnified.order_id “)
q1.createOrReplaceTempView(“q1”)

return spark.sql(“SELECT Products.product_name, q1.order_dow, COUNT(MostBrought.product_id) AS n FROM MostBrought INNER JOIN q1 ON MostBrought.product_id = q1.product_id INNER JOIN Products ON MostBrought.product_id = Products.product_id GROUP BY Products.product_name, q1.order_dow ORDER BY q1.order_dow”).rdd

  • prodottoMenoAcquistato();

Funzione che ritorna la variazione giornaliera del prodotto meno acquistato in assoluto.

def prodottoMenoAcquistato():

lessBrought = spark.sql(“SELECT product_id, COUNT(product_id) AS n FROM OrderUnified GROUP BY product_id ORDER BY n ASC LIMIT 1”)
lessBrought.createOrReplaceTempView(“lessBrought”)

q1 = spark.sql(“SELECT Orders.order_id, OrderUnified.product_id,                       Orders.order_dow FROM Orders INNER JOIN OrderUnified ON Orders.order_id = OrderUnified.order_id “)
q1.createOrReplaceTempView(“q1”)

return spark.sql(“SELECT Products.product_name, q1.order_dow, COUNT(lessBrought.product_id) AS n FROM lessBrought INNER JOIN q1 ON lessBrought.product_id = q1.product_id INNER JOIN Products ON lessBrought.product_id = Products.product_id GROUP BY Products.product_name, q1.order_dow ORDER BY q1.order_dow”).rdd

  • prodottoSceltoDaUtente(prodotto);

Funzione che ha lo stesso funzionamento della già citata prodottoPiuAcquistato(), con l’unica differenza che questa volta è l’utente a scegliere il prodotto da analizzare.

def prodottoSceltoDaUtente(prodotto):
lessBrought = spark.sql(“SELECT Products.product_name, OrderUnified.product_id, COUNT(OrderUnified.product_id) AS n FROM OrderUnified INNER JOIN Products ON Products.product_id = OrderUnified.product_id GROUP BY Products.product_name, OrderUnified.product_id”)
lessBrought.createOrReplaceTempView(“lessBrought”)q2 = spark.sql(“SELECT product_name, product_id, n FROM lessBrought WHERE product_name = %a” %prodotto)
q2.createOrReplaceTempView(“q2”)q1 = spark.sql(“SELECT Orders.order_id, OrderUnified.product_id, Orders.order_dow FROM Orders INNER JOIN OrderUnified ON Orders.order_id = OrderUnified.order_id “)
q1.createOrReplaceTempView(“q1”)return spark.sql(“SELECT q2.product_name, q1.order_dow, COUNT(q2.product_name) AS n FROM q2 INNER JOIN q1 ON q2.product_id = q1.product_id GROUP BY q2.product_name, q1.order_dow ORDER BY q1.order_dow”).rdd
  • variazioneOrderPrior();

Questa funzione ritorna la variazione del numero di prodotti venduti all’interno del dataset “Order_product_prior” che contiene lo storico di tutti gli ordini effettuati.

def variazioneOrderPrior():

mostBrought = spark.sql(“SELECT product_id, COUNT(product_id) AS n FROM OrderProductsPrior GROUP BY product_id ORDER BY n DESC”)
mostBrought.createOrReplaceTempView(“MostBrought”)

q1 = spark.sql(“SELECT Orders.eval_set, Orders.order_id, OrderProductsPrior.product_id, Orders.order_dow FROM Orders INNER JOIN OrderProductsPrior ON Orders.order_id = OrderProductsPrior.order_id “)
q1.createOrReplaceTempView(“q1”)

return spark.sql(“SELECT q1.eval_set, q1.order_dow, COUNT(MostBrought.product_id) AS n FROM MostBrought INNER JOIN q1 ON MostBrought.product_id = q1.product_id INNER JOIN Products ON MostBrought.product_id = Products.product_id GROUP BY q1.eval_set, q1.order_dow ORDER BY q1.order_dow”).rdd

Conclusioni

Per concludere, durante lo svolgimento di questo progetto sono state apprese nuove tecniche di modellazione e gestione dei dati che saranno molto utili in futuro. In un mondo in cui la tecnologia è in continua evoluzione e in cui i dati rappresentano un tesoro inestimabile da cui ottenere informazioni preziose, imparare ad avere a che fare con grandi dataset è di fondamentale importanza.

Inoltre, per la comprensione dei dati e degli attributi di ogni datasource, sono state fatte numerose ricerche online. In particolare, sono stati utilizzati due paper che hanno permesso un lavoro più agevole:

Sito da cui sono state reperite informazioni riguardante l’e-commerce e la relazione tra i vari attributi con una piccola spiegazione.

Paper di analisi sullo stesso dataset effettuato con l’utilizzo delle librerie Numpy e Pandas. Ha permesso di avere una più chiara idea sugli attributi presenti all’interno dei dataset. In particolare ha permesso di fare chiarezza sul valore “test” presente in alcuni ordini nel campo “eval_set”.