This post is the first in a series on building machine learning pipelines in Spark.

In this post, we will build a text classification pipeline for the 20 Newsgroups dataset using the Spark ML package.

First, let's import the packages we will need.

from pyspark.ml.feature import RegexTokenizer
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import IndexToString
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline

Let’s load the newsgroups dataset into a Spark RDD. I have downloaded and untarred the dataset to /var/tmp on my driver node.

news_rdd = sc.wholeTextFiles('/var/tmp/20_newsgroups/*', minPartitions=25).cache()

Next, we convert the RDD into a Spark DataFrame, which provides better performance and an easier-to-use API.

news_df = news_rdd.toDF(["filePath", "document"]).cache()
news_df.printSchema()

[Output]
root
 |-- filePath: string (nullable = true)
 |-- document: string (nullable = true)

The filePath column contains the path of each document. We need to extract the newsgroup label from this path; it is the label our pipeline will learn to predict.

news_df.select('filePath').show(5, False)

[Output]
+--------------------------------------------------+
|filePath                                          |
+--------------------------------------------------+
|file:/var/tmp/20_newsgroups/rec.motorcycles/104558|
|file:/var/tmp/20_newsgroups/rec.motorcycles/105214|
|file:/var/tmp/20_newsgroups/rec.motorcycles/104578|
|file:/var/tmp/20_newsgroups/rec.motorcycles/103139|
|file:/var/tmp/20_newsgroups/rec.motorcycles/104966|
+--------------------------------------------------+
only showing top 5 rows

For this, we implement a user-defined function (UDF) that uses a regex to extract the label from the file path.

import re
from pyspark.sql.functions import udf

# Capture the parent directory name (the newsgroup) from the file path.
LABEL_PATTERN = re.compile(r'(file:.*/)(?P<label>(.*))(/.*$)')

@udf
def get_label(file_path):
    match = LABEL_PATTERN.search(file_path)
    # Returns None (null in Spark) when the path does not match the pattern.
    return match.group('label') if match is not None else None

news_df = news_df.withColumn('filePath', get_label(news_df.filePath))
news_df.select('filePath').show(5, False)

[Output]
+---------------+
|filePath       |
+---------------+
|rec.motorcycles|
|rec.motorcycles|
|rec.motorcycles|
|rec.motorcycles|
|rec.motorcycles|
+---------------+
only showing top 5 rows
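
To see what the pattern captures, the same regex can be tried in plain Python, outside Spark. This is only a minimal sketch: the helper name `extract_label` and the second sample path are made up for illustration.

```python
import re

# Same pattern as in the UDF above. The greedy first group backtracks to the
# second-to-last slash, so `label` captures the parent directory name.
LABEL_PATTERN = re.compile(r'(file:.*/)(?P<label>(.*))(/.*$)')


def extract_label(path):
    """Return the newsgroup directory name from a file path, or None."""
    match = LABEL_PATTERN.search(path)
    return match.group('label') if match is not None else None


print(extract_label('file:/var/tmp/20_newsgroups/rec.motorcycles/104558'))
# rec.motorcycles
print(extract_label('not-a-file-path'))
# None
```

Paths that do not start with `file:` or have no parent directory return None, which Spark would store as a null label.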

We then rename the filePath column to label.

news_df = news_df.selectExpr("filePath as label", "document")

We now begin building the components of our pipeline. First up is the tokenizer, which splits each document into word tokens. The pattern parameter controls how the documents are split; here '\\W+' splits on runs of non-word characters.

tokenizer = RegexTokenizer(inputCol="document", outputCol="tokens", pattern='\\W+')
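
By default RegexTokenizer treats the pattern as a delimiter and lowercases the text before splitting. A rough plain-Python equivalent looks like the sketch below; the function name `regex_tokenize` and the sample sentence are made up for illustration, and this is not how Spark implements it internally.

```python
import re


def regex_tokenize(text, pattern=r'\W+'):
    """Lowercase the text, split on the pattern, and drop empty tokens."""
    return [tok for tok in re.split(pattern, text.lower()) if tok]


tokens = regex_tokenize("My bike's clutch is slipping, any ideas?")
print(tokens)
# ['my', 'bike', 's', 'clutch', 'is', 'slipping', 'any', 'ideas']
```

Note that the apostrophe is a non-word character, so "bike's" becomes two tokens. Single-letter leftovers like 's' are one reason a stop word filter is useful afterwards.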

Next, we clean the tokens by removing stop words. Spark uses the same default stop words list as sklearn; the list comes from the “Glasgow Information Retrieval Group”: http://ir.dcs.gla.ac.uk/resources/linguistic_utils/stop_words

remover = StopWordsRemover(inputCol='tokens', outputCol='nostops')
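
Conceptually, stop word removal is just a filter over each token list. The sketch below shows the idea in plain Python, assuming a tiny made-up stop word set (the real default list is much longer) and a hypothetical helper name `remove_stop_words`. Matching is case-insensitive, which mirrors the default behaviour of StopWordsRemover.

```python
# A tiny, made-up stop word list used only for this illustration.
STOP_WORDS = {'my', 'is', 'any', 's', 'the', 'a'}


def remove_stop_words(tokens, stop_words=STOP_WORDS):
    """Keep only tokens that are not stop words (case-insensitive match)."""
    return [tok for tok in tokens if tok.lower() not in stop_words]


tokens = ['my', 'bike', 's', 'clutch', 'is', 'slipping', 'any', 'ideas']
print(remove_stop_words(tokens))
# ['bike', 'clutch', 'slipping', 'ideas']
```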

We need to convert the string labels into numeric values, and we do this with the Spark StringIndexer.

labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel", handleInvalid='skip')
# The indexer only needs the label column, so it can be fit directly on news_df.
labelIndexer_model = labelIndexer.fit(news_df)

We also need a reverse indexer to map the numeric predictions back to string labels.

convertor = IndexToString(inputCol='prediction', outputCol='predictedLabel', labels=labelIndexer_model.labels)
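
With its default settings, StringIndexer assigns index 0.0 to the most frequent label, 1.0 to the next, and so on, and IndexToString simply looks the index up in that ordered label list. The plain-Python sketch below illustrates both directions. The helper `fit_label_index` and the sample labels are made up, and the alphabetical tie-break is an assumption of this sketch.

```python
from collections import Counter


def fit_label_index(labels):
    """Order distinct labels by descending frequency (ties alphabetically)."""
    counts = Counter(labels)
    return sorted(counts, key=lambda lab: (-counts[lab], lab))


labels = ['sci.space', 'rec.autos', 'sci.space', 'talk.politics.misc',
          'sci.space', 'rec.autos']

# `ordered` plays the role of labelIndexer_model.labels.
ordered = fit_label_index(labels)
to_index = {lab: float(i) for i, lab in enumerate(ordered)}

indexed = [to_index[lab] for lab in labels]      # StringIndexer direction
restored = [ordered[int(i)] for i in indexed]    # IndexToString direction

print(ordered)   # ['sci.space', 'rec.autos', 'talk.politics.misc']
print(indexed)   # [0.0, 1.0, 0.0, 2.0, 0.0, 1.0]
```

Because the mapping depends on label frequencies in the data the indexer was fit on, the reverse lookup is only correct when it uses the label list from the same fitted indexer.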

For this example, we use CountVectorizer to convert the tokens into feature vectors. CountVectorizer is a very simple option; any other vectorizer could be used to generate the feature vectors.

vectorizer = CountVectorizer(inputCol='nostops', outputCol='features', vocabSize=1000)
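
The idea behind a count vectorizer can be sketched in a few lines of plain Python: pick the most frequent terms in the corpus as the vocabulary (capped by a size limit, like vocabSize above), then represent each document by how often each vocabulary term occurs in it. The helper names and toy documents below are made up, and the sketch returns dense lists where Spark would produce sparse vectors.

```python
from collections import Counter


def fit_vocabulary(docs, vocab_size):
    """Pick the vocab_size most frequent terms across all documents."""
    totals = Counter(tok for doc in docs for tok in doc)
    # Sort by descending count, then alphabetically so the result is stable.
    ranked = sorted(totals, key=lambda term: (-totals[term], term))
    return ranked[:vocab_size]


def count_vector(doc, vocabulary):
    """Return one count per vocabulary term; other tokens are ignored."""
    counts = Counter(doc)
    return [counts[term] for term in vocabulary]


docs = [['bike', 'clutch', 'bike'], ['clutch', 'engine'], ['bike', 'orbit']]
vocab = fit_vocabulary(docs, vocab_size=2)
print(vocab)                                    # ['bike', 'clutch']
print([count_vector(d, vocab) for d in docs])   # [[2, 1], [0, 1], [1, 0]]
```

Tokens outside the vocabulary ('engine' and 'orbit' here) contribute nothing to the features, which is the trade-off of a small vocabulary size.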

Let's use RandomForestClassifier for our classification task.

rfc = RandomForestClassifier(featuresCol='features', labelCol='indexedLabel', numTrees=20)

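Now, with all the components ready, we create a Spark pipeline. We pass the already fitted labelIndexer_model as a stage so that the label indices used for training match the label list given to the IndexToString stage.

pipe_rfc = Pipeline(stages=[tokenizer, remover, labelIndexer_model, vectorizer, rfc, convertor])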
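
A pipeline chains stages so that each one consumes the output of the previous one. Stages that need to learn something from the data are fitted first, and the fitted result is then used to transform the data for the next stage. The toy sketch below mimics that flow in plain Python; every class and function name in it is made up for illustration and none of it is the Spark API.

```python
class Lowercase:
    """Transformer-like stage: nothing to learn, it only transforms."""

    def transform(self, rows):
        return [row.lower() for row in rows]


class VocabularyLearner:
    """Estimator-like stage: fit() learns a vocabulary and returns a model."""

    def fit(self, rows):
        vocab = sorted({word for row in rows for word in row.split()})
        return VocabularyModel(vocab)


class VocabularyModel:
    """Fitted stage: maps each row to the indices of its known words."""

    def __init__(self, vocab):
        self.index = {word: i for i, word in enumerate(vocab)}

    def transform(self, rows):
        return [[self.index[w] for w in row.split() if w in self.index]
                for row in rows]


def fit_pipeline(stages, rows):
    """Fit estimators in order, feeding each stage the previous output."""
    fitted = []
    for stage in stages:
        model = stage.fit(rows) if hasattr(stage, 'fit') else stage
        rows = model.transform(rows)
        fitted.append(model)
    return fitted


def transform_pipeline(fitted, rows):
    """Apply every fitted stage in order."""
    for model in fitted:
        rows = model.transform(rows)
    return rows


fitted = fit_pipeline([Lowercase(), VocabularyLearner()],
                      ['Bike clutch', 'Bike engine'])
print(transform_pipeline(fitted, ['ENGINE bike orbit']))   # [[2, 0]]
```

The word 'orbit' was never seen during fitting, so it is dropped at transform time, much like an out-of-vocabulary token in the real pipeline.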

With the pipeline ready, we split our dataset into train and test sets. We fit the pipeline on the train set and make predictions on the test set.

train_df, test_df = news_df.randomSplit((0.8, 0.2), seed=42)
model = pipe_rfc.fit(train_df)
prediction_rfc_df = model.transform(test_df)
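
One detail worth knowing about a weighted random split is that it is typically done by drawing a random number per row, so the resulting sizes are close to the requested proportions but not exact. The plain-Python sketch below illustrates that idea under that assumption; `random_split` is a made-up helper, not Spark's implementation.

```python
import random


def random_split(rows, weights, seed):
    """Assign each row to a bucket with probability proportional to its weight."""
    total = float(sum(weights))
    # Cumulative boundaries, e.g. weights (0.8, 0.2) -> [0.8, 1.0].
    bounds, running = [], 0.0
    for weight in weights:
        running += weight / total
        bounds.append(running)

    rng = random.Random(seed)   # fixed seed makes the split reproducible
    buckets = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for i, bound in enumerate(bounds):
            # The last bucket catches any floating point rounding at the edge.
            if draw < bound or i == len(bounds) - 1:
                buckets[i].append(row)
                break
    return buckets


train, test = random_split(list(range(1000)), (0.8, 0.2), seed=42)
print(len(train), len(test))   # close to 800 and 200, but usually not exact
```

Using the same seed gives the same split on every run, which is why the seed is fixed in the snippet above.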

We use the Spark MulticlassClassificationEvaluator to evaluate the model accuracy.

evaluator = MulticlassClassificationEvaluator(labelCol='indexedLabel', predictionCol='prediction', metricName='accuracy')
accuracy = evaluator.evaluate(prediction_rfc_df)
print('accuracy is: {0:f}'.format(accuracy))

[Output]
accuracy is: 0.914814

In our case we got an accuracy of about 91%.
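
Accuracy here is simply the fraction of test rows whose prediction equals the true label. A minimal plain-Python sketch, using made-up labels and predictions:

```python
def accuracy(labels, predictions):
    """Fraction of rows whose prediction equals the true label."""
    correct = sum(1 for y, p in zip(labels, predictions) if y == p)
    return correct / len(labels)


labels = [0.0, 0.0, 1.0, 1.0, 2.0]
predictions = [0.0, 1.0, 1.0, 1.0, 2.0]
print('accuracy is: {0:f}'.format(accuracy(labels, predictions)))
# accuracy is: 0.800000
```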

We can also look at how the labels and predictions line up in our prediction result DataFrame. Rows where indexedLabel and prediction differ are misclassifications.

prediction_rfc_df.groupBy('indexedLabel', 'prediction').count().orderBy('indexedLabel', 'prediction').show(10)

[Output]
+------------+----------+-----+
|indexedLabel|prediction|count|
+------------+----------+-----+
|         0.0|       0.0|  170|
|         0.0|       1.0|    2|
|         0.0|       8.0|    1|
|         0.0|      12.0|    8|
|         1.0|       0.0|    6|
|         1.0|       1.0|  159|
|         1.0|       8.0|    7|
|         1.0|       9.0|    1|
|         1.0|      12.0|    8|
|         1.0|      17.0|    1|
+------------+----------+-----+
only showing top 10 rows
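
The grouped counts above are effectively a confusion matrix in long form. The plain-Python sketch below builds the same kind of table from made-up (label, prediction) pairs and derives per-class recall from it; the helper `recall` is hypothetical and only for illustration.

```python
from collections import Counter

# Made-up (indexedLabel, prediction) pairs standing in for the DataFrame rows.
pairs = [(0.0, 0.0), (0.0, 0.0), (0.0, 12.0),
         (1.0, 1.0), (1.0, 0.0), (1.0, 1.0)]

# Plain-Python counterpart of groupBy('indexedLabel', 'prediction').count()
counts = Counter(pairs)


def recall(counts, label):
    """Correct predictions for a label divided by all rows with that label."""
    total = sum(n for (lab, _), n in counts.items() if lab == label)
    return counts[(label, label)] / total


# Print the counts ordered by label, then prediction.
for (label, prediction), n in sorted(counts.items()):
    print(label, prediction, n)

print(recall(counts, 0.0))
```

Reading the table this way shows which classes are confused with each other, which a single accuracy number hides.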

I hope you found this useful. In upcoming posts, we will build Spark pipelines using Spark NLP and deep learning with Intel's BigDL library.