Spark Custom Partitioner Java Example

Below is an example of partitioning data based on custom logic.
To write a custom partitioner, extend the org.apache.spark.Partitioner class and implement its numPartitions() and getPartition() methods. For this example I have an input file that contains data in the format <Continent,Country>.
I would like to repartition the data based on the first letter of the continent.

Main Class:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

public class SparkPartitionExample {

	public static void main(String[] args)
	{
		SparkConf sparkConf = new SparkConf().setAppName("CustomPartitioning Example").setMaster("local");

		// try-with-resources stops the SparkContext when the job is done
		try (JavaSparkContext jSparkContext = new JavaSparkContext(sparkConf)) {
			JavaRDD<String> rdd = jSparkContext.textFile("file:///home/baahu/input.txt");

			// Turn each "continent,country" line into a (continent, country) pair
			JavaPairRDD<String, String> pairRDD = rdd.mapToPair(new PairFunction<String, String, String>() {

				@Override
				public Tuple2<String, String> call(String line) throws Exception {
					// Split once: fields[0] is the continent (key), fields[1] is the country (value)
					String[] fields = line.split(",");
					return new Tuple2<String, String>(fields[0], fields[1]);
				}
			});

			// Shuffle the pairs into 4 partitions using the custom logic
			pairRDD = pairRDD.partitionBy(new CustomPartitioner(4));

			// saveAsTextFile writes one part-NNNNN file per partition
			pairRDD.saveAsTextFile("file:///home/baahu/output");
		}
	}
}
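The mapToPair function above indexes the split result directly, so a line without a comma (for example a trailing blank line in input.txt) would fail the job with an ArrayIndexOutOfBoundsException, and an empty continent would later break charAt(0) in the partitioner. Below is a minimal plain-Java sketch of a more defensive parse that runs without Spark. The class name ContinentLineParser is hypothetical, and java.util.Map.Entry stands in for scala.Tuple2 purely for illustration.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;
import java.util.Optional;

// Plain-Java sketch of the mapToPair parsing step (no Spark needed).
// Map.Entry is used here as a stand-in for scala.Tuple2.
class ContinentLineParser {

    // Splits "continent,country" at the first comma only.
    // Returns Optional.empty() for malformed lines instead of throwing.
    static Optional<Map.Entry<String, String>> parse(String line) {
        String[] parts = line.split(",", 2);   // limit 2: at most two fields
        if (parts.length < 2) {
            return Optional.empty();           // no comma at all (e.g. a blank line)
        }
        String continent = parts[0].trim();
        if (continent.isEmpty()) {
            return Optional.empty();           // an empty key would break charAt(0) later
        }
        return Optional.of(new SimpleEntry<>(continent, parts[1].trim()));
    }

    public static void main(String[] args) {
        System.out.println(parse("asia,india"));     // Optional[asia=india]
        System.out.println(parse("no-comma-here"));  // Optional.empty
    }
}
```

In the Spark job, one possible way to use this idea would be to drop malformed lines with rdd.filter(...) before calling mapToPair; that wiring is an assumption and is not shown here.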

Custom Partitioner Class:

import org.apache.spark.Partitioner;

class CustomPartitioner extends Partitioner {

	private final int numParts;

	public CustomPartitioner(int numParts) {
		this.numParts = numParts;
	}

	@Override
	public int numPartitions()
	{
		return numParts;
	}

	@Override
	public int getPartition(Object key) {
		// Partition on the first character of the key; put your own logic here.
		// charAt() returns a char (never negative), so the result is always in [0, numParts).
		return ((String) key).charAt(0) % numParts;
	}

	@Override
	public boolean equals(Object obj) {
		// Spark compares partitioners to decide whether an RDD is already partitioned the same way
		if (obj instanceof CustomPartitioner) {
			CustomPartitioner partitionerObject = (CustomPartitioner) obj;
			return partitionerObject.numParts == this.numParts;
		}
		return false;
	}

	@Override
	public int hashCode() {
		// Keep hashCode() consistent with equals()
		return numParts;
	}
}
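The first-letter rule can never produce a negative index, because charAt() returns a char. If you replace it with something based on key.hashCode(), as the comment in getPartition() invites, be careful: hashCode() can be negative, and Java's % keeps the sign of the dividend, which would give Spark an invalid partition number. Below is a minimal plain-Java sketch using Math.floorMod instead. The class name HashPartitionSketch is hypothetical, and routing null keys to partition 0 is an assumption (as far as I know, Spark's built-in HashPartitioner does the same).

```java
import java.util.List;

// Sketch: picking a partition from hashCode() with a non-negative modulo.
// Math.floorMod always returns a value in [0, numParts) for positive numParts.
class HashPartitionSketch {

    static int partitionFor(Object key, int numParts) {
        if (key == null) {
            return 0;                                   // assumption: null keys go to partition 0
        }
        return Math.floorMod(key.hashCode(), numParts);
    }

    public static void main(String[] args) {
        System.out.println(-7 % 4);                     // -3: not a valid partition index
        System.out.println(Math.floorMod(-7, 4));       // 1
        for (String key : List.of("asia", "north-america", "south-america")) {
            System.out.println(key + " -> " + partitionFor(key, 4));
        }
    }
}
```

Inside CustomPartitioner, the same expression could replace the body of getPartition(); the trade-off is that hash-based placement spreads keys more evenly but no longer groups continents by first letter.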

Input:

asia,india
north-america,canada
south-america,mexico
asia,china
africa,somalia
south-america,chile
north-america,USA
south-america,brazil
asia,japan

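Output: As can be seen, records whose continents start with the same letter end up in the same part file. The character codes explain the layout: 'a' is 97 and 97 % 4 = 1, 'n' is 110 and 110 % 4 = 2, and 's' is 115 and 115 % 4 = 3, so no record lands in partition 0.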

[baahu@localhost output]$ cat part-00001
(asia,india)
(asia,china)
(africa,somalia)
(asia,japan)

[baahu@localhost output]$ cat part-00002
(north-america,canada)
(north-america,USA)

[baahu@localhost output]$ cat part-00003
(south-america,mexico)
(south-america,chile)
(south-america,brazil)
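To check this layout without running Spark, the first-letter rule can be simulated in plain Java over the sample input. This sketch (the class name FirstLetterBuckets is hypothetical) buckets each line using the same expression as getPartition() with 4 partitions, keeping input order within each bucket.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java simulation (no Spark) of CustomPartitioner's first-letter rule.
class FirstLetterBuckets {

    // Same expression as CustomPartitioner.getPartition()
    static int partitionFor(String key, int numParts) {
        return key.charAt(0) % numParts;
    }

    // Places each "continent,country" line into the bucket chosen by its continent
    static List<List<String>> bucket(List<String> lines, int numParts) {
        List<List<String>> buckets = new ArrayList<>();
        for (int i = 0; i < numParts; i++) {
            buckets.add(new ArrayList<>());
        }
        for (String line : lines) {
            String continent = line.split(",", 2)[0];
            buckets.get(partitionFor(continent, numParts)).add(line);
        }
        return buckets;
    }

    public static void main(String[] args) {
        List<String> input = List.of(
            "asia,india", "north-america,canada", "south-america,mexico",
            "asia,china", "africa,somalia", "south-america,chile",
            "north-america,USA", "south-america,brazil", "asia,japan");
        List<List<String>> buckets = bucket(input, 4);
        for (int i = 0; i < buckets.size(); i++) {
            System.out.println("partition " + i + ": " + buckets.get(i));
        }
    }
}
```

The simulation also makes a trade-off of this rule visible: every continent starting with 'a' shares one partition, so partition sizes depend entirely on how the keys' first letters are distributed.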
