SPARK Custom Partitioner Java Example

Below is an example of partitioning data with custom logic.
To write a custom partitioner, extend the Partitioner class and implement its numPartitions() and getPartition() methods. For this example, I have an input file that contains data in the format <Continent,Country>.
I would like to repartition the data based on the first letter of the continent.

Main Class:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

public class SparkPartitionExample {

	public static void main(String [] args)
	{
		SparkConf sparkConf = new SparkConf().setAppName("CustomPartitioning Example").setMaster("local");
		JavaSparkContext jSparkContext = new JavaSparkContext(sparkConf);
		JavaRDD<String> rdd= jSparkContext.textFile("file:///home/baahu/input.txt");
		
		JavaPairRDD<String,String> pairRDD= rdd.mapToPair(new PairFunction<String,String,String>(){

			@Override
			public Tuple2<String, String> call(String line) throws Exception {
				// Split the line once: fields[0] is the continent (key), fields[1] is the country (value)
				String[] fields = line.split(",");
				return new Tuple2<String,String>(fields[0], fields[1]);
			}});
		
		pairRDD=pairRDD.partitionBy(new CustomPartitioner(4));		
		pairRDD.saveAsTextFile("file:///home/baahu/output");
		
	}
	

}

Custom Partitioner Class:

import org.apache.spark.Partitioner;

class CustomPartitioner extends Partitioner{

	private final int numParts;
	
	public CustomPartitioner(int numParts) {
		this.numParts=numParts;
	}

	@Override
	public int numPartitions()
	{
	    return numParts;
	}

	@Override
	public int getPartition(Object key){
		
		// Partition on the first character of the key; replace this with your own logic.
		// A char is never negative, so the result is always in the range [0, numParts).
		return ((String)key).charAt(0)%numParts;
	
	}
	
	// Two CustomPartitioner instances are equal when they use the same number of partitions.
	@Override
	public boolean equals(Object obj){
		if(obj instanceof CustomPartitioner)
		{
			CustomPartitioner partitionerObject = (CustomPartitioner)obj;
			return partitionerObject.numParts == this.numParts;
		}
	
		return false;
	}

	// Keep hashCode() consistent with equals().
	@Override
	public int hashCode(){
		return numParts;
	}
}
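
The getPartition() method above assumes every key is a non-empty String. A line such as ",india" would produce an empty key, and charAt(0) would then throw. The following is only a sketch of one way to guard against that, written as plain Java so it runs without Spark. The class name SafeFirstLetterPartition and the choice of sending null or empty keys to partition 0 are assumptions made for illustration, not part of the original example.

```java
public class SafeFirstLetterPartition {

    // Hypothetical helper: same first-letter rule as CustomPartitioner.getPartition(),
    // with a fallback for keys that have no first character.
    static int partitionFor(Object key, int numParts) {
        if (key == null) {
            return 0; // assumption: null keys go to partition 0
        }
        String s = key.toString();
        if (s.isEmpty()) {
            return 0; // assumption: empty keys go to partition 0 as well
        }
        // A char is an unsigned value, so the remainder is never negative.
        return s.charAt(0) % numParts;
    }

    public static void main(String[] args) {
        System.out.println(partitionFor("asia", 4));
        System.out.println(partitionFor("", 4));
        System.out.println(partitionFor(null, 4));
    }
}
```

The same checks could be placed at the top of getPartition() if the input is not guaranteed to be clean.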

Input:

asia,india
north-america,canada
south-america,mexico
asia,china
africa,somalia
south-america,chile
north-america,USA
south-america,brazil
asia,japan

Output: As can be seen, the records for continents that start with the same letter end up in the same part file. No key in this input maps to partition 0, so only partitions 1 to 3 contain records.

[baahu@localhost output]$ cat part-00001
(asia,india)
(asia,china)
(africa,somalia)
(asia,japan)

[baahu@localhost output]$ cat part-00002
(north-america,canada)
(north-america,USA)

[baahu@localhost output]$ cat part-00003
(south-america,mexico)
(south-america,chile)
(south-america,brazil)
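
The part-file assignment shown above can be checked by hand: the partition index is the character code of the first letter modulo the number of partitions (4 in this example). Below is a small sketch that repeats that arithmetic without Spark. The class name PartitionCheck and its helper methods are made up for illustration.

```java
public class PartitionCheck {

    // Same rule as CustomPartitioner.getPartition(): first character modulo partition count.
    static int partitionFor(String key, int numParts) {
        return key.charAt(0) % numParts;
    }

    // Formats a partition index the way the part files are named in the output listing.
    static String partFileName(int partition) {
        return String.format("part-%05d", partition);
    }

    public static void main(String[] args) {
        String[] continents = {"asia", "africa", "north-america", "south-america"};
        for (String continent : continents) {
            int partition = partitionFor(continent, 4);
            // 'a' is 97, 'n' is 110, 's' is 115; modulo 4 these give 1, 2 and 3.
            System.out.println(continent + " -> " + partFileName(partition));
        }
    }
}
```

Note that distinct letters can share a partition: with 4 partitions, any two first letters whose character codes differ by a multiple of 4 (for example 'a' and 'e') land in the same part file.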
