This blog is the first in a series focusing on building machine learning pipelines in Spark.
In this blog, we will build a text classification pipeline for the 20 Newsgroups dataset using the Spark ML package.
First, let's import the packages we will need.
from pyspark.ml.feature import RegexTokenizer
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import IndexToString
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline
Let's load the newsgroups dataset into a Spark RDD. I have downloaded and untarred the dataset to /var/tmp on my driver node.
news_rdd = sc.wholeTextFiles('/var/tmp/20_newsgroups/*', minPartitions=25).cache()
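Each element of the RDD returned by wholeTextFiles is a (filePath, fileContent) pair, which we can quickly sanity-check before going further (this peek is just illustrative and not used later):

# Peek at one element; wholeTextFiles yields (path, content) pairs
sample_path, sample_text = news_rdd.take(1)[0]
print(sample_path)
print(sample_text[:200])  # first 200 characters of the document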
Next, we convert the RDD into a Spark DataFrame, which provides better performance and an easier-to-use API.
news_df = news_rdd.toDF(["filePath", "document"]).cache()
news_df.printSchema()

[Output]
root
 |-- filePath: string (nullable = true)
 |-- document: string (nullable = true)
The filePath column contains the path of each document. From this path we need to extract the newsgroup name, which is the label our pipeline will predict.
news_df.select('filePath').show(5, False)

[Output]
+--------------------------------------------------+
|filePath                                          |
+--------------------------------------------------+
|file:/var/tmp/20_newsgroups/rec.motorcycles/104558|
|file:/var/tmp/20_newsgroups/rec.motorcycles/105214|
|file:/var/tmp/20_newsgroups/rec.motorcycles/104578|
|file:/var/tmp/20_newsgroups/rec.motorcycles/103139|
|file:/var/tmp/20_newsgroups/rec.motorcycles/104966|
+--------------------------------------------------+
only showing top 5 rows
To do this, we implement a user-defined function (UDF) that uses a regular expression to extract the label from the file path.
from pyspark.sql.functions import udf

@udf
def get_label(input_row):
    import re
    pat = r'(file:.*/)(?P<label>(.*))(/.*$)'
    patc = re.compile(pat)
    match = patc.search(input_row)
    if match is not None:
        return match.group('label')

news_df = news_df.withColumn('filePath', get_label(news_df.filePath))
news_df.select('filePath').show(5, False)

[Output]
+---------------+
|filePath       |
+---------------+
|rec.motorcycles|
|rec.motorcycles|
|rec.motorcycles|
|rec.motorcycles|
|rec.motorcycles|
+---------------+
only showing top 5 rows
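As a side note, the same extraction could be done without a Python UDF by using Spark's built-in regexp_extract function, which avoids the Python serialization overhead. A minimal sketch on the original DataFrame (illustrative only; the variable alt_df and this regex rewrite are not used in the rest of the post):

from pyspark.sql.functions import regexp_extract

# Capture the directory name (the newsgroup) between the last two slashes of the path
alt_df = news_rdd.toDF(["filePath", "document"]) \
                 .withColumn('filePath', regexp_extract('filePath', r'file:.*/(.*)/.*$', 1))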
We then rename the filePath column to label.
news_df = news_df.selectExpr("filePath as label", "document")
We now begin building the components of our pipeline. First up is the RegexTokenizer, which splits each document into word tokens. The pattern parameter controls how the documents are split into words.
tokenizer = RegexTokenizer(inputCol="document", outputCol="tokens", pattern='\\W+')
Next, we clean the tokens by removing stop words. Spark uses the same default stop-word list as scikit-learn; the list comes from the Glasgow Information Retrieval Group: http://ir.dcs.gla.ac.uk/resources/linguistic_utils/stop_words
remover = StopWordsRemover(inputCol='tokens', outputCol='nostops')
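If you want to inspect the list the remover falls back to when no custom list is supplied, the defaults can be loaded directly from StopWordsRemover:

# Print the default English stop words used by StopWordsRemover
english_stops = StopWordsRemover.loadDefaultStopWords("english")
print(len(english_stops), english_stops[:10])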
We need to convert the string labels into numeric values, which we do with Spark's StringIndexer.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel", handleInvalid='skip')
# Fit on news_df (only the label column is needed) so the learned labels
# are available to the IndexToString stage below
labelIndexer_model = labelIndexer.fit(news_df)
We also need a reverse indexer (IndexToString) to map the numeric predictions back to string labels.
convertor = IndexToString(inputCol='prediction', outputCol='predictedLabel', labels=labelIndexer_model.labels)
For this example, we use CountVectorizer to convert the text tokens into feature vectors. CountVectorizer is a very simple option; any other vectorizer could be used to generate the feature vectors, as sketched after the code below.
vectorizer = CountVectorizer(inputCol='nostops', outputCol='features', vocabSize=1000)
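For instance, a HashingTF followed by IDF is a common alternative and could be dropped into the same pipeline. A rough sketch (the stage and column names here are illustrative and not used later in this post):

from pyspark.ml.feature import HashingTF, IDF

# Hash tokens into a fixed-size term-frequency vector, then reweight with IDF
hashing_tf = HashingTF(inputCol='nostops', outputCol='rawFeatures', numFeatures=1000)
idf = IDF(inputCol='rawFeatures', outputCol='features')
# These two stages would replace the single CountVectorizer stage in the pipeline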
Let's use a RandomForestClassifier for our classification task.
rfc = RandomForestClassifier(featuresCol='features', labelCol='indexedLabel', numTrees=20)
Now, with all the components ready, we assemble them into a Spark Pipeline.
pipe_rfc = Pipeline(stages=[tokenizer, remover, labelIndexer, vectorizer, rfc, convertor])
With the pipeline ready, we split our dataset into train and test sets, fit the pipeline on the train set, and make predictions on the test set.
train_df, test_df = news_df.randomSplit([0.8, 0.2], seed=42)
model = pipe_rfc.fit(train_df)
prediction_rfc_df = model.transform(test_df)
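Fitting the pipeline returns a PipelineModel whose fitted stages can be inspected individually, which is handy for debugging. A quick, purely illustrative check:

# The fitted pipeline keeps one fitted transformer/model per stage, in order
for stage in model.stages:
    print(stage)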
We use Spark's MulticlassClassificationEvaluator to evaluate the model's accuracy.
evaluator = MulticlassClassificationEvaluator(labelCol='indexedLabel', predictionCol='prediction', metricName='accuracy')
accuracy = evaluator.evaluate(prediction_rfc_df)
print('accuracy is: {0:2f}'.format(accuracy))

[Output]
accuracy is: 0.914814
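Accuracy is only one of the metrics this evaluator supports; for an imbalanced dataset the weighted F1 score is often more informative. A quick sketch reusing the same evaluator:

# Re-use the evaluator with a different metric
f1 = evaluator.setMetricName('f1').evaluate(prediction_rfc_df)
print('weighted F1 is: {0:.4f}'.format(f1))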
In our case, we got an accuracy of about 91%.
We can take a look at the labels and predictions in our prediction DataFrame.
prediction_rfc_df.groupBy('indexedLabel', 'prediction').count().orderBy('indexedLabel', 'prediction').show(50)

[Output]
+------------+----------+-----+
|indexedLabel|prediction|count|
+------------+----------+-----+
|         0.0|       0.0|  170|
|         0.0|       1.0|    2|
|         0.0|       8.0|    1|
|         0.0|      12.0|    8|
|         1.0|       0.0|    6|
|         1.0|       1.0|  159|
|         1.0|       8.0|    7|
|         1.0|       9.0|    1|
|         1.0|      12.0|    8|
|         1.0|      17.0|    1|
+------------+----------+-----+
only showing top 10 rows
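Since the pipeline ends with the IndexToString stage, the prediction DataFrame also carries the human-readable predictedLabel column, so we can compare string labels directly (illustrative):

# Compare the original newsgroup name with the predicted one
prediction_rfc_df.select('label', 'predictedLabel').show(5, False)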
Hope you found this useful. In the following blogs, we will build a Spark pipeline using Spark-NLP and deep learning using Intel's BigDL library.