In this second blog on Spark pipelines, we will use the spark-nlp library to build a text classification pipeline.

To use the spark-nlp library, we first create the SparkSession instance and pass it the spark-nlp JAR through the extraClassPath option. The instructions for this are available in the spark-nlp GitHub repository. Pay attention to the compatibility between the Spark version and the spark-nlp version.

from pyspark import SparkConf, SparkContext
import pyspark
from pyspark.sql import SparkSession, SQLContext
# Put the spark-nlp assembly JAR on the driver classpath when creating the session
spark = SparkSession.builder \
    .config("spark.driver.maxResultSize", "2G") \
    .config("spark.driver.extraClassPath", "lib/spark-nlp-assembly-1.7.3.jar") \
    .config("spark.kryoserializer.buffer.max", "500m") \
    .getOrCreate()

I have stored the newsgroup dataset in my personal S3 bucket, but it can be downloaded from several sources online, for example the UCI ML Repository.

news_df = spark.read.parquet("s3a://newsgroup/newgroup1.parquet")

Take a look at the downloaded dataset. It contains 20 labels with about 1000 samples each.

news_df.groupby('label').agg({"label": "count"}).collect()

[Row(label='', count(label)=1000),
 Row(label='', count(label)=1000),
 Row(label='', count(label)=1000),
 Row(label='', count(label)=1000),
 Row(label='talk.politics.guns', count(label)=1000),
 Row(label='talk.politics.misc', count(label)=1000),
 Row(label='', count(label)=1000),
 Row(label='', count(label)=1000),
 Row(label='', count(label)=1000),
 Row(label='soc.religion.christian', count(label)=997),
 Row(label='comp.sys.mac.hardware', count(label)=1000),
 Row(label='talk.religion.misc', count(label)=1000),
 Row(label='talk.politics.mideast', count(label)=1000),
 Row(label='', count(label)=1000),
 Row(label='', count(label)=1000),
 Row(label='alt.atheism', count(label)=1000),
 Row(label='sci.electronics', count(label)=1000),
 Row(label='', count(label)=1000),
 Row(label='', count(label)=1000),
 Row(label='sci.crypt', count(label)=1000)]

Import the Spark ML and spark-nlp packages that we will need when building the pipeline.

from pyspark.ml import Pipeline
from pyspark.ml import feature as spark_ft
from pyspark.ml import clustering as spark_cl
from pyspark.ml import classification as spark_cla
from pyspark.ml import evaluation as spark_eval
from sparknlp.annotator import *
from sparknlp.common import *
from sparknlp.base import *

Now we build the spark-nlp pipeline for processing the text. We first convert the text into documents, then use the sentence detector to split the documents into sentences. We then tokenize the sentences; the target pattern option is important here, and in this case we tokenize into words. Next we stem the tokens and normalize them to remove dirty characters. Finally, the finisher converts all the annotations into string tokens. Please check the John Snow Labs spark-nlp documentation for more details about all the available transformers and annotators.

# Each stage reads the output column of the previous stage
document_assembler = DocumentAssembler() \
    .setInputCol("text") \
    .setOutputCol("document")

sentence_detector = SentenceDetector() \
    .setInputCols(["document"]) \
    .setOutputCol("sentence")

tokenizer = Tokenizer() \
    .setInputCols(["sentence"]) \
    .setOutputCol("token")

stemmer = Stemmer() \
    .setInputCols(["token"]) \
    .setOutputCol("stem")

normalizer = Normalizer() \
    .setInputCols(["stem"]) \
    .setOutputCol("normalized")

# Convert the annotations into plain arrays of string tokens
finisher = Finisher() \
    .setInputCols(["normalized"]) \
    .setOutputCols(["ntokens"]) \
    .setOutputAsArray(True)

We then assemble all the spark-nlp parts into an NLP pipeline.

nlp_pipeline = Pipeline(stages=[document_assembler, sentence_detector, tokenizer, stemmer, normalizer, finisher])

Process the news dataframe using the spark-nlp pipeline. This step is mainly to inspect the functionality that spark-nlp provides; at the end we will build a single end-to-end pipeline.

nlp_model = nlp_pipeline.fit(news_df)
processed = nlp_model.transform(news_df).persist()
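
To make the stages concrete, here is a plain-Python sketch of what each step conceptually does to a piece of text. It is only an illustration and not how spark-nlp implements its annotators: the function names, the regular expressions, and the three-suffix stemming rule are all assumptions made up for this example.

```python
import re

def split_sentences(text):
    """Naive sentence detector: split on whitespace that follows '.', '!' or '?'."""
    return [s for s in re.split(r"(?<=[.!?])\s+", text.strip()) if s]

def tokenize(sentence):
    """Split a sentence into word tokens and single punctuation tokens."""
    return re.findall(r"\w+|[^\w\s]", sentence)

def stem(token):
    """Toy stemmer (hypothetical rule set): strip one common English suffix."""
    for suffix in ("ing", "ed", "s"):
        if token.endswith(suffix) and len(token) > len(suffix) + 2:
            return token[: -len(suffix)]
    return token

def normalize(token):
    """Lowercase the token and drop every character that is not a letter."""
    return re.sub(r"[^a-z]", "", token.lower())

def process(text):
    """Run sentence detection, tokenization, stemming and normalization in order."""
    tokens = []
    for sentence in split_sentences(text):
        for token in tokenize(sentence):
            cleaned = normalize(stem(token))
            if cleaned:  # punctuation normalizes to an empty string and is dropped
                tokens.append(cleaned)
    return tokens

ntokens = process("The GPUs were overheating! Fans failed quickly.")
print(ntokens)
```

The result plays the same role as the ntokens column produced by the finisher: a flat list of cleaned string tokens per input text.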

Next we build a Spark ML pipeline, which contains the same components as in the previous pipeline blog.

stopWords = spark_ft.StopWordsRemover.loadDefaultStopWords('english')
sw_remover = spark_ft.StopWordsRemover(inputCol='ntokens', outputCol='clean_tokens', stopWords=stopWords)

tf = spark_ft.CountVectorizer(vocabSize=500, inputCol='clean_tokens', outputCol='tf')

idf = spark_ft.IDF(minDocFreq=5, inputCol='tf', outputCol='idf')
labelIndexer = spark_ft.StringIndexer(inputCol="label", outputCol="indexedLabel", handleInvalid='skip')
labelIndexer_model = labelIndexer.fit(processed)
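
As a rough illustration of what the tf and idf stages compute, the sketch below reproduces the idea in plain Python on three toy documents. The documents and helper names are made up for this example. The IDF formula, log((m + 1) / (df + 1)) with m documents and document frequency df, is the one given in the Spark MLlib documentation, and terms below the minimum document frequency get a weight of 0.

```python
import math
from collections import Counter

# Toy corpus: each document is already a list of clean tokens
docs = [
    ["gun", "law", "gun"],
    ["mac", "hardware"],
    ["gun", "mac"],
]

# Vocabulary ordered by corpus-wide count (ties broken alphabetically for determinism)
corpus_counts = Counter(token for doc in docs for token in doc)
vocabulary = sorted(corpus_counts, key=lambda term: (-corpus_counts[term], term))

def term_frequencies(doc):
    """Raw term counts of one document, aligned with the vocabulary order."""
    counts = Counter(doc)
    return [counts[term] for term in vocabulary]

def inverse_document_frequencies(documents, min_doc_freq=0):
    """IDF per vocabulary term; terms in fewer than min_doc_freq documents get 0."""
    m = len(documents)
    weights = []
    for term in vocabulary:
        df = sum(1 for doc in documents if term in doc)
        weights.append(math.log((m + 1) / (df + 1)) if df >= min_doc_freq else 0.0)
    return weights

idf_weights = inverse_document_frequencies(docs)
tfidf_first_doc = [tf * w for tf, w in zip(term_frequencies(docs[0]), idf_weights)]
print(vocabulary)
print(tfidf_first_doc)
```

Rare terms such as "law" get a larger weight than terms that appear in most documents, which is why the classifier is trained on the idf column rather than on raw counts.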

convertor = spark_ft.IndexToString(inputCol='prediction', outputCol='predictedLabel', labels=labelIndexer_model.labels)
rfc = spark_cla.RandomForestClassifier(featuresCol='idf', labelCol='indexedLabel', numTrees=30)

# The dataset has 20 classes, so use the multiclass evaluator with the accuracy metric
evaluator = spark_eval.MulticlassClassificationEvaluator(labelCol='indexedLabel', predictionCol='prediction', metricName='accuracy')

Both the spark-nlp and Spark ML stages are built on the Spark pipeline package, so they can be combined into a single end-to-end pipeline as below.

spark_nlp_pipe = Pipeline(stages=[document_assembler, sentence_detector, tokenizer, stemmer, normalizer, finisher, sw_remover, tf, idf, labelIndexer, rfc, convertor])
# Split the raw dataframe, since the end-to-end pipeline does the text processing itself
train_df, test_df = news_df.randomSplit((0.8, 0.2), seed=42)
model = spark_nlp_pipe.fit(train_df)
prediction_rfc_df = model.transform(test_df)

We then use the Spark multiclass evaluator to evaluate the model accuracy.

print('Accuracy: {}'.format(evaluator.evaluate(prediction_rfc_df)))
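
Accuracy is simply the fraction of test rows whose predicted label index matches the true label index. A minimal plain-Python sketch of that calculation follows; the `accuracy` helper and the toy label values are made up for this example and are not results from the newsgroup model.

```python
def accuracy(labels, predictions):
    """Fraction of positions where the predicted index equals the true index."""
    if len(labels) != len(predictions):
        raise ValueError("labels and predictions must have the same length")
    if not labels:
        raise ValueError("cannot compute accuracy on an empty list")
    correct = sum(1 for label, predicted in zip(labels, predictions) if label == predicted)
    return correct / len(labels)

# Toy indexed labels and predictions for five test rows
true_labels = [0, 1, 2, 2, 1]
predicted_labels = [0, 2, 2, 2, 1]
toy_accuracy = accuracy(true_labels, predicted_labels)
print(toy_accuracy)
```

With 20 roughly balanced classes, random guessing would score about 0.05, which is a useful baseline when reading the number printed by the evaluator.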

In this blog, I also show some ways to interpret the predictions made by our pipeline. We first extract the feature importances from the RandomForest component of the pipeline and also get the fitted tf transformer, which contains the vocabulary.

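# stages[-2] is the fitted random forest (the last stage is the IndexToString convertor)
feature_importances = model.stages[-2].featureImportances
feature_imp_array = feature_importances.toArray()
# The fitted CountVectorizer is stage 7 of the end-to-end pipeline (after the six spark-nlp stages and the stop word remover)
tf_model = model.stages[7]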

Next we zip the vocabulary and the feature importance array object and sort using the importance score to get the vocabulary sorted by importance.

feat_imp_list = []
for feature, importance in zip(tf_model.vocabulary, feature_imp_array):
    feat_imp_list.append((feature, importance))

feat_imp_list = sorted(feat_imp_list, key=(lambda x: x[1]), reverse=True)

This feature importance list can then be used to find which of the most important vocabulary terms appear in a given sample, which helps interpret the prediction for that particular sample.

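from pyspark.sql.functions import struct, udf
from pyspark.sql.types import ArrayType, StringType

top_50 = feat_imp_list[:50]
# Keep only the tokens; the UDF below checks membership against this set
top_features = set(feature for feature, importance in top_50)
top_feat_df = prediction_rfc_df.select('clean_tokens')

top_feature_row = udf(lambda row: [token for token in row.clean_tokens if token in top_features], ArrayType(StringType()))

top_feature_row_df = top_feat_df.withColumn("top_feats", top_feature_row(struct([top_feat_df[x] for x in top_feat_df.columns])))
top_feature_row_df.select('top_feats').show(10, False)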
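
The same ranking and filtering logic can be tried out without a cluster. The sketch below uses a made-up five-word vocabulary and made-up importance scores, purely to show how the sorted list and the per-sample filter behave.

```python
# Hypothetical vocabulary and importance scores, aligned by position
vocabulary = ["gun", "mac", "the", "encryption", "god"]
importances = [0.30, 0.25, 0.01, 0.24, 0.20]

# Pair each term with its score and sort from most to least important
ranked = sorted(zip(vocabulary, importances), key=lambda pair: pair[1], reverse=True)

# Keep the k most important terms in a set for fast membership checks
top_k = 3
top_features = {feature for feature, _ in ranked[:top_k]}

def top_tokens(clean_tokens):
    """Return the tokens of one sample that are among the top features."""
    return [token for token in clean_tokens if token in top_features]

sample = ["the", "mac", "uses", "encryption"]
print(ranked[:top_k])
print(top_tokens(sample))  # ['mac', 'encryption']
```

For a sample predicted as a hardware or cryptography group, seeing which high-importance terms it contains gives a quick sanity check on why the model chose that label.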

Hope you found this useful.

In the next blog, we will build a deep learning Spark pipeline using the Intel BigDL library.