In the previous blog I shared how to use DataFrames with PySpark on a Spark Cassandra cluster. As a follow-up, in this blog I will show how to implement Naive Bayes classification for a multiclass classification problem.
We will use the same dataset as in the previous example. It is stored in a Cassandra table and contains several text fields and a label. Below is the Cassandra table schema:
```sql
create table sample_logs (
    sample_id   text PRIMARY KEY,
    title       text,
    description text,
    label       text,
    log_links   frozen<list<map<text, text>>>,
    rawlogs     text,
    cleanedlogs text,
    state       text
);
```
As in the previous blog, I will use a Jupyter notebook to implement the classification model.
```shell
PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS='notebook --ip 192.168.0.21' pyspark
```
We begin by reading the table into a DataFrame.
```python
# Read the Cassandra table through the Spark Cassandra connector
df_log = spark.read \
    .format("org.apache.spark.sql.cassandra") \
    .options(table="sample_logs", keyspace="ml_cases") \
    .load()
```
Check the number of records that we have read.
```python
df_log.count()
```
Output: 15070
As a first step, we can analyze the labels to see how many samples there are for each label.
```python
from pyspark.sql import functions as F

# Count the number of samples for each label
grouped = df_log.groupBy('label')
df_grouped = grouped.agg(F.count(df_log['label']).alias('count'))
df_grouped.show(5)
```
Output:
```
+--------------------+-----+
| label              |count|
+--------------------+-----+
| NIU....            | 140 |
| NIH....            | 100 |
| LT.....            | 50  |
| RAB....            | 200 |
| LT.....            | 100 |
+--------------------+-----+
only showing top 5 rows
```
If the sample counts vary significantly between labels and we want to keep only labels with a minimum number of samples, we can drop the other labels using the filter function.
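```python
# Keep only the labels that have more than 25 samples
grouped_25 = df_grouped.filter(df_grouped['count'] > 25)
grouped_25.show(5)

# The inner join drops every row whose label did not pass the filter
df_log = df_log.join(grouped_25, ['label'])
```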
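It is the inner join that actually removes rows: only labels that survived the filter have a match in grouped_25 (the join also adds a count column to df_log). For intuition, here is a plain-Python sketch of the same group, count, filter and join logic on a few hypothetical (label, text) rows; keep_frequent_labels is an illustrative helper, not part of the original code.

```python
from collections import Counter


def keep_frequent_labels(rows, min_count):
    """Keep rows whose label occurs more than min_count times.

    Mirrors groupBy('label') + count, filter(count > min_count), inner join.
    """
    counts = Counter(label for label, _ in rows)                     # groupBy + count
    keep = {label for label, n in counts.items() if n > min_count}   # filter
    return [row for row in rows if row[0] in keep]                   # inner join


rows = [("a", "t1"), ("a", "t2"), ("a", "t3"), ("b", "t4")]
print(keep_frequent_labels(rows, 2))  # [('a', 't1'), ('a', 't2'), ('a', 't3')]
```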
We could also define a UDF to flag the labels that have a minimum number of samples:
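```python
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType

# Return 1 if a label has at least 10 samples, otherwise 0
def min_samples(r):
    if r >= 10:
        return 1
    else:
        return 0

min_samples_func = F.udf(min_samples, IntegerType())
# Example use: keep only the labels flagged with 1
grouped_10 = df_grouped.filter(min_samples_func(df_grouped['count']) == 1)
```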
Once the labels are analyzed, we need to encode the string labels as numeric values (doubles), which is what Spark ML classifiers expect. We can use the StringIndexer estimator for this.
```python
from pyspark.ml.feature import StringIndexer

# Learn the label -> index mapping, then add a label_index column
indexer = StringIndexer(inputCol="label", outputCol="label_index")
indexer_model = indexer.fit(df_log)
indexed = indexer_model.transform(df_log)
indexed.printSchema()
```
Output:
```
root
 |-- id: string (nullable = true)
 |-- log_links: array (nullable = true)
 |    |-- element: map (containsNull = true)
 |    |    |-- key: string
 |    |    |-- value: string (valueContainsNull = true)
 |-- cleanedlogs: string (nullable = true)
 |-- description: string (nullable = true)
 |-- label: string (nullable = true)
 |-- rawlogs: string (nullable = true)
 |-- state: string (nullable = true)
 |-- title: string (nullable = true)
 |-- label_index: double (nullable = true)
```
Take a look at the labels after indexing.
```python
indexed.select('label', 'label_index').show(5, False)
```
Output:
```
+--------------+-----------+
|label         |label_index|
+--------------+-----------+
|RAB...        |2.0        |
|RABL...       |1.0        |
|LT...         |19.0       |
|NUL...        |44.0       |
|NIZ.......... |7.0        |
+--------------+-----------+
```
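The indices are not alphabetical because StringIndexer, by default (stringOrderType="frequencyDesc"), gives 0.0 to the most frequent label, 1.0 to the next one, and so on; the documentation for recent Spark versions says labels with equal frequency are further sorted alphabetically. A plain-Python sketch of that ordering, using made-up labels (string_index is a hypothetical helper):

```python
from collections import Counter


def string_index(labels):
    """Map each distinct label to a float index, most frequent label first.

    Labels with the same frequency are ordered alphabetically.
    """
    counts = Counter(labels)
    ordered = sorted(counts, key=lambda lbl: (-counts[lbl], lbl))
    return {lbl: float(i) for i, lbl in enumerate(ordered)}


labels = ["x", "y", "x", "z", "y", "x"]
mapping = string_index(labels)
print(mapping)  # {'x': 0.0, 'y': 1.0, 'z': 2.0}
```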
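For this example, we create the input text (X) by combining the title and description into a single string, and at the same time select only the X and y (label_index) columns needed to build the model. Note that the title is concatenated three times, which triples the counts of title words and so gives them more weight than words from the description.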
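```python
from pyspark.sql.functions import concat, col, lit

# data = title repeated three times + description, separated by spaces.
# Note: concat returns null if any input is null; concat_ws(" ", ...) skips nulls.
df_train = indexed.select(
    col('label_index'),
    concat(col("title"), lit(" "), col("title"), lit(" "),
           col("title"), lit(" "), col("description")).alias('data'))
df_train.printSchema()
```
Output:
```
root
 |-- label_index: double (nullable = true)
 |-- data: string (nullable = true)
```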
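To see the effect of repeating the title, here is a plain-Python sketch that builds the same kind of string and counts its words. The helper build_document and the sample text are hypothetical; missing fields are skipped, which matches concat_ws rather than concat.

```python
from collections import Counter


def build_document(title, description, title_repeats=3):
    """Title repeated title_repeats times, then the description, space-separated.

    None fields are skipped (like concat_ws; concat would return null instead).
    """
    parts = [title] * title_repeats + [description]
    return " ".join(p for p in parts if p is not None)


doc = build_document("disk failure", "disk error on node")
counts = Counter(doc.split())
print(counts["failure"], counts["disk"], counts["node"])  # 3 4 1
```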
Next, we tokenize the data column to get a list of word tokens for each row.
```python
from pyspark.ml.feature import Tokenizer

# Lowercase the text and split it on whitespace into a "words" array
tokenizer = Tokenizer(inputCol="data", outputCol="words")
wordsData = tokenizer.transform(df_train)
trainData = wordsData.select('label_index', 'words')
trainData.printSchema()
```
Output:
```
root
 |-- label_index: double (nullable = true)
 |-- words: array (nullable = true)
 |    |-- element: string (containsNull = true)
```
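One detail worth knowing: as far as I know, Tokenizer lowercases the text and splits it on single whitespace characters, so runs of spaces or newlines (common in log text) produce empty-string tokens. RegexTokenizer with its defaults (pattern "\\s+", gaps=True, minTokenLength=1) splits on runs of whitespace and drops empty tokens. The plain-Python sketch below emulates both behaviors; the function names are illustrative, not Spark APIs.

```python
import re


def spark_like_tokenize(text):
    """Lowercase and split on single whitespace characters.

    Like Java's String.split, trailing empty strings are dropped, but empty
    tokens between consecutive whitespace characters are kept.
    """
    tokens = re.split(r"\s", text.lower())
    while tokens and tokens[-1] == "":
        tokens.pop()
    return tokens


def regex_like_tokenize(text, min_len=1):
    """Lowercase, split on runs of whitespace, drop tokens shorter than min_len."""
    return [t for t in re.split(r"\s+", text.lower()) if len(t) >= min_len]


print(spark_like_tokenize("Disk  Error "))  # ['disk', '', 'error']
print(regex_like_tokenize("Disk  Error "))  # ['disk', 'error']
```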
Once the words are tokenized, we can use different vectorizers to extract the features used for training. For example, using CountVectorizer:
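```python
from pyspark.ml.feature import CountVectorizer

# Build a vocabulary of up to 5000 terms and count term occurrences per row
cv = CountVectorizer(inputCol="words", outputCol="features", vocabSize=5000)
cv_model = cv.fit(trainData)
cv_result = cv_model.transform(trainData)
cv_train_input = cv_result.select('label_index', 'features')
```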
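According to the Spark documentation, CountVectorizer keeps the top vocabSize terms ordered by term frequency across the corpus and then represents each row as a sparse vector of counts over that vocabulary. A small plain-Python sketch of both steps follows; the tie-breaking rule and the sample documents are my own assumptions.

```python
from collections import Counter


def fit_vocabulary(docs, vocab_size):
    """Top vocab_size terms by total count across the corpus.

    Ties are broken alphabetically here (an arbitrary choice for the sketch).
    """
    totals = Counter(tok for doc in docs for tok in doc)
    ranked = sorted(totals, key=lambda t: (-totals[t], t))
    return {term: i for i, term in enumerate(ranked[:vocab_size])}


def count_vector(doc, vocab):
    """Sparse {index: count} vector; out-of-vocabulary tokens are ignored."""
    vec = Counter()
    for tok in doc:
        if tok in vocab:
            vec[vocab[tok]] += 1
    return dict(sorted(vec.items()))


docs = [["disk", "error", "disk"], ["network", "error"], ["disk", "timeout"]]
vocab = fit_vocabulary(docs, vocab_size=2)
print(vocab)                                   # {'disk': 0, 'error': 1}
print([count_vector(d, vocab) for d in docs])  # [{0: 2, 1: 1}, {1: 1}, {0: 1}]
```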
Or, for TF-IDF features, which Spark ML builds from two stages, using HashingTF followed by IDF:
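```python
from pyspark.ml.feature import HashingTF, IDF

# Term frequencies, hashed into a 5000-dimensional vector
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=5000)
featurizedData = hashingTF.transform(trainData)

# Rescale the term frequencies by inverse document frequency
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)
```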
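For intuition, here is a plain-Python sketch of what the two stages compute. HashingTF maps each term to an index with a hash function modulo numFeatures, so distinct terms can collide; Spark uses MurmurHash3, while the sketch uses zlib.crc32 as a stand-in, so its indices will not match Spark's. IDF then applies the smoothed formula given in the Spark docs, idf(t) = log((m + 1) / (df(t) + 1)), where m is the number of documents. Because this weight is never negative, the TF-IDF features remain valid input for multinomial Naive Bayes, which requires non-negative feature values. The documents and helper names are hypothetical.

```python
import math
import zlib
from collections import Counter


def hashing_tf(doc, num_features):
    """Term counts keyed by hash(term) mod num_features.

    zlib.crc32 stands in for Spark's MurmurHash3, so indices differ from Spark's.
    """
    return Counter(zlib.crc32(tok.encode("utf-8")) % num_features for tok in doc)


def fit_idf(tf_vectors, num_features):
    """idf[j] = log((m + 1) / (df[j] + 1)), with m the number of documents."""
    m = len(tf_vectors)
    df = Counter(j for vec in tf_vectors for j in vec)  # documents containing index j
    return [math.log((m + 1) / (df[j] + 1)) for j in range(num_features)]


def tf_idf(tf_vec, idf):
    """Scale each term frequency by the IDF weight of its index."""
    return {j: count * idf[j] for j, count in sorted(tf_vec.items())}


NUM_FEATURES = 1024
docs = [["disk", "error", "disk"], ["disk", "timeout"]]
tfs = [hashing_tf(d, NUM_FEATURES) for d in docs]
idf = fit_idf(tfs, NUM_FEATURES)
features = [tf_idf(tf, idf) for tf in tfs]
disk_idx = zlib.crc32(b"disk") % NUM_FEATURES
print(idf[disk_idx])  # 0.0, because "disk" occurs in every document
```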
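We then split the dataset into train and test sets. Here we continue with the TF-IDF features in rescaledData; the CountVectorizer output (cv_train_input) could be split the same way.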
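```python
# NaiveBayes and MulticlassClassificationEvaluator read a column named "label"
# by default, so rename label_index before splitting
data = rescaledData.withColumnRenamed("label_index", "label")
(train, test) = data.randomSplit([0.8, 0.2])
```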
After that, we fit a NaiveBayes model on the training data.
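```python
from pyspark.ml.classification import NaiveBayes

# Multinomial Naive Bayes with additive (Laplace) smoothing
nb = NaiveBayes(smoothing=1.0, modelType="multinomial")
model = nb.fit(train)
type(model)
```
Output: pyspark.ml.classification.NaiveBayesModel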
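For intuition about what nb.fit learns, here is a plain-Python sketch of multinomial Naive Bayes with additive smoothing (smoothing=1.0 is Laplace smoothing). Each class gets a log prior and, for every feature, a smoothed log probability; a row is scored by log P(c) + Σ_j x_j log P(j | c) and assigned to the highest-scoring class. As far as I know, Spark's implementation follows this shape and also applies the smoothing to the class prior, but treat this as an illustration rather than Spark's code. The training rows are hypothetical sparse vectors.

```python
import math
from collections import defaultdict


def train_multinomial_nb(rows, num_features, smoothing=1.0):
    """rows: list of (label, {feature_index: value}) pairs.

    Returns per-class log priors and smoothed per-feature log likelihoods.
    """
    labels = sorted({label for label, _ in rows})
    doc_counts = defaultdict(int)
    feat_sums = {label: [0.0] * num_features for label in labels}
    for label, vec in rows:
        doc_counts[label] += 1
        for j, v in vec.items():
            feat_sums[label][j] += v
    prior_denom = math.log(len(rows) + len(labels) * smoothing)
    log_prior = {c: math.log(doc_counts[c] + smoothing) - prior_denom for c in labels}
    log_theta = {}
    for c in labels:
        denom = math.log(sum(feat_sums[c]) + num_features * smoothing)
        log_theta[c] = [math.log(s + smoothing) - denom for s in feat_sums[c]]
    return log_prior, log_theta


def predict(vec, log_prior, log_theta):
    """Class maximizing log P(c) + sum_j x_j * log P(j | c)."""
    scores = {c: log_prior[c] + sum(v * log_theta[c][j] for j, v in vec.items())
              for c in log_prior}
    return max(scores, key=scores.get)


rows = [(0.0, {0: 2, 1: 1}), (0.0, {0: 1}), (1.0, {2: 3}), (1.0, {1: 1, 2: 1})]
log_prior, log_theta = train_multinomial_nb(rows, num_features=3)
print(predict({0: 1}, log_prior, log_theta))  # 0.0
print(predict({2: 2}, log_prior, log_theta))  # 1.0
```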
We can then use the model to get the predictions for the test set.
```python
predictions = model.transform(test)
predictions.printSchema()
```
Output:
```
root
 |-- label: double (nullable = true)
 |-- words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- rawFeatures: vector (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = true)
```
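In this schema, rawPrediction holds one score per class, probability normalizes those scores, and prediction is the index of the largest one. For the multinomial model the raw scores are, as I understand it, log-joint scores of the form log P(c) + Σ_j x_j log P(j | c), which are often large negative numbers; exponentiating them directly underflows to zero, so the normalization subtracts the maximum first. A plain-Python sketch with hypothetical scores:

```python
import math


def raw_to_probability(raw):
    """Softmax of log-scores; subtracting the max avoids exp() underflow."""
    top = max(raw)
    exps = [math.exp(r - top) for r in raw]
    total = sum(exps)
    return [e / total for e in exps]


def to_prediction(raw):
    """Index of the highest score, as a float like Spark's prediction column."""
    return float(max(range(len(raw)), key=lambda i: raw[i]))


raw = [-1050.0, -1052.0, -1049.0]
print(math.exp(raw[0]))    # 0.0: the naive exponential underflows
probs = raw_to_probability(raw)
print(to_prediction(raw))  # 2.0
```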
Once the predictions are available, we can use MulticlassClassificationEvaluator to compute scores such as accuracy.
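```python
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Compare the "prediction" column with the true "label" column
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = " + str(accuracy))
```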
That's it: we have trained a Naive Bayes classifier with Spark MLlib and measured its accuracy on the test set.
Hope you found this useful.
In the next blog, I will share a classification example for the same problem using a one-vs-all (OneVsRest in Spark ML) implementation.