Sunday, 17 December 2017

BigData - Working with HDFS, MapReduce and YARN

1.       HDFS -  Hadoop Distributed File System
a.       HDFS is the default file system used in Hadoop.
b.       As the name suggests, HDFS is spread across multiple nodes, where each node is an individual machine built on commodity hardware.
c.       HDFS is highly fault tolerant. Individual nodes may go down or hold corrupted data, but this does not affect the availability of data in the cluster as a whole.
d.       It is suited to batch processing, favouring high-throughput data access over low latency. This means it works well for large volumes and long-running jobs that process data for a long time. It is not suited to quick retrieval of data.
e.       Queries against HDFS are not interactive, i.e. a user cannot type SQL and wait for an immediate response. All queries run as batch jobs.
f.        HDFS is used to store very large files.
g.       Unlike a relational database, HDFS does not impose a schema on the data it stores. The data tends to be semi-structured or unstructured.
h.       HDFS files are stored on multiple disks, where each disk is on a different machine in the cluster. It is the responsibility of HDFS to manage all these disks, which it does using a master-slave architecture.
i.          One node in the cluster is designated as the master node. It is named in the cluster configuration rather than chosen at random. The master node is responsible for coordinating storage across all the slave nodes.
j.         On the master node, HDFS runs a Java process called the NameNode. It receives the file system requests made by users of the cluster and tells clients which slave nodes hold the blocks they need. The file data itself is read from and written to the slave nodes directly.
k.       HDFS runs a process called DataNode on each of the slave nodes.
l.         In simple terms, the master is the NameNode and the slaves are DataNodes. In a basic setup there is only one NameNode per cluster. The NameNode maintains information about the HDFS files and keeps a record of which blocks are present on which DataNode. The NameNode is therefore a single point of failure, i.e. if the NameNode fails, HDFS is unavailable. As a remedy, we can run a Secondary NameNode that keeps a checkpointed copy of the metadata. Despite its name, it does not take over automatically when the NameNode fails.
m.      Storing data in HDFS
                                                               i.      Each large file is divided into fixed-size blocks (the last block may be smaller). The blocks are stored on different nodes.
                                                             ii.      The NameNode maintains a record of which block is stored on which DataNode.
                                                           iii.      Multiple identical copies of each block are stored on different nodes for fault tolerance.
                                                           iv.      The default block size in HDFS is 128 MB (Hadoop 2.x). If we increase the block size, parallelism is reduced. If we decrease the block size, the overhead increases, because there are more blocks to track and locate.
                                                             v.      The time taken to process a block can be divided into 2 parts: 1) locating the block (seek time) and 2) reading the block (transfer time).
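The block-size trade-off above can be sketched with a little arithmetic. This is only an illustration: the 10 ms seek time and 100 MB/s transfer rate are assumed round numbers, not measurements, and the function names are made up for this example.

```python
BLOCK_SIZE_MB = 128  # default block size in Hadoop 2.x


def split_into_blocks(file_size_mb, block_size_mb=BLOCK_SIZE_MB):
    """Return the sizes (in MB) of the blocks a file would be split into."""
    full_blocks = file_size_mb // block_size_mb
    remainder = file_size_mb % block_size_mb
    sizes = [block_size_mb] * full_blocks
    if remainder:
        sizes.append(remainder)  # the last block only holds what is left
    return sizes


def seek_overhead(block_size_mb, seek_ms=10.0, transfer_mb_per_s=100.0):
    """Fraction of the total read time of one block that is spent seeking."""
    transfer_ms = block_size_mb / transfer_mb_per_s * 1000.0
    return seek_ms / (seek_ms + transfer_ms)


blocks = split_into_blocks(1000)
print(len(blocks), "blocks, last one is", blocks[-1], "MB")

# Smaller blocks spend a larger share of their time on seeking.
for size_mb in (1, 128, 1024):
    print(size_mb, "MB block: seek share =", round(seek_overhead(size_mb), 4))
```

With these assumed numbers, a 1 MB block spends half of its read time on the seek, while a 128 MB block spends under 1%. That is the overhead side of the trade-off; the parallelism side is that a bigger block size gives fewer blocks to work on at the same time.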
n.       HDFS Commands
                                                               i.      HDFS file system commands mirror the Linux file system commands, for example mkdir, rmdir, ls etc.
                                                             ii.      From the Hadoop installation directory, run “bin/hadoop fs -help” to get help on the commands. We can also use “bin/hdfs dfs -help”. The difference is that hadoop fs is the generic form that can work with other file systems configured in Hadoop, whereas hdfs dfs is intended for HDFS.
                                                           iii.      We can copy/move files between the local file system and HDFS.
1.       bin/hdfs dfs -copyFromLocal local-file hdfs-file : copies a file from the local file system to HDFS. The -put command does the same thing.
2.       bin/hdfs dfs -copyToLocal hdfs-file local-file : copies a file from HDFS to the local file system. The -get command does the same thing.
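If these commands need to be driven from a script, one possible approach is to build the argument lists in Python and hand them to subprocess. The sketch below only builds and prints the commands. The helper names and the paths (/user/demo/input, words.txt) are hypothetical, and it assumes the script is started from the Hadoop installation directory.

```python
import shlex


def hdfs_dfs(*args, hdfs_bin="bin/hdfs"):
    """Build the argument list for an 'hdfs dfs' command without running it."""
    return [hdfs_bin, "dfs", *args]


def copy_from_local(local_path, hdfs_path):
    """Local file system -> HDFS (same effect as -put)."""
    return hdfs_dfs("-copyFromLocal", local_path, hdfs_path)


def copy_to_local(hdfs_path, local_path):
    """HDFS -> local file system (same effect as -get)."""
    return hdfs_dfs("-copyToLocal", hdfs_path, local_path)


commands = [
    hdfs_dfs("-mkdir", "-p", "/user/demo/input"),
    copy_from_local("words.txt", "/user/demo/input/words.txt"),
    hdfs_dfs("-ls", "/user/demo/input"),
    copy_to_local("/user/demo/input/words.txt", "words-copy.txt"),
]

for argv in commands:
    print(shlex.join(argv))
    # On a machine with Hadoop installed, each command could be run with:
    # subprocess.run(argv, check=True)
```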
o.       HDFS fault tolerance is achieved using replication
                                                               i.      The default replication factor in a fully distributed environment is 3, i.e. every block is stored as 3 copies.
                                                             ii.      In pseudo-distributed mode there is only one DataNode, so the replication factor is normally set to 1 (property dfs.replication in hdfs-site.xml).
                                                           iii.      Replica placement balances maximum redundancy against minimum write bandwidth.
1.       Maximum redundancy is achieved by placing replicas on far-apart nodes, typically in different racks of the cluster, so that even if an entire rack goes down we still have a copy of the block to work with.
2.       Minimum write bandwidth is achieved by keeping replicas as close together as possible, to avoid write latency at the time of replication.
3.       Write bandwidth could be minimised by keeping all replicas in the same rack, because intra-rack (within a rack) traffic has lower latency than inter-rack (between racks) traffic. But this also means lower fault tolerance, so a balance needs to be maintained between maximum redundancy and minimum write bandwidth.
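One way to picture this balance is a simplified version of rack-aware placement for a replication factor of 3: the first replica goes on the node that writes the block, the second on a node in a different rack, and the third on another node in that same remote rack. The sketch below is a toy model, not the HDFS implementation. The rack and node names are hypothetical, and it assumes at least one other rack with two or more nodes.

```python
import random


def rack_of(topology, node):
    """Return the name of the rack that contains the given node."""
    return next(rack for rack, nodes in topology.items() if node in nodes)


def place_replicas(topology, writer_node, rng=random):
    """Pick 3 nodes for one block.

    topology maps rack name -> list of node names.
    1st replica: the writer's own node (no network hop).
    2nd replica: a node in a different rack (survives a rack failure).
    3rd replica: another node in that same remote rack (only one
                 inter-rack transfer is needed for replicas 2 and 3).
    """
    local_rack = rack_of(topology, writer_node)
    remote_racks = [
        rack for rack, nodes in topology.items()
        if rack != local_rack and len(nodes) >= 2
    ]
    remote_rack = rng.choice(remote_racks)  # raises IndexError if none exist
    second, third = rng.sample(topology[remote_rack], 2)
    return [writer_node, second, third]


topology = {
    "rack1": ["node1", "node2"],
    "rack2": ["node3", "node4"],
    "rack3": ["node5", "node6"],
}
replicas = place_replicas(topology, "node1")
print(replicas, [rack_of(topology, n) for n in replicas])
```

The result always spans exactly two racks: losing either rack still leaves a copy, while only one of the three copies has to cross between racks when it is written.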
p.       NameNode Failure Management
                                                               i.      The NameNode keeps the block mapping information in memory; it is not persisted. This means that if the NameNode fails, this information is lost.
                                                             ii.      Every time the NameNode is restarted, the DataNodes send their block storage information to the NameNode so that it can reconstruct the block mapping information.
                                                           iii.      Relying on a NameNode restart alone is not an ideal solution in production environments.
                                                           iv.      Therefore, in order to manage NameNode failures, we can use the metadata files and a Secondary NameNode.
1.       There are 2 special metadata files which allow us to reconstruct the NameNode's state:
a.       The fsimage and edits files.
b.       The fsimage file holds a snapshot of the entire file system metadata as of the last checkpoint. It is loaded when the NameNode starts up.
c.       The edits file is a log of all the changes made to the file system since that snapshot.
d.       The fsimage and edits files together give the current state of the file system.
e.       The fsimage and edits files are stored outside HDFS, on a local or remote file system. The location is set in hdfs-site.xml with the property dfs.namenode.name.dir.
2.       Merging the fsimage and edits files is a heavy, long-running process.
3.       An easier way is to set up a Secondary NameNode, which runs on a different machine and keeps a recent copy of the NameNode's metadata. The Secondary NameNode copies over the fsimage and edits files and then applies the edits to the fsimage; this is called checkpointing. The checkpointed fsimage is then copied back to the primary NameNode, and the process repeats. The checkpoint frequency is configurable in hdfs-site.xml; the property name is dfs.namenode.checkpoint.period.
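The idea behind checkpointing can be sketched as replaying a log on top of a snapshot. The real fsimage and edits files are binary and record many more operation types; the dict and tuple formats below, and the three operations shown, are simplifications invented for this illustration.

```python
def apply_edits(fsimage, edits):
    """Replay an edit log on top of a snapshot and return the new snapshot."""
    namespace = dict(fsimage)  # copy, so the old snapshot stays untouched
    for op, path, *rest in edits:
        if op == "create":
            namespace[path] = rest[0]  # rest[0] is the file's list of block ids
        elif op == "delete":
            namespace.pop(path, None)
        elif op == "rename":
            namespace[rest[0]] = namespace.pop(path)  # rest[0] is the new path
        else:
            raise ValueError(f"unknown edit operation: {op}")
    return namespace


# Snapshot taken at the last checkpoint: path -> block ids.
fsimage = {"/data/a.txt": ["blk_1", "blk_2"]}

# Changes logged since that snapshot.
edits = [
    ("create", "/data/b.txt", ["blk_3"]),
    ("rename", "/data/a.txt", "/data/old_a.txt"),
    ("delete", "/data/b.txt"),
]

# Checkpoint: merge the two, then start again with an empty edit log.
checkpoint = apply_edits(fsimage, edits)
edits = []
print(checkpoint)  # {'/data/old_a.txt': ['blk_1', 'blk_2']}
```

The longer the edit log grows between checkpoints, the more work this replay becomes, which is why it is moved off the primary NameNode.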
2.       Processing data with MapReduce
a.       Processing huge volumes of data requires running processes on many machines. These machines work together as a distributed system. MapReduce is a programming paradigm for processing data on a distributed system.
b.       The MAP process runs on individual machines and works on the data that lives on that machine. The output of the individual MAP processes is transferred across the network within the cluster to the machine (or machines) where the REDUCE phase runs, and the REDUCE process collates it.
c.       We write the MAP code to work on one record at a time and produce output as key-value pairs.
d.       The reduce phase collects all the key-value pairs and aggregates them per key into one final output.
e.       Example: finding word frequencies in very large text files
                                                               i.      The text file contains lines and sentences and is distributed across multiple nodes in the cluster, i.e. each node holds a partition of the text file.
                                                             ii.      The MAP process runs on each machine, processes the partition on that machine and produces key-value pairs. In the case of word frequencies, the output is a word and the count of that word, i.e. {word, count}. For example, node1: {twinkle, 2}, {this, 1} and node2: {twinkle, 1}, {this, 2}
                                                           iii.      This MAP output is then passed to REDUCE, which sums up the count values per word and produces the final output, i.e. {twinkle, 3}, {this, 3}
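The word-frequency example can be imitated in a few lines of plain Python running in a single process. This is a sketch of the idea rather than Hadoop code: the function names are made up, and the grouping step stands in for what the framework does between the two phases. Each map call emits (word, 1); the per-node totals in the example above are what you get after summing these pairs on each node.

```python
from collections import defaultdict


def map_words(line):
    """MAP: turn one record (a line of text) into (word, 1) pairs."""
    return [(word.lower(), 1) for word in line.split()]


def group_by_key(pairs):
    """Collect all values that share a key, ready for the reduce phase."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return grouped


def reduce_counts(word, counts):
    """REDUCE: sum the counts for one word."""
    return word, sum(counts)


# Each node holds one partition of the text file.
partitions = {
    "node1": ["twinkle twinkle", "this"],
    "node2": ["twinkle this", "this"],
}

map_output = [
    pair
    for lines in partitions.values()
    for line in lines
    for pair in map_words(line)
]
result = dict(
    reduce_counts(word, counts)
    for word, counts in group_by_key(map_output).items()
)
print(result)  # {'twinkle': 3, 'this': 3}
```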
f.        MapReduce is typically implemented in Java
                                                               i.      Every MapReduce program comprises 3 basic components, i.e. MAP, REDUCE and MAIN.
                                                             ii.      The MAP class is where the map logic is implemented. The REDUCE class is where the reduce logic is implemented. MAIN is a driver program that sets up the job.
                                                           iii.      The MAP class extends the Mapper class, and the method that we need to override is map. Mapper is a generic class with 4 type parameters: input key, input value, output key and output value.
                                                           iv.      The REDUCE class extends the Reducer class, and the method that we need to override is reduce. Reducer is a generic class with 4 type parameters: input key, input value, output key and output value. The input key and value types of the REDUCE class must match the output key and value types of the MAP class.
                                                             v.      The MAIN class sets up the job instance, which contains the configuration parameters of the MapReduce job and serves as its entry point. The job instance is submitted to the Hadoop cluster in order to get MapReduce running.
                                                           vi.      The job object has properties to be configured. Some of them are the input file path, output file path, Mapper class, Reducer class and output data types.
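The three-part structure and the four type parameters can be mimicked in Python with typing.Generic. To be clear, this is an analogue for illustration and not the Hadoop Java API: the Mapper, Reducer and run_job names below are stand-ins defined in the sketch itself, and the job (longest word length per initial letter) is a made-up example.

```python
from typing import Generic, Iterable, Iterator, Tuple, TypeVar

KIn = TypeVar("KIn")
VIn = TypeVar("VIn")
KOut = TypeVar("KOut")
VOut = TypeVar("VOut")


class Mapper(Generic[KIn, VIn, KOut, VOut]):
    def map(self, key: KIn, value: VIn) -> Iterator[Tuple[KOut, VOut]]:
        raise NotImplementedError


class Reducer(Generic[KIn, VIn, KOut, VOut]):
    def reduce(self, key: KIn, values: Iterable[VIn]) -> Iterator[Tuple[KOut, VOut]]:
        raise NotImplementedError


class WordLengthMapper(Mapper[int, str, str, int]):
    # input: (line number, line text); output: (initial letter, word length)
    def map(self, key, value):
        for word in value.split():
            yield word[0], len(word)


class MaxLengthReducer(Reducer[str, int, str, int]):
    # input types (str, int) match the mapper's output types (str, int)
    def reduce(self, key, values):
        yield key, max(values)


def run_job(mapper, reducer, records):
    """MAIN/driver: wire the mapper and reducer together and run in-process."""
    grouped = {}
    for key, value in records:
        for out_key, out_value in mapper.map(key, value):
            grouped.setdefault(out_key, []).append(out_value)
    output = []
    for key in sorted(grouped):
        output.extend(reducer.reduce(key, grouped[key]))
    return output


records = [(0, "twinkle twinkle little star"), (1, "this tiny light")]
print(run_job(WordLengthMapper(), MaxLengthReducer(), records))
# [('l', 6), ('s', 4), ('t', 7)]
```

In a real job the driver would also name the input and output paths and the output data types; here the records list and the returned list play those roles.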
3.       YARN
a.       YARN was introduced in 2013 as part of Hadoop 2.0. Before 2.0, the functions of YARN were performed by MapReduce itself. With 2.0, the original MapReduce was split into 2 parts, i.e. MapReduce (data processing) and YARN (cluster resource management).
b.       YARN has 2 daemons: the ResourceManager, which runs on a master node (often the same machine as the NameNode in small clusters), and the NodeManager, which runs on each of the DataNode machines.
c.       The ResourceManager manages resources such as memory and CPU across all nodes in the cluster.
d.       A NodeManager only looks after the node it is running on. It is responsible for launching and monitoring the tasks that run on that node, and it communicates with the ResourceManager.
e.       YARN has the following scheduling policies
                                                               i.      FIFO (First In First Out): rarely used in YARN, because it results in long waiting times.
                                                             ii.      Capacity Scheduler (this is the default): splits the resources into queues, and each queue has a share of the resources. The percentage share is configurable based on the importance of the queue. Queues can be organised by functional subject area, and we submit a job to a particular queue. Within a queue, scheduling is again FIFO. This policy can lead to under-utilization of resources in the cluster.
                                                           iii.       Fair Scheduler: resources in the cluster are always shared fairly between all jobs. This means that submitted jobs start running immediately and the resources are split across the running jobs.
                                                           iv.      The scheduling policy to use is set in yarn-site.xml. The property name is yarn.resourcemanager.scheduler.class
                                                             v.      For the Capacity Scheduler, the queues are set up in etc/hadoop/capacity-scheduler.xml
                                                           vi.      While submitting a MapReduce job, we can choose the queue with “-D mapreduce.job.queuename=<qname>”
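The difference between the three policies can be sketched with simple arithmetic on cluster memory. This is a toy model only: the queue names, job names and the 100,000 MB cluster are hypothetical, and the real schedulers take more than memory into account.

```python
def fifo_order(jobs):
    """FIFO: jobs run one after another in submission order."""
    return [job["name"] for job in sorted(jobs, key=lambda j: j["submitted_at"])]


def capacity_shares(total_memory_mb, queue_percent):
    """Capacity scheduling: split cluster memory between queues by percentage."""
    if sum(queue_percent.values()) != 100:
        raise ValueError("queue capacities must add up to 100")
    return {q: total_memory_mb * pct // 100 for q, pct in queue_percent.items()}


def fair_shares(total_memory_mb, running_jobs):
    """Fair scheduling: every running job gets an equal slice."""
    if not running_jobs:
        return {}
    share = total_memory_mb // len(running_jobs)
    return {job: share for job in running_jobs}


jobs = [
    {"name": "report", "submitted_at": 20},
    {"name": "etl", "submitted_at": 10},
]
print(fifo_order(jobs))  # ['etl', 'report']

print(capacity_shares(100_000, {"etl": 60, "adhoc": 30, "default": 10}))
# {'etl': 60000, 'adhoc': 30000, 'default': 10000}

print(fair_shares(100_000, ["job1", "job2", "job3", "job4"]))
# {'job1': 25000, 'job2': 25000, 'job3': 25000, 'job4': 25000}
```

The under-utilization mentioned for capacity scheduling shows up here as well: if nothing is submitted to the adhoc queue, its 30,000 MB sits idle in this simple model.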

