Exploring Stock Prices Using Spark SQL

Introduction

In this project, distributed programming with Apache Spark is used to derive knowledge from data. Stock price data is loaded and explored with Spark RDDs and Spark DataFrames, then analyzed further with statistical queries written against the Spark SQL and DataFrame APIs. Finally, parquet tables are created and the results are stored in them.

Apache Spark

  • Open source distributed cluster computing framework.
  • Provides an interface for programming entire clusters with implicit parallelism and fault tolerance
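
As a minimal illustration of the implicit parallelism mentioned above, the sketch below squares a list of numbers and sums the results; it assumes a running SparkContext named sc, like the one created later in this notebook.

# Minimal sketch: the map and reduce are distributed across the cluster
# without any explicit thread or partition handling.
squares_sum = sc.parallelize(range(1, 1001)) \
                .map(lambda x: x * x) \
                .reduce(lambda a, b: a + b)
print(squares_sum)  # 333833500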

Use cases:

  • Interactive data analysis at scale
  • Streaming data analysis
  • Machine learning pipelines
  • ETL (Extract-Transform-Load)

Dataset:

  • Tesla Stock Prices
  • Amazon Stock Prices
  • Google Stock Prices

Source:

  • Yahoo Finance

Spark Application And Spark Context

In [3]:
# Import library
from pyspark import SparkContext

# Get or create the SparkContext
sc = SparkContext.getOrCreate()
In [4]:
# Check type
type(sc)
Out[4]:
pyspark.context.SparkContext
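
For reference, newer PySpark code usually starts from a SparkSession and takes the SparkContext from it; a minimal sketch that is functionally equivalent to the cell above (the application name is only illustrative):

from pyspark.sql import SparkSession

# Build (or reuse) a SparkSession; the SparkContext hangs off of it.
spark = SparkSession.builder.appName("stock-prices").getOrCreate()
sc = spark.sparkContext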

Load Data Into Spark

In [5]:
# Define a collection of numbers
data = [1, 2, 3, 4, 5]

# Distribute the collection as a parallelized RDD
distData = sc.parallelize(data)
In [6]:
# Import library
from pyspark.sql import SQLContext

# Create an SQLContext from the SparkContext
sqlContext = SQLContext(sc)
In [7]:
# Create list
l = [('Alice', 1)]

# Create dataframe
df = sqlContext.createDataFrame(l, ['name', 'age'])

# Collect the rows back to the driver
df.collect()
Out[7]:
[Row(name='Alice', age=1)]
In [8]:
# Path to the Tesla stock CSV
data_file = 'TSLA.csv'

# Read the CSV file as an RDD of lines and cache it
raw_rdd = sc.textFile(data_file).cache()

# Return the first five elements
raw_rdd.take(5)
Out[8]:
['Date,Open,High,Low,Close,AdjClose,Volume',
 '2019-07-15,248.000000,254.419998,244.860001,253.500000,253.500000,11000100',
 '2019-07-16,249.300003,253.529999,247.929993,252.380005,252.380005,8149000',
 '2019-07-17,255.669998,258.309998,253.350006,254.860001,254.860001,9764700',
 '2019-07-18,255.050003,255.750000,251.889999,253.539993,253.539993,4764500']
In [9]:
# Check type
type(raw_rdd)
Out[9]:
pyspark.rdd.RDD
In [10]:
# Apply map transformation
csv_rdd = raw_rdd.map(lambda row: row.split(","))

# Show data
print(csv_rdd.take(2))

# Check data type
print(type(csv_rdd))
[['Date', 'Open', 'High', 'Low', 'Close', 'AdjClose', 'Volume'], ['2019-07-15', '248.000000', '254.419998', '244.860001', '253.500000', '253.500000', '11000100']]
<class 'pyspark.rdd.PipelinedRDD'>
In [11]:
# Number of columns in the header row
len(csv_rdd.take(1)[0])
Out[11]:
7
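
Before switching to the DataFrame reader in the next section, note that the parsed RDD could also be turned into a DataFrame by hand; a rough sketch, assuming the header row is dropped first:

from pyspark.sql import Row

# Drop the header row, then map each record to a typed Row.
header = csv_rdd.first()
rows_rdd = csv_rdd.filter(lambda r: r != header) \
                  .map(lambda r: Row(Date=r[0],
                                     Open=float(r[1]),
                                     High=float(r[2]),
                                     Low=float(r[3]),
                                     Close=float(r[4]),
                                     AdjClose=float(r[5]),
                                     Volume=int(r[6])))
manual_df = sqlContext.createDataFrame(rows_rdd)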

Spark DataFrames

  • Immutable distributed collections of data
  • Data organized into named columns

Operations:

  • Filter
  • Group By
  • Compute Aggregation
  • SQL Queries
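
A rough sketch of these operations, assuming the Tesla DataFrame df loaded in the next cell (the threshold and column names here are only illustrative):

from pyspark.sql.functions import col, avg, month, max as sql_max

# Filter: keep only days with volume above ten million shares.
high_volume_days = df.filter(col("Volume") > 10000000)

# Group By + aggregation: average close and maximum high per calendar month.
monthly_stats = df.groupBy(month("Date").alias("month")) \
                  .agg(avg("Close").alias("avg_close"),
                       sql_max("High").alias("max_high"))
monthly_stats.show()
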
In [12]:
# Create data frame
df = sqlContext.read.load(data_file, 
                      format='com.databricks.spark.csv', 
                      header='true', 
                      inferSchema='true')
In [13]:
# Check count
df.count()
Out[13]:
253
In [14]:
# Check data
df.take(5)
Out[14]:
[Row(Date='2019-07-15', Open=248.0, High=254.419998, Low=244.860001, Close=253.5, AdjClose=253.5, Volume=11000100),
 Row(Date='2019-07-16', Open=249.300003, High=253.529999, Low=247.929993, Close=252.380005, AdjClose=252.380005, Volume=8149000),
 Row(Date='2019-07-17', Open=255.669998, High=258.309998, Low=253.350006, Close=254.860001, AdjClose=254.860001, Volume=9764700),
 Row(Date='2019-07-18', Open=255.050003, High=255.75, Low=251.889999, Close=253.539993, AdjClose=253.539993, Volume=4764500),
 Row(Date='2019-07-19', Open=255.690002, High=259.959991, Low=254.619995, Close=258.179993, AdjClose=258.179993, Volume=7048400)]
In [15]:
# Check schema
df.printSchema()
root
 |-- Date: string (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- AdjClose: double (nullable = true)
 |-- Volume: integer (nullable = true)

In [16]:
# Import library
import pandas

# Check pandas dataframe
df.toPandas().head(5)
Out[16]:
Date Open High Low Close AdjClose Volume
0 2019-07-15 248.000000 254.419998 244.860001 253.500000 253.500000 11000100
1 2019-07-16 249.300003 253.529999 247.929993 252.380005 252.380005 8149000
2 2019-07-17 255.669998 258.309998 253.350006 254.860001 254.860001 9764700
3 2019-07-18 255.050003 255.750000 251.889999 253.539993 253.539993 4764500
4 2019-07-19 255.690002 259.959991 254.619995 258.179993 258.179993 7048400

Explore And Query Data

In [17]:
# Import library
from pyspark.sql.types import *

# Define an explicit schema (note: these integer types and this column order
# do not match the CSV, so the numeric fields parse as null in the next cell)
schema = StructType([
   StructField("date", StringType(), True),
   StructField("openprice", IntegerType(), True),
   StructField("highprice", IntegerType(), True),
   StructField("lowprice", IntegerType(), True),
   StructField("closeprice", IntegerType(), True),
   StructField("volume", IntegerType(), True),
   StructField("adjcloseprice", IntegerType(), True)])

# Create dataframe
df2 = sqlContext.read.load(data_file, 
                      format='com.databricks.spark.csv', 
                      header='true', 
                      schema=schema)
In [18]:
# Check dataframe
df2.take(2)
Out[18]:
[Row(date='2019-07-15', openprice=None, highprice=None, lowprice=None, closeprice=None, volume=None, adjcloseprice=11000100),
 Row(date='2019-07-16', openprice=None, highprice=None, lowprice=None, closeprice=None, volume=None, adjcloseprice=8149000)]
In [19]:
# Check schema
df2.printSchema()
root
 |-- date: string (nullable = true)
 |-- openprice: integer (nullable = true)
 |-- highprice: integer (nullable = true)
 |-- lowprice: integer (nullable = true)
 |-- closeprice: integer (nullable = true)
 |-- volume: integer (nullable = true)
 |-- adjcloseprice: integer (nullable = true)
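
The null values above come from a schema mismatch: the price columns in the CSV hold doubles, and AdjClose precedes Volume, whereas the schema declares integers in a different order. A corrected version might look like the following sketch (keeping the same renamed-column style):

# Schema whose types and column order match the CSV header:
# Date,Open,High,Low,Close,AdjClose,Volume
schema_fixed = StructType([
   StructField("date", StringType(), True),
   StructField("openprice", DoubleType(), True),
   StructField("highprice", DoubleType(), True),
   StructField("lowprice", DoubleType(), True),
   StructField("closeprice", DoubleType(), True),
   StructField("adjcloseprice", DoubleType(), True),
   StructField("volume", LongType(), True)])

df2_fixed = sqlContext.read.load(data_file, 
                      format='com.databricks.spark.csv', 
                      header='true', 
                      schema=schema_fixed)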

In [20]:
# Check type
type(df)
Out[20]:
pyspark.sql.dataframe.DataFrame
In [21]:
# Show data
df.show(5)
+----------+----------+----------+----------+----------+----------+--------+
|      Date|      Open|      High|       Low|     Close|  AdjClose|  Volume|
+----------+----------+----------+----------+----------+----------+--------+
|2019-07-15|     248.0|254.419998|244.860001|     253.5|     253.5|11000100|
|2019-07-16|249.300003|253.529999|247.929993|252.380005|252.380005| 8149000|
|2019-07-17|255.669998|258.309998|253.350006|254.860001|254.860001| 9764700|
|2019-07-18|255.050003|    255.75|251.889999|253.539993|253.539993| 4764500|
|2019-07-19|255.690002|259.959991|254.619995|258.179993|258.179993| 7048400|
+----------+----------+----------+----------+----------+----------+--------+
only showing top 5 rows

In [22]:
# Path to the Tesla stock CSV
tesla_data_file = 'TSLA.csv'

# Create dataframe
tesla_df = sqlContext.read.load(tesla_data_file, 
                      format='com.databricks.spark.csv', 
                      header='true', 
                      inferSchema='true')
# Show data
tesla_df.show()
+----------+----------+----------+----------+----------+----------+--------+
|      Date|      Open|      High|       Low|     Close|  AdjClose|  Volume|
+----------+----------+----------+----------+----------+----------+--------+
|2019-07-15|     248.0|254.419998|244.860001|     253.5|     253.5|11000100|
|2019-07-16|249.300003|253.529999|247.929993|252.380005|252.380005| 8149000|
|2019-07-17|255.669998|258.309998|253.350006|254.860001|254.860001| 9764700|
|2019-07-18|255.050003|    255.75|251.889999|253.539993|253.539993| 4764500|
|2019-07-19|255.690002|259.959991|254.619995|258.179993|258.179993| 7048400|
|2019-07-22|    258.75|262.149994|254.190002|255.679993|255.679993| 6842400|
|2019-07-23|256.709991|260.480011|     254.5|260.170013|260.170013| 5023100|
|2019-07-24|259.170013|266.070007|258.160004|264.880005|264.880005|11072800|
|2019-07-25|     233.5|     234.5|225.550003|228.820007|228.820007|22418300|
|2019-07-26|226.919998|230.259995|    222.25|228.039993|228.039993|10027700|
|2019-07-29|227.089996|235.940002|226.029999|235.770004|235.770004| 9273300|
|2019-07-30|232.899994|243.360001|232.179993|242.259995|242.259995| 8109000|
|2019-07-31|     243.0|246.679993|236.649994|241.610001|241.610001| 9178200|
|2019-08-01|242.649994|244.509995|231.770004|233.850006|233.850006| 8259500|
|2019-08-02|231.350006|236.270004|229.229996|234.339996|234.339996| 6136500|
|2019-08-05|229.600006|231.369995|225.779999|228.320007|228.320007| 7028300|
|2019-08-06|231.880005|     232.5|    225.75|    230.75|    230.75| 5564200|
|2019-08-07|     226.5|233.570007|225.800003|233.419998|233.419998| 4776500|
|2019-08-08|234.449997|239.800003|232.649994|238.300003|238.300003| 5274300|
|2019-08-09|236.050003|238.960007|233.809998|235.009995|235.009995| 3898200|
+----------+----------+----------+----------+----------+----------+--------+
only showing top 20 rows

In [23]:
# Path to the Google stock CSV
google_data_file = 'GOOG.csv'

# Create dataframe
google_df = sqlContext.read.load(google_data_file, 
                      format='com.databricks.spark.csv', 
                      header='true', 
                      inferSchema='true')

# Show data
google_df.show()
+----------+-----------+-----------+-----------+-----------+-----------+-------+
|      Date|       Open|       High|        Low|      Close|   AdjClose| Volume|
+----------+-----------+-----------+-----------+-----------+-----------+-------+
|2019-07-15|1146.859985|1150.819946|1139.400024|1150.339966|1150.339966| 903800|
|2019-07-16|     1146.0|1158.579956|     1145.0|1153.579956|1153.579956|1238800|
|2019-07-17|1150.969971|1158.359985| 1145.77002|1146.349976|1146.349976|1170000|
|2019-07-18| 1141.73999| 1147.60498| 1132.72998|1146.329956|1146.329956|1291300|
|2019-07-19|1148.189941|1151.140015|1129.619995|1130.099976|1130.099976|1647200|
|2019-07-22|1133.449951|    1139.25| 1124.23999|1138.069946|1138.069946|1301500|
|2019-07-23|     1144.0|1146.900024|1131.800049|1146.209961|1146.209961|1093700|
|2019-07-24|1131.900024|     1144.0| 1126.98999|1137.810059|1137.810059|1589800|
|2019-07-25|1137.819946|1141.699951|1120.920044|1132.119995|1132.119995|2209800|
|2019-07-26|1224.040039|1265.550049|     1224.0|1250.410034|1250.410034|4805800|
|2019-07-29|1241.050049|1247.369995| 1228.22998|1239.410034|1239.410034|2223700|
|2019-07-30|1225.410034|1234.869995|1223.300049|1225.140015|1225.140015|1453300|
|2019-07-31|     1223.0|     1234.0|1207.764038|1216.680054|1216.680054|1725500|
|2019-08-01|1214.030029|1234.109985|1205.719971| 1209.01001| 1209.01001|1698500|
|2019-08-02| 1200.73999|1206.900024|1188.939941| 1193.98999| 1193.98999|1645100|
|2019-08-05|1170.040039| 1175.23999|1140.140015|1152.319946|1152.319946|2597500|
|2019-08-06|1163.310059|1179.959961|     1160.0|1169.949951|1169.949951|1709400|
|2019-08-07|     1156.0|1178.444946|1149.624023| 1173.98999| 1173.98999|1444300|
|2019-08-08|1182.829956| 1205.01001| 1173.02002|1204.800049|1204.800049|1468000|
|2019-08-09| 1197.98999|1203.880005|1183.603027| 1188.01001| 1188.01001|1065700|
+----------+-----------+-----------+-----------+-----------+-----------+-------+
only showing top 20 rows

In [24]:
# Path to the Amazon stock CSV
amazon_data_file = 'AMZN.csv'

# Create dataframe
amazon_df = sqlContext.read.load(amazon_data_file, 
                      format='com.databricks.spark.csv', 
                      header='true', 
                      inferSchema='true')

# Show data
amazon_df.show()
+----------+-----------+-----------+-----------+-----------+-----------+-------+
|      Date|       Open|       High|        Low|      Close|   AdjClose| Volume|
+----------+-----------+-----------+-----------+-----------+-----------+-------+
|2019-07-15|2021.400024|2022.900024|2001.550049| 2020.98999| 2020.98999|2981300|
|2019-07-16|2010.579956|2026.319946|2001.219971|2009.900024|2009.900024|2618200|
|2019-07-17|2007.050049|     2012.0|1992.030029|1992.030029|1992.030029|2558800|
|2019-07-18| 1980.01001|     1987.5|1951.550049|1977.900024|1977.900024|3504300|
|2019-07-19|1991.209961|     1996.0| 1962.22998| 1964.52002| 1964.52002|3185600|
|2019-07-22|1971.140015|     1989.0| 1958.26001|1985.630005|1985.630005|2900000|
|2019-07-23| 1995.98999|1997.790039|1973.130005| 1994.48999| 1994.48999|2703500|
|2019-07-24|1969.300049|2001.300049|1965.869995|2000.810059|2000.810059|2631300|
|2019-07-25|     2001.0|2001.199951|1972.719971|1973.819946|1973.819946|4136500|
|2019-07-26|     1942.0|1950.900024| 1924.51001|1943.050049|1943.050049|4927100|
|2019-07-29|     1930.0| 1932.22998|1890.540039|1912.449951|1912.449951|4493200|
|2019-07-30|1891.119995|1909.890015| 1883.47998|1898.530029|1898.530029|2910900|
|2019-07-31|1898.109985|1899.550049|1849.439941|1866.780029|1866.780029|4470700|
|2019-08-01|1871.719971|1897.920044| 1844.01001|1855.319946|1855.319946|4713300|
|2019-08-02|1845.069946|1846.359985| 1808.02002| 1823.23999| 1823.23999|4956200|
|2019-08-05|1770.219971|1788.670044|1748.780029|1765.130005|1765.130005|6058200|
|2019-08-06| 1792.22998| 1793.77002|1753.400024|1787.829956|1787.829956|5070300|
|2019-08-07| 1773.98999|1798.930054|     1757.0|1793.400024|1793.400024|4526900|
|2019-08-08|     1806.0| 1834.26001|1798.109985|1832.890015|1832.890015|3701200|
|2019-08-09|1828.949951|1831.089966|1802.219971|1807.579956|1807.579956|2879800|
+----------+-----------+-----------+-----------+-----------+-----------+-------+
only showing top 20 rows

Spark SQL

In [26]:
# Import libraries
from pyspark.sql.functions import year, month, dayofmonth
import datetime

# Average closing price per year for AMZN
amazon_df.select(year("Date").alias("year"), "AdjClose").groupby("year").avg("AdjClose").sort("year").show()
+----+------------------+
|year|     avg(AdjClose)|
+----+------------------+
|2019|1800.6161329411755|
|2020| 2236.414478798507|
+----+------------------+
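
The same aggregation can be ordered most-recent-year-first with desc; a minimal sketch:

from pyspark.sql.functions import year, desc

# Average adjusted close per year, newest year first.
amazon_df.select(year("Date").alias("year"), "AdjClose") \
         .groupby("year").avg("AdjClose") \
         .orderBy(desc("year")).show()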

In [27]:
# Compute the average closing price per month for AMZN
amazon_df.select(year("Date").alias("year"),
                month("Date").alias("month"),
                "AdjClose").groupby("year", "month").avg("AdjClose").sort("year", "month").show()
+----+-----+------------------+
|year|month|     avg(AdjClose)|
+----+-----+------------------+
|2019|    7|1964.6846265384618|
|2019|    8|1793.6027220909093|
|2019|    9|     1799.12099615|
|2019|   10|1752.3317498695653|
|2019|   11|      1774.2939941|
|2019|   12|1785.7728446190476|
|2020|    1|1884.2376128571425|
|2020|    2|2066.1752672631574|
|2020|    3|1872.3104358636365|
|2020|    4|2228.7052408571426|
|2020|    5|2394.1840209499996|
|2020|    6|      2613.5454545|
|2020|    7| 3053.100016222222|
+----+-----+------------------+

In [29]:
# Register the DataFrames as temp views
amazon_df.registerTempTable("amazon_stocks")
google_df.registerTempTable("google_stocks")
tesla_df.registerTempTable("tesla_stocks")
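
On newer Spark versions the same thing is done with createOrReplaceTempView (registerTempTable is a deprecated alias); a sketch:

# Equivalent registration of the temp views on Spark 2.x/3.x.
amazon_df.createOrReplaceTempView("amazon_stocks")
google_df.createOrReplaceTempView("google_stocks")
tesla_df.createOrReplaceTempView("tesla_stocks")
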
In [30]:
# Query entry
sqlContext.sql("SELECT * FROM amazon_stocks").show(5)
+----------+-----------+-----------+-----------+-----------+-----------+-------+
|      Date|       Open|       High|        Low|      Close|   AdjClose| Volume|
+----------+-----------+-----------+-----------+-----------+-----------+-------+
|2019-07-15|2021.400024|2022.900024|2001.550049| 2020.98999| 2020.98999|2981300|
|2019-07-16|2010.579956|2026.319946|2001.219971|2009.900024|2009.900024|2618200|
|2019-07-17|2007.050049|     2012.0|1992.030029|1992.030029|1992.030029|2558800|
|2019-07-18| 1980.01001|     1987.5|1951.550049|1977.900024|1977.900024|3504300|
|2019-07-19|1991.209961|     1996.0| 1962.22998| 1964.52002| 1964.52002|3185600|
+----------+-----------+-----------+-----------+-----------+-----------+-------+
only showing top 5 rows

In [31]:
# Average closing price per month for AMZN, grouped by year and month (no ORDER BY, so the output rows are unordered)
sqlContext.sql("""SELECT year(amazon_stocks.Date) as yr, month(amazon_stocks.Date) as mo, avg(amazon_stocks.AdjClose) from amazon_stocks group By year(amazon_stocks.Date), month(amazon_stocks.Date)""").show()
+----+---+------------------+
|  yr| mo|     avg(AdjClose)|
+----+---+------------------+
|2019| 10|1752.3317498695653|
|2020|  6|      2613.5454545|
|2020|  3|1872.3104358636365|
|2019|  8|1793.6027220909093|
|2020|  4|2228.7052408571426|
|2020|  1|1884.2376128571425|
|2019|  9|     1799.12099615|
|2019| 12|1785.7728446190476|
|2020|  7| 3053.100016222222|
|2020|  2|2066.1752672631574|
|2019|  7|1964.6846265384618|
|2019| 11|      1774.2939941|
|2020|  5|2394.1840209499996|
+----+---+------------------+

In [32]:
# On which days did the GOOG closing price differ from the open by more than 4 dollars?
sqlContext.sql("SELECT google_stocks.Date, google_stocks.Open, google_stocks.Close, abs(google_stocks.Close - google_stocks.Open) as spydif FROM google_stocks WHERE abs(google_stocks.Close - google_stocks.Open) > 4 ").show()
+----------+-----------+-----------+------------------+
|      Date|       Open|      Close|            spydif|
+----------+-----------+-----------+------------------+
|2019-07-16|     1146.0|1153.579956| 7.579956000000038|
|2019-07-17|1150.969971|1146.349976| 4.619995000000017|
|2019-07-18| 1141.73999|1146.329956| 4.589966000000004|
|2019-07-19|1148.189941|1130.099976| 18.08996500000012|
|2019-07-22|1133.449951|1138.069946| 4.619995000000017|
|2019-07-24|1131.900024|1137.810059|  5.91003499999988|
|2019-07-25|1137.819946|1132.119995|5.6999510000000555|
|2019-07-26|1224.040039|1250.410034|26.369995000000017|
|2019-07-31|     1223.0|1216.680054| 6.319946000000073|
|2019-08-01|1214.030029| 1209.01001|5.0200190000000475|
|2019-08-02| 1200.73999| 1193.98999|              6.75|
|2019-08-05|1170.040039|1152.319946|17.720092999999906|
|2019-08-06|1163.310059|1169.949951| 6.639892000000145|
|2019-08-07|     1156.0| 1173.98999|17.989990000000034|
|2019-08-08|1182.829956|1204.800049|21.970092999999906|
|2019-08-09| 1197.98999| 1188.01001| 9.979980000000069|
|2019-08-12|1179.209961|1174.709961|               4.5|
|2019-08-13|1171.459961| 1197.27002| 25.81005899999991|
|2019-08-14|1176.310059|1164.290039|12.020019999999931|
|2019-08-19|1190.089966|1198.449951| 8.359985000000052|
+----------+-----------+-----------+------------------+
only showing top 20 rows
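
The same daily-move filter can be expressed with the DataFrame API instead of SQL; a rough sketch:

from pyspark.sql.functions import abs as sql_abs, col

# Absolute open-to-close move per day, filtered to moves above 4 dollars.
google_df.withColumn("daily_move", sql_abs(col("Close") - col("Open"))) \
         .filter(col("daily_move") > 4) \
         .select("Date", "Open", "Close", "daily_move") \
         .show()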

In [33]:
# What were the max and min closing prices for TSLA by year?
sqlContext.sql("SELECT year(tesla_stocks.Date) as yr, max(tesla_stocks.AdjClose), min(tesla_stocks.AdjClose) FROM tesla_stocks group By year(tesla_stocks.Date)").show()
+----+-------------+-------------+
|  yr|max(AdjClose)|min(AdjClose)|
+----+-------------+-------------+
|2019|   430.940002|   211.399994|
|2020|  1544.650024|   361.220001|
+----+-------------+-------------+

In [34]:
# Check physical plan
sqlContext.sql("SELECT year(tesla_stocks.Date) as yr, max(tesla_stocks.AdjClose), min(tesla_stocks.AdjClose) FROM tesla_stocks group By year(tesla_stocks.Date)").explain()
== Physical Plan ==
*(2) HashAggregate(keys=[year(cast(Date#133 as date))#524], functions=[max(AdjClose#138), min(AdjClose#138)])
+- Exchange hashpartitioning(year(cast(Date#133 as date))#524, 200), true, [id=#324]
   +- *(1) HashAggregate(keys=[year(cast(Date#133 as date)) AS year(cast(Date#133 as date))#524], functions=[partial_max(AdjClose#138), partial_min(AdjClose#138)])
      +- FileScan csv [Date#133,AdjClose#138] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/home/jovyan/work/TSLA.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Date:string,AdjClose:double>


In [35]:
# Join all stock closing prices in order to compare
joinclose=sqlContext.sql("SELECT tesla_stocks.Date, tesla_stocks.AdjClose as teslaclose, amazon_stocks.AdjClose as amazonclose, google_stocks.AdjClose as googleclose from tesla_stocks join google_stocks on tesla_stocks.Date = google_stocks.Date join amazon_stocks on tesla_stocks.Date = amazon_stocks.Date").cache()

# Show data frame
joinclose.show()

# Create temporary table
joinclose.registerTempTable("joinclose")
+----------+----------+-----------+-----------+
|      Date|teslaclose|amazonclose|googleclose|
+----------+----------+-----------+-----------+
|2019-07-15|     253.5| 2020.98999|1150.339966|
|2019-07-16|252.380005|2009.900024|1153.579956|
|2019-07-17|254.860001|1992.030029|1146.349976|
|2019-07-18|253.539993|1977.900024|1146.329956|
|2019-07-19|258.179993| 1964.52002|1130.099976|
|2019-07-22|255.679993|1985.630005|1138.069946|
|2019-07-23|260.170013| 1994.48999|1146.209961|
|2019-07-24|264.880005|2000.810059|1137.810059|
|2019-07-25|228.820007|1973.819946|1132.119995|
|2019-07-26|228.039993|1943.050049|1250.410034|
|2019-07-29|235.770004|1912.449951|1239.410034|
|2019-07-30|242.259995|1898.530029|1225.140015|
|2019-07-31|241.610001|1866.780029|1216.680054|
|2019-08-01|233.850006|1855.319946| 1209.01001|
|2019-08-02|234.339996| 1823.23999| 1193.98999|
|2019-08-05|228.320007|1765.130005|1152.319946|
|2019-08-06|    230.75|1787.829956|1169.949951|
|2019-08-07|233.419998|1793.400024| 1173.98999|
|2019-08-08|238.300003|1832.890015|1204.800049|
|2019-08-09|235.009995|1807.579956| 1188.01001|
+----------+----------+-----------+-----------+
only showing top 20 rows
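
The same three-way join can also be written with the DataFrame API; a rough sketch:

# Join the three DataFrames on Date, keeping only the adjusted closes.
joined_api = tesla_df.select("Date", tesla_df["AdjClose"].alias("teslaclose")) \
    .join(amazon_df.select("Date", amazon_df["AdjClose"].alias("amazonclose")), "Date") \
    .join(google_df.select("Date", google_df["AdjClose"].alias("googleclose")), "Date")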

In [36]:
# Average closing price of each stock by year
sqlContext.sql("SELECT year(joinclose.Date) as yr, avg(joinclose.teslaclose) as teslaclose, avg(joinclose.amazonclose) as amazonclose, avg(joinclose.googleclose) as googleclose from joinclose group By year(joinclose.Date) order by year(joinclose.Date)").show()
+----+------------------+------------------+------------------+
|  yr|        teslaclose|       amazonclose|       googleclose|
+----+------------------+------------------+------------------+
|2019|283.62126070588243|1800.6161329411755|1245.3833654621849|
|2020| 761.8206739179104| 2236.414478798507|1362.8286906865671|
+----+------------------+------------------+------------------+

Save Spark DataFrames

In [37]:
# Save as parquet file
joinclose.write.format("parquet").save("joinstocks.parquet")
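
Re-running this cell fails if the output directory already exists; a save mode can be specified instead, for example:

# Overwrite any existing output rather than failing on a second run.
joinclose.write.mode("overwrite").parquet("joinstocks.parquet")
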
In [38]:
# Read the parquet file back into a DataFrame
final_df = sqlContext.read.parquet("joinstocks.parquet")

# Show data frame
final_df.show()
+----------+----------+-----------+-----------+
|      Date|teslaclose|amazonclose|googleclose|
+----------+----------+-----------+-----------+
|2019-07-15|     253.5| 2020.98999|1150.339966|
|2019-07-16|252.380005|2009.900024|1153.579956|
|2019-07-17|254.860001|1992.030029|1146.349976|
|2019-07-18|253.539993|1977.900024|1146.329956|
|2019-07-19|258.179993| 1964.52002|1130.099976|
|2019-07-22|255.679993|1985.630005|1138.069946|
|2019-07-23|260.170013| 1994.48999|1146.209961|
|2019-07-24|264.880005|2000.810059|1137.810059|
|2019-07-25|228.820007|1973.819946|1132.119995|
|2019-07-26|228.039993|1943.050049|1250.410034|
|2019-07-29|235.770004|1912.449951|1239.410034|
|2019-07-30|242.259995|1898.530029|1225.140015|
|2019-07-31|241.610001|1866.780029|1216.680054|
|2019-08-01|233.850006|1855.319946| 1209.01001|
|2019-08-02|234.339996| 1823.23999| 1193.98999|
|2019-08-05|228.320007|1765.130005|1152.319946|
|2019-08-06|    230.75|1787.829956|1169.949951|
|2019-08-07|233.419998|1793.400024| 1173.98999|
|2019-08-08|238.300003|1832.890015|1204.800049|
|2019-08-09|235.009995|1807.579956| 1188.01001|
+----------+----------+-----------+-----------+
only showing top 20 rows

In [39]:
# Check schema
final_df.printSchema()
root
 |-- Date: string (nullable = true)
 |-- teslaclose: double (nullable = true)
 |-- amazonclose: double (nullable = true)
 |-- googleclose: double (nullable = true)
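
Note that Date is stored as a string in the parquet file; if date arithmetic were needed, it could be cast explicitly, as in this sketch:

from pyspark.sql.functions import to_date

# Convert the Date string column into a proper date column.
final_df_dated = final_df.withColumn("Date", to_date("Date", "yyyy-MM-dd"))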

In [40]:
# Average closing price of each stock by year and month
final_df.select(year("Date").alias("year"), 
                 month("Date").alias("month"), 
                 "teslaclose", "amazonclose", 
                 "googleclose").groupby("year", 
                                        "month").avg("teslaclose", 
                                                     "amazonclose", 
                                                     "googleclose").sort("year", 
                                                                         "month").show()
                
+----+-----+------------------+------------------+------------------+
|year|month|   avg(teslaclose)|  avg(amazonclose)|  avg(googleclose)|
+----+-----+------------------+------------------+------------------+
|2019|    7|248.43769253846148|1964.6846265384618|1170.1961483076923|
|2019|    8|225.10272704545451|1793.6027220909093|1180.6868120454546|
|2019|    9|237.26149830000003|     1799.12099615|     1220.83952035|
|2019|   10| 266.3547840434783|1752.3317498695653|1232.7117442608696|
|2019|   11|338.30000000000007|      1774.2939941|     1304.27899165|
|2019|   12| 377.6947631904762|1785.7728446190476|1340.8676351904762|
|2020|    1| 528.6590503809524|1884.2376128571425|1436.6537968571424|
|2020|    2| 797.4468415263159|2066.1752672631574|1464.1105184736841|
|2020|    3| 559.1013613181818|1872.3104358636365|1188.3940984545457|
|2020|    4| 663.5985761428572|2228.7052408571426|1234.1404797142854|
|2020|    5|      799.42549745|2394.1840209499996|1381.1137511999998|
|2020|    6| 963.5422779545456|      2613.5454545|1431.0477184545452|
|2020|    7|1378.7111273333333| 3053.100016222222|1496.0299885555555|
+----+-----+------------------+------------------+------------------+

In [41]:
# Check physical plan
final_df.select(year("Date").alias("year"), 
                 month("Date").alias("month"), 
                 "teslaclose", "amazonclose", 
                 "googleclose").groupby("year", 
                                        "month").avg("teslaclose", 
                                                     "amazonclose", 
                                                     "googleclose").sort("year", 
                                                                         "month").explain()
== Physical Plan ==
*(3) Sort [year#886 ASC NULLS FIRST, month#887 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(year#886 ASC NULLS FIRST, month#887 ASC NULLS FIRST, 200), true, [id=#543]
   +- *(2) HashAggregate(keys=[year#886, month#887], functions=[avg(teslaclose#806), avg(amazonclose#807), avg(googleclose#808)])
      +- Exchange hashpartitioning(year#886, month#887, 200), true, [id=#539]
         +- *(1) HashAggregate(keys=[year#886, month#887], functions=[partial_avg(teslaclose#806), partial_avg(amazonclose#807), partial_avg(googleclose#808)])
            +- *(1) Project [year(cast(Date#805 as date)) AS year#886, month(cast(Date#805 as date)) AS month#887, teslaclose#806, amazonclose#807, googleclose#808]
               +- *(1) ColumnarToRow
                  +- FileScan parquet [Date#805,teslaclose#806,amazonclose#807,googleclose#808] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/home/jovyan/work/joinstocks.parquet], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Date:string,teslaclose:double,amazonclose:double,googleclose:double>
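
Since the introduction mentions storing results in parquet tables, the aggregated monthly averages could likewise be written out; a minimal sketch, where the output path is an assumption:

from pyspark.sql.functions import year, month

# Recompute the monthly averages and persist them as their own parquet table.
monthly_avg = final_df.select(year("Date").alias("year"),
                              month("Date").alias("month"),
                              "teslaclose", "amazonclose", "googleclose") \
                      .groupby("year", "month") \
                      .avg("teslaclose", "amazonclose", "googleclose") \
                      .sort("year", "month")
monthly_avg.write.mode("overwrite").parquet("monthly_averages.parquet")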