Distributed data processing is one of the key technologies that every data scientist and data analyst should be familiar with. In this project, a series of queries is run against a large dataset to extract useful information from it.

In this case, however, the data is so big that working with it on a local machine is impractical. That is where distributed data processing and the Spark technology come in handy.

So in this project, the PySpark module in Python and the Google Colab environment are used to run queries against a dataset from Last.fm, an online music service where users can listen to different songs. The dataset consists of two CSV files, listenings.csv and genre.csv, and contains over 13 million rows.
Source:
# Install PySpark library
!pip install pyspark
# Mount google drive
from google.colab import drive
drive.mount('/content/drive')
# Import libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import count, desc, col, max, struct
import matplotlib.pyplot as plt
from pyspark import SparkContext
from pyspark.sql import SQLContext
# Create spark session
spark = SparkSession.builder.appName('spark_app').getOrCreate()
# Get the underlying SparkContext
sc = SparkContext.getOrCreate()
# Create an SQLContext (the legacy entry point used here for reading CSVs)
sqlContext = SQLContext(sc)
# Path to the listenings.csv file
listening_csv_path = '/content/drive/My Drive/dataset/listenings.csv'
# Create data frame
listening_df = sqlContext.read.load(listening_csv_path,
                                    format='com.databricks.spark.csv',
                                    header='true',
                                    inferSchema='true')
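# Note: on Spark 2.x+, SparkSession subsumes SQLContext, so the same file can
# be read straight from the session. As a sketch of an optional optimization,
# supplying an explicit schema skips the extra full pass over the 13 million
# rows that inferSchema needs; the column order below is an assumption based
# on the queries used later in this project.
from pyspark.sql.types import StructType, StructField, StringType
listening_schema = StructType([
    StructField('user_id', StringType(), True),
    StructField('date', StringType(), True),
    StructField('track', StringType(), True),
    StructField('artist', StringType(), True),
    StructField('album', StringType(), True),
])
listening_df_alt = spark.read.csv(listening_csv_path,
                                  header=True,
                                  schema=listening_schema)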
# Check data
listening_df.show()
# Drop the date column, which is not needed for these queries
listening_df = listening_df.drop('date')
# Drop null values
listening_df = listening_df.na.drop()
# Check dataset
listening_df.show()
# Check schema
listening_df.printSchema()
# Check dataframe dimension
shape = (listening_df.count(), len(listening_df.columns))
# Show shape
print(shape)
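# Note: count() is an action, so computing the shape scans the entire file.
# Since the dataframe is queried repeatedly below, caching it is an optional
# optimization (a sketch; assumes the data fits in the available memory):
listening_df = listening_df.cache()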
# Query: Select two columns, artist and track
q0 = listening_df.select('artist', 'track')
# Show dataframe
q0.show()
# Query: Records of those users who have listened to Rihanna
q1 = listening_df.select('*').filter(listening_df.artist == 'Rihanna')
# Show dataframe
q1.show()
# Query: Top 10 users who are fan of Rihanna
q2 = (listening_df.select('user_id')
      .filter(listening_df.artist == 'Rihanna')
      .groupby('user_id')
      .agg(count('user_id').alias('count'))
      .orderBy(desc('count'))
      .limit(10))
# Show dataframe
q2.show()
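# The same query can also be expressed in SQL; a sketch, assuming the
# dataframe is first registered as a temporary view:
listening_df.createOrReplaceTempView('listenings')
spark.sql("""
    SELECT user_id, COUNT(user_id) AS count
    FROM listenings
    WHERE artist = 'Rihanna'
    GROUP BY user_id
    ORDER BY count DESC
    LIMIT 10
""").show()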
# Query: Top 10 famous tracks
q3 = (listening_df.select('artist', 'track')
      .groupby('artist', 'track')
      .agg(count('*').alias('count'))
      .orderBy(desc('count'))
      .limit(10))
# Show dataframe
q3.show()
# Query: Top 10 famous tracks of Rihanna
q4 = (listening_df.select('artist', 'track')
      .filter(listening_df.artist == 'Rihanna')
      .groupby('artist', 'track')
      .agg(count('*').alias('count'))
      .orderBy(desc('count'))
      .limit(10))
# Check dataframe
q4.show()
# Query: Top 10 famous albums
q5 = (listening_df.select('artist', 'album')
      .groupby('artist', 'album')
      .agg(count('*').alias('count'))
      .orderBy(desc('count'))
      .limit(10))
# Check dataframe
q5.show()
# Path to the genre.csv file
genre_csv_path = '/content/drive/My Drive/dataset/genre.csv'
# Create data frame
genre_df = sqlContext.read.load(genre_csv_path,
                                format='com.databricks.spark.csv',
                                header='true',
                                inferSchema='true')
# Check data
genre_df.show()
# Recheck the listening dataframe before joining
listening_df.show()
# Inner join these two dataframes
data = listening_df.join(genre_df, how='inner', on=['artist'])
# Show data
data.show()
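# Caveat: if genre.csv lists an artist under more than one genre, the inner
# join duplicates that artist's listening records. A quick sanity check:
genre_df.groupby('artist').agg(count('*').alias('n_genres')) \
        .filter(col('n_genres') > 1) \
        .show()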
# Query: Top 10 users who are fan of pop music
q6 = (data.select('user_id')
      .filter(data.genre == 'pop')
      .groupby('user_id')
      .agg(count('*').alias('count'))
      .orderBy(desc('count'))
      .limit(10))
# Check dataframe
q6.show()
# Query: Top 10 famous genres
q7 = (data.select('genre')
      .groupby('genre')
      .agg(count('*').alias('count'))
      .orderBy(desc('count'))
      .limit(10))
# Check dataframe
q7.show()
# Query: Find each user's favorite genre
q8_1 = (data.select('user_id', 'genre')
        .groupby('user_id', 'genre')
        .agg(count('*').alias('count'))
        .orderBy('user_id'))
# Check dataframe
q8_1.show()
# Query: Take the max over (count, genre) pairs to get each user's favorite genre
q8_2 = (q8_1.groupby('user_id')
        .agg(max(struct(col('count'), col('genre'))).alias('max'))
        .select(col('user_id'), col('max.genre')))
# Check dataframe
q8_2.show()
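# The max(struct(...)) trick works because Spark compares structs field by
# field, so the maximum struct is the one with the highest count (ties broken
# by genre). An equivalent sketch using a window function:
from pyspark.sql import Window
from pyspark.sql.functions import row_number
w = Window.partitionBy('user_id').orderBy(desc('count'))
q8_alt = (q8_1.withColumn('rank', row_number().over(w))
          .filter(col('rank') == 1)
          .select('user_id', 'genre'))
q8_alt.show()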
# Query: Count how many pop, rock, metal, and hip hop artists there are
q9 = (genre_df.select('genre')
      .filter(col('genre').isin('pop', 'rock', 'metal', 'hip hop'))
      .groupby('genre')
      .agg(count('genre').alias('count')))
# Check dataframe
q9.show()
# Collect all the records to the driver as a list of Row objects
q9_list = q9.collect()
# Get the list of genres
labels = [row['genre'] for row in q9_list]
# Get the count for each genre
counts = [row['count'] for row in q9_list]
# Show labels and counts
print(labels)
print(counts)
# Visualize these two lists using a bar chart
plt.bar(labels, counts)
plt.show()
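# An alternative to collecting into Python lists: since q9 has only four
# rows, it can be converted to a pandas dataframe for plotting (a sketch;
# assumes pandas is available, as it is in Colab):
q9_pd = q9.toPandas()
plt.bar(q9_pd['genre'], q9_pd['count'])
plt.xlabel('genre')
plt.ylabel('number of artists')
plt.show()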