Sunday, March 13, 2016

Learning Apache Hadoop (O'Reilly course)

we should know the concept of disk striping. in disk striping (RAID 0) the data is divided into multiple chunks, so if you have 4 hard disks the data is divided into 4 pieces, one per disk, and accessing it is much faster because all 4 disks work in parallel.

in RAID 1 the data is mirrored: every disk has an identical copy.

to make sure your data is safe you should combine RAID 1 with RAID 0, since RAID 0 on its own has no redundancy.

Hadoop does the same thing logically across the cluster: it stripes and mirrors the data.
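
a minimal sketch of the striping and mirroring idea, in plain Python. the function names, the 4-byte chunk size, and the "disks as lists" model are assumptions made for illustration; this is not how a RAID controller or HDFS is implemented.

```python
def stripe(data, disks, chunk=4):
    """RAID 0 style: deal fixed-size chunks round-robin across the disks."""
    layout = [[] for _ in range(disks)]
    for i in range(0, len(data), chunk):
        layout[(i // chunk) % disks].append(data[i:i + chunk])
    return layout


def mirror(layout, copies=2):
    """RAID 1 style: keep `copies` identical copies of every disk."""
    return [[list(disk) for _ in range(copies)] for disk in layout]


def read_back(layout):
    """Reassemble the original bytes by reading the chunks round-robin."""
    out = []
    longest = max(len(disk) for disk in layout)
    for row in range(longest):
        for disk in layout:
            if row < len(disk):
                out.append(disk[row])
    return b"".join(out)


data = b"ABCDEFGHIJKLMNOPQR"
striped = stripe(data, disks=4)   # 5 chunks spread over 4 disks
mirrored = mirror(striped)        # every disk now has a twin
```

striping alone spreads the work but loses data if one disk dies; the mirrored layout keeps a second copy of each disk, which is the combination the notes recommend.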

- Hadoop is fault tolerant: if a disk is corrupted or a network card stops working, the cluster keeps running.
- Hadoop has a master/slave structure.

you should choose a powerful and expensive computer for your master node.
the master node is a single point of failure (SPOF),
so you should have 2 or 3 master nodes in a cluster
and redundancy in the master's hardware.

you need a lot of RAM, more than 25 GB, as the daemons take a lot of RAM
you should use RAID
you should use hot-swap disk drives
you should have redundant network cards
you should have dual power supplies

the bottom line: the MASTER NODE should never go down.

CPU is less important than RAM here.

you will have 4 to 4000 slave nodes in a cluster
slave nodes are not single points of failure.
7200 RPM disks are fine
more disks are better, which means 8 * 1 TB disks are much better than 4 * 2 TB
it is better that all slaves have the same disk size.

slave hardware does NOT need to be redundant
you don't need RAID, dual network cards, or dual power supplies

you need a lot of RAM

let's say you have 10 TB of new data every month
you have slaves with 8 TB of disk each
you have a replication factor of 3
you should know that there is something called "intermediate data", which is the data generated between MAP and REDUCE. this data takes about 25% of the disk size (in this case 2 TB)

the available space formula is: available = (RAW - ID) / RF = (8 - 2) / 3 = 2 TB

which means each slave really holds 2 TB, not 8 TB, so you need to add 5 slaves every month (as you get 10 TB every month).
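
the same arithmetic as a small Python sketch. the function and parameter names are made up for illustration; the 25% intermediate data share and the replication factor of 3 are the values from the notes.

```python
import math


def usable_tb_per_slave(raw_tb, intermediate_fraction=0.25, replication=3):
    """(RAW - ID) / RF, where ID is a fraction of the raw disk size."""
    intermediate_tb = raw_tb * intermediate_fraction
    return (raw_tb - intermediate_tb) / replication


def slaves_needed(new_data_tb, raw_tb, intermediate_fraction=0.25, replication=3):
    """How many slaves are needed to hold `new_data_tb` of new data."""
    per_slave = usable_tb_per_slave(raw_tb, intermediate_fraction, replication)
    return math.ceil(new_data_tb / per_slave)


per_slave = usable_tb_per_slave(8)     # 2.0 TB usable out of 8 TB raw
per_month = slaves_needed(10, 8)       # 5 slaves for 10 TB of new data
```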

the Hadoop ecosystem means all the things that run on top of Hadoop,
basically when we say Hadoop we mean HDFS and MapReduce.
the main things in Hadoop are
1- NameNode: part of the master
2- Secondary NameNode: part of the master
3- Job Tracker: part of the master
4- Data Node: part of the slave
5- Task Tracker: part of the slave

1- HBase: fast, scalable NoSQL database
2- Hive: write SQL-like queries instead of MapReduce
3- Pig: write functional queries instead of MapReduce
4- Sqoop: pull and push data to an RDBMS, used for integration
5- Flume: pull data into HDFS
6- Hue: web interface for users
7- Cloudera Manager: web interface for admins to manage the cluster
8- Oozie: workflow builder
9- Impala: real time SQL queries, 70 faster than MapReduce.
10- Avro: serialize complex objects to save in Hadoop
11- Mahout: machine learning in Hadoop
12- ZooKeeper
13- Spark
14- YARN
15- Storm

Hadoop is used for batch processing of work that can be parallelized, which means problems that are hard to split up, like graph-based ones, don't fit well with Hadoop

the best is Cloudera


when we talk about Hadoop, we are talking about 2 main things
1- storage: which is HDFS, a distributed redundant storage
2- processing: which is MapReduce: a distributed processing system

some terminology to know:
1- a job: all the tasks that need to run on all the data.
2- a task: individual thing, which is either a map or a reduce
3- Slave/Master: these are computers
4- NameNode, DataNode: these are daemons, which means JVM instances

we have MapReduce v1: old and stable
we have MapReduce v2: new things like dynamic allocation and scalability

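a Hadoop cluster has 5 daemons:
- Storage Daemons:
NameNode (on Master)
Secondary NameNode (on Master)
DataNode (on Slave)
- Processing Daemons:
JobTracker (on Master)
TaskTracker (on Slave)
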
Master Daemons are for orchestration
Slave Daemons are for working

NameNode: handles the storage metadata. it keeps the information in memory for fast access, but it also persists it to disk.
Secondary Name Node: it periodically merges the NameNode's edit log into the fsimage (a checkpoint). it is not a failover node
Job Tracker: coordinates processing and scheduling.

NOTE: use different machines for the Name Node and the Secondary Name Node, because if the Name Node machine is lost, the checkpoint kept on the Secondary Name Node can be used to rebuild a new Name Node

NOTE: you can install the job tracker on the same machine with the Name Node, and move it to another machine when your project gets bigger.

Data Node: handles the raw data (read & write)

Task Tracker: handles individual tasks (Map or Reduce)

the data node and the task tracker always send heartbeats to the master to tell it that they are alive and what they are working on.

Hadoop run modes

1- Local JobRunner: Single computer, single JVM with all daemons, good for debugging
2- Pseudo-Distributed: single computer, 5 JVMs (one for each daemon), good for testing
3- Fully Distributed: multiple computers, multiple JVMs, this is the real environment.

when you install Hadoop it is recommended to use Linux; use RHEL for the master and CentOS for the slaves.

use Red Hat Kickstart to install Hadoop on multiple machines.

Elastic Map Reduce

is a hosted Hadoop service from Amazon.

it has this structure:

the master instance group: is like the master node
the core instance group: is like the slave node, but it is only responsible for storage (it uses HDFS)
the task instance group: is like the slave node, but it is only responsible for processing (doing the MapReduce jobs)

usually we use S3 to write information and intermediate data.

the core instance group is static: you cannot add any new machines after you start the cluster. the task instance group is not static: you can add new machines whenever you want


in this lab he created 5 EC2 instances, one master and 4 slaves
he installed Cloudera Manager
he installed Hadoop from Cloudera Manager
then he uploaded some data to Hadoop from the command line
then he ran a MapReduce example
then he checked everything from Cloudera Manager

then he gave an example of downloading the Cloudera QuickStart VM, to install Hadoop locally

he used an Ubuntu 12.04 AMI


Hadoop Distributed File System (HDFS)

you can use HDFS without MapReduce, in that case you only need the NameNode, Secondary NameNode, and DataNode daemons

when you upload a file to HDFS it is divided into blocks, and the blocks are stored on the slave nodes

every block is replicated to 3 machines (by default)

you cannot edit or append to a file after you upload it to HDFS; if you want to change anything you should delete the file and create it again.

the default block size is 64 MB, however it is recommended to change it to 128 MB
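
a small Python sketch of what block size and replication mean for one file. the helper names and the 200 MB example file are assumptions for illustration; the 64 MB / 128 MB block sizes and the replication factor of 3 come from the notes.

```python
import math


def block_sizes_mb(file_mb, block_mb=64):
    """Sizes of the blocks a file is cut into; the last block may be smaller."""
    full, rest = divmod(file_mb, block_mb)
    return [block_mb] * full + ([rest] if rest else [])


def block_count(file_mb, block_mb=64):
    return math.ceil(file_mb / block_mb)


def stored_mb(file_mb, replication=3):
    """Raw disk space used across the cluster once every block is replicated."""
    return file_mb * replication


small_blocks = block_sizes_mb(200, block_mb=64)    # [64, 64, 64, 8]
large_blocks = block_sizes_mb(200, block_mb=128)   # [128, 72]
on_disk = stored_mb(200)                           # 600 MB across the cluster
```

a bigger block size means fewer blocks per file, so the name node has less metadata to keep track of.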

the NameNode is a master node daemon
it has only metadata about the files that are stored on the slaves (e.g. the name of the file, permissions, where the blocks are).


the client asks the name node about the file, then the client goes and reads it directly from the slave nodes.

The name node metadata exists in RAM, however it is also persisted.

we have 2 files for the persisted metadata in name node:
1- FSIMAGE: it is a point in time image about the information that exists in HDFS
2- edit log: the changes that happened since we created the FSIMAGE, it stores the delta information

every now and then FSIMAGE and edit log will be merged and saved on the hard disk
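
a toy Python sketch of the fsimage + edit log idea. the dict-based namespace and the ("create", path, blocks) / ("delete", path) tuples are invented for illustration and are not the real on-disk formats.

```python
def apply_edit(namespace, edit):
    """Apply one edit log entry to an in-memory namespace (path -> block count)."""
    op, path = edit[0], edit[1]
    if op == "create":
        namespace[path] = edit[2]
    elif op == "delete":
        namespace.pop(path, None)
    else:
        raise ValueError("unknown operation: %r" % (op,))


def checkpoint(fsimage, edit_log):
    """Merge the edit log into a new fsimage and start a fresh, empty edit log."""
    merged = dict(fsimage)          # the old image is left untouched
    for edit in edit_log:
        apply_edit(merged, edit)
    return merged, []


fsimage = {"/a.txt": 2}
edit_log = [("create", "/b.txt", 4), ("delete", "/a.txt")]
new_fsimage, new_edit_log = checkpoint(fsimage, edit_log)
```

between checkpoints only the small edit log grows; the merge is the point where the image catches up with the changes.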

you have to have multiple hard disks with RAID to ensure that you will not lose this data.
it is also better to write a copy to a remote NFS mount,
and to take a daily or weekly backup.

every 3 seconds the datanode sends a heartbeat to the name node
if 30 seconds pass without a heartbeat, the node is considered out
if 10 minutes pass with no heartbeat, Hadoop starts copying the data that should be on that node to other machines.
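
the timeouts above can be sketched as a simple classification in Python. the thresholds are the ones recorded in these notes (30 seconds and 10 minutes); the function name and the state labels are made up for illustration.

```python
OUT_AFTER_S = 30          # from the notes: no heartbeat for 30 seconds
DEAD_AFTER_S = 10 * 60    # from the notes: no heartbeat for 10 minutes


def datanode_state(seconds_since_heartbeat):
    """Classify a data node by how long ago its last heartbeat arrived."""
    if seconds_since_heartbeat >= DEAD_AFTER_S:
        return "dead: re-replicate its blocks"
    if seconds_since_heartbeat >= OUT_AFTER_S:
        return "out"
    return "alive"


states = [datanode_state(s) for s in (3, 45, 900)]
```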

every hour (or after a restart of the name node) all data nodes send a block report, which is a list of all the blocks that they have.

Hadoop uses checksums to ensure that data is transferred correctly.
every 3 weeks Hadoop runs a general checksum check on all blocks.


How writing happens in Hadoop

here is an example

the client divides the file into 4 blocks,
it asks the name node where to write the first block,
the name node gives back a pipeline, which is: write to datanode A, then C, then F
the client writes to A, then A writes to C, then C writes to F
F acks C, C acks A, A acks the client, then the client acks the name node and requests a pipeline for the next block.

how do we handle a failed node?

let's say DN_A is bad: the client will try with C, and if that fails, with F.
as long as the client is able to write to one node, it can move on to the next block
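
a toy simulation of the write pipeline and of the failed-node rule, in Python. the function name and the return shape are invented for illustration; real HDFS streams packets between data nodes, which this sketch does not model.

```python
def write_block(block, pipeline, failed=frozenset()):
    """Push one block down the pipeline, skipping failed data nodes.

    Returns (stored, acks): which nodes hold the block, and the order in
    which the acks travel back (last node first, ending at the first node,
    which acks the client).
    """
    healthy = [dn for dn in pipeline if dn not in failed]
    if not healthy:
        raise OSError("no data node in the pipeline accepted the block")
    stored = {dn: block for dn in healthy}
    acks = list(reversed(healthy))
    return stored, acks


stored, acks = write_block(b"block-1", ["A", "C", "F"])
stored_without_a, acks_without_a = write_block(b"block-1", ["A", "C", "F"], failed={"A"})
```

in the second call the block still lands on C and F, so the client can go on to the next block.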

general information:
1- a checksum is used for each block
2- the file is only as long as the number of blocks written so far. let's say your file is 4 blocks and you have written only 2 of them: at this point HDFS sees your file as 2 blocks.
because of that it is better to have 2 folders. INCOMING: keep the file here while it is being uploaded. once you finish uploading the whole file, move it to the READY_TO_PROCESS folder.
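
the two-folder pattern can be sketched on a local filesystem with Python's standard library. this only illustrates the idea (write in one folder, then move the finished file); the `publish` helper is hypothetical and it does not talk to HDFS.

```python
import os
import tempfile


def publish(root, name, chunks):
    """Write a file under INCOMING, then move it to READY_TO_PROCESS when complete."""
    incoming = os.path.join(root, "INCOMING")
    ready = os.path.join(root, "READY_TO_PROCESS")
    os.makedirs(incoming, exist_ok=True)
    os.makedirs(ready, exist_ok=True)

    partial = os.path.join(incoming, name)
    with open(partial, "wb") as handle:
        for chunk in chunks:        # readers of READY_TO_PROCESS never see this stage
            handle.write(chunk)

    final = os.path.join(ready, name)
    os.replace(partial, final)      # the move happens only after the last chunk
    return final


with tempfile.TemporaryDirectory() as root:
    final_path = publish(root, "events.bin", [b"ab", b"cd"])
    with open(final_path, "rb") as handle:
        content = handle.read()
    left_in_incoming = os.listdir(os.path.join(root, "INCOMING"))
```

jobs that only read from READY_TO_PROCESS never pick up a half-uploaded file.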

How reading is handled

the client asks for a file, and the Name Node gives it a read pipeline for each block

Secondary Name Node

as we mentioned before, we have 2 files in the NameNode, fsimage which is a point in time file and edit log which is delta since the last fsimage

Note: we have 2 files, fsimage and edit log, because fsimage is a big file, and working with a big file on every change would slow Hadoop down. that's why we have the edit log, a small file that contains only the delta information; using the edit log means dealing with a small file ==> better performance



IF THE SECONDARY NAME NODE IS DOWN nothing will happen right away: the name node keeps writing to the edit log, the edit log becomes bigger and bigger, and the system becomes slower and slower.


new lab, we used hadoop fs -put

when you do the installation with Cloudera Manager, a trash directory is created for you by default; when you delete something it is moved to the trash directory.
if the directory is not created, it is recommended to create one.


High Availability Name Node
having the name node as a single point of failure is not acceptable,
that's why we have a new solution by Cloudera, introduced in Hadoop 2, called Name Node high availability.

the Standby NameNode takes over if the name node goes down, AND YOU DON'T HAVE TO RESTART THE WRITE OR READ OPERATIONS FROM THE BEGINNING.

NOTE IMPORTANT: the data nodes send their block reports and heartbeats to both the NN and the Standby NN, so both of them have a complete picture in memory of what is happening.

with the architecture above I can handle the failure of the NAME NODE, however the fsimage and edits log are still a single point of failure.
that is why the High Availability structure introduced a new thing called the JournalNode.

the current active name node now writes the edits log, synchronously, to a set of journal nodes, and the standby NN reads from these nodes

so that the name node and the standby name node do not get confused about their roles (maybe one thinks that it is the active name node when it is not), they attach something called an epoch number to each write to the journal nodes


we use a cluster of ZooKeeper nodes to determine which name node is the active one (the number of nodes should be odd to avoid split brain).
there is a ZKFC service on the NameNode and on the Standby node; they send information to the ZooKeeper cluster about the health of their node,
if the name node's ZKFC notices that the NameNode is down, it sends this information to ZooKeeper, and ZooKeeper sets the standby node as the active name node and the old name node as the standby one
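
a short Python sketch of the majority arithmetic behind the "odd number" advice. the function names are made up for illustration; the rule itself (a decision needs more than half of the nodes) is how majority quorums work.

```python
def quorum(nodes):
    """Smallest number of nodes that forms a strict majority."""
    return nodes // 2 + 1


def tolerated_failures(nodes):
    """How many nodes can be lost while a majority is still reachable."""
    return nodes - quorum(nodes)


table = {n: (quorum(n), tolerated_failures(n)) for n in (3, 4, 5)}
```

going from 3 to 4 nodes raises the quorum but does not let the cluster survive any extra failure, and two halves of an even-sized cluster can each fall short of a majority, so the extra even node buys nothing.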

as you can see HA is complicated: extra machines, extra configuration ...
you don't need this most of the time; a secondary name node on a different machine is usually enough.


Name Node federation: scale the name node functions by splitting the namespace across multiple machines.

Hadoop has authorization, but it doesn't have authentication. for example, let's say you are sending a write request to machine1 as user xxx. user xxx is not authorized to write but user yyy is: simply create a user yyy and send the request as yyy. Hadoop will not check that you really are yyy.

to do authentication you should use something else: Kerberos.

Hadoop uses Linux-like permissions.
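
a tiny Python sketch of why authorization without authentication is weak. the path, the user names, and the `is_authorized` helper are hypothetical; the point is only that the check trusts whatever user name the caller claims.

```python
# hypothetical write permissions: path -> users allowed to write
WRITERS = {"/data/sales": {"yyy"}}


def is_authorized(claimed_user, path):
    """Authorization only: checks the claimed name, never proves who is calling."""
    return claimed_user in WRITERS.get(path, set())


honest_request = is_authorized("xxx", "/data/sales")    # False: xxx may not write
spoofed_request = is_authorized("yyy", "/data/sales")   # True: same caller, claiming to be yyy
```

an authentication layer such as Kerberos is what would prove that the caller really is yyy before this check runs.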



these are the players of MapReduce

and here is how the job is done

we also have a new version called MapReduce v2. in this version they focused on the scalability of the job tracker and on removing the restriction on the number of map and reduce tasks that can run on each slave machine.

the Map Reduce configuration files are:
1- mapred-site.xml

in this lab he gave an example of how to run a Java MapReduce job

this is the statement to run a MapReduce job; hadoop-examples.jar contains the Map and Reduce Java classes.

he went over every line of the code, you can check it.

How MapReduce works in detail

so to summarize: the job tracker asks the name node where the blocks are, it assigns some slaves to do the map tasks, then it assigns one or more slaves to do the reduce tasks. the reduce task trackers WILL COPY THE OUTPUT OF THE MAP TASK TRACKERS TO THEIR LOCAL MACHINES.
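
the map, shuffle, and reduce steps can be sketched in plain Python with a word count. this runs in one process and only mimics the data flow; the function names and the sample blocks are invented for illustration and nothing here uses the Hadoop API.

```python
from collections import defaultdict


def map_phase(line):
    """Map: emit a (word, 1) pair for every word in one input line."""
    return [(word.lower(), 1) for word in line.split()]


def shuffle(pairs):
    """Shuffle: bring all values of the same key together, sorted by key."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return dict(sorted(groups.items()))


def reduce_phase(key, values):
    """Reduce: collapse the values of one key into a single result."""
    return key, sum(values)


def run_job(blocks):
    """Each block stands for the input of one map task."""
    mapped = [pair for block in blocks for line in block for pair in map_phase(line)]
    return dict(reduce_phase(key, values) for key, values in shuffle(mapped).items())


blocks = [["us uk", "us"], ["uk de"]]
counts = run_job(blocks)
```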


Hadoop is Rack Aware


Advanced MapReduce: Partitioners, Combiners, Comparators, and more

firstly we should know that the mappers and reducers both do some sorting

the mapper sorts its output by key, and the reducer, after the shuffle, also sorts by key.

you can define a Comparator to do secondary sorting, which sorts the values in the reducer; so in the example above we have us:[55,20], and the secondary sort turns it into us:[20,55].
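
a simplified Python sketch of the effect of secondary sorting. it only shows the result (values arriving at the reducer in sorted order); the helper name and sample pairs are made up, and real Hadoop jobs get this with composite keys and comparators, which this sketch does not reproduce.

```python
def group_for_reducer(pairs, value_order=None):
    """Group shuffled (key, value) pairs by key, optionally ordering the values."""
    groups = {}
    for key, value in sorted(pairs, key=lambda kv: kv[0]):   # keys are always sorted
        groups.setdefault(key, []).append(value)
    if value_order is not None:                               # the "secondary" sort
        for values in groups.values():
            values.sort(key=value_order)
    return groups


shuffled = [("us", 55), ("uk", 10), ("us", 20)]
plain = group_for_reducer(shuffled)
with_secondary_sort = group_for_reducer(shuffled, value_order=lambda v: v)
```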

we can also define what we call a combiner, which is a pre-reducer. the combiner runs in the map phase; as you can see in the example above, the first mapper adds up the US values and outputs 55, and this is the combiner's job.
with a combiner you may reduce the processing time and the amount of intermediate data.
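
a minimal Python sketch of what a combiner does to one mapper's output. the sample records are invented so that the US values add up to 55 as in the notes; the `combine` function is illustrative, not the Hadoop Combiner API.

```python
def combine(pairs):
    """Pre-reduce one mapper's output: sum the values of each key locally."""
    totals = {}
    for key, value in pairs:
        totals[key] = totals.get(key, 0) + value
    return sorted(totals.items())


mapper_output = [("us", 30), ("us", 25), ("uk", 10)]   # 3 pairs to send over the network
combined = combine(mapper_output)                       # 2 pairs after the combiner
```

fewer pairs leave the mapper, so there is less intermediate data to store and to copy to the reducers. this only works for operations like sums, where combining early does not change the final result.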

we also have something called a partitioner

the mapper can split its output into multiple partitions, and later each reducer fetches the partition that it is interested in,
in the example above we partitioned by key, and as you can see each reducer grabs a specific key.

There is a full example about writing a Partitioner.
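
a short Python sketch of partitioning by key. it uses a CRC32 hash modulo the number of reducers, which is an assumption chosen here so the result is stable between runs; it is not the partitioner from the course example.

```python
import zlib


def partition(key, num_reducers):
    """Pick the reducer for a key: stable hash of the key, modulo the reducer count."""
    return zlib.crc32(key.encode("utf-8")) % num_reducers


def partition_map_output(pairs, num_reducers):
    """Split one mapper's output into one list per reducer."""
    partitions = [[] for _ in range(num_reducers)]
    for key, value in pairs:
        partitions[partition(key, num_reducers)].append((key, value))
    return partitions


map_output = [("us", 55), ("uk", 10), ("us", 20), ("de", 7)]
partitions = partition_map_output(map_output, num_reducers=2)
```

every pair with the same key ends up in the same partition, so a single reducer sees all the values for that key.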


for unit testing you have MRUnit, which is a new Apache project.

he gave a practical example about logging as well

when we do benchmarking we talk about the terasort number. this number gives us an indicator of the performance of the cluster, and whether adding new machines gave us a gain in performance.

TERASORT is one of the simplest MapReduce jobs Hadoop can run. to do a TERASORT test you use 3 scripts
1- teragen: to generate a dataset
2- terasort: it is a job that sorts the dataset.
3- teravalidate: it is used to validate if the dataset got sorted.

Hive vs Pig Vs Impala

we know Hive and Pig; they simply convert your requests into MapReduce jobs.
they are in general 10-15% slower than native Java MapReduce.

as Hive and Pig convert the requests to MapReduce, they use the job tracker and task trackers

Impala is developed by Cloudera. it is designed for real time queries and uses its own daemons, not the task trackers and job tracker. IMPALA DOESN'T USE MAPREDUCE AT ALL.
Impala is not fault tolerant. basically MapReduce is slow because of the time needed to start the JVMs for the map and reduce tasks, and Impala avoids that by using its own daemons.
Impala sits on top of Hive, so it uses Hive (actually its query language is a subset of HiveQL)



in Hive, you can do the installation on each client and start querying.

or, you can have a Hive server:

we always need a metastore, where we store the mapping between Hive tables and HDFS data.

NOTE: in HiveQL there is no update or delete, as Hive runs on top of Hadoop and, as we mentioned before, you cannot delete or update a record in an HDFS file.

Check the HIVE & PIG LAB.



Data Import and Export

we have 2 types of import and export:
1- Real Time Ingestion and Analysis:
products like Flume, Storm, Kafka, and Kinesis
the idea of these products is that you have multiple agents that push and pull data from each other.

these systems don't care if the end system is Hadoop, a NoSQL database, or a flat file

the products are similar, however Storm, Kafka and Kinesis have more analysis functionality than Flume

2- Database Import/Export:
Sqoop (SQL to Hadoop)
it is simply a single process that imports/exports data to/from Hadoop.

there is no analysis or filtering, just import and export.

you can do something like: at 2:00 pull all the data from Hadoop and put it in table xxx.


Flume is used to move massive amounts of data from system A to system B (which is usually HDFS, MongoDB, or another NoSQL store ...)

he talked about the architecture of Flume and there is a lab.


some REST call examples


he gave a lab about sqoop

Oozie is used to build workflows; a workflow is represented in XML format
