When working with machine learning on large datasets, sooner or later we end up with Spark, the go-to solution for implementing real-life use cases involving large amounts of data.

The PySpark DataFrame API can get a little tricky, especially if you have worked with Pandas before: the PySpark DataFrame has some similarities with its Pandas counterpart, but there are significant differences between the APIs that can cause confusion.

In this blog, I will share how to work with Spark and Cassandra using DataFrames. In most textbooks and online examples the table structure used is quite trivial, whereas in real-life use cases we tend to have complex datatypes and complex table structures, which makes using the Spark DataFrame API more difficult.

We begin with a Cassandra table which contains a complex structure, including a list of maps.

create table sample_logs(sample_id text PRIMARY KEY,
    title text,
    description text,
    label text,
    log_links frozen<list<map<text,text>>>,
    rawlogs text,
    cleanedlogs text,
    state text);

I will use a Jupyter notebook in this blog. The notebook can be launched from the command line.
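
A typical invocation starts Jupyter as the PySpark driver and pulls in the DataStax Spark Cassandra Connector; the connector version and the Cassandra host below are placeholders, so adjust them for your own cluster:

PYSPARK_DRIVER_PYTHON=jupyter \
PYSPARK_DRIVER_PYTHON_OPTS="notebook" \
pyspark --packages com.datastax.spark:spark-cassandra-connector_2.11:2.4.0 \
        --conf spark.cassandra.connection.host=127.0.0.1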


Once the notebook is running, we are ready to start playing with Spark DataFrames.

We begin by reading the table into a DataFrame:

log_df = spark.read\
     .format("org.apache.spark.sql.cassandra")\
     .options(table="sample_logs", keyspace="ml_cases")\
     .load()

Checking the schema of the DataFrame:
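
log_df.printSchema()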

root
 |-- sample_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- description: string (nullable = true)
 |-- rawlogs: string (nullable = true)
 |-- cleanedlogs: string (nullable = true)
 |-- state: string (nullable = true)
 |-- label: string (nullable = true)
 |-- attach_map: array (nullable = true)
 | |-- element: map (containsNull = true)
 | | |-- key: string
 | | |-- value: string (valueContainsNull = true)

Selecting columns with select, which returns a DataFrame:

log_title = log_df.select('sample_id', 'title')
DataFrame[sample_id: string, title: string]

Using a list, as in Pandas, also works:

log_title = log_df[['sample_id', 'title']]
DataFrame[sample_id: string, title: string]

But it can be a little confusing when selecting only one column, as Spark DataFrame does not have an equivalent of the Pandas Series; instead we get a Column object.
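
log_df['title']
# Column<'title'>  (the exact repr varies a little between Spark versions)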


But a Column object cannot be used independently of a DataFrame, which, I think, limits the usability of Column.

log_df['title'].show(2, False)
TypeError: 'Column' object is not callable
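
To display a single column's values, select it into a one-column DataFrame first:

log_df.select('title').show(2, False)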

Viewing some rows in the DataFrame:

log_df.show(5, False) # To prevent truncating the display

Counting the total rows in the DataFrame. Note that count is an action which will cause the DAG to be evaluated.
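
log_df.count()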


Filtering the DataFrame using exact match

log_filtered = log_df.filter(log_df['title'] == 'title to match')

Filtering the DataFrame using a like match

log_filtered = log_df.filter(log_df['title'].like('%title to match%'))

Filtering the DataFrame using a regex pattern match

log_filtered = log_df.filter(log_df['title'].rlike('title to match'))

Dropping duplicates on some columns

log_filtered = log_df.select('title').dropDuplicates()
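
To keep all the columns and deduplicate on a subset of them, dropDuplicates also accepts a list of column names:

log_filtered = log_df.dropDuplicates(['title'])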

Like Pandas, the Spark DataFrame provides a groupBy API.

Creating a GroupedData object:

grouped = log_df.groupby(log_df['label'])

Several aggregation methods can be applied to the GroupedData object. Getting the count:

grouped.agg({"*": "count"}).show(5)

Getting the count using the built-in Spark count function:
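
from pyspark.sql import functions as F  # F is also used in the renaming example below

grouped.agg(F.count('*')).show(5)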


Getting the count and renaming the column:

grouped_df = grouped.agg(F.count(log_df['label']).alias('count'))

And finally, writing the DataFrame back to the Cassandra table:

log_df.write.format("org.apache.spark.sql.cassandra")\
    .options(table="sample_logs", keyspace="ml_cases")\
    .mode("append").save()

Hope you found this useful.

In the next blog, I will share a classification example using Naive Bayes on a Spark-Cassandra cluster.