PySpark - Spark SQL Context

Spark aims to make working with data easy. One way it does this is by letting you query Spark data as if you were working with a SQL database

  • Spark SQL enables querying of DataFrames as database tables
  • Tables can be temporary per-session or global
  • The Catalyst optimizer makes SQL queries fast
  • Table schemas can be inferred or explicitly specified

Basic Operations

from pyspark.sql import SparkSession

Import SparkSession

spark = SparkSession.builder\
                    .appName('Analyzing Students')\
                    .getOrCreate()

Create a new Session

from pyspark.sql.types import Row
from datetime import datetime

Import some libraries

sc = spark.sparkContext  # the SparkContext behind the session

record = sc.parallelize([
    Row(id=1,
        name='Jill',
        active=True,
        clubs=['chess', 'hockey'],
        subjects={'math': 80, 'english': 56},
        enrolled=datetime(2014, 8, 1, 14, 1, 5)),
    Row(id=2,
        name='George',
        active=False,
        clubs=['chess', 'soccer'],
        subjects={'math': 60, 'english': 96},
        enrolled=datetime(2015, 3, 21, 8, 2, 5)),
])

Use parallelize(...) to create an RDD of Row objects that contain a mixture of data types

record_df = record.toDF()
record_df.show()

Create a DataFrame from the RDD

record_df.createOrReplaceTempView('records')

To query this data with SQL, we first need to register the DataFrame as a table. The SQL table is named records and exists only within this session. Once the session ends, the table is destroyed as well

all_records_df = sqlContext.sql('SELECT * FROM records')

all_records_df.show()

Use sqlContext.sql(...) to pass SQL statements that query the table and return a DataFrame. On a SparkSession, spark.sql(...) does the same thing

sqlContext.sql('SELECT id, clubs[1], subjects["english"] FROM records').show()

This is a more complex query that returns subsets of the collection data (an array element and a map value) from the queried rows

sqlContext.sql('SELECT ID, NOT active FROM records').show()

Logical operators (AND, OR, NOT) are available

sqlContext.sql('SELECT * FROM records WHERE subjects["english"] > 90').show()

Comparison operators are also available (<, >, <=, >=)

record_df.createGlobalTempView('global_records')

To make a table accessible to all sessions in the same Spark application, we must register it as a global temporary view

sqlContext.sql('SELECT * FROM global_temp.global_records').show()

To access a global temporary view, the global_temp namespace must be given along with the table name

Analyzing Data with Spark SQL

from pyspark.sql import SparkSession

Import SparkSession

spark = SparkSession.builder\
                    .appName("Analyzing airline data")\
                    .getOrCreate()

Create Session

from pyspark.sql.types import Row
from datetime import datetime

Import some libraries to be used later

airlinesPath = '/Users/froilanmiranda/python-envs/sparktest/datasets/airlines.csv'
flightsPath = '/Users/froilanmiranda/python-envs/sparktest/datasets/flights.csv'
airportsPath = '/Users/froilanmiranda/python-envs/sparktest/datasets/airports.csv'

Create some variables to represent the paths to the data sets

airlines = spark.read\
                .format('csv')\
                .option('header', 'true')\
                .load(airlinesPath)

Create a DataFrame from the csv file

airlines.createOrReplaceTempView('airlines')

Register a table view of the DataFrame that is only accessible from this SparkSession

airlines = spark.sql('SELECT * FROM airlines')
airlines.columns

Explore the data by displaying the columns

airlines.show(5)

Continue exploring the data by displaying the first few rows

flights = spark.read\
                .format('csv')\
                .option('header','true')\
                .load(flightsPath)

Read in the next csv as a DataFrame

flights.createOrReplaceTempView('flights')

flights.columns

Register another table view from the DataFrame. Then display the columns to begin exploring the data

flights.show(5)

Explore this data by printing the first few records to the screen

flights.count(), airlines.count()

Get a total record count for each set of data

flights_count = spark.sql('SELECT COUNT(*) FROM flights')
airlines_count = spark.sql('SELECT COUNT(*) FROM airlines')

We can also use SQL to get the same counts

flights_count, airlines_count

Display the result of the SQL query. Notice the result is a DataFrame

flights_count.collect()[0][0], airlines_count.collect()[0][0]

We can use index notation to extract particular values from the resulting DataFrame. collect() returns a list of Row objects, so [0][0] is the first column of the first row

total_distance_df = (spark.sql('SELECT distance FROM flights')  # (1)
                         .agg({'distance': 'sum'})  # (2)
                         .withColumnRenamed('sum(distance)', 'total_distance'))  # (3)

Mixing DataFrame and SQL operations is valid, since spark.sql(...) returns a DataFrame

  1. Return the 'distance' column as a DataFrame
  2. Apply an aggregation to the DataFrame
  3. Rename the aggregate column 'sum(distance)' to 'total_distance'

total_distance_df.show()

Display DataFrame values

all_delays_2012 = spark.sql(
    'SELECT date, airlines, flight_number, departure_delay ' +
    'FROM flights WHERE departure_delay > 0 and year(date) = 2012')

This results in an empty DataFrame because no records match the WHERE criteria

all_delays_2012.show(5)

Displays empty DataFrame

all_delays_2014 = spark.sql(
    'SELECT date, airlines, flight_number, departure_delay ' +
    'FROM flights WHERE departure_delay > 0 and year(date) = 2014')

all_delays_2014.show(5)

Change the criteria to capture data that exists in the table view

all_delays_2014.createOrReplaceTempView('all_delays')

Register the resulting DataFrame as a table view

all_delays_2014.orderBy(all_delays_2014.departure_delay.desc()).show(5)

Sort all the records by delay time. Notice that the order of the delay values doesn't make sense: earlier we saw other delay times that were greater in value. Why is this? Because the delay column is being treated as a string, so values are compared character by character rather than numerically. This is why taking your time to observe and explore your data is crucial. We will not use this result again, so we can leave it as is
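The effect of sorting numbers stored as text can be reproduced in plain Python, without Spark. The delay values below are made up for illustration.

```python
# Made-up delay values, stored as text the way an untyped CSV read leaves them.
delays_as_text = ['9', '85', '100', '12']

# Text compares character by character, so '9' outranks '85' and '100'.
text_order = sorted(delays_as_text, reverse=True)

# Converting to int first gives the order a reader expects.
numeric_order = sorted((int(d) for d in delays_as_text), reverse=True)

print(text_order)     # ['9', '85', '12', '100']
print(numeric_order)  # [100, 85, 12, 9]
```

In Spark, the corresponding fix would be to cast the column before sorting, for example with CAST(departure_delay AS INT) in SQL or .cast('int') on the DataFrame column.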

delay_count = spark.sql('SELECT COUNT(departure_delay) FROM all_delays')

Collect the total count of flights delayed

delay_count.show()

Display this result

delay_count.collect()[0][0]

Extract the single piece of data

delay_percent = delay_count.collect()[0][0] / flights_count.collect()[0][0] * 100
delay_percent

Using all this data we can calculate the percentage of flights that were delayed

delay_per_airline = spark.sql('SELECT airlines, departure_delay  FROM flights')\
                        .groupBy('airlines')\
                        .agg({'departure_delay':'avg'})\
                        .withColumnRenamed('avg(departure_delay)', 'departure_delay')

Now let's get the average delay by airline

delay_per_airline.orderBy(delay_per_airline.departure_delay.desc()).show(5)

Ordering by departure delay in descending order gives us the airlines with the longest delays

delay_per_airline.createOrReplaceTempView('delay_per_airline')

Register the DataFrame as a table view to perform SQL queries

delay_per_airline = spark.sql('SELECT * FROM delay_per_airline ORDER BY departure_delay DESC')

This will assign ordered data from the SQL table into a DataFrame.

delay_per_airline.show(5)

Displaying this data, we can see that it matches the previous operation. This shows that SQL and DataFrame operations produce the same outcome

delay_per_airline = spark.sql('SELECT * FROM delay_per_airline ' +
                              'JOIN airlines ON airlines.code = delay_per_airline.airlines ' +
                              'ORDER BY departure_delay DESC')

Using a SQL join, we are able to combine two registered SQL tables and return a DataFrame

delay_per_airline.show(5)

Display the first few rows of the resulting DataFrame

Inferred and Explicit Schemas

Spark will infer data types when creating DataFrames. But sometimes we will need to explicitly set the schema of the DataFrame

from pyspark.sql import SparkSession
spark = SparkSession.builder\
                    .appName('Inferred and explicit schemas')\
                    .getOrCreate()
from pyspark.sql.types import Row

Import the needed libraries and create the needed entities as per usual

lines = sc.textFile('/Users/froilanmiranda/python-envs/sparktest/datasets/students.txt')

Use the SparkContext that is directly available to read the text file into an RDD

lines.collect()

This is a comma-separated list about a few students. Every line is a string, and every string has values separated by commas

parts = lines.map(lambda l: l.split(',')) # (1)

parts.collect() # (2)

  1. Use the map function with a lambda to create a list from the string value of each row
  2. Display the result on screen

students = parts.map(lambda p: Row(name=p[0], math=int(p[1]), english=int(p[2]), science=int(p[3])))

Again, use the map function with a lambda to create Row objects from the list
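The split and the type conversion done by the two map steps above can also be folded into one named function, which is easier to test on its own. This is a plain-Python sketch. The function name parse_student and the sample lines are hypothetical, since the contents of students.txt are not shown here.

```python
def parse_student(line):
    """Split one comma-separated line into (name, math, english, science)."""
    name, math, english, science = (field.strip() for field in line.split(','))
    return name, int(math), int(english), int(science)


# Hypothetical lines in the assumed "name,math,english,science" layout.
sample_lines = ['Emily,44,55,78', 'Andy, 47, 34, 89']
students = [parse_student(line) for line in sample_lines]
print(students)  # [('Emily', 44, 55, 78), ('Andy', 47, 34, 89)]
```

A function like this could be passed to lines.map(...) in place of the lambdas, with the tuple wrapped in a Row if named fields are wanted.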

students.collect()

Display the result

schemaStudents = spark.createDataFrame(students)

schemaStudents.createOrReplaceTempView('students')

Create a DataFrame from the RDD and then register it as a SQL table

schemaStudents.columns

Show column info of the DataFrame

schemaStudents.schema

We did not declare a schema for the DataFrame, but Spark was able to use reflection to infer it. Notice the StructType and StructField data types

spark.sql('SELECT * FROM students').show()

It is because of the inferred typing that Spark can create a schema for the table view when it is registered

parts.collect()

Now let's use the parts RDD to create a DataFrame and explicitly define the schema. We can see that it is an RDD of list elements

parts_typed = parts.map(lambda p: Row(name=p[0], math=int(p[1]), english=int(p[2]), science=int(p[3])))

As we can see above, the values for the grades are strings, and we need them to be numbers. We use map and a lambda together to accomplish this

schemaString = 'name math english science'

This only maps out the columns we want in the schema. It is a visual reference and is not used by the code

from pyspark.sql.types import StructType, StructField, StringType, LongType, IntegerType

fields = [StructField('name', StringType(), True),
          StructField('math', IntegerType(), True),
          StructField('english', IntegerType(), True),
          StructField('science', IntegerType(), True)]

Specify the fields for every record. Each column is represented by a StructField, which takes the column name, the data type, and whether the column is nullable

schema = StructType(fields)

Create a StructType and pass the StructFields as a parameter to create a schema

schemaStudents = spark.createDataFrame(parts_typed, schema)

Create a DataFrame using the RDD of typed Row objects and the configured schema

schemaStudents.columns

Confirm columns have been properly named

schemaStudents.schema

Confirm schema is configured correctly

schemaStudents.createOrReplaceTempView('students_explicit')

Register the DataFrame as a SQL table

spark.sql('SELECT * FROM students_explicit').show()

And now with the schema explicitly in place we can query the data with SQL