
Sunday 18 September 2016

Map Reduce

You can't really process a large data set (GBs/TBs in size) on a single application/machine. Instead you run small chunks of the data set across multiple applications/machines in a distributed way. That is what MapReduce is all about.

In HDFS we saw how data is stored in a distributed way. With MapReduce we will see how to process that data in a distributed, parallel way.

(MapReduce itself is written entirely in Java.)

It consists of two phases:
Map
Reduce




Map: takes a key/value pair and generates a set of intermediate key/value pairs
•map(k1, v1) -> list(k2, v2)
Reduce: iterates over the values with the same key and generates an aggregated or consolidated output
•Takes intermediate values and associates them with the same intermediate key
•reduce(k2, list(v2)) -> list(k3, v3)

(input) <k1, v1> => map -> <k2, v2> => shuffle -> <k2, v2> => reduce -> <k3, v3> (output)

Here keys are objects that implement WritableComparable; that is why keys can be compared with each other during sorting before being sent to the Reducer.
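To make the signatures above concrete, here is a minimal word-count sketch written against the standard org.apache.hadoop.mapreduce API (Hadoop 2.x style; class names like WordCountMapper are just illustrative, and the driver that wires everything together is shown later in this post). Notice that the key types used here (LongWritable, Text) implement WritableComparable, which is exactly what lets the framework sort and group them.

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// map(k1, v1) -> list(k2, v2): (byte offset, line) in, (word, 1) out
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    @Override
    protected void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
        for (String token : line.toString().split("\\s+")) {
            if (!token.isEmpty()) {
                word.set(token);
                context.write(word, ONE);          // intermediate (k2, v2)
            }
        }
    }
}

// reduce(k2, list(v2)) -> list(k3, v3): (word, [1,1,...]) in, (word, total) out
class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text word, Iterable<IntWritable> counts, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable count : counts) {
            sum += count.get();
        }
        context.write(word, new IntWritable(sum)); // final (k3, v3)
    }
}

For the input line "to be or not to be" the map phase emits (to,1), (be,1), (or,1), (not,1), (to,1), (be,1); after the shuffle the reduce phase receives, for example, (be, [1, 1]) and emits (be, 2).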

Before going into a detailed explanation, we will look at some of the classes involved in MapReduce programming.

First the client submits a file, and FileInputFormat computes the number of InputSplits.
An InputSplit is nothing but a piece of the file whose blocks are stored on DataNodes.

There are 3 commonly used FileInputFormat types:
TextInputFormat (default)
KeyValueTextInputFormat
SequenceFileInputFormat

TextInputFormat: takes the byte offset of the line (not the row number) as the key and the entire line as the value;
KeyValueTextInputFormat: takes everything up to the first tab as the key and the rest of the line as the value (for the line "apple<TAB>red fruit" it produces the pair ("apple", "red fruit"), whereas TextInputFormat would produce (0, "apple<TAB>red fruit"));
SequenceFileInputFormat: reads Hadoop SequenceFiles, which store key/value pairs in a binary format;

Similarly, there are 3 commonly used OutputFormats:

TextOutputFormat (default)
SequenceFileOutputFormat
NullOutputFormat

So an InputSplit is processed by one of these InputFormats to provide the input to the Mapper (Map). To read the split line by line and convert it into key/value pairs, the RecordReader comes into the picture.
LineRecordReader is the default RecordReader; it reads the split line by line, converts each line into a key/value pair and gives it as input to the Map.

With this we can say the number of InputSplits equals the number of Maps. So Hadoop controls the number of Maps to be launched based on the number of InputSplits.

Once the Maps get their input splits, they start processing in parallel on each DataNode, produce key/value pairs and write them to local disk. This is part of what makes the MapReduce technique slow, because writing to disk is costly.

These key/value pairs are partitioned and shuffled and given as input to the Reducers. The number of Reducers to run is something we can control. The Reducers generate key/value pairs and write the output into HDFS.



First of all, the client submits the file; based on the file size, FileInputFormat creates InputSplits (logical splitting), which are loaded into the Mapper as key/value pairs by the RecordReader.
The default InputFormat is TextInputFormat and the default RecordReader is LineRecordReader.
The Mapper processes the input and writes its output to disk as key/value pairs in sorted order. These outputs are partitioned and shuffled (pairs with the same key are moved to the same reducer, and sorting is applied again) and given as input to the Reducers (HashPartitioner is the default partitioner; the partitioner drives the shuffling).
Finally the Reducer produces key/value pair output, and the OutputFormat writes it into HDFS line by line using a RecordWriter.
The Map sorts locally; after copying to the Reducer, the combined data is sorted again. So sorting happens on both the Mapper and the Reducer side.
A Reducer has to process output files from many Maps, which is time consuming (maps usually far outnumber reducers); this is the reducer bottleneck. To avoid this situation we can configure the number of reducers explicitly, and in some cases we use a Combiner.
A Combiner is nothing but a reducer that we configure to run on the map side, after the map phase and before the map output is sent to the reducers. Say the Maps produce far more key/value pairs than the reducers can comfortably handle; the Combiner processes those values before they reach the reducers.
So the Combiner aggregates map results before they are shipped to the reducers.
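Putting the pieces together, here is a sketch of a typical driver (assuming the WordCountMapper/WordCountReducer classes from above; the paths and the reducer count are just illustrative). This is where the InputFormat/OutputFormat are chosen, the Combiner is plugged in, and the number of reducers is set; HashPartitioner is shown only because it is the default.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class WordCountDriver {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "word count");
        job.setJarByClass(WordCountDriver.class);

        // InputFormat decides the splits and the (k1, v1) pairs; both of these are the defaults
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        job.setMapperClass(WordCountMapper.class);
        job.setCombinerClass(WordCountReducer.class);   // mini-reduce on the map side, before output is sent to reducers
        job.setPartitionerClass(HashPartitioner.class); // default: (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks
        job.setReducerClass(WordCountReducer.class);
        job.setNumReduceTasks(2);                       // we control the number of reducers

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Note that the number of map tasks is not set anywhere here: as described above, it follows from the number of InputSplits. A job like this is packaged into a jar and run with something like: hadoop jar wordcount.jar WordCountDriver <input> <output>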

Now that you know how MapReduce works internally, it's time to write our first MapReduce program.



Saturday 17 September 2016

Hadoop Installations

As you know, Hadoop runs in 3 modes.

1.Local (Standalone) Mode
2.Pseudo-Distributed Mode
3.Fully-Distributed Mode

Local Mode: By default, Hadoop is configured to run in a non-distributed mode, as a single Java process. This is useful for debugging. (single machine)

Pseudo-Distributed Mode: Hadoop can also be run on a single-node in a pseudo-distributed mode where each Hadoop daemon runs in a separate Java process. (single machine)

Fully-Distributed Mode: Here Hadoop runs on multiple-nodes in a distributed mode.

We will use Pseudo-Distributed Mode for practice.

Cloudera, one of the leading Hadoop vendors, provides a Cloudera VMware image with all the required Hadoop components (HDFS, YARN, Hive, Pig, Oozie, Sqoop, Flume, etc.) already installed. All of these services start when you start the Cloudera VM, so you can concentrate on programming.

In order to run the Cloudera Hadoop CDH4 VM on Windows, you need to download the QuickStart VM from here according to your virtualization product (i.e. VMware/VirtualBox/KVM). It requires a 64-bit host OS.
This VM runs CentOS 6.2 and includes CDH4.3, Cloudera Manager 4.6, Cloudera Impala 1.0.1 and Cloudera Search 0.9 Beta.
I have used the VMware version of the Cloudera QuickStart VM, running on a Windows 10 64-bit host OS.

System Requirements:
This is a 64-bit VM, and it requires a 64-bit host OS and a virtualization product that can support a 64-bit guest OS.
This VM uses 4 GB of total RAM. The total system memory required varies depending on the size of your data set and on the other processes that are running.
The demo VM file is approximately 2 GB.

To use the VMware VM, you must use a player compatible with WorkStation 8.x or higher: Player 4.x or higher, ESXi 5.x or higher, or Fusion 4.x or higher. Older versions of WorkStation can be used to create a new VM using the same virtual disk (VMDK file), but some features in VMware Tools won’t be available. You can download vmware workstation player from here.

After downloading the Cloudera VM, extract it and open the virtual machine configuration (.vmx) file in your VMware Player.



Start VMware Player, click on Open a Virtual Machine as shown above, navigate to the extracted Cloudera VMware folder and select the cloudera-quickstart-vm-5.7.0-0-vmware.vmx file.



After selecting the above file you can see the Cloudera QuickStart VM in the left nav. By clicking it once you can edit the player settings to adjust the RAM (keep a minimum of 4 GB, otherwise it crashes).
By double-clicking the Cloudera QuickStart VM you can start Cloudera.

Once Cloudera starts it may ask you for credentials; cloudera/cloudera is the username/password.


Open a terminal (marked in yellow) and start practising Unix commands and HDFS commands.



YARN

YARN: Yet Another Resource Negotiator - a cluster management technology

PSB (please see the picture below).


What is this place? Bank lockers? Shelves? No mate! This is a Facebook data center. A good explanation of the FB data center is here.
If you look, there are many racks which hold commodity servers. These machines are connected to each other over the network. Similarly, all the racks are connected to each other over the network, and we call this a Cluster. In other words, a Cluster is a collection of racks which contain commodity servers.

Rack Awareness: the network bandwidth between two nodes in the same rack is greater than the bandwidth between two nodes on different racks.

So YARN takes care of all these resources: memory, disk, network, etc.

In Hadoop 1.x there was no separate cluster-management component; it was handled by MapReduce itself. But MapReduce is slow (more on that in the next post), it is Java specific, and it was the only way to communicate with HDFS (a bottleneck). To avoid this situation and to let other programming techniques communicate with HDFS, YARN was introduced. The fundamental idea of YARN is to split up the functionalities of resource management and job scheduling/monitoring into separate daemons. The idea is to have a global ResourceManager (RM) and a per-application ApplicationMaster (AM). An application is either a single job or a DAG of jobs.

YARN has mainly 3 daemons:



1.Resource Manager
                  –Manages resources for the cluster
                          •Memory, disk, network, etc.
                  –Instructs the Node Managers to allocate resources
                  –Applications negotiate for resources with the Resource Manager
                  –There is only one instance of the Resource Manager
2.Node Manager
                 –Manages the resources of a single node
                 –There is one instance per node in the cluster
3.Job History Server
                 –Archives jobs' metrics and metadata

How does it work?

When a client submits a request, it first contacts the Resource Manager, which fetches the metadata of the DataNodes from the NameNode. Each DataNode has its own Node Manager, which sends periodic signals (heartbeats) to the Resource Manager about its readiness to run tasks. The input file is broken down into a number of input splits (more on input splits in the MapReduce post; for now it is just a piece of the file) ready to be stored on DataNodes. This whole process is a Job. For each job the Resource Manager assigns a Job ID. The Job History Server maintains information about all jobs.

(We discussed that there is only one Resource Manager per cluster, so to make the Resource Manager more robust a concept called the App Master was introduced. Instead of the Resource Manager, the App Master takes care of the job; the Resource Manager only launches one App Master per job, and the App Master takes care of the rest.)

So once the input splits are ready, the Resource Manager launches one App Master on some DataNode (decided by the Resource Manager; we have no control over it). Once the App Master is launched successfully, it registers with the Resource Manager. The App Master negotiates with the Resource Manager for resources (DataNodes) and gives a launch specification to the Node Managers, which launch containers (JVMs) on the DataNodes and start running the process.

What just happened?
As discussed, one of Hadoop's features is that the program itself goes to the data location for processing. Here our program is copied into the container, and a JVM is launched to run it.


Once the containers finish the process, the App Master gets notified and it notifies the Resource Manager. The Resource Manager then de-registers and kills the App Master.

So with this we covered the base concepts of Hadoop. 


So, as the image above shows, HDFS is for distributed storage and YARN is for cluster management. As discussed, programming languages other than Java can also communicate with HDFS with the help of YARN.


Monday 12 September 2016

HDFS Commands

33 Frequently used HDFS shell commands

# Open a terminal window to the current working directory.
# /home/training
# 1. Print the Hadoop version
hadoop version

# 2. List the contents of the root directory in HDFS
#
hadoop fs -ls /

# 3. Report the amount of space used and
# available on currently mounted filesystem
#
hadoop fs -df hdfs:/

# 4. Count the number of directories,files and bytes under
# the paths that match the specified file pattern
#
hadoop fs -count hdfs:/

# 5. Run a DFS filesystem checking utility
#
hadoop fsck /

# 6. Run a cluster balancing utility
#
hadoop balancer

# 7. Create a new directory named “hadoop” below the
# /user/training directory in HDFS. Since you’re
# currently logged in with the “training” user ID,
# /user/training is your home directory in HDFS.
#
hadoop fs -mkdir /user/training/hadoop

# 8. Add a sample text file from the local directory
# named “data” to the new directory you created in HDFS
# during the previous step.
#
hadoop fs -put data/sample.txt /user/training/hadoop

# 9. List the contents of this new directory in HDFS.
#
hadoop fs -ls /user/training/hadoop

# 10. Add the entire local directory called “retail” to the
# /user/training directory in HDFS.
#
hadoop fs -put data/retail /user/training/hadoop

# 11. Since /user/training is your home directory in HDFS,
# any command that does not have an absolute path is
# interpreted as relative to that directory. The next
# command will therefore list your home directory, and
# should show the items you’ve just added there.
#
hadoop fs -ls

# 12. See how much space this directory occupies in HDFS.
#
hadoop fs -du -s -h hadoop/retail

# 13. Delete a file ‘customers’ from the “retail” directory.
#
hadoop fs -rm hadoop/retail/customers

# 14. Ensure this file is no longer in HDFS.
#
hadoop fs -ls hadoop/retail/customers

# 15. Delete all files from the “retail” directory using a wildcard.
#
hadoop fs -rm hadoop/retail/*

# 16. To empty the trash
#
hadoop fs -expunge

# 17. Finally, remove the entire retail directory and all
# of its contents in HDFS.
#
hadoop fs -rm -r hadoop/retail

# 18. List the hadoop directory again
#
hadoop fs -ls hadoop

# 19. Add the purchases.txt file from the local directory
# named “/home/training/” to the hadoop directory you created in HDFS
#
hadoop fs -copyFromLocal /home/training/purchases.txt hadoop/

# 20. To view the contents of your text file purchases.txt
# which is present in your hadoop directory.
#
hadoop fs -cat hadoop/purchases.txt

# 21. Add the purchases.txt file from “hadoop” directory which is present in HDFS directory
# to the directory “data” which is present in your local directory
#
hadoop fs -copyToLocal hadoop/purchases.txt /home/training/data

# 22. cp is used to copy files between directories present in HDFS
#
hadoop fs -cp /user/training/*.txt /user/training/hadoop

# 23. ‘-get’ command can be used alternatively to the ‘-copyToLocal’ command
#
hadoop fs -get hadoop/sample.txt /home/training/

# 24. Display last kilobyte of the file “purchases.txt” to stdout.
#
hadoop fs -tail hadoop/purchases.txt

# 25. Default file permissions are 666 in HDFS (the umask, 022 by default, reduces this to 644)
# Use ‘-chmod’ command to change permissions of a file
#
hadoop fs -ls hadoop/purchases.txt
sudo -u hdfs hadoop fs -chmod 600 hadoop/purchases.txt

# 26. Default names of owner and group are training,training
# Use ‘-chown’ to change owner name and group name simultaneously
#
hadoop fs -ls hadoop/purchases.txt
sudo -u hdfs hadoop fs -chown root:root hadoop/purchases.txt

# 27. Default name of group is training
# Use ‘-chgrp’ command to change group name
#
hadoop fs -ls hadoop/purchases.txt
sudo -u hdfs hadoop fs -chgrp training hadoop/purchases.txt

# 28. Move a directory from one location to other
#
hadoop fs -mv hadoop apache_hadoop

# 29. Default replication factor to a file is 3.
# Use ‘-setrep’ command to change replication factor of a file
#
hadoop fs -setrep -w 2 apache_hadoop/sample.txt

# 30. Copy a directory from one cluster/node to another
# DistCp is a separate tool, invoked as ‘hadoop distcp’;
# -overwrite option to overwrite existing files
# -update option to synchronize both directories
#
hadoop distcp hdfs://namenodeA/apache_hadoop hdfs://namenodeB/hadoop

# 31. Command to make the NameNode leave safe mode
#
sudo -u hdfs hdfs dfsadmin -safemode leave

# 32. List all the hadoop file system shell commands
#
hadoop fs

# 33. Last but not least, always ask for help!
#
hadoop fs -help


Sunday 11 September 2016

HDFS

HDFS (Hadoop Distributed File System)
Hadoop is designed for UNIX-like operating systems, so it is platform dependent.

How does a Unix OS manage and process data?
The existing file systems work on memory chunks (blocks); ext3, ext4 and xfs are some of the file systems currently in use. The default block size is 4 KB (at the OS level).
The disk (hardware) processes 512 bytes at a time. Once data is received into the OS chunks, which hold 4 KB of data each, the processor writes it to the disk 512 bytes at a time.

HDFS is a wrapper on top of the existing Unix file system. The default block size in HDFS is 64 MB (configurable). Data submitted by the client is stored in the HDFS buffer, then handed to the OS buffer, and from the OS buffer the data is written to disk at 512 bytes at a time.

Data from Front End
        |
HDFS (64 MB buffer)
        |
OS (4 KB buffer)
        |
Writes into DISK (512 bytes at a time)


So memory won't be wasted (many of us think that if the block size is 64 MB (it's configurable) and I have a 32 MB file, the remaining 32 MB is wasted; no, it's not: a block only occupies as much disk space as the data it holds. For example, a 200 MB file with 64 MB blocks is stored as 64 + 64 + 64 + 8 MB, and the last block takes up only 8 MB on disk).

Files in HDFS are “write once”; no random writes to files are allowed.

If the block size is configurable, then why don't I choose 256 MB as the block size; what's wrong with that, dude?
Disk Seek Time: seek time is the amount of time it takes a hard drive's read/write head to find the physical location of a piece of data on the disk. So whether you take the block size as 64 MB, 128 MB or 256 MB matters little if your disk seek time is not fast enough.


There are 3 HDFS daemons (background processes), namely:


–NameNode (master)
–DataNode (Slave)
–Secondary NameNode (master)

NameNode – NN (master)
–Stores all metadata
•Information about file ownership and permissions
•Information about file locations in HDFS
•Names of the individual blocks
•Locations of the blocks
–Holds metadata in memory for fast access
–Changes to the metadata are made in RAM and are also written to a log file on disk called Edit Logs
–Metadata is stored on disk and read when the NN starts up
–Collect block reports from DataNodes on block locations
–Replicate missing blocks (Under Replicated)
–Authorization and authentication
–Single Point Of Failure (SPOF)

DataNode – DN (slave)
–Actual contents of the files are stored as blocks on the slave nodes
–Each block is stored on multiple different nodes for redundancy (Block Replication)
–Communicates with the NN and periodically send block reports
–Clients access the blocks directly from the DataNodes for reads and writes; the NN only supplies the metadata

Secondary NameNode – 2NN (master)
–Secondary NameNode (2NN) is not a failover NameNode
–Periodically combines a prior file system snapshot and editlog into a new snapshot
–New snapshot is transmitted back to the NameNode

Confused? Let me clarify...




Whenever a client submits a file to store on HDFS, the first contact is with the NameNode. The NameNode already has information about all the DataNodes in the cluster. Based on the file size, the file is split into small chunks and each chunk is stored on a different DataNode. Here a DataNode is nothing but a commodity server, which is a collection of memory chunks/blocks. So the NameNode maintains the block locations, the DataNode IP addresses and so on, i.e. the metadata.
With this we have achieved Distributed Storage (wait for Distributed Parallel Processing; we will cover that in the MapReduce topic).

Another advantage of Hadoop we learned is that it is fault tolerant. To achieve this, each piece of the file is stored in 3 different blocks (Block Replication) on 3 different DataNodes. If any machine fails we can still fetch the data from another DataNode. (3 replicas is the default, and it's configurable.)
The client copies the block into the first listed replica DataNode's memory and closes the connection; that replica writes the block to disk and forwards it to the next replica DataNode, and the process repeats for the remaining replicas.

Under-Replicated: if replication was not successful, the NameNode raises a request to create the remaining replicas until the replication factor is reached.

While retrieving files, the client first fetches the metadata from the NameNode and then contacts the DataNodes directly to collect the data.
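Just as an illustration of these two paths (the posts here stick to shell commands, so treat this as a hedged sketch using the standard org.apache.hadoop.fs.FileSystem client API; the file path is made up): on a write the client asks the NameNode where to put the blocks and streams the data to the DataNodes, and on a read it fetches the block locations from the NameNode and pulls the block contents directly from the DataNodes.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsReadWriteExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();     // picks up core-site.xml / hdfs-site.xml
        FileSystem fs = FileSystem.get(conf);         // metadata requests go to the NameNode

        Path path = new Path("/user/training/hadoop/hello.txt");  // hypothetical example path

        // Write path: the NameNode chooses the replica DataNodes, the client
        // streams the bytes to the first replica, which forwards them down the pipeline.
        try (FSDataOutputStream out = fs.create(path, true)) {
            out.write("hello hdfs\n".getBytes(StandardCharsets.UTF_8));
        }

        // Read path: block locations come from the NameNode, the block
        // contents are read directly from the DataNodes.
        try (BufferedReader in = new BufferedReader(
                new InputStreamReader(fs.open(path), StandardCharsets.UTF_8))) {
            String line;
            while ((line = in.readLine()) != null) {
                System.out.println(line);
            }
        }
    }
}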

So, ever wonder how our dear NameNode (the master) comes to know about the availability of all the DataNodes (slaves)?
Well, every 3 seconds (configurable) each slave node sends a message to its master about its availability; this process is called the Heartbeat. If the master doesn't receive a heartbeat from a slave node, it considers that slave no longer available and stops sending it data to store. Only once the slave sends heartbeats again is it considered available.

So with this you know that metadata is precious: if you lose the metadata, it's game over. To keep the game going, the NameNode periodically (again configurable) writes all metadata changes to a file on disk. We call this file the Edit Log. Writing huge amounts of metadata into a single edit log would slow the NameNode down (write optimisation), so it is partitioned into small files; but then reading all those small edit logs at startup would again cause a performance issue (read optimisation).
So there is a background process (the Secondary NameNode) that merges all those small edit logs into one bigger snapshot. That bigger snapshot is nothing but the FSImage, which is given back to the NameNode and stored on disk.
So with this we know the metadata is taken care of, but dude, what if the NameNode itself fails?
In Hadoop 2 they came up with a concept called the Journal Node.
Journal Nodes keep a shared copy of the NameNode's edit log so that a standby NameNode can take over.


Basic UNIX Commands

As we know, Hadoop works on a UNIX OS, so before proceeding to learn Hadoop we have to know the basic UNIX commands. PSB...

Command      Example                        Description
1.  ls       ls                             Lists files in the current directory
             ls -alF                        List in long format
2.  cd       cd tempdir                     Change directory to tempdir
             cd ..                          Move back one directory
             cd ~dhyatt/web-docs            Move into dhyatt's web-docs directory
3.  mkdir    mkdir graphics                 Make a directory called graphics
4.  rmdir    rmdir emptydir                 Remove directory (must be empty)
5.  cp       cp file1 web-docs              Copy file into directory
             cp file1 file1.bak             Make a backup of file1
6.  rm       rm file1.bak                   Remove or delete file
             rm *.tmp                       Remove all .tmp files
7.  mv       mv old.html new.html           Move or rename files
8.  more     more index.html                Look at a file, one page at a time
9.  touch    touch filename                 touch command is used to make (empty) files
10. man      man ls                         Online manual (help) about a command
11. cat      cat filename                   Sends file contents to standard output

Basic VI editor Commands  

Letter        Action
i             Starts inserting in front of the current cursor position
<Esc>         Gets out of Insert Mode
<ctrl>f       Moves the cursor forward a full page of text at a time
<ctrl>b       Moves the cursor backward a full page of text at a time
x             Deletes the character under the cursor
dd            Deletes the line where the cursor is located (type d twice!)
n dd          Deletes n consecutive lines (n is an integer)
u             Undoes the last edit operation
yy            Copies or yanks a line (5yy yanks 5 lines)
p             Puts the yanked text on the line below the cursor (lower case p)
P             Puts the yanked text above the current line (capital P)

Note: If vi is already in the input mode, text from that or another window may be highlighted using the left mouse button, and copied into place by pressing the middle mouse button.

:r <file>     Reads a file from disk into the vi editor
:w <file>     Writes the current file to disk
:wq           Writes the file and quits vi
:q!           Quits without writing (useful if you've messed up!)






What is Hadoop? How is it different from other techniques?

Hey guys!!! Today we will see why Hadoop is popular and how it is different from other regular programming techniques.

Before the start of this article we will see the cons of existing RDBMS (Relational Database Management Systems).

To start with, Hadoop is a technology that copes with all the characteristics of Big Data, namely the 3 V's:

Volume    - Huge data coming from various sources like apps, sensors, social media...
Velocity  - A huge amount of data is produced every second, e.g. from sensors...
Variety   - Structured (data in table format with columns and rows), Semi-structured (XML, JSON), Unstructured (email contents, web pages, multimedia)

There are 8 V's in total; we will discuss the rest later. Hadoop handles all of these without much bother.

Coming to RDBMS:
It is best suited only for structured data. (According to surveys, structured data is currently about 20% of all data and the remaining 80% is unstructured.)
Cost (do you know how much a highly reliable server costs?).
No scalability (relational databases are designed to run on a single server in order to maintain the integrity of the table mappings, so you just can't add another server to allow more data).

What Hadoop provides:

(Hadoop Cluster - a group of commodity servers (normal machines) combined into one unit.)

Distributed Batch Processing - Hadoop works in distributed mode, which is best suited to handling Big Data.
How earlier techniques work - There is a file of, say, 1 GB in size and you are writing a program to process it. First you fetch the file from the database/server to your program's side (two different networks), which requires network bandwidth, and then you start processing the file from top to bottom.
How Hadoop works - Hadoop first breaks the 1 GB file into small pieces and stores them on different machines in your cluster; then it copies your program to all the machines where the pieces of the file are stored and runs all the copies in parallel. So there is no need for high network bandwidth.
Fault Tolerant - If any server in the Hadoop cluster fails due to hardware problems, it no longer stops our job.
Scalability - If in the middle you find your cluster is out of capacity, you can add another commodity server to the cluster.
Open source    - Free Free Free ;)

We will see basic UNIX commands & HDFS in the next topics.
