Pages

Sunday 2 October 2016

First Map Reduce Program

Hey guys, Today we're gonna see the first Map Reduce program. Java is pre-requisite to start with this as because Map Reduce is being developed completely using JAVA.

(Non-Java candidates can ignore this article, Recent times most of the projects are not using Map Reduce because of its slowness. They're opting technologies like Impala, Spark etc so no need to bother about Map Reduce. But for java guys, this article is very important to understand how internally we are processing on the data)


Hadoop datatypes are slightly different with Java datatypes. As they removed unwanted storage objects in heap. So these are light weight and we no need to bother about the memory management.

•IntWritable for ints
•LongWritable for longs
•FloatWritable for floats
•DoubleWritable for doubles
•Text for strings 

I will upload the following code on github so you can pull the code from there.

Open Cloudera VM and start eclipse from desktop (see my previous Hadoop Installations post )



Right click on Project Explorer and create maven project



select maven quick start (stand alone maven application)




Name the artifact as shown below



finish



To import Hadoop Libraries add below dependencies to the pom and build it.

    <dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.7.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.1</version>
</dependency>


Our use case is we have a huge text file having number of words. All we need to do parallel processing and find out how many unique words are there and its count.


As discussed in earlier posts we are going to write two classes. Map and Reduce classes.
We will see step by step as discussed in our Map Reduce Post


Map Class:

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordMap extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
     
// input is from Record Reader
    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            word.set(tokenizer.nextToken());
           //writing to disk
            context.write(word, one);
        }
    }
 } 
As discussed in Map Reduce post the Record Reader will read line by line and sends key value pair input to Map. Here in our case key is To read the output from Record Reader we are extending our map class with Mapper and overriding map method. So hadoop will call our map method runtime (runtime polymorphism).

After that partitioner will shuffle the data and pass all related words into one reducer as an Iterable array.  like ("Apple",[1,1,1])

Reduce Class:

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordReduce extends Reducer<Text, IntWritable, Text, IntWritable> {

 //new API Iterator is replaced by Iterable
   public void reduce(Text key, Iterable<IntWritable> values, Context context) 
     throws IOException, InterruptedException {
       int sum = 0;
       // iterating values to find actual count
       for (IntWritable val : values) {
           sum += val.get();
       }
       // writing output to HDFS
       context.write(key, new IntWritable(sum));
   }
}

Main Class:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class WordCount {
public static void main(String[] args) throws Exception {
          
                // will load all configurations from core-site.xml
         Configuration conf = new Configuration();
    
//Old API
       //Job job = new Job(conf, "wordcount");
   
//New API creating a Job by passing conf object
                Job job = Job.getInstance(conf, " wordcount ");
                
                //job starts from this class
        job.setJarByClass(WordCount.class);
               //give a name to Job 
job.setJobName("Word Count");
               /* As we know we are producing output in key value pair format 
               so producing output Key in Text datatype,Value in IntWritable type */ 
       job.setOutputKeyClass(Text.class);
       job.setOutputValueClass(IntWritable.class);
        
               // Configuring Map and Reduce Classes
       job.setMapperClass(WordMap.class);
       job.setReducerClass(WordReduce.class);
        
               /* setting Input and Output Formats as TextInputFormat, Record Reader reads line by line
               as offset value as key,line as value and gives to Map as an input. */  
       job.setInputFormatClass(TextInputFormat.class);
       job.setOutputFormatClass(TextOutputFormat.class);
        
               // Input files path  and output path (should be inside HDFS)
       FileInputFormat.addInputPath(job, new Path(args[0]));
       FileOutputFormat.setOutputPath(job, new Path(args[1]));
        
               // run the job.
       job.waitForCompletion(true);
 }
}

Right click on pom and build application. We can find wordcountapp-0.0.1-SNAPSHOT.jar in target folder. So our Map Reduce technique is ready now.
Right click on the jar and copy pull path of that jar 
/home/cloudera/workspace/wordcountapp/target/wordcountapp-0.0.1-SNAPSHOT.jar

Open a new terminal (as i said all Hadoop Demons are started once we start our cloudera vm)
Copy input files from Windows Os to  Unix (you can drag and drop into home folder)
Map processes data on HDFS so you've to move data from UNIX to HDFS using hadoop command

