PySpark - Using RDDs
I will keep my data in a folder named 'datasets'
If PySpark is not already loaded up, go ahead and start PySpark and create a new Jupyter notebook
View information about the SparkContext by inputting sc
If we were running a cluster of nodes the output would be a bit more interesting; since we are running in standalone mode there is little to see
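As a quick sanity check, we can also inspect a few standard attributes of the SparkContext directly. This is a minimal sketch; the values noted in the comments are simply what a local standalone setup typically reports
print(sc.version)             # Spark version in use
print(sc.master)              # e.g. 'local[*]' when running in standalone/local mode
print(sc.defaultParallelism)  # default number of partitions Spark will use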
Let's import a few things
from pyspark.sql.types import Row # (1)
from datetime import datetime # (2)
- Row is a Spark object that represents a single row of a DataFrame; we will see this shortly
- datetime is from the Python standard library
simple_data = sc.parallelize([1, "Alice", 50])
simple_data
sc.parallelize(...)
converts data into an RDD
simple_data.count()
Returns the number of elements in the RDD
simple_data.first()
Access the first element in the RDD
.count()
and .first()
are examples of what Spark calls Actions
simple_data.take(2)
.take(...)
will return the first n elements of the RDD as a list
simple_data.collect()
.collect()
will return all values in the RDD as a list
These have been some examples of Actions. Remember that calling an action triggers all of the transformations that came before it to execute. This can be a costly operation on large datasets, so be careful about when and where you use them
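To see this laziness in action, here is a minimal sketch using a throwaway RDD of numbers (not part of the data used below): the map() call returns immediately, and nothing is computed until the action at the end
numbers = sc.parallelize(range(10))     # create a small RDD
doubled = numbers.map(lambda n: n * 2)  # transformation: lazy, nothing executes yet
doubled.collect()                       # action: triggers the map to actually run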
Up until now we have been using RDDs, and this is fine to do in Spark. However, Spark 2 offers the newer DataFrame API, and we will use DataFrames far more than RDDs. The takeaway is that Spark 2 still exposes the underlying RDD construct. Let's quickly try to create a DataFrame from an RDD
df = simple_data.toDF()
toDF()
will try to convert an RDD to a DataFrame
Here we get an error. The problem is that the data in the rows of the RDD is not structured: the types are mixed, and DataFrames require a consistent schema
This RDD has no schema and contains elements of different types, so it cannot be converted to a DataFrame
Convert RDDs to DataFrames
records = sc.parallelize([[1, "Alice", 50], [2,"Bob", 100]])
records
Here we create an RDD with structured data, two rows with matching data schemas
records.collect()
Again, collect()
returns all rows in the RDD
records.count()
Again, returns the row count of the RDD
records.first()
Again, returns the first record
records.take(2)
records.collect()
Because this RDD only has two rows, the previous two methods have the same return values
df = records.toDF()
This returns a Spark DataFrame built from the RDD's values. Because the RDD's rows have the same number of columns and each column holds a single data type, Spark can infer a schema and create the DataFrame
df
We can see Spark infers the datatypes
df.show()
.show()
allows for a quick view of the DataFrame. By default it shows the first 20 rows
Take notice that the column names (_1, _2, _3) have been automatically generated and assigned
If we want to specify the column names we must make use of the Row object imported earlier. Using Row objects to create the RDD passes the column names along with the data
data = sc.parallelize([Row(id=1,
name="Alice",
score=50)])
data
Now let's inspect the column names within the Row object
data.collect()
data.count()
Let's create a DataFrame from this RDD
df = data.toDF()
df.show()
We can now see the column names applied to the output.
Let's add some more data
data = sc.parallelize([Row(id=1,
name="Alice",
score=50),
Row(id=2,
name="Bob",
score=100),
Row(id=3,
name="Charlee",
score=150)])
data
Now, convert to a dataframe and show
df = data.toDF()
df.show()
And as before, since the data is structured, Spark will infer the data types and effortlessly convert the RDD to a DataFrame
Working with complex data
complex_data = sc.parallelize([Row(
    col_float=1.44,
    col_integer=10,
    col_string="John")])
We create an RDD with one Row object. This row consists of float, integer, and string value types
complex_data_df = complex_data.toDF()
complex_data_df.show()
Convert the complex data to a dataframe
complex_data = sc.parallelize([Row(
    col_float=1.44,
    col_integer=10,
    col_string="John",
    col_boolean=True,
    col_list=[1, 2, 3])])
Now we see a good mixture of data types; take note of the list column.
complex_data_df = complex_data.toDF()
complex_data_df.show()
After converting to a DataFrame, we can see from the table displayed by the show() method that the list type has been preserved
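If we want to confirm exactly which types Spark inferred, the standard printSchema() method prints the full schema, including the array type of the list column
complex_data_df.printSchema()  # shows each column with its inferred type, e.g. col_list as an array of longs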
complex_data = sc.parallelize([
    Row(
        col_list=[1, 2, 3],
        col_dic={"k1": 0},
        col_row=Row(a=10, b=20, c=30),
        col_time=datetime(2014, 8, 1, 14, 1, 5)
    ),
    Row(
        col_list=[1, 2, 3, 4, 5],
        col_dic={"k1": 0, "k2": 1},
        col_row=Row(a=40, b=50, c=60),
        col_time=datetime(2014, 8, 1, 14, 1, 6)
    ),
    Row(
        col_list=[1, 2, 3, 4, 5, 6, 7],
        col_dic={"k1": 0, "k2": 0, "k3": 0},
        col_row=Row(a=70, b=80, c=90),
        col_time=datetime(2014, 8, 1, 14, 1, 7)
    )
])
Here we can see the complex structures supported by Spark DataFrames: lists, dictionaries (maps), nested Rows, and timestamps
complex_data_df = complex_data.toDF()
complex_data_df.show()
SQL Context
We can use the SQLContext to run SQL queries on Spark data
from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)
sqlContext
This wraps around the SparkContext to add SQL functionality
df = sqlContext.range(5)
df
.range(5)
on the SQLContext object returns a single-column DataFrame with five rows containing the integers 0 through 4
df.count()
data = [('Alice',50),
('Bob',80),
('Charlee', 75)]
Create a list of tuples and assign it to the data variable
sqlContext.createDataFrame(data).show()
Creates a dataframe from the list and displays the data. Note the column names have been automatically generated
sqlContext.createDataFrame(data, ['Name', 'Score']).show()
The same operation, but specifying the column names
complex_data = [
    (1.0,
     10,
     "Alice",
     True,
     [1, 2, 3],
     {"k1": 0},
     Row(a=1, b=2, c=3),
     datetime(2014, 8, 1, 14, 1, 5)),
    (2.0,
     20,
     "Bob",
     True,
     [1, 2, 3, 4, 5],
     {"k1": 0, "k2": 1},
     Row(a=1, b=2, c=3),
     datetime(2014, 8, 1, 14, 1, 5)),
    (3.0,
     30,
     "Charlee",
     False,
     [1, 2, 3, 4, 5, 6, 7],
     {"k1": 0, "k2": 1, "k3": 2},
     Row(a=1, b=2, c=3),
     datetime(2014, 8, 1, 14, 1, 5))
]
List of complex data
sqlContext.createDataFrame(complex_data).show()
Convert to dataframe and display
complex_data_df = sqlContext.createDataFrame(complex_data,[
'col_float',
'col_integer',
'col_string',
'col_boolean',
'col_list',
'col_dictionary',
'col_row',
'col_date_time']
)
complex_data_df.show()
Convert to a DataFrame with column names and display
data = sc.parallelize([
Row(1,'Alice',50),
Row(2,'Bob',100),
Row(3,'Charlee',150)
])
Create an RDD of Row objects, but without specifying column names on the Rows
column_names = Row('id','name','score')
students = data.map(lambda r: column_names(*r))
We can apply column names to an RDD after it has been created by using the .map(...)
function.
students
This returns a new RDD
Note: The map() operation performs a transformation on every element in the RDD
students.collect()
We see that the column names have been assigned to all the records
students_df = sqlContext.createDataFrame(students)
students_df
Use the SQLContext to create a dataframe from the students RDD
students_df.show()
Notice the dataframe has recognized all the column names properly
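Since the SQLContext is what adds SQL support, here is a minimal sketch of actually querying the students DataFrame with SQL; the view name "students" is an arbitrary choice for this example
students_df.createOrReplaceTempView("students")  # register the DataFrame as a temporary view
sqlContext.sql("SELECT name, score FROM students WHERE score > 60").show()  # run a SQL query against it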
Accessing RDDs from DataFrames
Looking back to the complex data we created earlier
complex_data_df.first()
This data consists of primitive data types as well as complex data types
complex_data_df.take(2)
DataFrames are in tabular format, and once collected the rows can be accessed using matrix-style [row][column] notation
cell_string = complex_data_df.collect()[0][2]
cell_string
Another example
cell_list = complex_data_df.collect()[0][4]
cell_list
Modify the list
cell_list.append(100)
cell_list
complex_data_df.show()
Take note that the original data is unaltered. This is because collect() returns a copy of the data, so modifying the returned value does not change the DataFrame
complex_data_df.rdd\
.map(lambda x: (x.col_string, x.col_dictionary))\
.collect()
Extract specific columns by converting the DataFrame to an RDD
complex_data_df.select(
'col_string',
'col_list',
'col_date_time'
).show()
.select(...)
will return only the specified column names
complex_data_df.rdd\
.map(lambda x: (x.col_string + " Boo"))\
.collect()
A map() operation which appends "Boo" to every string in the column. Note that we first go through .rdd because DataFrames do not support the .map(...) function directly
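If we would rather stay in the DataFrame API instead of dropping down to the RDD, a similar result can be sketched with the built-in column functions concat and lit from pyspark.sql.functions
from pyspark.sql.functions import concat, lit

complex_data_df.select(
    concat(complex_data_df.col_string, lit(" Boo"))  # append " Boo" to every value in col_string
).show()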
complex_data_df.select(
'col_integer',
'col_float'
)\
.withColumn(
'col_sum',
complex_data_df.col_integer + complex_data_df.col_float
)\
.show()
To perform a calculation with column values we need to use .withColumn(...):
- Select the columns to use in the calculation
- Create a new column holding the resulting values
complex_data_df.select('col_boolean')\
.withColumn(
'col_opposite',
complex_data_df.col_boolean == False)\
.show()
Here is another example of .withColumn(...)
that inverts the value of booleans in a column
complex_data_df.withColumnRenamed('col_dictionary','col_map').show()
This example renames the column
complex_data_df.select(complex_data_df.col_string.alias('Name')).show()
This will select and rename a column
Spark DataFrames and Pandas
Pandas and Spark DataFrames are interoperable
import pandas
Import the pandas library; do not forget to pip install pandas if needed
df_pandas = complex_data_df.toPandas()
df_pandas
Converting a Spark DataFrame to a Pandas DataFrame is done using toPandas()
Remember that Spark DataFrames are built on top of RDDs and can be distributed across multiple nodes, whereas a Pandas DataFrame is held entirely in the memory of the single machine it runs on, so take care when converting large datasets.
df_spark = sqlContext.createDataFrame(df_pandas)
df_spark.show()
df_spark
On the flip side, .createDataFrame(...)
will convert a Pandas DataFrame back into a Spark DataFrame