Dancing with Elephants and Flying with The Bees–Using ORC File Format with Apache Hive


When you start with Hive on Hadoop, the clear majority of samples and tutorials will have you working with text files. However, the Hive community recognized some time ago that text files have clear disadvantages as a file format in terms of storage efficiency and performance.

The first move to better columnar storage was the introduction of the Hive RCFile format. RCFile (Record Columnar File) is a data placement structure designed for MapReduce-based data warehouse systems. Hive added the RCFile format in version 0.6.0. RCFile stores table data in a flat file consisting of binary key/value pairs. It first partitions rows horizontally into row splits, and then it vertically partitions each row split in a columnar way. RCFile stores the metadata of a row split as the key part of a record, and all the data of a row split as the value part. Internals of the RCFile format can be found in the JavaDoc here – http://hive.apache.org/javadocs/r1.0.1/api/org/apache/hadoop/hive/ql/io/RCFile.html. What is important to note is why it was introduced – its key advantages:

  • As a row store, RCFile guarantees that data in the same row are located on the same node.
  • As a column store, RCFile can exploit column-wise data compression and skip unnecessary column reads.

As time passed, the explosion of data and the need for faster HiveQL queries pushed the need for further optimized columnar storage file formats. Therefore, the ORC file format was introduced. The Optimized Row Columnar (ORC) file format provides a highly efficient way to store Hive data. It was designed to overcome limitations of the other Hive file formats. Using ORC files improves performance when Hive is reading, writing, and processing data.

ORC has the following advantages over the RCFile format:

  • a single file as the output of each task, which reduces the NameNode’s load
  • light-weight indexes stored within the file, allowing readers to skip row groups that don’t pass predicate filtering and to seek to a given row
  • block-mode compression based on data type

An ORC file contains groups of row data called stripes, along with auxiliary information in a file footer. At the end of the file a postscript holds compression parameters and the size of the compressed footer.

The default stripe size is 250 MB. Large stripe sizes enable large, efficient reads from HDFS.

The file footer contains a list of stripes in the file, the number of rows per stripe, and each column’s data type. It also contains column-level aggregates count, min, max, and sum.

[Figure: ORC file structure – stripes, file footer, and postscript]

What does it all mean for me?

For us as implementers, it means the following:

  • Better read performance due to compression. Streams are compressed using a codec, which is specified as a table property for all streams in that table. To optimize memory use, compression is done incrementally as each block is produced. Compressed blocks can be jumped over without first having to be decompressed for scanning. Positions in the stream are represented by a block start location and an offset into the block.
  • Introduction of column-level statistics for optimization, a feature that has long existed in pretty much all commercial RDBMS packages (Oracle, SQL Server, etc.). The goal of the column statistics is that for each column, the writer records the count and, depending on the type, other useful fields. For most of the primitive types, it records the minimum and maximum values; for numeric types it additionally stores the sum. From Hive 1.1.0 onwards, the column statistics will also record whether there are any null values within the row group by setting the hasNull flag. (See the statistics example after this list.)
  • Light-weight indexing
  • Larger blocks, 256 MB by default
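The ORC writer records its file-level statistics automatically as it writes. Separately, you can gather the metastore-level column statistics the optimizer uses; here is a quick, hedged sketch (the table name my_orc_table is hypothetical):

-- Gather table- and column-level statistics
ANALYZE TABLE my_orc_table COMPUTE STATISTICS;
ANALYZE TABLE my_orc_table COMPUTE STATISTICS FOR COLUMNS;
-- Inspect the gathered statistics for a single column (Hive 0.14+ syntax)
DESCRIBE FORMATTED my_orc_table column1;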

Here is a good Hive file format comparison from Hortonworks:

[Figure: Hive file format comparison (Hortonworks)]

Using ORC – create table with ORC format:

The simplest way to create an ORC-formatted Hive table is to add STORED AS ORC to the Hive CREATE TABLE statement, like:

CREATE TABLE my_table  (
column1 STRING,
column2 STRING,
column3 INT,
column4 INT
) STORED AS ORC;
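Since ORC is a binary format, raw text files are normally not loaded into an ORC table directly; a common pattern is to stage the data in a text-format table and copy it across with INSERT … SELECT. A minimal sketch, assuming a staging table my_table_text with the same columns (the name is hypothetical):

-- Rewrite the staged text data into the ORC table
INSERT OVERWRITE TABLE my_table
SELECT column1, column2, column3, column4
FROM my_table_text;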

ORC File Format can be used together with Hive Partitioning, which I explained in my previous post.  Here is an example of using Hive partitioning with ORC File Format:

CREATE TABLE airanalytics (
    flightdate date, dayofweek int, depttime int, crsdepttime int, arrtime int, crsarrtime int,
    uniquecarrier varchar(10), flightno int, tailnum int, aet int, cet int, airtime int,
    arrdelay int, depdelay int, origin varchar(5), dest varchar(5), distance int,
    taxin int, taxout int, cancelled int, cancelcode int, diverted string, carrdelay string,
    weatherdelay string, securtydelay string, cadelay string, lateaircraft string)
PARTITIONED BY (flight_year string)
CLUSTERED BY (uniquecarrier)
SORTED BY (flightdate)
INTO 24 BUCKETS
STORED AS orc TBLPROPERTIES ("orc.compress"="NONE", "orc.stripe.size"="67108864", "orc.row.index.stride"="25000");
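To verify that the table-level ORC properties took effect, you can inspect the table metadata:

SHOW TBLPROPERTIES airanalytics;
DESCRIBE FORMATTED airanalytics;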

The parameters added at the table level are, as per the docs:

Key                       Default        Notes
orc.compress              ZLIB           high level compression (one of NONE, ZLIB, SNAPPY)
orc.compress.size         262,144        number of bytes in each compression chunk
orc.stripe.size           268,435,456    number of bytes in each stripe
orc.row.index.stride      10,000         number of rows between index entries (must be >= 1000)
orc.create.index          true           whether to create row indexes
orc.bloom.filter.columns  ""             comma separated list of column names for which bloom filter should be created
orc.bloom.filter.fpp      0.05           false positive probability for bloom filter (must be >0.0 and <1.0)

If you have an existing Hive table, it can be moved to ORC via:

    • ALTER TABLE … [PARTITION partition_spec] SET FILEFORMAT ORC (see the sketch below)
    • SET hive.default.fileformat=Orc
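Note that ALTER TABLE … SET FILEFORMAT only changes the table metadata – existing files are not rewritten, so data already in the table has to be reloaded in ORC format. A minimal, hedged sketch (the table names my_table and my_table_orc are hypothetical):

-- Option 1: rewrite the data into a new ORC table
CREATE TABLE my_table_orc STORED AS ORC AS SELECT * FROM my_table;

-- Option 2: switch the format so that newly written data uses ORC
ALTER TABLE my_table SET FILEFORMAT ORC;

-- Make ORC the session default for new tables
SET hive.default.fileformat=Orc;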

For more information see – https://en.wikipedia.org/wiki/RCFile, https://cwiki.apache.org/confluence/display/Hive/LanguageManual+ORC, https://cwiki.apache.org/confluence/display/Hive/FileFormats, https://orc.apache.org/docs/hive-config.html

Hope this helps.


Dancing With Elephants and Flying With The Bees–Apache Hive Scaling Out with Partitions and Buckets


In my previous post some time ago I introduced Apache Hive technology on Hadoop. Coming from SQL and RDBMS this was bound to be my favorite Hadoop technology.  Apache Hive is an open-source data warehouse system for querying and analyzing large datasets stored in HDFS files.

Today, unlike the previous basics post, I will concentrate on Hive partitions and buckets. A simple query in Hive reads the entire dataset even if we have a WHERE clause filter. This becomes a bottleneck for running MapReduce jobs over a large table. We can overcome this issue by implementing partitions in Hive. Hive makes it very easy to implement partitions by using the automatic partition scheme when the table is created.

image

Partitions. Just like in an RDBMS, data within a Hive table is split across multiple partitions. Each partition corresponds to a particular value (or values) of the partition column(s) and is stored as a sub-directory within the table’s directory on HDFS. When the table is queried, where applicable, only the required partitions of the table are read, thereby reducing the I/O and time required by the query. An example of creating a partitioned table in Hive is below; note you can do this with both internal and external tables:

CREATE TABLE CUSTOMER (
    userid             BIGINT,
    First_Name         STRING,
    Last_Name          STRING,
    address1           STRING,
    address2           STRING,
    city               STRING,
    zip_code           STRING,
    state              STRING
)
PARTITIONED BY (
    COUNTRY            STRING
);

As you can see, I am using COUNTRY as the partition column. The partition statement lets Hive alter the way it manages the underlying structures of the table’s data directory. If you browse the location of the data directory for a non-partitioned table, it will look like <database name>.db/<table name>, and all the data files are written directly to this directory. In the case of partitioned tables, subdirectories are created under the table’s data directory for each unique value of a partition column. If the table is partitioned on multiple columns, then Hive creates nested subdirectories based on the order of the partition columns in the table definition. Example with the table above:

/gennadyk.db/customer/country=US

/gennadyk.db/customer/country=UK

When a partitioned table is queried with one or both partition columns in criteria or in the WHERE clause, what Hive effectively does is partition elimination by scanning only those data directories that are needed. If no partitioned columns are used, then all the directories are scanned (full table scan) and partitioning will not have any effect.

With a few quick changes it’s easy to configure Hive to support dynamic partition creation. Just as SQL Server has a SET command to change database options, Hive lets us change settings for a session using the SET command. Changing these settings permanently would require editing the configuration file (hive-site.xml) and restarting the Hive cluster:

SET hive.exec.dynamic.partition = true;
SET hive.exec.dynamic.partition.mode = nonstrict;
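With those two settings in place, Hive can create partitions for you during the load. A hedged sketch, assuming a non-partitioned staging table customer_staging whose last selected column supplies the partition value (all names are hypothetical):

-- The dynamic partition column (COUNTRY) must come last in the SELECT list
INSERT OVERWRITE TABLE CUSTOMER PARTITION (COUNTRY)
SELECT userid, First_Name, Last_Name, address1, address2, city, zip_code, state, country
FROM customer_staging;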

Important notes on best practices

  • Be careful using dynamic partitions. Hive has some built-in limits on the number of partitions that can be dynamically created as well as limits on the total number of files that can exist within Hive. Creating many partitions at once will create a lot of files and creating a lot of files will use up memory in Hadoop Name Node
  • If your partitions are relatively small then the expense of recursing directories becomes more expensive than simply scanning the data. Likewise, partitions should be roughly similar in size to prevent a single long running thread from holding things up

Buckets. Much like partitioning, bucketing is a technique that allows you to cluster or segment large sets of data to optimize query performance. As you create a Hive table you can use the CLUSTERED BY keyword to define buckets.

CREATE TABLE CUSTOMER (
    userid             BIGINT,
    First_Name         STRING,
    Last_Name          STRING,
    address1           STRING,
    address2           STRING,
    city               STRING,
    zip_code           STRING,
    state              STRING
)
PARTITIONED BY (
    COUNTRY            STRING
)
CLUSTERED BY (userid) INTO 10 BUCKETS;

In general, the bucket number is determined by the expression hash_function(bucketing_column) mod num_buckets. The hash_function depends on the type of the bucketing column. For an int, as in our example, it’s easy: hash_int(i) == i. In our example we can expect all userids that end in 0 to be in bucket 1, all userids that end in 1 to be in bucket 2, and so on. For other data types it unfortunately gets a lot trickier; the hash of a string or a complex data type is some number derived from the value, but not anything humanly recognizable. In general, distributing rows based on the hash will give you an even distribution across the buckets, and that is the goal here. Bucketing is entirely dependent on the data being loaded correctly, so loading the table properly is critical when bucketing, as shown in the sketch below.
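A hedged sketch of loading the bucketed CUSTOMER table so that rows actually land in their buckets, again assuming a hypothetical staging table customer_staging (hive.enforce.bucketing matters on pre-2.x Hive, where bucketing is not enforced by default; the dynamic partition settings from the previous section are assumed to still be in effect):

SET hive.enforce.bucketing = true;   -- one reducer per bucket on older Hive versions
INSERT OVERWRITE TABLE CUSTOMER PARTITION (COUNTRY)
SELECT userid, First_Name, Last_Name, address1, address2, city, zip_code, state, country
FROM customer_staging;

-- Bucketing then allows efficient sampling, e.g. read only bucket 1 of 10:
SELECT * FROM CUSTOMER TABLESAMPLE(BUCKET 1 OUT OF 10 ON userid);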

For more see – http://www.bidn.com/blogs/cprice1979/ssas/4646/partitions-amp-buckets-in-hive, https://www.qubole.com/blog/big-data/5-tips-for-efficient-hive-queries/, http://www.brentozar.com/archive/2013/03/introduction-to-hive-partitioning/, http://blog.cloudera.com/blog/2014/08/improving-query-performance-using-partitioning-in-apache-hive/, http://archive.cloudera.com/cdh/3/hive/language_manual/working_with_bucketed_tables.html

Dancing With Elephants and Flying With The Bees–Basic Introduction To Apache Hive


Continuing the Big Data\Hadoop blogging journey that I started with the HDFS post and the YARN post, I will move to my favorite Hadoop technology – Apache Hive. At its simplest definition, Apache Hive is an open-source data warehouse system for querying and analyzing large datasets stored in HDFS files.

Hive has three main functions: data summarization, query and analysis.  It supports queries expressed in a language called HiveQL, which automatically translates SQL-like queries into MapReduce jobs executed on Hadoop. In addition, HiveQL supports custom MapReduce scripts to be plugged into queries. Hive also enables data serialization/deserialization and increases flexibility in schema design by including a system catalog called Hive-Metastore.

image

According to the Apache Hive wiki, “Hive is not designed for OLTP workloads and does not offer real-time queries or row-level updates. It is best used for batch jobs over large sets of append-only data (like web logs).” Hive supports text files (also called flat files), SequenceFiles (flat files consisting of binary key/value pairs) and RCFiles (Record Columnar Files which store columns of a table in a columnar database way.)

Hive isn’t a relational database, as it only maintains metadata information about Big Data stored on HDFS; however, it allows you to treat your Big Data as tables and perform SQL-like operations using HiveQL.

image

Hive Components.  Components of Hive include HCatalog and WebHCat:

  • HCatalog is a component of Hive. It is a table and storage management layer for Hadoop that enables users with different data processing tools — including Pig and MapReduce — to more easily read and write data on the grid.
  • WebHCat provides a service that you can use to run Hadoop MapReduce (or YARN), Pig, Hive jobs or perform Hive metadata operations using an HTTP (REST style) interface.

Hive Data Model.

Data in Hive is organized into:

  • Tables – These are analogous to Tables in Relational Databases. Tables can be filtered, projected, joined and unioned. Additionally all the data of a table is stored in a directory in HDFS. Hive also supports the notion of external tables wherein a table can be created on preexisting files or directories in HDFS by providing the appropriate location to the table creation DDL. The rows in a table are organized into typed columns similar to Relational Databases.
  • Partitions – Each table can have one or more partition keys which determine how the data is stored. For example, a table T with a date partition column ds has files with data for a particular date stored in the <table location>/ds=<date> directory in HDFS. Partitions allow the system to prune the data to be inspected based on query predicates; for example, a query that is interested in rows from T that satisfy the predicate T.ds = ‘2008-09-01’ would only have to look at files in the <table location>/ds=2008-09-01/ directory in HDFS (see the sketch after this list).
  • Buckets – Data in each partition may in turn be divided into Buckets based on the hash of a column in the table. Each bucket is stored as a file in the partition directory. Bucketing allows the system to efficiently evaluate queries that depend on a sample of data (these are queries that use the SAMPLE clause on the table).
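As a hedged illustration of both ideas (the table T, the partition column ds, and the bucketing column userid are hypothetical):

-- Partition pruning: only the <table location>/ds=2008-09-01/ directory is scanned
SELECT * FROM T WHERE T.ds = '2008-09-01';

-- Bucket sampling: read a single bucket instead of the whole table
SELECT * FROM T TABLESAMPLE(BUCKET 3 OUT OF 32 ON userid);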

Hive Metastore.  The Metastore provides two important but often overlooked features of a data warehouse: data abstraction and data discovery. Without the data abstractions provided in Hive, a user has to provide information about data formats, extractors and loaders along with the query. In Hive, this information is given during table creation and reused every time the table is referenced. This is very similar to the traditional warehousing systems. The second functionality, data discovery, enables users to discover and explore relevant and specific data in the warehouse.

HiveQL Compiler and Optimizer.  Just like in an RDBMS, HiveQL needs to be compiled and a logical query plan has to be generated. Moreover, the plan is further optimized by the optimizer. Just as with an RDBMS, the Hive optimizer is moving towards cost-based optimization based on statistics (SQL Server or Oracle, anyone?) – https://cwiki.apache.org/confluence/display/Hive/Cost-based+optimization+in+Hive
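A hedged sketch of switching the cost-based optimizer on and feeding it statistics (these properties exist from roughly Hive 0.14 onwards; the table name batting is the one used later in this post):

SET hive.cbo.enable = true;
SET hive.compute.query.using.stats = true;
SET hive.stats.fetch.column.stats = true;
SET hive.stats.fetch.partition.stats = true;

ANALYZE TABLE batting COMPUTE STATISTICS;
ANALYZE TABLE batting COMPUTE STATISTICS FOR COLUMNS;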

Enough theory – let’s do Hive. Instead of installing Apache Hadoop and Hive for learning’s sake, I would recommend that you download one of the learning VMs provided by both Cloudera and Hortonworks. In my demo I will use the Hortonworks Sandbox; the latest copy of that VM can be downloaded from here – http://hortonworks.com/products/hortonworks-sandbox/#overview. If you prefer Cloudera you can get that from – http://www.cloudera.com/content/cloudera/en/downloads/quickstart_vms/cdh-5-4-x.html

So I will launch Hortonworks sandbox on my Virtual Box:

[Screenshot: Hortonworks Sandbox running in VirtualBox]

Although I could easily launch Hive from the command line, for the purpose of this tutorial I will go easy and use the very convenient Sandbox Web Console available at 127.0.0.1:8000. The console gives me a number of nice visual tools, such as a File Browser to easily move files and visibility into Hue, the Pig and Hive editors, Oozie scheduling tooling, etc.

[Screenshot: Sandbox Web Console]

I will use the dataset on baseball statistics available from Sean Lahman – http://www.seanlahman.com/. The dataset can be downloaded from – http://seanlahman.com/files/database/lahman591-csv.zip. Once we have the file, we just unzip it into some temp directory. I am interested in two files in that directory – Master.csv and Batting.csv. I can use the File Explorer utility to upload these files to Hue\hdfs. The File Explorer icon on the toolbar is:

[Screenshot: File Explorer icon on the toolbar]

Here is the utility:

[Screenshot: File Explorer utility]

Once I upload the files I can go to Beeswax – the Hive Editor. To do so I will click on another icon on the toolbar (the one with the bee on it):

[Screenshot: Beeswax icon on the toolbar]

Beeswax is a pretty nice editor and allows me to type in and edit my HiveQL statements, as well as get explain\execution plans for query tuning purposes (more on that in next posts).

[Screenshot: Beeswax Hive editor]

Creating Tables.  So the first thing I will do is create a new table in the default database provided to me. The Hive concept of a database is essentially just a catalog or namespace of tables. However, databases are very useful for larger clusters with multiple teams and users, as a way of avoiding table name collisions. It’s also common to use databases to organize production tables into logical groups. If you don’t specify a database, the default database is used.

The CREATE TABLE statement follows SQL conventions, but Hive’s version offers significant extensions to support a wide range of flexibility in where the data files for tables are stored, the formats used, etc. That’s what I am doing now:

create table temp_batting (col_value STRING);

The table we have created so far is called a managed table, or sometimes an internal table, because Hive controls the lifecycle of its data. Hive stores the data for these tables in a subdirectory under the directory defined by hive.metastore.warehouse.dir (e.g., /user/hive/warehouse) by default. There is also the ability to define an external table, i.e. a table whose data is stored outside of the Hive warehouse directory (a quick sketch follows below).
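A minimal, hedged sketch of an external table over files already sitting in HDFS (the path is hypothetical); dropping the table later would remove only the metadata and leave the files in place:

CREATE EXTERNAL TABLE temp_batting_ext (col_value STRING)
LOCATION '/user/hue/batting_external';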

We can get all the information we need about a table through a query too. Go back to the query editor and execute:

describe temp_batting;

The result describes the schema of the table and detailed table information.  Similar to RDBMS isn’t it?

Loading data.  Next I will load data into my new table from the file I uploaded. This is pretty much self-explanatory:

LOAD DATA INPATH '/user/hue/Batting.csv' OVERWRITE INTO TABLE temp_batting;

Since Hive has no row-level insert, update, and delete operations, the only way to put data into a table is to use one of the “bulk” load operations. If the LOCAL keyword is used, the path is assumed to be in the local file system and the data is copied into the final location. If LOCAL is omitted, the path is assumed to be in the distributed file system; in this case, the data is moved from the path to the final location.
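For comparison, a hedged sketch of the LOCAL variant (the path is hypothetical) – note that LOCAL copies the file from the local file system, while the HDFS form above moves it:

LOAD DATA LOCAL INPATH '/tmp/Batting.csv' OVERWRITE INTO TABLE temp_batting;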

Create another table. So I have a table loaded with batting data, but I need a results table here; after all, I want to get information on the players with the most runs yearly.

create table batting (player_id STRING, year INT, runs INT);

Finally query:

insert overwrite table batting  
SELECT  
  regexp_extract(col_value, '^(?:([^,]*)\,?){1}', 1) player_id,  
  regexp_extract(col_value, '^(?:([^,]*)\,?){2}', 1) year,  
  regexp_extract(col_value, '^(?:([^,]*)\,?){9}', 1) run  
from temp_batting;
SELECT a.year, a.player_id, a.runs from batting a  
JOIN (SELECT year, max(runs) runs FROM batting GROUP BY year ) b  
ON (a.year = b.year AND a.runs = b.runs) ;

Result

Some finer points:

  • We are using INSERT and SELECTs here, as well as JOIN
  • SELECT is the projection operator in SQL, WHERE gives the condition of what to filter, GROUP BY gives a list of columns which specify how to aggregate the records
  • INSERT OVERWRITE will overwrite any existing data in the table, whereas INSERT INTO will append data
  • I use online Hive Syntax Checker here – https://sql.treasuredata.com/, to check my syntax

So this is a very basic post and I will go from there into a lot more on Hive. For more information also see – https://github.com/Prokopp/the-free-hive-book, https://altiscale.zendesk.com/hc/en-us/articles/201802346-Simple-Hive-Query-Example, http://hortonworks.com/hadoop-tutorial/hello-world-an-introduction-to-hadoop-hcatalog-hive-and-pig/, http://www.tutorialspoint.com/hive/, tutorial on Amazon – http://docs.aws.amazon.com/gettingstarted/latest/emr/getting-started-emr-load-data.html

Dancing With Elephants -Hadoop 2: Introduction To Yarn

In my previous blog entry I profiled Hadoop HDFS; now I will go one step higher in the Hadoop architecture and introduce YARN. YARN was added for efficient resource management and scheduling relatively recently, in Hadoop 2.0; previously, MapReduce was the layer used in that role exclusively.

image

As you can see above, this is a relatively large change, with YARN now taking its place as the “data OS” on top of HDFS and MapReduce becoming just another framework on top of YARN, like many others.

image

YARN stands for “Yet-Another-Resource-Negotiator”. It is a new framework that facilitates writing arbitrary distributed processing frameworks and applications.

YARN provides the daemons and APIs necessary to develop generic distributed applications of any kind, handles and schedules resource requests (such as memory and CPU) from such applications, and supervises their execution.

YARN’s execution model is more generic than the earlier MapReduce implementation. YARN can run applications that do not follow the MapReduce model, unlike the original Apache Hadoop MapReduce.

YARN pretty radically changes MapReduce internals. MapReduce 1.0 had the following workflow:

image

Note there is a single point of failure here – the JobTracker. The JobTracker is the “master” component for the TaskTrackers. Clients submit MapReduce jobs to the JobTracker, which distributes tasks to the TaskTrackers. TaskTrackers run on DataNodes and perform the actual MapReduce work.

With the advent of YARN, there is no longer a single JobTracker to run jobs and a TaskTracker to run tasks of the jobs. The old MapReduce 1.0 framework was rewritten to run within a submitted application on top of YARN. This application was christened MR2, or MapReduce version 2. It is the familiar MapReduce execution underneath, except that each job now controls its own destiny via its own ApplicationMaster taking care of execution flow (such as scheduling tasks, handling speculative execution and failures, etc.). It is a more isolated and scalable model than the MR1\Map Reduce 1.0 system where a singular JobTracker does all the resource management, scheduling and task monitoring work.

[Figure: YARN architecture]

The ResourceManager has two main components: Scheduler and ApplicationsManager:

  • The Scheduler is responsible for allocating resources to the various running applications subject to familiar constraints of capacities, queues, etc. The Scheduler is a pure scheduler in the sense that it performs no monitoring or tracking of status for the application. Also, it offers no guarantees about restarting failed tasks, whether due to application failure or hardware failures. The Scheduler performs its scheduling function based on the resource requirements of the applications; it does so based on the abstract notion of a resource Container, which incorporates elements such as memory, CPU, disk, network, etc. In the first version, only memory is supported.
  • The ApplicationsManager is responsible for accepting job-submissions, negotiating the first container for executing the application specific ApplicationMaster and provides the service for restarting the ApplicationMaster container on failure.

With the advent of YARN, you are no longer constrained by the simpler MapReduce paradigm of development, but can instead create more complex distributed applications. In fact, you can think of the MapReduce model as simply one more application in the set of possible applications that the YARN architecture can run, in effect exposing more of the underlying framework for customized development.

MRV2 maintains API compatibility with previous stable release (hadoop-1.x). This means that all Map-Reduce jobs should still run unchanged on top of MRv2 with just a recompile.

So how do I configure my YARN cluster settings\size my cluster correctly?

YARN configuration options are stored in the /opt/mapr/hadoop/hadoop-2.x.x/etc/hadoop/yarn-site.xml file and are editable by the root user. This file contains configuration information that overrides the default values for YARN parameters; the default values themselves are stored in the yarn-default.xml file.

Common parameters for yarn-site.xml can be found here – http://doc.mapr.com/display/MapR/yarn-site.xml

There is a nice reference from Hortonworks here that talks about a tool that gives some best practice suggestions for memory settings and also goes over how to manually set these values, good article from Cloudera as well – http://www.cloudera.com/content/cloudera/en/documentation/cdh4/v4-2-2/CDH4-Installation-Guide/cdh4ig_topic_11_4.html

For more on YARN see – http://hortonworks.com/blog/apache-hadoop-yarn-hdp-2-2-substantial-step-forward-enterprise-hadoop/, http://blog.sequenceiq.com/blog/2014/07/22/schedulers-part-1/, http://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.0.6.0/bk_installing_manually_book/content/rpm_chap3.html, http://blogs.msdn.com/b/bigdatasupport/archive/2014/11/11/some-commonly-used-yarn-memory-settings.aspx , http://www.informit.com/articles/article.aspx?p=2190194&seqNum=2, http://arturmkrtchyan.com/how-to-setup-multi-node-hadoop-2-yarn-cluster

Dancing With Elephants – Hadoop: Introduction to Basics of HDFS

It’s been some time now that I have been fascinated by Hadoop and its related technologies. Coming from a SQL Server and RDBMS background, the topic of analytics, or what is now called “Big Data”, is near and dear to my heart.

Almost everywhere you go online now, Hadoop is there in some capacity. Facebook, eBay, Etsy, Yelp , Twitter, Salesforce.com — you name a popular web site or service, and the chances are it’s using Hadoop to analyze the mountains of data it’s generating about user behavior and even its own operations. Even in the physical world, forward-thinking companies in fields ranging from entertainment to energy management to satellite imagery are using Hadoop to analyze the unique types of data they’re collecting and generating.

When the seeds of Hadoop were first planted in 2002, the world just wanted a better open-source search engine. So then-Internet Archive search director Doug Cutting and University of Washington graduate student Mike Cafarella set out to build it. They called their project Nutch and it was designed with that era’s web in mind.

Google released the Google File System paper in October 2003 and the MapReduce paper in December 2004. The latter would prove especially revelatory to the two engineers building Nutch.

“What they spent a lot of time doing was generalizing this into a framework that automated all these steps that we were doing manually,” Cutting explained. In 2006, Cutting went to work with Yahoo, which was equally impressed by the Google File System and MapReduce papers and wanted to build open-source technologies based on them. They spun out the storage and processing parts of Nutch to form Hadoop (named after Cutting’s son’s stuffed elephant) as an open-source Apache Software Foundation project, and the Nutch web crawler remained its own separate project.

So from the Google File System paper grew the foundation of all of Hadoop – HDFS. The Hadoop Distributed File System (HDFS) is a distributed file system designed to run on commodity hardware. It has many similarities with existing distributed file systems. However, the differences from other distributed file systems are significant. HDFS is highly fault-tolerant and is designed to be deployed on low-cost hardware. HDFS provides high throughput access to application data and is suitable for applications that have large data sets. HDFS relaxes a few POSIX requirements to enable streaming access to file system data.

image

As you can see above, HDFS is the foundation of all things Hadoop. It was created with the following assumptions in mind:

  • Accept idea that hardware fails. Hardware failure is the norm rather than the exception. An HDFS instance may consist of hundreds or thousands of server machines, each storing part of the file system’s data. The fact that there are a huge number of components and that each component has a non-trivial probability of failure means that some component of HDFS is always non-functional. Therefore, detection of faults and quick, automatic recovery from them is a core architectural goal of HDFS.
  • Streaming Data Access. Applications that run on HDFS need streaming access to their data sets. They are not general purpose applications that typically run on general purpose file systems. HDFS is designed more for batch processing rather than interactive use by users. The emphasis is on high throughput of data access rather than low latency of data access. POSIX imposes many hard requirements that are not needed for applications that are targeted for HDFS. POSIX semantics in a few key areas has been traded to increase data throughput rates.
  • Work with very large data sets. Applications that run on HDFS have large data sets. A typical file in HDFS is gigabytes to terabytes in size. Thus, HDFS is tuned to support large files. It should provide high aggregate data bandwidth and scale to hundreds of nodes in a single cluster. It should support tens of millions of files in a single instance.
  • Write Once and Read Many Access. HDFS applications need a write-once-read-many access model for files. A file once created, written, and closed need not be changed. This assumption simplifies data coherency issues and enables high throughput data access. A MapReduce application or a web crawler application fits perfectly with this model. There is a plan to support appending-writes to files in the future.
  • Move computation, not data. A computation requested by an application is much more efficient if it is executed near the data it operates on. This is especially true when the size of the data set is huge. This minimizes network congestion and increases the overall throughput of the system. The assumption is that it is often better to migrate the computation closer to where the data is located rather than moving the data to where the application is running. HDFS provides interfaces for applications to move themselves closer to where the data is located.

So what is the architecture of HDFS?

HDFS has a master/slave architecture. An HDFS cluster consists of a single NameNode, a master server that manages the file system namespace and regulates access to files by clients. In addition, there are a number of DataNodes, usually one per node in the cluster, which manage storage attached to the nodes that they run on. HDFS exposes a file system namespace and allows user data to be stored in files. Internally, a file is split into one or more blocks and these blocks are stored in a set of DataNodes. The NameNode executes file system namespace operations like opening, closing, and renaming files and directories. It also determines the mapping of blocks to DataNodes. The DataNodes are responsible for serving read and write requests from the file system’s clients. The DataNodes also perform block creation, deletion, and replication upon instruction from the NameNode.  Typical workflow can be seen below:

image

The NameNode and DataNode are pieces of software designed to run on commodity machines. These machines typically run a GNU/Linux operating system (OS). HDFS is built using the Java language; any machine that supports Java can run the NameNode or the DataNode software. Usage of the highly portable Java language means that HDFS can be deployed on a wide range of machines. A typical deployment has a dedicated machine that runs only the NameNode software. Each of the other machines in the cluster runs one instance of the DataNode software.

What’s so special with HDFS ? – Data Replication.

HDFS is designed to reliably store very large files across machines in a large cluster. It stores each file as a sequence of blocks; all blocks in a file except the last block are the same size. The blocks of a file are replicated for fault tolerance. The block size and replication factor are configurable per file. An application can specify the number of replicas of a file. The replication factor can be specified at file creation time and can be changed later. Files in HDFS are write-once and have strictly one writer at any time.

The NameNode makes all decisions regarding replication of blocks. It periodically receives a Heartbeat and a Blockreport from each of the DataNodes in the cluster. Receipt of a Heartbeat implies that the DataNode is functioning properly. A Blockreport contains a list of all blocks on a DataNode.

What’s so special with HDFS? – Heartbeat.

One important objective of HDFS is to store data reliably, even when failures occur within name nodes, data nodes, or network partitions.  Detection is the first step HDFS takes to overcome failures. HDFS uses heartbeat messages to detect connectivity between name and data nodes. Several things can cause loss of connectivity between name and data nodes. Therefore, each data node sends periodic heartbeat messages to its name node, so the latter can detect loss of connectivity if it stops receiving them. The name node marks as dead data nodes not responding to heartbeats and refrains from sending further requests to them. Data stored on a dead node is no longer available to an HDFS client from that node, which is effectively removed from the system. If the death of a node causes the replication factor of data blocks to drop below their minimum value, the name node initiates additional replication to bring the replication factor back to a normalized state.

image

What is so special with HDFS ? – Replica Placement.

The placement of replicas is critical to HDFS reliability and performance. Optimizing replica placement distinguishes HDFS from most other distributed file systems. This is a feature that needs lots of tuning and experience. The purpose of a rack-aware replica placement policy is to improve data reliability, availability, and network bandwidth utilization. The current implementation for the replica placement policy is a first effort in this direction. The short-term goals of implementing this policy are to validate it on production systems, learn more about its behavior, and build a foundation to test and research more sophisticated policies.

For the common case, when the replication factor is three, HDFS’s placement policy is to put one replica on one node in the local rack, another on a node in a different (remote) rack, and the last on a different node in the same remote rack. This policy cuts the inter-rack write traffic which generally improves write performance. The chance of rack failure is far less than that of node failure; this policy does not impact data reliability and availability guarantees. However, it does reduce the aggregate network bandwidth used when reading data since a block is placed in only two unique racks rather than three. With this policy, the replicas of a file do not evenly distribute across the racks. One third of replicas are on one node, two thirds of replicas are on one rack, and the other third are evenly distributed across the remaining racks. This policy improves write performance without compromising data reliability or read performance.

This is all great – what happens on DataNode disk failure?

Each DataNode sends a Heartbeat message to the NameNode periodically. A network partition can cause a subset of DataNodes to lose connectivity with the NameNode. The NameNode detects this condition by the absence of a Heartbeat message. The NameNode marks DataNodes without recent Heartbeats as dead and does not forward any new IO requests to them. Any data that was registered to a dead DataNode is not available to HDFS any more. DataNode death may cause the replication factor of some blocks to fall below their specified value. The NameNode constantly tracks which blocks need to be replicated and initiates replication whenever necessary. The necessity for re-replication may arise due to many reasons: a DataNode may become unavailable, a replica may become corrupted, a hard disk on a DataNode may fail, or the replication factor of a file may be increased.

What about metadata disk failure?

The FsImage and the EditLog are central data structures of HDFS. A corruption of these files can cause the HDFS instance to be non-functional. For this reason, the NameNode can be configured to support maintaining multiple copies of the FsImage and EditLog. Any update to either the FsImage or EditLog causes each of the FsImages and EditLogs to get updated synchronously. This synchronous updating of multiple copies of the FsImage and EditLog may degrade the rate of namespace transactions per second that a NameNode can support. However, this degradation is acceptable because even though HDFS applications are very data intensive in nature, they are not metadata intensive. When a NameNode restarts, it selects the latest consistent FsImage and EditLog to use.

Important to note – DataNodes can fail, but NameNode is a single point of failure in Hadoop 1.0

The NameNode machine is a single point of failure for an HDFS cluster. If the NameNode machine fails, manual intervention is necessary. The Hadoop 2.0 HDFS High Availability feature addresses this problem by providing the option of running two redundant NameNodes in the same cluster in an active/passive configuration with a hot standby. This allows a fast failover to a new NameNode in the case that a machine crashes, or a graceful administrator-initiated failover for the purpose of planned maintenance.

FSShell. HDFS allows user data to be organized in the form of files and directories. It provides a commandline interface called FS shell that lets a user interact with the data in HDFS. The syntax of this command set is similar to other shells (e.g. bash, csh) that users are already familiar with. Here are some sample action/command pairs:

[Table: sample FS shell action/command pairs]

Many of the FS shell commands are pretty familiar: mkdir to create a directory, mv to move files, rm to delete files, etc.

Full docs on FSShell commands can be found here – http://hadoop.apache.org/docs/r2.2.0/hadoop-project-dist/hadoop-common/FileSystemShell.html

HDFS also has a Java based API.  Docs on that can be found here – http://hadoop.apache.org/docs/current/api/

This is a lot of theory, but in the next few posts I hope to show more practical things using Hive – a data warehouse facility on top of HDFS and YARN that allows for querying\writing Hadoop jobs using a SQL-like language (HiveQL) – using the Hortonworks Sandbox.

More on HDFS as foundation of Hadoop – http://www.ibm.com/developerworks/library/wa-introhdfs/index.html, http://hortonworks.com/hadoop/hdfs/, http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/HdfsDesign.html, http://hortonworks.com/blog/ha-namenode-for-hdfs-with-hadoop-1-0-part-1/