Spark: How to Generate Nested JSON Using Dataset

I have come across requirements where the output has to be generated in nested JSON format. Below is sample code that does this. The input is a CSV file with three columns, in this order:

  1. company name
  2. employee name
  3. department

Example:

google,jessica,sales
google,sita,technology

We need to generate JSON that has the company name at the top level, followed by the list of employees per department, as below:

COMPANY: 
        DEPARTMENTS:
                    EMPLOYEE LIST
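
To make the target shape concrete before bringing Spark into it, here is a minimal plain-Java sketch of the same two-level grouping applied to the two example rows above. The class name NestedGroupingSketch, the method name, and the use of String[] rows are illustrative assumptions, not part of the Spark job.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class NestedGroupingSketch {

    // Each row is assumed to be {company, employee name, department},
    // matching the column order of the example rows.
    public static Map<String, Map<String, List<String>>> groupByCompanyAndDepartment(
            List<String[]> rows) {
        return rows.stream().collect(
            // Outer level: one entry per company.
            Collectors.groupingBy((String[] r) -> r[0], LinkedHashMap::new,
                // Inner level: one entry per department within that company.
                Collectors.groupingBy((String[] r) -> r[2], LinkedHashMap::new,
                    // Leaf: the list of employee names in that department.
                    Collectors.mapping((String[] r) -> r[1], Collectors.toList()))));
    }

    public static void main(String[] args) {
        List<String[]> rows = List.of(
            new String[] {"google", "jessica", "sales"},
            new String[] {"google", "sita", "technology"});

        // Prints {google={sales=[jessica], technology=[sita]}}
        System.out.println(groupByCompanyAndDepartment(rows));
    }
}
```

The Spark version follows the same idea: group at the finest level first (company and department), then roll those groups up per company.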

The code below reads the CSV file into a Dataset and creates a temp view “employees” over it. First, we group the employees and build a list of employees per company per department (grouping on company and department). Then we create a struct of department and employee list and group it by company. Finally, we generate the JSON and display it.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class JsonExample {
	public static void main(String [] args)
	{
		SparkSession sparkSession = SparkSession
			      .builder()
			      .appName("JsonExample")
			      .master("local")
			      .getOrCreate();
		
		//read the csv file
		Dataset<Row> employees = sparkSession.read().option("header", "true").csv("/tmp/data/emp.csv");
		//create the temp view
		employees.createOrReplaceTempView("employees");
		
		// First, group the employees by company AND department
		sparkSession.sql("select company, department, collect_list(name) as department_employees from employees group by company, department").createOrReplaceTempView("employees_by_department");
		/* Now build a struct with the built-in struct() function (no UDF is needed).
		 * The struct contains the department and its list of employees.
		 * The alias "departments" matches the key shown in the sample output.
		 */
		sparkSession.sql("select company, collect_list(struct(department, department_employees)) as departments from employees_by_department group by company").toJSON().show(false);
		
	
		
	}
}
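
The same two-step grouping can also be written with the Dataset API instead of SQL strings. The following is a minimal sketch, assuming the same /tmp/data/emp.csv file with a header row; the class name JsonExampleDatasetApi and the variable names are illustrative, and the "departments" alias is chosen to match the key in the sample output.

```java
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.collect_list;
import static org.apache.spark.sql.functions.struct;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class JsonExampleDatasetApi {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("JsonExampleDatasetApi")
                .master("local")
                .getOrCreate();

        // Read the CSV file; the header row supplies the column names.
        Dataset<Row> employees = spark.read()
                .option("header", "true")
                .csv("/tmp/data/emp.csv");

        // Step 1: one row per (company, department) with the list of employee names.
        Dataset<Row> byDepartment = employees
                .groupBy(col("company"), col("department"))
                .agg(collect_list(col("name")).as("department_employees"));

        // Step 2: one row per company with an array of (department, employees) structs.
        Dataset<Row> byCompany = byDepartment
                .groupBy(col("company"))
                .agg(collect_list(struct(col("department"), col("department_employees")))
                        .as("departments"));

        // Serialize each row to a JSON string and print it without truncation.
        byCompany.toJSON().show(false);

        spark.stop();
    }
}
```

This variant avoids registering temp views, and column names are checked when each step is built rather than when a SQL string is parsed. Note that collect_list does not guarantee element order, so the order of departments and employees may differ between runs.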

Sample Input:

company,name,department
google,samantha,admin
apple,ram,sales
apple,laxman,sales
apple,shiva,technology
google,jessica,sales
google,sita,technology
google,parvathi,technology

Output:

{"company":"apple","departments":[{"department":"technology","department_employees":["shiva"]},{"department":"sales","department_employees":["ram","laxman"]}]}
{"company":"google","departments":[{"department":"sales","department_employees":["jessica"]},{"department":"admin","department_employees":["samantha"]},{"department":"technology","department_employees":["sita","parvathi"]}]}

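Formatted Output:

{
  "company": "apple",
  "departments": [
    {
      "department": "technology",
      "department_employees": ["shiva"]
    },
    {
      "department": "sales",
      "department_employees": ["ram", "laxman"]
    }
  ]
}
{
  "company": "google",
  "departments": [
    {
      "department": "sales",
      "department_employees": ["jessica"]
    },
    {
      "department": "admin",
      "department_employees": ["samantha"]
    },
    {
      "department": "technology",
      "department_employees": ["sita", "parvathi"]
    }
  ]
}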
