PySpark - Using DataFrames
Spark DataFrames
Previously we looked at RDDs, which were the primary data abstraction in Spark 1. In Spark 2 we rarely use RDDs, reserving them for low-level transformations and fine-grained control over the dataset. If the data is unstructured or streaming we still have to rely on RDDs; for everything else we will use DataFrames
SparkSession vs. SparkContext
Up until now we have been using the SparkContext as the entry point to Spark. Moving forward, the SparkSession will be our entry point
SparkSession offers:
- Ease of Use
- SparkSession - simplified entry point
- No confusion about which context to use
- Encapsulates SQLContext and HiveContext
To create a SparkSession, use SparkSession.builder
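As a small sketch (assuming a SparkSession named spark has already been built, as in the next section), the session also exposes the older entry points, so there is no need to create them separately:
sc = spark.sparkContext  # the underlying SparkContext is still accessible
one_df = spark.sql("SELECT 1 AS one")  # SQL queries run through the session, no separate SQLContext needed
one_df.show()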
Exploring Data with DataFrames
SparkSession and DataFrames
from pyspark.sql import SparkSession
Import the necessary libraries
spark = SparkSession.builder\
.appName("Analyzing London Crime Data")\
.getOrCreate()
Build a new SparkSession with this app name, or return the existing session if one is already running. This will be the entry point to the Spark engine
data = spark.read\ # (1)
.format("csv")\ # (2)
.option("header", "true")\ # (3)
.load("../datasets/london_crime_by_lsoa.csv") # (4)
- .read returns a DataFrameReader that can be used to read non-streaming data in as a DataFrame
- .format(...) sets the file format to be read
- .option(...) adds an input option for the underlying data source
- .load(...) loads the input in as a DataFrame from the data source
data.printSchema()
Remember, DataFrames always hold structured data. Using .printSchema() will print the schema of the tabular data
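Note that with this CSV read every column is typed as a string. As a hedged sketch, the csv reader's inferSchema option can be passed so Spark guesses numeric types instead (same file path assumed as above):
typed_data = spark.read\
.format("csv")\
.option("header", "true")\
.option("inferSchema", "true")\
.load("../datasets/london_crime_by_lsoa.csv")
typed_data.printSchema()  # numeric columns such as value and year should now show as numbers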
data.count()
We can see the number of rows in this DataFrame
data.limit(5).show()
Examine the data by looking at the first 5 rows with the .show() function
Drop and Select Columns
data.dropna()
Drop rows which have values that are not available (N/A). As we know, this is a major part of data cleaning
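.dropna() also accepts parameters; a minimal sketch of the common ones (the column name below is just an example from this dataset):
data.dropna(how='any')  # drop a row if any column is null (the default)
data.dropna(how='all')  # drop a row only if every column is null
data.dropna(subset=['value'])  # only consider the listed columns when deciding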
data = data.drop('lsoa_code')
data.show(5)
To drop a column we can use .drop(...) and pass a column name as a string value.
total_boroughs = data.select('borough')\ # (1)
.distinct() # (2)
total_boroughs.show()
- Select a column from the DataFrame
- Select only distinct values
total_boroughs.count()
The number of distinct values for this column from the DataFrame
Filter Records
hackney_data = data.filter(data['borough'] == 'Hackney')
hackney_data.show(5)
Using .filter(...) we can filter records based on their column values
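As a side note, .filter(...) also accepts a SQL-style expression string, and .where(...) is an alias for it; a small sketch of equivalent filters:
hackney_data = data.filter("borough = 'Hackney'")
hackney_data = data.where(data['borough'] == 'Hackney')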
# (1)
data_2015_2016 = data.filter(data['year'].isin(["2015", "2016"]))
# (2)
data_2015_2016.sample(fraction=0.1).show()
- Notice the use of .isin(...) within the filter parameters. This selects the records whose column value matches any of the values passed into it
- .sample(...) returns a sampled subset of this DataFrame; fraction determines what fraction of the full DataFrame to return
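.sample(...) also takes a withReplacement flag and an optional seed, which makes the sampled subset reproducible; for example:
data_2015_2016.sample(withReplacement=False, fraction=0.1, seed=42).show()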
data_2014_onwards = data.filter(data['year'] >=2014)
data_2014_onwards.sample(fraction=0.1).show()
Another example of .filter(...), using the >= comparator to select records with a column value greater than or equal to the value passed
Aggregations and Grouping
borough_crime_count = data.groupBy('borough')\
.count()
borough_crime_count.show(5)
DataFrames support grouping of data with the .groupBy(...) function. .groupBy(...) can be used on any column
borough_crime_count = data.groupBy('borough')\
.agg({"value":"sum"})
borough_crime_count.show(5)
.agg(...) is a function that will compute aggregates and return the result as a DataFrame.
Built-in aggregation functions:
- avg
- max
- min
- sum
- count
borough_conviction_sum = data.groupBy('borough')\
.agg({"value":"sum"})\
.withColumnRenamed('sum(value)','convictions')
borough_conviction_sum.show(5)
Using .withColumnRenamed(<original name>, <new name>) will result in the column name being replaced by a new name
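As an alternative sketch, the same aggregation can be written with the pyspark.sql.functions module and .alias(...), which avoids the rename step:
import pyspark.sql.functions as func
borough_conviction_sum = data.groupBy('borough')\
.agg(func.sum('value').alias('convictions'))
borough_conviction_sum.show(5)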
total_borough_convictions = borough_conviction_sum.agg({'convictions':'sum'})
total_borough_convictions.show()
By removing the grouping function, the aggregate will act on the whole DataFrame
total_convictions = total_borough_convictions.collect()[0][0]
Using index notation we can grab the value out of the collected rows and assign it to a variable
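An equivalent way to pull a single scalar out of a one-row DataFrame is .first(), which returns the first Row directly; a small sketch:
total_convictions = total_borough_convictions.first()[0]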
import pyspark.sql.functions as func
Imports some extra functionality from the PySpark library
borough_percentage_contribution = borough_conviction_sum.withColumn(
'% contribution',
func.round(borough_conviction_sum.convictions / total_convictions * 100, 2))
borough_percentage_contribution.printSchema()
Here we create a new column and use the previous variable to calculate the new column value
borough_percentage_contribution.orderBy(borough_percentage_contribution[2].desc())\
.show(10)
We can use .orderBy(...) with a column index to sort the DataFrame in ascending or descending order
conviction_monthly = data.filter(data['year'] == 2014)\
.groupBy('month')\
.agg({'value':'sum'})\
.withColumnRenamed('sum(value)', 'convictions')
Here we use a combination of group by, aggregate and column renaming to extract the data
total_conviction_monthly = conviction_monthly.agg({'convictions':'sum'})\
.collect()[0][0]
total_conviction_monthly = conviction_monthly.withColumn(
'percent',
func.round(conviction_monthly.convictions/total_conviction_monthly * 100, 2))
total_conviction_monthly.columns
Now we apply further transformations and print the resulting DataFrame's columns
total_conviction_monthly.orderBy(total_conviction_monthly.percent.desc()).show()
Finally, we order the resulting DataFrame and display
Aggregations and Visualizations
crimes_category = data.groupBy('major_category')\
.agg({'value':'sum'})\
.withColumnRenamed('sum(value)','convictions')
Use groupBy and an aggregate to create a new DataFrame
crimes_category.orderBy(crimes_category.convictions.desc()).show()
Order and display the new DataFrame
year_df = data.select('year')
Create a new DataFrame from one column
year_df.agg({'year':'min'}).show()
Use the min aggregate to return the minimum value
year_df.agg({'year':'max'}).show()
Use the max aggregate to return the maximum value
year_df.describe().show()
.describe() will return:
- count
- mean
- standard deviation
- min
- max
data.crosstab('borough', 'major_category')\
.select('borough_major_category', 'Burglary', 'Drugs', 'Fraud or Forgery', 'Robbery')\
.show()
.crosstab(...) computes a pair-wise frequency table of the given columns, also known as a contingency table.
get_ipython().magic('matplotlib inline')
import matplotlib.pyplot as plt
plt.style.use('ggplot')
Matplotlib graphs will be displayed inline in this notebook
def describe_year(year):
    yearly_details = data.filter(data.year == year)\
        .groupBy('borough')\
        .agg({'value':'sum'})\
        .withColumnRenamed('sum(value)', 'convictions')
    borough_list = [x[0] for x in yearly_details.toLocalIterator()]
    convictions_list = [x[1] for x in yearly_details.toLocalIterator()]
    plt.figure(figsize=(33,10))
    plt.bar(borough_list, convictions_list)
    plt.title('Crime for the year: ' + year, fontsize=30)
    plt.xlabel('Boroughs', fontsize=30)
    plt.ylabel('Convictions', fontsize=30)
    plt.xticks(rotation=90, fontsize=30)
    plt.yticks(fontsize=30)
    plt.autoscale()
    plt.show()
This is a helper function containing all the necessary steps to build the DataFrame for a given year and create the chart to visualize it
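A usage example for the helper (the year is passed as a string because the CSV columns were read in as strings):
describe_year('2014')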
Extracting Data and User Defined Functions
In this section we will use DataFrames to explore and clean data, with User Defined Functions assisting in the process
from pyspark.sql import SparkSession
First import SparkSession
spark = SparkSession.builder\
.appName('Analyzing soccer players')\
.getOrCreate()
Create SparkSession instance
players = spark.read\
.format('csv')\
.option('header', 'true')\
.load('../datasets/player.csv')
Read in the data source into a DataFrame
players.printSchema()
Look at the schema
players.show(5)
Checkout the first few records
player_attributes = spark.read\
.format('csv')\
.option('header', 'true')\
.load('../datasets/Player_Attributes.csv')
Read in a second CSV data source in a DataFrame
player_attributes.printSchema()
Again, look at the schema
players.count(), player_attributes.count()
Let's view the total record count for each DataFrame
player_attributes.select('player_api_id')\
.distinct()\
.count()
Notice the many-to-one relationship: the player_attributes DataFrame holds many records for each player in the players data set
players = players.drop('id', 'player_fifa_api_id')
players.columns
Get rid of unwanted data columns
player_attributes = player_attributes.drop(
'id',
'player_fifa_api_id',
'preferred_foot',
'attacking_work_rate',
'defensive_work_rate',
'crossing',
'jumping',
'sprint_speed',
'balance',
'aggression',
'short_passing',
'potential'
)
player_attributes.columns
Get rid of unwanted data columns
player_attributes = player_attributes.dropna()
players = players.dropna()
Remove records with unavailable (N/A) data
players.count(), player_attributes.count()
Look at the new data count
User Defined Functions
from pyspark.sql.functions import udf
Import the udf helper from pyspark.sql.functions
year_extract_udf = udf(lambda date: date.split('-')[0]) # (1)
player_attributes = player_attributes.withColumn( # (2)
'year',
year_extract_udf(player_attributes.date)
)
- Create a UDF with a lambda function that operates on a date value and returns only the year
- Create a new column for the year and populate it by applying the UDF to the date column (an alternative without a UDF is sketched below)
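A hedged alternative without a UDF, using the built-in substring column function (this assumes the date strings start with a four-digit year):
from pyspark.sql.functions import substring
player_attributes_alt = player_attributes.withColumn(
'year',
substring(player_attributes.date, 1, 4)  # first four characters of the date string
)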
player_attributes = player_attributes.drop('date')
Now we can drop the date column, as the year data has been copied to another column
player_attributes.columns
View the new set of columns
Joining DataFrames
Spark DataFrames can be joined much like SQL tables can be joined. In this section we will join data to create a new DataFrame.
pa_2016 = player_attributes.filter(player_attributes.year == 2016)
Create a new DataFrame from a subset of another DataFrame
pa_2016.count()
View the count
pa_2016.select(pa_2016.player_api_id)\
.distinct()\
.count()
Select only distinct values to make sure the unique IDs match the DataFrame we want to join
pa_striker_2016 = pa_2016.groupBy('player_api_id')\
.agg({
'finishing':'avg',
'shot_power':'avg',
'acceleration':'avg'
})
Since one data set has many records associated with an entity, we group the records by entity id first, then average the values of the columns we are interested in to create a one-to-one relationship
pa_striker_2016.count()
Check that the count matches the distinct player count from before
pa_striker_2016.show(5)
Take a quick look at the new aggregated data
pa_striker_2016 = pa_striker_2016.withColumnRenamed('avg(finishing)', 'finishing')\
.withColumnRenamed('avg(shot_power)', 'shot_power')\
.withColumnRenamed('avg(acceleration)', 'acceleration')
Rename the columns for readability
weight_finishing = 1
weight_shot_power = 2
weight_acceleration = 1
total_weight = weight_finishing + weight_shot_power + weight_acceleration
Let's create a weighted grading system to apply more value to some attributes
strikers = pa_striker_2016.withColumn('striker_grade',
(pa_striker_2016.finishing * weight_finishing + \
pa_striker_2016.shot_power * weight_shot_power + \
pa_striker_2016.acceleration * weight_acceleration) / total_weight)
Create a new column and apply the grading system to calculate each row's value
strikers = strikers.drop('finishing',
'acceleration',
'shot_power')
Remove unneeded fields
strikers = strikers.filter(strikers.striker_grade > 70)\
.sort(strikers.striker_grade.desc())
strikers.show(10)
Drop the lower grades from the dataset and sort by grade in descending order
strikers.count(), players.count()
See how many entities we have left
striker_details = players.join(strikers, players.player_api_id == strikers.player_api_id)
Now we can join the two DataFrames
striker_details.columns
View the columns, and note that the join field appears twice, once from each DataFrame
striker_details.count()
Check that the count is in line with before
striker_details = players.join(strikers, ['player_api_id'])
An alternate way to join, passing the column name as a list
striker_details.show(5)
View the data
striker_details.columns
View the columns, and note the single join column
Saving DataFrames to CSV and JSON
Saving to file is pretty straightforward
CSV
striker_details.select("player_name", "striker_grade")\ # (1)
.coalesce(1)\ # (2)
.write\ # (3)
.option('header', 'true')\ # (4)
.csv('striker_grade.csv') # (5)
- Select the columns to export
- Coalesce to control how many files the data is broken into
- Begin the write command
- Any options to apply
- File format and file name
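On the write step above: by default the write fails if the target path already exists. A save mode can be set on the writer, for example (a sketch using overwrite):
striker_details.select("player_name", "striker_grade")\
.coalesce(1)\
.write\
.mode('overwrite')\
.option('header', 'true')\
.csv('striker_grade.csv')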
JSON
striker_details.select("player_name", "striker_grade")\
.write\
.json('striker_grade.json')
Going Further with Joins
Here we will cover other ways to join DataFrames
valuesA = [('John', 100000), ('James', 150000), ('Emily', 65000), ('Nina', 200000)]
tableA = spark.createDataFrame(valuesA, ['name', 'salary'])
Create a DataFrame from a list of tuples
tableA.show()
View DataFrame
valuesB = [('James', 2), ('Emily',3), ('Darth Vader', 5), ('Princess Leia', 6)]
tableB = spark.createDataFrame(valuesB, ['name', 'employee_id'])
Create a second DataFrame
tableB.show()
View DataFrame
inner_join = tableA.join(tableB, tableA.name == tableB.name)
inner_join.show()
This is the behavior that we have seen previously
left_join = tableA.join(tableB, tableA.name == tableB.name, how='left')
left_join.show()
Using the how parameter to explicitly declare the type of join
right_join = tableA.join(tableB, tableA.name == tableB.name, how='right')
right_join.show()
Right outer join
full_outer_join = tableA.join(tableB, tableA.name == tableB.name, how='full')
full_outer_join.show()
Full outer join