Spark: How to Get the File Name for a Record of an RDD

There are occasions when we also need to know the name of the file we are processing. We can use wholeTextFiles(), which returns a PairRDD of (fileName, fileContents). However, wholeTextFiles() puts the entire contents of a file into a single record, which means that reading a 2GB file loads all 2GB into the RDD as one record; this can be detrimental and counterproductive. To avoid such situations we can read the data split by split, as shown in the approach below.
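For cases where the files are genuinely small, a minimal sketch of the wholeTextFiles() approach could look like the following (the directory path is illustrative, and javaSparkContext is the same context created in the full example below):

		//Each record is a (fileName, fileContents) pair; suitable for small files only
		JavaPairRDD<String,String> wholeFilesRDD = javaSparkContext.wholeTextFiles("/temp");
		wholeFilesRDD.foreach(new VoidFunction<Tuple2<String,String>>() {

			@Override
			public void call(Tuple2<String, String> t) throws Exception {
				System.out.println("File Name:"+t._1+"##Contents:"+t._2);
			}});

For large files, the split-by-split approach below is preferable because each input split is processed as a stream of records rather than one giant record.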

import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.spark.api.java.JavaNewHadoopRDD;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.SparkSession;


import scala.Tuple2;



public class FileNameInSpark {

	public static void main(String[] args) {

		SparkSession sparkSession = SparkSession
			      .builder()
			      .appName("FileNameInSpark")
			      .master("local")
			      .getOrCreate();
		JavaSparkContext javaSparkContext = new JavaSparkContext(sparkSession.sparkContext());

		JavaPairRDD<LongWritable,Text> javaPairRDD = javaSparkContext.newAPIHadoopFile("/temp/test.txt",
				TextInputFormat.class, LongWritable.class, Text.class, new Configuration());
		
		/*The JavaPairRDD returned by newAPIHadoopFile() is backed by a NewHadoopRDD,
		 * so it can be downcast to JavaNewHadoopRDD, which exposes mapPartitionsWithInputSplit()*/
		JavaNewHadoopRDD<LongWritable,Text> hadoopRDD = (JavaNewHadoopRDD<LongWritable,Text>) javaPairRDD;
        
		/*Read the data split by split and emit the file name along with each record.
		 * Note that mapPartitionsWithInputSplit() returns a plain RDD (not a pair RDD)
		 * whose records are Tuple2<String,String>, so we convert it to a
		 * pair RDD by invoking mapToPair().*/
		JavaPairRDD<String,String> fileNameAndRecordsRDD =
				hadoopRDD.mapPartitionsWithInputSplit(new Function2<InputSplit, Iterator<Tuple2<LongWritable, Text>>, Iterator<Tuple2<String,String>>>(){
					
					@Override
					public Iterator<Tuple2<String,String>> call(InputSplit arg0,
							Iterator<Tuple2<LongWritable, Text>> dataIterator) throws Exception {
	
							FileSplit fileSplit = (FileSplit) arg0;
							//Retrieve the file name from the split
							String fileLocation = fileSplit.getPath().toString();
							List<Tuple2<String,String>> retList = new LinkedList<Tuple2<String,String>>();
							while(dataIterator.hasNext())
							{
								String data = dataIterator.next()._2.toString();
								retList.add(new Tuple2<String,String>(fileLocation,data));
							}
							return retList.iterator();
					}
					
				},true).mapToPair(new PairFunction<Tuple2<String,String>,String,String>() {
					
					@Override
					public Tuple2<String, String> call(Tuple2<String, String> t) throws Exception {

						return new Tuple2<String,String>(t._1,t._2);
					}});

		/*Now print the records*/
		fileNameAndRecordsRDD.foreach(new VoidFunction<Tuple2<String,String>>() {

			@Override
			public void call(Tuple2<String, String> t) throws Exception {
				System.out.println("File Name:"+t._1+"##Record is:"+t._2);
				
			}});
	}

}

Example input file:

I am a record
I am another record
I am yet another record

Output:

File Name:file:/temp/test.txt##Record is:I am a record
File Name:file:/temp/test.txt##Record is:I am another record
File Name:file:/temp/test.txt##Record is:I am yet another record
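As an aside, if you are on Spark 2.x and can work with DataFrames, the built-in input_file_name() function gives the same result with far less code. A minimal sketch, reusing the sparkSession from the example above (the path is illustrative):

import static org.apache.spark.sql.functions.input_file_name;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

		//Each row of the text file gets a "fileName" column holding its source file
		Dataset<Row> lines = sparkSession.read().text("/temp/test.txt")
				.withColumn("fileName", input_file_name());
		lines.show(false);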
