Streaming Analysis Using Spark ML in StreamSets DataOps Platform

Dash Desai
4 min read · Apr 9, 2020

Learn how to load a serialized Spark ML model stored in MLeap bundle format on Databricks File System (DBFS), and use it for classification on new, streaming data flowing through the StreamSets DataOps Platform.

In my previous blogs, I illustrated how easily you can extend the capabilities of StreamSets Transformer using Scala and PySpark. If you have not yet read those blogs (train a Spark ML Random Forest Regressor model, serialize the trained model, and train a Logistic Regression NLP model), I highly recommend doing so before proceeding, because this blog builds upon them.

Ok, let’s get right to it!

Streaming Data: Twitter to Kafka

I’ve designed this StreamSets Data Collector pipeline to ingest and transform tweets and store them in Kafka. This pipeline is the main source of the streaming data that we will perform sentiment analysis on in the second pipeline.

Pipeline overview:

Here’s an example of the original Twitter Search API response as ingested by the HTTP Client origin.

And here’s an example of the transformed tweet written to Kafka.

Classification on Streaming Data: Kafka to Spark ML Model to Databricks

As detailed in my previous blogs, let’s assume that you have trained and serialized a model in MLeap bundle format, and it’s stored on DBFS as shown below.
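If you’re following along from the earlier blogs, the serialization step looked roughly like this. This is a minimal sketch using the MLeap Spark APIs; the pipelineModel and trainingData variable names and the exact DBFS path are illustrative, not lifted from the original code.

```scala
import ml.combust.bundle.BundleFile
import ml.combust.mleap.spark.SparkSupport._
import org.apache.spark.ml.bundle.SparkBundleContext
import resource._

// pipelineModel is the trained Spark ML PipelineModel from the previous blog;
// trainingData is the DataFrame it was fit on (both names are illustrative)
implicit val context = SparkBundleContext().withDataset(pipelineModel.transform(trainingData))

// Write the trained model out as an MLeap bundle on DBFS
for (bundle <- managed(BundleFile("jar:file:/dbfs/dash/nlp/spark_nlp_model.zip"))) {
  pipelineModel.writeBundle.save(bundle).get
}
```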

Next up… I’ve designed this StreamSets Transformer pipeline to run on a Databricks cluster.

It takes the input dataframe and, if it contains the column “text” (the tweet), loads the NLP model (“spark_nlp_model.zip”) and classifies each tweet. It then creates a new dataframe with just the tweet and its classification stored in the “prediction” column, as sketched below. (Note that you could also pass along all of the columns present in the input dataframe instead of just the two, “text” and “prediction”.)
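Here is a minimal sketch of what that processor logic might look like. It assumes the conventions of Transformer’s Scala processor, where inputs(0) is the incoming dataframe and output is what gets passed downstream, and it loads the bundle via the MLeap Spark APIs; the DBFS path is the same illustrative one as above.

```scala
import ml.combust.bundle.BundleFile
import ml.combust.mleap.spark.SparkSupport._
import resource._

val df = inputs(0)

output = if (df.columns.contains("text")) {
  // Load the serialized Spark ML pipeline from the MLeap bundle on DBFS
  val model = (for (bundle <- managed(BundleFile("jar:file:/dbfs/dash/nlp/spark_nlp_model.zip"))) yield {
    bundle.loadSparkBundle().get.root
  }).opt.get

  // Classify each tweet and keep just the tweet and its predicted sentiment
  model.transform(df).select("text", "prediction")
} else {
  df
}
```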

Analysis on Databricks

Once the tweets, along with their classification, are stored on the Databricks File System, they’re ready for querying in a Databricks notebook.

Here I’ve created a dataframe that reads all the Parquet files output by the second pipeline to the DBFS location /dash/nlp/ and shows what the data looks like.

Query the tweets and their classification
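In notebook form, that cell would look something like this (a minimal sketch; the tweets dataframe name is illustrative):

```scala
// Read all the Parquet files the Transformer pipeline wrote to DBFS
val tweets = spark.read.parquet("/dash/nlp/")

// Peek at the tweets and their predicted sentiment
tweets.show(10, truncate = false)
```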

Here I’ve created a temp table that reads the same data stored in the /dash/nlp/ DBFS location, along with an aggregate query showing the total number of positive vs. negative tweets.

Create temp table and aggregate data
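A minimal sketch of that cell; the view name tweets is illustrative, and the query simply groups by the prediction column rather than assuming which label value means positive:

```scala
// Register a temp view over the same DBFS location
spark.read.parquet("/dash/nlp/").createOrReplaceTempView("tweets")

// Aggregate: total number of tweets per predicted sentiment class
display(spark.sql("SELECT prediction, COUNT(*) AS total FROM tweets GROUP BY prediction"))
```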

In the demo video, I’ve also shown how to create and run a structured streaming query in Databricks to auto-update the counts (the total number of positive and negative sentiment tweets) without having to manually refresh the source dataframe as new data flows in from the second pipeline.
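A hedged sketch of such a query, assuming the two-column schema produced by the Transformer pipeline (streaming reads require an explicit schema); display() is the Databricks notebook function that renders a live-updating result:

```scala
import org.apache.spark.sql.types._

// Schema of the Parquet output: the tweet and its predicted sentiment
val schema = new StructType()
  .add("text", StringType)
  .add("prediction", DoubleType)

// Continuously pick up new Parquet files as the second pipeline writes them
val liveCounts = spark.readStream
  .schema(schema)
  .parquet("/dash/nlp/")
  .groupBy("prediction")
  .count()

// In a Databricks notebook, display() keeps the counts updating as data arrives
display(liveCounts)
```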

Good News!

Based on my model and the data I’ve collected, there appears to be more positive sentiment than negative sentiment when it comes to the #quarantinelife hashtag. That is something to feel good about! 🙂

In all honesty and fairness though, it goes without saying that model accuracy depends on the size and quality of the training and test datasets, as well as on feature engineering and hyperparameter tuning. That isn’t really the point of this blog; rather, it is to showcase how the StreamSets DataOps Platform can be used and extended for a variety of use cases.

Learn more about StreamSets for Databricks and the StreamSets DataOps Platform, which is available on Microsoft Azure Marketplace and AWS Marketplace.

Originally published at https://streamsets.com on April 9, 2020.
