When working with machine learning on large datasets, sooner or later we end up with Spark, the go-to solution for implementing real-life use-cases involving large amounts of data.
The PySpark DataFrame API can get a little tricky, especially if you have worked with Pandas before. A PySpark DataFrame has some similarities with its Pandas counterpart, but there are significant differences in the APIs which can cause confusion.
In this blog, I will share how to work with Spark and Cassandra using DataFrames. In most textbooks and online examples the table structure used is quite trivial, whereas in real-life use-cases we tend to have complex datatypes and table structures, which makes using the Spark DataFrame API more difficult.
We begin with a Cassandra table which contains a complex structure: a list of maps.
create table sample_logs(
    sample_id text PRIMARY KEY,
    title text,
    description text,
    label text,
    log_links frozen<list<map<text,text>>>,
    rawlogs text,
    cleanedlogs text,
    state text);
I will use a Jupyter notebook in this blog. The notebook can be launched from the command line with this command:
PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS='notebook --ip 192.168.0.21' pyspark
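Note that PySpark also needs the spark-cassandra-connector on its classpath and the Cassandra host configured, otherwise the reads below will fail. A minimal sketch, assuming the DataStax connector build for Scala 2.11 and a local Cassandra node (adjust the package version and host for your setup):

PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS='notebook --ip 192.168.0.21' pyspark \
    --packages com.datastax.spark:spark-cassandra-connector_2.11:2.3.0 \
    --conf spark.cassandra.connection.host=127.0.0.1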
Once the notebook is running, we are ready to start playing with Spark DataFrames.
We begin by reading the table into a DataFrame:
log_df = spark.read\
    .format("org.apache.spark.sql.cassandra")\
    .options(table="sample_logs", keyspace="ml_cases")\
    .load()
Checking the schema of the DataFrame:

log_df.printSchema()

output:
root
 |-- sample_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- description: string (nullable = true)
 |-- rawlogs: string (nullable = true)
 |-- cleanedlogs: string (nullable = true)
 |-- state: string (nullable = true)
 |-- label: string (nullable = true)
 |-- log_links: array (nullable = true)
 |    |-- element: map (containsNull = true)
 |    |    |-- key: string
 |    |    |-- value: string (valueContainsNull = true)
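The log_links column is where the complex structure shows up, and it is the kind of column that trips people up. As a sketch of how to flatten it: explode gives one row per map in the list, and getItem pulls a value out of a map by key (the 'url' key below is just a hypothetical example; use whatever keys your maps actually contain):

from pyspark.sql import functions as F

# One row per map in the log_links list
links_df = log_df.select('sample_id', F.explode(log_df['log_links']).alias('link'))

# Extract a single value from each map by key ('url' is a hypothetical key)
urls_df = links_df.select('sample_id', links_df['link'].getItem('url').alias('url'))
urls_df.show(5, False)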
Selecting columns with select, which returns a DataFrame:
log_title = log_df.select('sample_id', 'title')

output: DataFrame[sample_id: string, title: string]
Using a list, like in Pandas, also works:
log_title = log_df[['sample_id', 'title']]

output: DataFrame[sample_id: string, title: string]
But it can be a little confusing when selecting only one column, as a Spark DataFrame does not have anything similar to a Pandas Series; instead we get a Column object.
log_df['title']

output: Column
But a Column object cannot be used independently of a DataFrame, which, I think, limits the usability of Column.
log_df['title'].show(2, False)

output: TypeError: 'Column' object is not callable
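The usual workaround is to wrap the Column in a select, which gives back a DataFrame that can be shown:

log_df.select(log_df['title']).show(2, False)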
Viewing some rows in the DataFrame:
log_df.show(5)
log_df.show(5, False)  # To prevent truncating the display
Counting the total number of rows in the DataFrame. Note that count is an action, which will cause the DAG to be evaluated.
log_df.count()
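Since every action triggers evaluation of the DAG, it can pay off to cache a DataFrame that will be used in several actions. A small sketch:

log_df.cache()   # Keep the DataFrame in memory after it is first materialized
log_df.count()   # The first action computes the DataFrame and populates the cache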
Filtering the DataFrame using an exact match:
log_filtered = log_df.filter(log_df['title'] == 'title to match')
Filtering the DataFrame using a like match (SQL-style wildcards):

log_filtered = log_df.filter(log_df['title'].like('%title to match%'))
Filtering the DataFrame using a regex pattern match:
log_filtered = log_df.filter(log_df['title'].rlike('title to match'))
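Filter conditions can also be combined with & (and) and | (or); each condition needs its own parentheses because of Python operator precedence. A sketch, where the 'open' state value is just an assumed example:

from pyspark.sql import functions as F

# Parentheses around each condition are required
log_filtered = log_df.filter((F.col('state') == 'open') & (F.col('title').rlike('title to match')))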
Dropping duplicates on some columns:
log_filtered = log_df.select('title').dropDuplicates()
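To keep the full rows while de-duplicating on a subset of columns, the column names can be passed to dropDuplicates directly:

# Keeps all columns, dropping rows whose title was already seen
log_deduped = log_df.dropDuplicates(['title'])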
Like Pandas, Spark DataFrames provide a groupby API.
Creating a GroupedData object:
grouped = log_df.groupby(log_df['label'])
Several aggregation methods can be applied to the GroupedData object. Getting the count:
grouped.agg({"*": "count"}).show(5)
Getting the count using the built-in Spark count function (with from pyspark.sql import functions as F):
grouped.agg(F.count(log_df['label']))
Getting the count and renaming the column:
grouped_df = grouped.agg(F.count(log_df['label']).alias('count'))
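Several aggregations can also be computed in one pass by giving agg multiple expressions. A sketch, assuming we also want the number of distinct states per label:

summary_df = grouped.agg(
    F.count(log_df['label']).alias('count'),
    F.countDistinct(log_df['state']).alias('distinct_states'))
summary_df.show(5)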
And finally, writing the DataFrame back to the Cassandra table:
log_df.write\
    .format("org.apache.spark.sql.cassandra")\
    .mode('append')\
    .options(table="sample_logs", keyspace="ml_cases")\
    .save()
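Once an aggregated result is small enough to fit in driver memory, it can also be pulled back into a familiar Pandas DataFrame for local inspection. Be careful: toPandas collects everything to the driver.

# Collects the (small) aggregated result to the driver as a Pandas DataFrame
local_counts = grouped_df.toPandas()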
Hope you found this useful.
In the next blog, I will share a classification example using Naive Bayes on a Spark and Cassandra cluster.