Create a folder in HDFS

[cloudera@quickstart ~]$ hadoop fs -mkdir /user/input

Copy your files from UNIX to HDFS

[cloudera@quickstart ~]$ hadoop fs -put /home/cloudera/gutenberg/ /user/input/

You can download gutenberg folder from here

Finally time has arrived to run the jar

SYNTAX: hadoop jar jarpath mainclass inputpath outputpath

[cloudera@quickstart ~]$ hadoop jar /home/cloudera/workspace/wordcountapp/target/wordcountapp-0.0.1-SNAPSHOT.jar com.hadoop.wordcountapp.WordCount /user/input/gutenberg  /user/output

you will get following console

16/10/02 01:11:00 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
16/10/02 01:11:02 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
16/10/02 01:11:03 INFO input.FileInputFormat: Total input paths to process : 3
16/10/02 01:11:03 INFO mapreduce.JobSubmitter: number of splits:3
16/10/02 01:11:03 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1475385588622_0001
16/10/02 01:11:06 INFO impl.YarnClientImpl: Submitted application application_1475385588622_0001
16/10/02 01:11:06 INFO mapreduce.Job: The url to track the job: http://quickstart.cloudera:8088/proxy/application_1475385588622_0001/
16/10/02 01:11:06 INFO mapreduce.Job: Running job: job_1475385588622_0001
16/10/02 01:11:48 INFO mapreduce.Job: Job job_1475385588622_0001 running in uber mode : false
16/10/02 01:11:48 INFO mapreduce.Job:  map 0% reduce 0%
16/10/02 01:13:31 INFO mapreduce.Job:  map 22% reduce 0%
16/10/02 01:13:37 INFO mapreduce.Job:  map 33% reduce 0%
16/10/02 01:13:45 INFO mapreduce.Job:  map 78% reduce 0%
16/10/02 01:13:46 INFO mapreduce.Job:  map 89% reduce 0%
16/10/02 01:13:47 INFO mapreduce.Job:  map 100% reduce 0%
16/10/02 01:14:06 INFO mapreduce.Job:  map 100% reduce 89%
16/10/02 01:14:07 INFO mapreduce.Job:  map 100% reduce 100%
16/10/02 01:14:07 INFO mapreduce.Job: Job job_1475385588622_0001 completed successfully
16/10/02 01:14:07 INFO mapreduce.Job: Counters: 50
File System Counters
FILE: Number of bytes read=7334442
FILE: Number of bytes written=15124101
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=3671908
HDFS: Number of bytes written=880829
HDFS: Number of read operations=12
HDFS: Number of large read operations=0
HDFS: Number of write operations=2
Job Counters 
Killed map tasks=1
Launched map tasks=4
Launched reduce tasks=1
Data-local map tasks=4
Total time spent by all maps in occupied slots (ms)=336988
Total time spent by all reduces in occupied slots (ms)=26479
Total time spent by all map tasks (ms)=336988
Total time spent by all reduce tasks (ms)=26479
Total vcore-seconds taken by all map tasks=336988
Total vcore-seconds taken by all reduce tasks=26479
Total megabyte-seconds taken by all map tasks=345075712
Total megabyte-seconds taken by all reduce tasks=27114496
Map-Reduce Framework
Map input records=77931
Map output records=629172
Map output bytes=6076092
Map output materialized bytes=7334454
Input split bytes=385
Combine input records=0
Combine output records=0
Reduce input groups=82334
Reduce shuffle bytes=7334454
Reduce input records=629172
Reduce output records=82334
Spilled Records=1258344
Shuffled Maps =3
Failed Shuffles=0
Merged Map outputs=3
GC time elapsed (ms)=8745
CPU time spent (ms)=35290
Physical memory (bytes) snapshot=804868096
Virtual memory (bytes) snapshot=6008590336
Total committed heap usage (bytes)=623063040
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters 
Bytes Read=3671523
File Output Format Counters 
Bytes Written=880829

Output will be stored in given location on HDFS /user/output/part-r-00000

part-r-00000 where r indicates it is reduce; means the output is stored on HDFS is came from Reducer Program
for Map results the format will be part-m-00000 where m is Map here
You can download the sources from my github here



Previous Post                                                                                        Next Post