Spark CombineByKey

baahu   September 24, 2016

combineByKey() is very similar to the combiner of the MapReduce framework. In MR, the combiner function is called in the map phase to do a local reduction, and the reduced value is then sent over to the reducer; this results in large savings in network bandwidth.
In Spark, groupByKey() does no local aggregation while computing on a partition's data; this is where combineByKey() comes in handy.
With combineByKey, values are first merged into one accumulator per key within each partition, and then the per-partition accumulators are merged into a single value per key.
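
As a minimal sketch of the difference (assuming the SparkContext sc and the (student, score) pair RDD rdd that are set up in the full example below):

    // groupByKey ships every raw score across the network, then aggregates:
    val avgViaGroup = rdd.groupByKey().mapValues(scores => scores.sum / scores.size)

    // combineByKey pre-aggregates a (count, sum) pair per key within each
    // partition, so only one small accumulator per key crosses the network:
    val avgViaCombine = rdd
      .combineByKey(
        (s: Double) => (1, s),
        (acc: (Int, Double), s: Double) => (acc._1 + 1, acc._2 + s),
        (a: (Int, Double), b: (Int, Double)) => (a._1 + b._1, a._2 + b._2))
      .mapValues { case (count, sum) => sum / count }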

combineByKey takes three function arguments (a simplified signature sketch follows the list):

  1. createCombiner: called the first time a given key is seen in a partition. It creates the initial value of the accumulator for that key.
  2. mergeValue: called to fold a new value into the existing accumulator of that key (the one created by createCombiner).
  3. mergeCombiners: called to combine the accumulators of a key across multiple partitions.
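
For reference, a simplified form of the signature from Spark's PairRDDFunctions (the real API also has overloads that take a partitioner or a number of partitions):

    def combineByKey[C](
        createCombiner: V => C,
        mergeValue: (C, V) => C,
        mergeCombiners: (C, C) => C): RDD[(K, C)]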

An easy example to explain combineByKey is finding the average of the marks each student has scored.

import org.apache.spark.{SparkConf, SparkContext}

object ExampleCombineByKey {

  def main(args: Array[String]): Unit = {
    // Accumulator: (number of scores, running sum of scores)
    type ScoreCollector = (Int, Double)
    type PersonScores = (String, (Int, Double))

    val sc = new SparkContext(new SparkConf().setAppName("combineByKeyExample").setMaster("local"))
    val score = Array(("Ajay", 98.0), ("Bill", 85.0), ("Bill", 81.0), ("Ajay", 90.0), ("Ajay", 85.0), ("Bill", 88.0))

    val rdd = sc.parallelize(score)

    // createCombiner: turn the first score seen for a key into (count = 1, sum = score)
    val combiner = (score: Double) => (1, score)

    // mergeValue: fold another score into the partition-local accumulator.
    // Add 1 to the count and the score to the running sum.
    val mergeValue = (collector: ScoreCollector, score: Double) => {
      (collector._1 + 1, collector._2 + score)
    }

    // mergeCombiners: merge two accumulators of the same key from different partitions
    val mergeCombiners = (collector1: ScoreCollector, collector2: ScoreCollector) => {
      (collector1._1 + collector2._1, collector1._2 + collector2._2)
    }

    // Turn (name, (count, sum)) into (name, average). PersonScores is a custom type.
    val calculateAvg = (personScore: PersonScores) => {
      val (name, (numOfScores, score)) = personScore
      (name, score / numOfScores)
    }

    val rdd1 = rdd.combineByKey(combiner, mergeValue, mergeCombiners).map(calculateAvg)
    rdd1.collect().foreach(println)

    sc.stop()
  }
}
Output is:
(Bill,84.66666666666667)
(Ajay,91.0)
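
One caveat: with master "local" and no explicit partition count, parallelize typically puts all the data in a single partition, so mergeCombiners is never actually invoked. To exercise the cross-partition merge, reuse the definitions above and split the input over a few partitions:

    // Forcing 3 partitions makes mergeCombiners kick in during the shuffle
    val rdd3 = sc.parallelize(score, 3)
    rdd3.combineByKey(combiner, mergeValue, mergeCombiners)
      .map(calculateAvg)
      .collect()
      .foreach(println)

The averages come out the same; only the distribution of the partial (count, sum) work changes.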
