In the previous blog I shared how to use DataFrames with pyspark on a Spark Cassandra cluster. As a followup, in this blog I will share implementing Naive Bayes classification for a multi class classification problem.

We will use the same dataset as the previous example which is stored in a Cassandra table and contains several text fields and a label. Below is the Cassandra table schema:

create table sample_logs(
sample_id text PRIMARY KEY,
title text,
description text,
label text,
log_links frozen ‹list‹map‹text,text›››,
rawlogs text,
cleanedlogs text,
state text);

As in previous blog, i will use Jupyter notebook to implement the classification model.

PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS='notebook --ip 192.168.0.21' pyspark

We being by reading the table into a DataFrame.

log_df = spark.read\
 .format("org.apache.spark.sql.cassandra")\
 .options(table="sample_logs", keyspace="ml_cases")\
 .load()

Check the number of records that we have read.

log_df.count()
output:15070

As a first step, we can analyze the label that we are having to see how many samples are there for each label

from pyspark.sql import functions as F
df_grouped = grouped.agg(F.count(df_log['label']).alias('count'))
df_grouped.show(5)
output:
+--------------------+-----+
| label              |count|
+--------------------+-----+
| NIU....            | 140  |
| NIH....            | 100  |
| LT.....            | 50   |
| RAB....            | 200  |
| LT.....            | 100  |
+--------------------+-----+
only showing top 5 rows

In case, there is significant variance in the sample count for each label and we would like to keep only labels with a minimum number samples, we can drop some of the labels using filter function.

grouped_25 = df_grouped.filter(df_grouped['count'] › 25)
grouped_25.show(5)
df_log = df_log.join(grouped_25, ['label'])
output:

We could also a define a function to filter out groups

def min_samples(r):
    if r ›= 10:
        return 1
    else:
        return 0
min_samples_func = F.udf(min_samples, IntegerType())

Once we have label analyzed, we need to encode the labels which are strings into floats. We can use the StringIndexer function for this.

indexer = StringIndexer(inputCol="label", outputCol="label_index")
model = indexer.fit(df_log)
indexed = model.transform(df_log)
indexed.printSchema()
Output:
root
 |-- id: string (nullable = true)
 |-- log_links: array (nullable = true)
 | |-- element: map (containsNull = true)
 | | |-- key: string
 | | |-- value: string (valueContainsNull = true)
 |-- cleanedlogs: string (nullable = true)
 |-- description: string (nullable = true)
 |-- label: string (nullable = true)
 |-- rawlogs: string (nullable = true)
 |-- state: string (nullable = true)
 |-- title: string (nullable = true)
 |-- label_index: double (nullable = true)

Take a look at the labels after indexing.

indexed.select('label','label_index').show(5, False)
Output:
+--------------+----------+
|label         | label_in.|
+--------------+----------+
|RAB...        |    2.0   |
|RABL...       |    1.0   |
|LT...         |   19.0   |
|NUL...        |   44.0   |
|NIZ.......... |   7.0    |
+--------------+----------+

 

For this example, we can create the X dataset by combining the title and description into a single string. We can at the same time, select only the X and y that will needed for the model creation

from pyspark.sql.functions import concat, col, lit
df_train = indexed.select(col('label_index'), concat(col("title"), lit(" "), col("title"), \
 lit(" "), col("title"), lit(" "), col("description")).alias('data'))
df_train.printSchema()
Output:
root
 |-- label_index: double (nullable = true)
 |-- data: string (nullable = true)

We will then tokenize the X to get word tokens for each X

tokenizer = Tokenizer(inputCol="data", outputCol="words")
wordsData = tokenizer.transform(df_train)
trainData = wordsData['label_index','words']
trainData.printSchema()
Output:
root
 |-- label_index: double (nullable = true)
 |-- words: array (nullable = true)
 | |-- element: string (containsNull = true)

Once we have words tokenized, we use different type of Vectorizer to extract the feature-set that will be used for training. For example using, CountVectorizer

cv = CountVectorizer(inputCol="words", outputCol="features", vocabSize=5000)
cv_model = cv.fit(trainData)
cv_result = cv_model.transform(trainData)
cv_train_input = cv_result['label_index', 'features']

Or using Tfidfvectorizer

hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=5000)
featurizedData = hashingTF.transform(trainData)
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)

We then split the dataset to create train and test sets

(train, test) = rescaledData.randomSplit([0.8, 0.2])

After that we can use NaiveBayes to fit the data to create our model.

nb = NaiveBayes(smoothing=1.0, modelType="multinomial")
model = nb.fit(train)
type(model)
Output:
pyspark.ml.classification.NaiveBayesModel

We can then use the model to get the predictions for the test set.

predictions = model.transform(test)
predictions.printSchema()
Output:
root
 |-- label: double (nullable = true)
 |-- words: array (nullable = true)
 | |-- element: string (containsNull = true)
 |-- rawFeatures: vector (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = true)

Once the predictions are available, we can a use classification evaluator to calculate the scores.

evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction",
 metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = " + str(accuracy))

That’s it. We have our classification accuracy with NaiveBayes implementation in SparkMLlib.

Hope you found this useful.

In the next blog, I will share an classification example for the same problem using OneVsAll implementation.

Advertisement