PySpark - Using RDDs

I will keep my data in a folder named 'datasets'

If PySpark is not already loaded up, go ahead and start PySpark and create a new Jupyter notebook

View information about the SparkContext by entering sc

If we were running a cluster of nodes the output would be a bit more interesting. As we are running in standalone mode there is little output
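
Even in standalone mode you can poke at a few SparkContext attributes to confirm the setup (these are standard SparkContext properties):

sc.version   # the Spark version this context is running
sc.master    # e.g. 'local[*]' in standalone/local mode
sc.appName   # the name of the application attached to this context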

Let's import a few things

from pyspark.sql.types import Row # (1)
from datetime import datetime # (2)
  1. Row is a Spark object that represents a single row of a DataFrame; we will see this shortly
  2. datetime comes from the Python standard library
simple_data = sc.parallelize([1, "Alice", 50])
simple_data

sc.parallelize(...) converts data into an RDD

simple_data.count()

Returns a number representing the number of entities in the RDD

simple_data.first()

Access the first element in the RDD

.count() and .first() are what are considered Actions

simple_data.take(2)

.take(...) will return a subset of the RDD as a list

simple_data.collect()

.collect() will return all values in the RDD as a list

These have been some examples of Actions. Remember that calling an action triggers all the transformations that came before it to execute. This can be a costly operation on large datasets, so be careful when and where you use them
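
For example, a minimal sketch using the simple_data RDD from above: map() is a transformation and runs nothing on its own; only the collect() action triggers execution.

strings = simple_data.map(lambda x: str(x))  # transformation: nothing executes yet
strings.collect()                            # action: runs the map and returns ['1', 'Alice', '50']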

Up until now we have been using RDDs, and this is fine to do in Spark. However, Spark 2 offers the newer DataFrame API, and we will use DataFrames a lot more than RDDs. The takeaway is that Spark 2 still has access to the underlying RDD construct. Let's quickly try to create a DataFrame from an RDD

df = simple_data.toDF()

toDF() will try to convert an RDD to a DataFrame

Here we get an error. The problem is that the data in the rows of the RDD is not structured: the data types are mixed, and a DataFrame requires each column to hold a single, consistent type

This RDD has no schema, contains elements of different types - it cannot be converted to a DataFrame

Convert RDDs to DataFrames

records = sc.parallelize([[1, "Alice", 50], [2,"Bob", 100]])
records

Here we create an RDD with structured data, two rows with matching data schemas

records.collect()

Again, collect() returns all rows in the RDD

records.count()

Again, returns the row count of the RDD

records.first()

Again, returns the first record

records.take(2)
records.collect()

Because this RDD has only two rows, the previous two methods return the same values

df = records.toDF()

This will return a Spark DataFrame built from the RDD's values. Because the RDD's rows have the same number of columns and those columns have the same data types, Spark can create a DataFrame from them

df

We can see Spark infers the datatypes
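
If you want to confirm what was inferred, printSchema() prints the column names and types (the auto-generated names are typically _1, _2, _3 here):

df.printSchema()   # e.g. _1: long, _2: string, _3: long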

df.show()

.show() gives a quick view of the DataFrame. By default it displays the first 20 rows

Take notice that the column names have been automatically generated and assigned

If we want to specify the column names, we must make use of the Row object imported earlier. Creating an RDD from Row objects carries the column names along with the data

data = sc.parallelize([Row(id=1,
                           name="Alice",
                           score=50)])
data

Now if we inspect the data, we can see the column names within the Row object

data.collect()

data.count()

Let's create a DataFrame from this RDD

df = data.toDF()
df.show()

We can now see the column names applied to the output.

Let's add some more data

data = sc.parallelize([Row(id=1,
                           name="Alice",
                           score=50),
                       Row(id=2,
                           name="Bob",
                           score=100),
                       Row(id=3,
                           name="Charlee",
                           score=150)])
data

Now, convert to a dataframe and show

df = data.toDF()
df.show()

And as before, since the data is structured Spark will infer the datatypes and effortlessly convert to a dataframe

Working with complex data

complex_data = sc.parallelize([Row(
                                col_float=1.44,
                                col_integer=10,
                                col_string="John")])

We create an RDD with one Row object. This row consists of float, integer, and string values

complex_data_df = complex_data.toDF()
complex_data_df.show()

Convert the complex data to a dataframe

complex_data = sc.parallelize([Row(
                                col_float=1.44,
                                col_integer=10,
                                col_string="John",
                                col_boolean=True,
                                col_list=[1, 2, 3])])

Now we see a good mixture of datatypes; take note of the list column.

complex_data_df = complex_data.toDF()
complex_data_df.show()

After converting to a DataFrame, we can see from the table displayed by the show() method that the list type has been preserved

complex_data = sc.parallelize([Row(
                                col_list=[1, 2, 3],
                                col_dic={"k1": 0},
                                col_row=Row(a=10, b=20, c=30),
                                col_time=datetime(2014, 8, 1, 14, 1, 5)
                              ),
                              Row(
                                col_list=[1, 2, 3, 4, 5],
                                col_dic={"k1": 0, "k2": 1},
                                col_row=Row(a=40, b=50, c=60),
                                col_time=datetime(2014, 8, 1, 14, 1, 6)
                              ),
                              Row(
                                col_list=[1, 2, 3, 4, 5, 6, 7],
                                col_dic={"k1": 0, "k2": 0, "k3": 0},
                                col_row=Row(a=70, b=80, c=90),
                                col_time=datetime(2014, 8, 1, 14, 1, 7)
                              )])

Here we can see several of the complex structures supported by DataFrames in Spark: lists, dictionaries, nested Rows, and timestamps

complex_data_df = complex_data.toDF()
complex_data_df.show()
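
If you are curious how Spark represents these structures, printSchema() should show array, map, struct, and timestamp column types:

complex_data_df.printSchema()   # col_list -> array, col_dic -> map, col_row -> struct, col_time -> timestamp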

SQL Context

We can use the SQLContext to run SQL queries on the Spark data

from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)

sqlContext

This wraps around the SparkContext to add SQL functionality
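
To actually run a SQL query, a DataFrame first needs to be registered as a temporary view. A minimal sketch, reusing the id/name/score DataFrame still bound to df at this point (the view name students_table is just an example):

df.createOrReplaceTempView("students_table")   # Spark 2+; older versions use df.registerTempTable(...)
sqlContext.sql("SELECT name, score FROM students_table WHERE score > 50").show()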

df = sqlContext.range(5)
df

.range(5) on the sqlContext object will return a single-column DataFrame with five rows containing the integers 0 through 4

df.count()

data = [('Alice',50),
        ('Bob',80),
        ('Charlee', 75)]

Create a list of tuples and assign it to the data variable

sqlContext.createDataFrame(data).show()

Creates a dataframe from the list and displays the data. Note the column names have been automatically generated

sqlContext.createDataFrame(data, ['Name', 'Score']).show()

The same operation, but specifying the column names

complex_data = [
                (1.0,
                 10,
                 "Alice",
                 True,
                 [1, 2, 3],
                 {"k1": 0},
                 Row(a=1, b=2, c=3),
                 datetime(2014, 8, 1, 14, 1, 5)),

                (2.0,
                 20,
                 "Bob",
                 True,
                 [1, 2, 3, 4, 5],
                 {"k1": 0, "k2": 1},
                 Row(a=1, b=2, c=3),
                 datetime(2014, 8, 1, 14, 1, 5)),

                (3.0,
                 30,
                 "Charlee",
                 False,
                 [1, 2, 3, 4, 5, 6, 7],
                 {"k1": 0, "k2": 1, "k3": 2},
                 Row(a=1, b=2, c=3),
                 datetime(2014, 8, 1, 14, 1, 5))
               ]

List of complex data

sqlContext.createDataFrame(complex_data).show()

Convert to dataframe and display

complex_data_df = sqlContext.createDataFrame(complex_data, [
        'col_float',
        'col_integer',
        'col_string',
        'col_boolean',
        'col_list',
        'col_dictionary',
        'col_row',
        'col_date_time']
)
complex_data_df.show()

Convert to a DataFrame with column names and display

data = sc.parallelize([
    Row(1,'Alice',50),
    Row(2,'Bob',100),
    Row(3,'Charlee',150)
])

Create an RDD with some Row objects, but with no column name specification for the Row object

column_names = Row('id','name','score')
students = data.map(lambda r: column_names(*r))

We can apply column names to an RDD after it has been created by using the .map(...) function.

students

This returns a new RDD

Note: The map() operation performs a transformation on every element in the RDD
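
As a quick throwaway example of map() on its own:

numbers = sc.parallelize([1, 2, 3, 4])
numbers.map(lambda x: x * 10).collect()   # returns [10, 20, 30, 40]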

students.collect()

We see that the column names have been assigned to all the records

students_df = sqlContext.createDataFrame(students)
students_df

Use the SQLContext to create a dataframe from the students RDD

students_df.show()

Notice the dataframe has recognized all the column names properly

Accessing RDDs from DataFrames

Looking back to the complex data we created earlier

complex_data_df.first()

This data consists of primitive data types as well as complex data types

complex_data_df.take(2)

DataFrames are tabular; after calling collect(), individual cells can be accessed with [row][column] indices

cell_string = complex_data_df.collect()[0][2]
cell_string

Another example:

cell_list = complex_data_df.collect()[0][4]
cell_list

Modify the list

cell_list.append(100)
cell_list
complex_data_df.show()

Take note that the original data is unaltered. This is because collect() returns a copy of the data, so modifying the returned list does not change the DataFrame

complex_data_df.rdd\
                .map(lambda x: (x.col_string, x.col_dictionary))\
                .collect()

Extract specific columns by converting the DataFrame to an RDD

complex_data_df.select(
    'col_string',
    'col_list',
    'col_date_time'
).show()

.select(...) will return only the specified columns

complex_data_df.rdd\
                .map(lambda x: (x.col_string + " Boo"))\
                .collect()

A map() operation which appends "Boo" to every string in the column

DataFrames do not support the .map(...) function directly, which is why we first converted to an RDD
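
If you want to stay in the DataFrame API for the same result, column expressions from pyspark.sql.functions can be used instead; a rough equivalent of the map() above might look like this:

from pyspark.sql.functions import concat, lit

complex_data_df.select(
    concat(complex_data_df.col_string, lit(" Boo"))
).show()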

complex_data_df.select(
    'col_integer',
    'col_float'
    )\
    .withColumn(
    'col_sum',
    complex_data_df.col_integer + complex_data_df.col_float
    )\
    .show()

To perform a calculation with column values we need to use .withColumn(...)

  1. Select the columns to use in the calculation
  2. Create a new column holding the resulting values
complex_data_df.select('col_boolean')\
                .withColumn(
                    'col_opposite',
                    complex_data_df.col_boolean == False)\
                .show()

Here is another example of .withColumn(...) that inverts the value of booleans in a column

complex_data_df.withColumnRenamed('col_dictionary','col_map').show()

This example renames the column

complex_data_df.select(complex_data_df.col_string.alias('Name')).show()

This will select and rename a column

Spark DataFrames and Pandas

Pandas and Spark DataFrames are interoperable

import pandas

Import the pandas library; do not forget to pip install it if needed

df_pandas = complex_data_df.toPandas()
df_pandas

Converting a Spark DataFrame to a Pandas DataFrame is done using toPandas()

Remember that Spark DataFrames are built on top of RDDs and stored across multiple nodes. Conversely, a Pandas DataFrame is stored in the memory of the single machine it is running on.
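
Because everything returned by toPandas() must fit in that single machine's memory, it is common to cut the data down first. A small sketch using limit() (any row count would do):

df_pandas_small = complex_data_df.limit(2).toPandas()   # pull only the first 2 rows to the driver
df_pandas_small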

df_spark = sqlContext.createDataFrame(df_pandas)
df_spark.show()

On the flip side, .createDataFrame(...) will convert a Pandas DataFrame back to a Spark DataFrame