In this project, distributed programming with Apache Spark is used to derive knowledge from data. The data is loaded and explored with Spark RDDs and Spark DataFrames, and statistical analysis is performed on stock price datasets using Spark SQL and the DataFrame API. Lastly, Parquet tables are created and the results are stored in them.
Apache Spark
Use cases:
Dataset:
Source:
# Import library
from pyspark import SparkContext
# Create or get the existing SparkContext
sc = SparkContext.getOrCreate()
# Check type
type(sc)
# Define collections of numbers
data = [1, 2, 3, 4, 5]
# Parallelize the collection into an RDD
distData = sc.parallelize(data)
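As a quick, illustrative check that the RDD behaves as expected (this snippet is an assumption added for demonstration, not part of the original analysis), a transformation and an action can be chained on distData:
# Square each element and sum the results (illustrative sketch)
squared_sum = distData.map(lambda x: x * x).reduce(lambda a, b: a + b)
print(squared_sum)  # 1 + 4 + 9 + 16 + 25 = 55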
# Import library
from pyspark.sql import SQLContext
# Create the SQLContext
sqlContext = SQLContext(sc)
# Create list
l = [('Alice', 1)]
# Create dataframe
df = sqlContext.createDataFrame(l, ['name', 'age'])
# Collect the rows
df.collect()
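As a small sketch of column-oriented access on this toy DataFrame (the filter condition below is illustrative only):
# Select a named column and filter on another (illustrative)
df.select('name').show()
df.filter(df.age >= 1).show()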
# Path to the Tesla stock data file
data_file = 'TSLA.csv'
# Read the CSV file and cache the RDD
raw_rdd = sc.textFile(data_file).cache()
# Return the first five elements of the RDD
raw_rdd.take(5)
# Check type
type(raw_rdd)
# Apply map transformation
csv_rdd = raw_rdd.map(lambda row: row.split(","))
# Show data
print(csv_rdd.take(2))
# Check data type
print(type(csv_rdd))
# Check length
len(csv_rdd.take(1)[0])
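If the raw RDD needs further processing, the header row can be separated from the data rows first; the sketch below assumes a Yahoo-Finance-style column order (Date, Open, High, Low, Close, ...), so the column index is an assumption about the CSV layout:
# Drop the header row so only data rows remain (illustrative)
header = csv_rdd.first()
data_rdd = csv_rdd.filter(lambda row: row != header)
# Parse the Open price as a float; the index assumes the CSV layout described above
open_prices = data_rdd.map(lambda row: float(row[1]))
print(open_prices.take(5))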
- Immutable distributed collections of data
- Data organized into named columns (a brief sketch of both properties follows)
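As a minimal sketch of these two properties using the toy df created earlier (the added column name is illustrative): columns are addressed by name, and transformations return a new DataFrame rather than mutating the original.
# withColumn returns a new DataFrame; df itself is left unchanged (illustrative)
df_with_next_age = df.withColumn('next_age', df.age + 1)
df_with_next_age.show()
df.show()  # the original DataFrame still has only 'name' and 'age'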
Operations:
# Create data frame
df = sqlContext.read.load(data_file,
                          format='com.databricks.spark.csv',
                          header='true',
                          inferSchema='true')
# Check count
df.count()
# Check data
df.take(5)
# Check schema
df.printSchema()
# Import library
import pandas
# Convert to a pandas dataframe and preview the first rows
df.toPandas().head(5)
# Import library
from pyspark.sql.types import *
# Set up the structure type (price columns are floating point, so DoubleType is used; LongType avoids overflow on volume)
schema = StructType([
    StructField("date", StringType(), True),
    StructField("openprice", DoubleType(), True),
    StructField("highprice", DoubleType(), True),
    StructField("lowprice", DoubleType(), True),
    StructField("closeprice", DoubleType(), True),
    StructField("volume", LongType(), True),
    StructField("adjcloseprice", DoubleType(), True)])
# Create dataframe
df2 = sqlContext.read.load(data_file,
                           format='com.databricks.spark.csv',
                           header='true',
                           schema=schema)
# Check dataframe
df2.take(2)
# Check schema
df2.printSchema()
# Check type
type(df)
# Show data
df.show(5)
# Path to the Tesla stock data file
tesla_data_file = 'TSLA.csv'
# Create dataframe
tesla_df = sqlContext.read.load(tesla_data_file,
                                format='com.databricks.spark.csv',
                                header='true',
                                inferSchema='true')
# Show data
tesla_df.show()
# Path to the Google stock data file
google_data_file = 'GOOG.csv'
# Create dataframe
google_df = sqlContext.read.load(google_data_file,
                                 format='com.databricks.spark.csv',
                                 header='true',
                                 inferSchema='true')
# Show data
google_df.show()
# Path to the Amazon stock data file
amazon_data_file = 'AMZN.csv'
# Create dataframe
amazon_df = sqlContext.read.load(amazon_data_file,
                                 format='com.databricks.spark.csv',
                                 header='true',
                                 inferSchema='true')
# Show data
amazon_df.show()
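The three loads above repeat the same pattern, so a small helper can remove the duplication; load_stock is a hypothetical name introduced here for illustration only:
# Hypothetical helper that wraps the repeated CSV-loading options
def load_stock(path):
    return sqlContext.read.load(path,
                                format='com.databricks.spark.csv',
                                header='true',
                                inferSchema='true')

# Example usage with the same files as above
tesla_df = load_stock(tesla_data_file)
google_df = load_stock(google_data_file)
amazon_df = load_stock(amazon_data_file)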
# Import libraries
from pyspark.sql.functions import year, month, dayofmonth
import datetime
# Average closing price per year for AMZN
# stocksDF.select(year($"dt").alias("yr"), $"adjcloseprice").groupBy("yr").avg("adjcloseprice").orderBy(desc("yr")).show
amazon_df.select(year("Date").alias("year"), "AdjClose").groupby("year").avg("AdjClose").sort("year").show()
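To match the descending order used in the Scala reference above, desc from pyspark.sql.functions can be applied to the sort; this variant is a sketch, not part of the original flow:
# Same yearly average, but ordered from the most recent year down (illustrative)
from pyspark.sql.functions import desc
amazon_df.select(year("Date").alias("year"), "AdjClose") \
    .groupby("year").avg("AdjClose") \
    .sort(desc("year")).show()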
# Compute the average closing price per month for AMZN
# stocksDF.select(year($"dt").alias("yr"),month($"dt").alias("mo"), $"adjcloseprice")
# .groupBy("yr","mo").agg(avg("adjcloseprice")).orderBy(desc("yr"),desc("mo")).show
amazon_df.select(year("Date").alias("year"),
                 month("Date").alias("month"),
                 "AdjClose").groupby("year", "month").avg("AdjClose").sort("year", "month").show()
# Register the DataFrames as temp views
amazon_df.registerTempTable("amazon_stocks")
google_df.registerTempTable("google_stocks")
tesla_df.registerTempTable("tesla_stocks")
# Query the Amazon temp view
sqlContext.sql("SELECT * FROM amazon_stocks").show(5)
# Calculate and display the average closing price per month for AMZN, ordered by year and month
sqlContext.sql("""SELECT year(amazon_stocks.Date) as yr, month(amazon_stocks.Date) as mo, avg(amazon_stocks.AdjClose) FROM amazon_stocks GROUP BY year(amazon_stocks.Date), month(amazon_stocks.Date) ORDER BY year(amazon_stocks.Date), month(amazon_stocks.Date)""").show()
# When did GOOG move up or down by more than 4 dollars between open and close?
sqlContext.sql("SELECT google_stocks.Date, google_stocks.Open, google_stocks.Close, abs(google_stocks.Close - google_stocks.Open) as googdif FROM google_stocks WHERE abs(google_stocks.Close - google_stocks.Open) > 4 ").show()
# What was the max and min closing price for TSLA by year?
sqlContext.sql("SELECT year(tesla_stocks.Date) as yr, max(tesla_stocks.AdjClose), min(tesla_stocks.AdjClose) FROM tesla_stocks group By year(tesla_stocks.Date)").show()
# Check physical plan
sqlContext.sql("SELECT year(tesla_stocks.Date) as yr, max(tesla_stocks.AdjClose), min(tesla_stocks.AdjClose) FROM tesla_stocks group By year(tesla_stocks.Date)").explain()
# Join all stock closing prices in order to compare
joinclose=sqlContext.sql("SELECT tesla_stocks.Date, tesla_stocks.AdjClose as teslaclose, amazon_stocks.AdjClose as amazonclose, google_stocks.AdjClose as googleclose from tesla_stocks join google_stocks on tesla_stocks.Date = google_stocks.Date join amazon_stocks on tesla_stocks.Date = amazon_stocks.Date").cache()
# Show data frame
joinclose.show()
# Create temporary table
joinclose.registerTempTable("joinclose")
# Check average close price by year
sqlContext.sql("SELECT year(joinclose.Date) as yr, avg(joinclose.teslaclose) as teslaclose, avg(joinclose.amazonclose) as amazonclose, avg(joinclose.googleclose) as googleclose from joinclose group By year(joinclose.Date) order by year(joinclose.Date)").show()
# Save as parquet file
joinclose.write.format("parquet").save("joinstocks.parquet")
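Note that save fails if the output path already exists; when re-running, a write mode can be set explicitly (a sketch, with overwrite chosen as an assumption about the desired behavior):
# Overwrite any existing Parquet output at the same path (illustrative)
joinclose.write.mode("overwrite").format("parquet").save("joinstocks.parquet")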
# Read the Parquet file back into a data frame
final_df = sqlContext.read.parquet("joinstocks.parquet")
# Show data frame
final_df.show()
# Check schema
final_df.printSchema()
# Average closing price of each stock by month
final_df.select(year("Date").alias("year"),
                month("Date").alias("month"),
                "teslaclose", "amazonclose", "googleclose") \
    .groupby("year", "month") \
    .avg("teslaclose", "amazonclose", "googleclose") \
    .sort("year", "month").show()
# Check physical plan
final_df.select(year("Date").alias("year"),
                month("Date").alias("month"),
                "teslaclose", "amazonclose", "googleclose") \
    .groupby("year", "month") \
    .avg("teslaclose", "amazonclose", "googleclose") \
    .sort("year", "month").explain()