When working with Machine Learning on large datasets, sooner or later we end up with Spark, the go-to solution for implementing real-life use-cases involving large amounts of data.

The PySpark DataFrame API can get a little tricky, especially if you have worked with Pandas before. The PySpark DataFrame has some similarities with its Pandas counterpart, but there are significant differences in the APIs that can cause confusion.

In this blog, I will share how to work with Spark and Cassandra using the DataFrame API. In most textbooks and online examples the table structure used is quite trivial, whereas in real-life use-cases we tend to have complex datatypes and table structures, which makes the Spark DataFrame API harder to use.

We begin with a Cassandra table which contains a complex structure including a list of maps.

create table sample_logs(sample_id text PRIMARY KEY,
    title text,
    description text,
    label text,
    log_links frozen<list<map<text, text>>>,
    rawlogs text,
    cleanedlogs text,
    state text);

I will use a Jupyter notebook in this blog. The notebook can be launched from the command line with this command:

PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS='notebook --ip 192.168.0.21' pyspark
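
Note that the Spark Cassandra connector also needs to be on the classpath, and Spark needs to know where Cassandra lives. A sketch of the full command, assuming connector version 2.3.0 for Scala 2.11 and Cassandra running on 192.168.0.21; adjust these for your own Spark, Scala and Cassandra setup:

PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS='notebook --ip 192.168.0.21' \
    pyspark --packages com.datastax.spark:spark-cassandra-connector_2.11:2.3.0 \
    --conf spark.cassandra.connection.host=192.168.0.21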

Once the notebook is running, we are ready to start playing with Spark DataFrames.

We begin by reading the table into a DataFrame:

log_df = spark.read\
     .format("org.apache.spark.sql.cassandra")\
     .options(table="sample_logs", keyspace="ml_cases")\
     .load()

Checking the schema of the DataFrame

log_df.printSchema()
output:
root
 |-- sample_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- description: string (nullable = true)
 |-- rawlogs: string (nullable = true)
 |-- cleanedlogs: string (nullable = true)
 |-- state: string (nullable = true)
 |-- label: string (nullable = true)
 |-- log_links: array (nullable = true)
 | |-- element: map (containsNull = true)
 | | |-- key: string
 | | |-- value: string (valueContainsNull = true)
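
The log_links column is an array of maps. One way to work with it, sketched below, is to explode the array so that each map becomes its own row, and then pull out individual values with getItem; the 'url' key used here is just a hypothetical example of a key the maps might contain.

from pyspark.sql import functions as F

# One row per map in the log_links array
links = log_df.select('sample_id', F.explode(log_df['log_links']).alias('link'))
# 'url' is a hypothetical key; getItem returns null where the key is missing
links.select('sample_id', links['link'].getItem('url').alias('url')).show(5, False)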

Selecting columns with select, which returns a DataFrame

log_title = log_df.select('sample_id', 'title')
output:
DataFrame[sample_id: string, title: string]

Using a list, as in Pandas, also works

log_title = log_df[['sample_id', 'title']]
output:
DataFrame[sample_id: string, title: string]

But it can be a little confusing when selecting only one column, as a Spark DataFrame does not have something similar to a Pandas Series; instead we get a Column object.

log_df['title']
output:
Column

But a Column object cannot be used independently of a DataFrame, which, I think, limits the usability of Column.

log_df['title'].show(2, False)
output:
TypeError: 'Column' object is not callable
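
To actually look at the values of a single column, wrap it in a select, which returns a DataFrame that can then be shown:

log_df.select('title').show(2, False)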

Viewing some rows in the DataFrame:

log_df.show(5)
log_df.show(5, False) # To prevent truncating the display
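
For small results it can also be convenient to pull rows back to the driver as a Pandas DataFrame. A minimal sketch, assuming pandas is installed on the driver and the result fits in driver memory:

log_df.limit(5).toPandas()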

Counting the total rows in the DataFrame. Note that count is an action, which will cause the DAG to be evaluated.

log_df.count()

Filtering the DataFrame using exact match

log_filtered = log_df.filter(log_df['title'] == 'title to match')
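
Multiple conditions can be combined with & and |; note that each condition needs its own parentheses. The state and label values below are made up for illustration:

# 'open' and 'crash' are example values, not from the actual dataset
log_filtered = log_df.filter((log_df['state'] == 'open') & (log_df['label'] == 'crash'))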

Filtering the DataFrame using a like match

log_filtered = log_df.filter(log_df['title'].like('%title to match%'))

Filtering the DataFrame using a regex pattern match

log_filtered = log_df.filter(log_df['title'].rlike('title to match'))
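
The match can also be negated with the ~ operator:

log_filtered = log_df.filter(~log_df['title'].rlike('title to match'))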

Dropping duplicates on some columns

log_filtered = log_df.select('title').dropDuplicates()
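
dropDuplicates also accepts a list of column names to deduplicate on while keeping all the columns in the result:

log_deduped = log_df.dropDuplicates(['title'])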

Like Pandas, Spark DataFrames provide a groupBy API.

Creating a GroupedData object

grouped = log_df.groupby(log_df['label'])

Several aggregation methods can be applied on the GroupedData object. Getting the count:

grouped.agg({"*": "count"}).show(5)

Getting the count using the built-in Spark count function from pyspark.sql.functions

from pyspark.sql import functions as F

grouped.agg(F.count(log_df['label']))

Getting the count and renaming the column

grouped_df = grouped.agg(F.count(log_df['label']).alias('count'))
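
The resulting counts can then be sorted to see the most frequent labels first:

grouped_df.orderBy(F.col('count').desc()).show(5, False)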

And finally, writing the DataFrame back to the Cassandra table

log_df.write\
    .format("org.apache.spark.sql.cassandra")\
    .mode('append')\
    .options(table="sample_logs", keyspace="ml_cases")\
    .save()
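
The same writer works for derived DataFrames such as the grouped counts above, as long as a matching table already exists in Cassandra; label_counts below is a hypothetical table with columns label (text, primary key) and count (bigint):

# 'label_counts' is a hypothetical target table; it must already exist in Cassandra
grouped_df.write\
    .format("org.apache.spark.sql.cassandra")\
    .mode('append')\
    .options(table="label_counts", keyspace="ml_cases")\
    .save()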

Hope you found this useful.

In the next blog, I will share a classification example using Naive Bayes on a Spark and Cassandra cluster.