Kafka – Spark Streaming Integration

Spark Streaming is a distributed stream processing engine that can ingest data from various sources. One of the most popular sources is Apache Kafka, a distributed streaming platform that provides the publish-and-subscribe features of an enterprise messaging system while also supporting data stream processing.

In this blog we will create a real-time streaming pipeline that ingests credit card data and finds the merchants with the highest number of fraud cases.

First, download the Spark Streaming Kafka Assembly jar file from the MVN Repository.
Site: https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-assembly
Select the version compatible with your Spark and Scala installation and copy the link to download the jar using the wget command.
wget http://central.maven.org/maven2/org/apache/spark/spark-streaming-kafka-assembly_2.10/1.6.0/spark-streaming-kafka-assembly_2.10-1.6.0.jar

Add the Kafka ‘bin’ folder to the PATH environment variable so that the Kafka commands can be executed directly.
export PATH=$PATH:/usr/hdp/current/kafka-broker/bin/

Create a Kafka topic ‘vsbdi’, which will be used to publish the credit card data.
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic vsbdi

Open a terminal and start the Spark shell, passing the Kafka assembly jar downloaded earlier.
spark-shell --master local[2] --conf "spark.dynamicAllocation.enabled=false" --jars /home/vaibhavsharma806060/spark-streaming-kafka-assembly_2.10-1.6.0.jar

Import all dependencies.
import org.apache.spark._
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.sql.SQLContext

Create the Spark Streaming Context with a batch interval of 10 seconds.
val ssc = new StreamingContext(sc,Seconds(10))

Set the log level to ERROR.
sc.setLogLevel("ERROR")

Create an input stream to pull messages from the Kafka topic ‘vsbdi’.
val kvs = KafkaUtils.createStream(ssc,"localhost:2181", "spark-streaming-consumer", Map("vsbdi"-> 1))
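
Note that createStream uses a receiver and tracks offsets through ZooKeeper. The same spark-streaming-kafka 1.6 package also provides a direct, receiver-less stream that reads from the Kafka brokers. A minimal sketch, assuming the broker address used by the console producer later in this post:
import kafka.serializer.StringDecoder

// Direct stream: offsets are managed by Spark and read straight from the brokers
val kafkaParams = Map("metadata.broker.list" -> "ip-172-31-13-154.ec2.internal:6667")
val dkvs = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set("vsbdi"))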

Write the Spark program to process data from the input stream and list the merchants with fraudulent transactions (a count-based variant that ranks merchants by number of frauds is sketched after the output further below).
// Extract the message value from each (key, value) pair and split the CSV record into fields
val lines = kvs.map(_._2)
val data = lines.map(line => line.split(","))

// Schema of a credit card transaction record
case class creditData(Merchant_id:String, Average_Amount_transaction_day:String, Transaction_amount:String, Is_declined:String, Total_Number_of_declines_day:String, isForeignTransaction:String, isHighRiskCountry:String, Daily_chargeback_avg_amt:String, six_month_avg_chbk_amt:String, six_month_chbk_freq:String, isFradulent:String)

// Convert each micro-batch RDD into a DataFrame and query it with Spark SQL
data.foreachRDD({ rdd =>
import sqlContext.implicits._
val credit_s = rdd.map(row => creditData(row(0), row(1), row(2), row(3), row(4), row(5), row(6), row(7), row(8), row(9), row(10))).toDF()
credit_s.registerTempTable("credit_code")

// List the merchants that have fraudulent transactions
val query1 = sqlContext.sql("select distinct(Merchant_id) from credit_code where isFradulent = 'Y'")
query1.show(5)
})
ssc.start()

Since no data has been published to the topic yet, the first batches produce an empty result:

scala> +-----------+
|Merchant_id|
+-----------+
+-----------+

Now the Spark Streaming engine is waiting for data to be sent over the pipeline.

Open another terminal, start the Kafka console producer, and send the data file ‘creditcard.csv’ containing the credit card fraud details over it.
kafka-console-producer.sh --broker-list ip-172-31-13-154.ec2.internal:6667 --topic vsbdi < /home/vaibhavsharma806060/creditcard.csv

Next, go back to the Spark Streaming terminal and press Enter at the prompt.
scala> +-----------+
|Merchant_id|
+-----------+
| 3359690891|
| 5516509929|
| 6062032943|
| 5614772190|
| 4192972254|
+-----------+
only showing top 5 rows
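
The query above only lists distinct merchants that have at least one fraudulent transaction. To actually rank merchants by the number of fraud cases, the SQL inside foreachRDD could be replaced with a grouped count, for example (a sketch against the same ‘credit_code’ temp table; note that it ranks merchants within each batch rather than across the whole stream):
// Count fraudulent transactions per merchant and show the worst offenders first
val query2 = sqlContext.sql("select Merchant_id, count(*) as fraud_count from credit_code where isFradulent = 'Y' group by Merchant_id order by count(*) desc")
query2.show(5)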

You can see that the top 5 results have been output to the terminal window. Now stop the Spark shell by pressing Ctrl-C.

18/08/06 19:07:03 ERROR ReceiverTracker: Deregistered receiver for stream 0: Stopped by driver
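
Instead of killing the shell, the streaming job can also be stopped from the Scala prompt itself; for example, the following stops the streaming context while keeping the underlying SparkContext alive:
// Stop only the streaming context, leave the SparkContext running
ssc.stop(stopSparkContext = false)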

In this blog we have seen how easy it is to create a data processing pipeline in which data is ingested through Kafka and processed in real time by Spark Streaming.