Spark: Add a new column to a DataFrame using UDF and withColumn()

In this post I am going to describe, with example code, how to add a new column to an existing DataFrame using the withColumn() function of DataFrame.
There are 2 scenarios:

  1. The content of the new column is derived from the values of an existing column
  2. The new column holds just a static value (i.e. it does not depend on other columns)

Scenario 1:

  1. We have a DataFrame with 2 columns of Integer type; we would like to add a third column which is the sum of these 2 columns.
  2. We initially read the data from a file into an RDD[String].
  3. Convert this RDD[String] into an RDD[Row].
  4. Please note that the contents we read from the file using textFile() are of type String, so we convert the numbers from String type to Integer type.
  5. Create a DataFrame “inputDataFrame” from the RDD[Row] “inputRows”.
  6. Create an anonymous function “addColumn” which takes 2 Integers and returns their sum.
  7. Create a UDF “addColumnUDF” from the “addColumn” anonymous function.
  8. Now add the new column using the withColumn() call of DataFrame. The first parameter, “sum”, is the name of the new column; the second parameter is the call to the UDF “addColumnUDF”, to which we pass 2 columns of the DataFrame “inputDataFrame”.

The UDF will be invoked on every row of the DataFrame, adding a new column “sum” which is the sum of the existing 2 columns.
Sample input:

12,1
13,5
14,2
15,1
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, IntegerType}
import org.apache.spark.sql.functions.udf

object UDFExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("UDFExample"))
    val sqlContext = new SQLContext(sc)

    // The DataFrame has 2 integer columns
    val schemaString = "FirstNum,SecondNum"
    val schema = StructType(schemaString.split(",", -1).map(fieldName => StructField(fieldName, IntegerType)))

    // Load the text file and build an RDD[Row], converting each String field to an Int
    val inputRows = sc.textFile("file:///home/xyz/example.txt")
      .map(_.split(",", -1))
      .map(fields => Row(fields(0).toInt, fields(1).toInt))
    val inputDataFrame = sqlContext.createDataFrame(inputRows, schema)

    // Anonymous function which takes 2 Integers and returns their sum
    val addColumn: (Int, Int) => Int = (num1: Int, num2: Int) => num1 + num2

    // Declare the UDF
    val addColumnUDF = udf(addColumn)

    // Add the new column "sum" by calling the UDF on the 2 existing columns
    val output = inputDataFrame.withColumn("sum", addColumnUDF(inputDataFrame.col("FirstNum"), inputDataFrame.col("SecondNum")))

    output.take(4).foreach(println)
  }
}
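With the sample input above, the program should print the rows of the new DataFrame in Spark's bracketed Row format, something like:

[12,1,13]
[13,5,18]
[14,2,16]
[15,1,16]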

Scenario 2:
If we just want to add a column with a static dummy value, the code below comes in handy.

import org.apache.spark.sql.functions.lit
.
.
    val addDummyColumn: (String) => String = (data: String) => data
    val dummyColUDF = udf(addDummyColumn)
    val output = inputDataFrame.withColumn("Name", dummyColUDF(lit("dummy data")))
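As an aside, a UDF is not strictly required for a static value: lit() by itself already produces a constant Column. A minimal sketch, reusing the "inputDataFrame" from Scenario 1, would be:

    val output = inputDataFrame.withColumn("Name", lit("dummy data"))

This avoids the overhead of invoking a UDF for every row, since the constant is handled directly by the query engine.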
