PySpark - Spark SQL Context
Spark aims to make it easy to work with data. One way it achieves this is by letting you work with Spark data as if you were working with a SQL database
- Spark SQL enables querying of DataFrames as database tables
- Temporary per-session and global tables
- The Catalyst optimizer makes SQL queries fast
- Table schemas can be inferred or explicitly specified
Basic Operations
from pyspark.sql import SparkSession
Import SparkSession
spark = SparkSession.builder\
.appName('Analyzing Students')\
.getOrCreate()
Create a new Session
from pyspark.sql.types import Row
from datetime import datetime
Import some additional libraries
record = sc.parallelize([Row(id = 1,
                             name = 'Jill',
                             active = True,
                             clubs = ['chess', 'hockey'],
                             subjects = {'math': 80, 'english': 56},
                             enrolled = datetime(2014, 8, 1, 14, 1, 5)),
                         Row(id = 2,
                             name = 'George',
                             active = False,
                             clubs = ['chess', 'soccer'],
                             subjects = {'math': 60, 'english': 96},
                             enrolled = datetime(2015, 3, 21, 8, 2, 5))
                         ])
Use parallelize(...) to create an RDD made of Row objects containing a mixture of data types (sc is the SparkContext, also available as spark.sparkContext)
record_df = record.toDF()
record_df.show()
Create a DataFrame from the RDD
record_df.createOrReplaceTempView('records')
To run SQL against this data we first need to register the DataFrame as a table. The name of the SQL table is records and it only exists within this session. Once the session ends the table is destroyed
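As an optional check (not part of the original notes), the tables and views registered in the current session can be listed through the catalog:
spark.catalog.listTables()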
all_records_df = sqlContext.sql('SELECT * FROM records')
all_records_df.show()
Use sqlContext.sql(...) (equivalent to spark.sql(...)) to pass SQL statements that query the table and return a DataFrame
sqlContext.sql('SELECT id, clubs[1], subjects["english"] FROM records').show()
This is a more complex query, returning subsets of the collection data (an array element and a map value) from the queried rows
sqlContext.sql('SELECT ID, NOT active FROM records').show()
Logical operators (AND, OR, NOT) can be used in queries
sqlContext.sql('SELECT * FROM records WHERE subjects["english"] > 90').show()
Comparison operators are also available (<, >, <=, >=)
record_df.createGlobalTempView('global_records')
In order to make a table accessible to all sessions within the same Spark application, we must register it as a global temporary view. It lives in the global_temp database and is dropped when the application terminates
sqlContext.sql('SELECT * FROM global_temp.global_records').show()
In order to access a global temporary view, the global_temp namespace must be prefixed to the table name
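Because the view lives in the global_temp database, it remains visible from other sessions in the same application. A quick way to see this (a sketch, not in the original notes) is to create a second session and run the same query:
spark.newSession().sql('SELECT * FROM global_temp.global_records').show()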
Analyzing Data with Spark SQL
from pyspark.sql import SparkSession
Import SparkSession
spark = SparkSession.builder\
.appName("Analyzing airline data")\
.getOrCreate()
Create Session
from pyspark.sql.types import Row
from datetime import datetime
Import some libraries to be used later
airlinesPath = '/Users/froilanmiranda/python-envs/sparktest/datasets/airlines.csv'
flightsPath = '/Users/froilanmiranda/python-envs/sparktest/datasets/flights.csv'
airportsPath = '/Users/froilanmiranda/python-envs/sparktest/datasets/airports.csv'
Create some variables to represent the paths to the datasets
airlines = spark.read\
.format('csv')\
.option('header', 'true')\
.load(airlinesPath)
Create a DataFrame from the csv file
airlines.createOrReplaceTempView('airlines')
Register a table view of the DataFrame that is only accessible from this SparkSession
airlines = spark.sql('SELECT * FROM airlines')
airlines.columns
Explore the data by displaying the columns
airlines.show(5)
Continue exploring the data by displaying the first few rows
flights = spark.read\
.format('csv')\
.option('header','true')\
.load(flightsPath)
Read in the next csv as a DataFrame
flights.createOrReplaceTempView('flights')
flights.columns
Register another table view from the DataFrame. Then display the columns to begin exploring the data
flights.show(5)
Explore this data by printing the first few records to the screen
flights.count(), airlines.count()
Get a total record count for each set of data
flights_count = spark.sql('SELECT COUNT(*) FROM flights')
airlines_count = spark.sql('SELECT COUNT(*) FROM airlines')
We can also use SQL to get the same counts
flights_count, airlines_count
Display the result of the SQL query. Notice the result is a DataFrame
flights_count.collect()[0][0], airlines_count.collect()[0][0]
We can use index notation on the collected Rows to extract particular values from the resulting DataFrame
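An equivalent shortcut (just an alternative, not part of the original notes) is first(), which returns the first Row of the DataFrame directly:
flights_count.first()[0], airlines_count.first()[0]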
total_distance_df = (spark.sql('SELECT distance FROM flights')               # (1)
                     .agg({'distance': 'sum'})                               # (2)
                     .withColumnRenamed('sum(distance)', 'total_distance'))  # (3)
Mixing DataFrame and SQL operations is valid since spark.sql(...) returns a DataFrame
- (1) Select the distance column as a DataFrame
- (2) Apply a sum aggregation on the DataFrame
- (3) Rename the aggregate column sum(distance) to total_distance
total_distance_df.show()
Display DataFrame values
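The same total can also be computed in a single SQL statement against the flights view registered above (a sketch; the total_distance alias is our own):
spark.sql('SELECT SUM(distance) AS total_distance FROM flights').show()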
all_delays_2012 = spark.sql(
'SELECT date, airlines, flight_number, departure_delay ' +
'FROM flights WHERE departure_delay > 0 and year(date) = 2012')
Results in an empty DataFrame; no records match the WHERE criteria
all_delays_2012.show(5)
Displays empty DataFrame
all_delays_2014 = spark.sql(
'SELECT date, airlines, flight_number, departure_delay ' +
'FROM flights WHERE departure_delay > 0 and year(date) = 2014')
all_delays_2014.show(5)
Change the criteria to capture data that exists in the table view
all_delays_2014.createOrReplaceTempView('all_delays')
Register the resulting DataFrame as a table view
all_delays_2014.orderBy(all_delays_2014.departure_delay.desc()).show(5)
Sort all the records by the delay time. Notice the values for the delay don't make sense; earlier we saw other delay times that were greater in value. Why is this? Because the departure_delay column is being treated as a string, so the ordering is lexicographic rather than numeric. This is why taking your time to observe and explore your data is crucial. We will not use this sorted data, so we can leave it as is
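If a correct numeric ordering were needed, one option (a sketch, not part of the original walkthrough) is to cast the column to a number before sorting, or to re-read the CSV with inferSchema enabled:
from pyspark.sql.functions import col

# Cast the string departure_delay column to a float so the ordering is numeric
all_delays_2014.withColumn('departure_delay', col('departure_delay').cast('float'))\
    .orderBy('departure_delay', ascending=False)\
    .show(5)

# Alternatively, re-read the CSV so numeric columns are typed as numbers from the start
# flights = spark.read.format('csv')\
#     .option('header', 'true')\
#     .option('inferSchema', 'true')\
#     .load(flightsPath)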
delay_count = spark.sql('SELECT COUNT(departure_delay) FROM all_delays')
Collect the total count of flights delayed
delay_count.show()
Display this result
delay_count.collect()[0][0]
Extract the single piece of data
delay_percent = delay_count.collect()[0][0] / flights_count.collect()[0][0] * 100
delay_percent
Using all this data we can calculate the percentage of flights that were delayed
delay_per_airline = spark.sql('SELECT airlines, departure_delay FROM flights')\
.groupBy('airlines')\
.agg({'departure_delay':'avg'})\
.withColumnRenamed('avg(departure_delay)', 'departure_delay')
Now let's get the average delay by airline
delay_per_airline.orderBy(delay_per_airline.departure_delay.desc()).show(5)
Ordering by departure delay in descending order gives us the airlines with the longest delays
delay_per_airline.createOrReplaceTempView('delay_per_airline')
Register the DataFrame as a table view to perform SQL queries
delay_per_airline = spark.sql('SELECT * FROM delay_per_airline ORDER BY departure_delay DESC')
This assigns the ordered data from the SQL table to a DataFrame.
delay_per_airline.show(5)
Displaying this data, we can see it matches the previous operation. This shows that SQL and DataFrame operations result in the same outcome
delay_per_airline = spark.sql('SELECT * FROM delay_per_airline ' +
'JOIN airlines ON airlines.code = delay_per_airline.airlines ' +
'ORDER BY departure_delay DESC')
Using a SQL join, we are able to combine two registered SQL tables and return a DataFrame
delay_per_airline.show(5)
Display the first few rows of the resulting DataFrame
Inferred and Explicit Schemas
Spark will infer data types when creating DataFrames, but sometimes we will need to explicitly set the schema of the DataFrame
from pyspark.sql import SparkSession
spark = SparkSession.builder\
.appName('Inferred and explicit schemas')\
.getOrCreate()
from pyspark.sql.types import Row
Import the needed libraries and create the needed entities as per usual
lines = sc.textFile('/Users/froilanmiranda/python-envs/sparktest/datasets/students.txt')
Use the SparkContext that is directly available to read the text file into an RDD
lines.collect()
This is a comma-separated file with records about a few students. Every line is a string, and every string has values separated by commas
parts = lines.map(lambda l: l.split(',')) # (1)
parts.collect() # (2)
- (1) Use the map function with a lambda to create a list from the string value of each row
- (2) Display the result on screen
students = parts.map(lambda p: Row(name=p[0], math=int(p[1]), english=int(p[2]), science=int(p[3])))
Again, use the map function with a lambda to create Row objects from the list
students.collect()
Display the result
schemaStudents = spark.createDataFrame(students)
schemaStudents.createOrReplaceTempView('students')
Create a DataFrame from the RDD and then register it as a SQL table
schemaStudents.columns
Show column info of the DataFrame
schemaStudents.schema
We did not declare a schema for the DataFrame, but Spark was able to use reflection to infer the schema. Notice the data types StructType and StructField
spark.sql('SELECT * FROM students').show()
It is because of the inferred typing that Spark can create a schema for the table view when it is registered
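printSchema() prints a more readable, tree-formatted view of the same inferred schema (just an alternative to the .schema attribute above):
schemaStudents.printSchema()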
parts.collect()
Now let's use the parts RDD to create a DataFrame and explicitly define the schema. We can see it is an RDD of list elements
parts_typed = parts.map(lambda p: Row(name=p[0], math=int(p[1]), english=int(p[2]), science=int(p[3])))
As we can see above, the values for the grades are strings and we need them to be numbers. Use the map function with a lambda to convert them to integers
schemaString = 'name math english science'
This string just maps out which columns we want in the schema; it is for visual reference only
from pyspark.sql.types import StructType, StructField, StringType, LongType, IntegerType
fields = [StructField('name', StringType(), True),
          StructField('math', IntegerType(), True),
          StructField('english', IntegerType(), True),
          StructField('science', IntegerType(), True)]
Specify the fields for every record. Each column is represented by a StructField, which takes values for column name, data type, and whether it is nullable.
schema = StructType(fields)
Create a StructType, passing the list of StructFields as a parameter, to create the schema
schemaStudents = spark.createDataFrame(parts_typed, schema)
Create a DataFrame using the typed RDD and the configured schema
schemaStudents.columns
Confirm columns have been properly named
schemaStudents.schema
Confirm schema is configured correctly
schemaStudents.createOrReplaceTempView('students_explicit')
Register the DataFrame as a SQL table
spark.sql('SELECT * FROM students_explicit').show()
And now with the schema explicitly in place we can query the data with SQL