PySpark - Using DataFrames

Spark DataFrames

Previously we looked at RDDs, which were the primary data abstraction in Spark 1. In Spark 2 we rarely use RDDs, reserving them for low-level transformations and fine-grained control over the dataset. We only fall back on RDDs when the data is unstructured or streaming; for everything else we will use DataFrames

SparkSession vs. SparkContext

Up until now we have been using the SparkContext as the entry point to Spark. Moving forward, the SparkSession will be the entry point we will utilize

SparkSession offers:

  • Ease of Use
    • SparkSession - simplified entry point
    • No confusion about which context to use
    • Encapsulates SQLContext and HiveContext

To create a SparkSession, use the SparkSession.builder interface and call .getOrCreate()
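A minimal sketch of what that looks like (the application name here is just a placeholder); note that the older SparkContext is still reachable through the session:

from pyspark.sql import SparkSession

# Build a session, or return the one that already exists
spark = SparkSession.builder\
                    .appName("Example App")\
                    .getOrCreate()

# The underlying SparkContext is still available via the session
sc = spark.sparkContext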

Exploring Data with DataFrames

SparkSession and DataFrames

from pyspark.sql import SparkSession

Import the necessary library

spark = SparkSession.builder\
                    .appName("Analyzing London Crime Data")\
                    .getOrCreate()

Build a new SparkSession with this application name, or return the existing session if one is already running. This will be the entry point to the Spark engine

data = (spark.read                                          # (1)
             .format("csv")                                 # (2)
             .option("header", "true")                      # (3)
             .load("../datasets/london_crime_by_lsoa.csv")) # (4)
  1. .read returns a DataFrameReader that can be used to read non-streaming data in as a DataFrame.
  2. .format(...) sets the file format to be read
  3. .option(...) adds an input option for the underlying data source
  4. .load(...) loads the input as a DataFrame from the data source
data.printSchema()

Remember, DataFrames always hold structured data. Using .printSchema() will print the schema of the tabular data

data.count()

We can see the number of rows in this DataFrame

data.limit(5).show()

Examine the data by looking at the first 5 rows with the .show() function

Drop and Select Columns

data.dropna()

Drop rows which have values that are not available (N/A). As we know, this is a major part of data cleaning. Note that .dropna() returns a new DataFrame rather than modifying the original, so the result must be assigned to take effect

data = data.drop('lsoa_code')

data.show(5)

To drop a column we can use .drop(...) and pass a column name as a string value.

total_boroughs = (data.select('borough')  # (1)
                      .distinct())        # (2)
total_boroughs.show()
  1. Select a column from the DataFrame
  2. Select only distinct values
total_boroughs.count()

The number of distinct values for this column from the DataFrame

Filter Records

hackney_data = data.filter(data['borough'] == 'Hackney')
hackney_data.show(5)

Using .filter(...) we can filter records based on their column values
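.filter(...) also accepts a SQL expression string, and .where(...) is an alias for it; a quick sketch of two equivalent ways to express the condition above:

# Two equivalent ways to express the same filter
hackney_data = data.filter(data['borough'] == 'Hackney')   # column expression
hackney_data = data.filter("borough = 'Hackney'")          # SQL expression string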

# (1)
data_2015_2016 = data.filter(data['year'].isin(["2015", "2016"]))

# (2)
data_2015_2016.sample(fraction=0.1).show()
  1. Notice the use of .isin(...) within the filter condition. This will select the records whose column values match any of the values passed into it
  2. .sample(...) returns a sampled subset of this DataFrame. fraction determines the fraction size of the full DataFrame to return
data_2014_onwards = data.filter(data['year'] >=2014)

data_2014_onwards.sample(fraction=0.1).show()

Another example of .filter(...), using the >= comparator to select records with a column value greater than or equal to the value passed

Aggregations and Grouping

borough_crime_count = data.groupBy('borough')\
                            .count()

borough_crime_count.show(5)

DataFrames support grouping of data, with the .groupBy(...) function. .groupBy(...) can be used on any column

borough_crime_count = data.groupBy('borough')\
                            .agg({"value":"sum"})

borough_crime_count.show(5)

.agg(...) is a function that will compute aggregates and return the result as a DataFrame.

Built-in aggregation functions (a combined example follows this list):

  • avg
  • max
  • min
  • sum
  • count
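These built-in aggregates can also be combined in a single .agg(...) call using pyspark.sql.functions; a quick sketch against the same DataFrame (imported here under the alias func, which is also used further below):

from pyspark.sql import functions as func

# Several aggregates over the same grouping in one pass
data.groupBy('borough')\
    .agg(func.sum('value').alias('total'),
         func.avg('value').alias('average'),
         func.count('value').alias('rows'))\
    .show(5)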
borough_conviction_sum = data.groupBy('borough')\
                            .agg({"value":"sum"})\
                            .withColumnRenamed('sum(value)','convictions')

borough_conviction_sum.show(5)

Using .withColumnRenamed(<original name>, <new name>) will result in the column name being replaced by a new name

total_borough_convictions = borough_conviction_sum.agg({'convictions':'sum'})

total_borough_convictions.show()

By removing the grouping function, the aggregate will act on the whole DataFrame

total_convictions = total_borough_convictions.collect()[0][0]

Using list indexing we can grab the value out of the collected rows and assign it to a variable
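.collect() returns a Python list of Row objects, so [0][0] picks the first column of the first row; .first() is an equivalent shortcut:

# Two equivalent ways to pull the single aggregated value back to the driver
total_convictions = total_borough_convictions.collect()[0][0]
total_convictions = total_borough_convictions.first()[0]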

import pyspark.sql.functions as func

Import PySpark's built-in SQL functions module, which the following code refers to as func

borough_percentage_contribution = borough_conviction_sum.withColumn(
    '% contribution',
    func.round(borough_conviction_sum.convictions / total_convictions * 100, 2))

borough_percentage_contribution.printSchema()

Here we create a new column and use the previous variable to calculate the new column value

borough_percentage_contribution.orderBy(borough_percentage_contribution[2].desc())\
                                .show(10)

We can use .orderBy(...) with a column reference to sort the DataFrame in ascending or descending order
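The same ordering can also be expressed with the column name rather than its index, which is usually easier to read:

# Order by the new column's name instead of its position
borough_percentage_contribution.orderBy('% contribution', ascending=False)\
                                .show(10)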

conviction_monthly = data.filter(data['year'] == 2014)\
                            .groupBy('month')\
                            .agg({'value':'sum'})\
                            .withColumnRenamed('sum(value)', 'convictions')

Here we use a combination of group by, aggregate and column renaming to extract the data

total_conviction_monthly = conviction_monthly.agg({'convictions':'sum'})\
                                            .collect()[0][0]

total_conviction_monthly = conviction_monthly.withColumn(
                'percent',
                func.round(conviction_monthly.convictions/total_conviction_monthly * 100, 2))
total_conviction_monthly.columns

Now we apply a few more transformations to the data and print the resulting DataFrame's columns

total_conviction_monthly.orderBy(total_conviction_monthly.percent.desc()).show()

Finally, we order the resulting DataFrame and display

Aggregations and Visualizations

crimes_category = data.groupBy('major_category')\
                        .agg({'value':'sum'})\
                        .withColumnRenamed('sum(value)','convictions')

Use group by and an aggregate to create a new DataFrame

crimes_category.orderBy(crimes_category.convictions.desc()).show()

Order and display the new DataFrame

year_df = data.select('year')

Create a new DataFrame from one column

year_df.agg({'year':'min'}).show()

Use the min aggregate to return the minimum value

year_df.agg({'year':'max'}).show()

Use the max aggregate to return the maximum value

year_df.describe().show()

.describe() will return:

  • count
  • mean
  • standard deviation
  • min
  • max
data.crosstab('borough', 'major_category')\
    .select('borough_major_category', 'Burglary', 'Drugs', 'Fraud or Forgery', 'Robbery')\
    .show()

.crosstab(...) computes a pair-wise frequency table of the given columns. Also known as a contingency table.

get_ipython().magic('matplotlib inline')
import matplotlib.pyplot as plt
plt.style.use('ggplot')

Matplotlib graphs will be displayed inline in this notebook

def describe_year(year):
    # Aggregate convictions per borough for the requested year
    yearly_details = data.filter(data.year == year)\
                        .groupBy('borough')\
                        .agg({'value':'sum'})\
                        .withColumnRenamed('sum(value)', 'convictions')

    # Pull the borough names and conviction totals back to the driver
    borough_list = [x[0] for x in yearly_details.toLocalIterator()]
    convictions_list = [x[1] for x in yearly_details.toLocalIterator()]

    # Plot the totals as a bar chart
    plt.figure(figsize=(33,10))
    plt.bar(borough_list, convictions_list)

    plt.title('Crime for the year: ' + str(year), fontsize=30)
    plt.xlabel('Boroughs', fontsize=30)
    plt.ylabel('Convictions', fontsize=30)

    plt.xticks(rotation=90, fontsize=30)
    plt.yticks(fontsize=30)
    plt.autoscale()
    plt.show()

This is a helper function that bundles all the necessary steps: build the DataFrame for a given year and create the chart to visualize it
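For example, calling the helper with a single year draws the corresponding bar chart (the year is passed as a string to match the column type):

describe_year('2014')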

Extracting Data and User Defined Functions

In this section we will use DataFrames to explore and clean data. We will use User Defined Functions to assist us in the process

from pyspark.sql import SparkSession

First import SparkSession

spark = SparkSession.builder\
                .appName('Analyzing soccer players')\
                .getOrCreate()

Create SparkSession instance

players = spark.read\
                .format('csv')\
                .option('header', 'true')\
                .load('../datasets/player.csv')

Read in the data source into a DataFrame

players.printSchema()

Look at the schema

players.show(5)

Check out the first few records

player_attributes = spark.read\
                        .format('csv')\
                        .option('header', 'true')\
                        .load('../datasets/Player_Attributes.csv')

Read a second CSV data source into a DataFrame

player_attributes.printSchema()

Again, look at the schema

players.count(), player_attributes.count()

Let's view the total record count for each DataFrame

player_attributes.select('player_api_id')\
                .distinct()\
                .count()

Notice that the records of one DataFrame have a many-to-one relationship with the entities of the other: each player has many attribute rows
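A quick way to see this relationship is to count attribute rows per player; a short sketch using the DataFrames above:

# Each player_api_id appears in many attribute rows
player_attributes.groupBy('player_api_id')\
                 .count()\
                 .orderBy('count', ascending=False)\
                 .show(5)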

players = players.drop('id', 'player_fifa_api_id')
players.columns

Get rid of unwanted data columns

player_attributes = player_attributes.drop(
    'id',
    'player_fifa_api_id',
    'preferred_foot',
    'attacking_work_rate',
    'defensive_work_rate',
    'crossing',
    'jumping',
    'sprint_speed',
    'balance',
    'aggression',
    'short_passing',
    'potential'
)
player_attributes.columns

Get rid of unwanted data columns

player_attributes = player_attributes.dropna()
players = players.dropna()

Remove records with unavailable (N/A) data

players.count(), player_attributes.count()

Look at the new data count

User Defined Functions

from pyspark.sql.functions import udf

Import the udf function from PySpark's SQL functions module

year_extract_udf = udf(lambda date: date.split('-')[0]) # (1)

player_attributes = player_attributes.withColumn( # (2)
    'year',
    year_extract_udf(player_attributes.date)
)
  1. Create a UDF from a lambda function that operates on a date value and returns only the year
  2. Create a new column for the year and extract the values from the date column using the UDF
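If no return type is given, the UDF defaults to returning strings; the type can also be declared explicitly, as in this equivalent sketch:

from pyspark.sql.types import StringType

# The same UDF with an explicit return type (StringType is also the default)
year_extract_udf = udf(lambda date: date.split('-')[0], StringType())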
player_attributes = player_attributes.drop('date')

Now we can drop the date column, as the year data has been copied to another column

player_attributes.columns

View the new list of columns

Joining DataFrames

Spark DataFrames can be joined much like SQL tables can be joined. In this section we will join data to create a new DataFrame.

pa_2016 = player_attributes.filter(player_attributes.year == 2016)

Create a new DataFrame from a subset of another DataFrame

pa_2016.count()

View the count

pa_2016.select(pa_2016.player_api_id)\
    .distinct()\
    .count()

Select only distinct values to make sure the unique ids match up with the DataFrame we want to join

pa_striker_2016 = pa_2016.groupBy('player_api_id')\
                        .agg({
                            'finishing':'avg',
                            'shot_power':'avg',
                            'acceleration':'avg'
                        })

Since one data set has many records associated with each entity, we will group the records by entity id first, then average the values of the columns we are interested in to create a one-to-one relationship

pa_striker_2016.count()

Check that the two DataFrame counts match

pa_striker_2016.show(5)

Take a quick look at the new aggregated data

pa_striker_2016 = pa_striker_2016.withColumnRenamed('avg(finishing)', 'finishing')\
                                 .withColumnRenamed('avg(shot_power)', 'shot_power')\
                                 .withColumnRenamed('avg(acceleration)', 'acceleration')

Rename the columns for readability

weight_finishing = 1
weight_shot_power = 2
weight_acceleration = 1

total_weight = weight_finishing + weight_shot_power + weight_acceleration

Let's create a weighted grading system to apply more value to some attributes

strikers = pa_striker_2016.withColumn('striker_grade',
                                     (pa_striker_2016.finishing * weight_finishing + \
                                      pa_striker_2016.shot_power * weight_shot_power + \
                                      pa_striker_2016.acceleration * weight_acceleration) / total_weight)

Create a new column and apply the grading system to calculate each row's value

strikers = strikers.drop('finishing',
                         'acceleration',
                         'shot_power')

Remove unneeded fields

strikers = strikers.filter(strikers.striker_grade > 70)\
                    .sort(strikers.striker_grade.desc())

strikers.show(10)

Drop the lower grades from the dataset and sort the rest in descending order

strikers.count(), players.count()

See how many entities we have left

striker_details = players.join(strikers, players.player_api_id == strikers.player_api_id)

Now we can join the two DataFrames

striker_details.columns

View the columns, and note that the join field appears twice, once from each DataFrame

striker_details.count()

Check that the count is in line with before

striker_details = players.join(strikers, ['player_api_id'])

Alternate way to join

striker_details.show(5)

View the data

striker_details.columns

View the columns, and note the single join column

Saving DataFrames to CSV and JSON

Saving to a file is pretty straightforward

CSV

striker_details.select("player_name", "striker_grade")\  # (1)
                .coalesce(1)\  # (2)
                .write\  # (3)
                .option('header', 'true')\  # (4)
                .csv('striker_grade.csv')  # (5)
  1. Select the columns to export
  2. .coalesce(1) controls how many partitions, and therefore output files, the data is broken into
  3. Begin the write command
  4. Any options to apply
  5. File format and file name
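By default the write fails if the target path already exists; a write mode can be set to change that behaviour, for example to overwrite on re-runs:

striker_details.select("player_name", "striker_grade")\
                .coalesce(1)\
                .write\
                .mode('overwrite')\
                .option('header', 'true')\
                .csv('striker_grade.csv')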

JSON

striker_details.select("player_name", "striker_grade")\
                .write\
                .json('striker_grade.json')

Going Further with Joins

Here we will cover other ways to join DataFrames

valuesA = [('John', 100000), ('James', 150000), ('Emily', 65000), ('Nina', 200000)]
tableA = spark.createDataFrame(valuesA, ['name', 'salary'])

Create a DataFrame from a list of tuples

tableA.show()

View DataFrame

valuesB = [('James', 2), ('Emily',3), ('Darth Vader', 5), ('Princess Leia', 6)]

tableB = spark.createDataFrame(valuesB, ['name', 'employee_id'])

Create a second DataFrame

tableB.show()

View DataFrame

inner_join = tableA.join(tableB, tableA.name == tableB.name)
inner_join.show()

This is the behavior that we have seen previously

left_join = tableA.join(tableB, tableA.name == tableB.name, how='left')
left_join.show()

Using the how parameter we can explicitly declare the type of join

right_join = tableA.join(tableB, tableA.name == tableB.name, how='right')
right_join.show()

Right outer join

full_outer_join = tableA.join(tableB, tableA.name == tableB.name, how='full')
full_outer_join.show()

Full outer join
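The how parameter also accepts semi and anti joins, which keep or exclude rows of the left table based on whether a match exists, without bringing in the right table's columns; a brief sketch:

# Keep only rows of tableA that have a match in tableB
left_semi_join = tableA.join(tableB, tableA.name == tableB.name, how='left_semi')
left_semi_join.show()

# Keep only rows of tableA that have no match in tableB
left_anti_join = tableA.join(tableB, tableA.name == tableB.name, how='left_anti')
left_anti_join.show()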