Spark Streaming using Kafka – Java Example

Below is a simple Spark Streaming application that reads data from a Kafka topic and prints the content.

I have Kafka installed on my laptop. Auto commit of offsets is disabled, since I am committing the offsets to Kafka myself using the commitAsync API.

Every batch will contain 1 second's worth of data from the Kafka topic; this batch duration is specified while creating the JavaStreamingContext object.

I am using a direct stream via KafkaUtils.createDirectStream(), so the number of tasks spawned by the Spark job will be equal to the number of partitions in the Kafka topic.
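
As a quick sanity check, you can query the partition count of the topic to know how many tasks to expect per batch. The snippet below is only an illustrative sketch (it is not part of the streaming application shown next) and assumes a kafka-clients version that ships the AdminClient API (0.11 or later):

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;

public class PartitionCountCheck {
	public static void main(String[] arr) throws Exception
	{
		Properties props = new Properties();
		props.put("bootstrap.servers", "localhost:9092");
		try (AdminClient admin = AdminClient.create(props)) {
			int partitions = admin.describeTopics(Collections.singleton("twitter_topic"))
					.all().get()
					.get("twitter_topic")
					.partitions().size();
			// With the direct stream, each batch spawns one task per partition
			System.out.println("twitter_topic has " + partitions + " partition(s)");
		}
	}
}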

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.streaming.Durations;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka010.*;

public class StreamingExample {
	public static void main(String[] arr)
	{
		SparkConf sparkConf = new SparkConf();
		sparkConf.setAppName("kafkastreamingclient");
		sparkConf.setMaster("local[*]");

		
		JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf,Durations.seconds(1));
		Collection<String> topics = Arrays.asList("twitter_topic");
		
		Map<String, Object> kafkaParams = new HashMap<>();
		kafkaParams.put("bootstrap.servers", "localhost:9092");
		kafkaParams.put("value.deserializer", StringDeserializer.class);
		kafkaParams.put("key.deserializer", StringDeserializer.class);
		kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream_");
		// Start from the latest offset when no committed offset exists for this group
		kafkaParams.put("auto.offset.reset", "latest");
		// Auto commit is disabled; offsets are committed manually via commitAsync below
		kafkaParams.put("enable.auto.commit", false);
		JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
				streamingContext,
				LocationStrategies.PreferConsistent(),
				ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));
		
		/* Extract the message value from each ConsumerRecord into a JavaDStream */
		JavaDStream<String> message = stream.map(entry -> entry.value());
		/* foreachRDD is the "output" operation that triggers the processing */
		message.foreachRDD(rdd -> {
			System.out.println("Number of messages:" + rdd.count());
			rdd.foreach(val -> System.out.println(val));
		});
		/*Commit the offsets for every partition*/
		stream.foreachRDD(rdd -> {
			OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
			rdd.foreachPartition(consumerRecords -> {
				// Each task handles exactly one Kafka partition, so the partition id
				// indexes directly into this batch's offset ranges
				OffsetRange o = offsetRanges[TaskContext.get().partitionId()];
				System.out.println(
					o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset());
			});
			// Commit this batch's offsets back to Kafka asynchronously
			((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
			System.out.println("OffsetRanges:" + Arrays.toString(offsetRanges));
		});
		
		
		streamingContext.start();
		try {
			streamingContext.awaitTermination();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		
	}

}
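
To test the job, something has to be writing into twitter_topic. Below is a minimal producer sketch for feeding the topic; the class name TestProducer and the message contents are just placeholders, not part of the application above:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class TestProducer {
	public static void main(String[] arr)
	{
		Properties props = new Properties();
		props.put("bootstrap.servers", "localhost:9092");
		props.put("key.serializer", StringSerializer.class.getName());
		props.put("value.serializer", StringSerializer.class.getName());

		try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
			for (int i = 0; i < 10; i++) {
				// Records without a key are spread across the topic's partitions
				producer.send(new ProducerRecord<>("twitter_topic", "message-" + i));
			}
			producer.flush();
		}
	}
}

Run the producer while the streaming job is up and you should see the message count and the per-partition offset ranges printed for every 1-second batch.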