Spark Streaming with MongoDB

Spark Streaming enables us to do real-time processing of data streams. In this blog post we will see how a data stream arriving at Spark over a TCP socket can be processed and the results saved into MongoDB. You can extrapolate this example to your own applications wherever MongoDB is the data sink after processing by Spark.
We will use the word count example: we will send some words over a TCP socket and count their occurrences with Spark in real time. The results of the processing will be saved into a MongoDB collection.
First, download the MongoDB Spark connector jar into your working directory. Go to the Maven repository to get the download link for the jar: https://mvnrepository.com/artifact/org.mongodb.spark/mongo-spark-connector_2.11/1.0.0
Make sure you download the jar version that is compatible with your Spark version. On that page, right-click the jar in the Files section and copy the link.
Open a terminal and download the jar using the following command:
$>wget http://central.maven.org/maven2/org/mongodb/spark/mongo-spark-connector_2.11/1.0.0/mongo-spark-connector_2.11-1.0.0.jar
Launch the MongoDB shell using the following commands and create a ‘streaming’ collection under a database, e.g. ‘vaibhavstreaming’:
$>mongo
>use vaibhavstreaming
>db.createCollection("streaming")
Open another terminal and execute the following command to start the Spark shell with MongoDB as the data store.
$>spark-shell --conf "spark.mongodb.input.uri=mongodb://127.0.0.1/vaibhavstreaming.streaming?readPreference=primaryPreferred" --conf "spark.mongodb.input.partitioner=MongoPaginateBySizePartitioner" --conf "spark.mongodb.output.uri=mongodb://127.0.0.1/vaibhavstreaming.streaming" --packages org.mongodb.spark:mongo-spark-connector_2.11:1.0.0
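The same connector options can also be set programmatically, which is handy if you later move this example from the shell into a standalone application. Here is a minimal sketch under that assumption (the application name is made up; the URIs mirror the --conf values above):
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
// In a standalone application the MongoDB URIs go on the SparkConf instead of the spark-shell command line.
val conf = new SparkConf()
  .setAppName("MongoSparkStreamingWordCount")
  .set("spark.mongodb.input.uri", "mongodb://127.0.0.1/vaibhavstreaming.streaming?readPreference=primaryPreferred")
  .set("spark.mongodb.output.uri", "mongodb://127.0.0.1/vaibhavstreaming.streaming")
val ssc = new StreamingContext(conf, Seconds(1))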
Import the dependencies for MongoDB and Spark.
import com.mongodb.spark.sql._
import org.apache.spark.streaming._
Initialize the StreamingContext, which is the main entry point for Spark Streaming, with a batch duration of 1 second. This means the streaming data will be grouped into micro-batches at 1-second intervals.
val ssc = new StreamingContext(sc, Seconds(1))
Create an input stream from the TCP source, which in our example is localhost, since Spark will read from port 9992 on the same machine.
val lines = ssc.socketTextStream("localhost", 9992)
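socketTextStream also accepts an optional storage level for the received data (it defaults to MEMORY_AND_DISK_SER_2). A minimal sketch if you prefer to spell it out explicitly:
import org.apache.spark.storage.StorageLevel
// Same stream as above, with the receiver's storage level stated explicitly instead of relying on the default.
val lines = ssc.socketTextStream("localhost", 9992, StorageLevel.MEMORY_AND_DISK_SER_2)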
Now write the word count code and save the results into MongoDB. Note the line where the DataFrame is written to the MongoDB collection in append mode.
// Split each line into words, pair each word with a count of 1, and sum the counts per word.
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
// Case class describing the schema of the documents written to MongoDB.
case class WordCount(word: String, count: Int)
wordCounts.foreachRDD({ rdd =>
  import sqlContext.implicits._
  // Convert each micro-batch of (word, count) pairs to a DataFrame and append it to the MongoDB collection.
  val wordCountsDF = rdd.map({ case (word: String, count: Int) => WordCount(word, count) }).toDF()
  wordCountsDF.write.mode("append").mongo()
})
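If you prefer not to use the .mongo() implicit, the connector's MongoSpark helper object can save the DataFrame as well. A minimal sketch of the same foreachRDD written that way (it relies on the spark.mongodb.output.uri set when launching the shell):
import com.mongodb.spark.MongoSpark
wordCounts.foreachRDD({ rdd =>
  import sqlContext.implicits._
  // Convert the micro-batch to a DataFrame and append it via the MongoSpark helper.
  val wordCountsDF = rdd.map({ case (word: String, count: Int) => WordCount(word, count) }).toDF()
  MongoSpark.save(wordCountsDF.write.mode("append"))
})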
Finally, start the streaming computation:
ssc.start()
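In the shell the job now keeps running in the background. In a standalone application you would typically block the driver after starting the context so it does not exit immediately; a minimal sketch:
// Not needed in spark-shell: block the driver until the streaming computation is stopped.
ssc.awaitTermination()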
Open another terminal window and start a Netcat server to feed data to Spark.
Note: Use the same port you specified in the Spark socket text stream.
$>nc -lk 9992
Input some words for the Spark stream, e.g.
new new new
delhi delhi
raining
Then exit the Netcat prompt and connect to MongoDB to check whether the processed results have been persisted in the database.
$>mongo
>use vaibhavstreaming
>db.streaming.find()
{ "_id" : ObjectId("5b5d39082c30d13e019bdc7e"), "word" : "new", "count" : 3 }
{ "_id" : ObjectId("5b5d390c2c30d13e019bdc7f"), "word" : "delhi", "count" : 2 }
{ "_id" : ObjectId("5b5d3b0f2c30d13e019bdc82"), "word" : "raining", "count" : 1 }
As you can see, the data you wrote to the TCP port has been processed and saved successfully into a MongoDB collection.
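You can also verify the results from the Spark shell itself by loading the collection back as a DataFrame. A minimal sketch, assuming MongoSpark.load picks up the spark.mongodb.input.uri configured when the shell was launched:
import com.mongodb.spark.MongoSpark
// Read the 'streaming' collection back into a DataFrame and print its contents.
val savedCounts = MongoSpark.load(sqlContext)
savedCounts.show()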