Forecast Cloudy – Using Azure Blob Storage with Apache Hive on HDInsight

The beauty of working with Big Data in Azure is that you can manage (create\delete) compute resources with your HDInsight cluster independently of your data, which is stored in either Azure Data Lake or Azure blob storage. In this post I will concentrate on using Azure blob storage\WASB as the data store for HDInsight, Azure's PaaS Hadoop service.

With a typical Hadoop installation you load your data to a staging location and then import it into the Hadoop Distributed File System (HDFS) within a single Hadoop cluster. That data is manipulated, massaged, and transformed. Then you may export some or all of the data back as a result set for consumption by other systems (think Power BI, Tableau, etc.).
Windows Azure Storage Blob (WASB) is an extension built on top of the HDFS APIs. The WASBS variation uses SSL certificates for improved security. In many ways it “is” HDFS. However, WASB creates a layer of abstraction that enables separation of storage from compute. This separation is what allows your data to persist even when no clusters currently exist, and it enables multiple clusters plus other applications to access a single piece of data at the same time. This increases functionality and flexibility while reducing costs and reducing the time from question to insight.

hdinsight

In Azure you store blobs in containers within Azure storage accounts. You grant access at the storage account level, you create collections at the container level, and you place blobs (files of any format) inside the containers. This illustration from Microsoft’s documentation helps to show the structure:

blob1

Hold on, isn’t the whole selling point of Hadoop proximity of data to compute?  Yes, and just like with any other on-premises Hadoop system, data is loaded into memory on the individual nodes at compute time. With the Azure data infrastructure and a data center network backbone built for performance, your job performance is generally the same or better than if you used disks locally attached to the VMs.

Below is a diagram of the HDInsight data storage architecture:

hdi.wasb.arch

HDInsight provides access to the distributed file system that is locally attached to the compute nodes. This file system can be accessed by using the fully qualified URI, for example:

hdfs://<namenodehost>/<path>

More important is the ability to access data that is stored in Azure Storage. The syntax is:

wasb[s]://<BlobStorageContainerName>@<StorageAccountName>.blob.core.windows.net/<path>

As per https://docs.microsoft.com/en-us/azure/hdinsight/hdinsight-hadoop-use-blob-storage you need to be aware of the following:

  • Container security for WASB storage. For containers in storage accounts that are connected to the cluster, you have full access to the blobs in those containers, because the account name and key are associated with the cluster during creation. For public containers that are not connected to the cluster, you have read-only permission to the blobs in the containers. For private containers in storage accounts that are not connected to the cluster, you can’t access the blobs in the containers unless you define the storage account when you submit the WebHCat jobs.
  • The storage accounts that are defined in the creation process and their keys are stored in %HADOOP_HOME%/conf/core-site.xml on the cluster nodes. The default behavior of HDInsight is to use the storage accounts defined in the core-site.xml file. It is not recommended to edit the core-site.xml file directly, because the cluster head node (master) may be reimaged or migrated at any time, and any changes to this file are not persisted.

You can create a new storage account or point an existing one to your HDInsight cluster easily via the portal, as I show below:

Capture

You can point your HDInsight cluster to multiple storage accounts as well, as explained here – https://blogs.msdn.microsoft.com/mostlytrue/2014/09/03/hdinsight-working-with-different-storage-accounts/

You can also create a storage account and container via Azure PowerShell, as in this sample:

$SubscriptionID = "<Your Azure Subscription ID>"
$ResourceGroupName = "<New Azure Resource Group Name>"
$Location = "EAST US 2"

$StorageAccountName = "<New Azure Storage Account Name>"
$ContainerName = "<New Azure Blob Container Name>"

Add-AzureRmAccount
Select-AzureRmSubscription -SubscriptionId $SubscriptionID

# Create resource group
New-AzureRmResourceGroup -Name $ResourceGroupName -Location $Location

# Create default storage account
New-AzureRmStorageAccount -ResourceGroupName $ResourceGroupName -Name $StorageAccountName -Location $Location -Type Standard_LRS

# Create default blob container
$StorageAccountKey = (Get-AzureRmStorageAccountKey -ResourceGroupName $ResourceGroupName -StorageAccountName $StorageAccountName)[0].Value
$DestContext = New-AzureStorageContext -StorageAccountName $StorageAccountName -StorageAccountKey $StorageAccountKey
New-AzureStorageContainer -Name $ContainerName -Context $DestContext

 

The URI scheme for accessing files in Azure storage from HDInsight is:

wasb[s]://<BlobStorageContainerName>@<StorageAccountName>.blob.core.windows.net/<path>

The URI scheme provides unencrypted access (with the wasb: prefix) and SSL encrypted access (with wasbs). Microsoft recommends using wasbs wherever possible, even when accessing data that lives inside the same region in Azure.

The <BlobStorageContainerName> identifies the name of the blob container in Azure storage. The <StorageAccountName> identifies the Azure Storage account name. A fully qualified domain name (FQDN) is required.
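
To make this concrete, here is a minimal sketch of a Hive external table defined over data in blob storage; the container, storage account, path, and columns below are placeholders, not anything from the samples above:

-- container, account, path and columns are hypothetical placeholders
CREATE EXTERNAL TABLE weblogs (
    log_date STRING,
    user_name STRING,
    url STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION 'wasbs://mycontainer@mystorageaccount.blob.core.windows.net/weblogs/';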

I ran into a rather crazy little limitation\issue when working with WASB and HDInsight. Hadoop and Hive expect a valid folder hierarchy to import data files, whereas WASB does not support a folder hierarchy, i.e. all blobs are listed flat under a container. The workaround is to use an SSH session to log into the cluster head node and use the mkdir command to manually create such a directory via the driver.

The SSH Procedure with HDInsight can be found here – https://docs.microsoft.com/en-us/azure/hdinsight/hdinsight-hadoop-linux-use-ssh-unix

Another workaround recommended to me was that the “/” character can be used within the key name to make it appear as if a file is stored within a directory structure, and HDInsight sees these as if they were actual directories. For example, a blob’s key may be input/log1.txt. No actual “input” directory exists, but due to the presence of the “/” character in the key name it has the appearance of a file path.

For more see – https://social.technet.microsoft.com/wiki/contents/articles/6621.azure-hdinsight-faq.aspx, https://docs.microsoft.com/en-us/azure/hdinsight/hdinsight-hadoop-use-blob-storage, https://www.codeproject.com/articles/597940/understandingpluswindowsplusazureplusblobplusstora

Hope this helps.

Meet Redis – Setting Up Redis On Ubuntu Linux

redis_thumb.jpg

I have been asked by a few folks for a quick tutorial on setting up Redis under systemd on Ubuntu Linux version 16.04.

I have blogged quite a bit about Redis in general – https://gennadny.wordpress.com/category/redis/ – but here is a quick line on Redis anyway. Redis is an in-memory key-value store known for its flexibility, performance, and wide language support, which makes it one of the most popular key-value data stores in existence today. Below are the steps to install it and configure it to run under systemd on Ubuntu 16.04 and above.

Here are the prerequisites: an Ubuntu 16.04 (or later) server and a non-root user account with sudo privileges.

Next steps are:

  • Log in to your Ubuntu server with this user account
  • Update and install prerequisites via apt-get
             $ sudo apt-get update
             $ sudo apt-get install build-essential tcl
    
  • Now we can download and extract Redis to the tmp directory
              $ cd /tmp
              $ curl -O http://download.redis.io/redis-stable.tar.gz
              $ tar xzvf redis-stable.tar.gz
              $ cd redis-stable
    
  • Next we can build Redis
        $ make
    
  • After the binaries are compiled, run the test suite to make sure everything was built correctly. You can do this by typing:
       $ make test
    
  • This will typically take a few minutes to run. Once it is complete, you can install the binaries onto the system by typing:
    $ sudo make install
    

Now we need to configure Redis to run under systemd. Systemd is an init system used in Linux distributions to bootstrap the user space and manage all processes subsequently, instead of the UNIX System V or Berkeley Software Distribution (BSD) init systems. As of 2016, most Linux distributions have adopted systemd as their default init system.

  • To start off, we need to create a configuration directory. We will use the conventional /etc/redis directory, which can be created by typing
    $ sudo mkdir /etc/redis
    
  • Now, copy over the sample Redis configuration file included in the Redis source archive:
         $ sudo cp /tmp/redis-stable/redis.conf /etc/redis
    
  • Next, we can open the file to adjust a few items in the configuration:
    $ sudo nano /etc/redis/redis.conf
    
  • In the file, find the supervised directive. Currently, this is set to no. Since we are running an operating system that uses the systemd init system, we can change this to systemd:
    . . .
    
    # If you run Redis from upstart or systemd, Redis can interact with your
    # supervision tree. Options:
    #   supervised no      - no supervision interaction
    #   supervised upstart - signal upstart by putting Redis into SIGSTOP mode
    #   supervised systemd - signal systemd by writing READY=1 to $NOTIFY_SOCKET
    #   supervised auto    - detect upstart or systemd method based on
    #                        UPSTART_JOB or NOTIFY_SOCKET environment variables
    # Note: these supervision methods only signal "process is ready."
    #       They do not enable continuous liveness pings back to your supervisor.
    supervised systemd
    
    . . .
    
  • Next, find the dir directive. This option specifies the directory that Redis will use to dump persistent data. We need to pick a location where Redis will have write permission and that isn’t viewable by normal users.
    We will use the /var/lib/redis directory for this, which we will create below.

    . . .
    
    
    # The working directory.
    #
    # The DB will be written inside this directory, with the filename specified
    # above using the 'dbfilename' configuration directive.
    #
    # The Append Only File will also be created inside this directory.
    #
    # Note that you must specify a directory here, not a file name.
    dir /var/lib/redis
    
    . . .
    

    Save and close the file when you are finished

  • Next, we can create a systemd unit file so that the init system can manage the Redis process.
    Create and open the /etc/systemd/system/redis.service file to get started:

    $ sudo nano /etc/systemd/system/redis.service
    
  • The file should look like this; create the sections below:
    [Unit]
    Description=Redis In-Memory Data Store
    After=network.target
    
    [Service]
    User=redis
    Group=redis
    ExecStart=/usr/local/bin/redis-server /etc/redis/redis.conf
    ExecStop=/usr/local/bin/redis-cli shutdown
    Restart=always
    
    [Install]
    WantedBy=multi-user.target
    
  • Save and close file when you are finished

Now we just have to create the user, group, and directory that we referenced in the previous two files.
Begin by creating the redis system user and group, then create the data directory and give the redis user ownership of it:

$ sudo adduser --system --group --no-create-home redis
$ sudo mkdir -p /var/lib/redis && sudo chown redis:redis /var/lib/redis

Now we can start Redis:

  $ sudo systemctl start redis

Check that the service had no errors by running:

$ sudo systemctl status redis

And Eureka – here is the response

redis.service - Redis Server
   Loaded: loaded (/etc/systemd/system/redis.service; enabled; vendor preset: enabled)
   Active: active (running) since Wed 2016-05-11 14:38:08 EDT; 1min 43s ago
  Process: 3115 ExecStop=/usr/local/bin/redis-cli shutdown (code=exited, status=0/SUCCESS)
 Main PID: 3124 (redis-server)
    Tasks: 3 (limit: 512)
   Memory: 864.0K
      CPU: 179ms
   CGroup: /system.slice/redis.service
           └─3124 /usr/local/bin/redis-server 127.0.0.1:6379    

Congrats! You can now start learning Redis. Connect to the Redis CLI by typing:

$ redis-cli

Now you can follow these Redis tutorials

Hope this was helpful

Dancing with Elephants and Flying with The Bees–Using ORC File Format with Apache Hive

hive

 

When you start with Hive on Hadoop, the clear majority of samples and tutorials will have you work with text files. However, some time ago the Hive community clearly saw the disadvantages of text files as a file format in terms of storage efficiency and performance.

The first move to better columnar storage was the introduction of the Hive RCFile format. RCFile (Record Columnar File) is a data placement structure designed for MapReduce-based data warehouse systems. Hive added the RCFile format in version 0.6.0. RCFile stores table data in a flat file consisting of binary key/value pairs. It first partitions rows horizontally into row splits, and then it vertically partitions each row split in a columnar way. RCFile stores the metadata of a row split as the key part of a record, and all the data of a row split as the value part. Internals for the RCFile format can be found in the JavaDoc here – http://hive.apache.org/javadocs/r1.0.1/api/org/apache/hadoop/hive/ql/io/RCFile.html. What is important to note is why it was introduced, i.e. its advantages:

  • As row-store, RCFile guarantees that data in the same row are located in the same node
  • As column-store, RCFile can exploit column-wise data compression and skip unnecessary column reads.

As time passed, the explosion of data and the need for faster HiveQL queries pushed the need for further optimized columnar storage file formats. Therefore, the ORC file format was introduced. The Optimized Row Columnar (ORC) file format provides a highly efficient way to store Hive data. It was designed to overcome limitations of the other Hive file formats. Using ORC files improves performance when Hive is reading, writing, and processing data.

It has the following advantages over the RCFile format:

  • a single file as the output of each task, which reduces the NameNode’s load
  • light-weight indexes stored within the file, allowing readers to skip row groups that don’t pass predicate filtering and to seek to a given row
  • block-mode compression based on data type

An ORC file contains groups of row data called stripes, along with auxiliary information in a file footer. At the end of the file a postscript holds compression parameters and the size of the compressed footer.

The default stripe size is 250 MB. Large stripe sizes enable large, efficient reads from HDFS.

The file footer contains a list of stripes in the file, the number of rows per stripe, and each column’s data type. It also contains column-level aggregates count, min, max, and sum.

image

What does it all mean for me?

What it means for us as implementers is the following:

  • Better read performance due to compression. Streams are compressed using a codec, which is specified as a table property for all streams in that table. To optimize memory use, compression is done incrementally as each block is produced. Compressed blocks can be jumped over without first having to be decompressed for scanning. Positions in the stream are represented by a block start location and an offset into the block.
  • Introduction of column-level statistics for optimization, a feature that has long existed in pretty much all commercial RDBMS packages (Oracle, SQL Server, etc.). The goal of the column statistics is that for each column, the writer records the count and, depending on the type, other useful fields. For most of the primitive types, it records the minimum and maximum values; and for numeric types it additionally stores the sum. From Hive 1.1.0 onwards, the column statistics will also record whether there are any null values within the row group by setting the hasNull flag.
  • Lightweight indexing
  • Larger blocks, 256 MB by default

Here is a good Hive file format comparison from Hortonworks:

image

Using ORC – create table with ORC format:

The simplest way to create an ORC-formatted Hive table is to add STORED AS ORC to the Hive CREATE TABLE statement, like this:

CREATE TABLE my_table  (
column1 STRING,
column2 STRING,
column3 INT,
column4 INT
) STORED AS ORC;

The ORC file format can be used together with Hive partitioning, which I explained in my previous post.  Here is an example of using Hive partitioning with the ORC file format:

CREATE TABLE airanalytics
(flightdate date, dayofweek int, depttime int, crsdepttime int, arrtime int, crsarrtime int, uniquecarrier varchar(10), flightno int, tailnum int, aet int, cet int, airtime int, arrdelay int, depdelay int, origin varchar(5), dest varchar(5), distance int, taxin int, taxout int, cancelled int, cancelcode int, diverted string, carrdelay string, weatherdelay string, securtydelay string, cadelay string, lateaircraft string)
PARTITIONED BY (flight_year string)
CLUSTERED BY (uniquecarrier)
SORTED BY (flightdate)
INTO 24 BUCKETS
STORED AS ORC TBLPROPERTIES ("orc.compress"="NONE", "orc.stripe.size"="67108864", "orc.row.index.stride"="25000");

The parameters that can be added at the table level are, as per the docs:

Key                        Default        Notes
orc.compress               ZLIB           high level compression (one of NONE, ZLIB, SNAPPY)
orc.compress.size          262,144        number of bytes in each compression chunk
orc.stripe.size            268,435,456    number of bytes in each stripe
orc.row.index.stride       10,000         number of rows between index entries (must be >= 1000)
orc.create.index           true           whether to create row indexes
orc.bloom.filter.columns   ""             comma separated list of column names for which bloom filter should be created
orc.bloom.filter.fpp       0.05           false positive probability for bloom filter (must be >0.0 and <1.0)

If you have an existing Hive table, it can be moved to ORC via:

    • ALTER TABLE … [PARTITION partition_spec] SET FILEFORMAT ORC
    • SET hive.default.fileformat=Orc
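
Note that ALTER TABLE … SET FILEFORMAT only changes the table metadata, so existing data files are not rewritten; it mainly affects data written afterwards. A common alternative, sketched here with a hypothetical table name, is to rewrite the data into a new ORC table with a CTAS statement:

-- my_table is a hypothetical existing table
CREATE TABLE my_table_orc STORED AS ORC AS SELECT * FROM my_table;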

For more information see – https://en.wikipedia.org/wiki/RCFile, https://cwiki.apache.org/confluence/display/Hive/LanguageManual+ORC, https://cwiki.apache.org/confluence/display/Hive/FileFormats, https://orc.apache.org/docs/hive-config.html

Hope this helps.

Dancing With Elephants and Flying With The Bees–Apache Hive Scaling Out with Partitions and Buckets

hive

In my previous post some time ago I introduced Apache Hive technology on Hadoop. Coming from SQL and RDBMS this was bound to be my favorite Hadoop technology.  Apache Hive is an open-source data warehouse system for querying and analyzing large datasets stored in HDFS files.

Today, unlike the previous basics post, I will concentrate on Hive partitions and buckets. A simple query in Hive reads the entire dataset even if we have a WHERE clause filter. This becomes a bottleneck for running MapReduce jobs over a large table. We can overcome this issue by implementing partitions in Hive. Hive makes it very easy to implement partitions by using the automatic partition scheme when the table is created.

image

Partitions. Just like in an RDBMS, in Hive data within a table is split across multiple partitions. Each partition corresponds to a particular value(s) of the partition column(s) and is stored as a sub-directory within the table’s directory on HDFS. When the table is queried, where applicable, only the required partitions of the table are read, thereby reducing the I/O and time required by the query. An example of creating a partitioned table in Hive is below; note you can do this with both internal and external tables:

CREATE TABLE CUSTOMER (
    userid      BIGINT,
    First_Name  STRING,
    Last_Name   STRING,
    address1    STRING,
    address2    STRING,
    city        STRING,
    zip_code    STRING,
    state       STRING
)
PARTITIONED BY (
    COUNTRY     STRING
);

As you can see, I am using COUNTRY as the partition column. The partition statement lets Hive alter the way it manages the underlying structures of the table’s data directory. If you browse the location of the data directory for a non-partitioned table, it will look like this: <dbname>.db/<tablename>. All the data files are directly written to this directory. In the case of partitioned tables, subdirectories are created under the table’s data directory for each unique value of a partition column. If the table is partitioned on multiple columns, then Hive creates nested subdirectories based on the order of the partition columns in the table definition. Example with the table above:

/gennadyk.db/customer/country=US

/gennadyk.db/customer/country=UK

When a partitioned table is queried with one or both partition columns in the WHERE clause, what Hive effectively does is partition elimination: it scans only those data directories that are needed. If no partition columns are used in the filter, then all the directories are scanned (full table scan) and partitioning has no effect.
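
For example, against the CUSTOMER table above, a sketch of a query that benefits from partition elimination looks like this; only the country=US subdirectory is scanned:

SELECT First_Name, Last_Name
FROM CUSTOMER
WHERE country = 'US';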

With a few quick changes it’s easy to configure Hive to support dynamic partition creation. Just as SQL Server has a SET command to change database options, Hive lets us change settings for a session using the SET command. Changing these settings permanently would require editing the configuration file (hive-site.xml) and restarting the Hive cluster.

SET hive.exec.dynamic.partition = true;
SET hive.exec.dynamic.partition.mode = nonstrict;
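
With those settings in place, a dynamic partition load could look like the sketch below (the staging table name is hypothetical); Hive routes each row to its partition based on the last column in the SELECT list:

-- customer_staging is a hypothetical unpartitioned staging table
INSERT OVERWRITE TABLE CUSTOMER PARTITION (country)
SELECT userid, First_Name, Last_Name, address1, address2, city, zip_code, state, country
FROM customer_staging;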

Important notes on best practices

  • Be careful using dynamic partitions. Hive has some built-in limits on the number of partitions that can be dynamically created, as well as limits on the total number of files that can exist within Hive. Creating many partitions at once will create a lot of files, and creating a lot of files will use up memory in the Hadoop NameNode
  • If your partitions are relatively small, then the cost of recursing directories becomes higher than simply scanning the data. Likewise, partitions should be roughly similar in size to prevent a single long-running thread from holding things up

Buckets. Much like partitioning, bucketing is a technique that allows you to cluster or segment large sets of data to optimize query performance. As you create a Hive table you can use the CLUSTERED BY keyword to define buckets.

CREATE TABLE CUSTOMER (
    userid      BIGINT,
    First_Name  STRING,
    Last_Name   STRING,
    address1    STRING,
    address2    STRING,
    city        STRING,
    zip_code    STRING,
    state       STRING
)
PARTITIONED BY (
    COUNTRY     STRING
)
CLUSTERED BY (userid) INTO 10 BUCKETS;

In general, the bucket number is determined by the expression hash_function(bucketing_column) mod num_buckets. The hash_function depends on the type of the bucketing column. For an int, as in our example, it’s easy: hash_int(i) == i. In our example we can expect all userids that end in 0 to be in bucket 1, all userids that end in 1 to be in bucket 2, and so on. For other data types it unfortunately gets a lot more tricky. Note that the hash of a string or a complex data type will be some number derived from the value, but not anything humanly recognizable. In general, distributing rows based on the hash will give you an even distribution in the buckets, and that’s your goal here. Bucketing is entirely dependent on data being loaded correctly, so loading the table properly is critical in the case of bucketing.
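
As a sketch of what loading the table properly means (the staging table name is hypothetical), you let Hive do the bucketing on insert, and once loaded you can sample individual buckets:

-- needed on Hive versions prior to 2.0; later versions always enforce bucketing
SET hive.enforce.bucketing = true;
INSERT OVERWRITE TABLE CUSTOMER PARTITION (country)
SELECT userid, First_Name, Last_Name, address1, address2, city, zip_code, state, country
FROM customer_staging;   -- hypothetical staging table

-- read back a sample from a single bucket
SELECT * FROM CUSTOMER TABLESAMPLE(BUCKET 1 OUT OF 10 ON userid) s;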

For more see – http://www.bidn.com/blogs/cprice1979/ssas/4646/partitions-amp-buckets-in-hive, https://www.qubole.com/blog/big-data/5-tips-for-efficient-hive-queries/, http://www.brentozar.com/archive/2013/03/introduction-to-hive-partitioning/, http://blog.cloudera.com/blog/2014/08/improving-query-performance-using-partitioning-in-apache-hive/, http://archive.cloudera.com/cdh/3/hive/language_manual/working_with_bucketed_tables.html

Forecast Cloudy – Going Big Data With Google Big Query

google-cloud-platform

Although I am mainly a Microsoft and Azure centric guy, after reading a bit about Google BigQuery I got interested. Therefore I decided to explore it a bit more here as well.

BigQuery is a RESTful web service that enables interactive analysis of massively large datasets, working in conjunction with Google Storage. It is a Platform as a Service (PaaS) that may be used complementarily with MapReduce. BigQuery (BQ) is reportedly based on Dremel, a scalable, interactive ad hoc query system for analysis of read-only nested data. To use the data in BigQuery, it first must be uploaded to Google Storage and in a second step imported using the BigQuery HTTP API. BigQuery requires all requests to be authenticated, supporting a number of Google-proprietary mechanisms as well as OAuth.

Let’s start with what BigQuery is not. It is neither an RDBMS nor a MapReduce technology. As I stated, BigQuery is based on internal Google technology called Dremel. Dremel is a query service that allows you to run SQL-like queries against very, very large data sets and get accurate results in mere seconds. You just need a basic knowledge of SQL to query extremely large datasets in an ad hoc manner. At Google, engineers and non-engineers alike, including analysts, tech support staff and technical account managers, use this technology many times a day. BigQuery provides the core set of features available in Dremel to third party developers. It does so via a REST API, a command line interface, a Web UI, access control and more, while maintaining the unprecedented query performance of Dremel.

Why is BigQuery so fast? The answer can be found in two core technologies which give BQ this unprecedented performance:

  • Columnar Storage. Data is stored in a columnar fashion, which makes it possible to achieve a very high compression ratio and scan throughput.
  • Tree Architecture is used for dispatching queries and aggregating results across thousands of machines in a few seconds.

BigQuery stores data in its columnar storage, which means it separates a record into column values and stores each value on a different storage volume, whereas a traditional RDBMS normally stores the whole record on one volume.

image

Actually this technology isn’t that new for anyone who has dealt with DW technologies for a while; it’s a fairly hot topic today with SQL Server columnstore indexes and the SAP HANA in-memory column store, for example.  As you may know, columnar storage has the following advantages:

  • Traffic minimization. Only required column values on each query are scanned and transferred on query execution. For example, a query “SELECT top(title) FROM foo” would access the title column values only.
  • Higher compression ratio.  One study reports that columnar storage can achieve a compression ratio of 1:10, whereas ordinary row-based storage can compress at roughly 1:3. Because each column would have similar values, especially if the cardinality of the column (variation of possible column values) is low, it’s easier to gain higher compression ratios than row-based storage.

Columnar storage has the disadvantage of not working efficiently when updating existing records. In the case of BigQuery, it simply doesn’t support any update operations. Thus the technique has been used mainly in read-only OLAP/BI type of usage. Although the technology has been popular as a data warehouse database design, Dremel\BigQuery is one of the first implementations of a columnar storage-based analytics system that harnesses the computing power of many thousands of servers and is delivered as a cloud service.

One of the challenges Google had in designing Dremel\BigQuery was how to dispatch queries and collect results across tens of thousands of machines in a matter of seconds. The challenge was resolved by using the Tree architecture. The architecture forms a massively parallel distributed tree for pushing down a query to the tree and then aggregating the results from the leaves at a blazingly fast speed.

image    

By leveraging this architecture, Google was able to implement the distributed design for Dremel\BigQuery and realize the vision of the massively parallel columnar-based database on the cloud platform.

image

     

BigQuery provides the core set of features available in Dremel to third party developers. It does so via a REST API, command line interface, Web UI,access control, data schema management and the integration with Google Cloud Storage. BigQuery and Dremel share the same underlying architecture and performance characteristics. Users can fully utilize the power of Dremel by using BigQuery to take advantage of Google’s massive computational infrastructure. This incorporates valuable benefits like multiple replication across regions and high data center scalability. Most importantly, this infrastructure requires no management by the developer.

So why BigQuery over MapReduce? The difference here is that MapReduce is a batch-based programming framework for very large datasets, whereas BigQuery is an interactive data query tool for large datasets.

image

OK, how do I use it?  Assuming you already have a Google Cloud account, you will have to create a new project from the dropdown in your Google Cloud Console.

image 

Once that is done, you can navigate to project console and enable BigQuery APIs for use with your project:

image

Now in your left side menu you can pick BigQuery from the Big Data offerings.

image

 

Before we can run any queries, we need some data! There are a couple of options here:

  • Load your own data
  • Use Google provided public data

For now I will settle on the second choice. I will pick the Shakespeare dataset here. This dataset contains the words in Shakespeare’s works, the word_count for each word, the corpus in which the word appears, and the date the corpus was written. First I will issue a simple SELECT. SELECT is the most basic clause and specifies what it is that you want to be returned by the query. FROM specifies what dataset we are using.

SELECT corpus
FROM (publicdata:samples.shakespeare)
GROUP BY corpus

image

Let’s switch gears. Say we want to count something – say, the number of words in Shakespeare’s works. Luckily, we have word_count, which represents how many times a particular word appeared in a particular corpus. We can just sum all of these values, and we are left with the total number of words that he wrote.

SELECT SUM(word_count) AS count
FROM (publicdata:samples.shakespeare)

image

945,845 words! Pretty good – but there must surely be some duplicates. How would we query the number of unique words that he used?

SELECT COUNT(word) AS count, word
FROM publicdata:samples.shakespeare
GROUP BY word
ORDER BY count Desc

Here we use the COUNT function to count how many words there are, and group them by word so as not to show duplicates. 32,786 unique words. Moreover, I order these by the most used words in descending order.

image

OK, let’s finally add a WHERE clause. For example, I wonder how many times Shakespeare used the word “Sir”:

SELECT word, SUM(word_count) as count
FROM (publicdata:samples.shakespeare)
WHERE word = "Sir"
GROUP BY word
ORDER BY count DESC

From the results I can state that Shakespeare was a really polite guy:

image

This of course is pretty basic. However, BigQuery now does allow for joins. I could take my large table and join it to a smaller lookup table using standard ANSI SQL join syntax, as in the sketch below.

image
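
As a sketch only, assuming a hypothetical mydataset.corpus_dates lookup table alongside the public Shakespeare table used above, such a join might look roughly like this in the legacy SQL dialect used in this post:

SELECT s.corpus, c.century, SUM(s.word_count) AS total_words
FROM [publicdata:samples.shakespeare] s
JOIN [mydataset.corpus_dates] c ON s.corpus = c.corpus
GROUP BY s.corpus, c.century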

There are also some complex functions it supports like:

image

In the next part I am planning to upload a dataset and do some joins and more complex processing.

For more on BigQuery see – https://cloud.google.com/bigquery/web-ui-quickstart, http://martinfowler.com/articles/bigQueryPOC.html, https://support.google.com/analytics/answer/4419694?hl=en, http://googlecode.blogspot.com/2011/11/google-bigquery-service-big-data.html

Bees and Elephants In The Clouds – Using Hive with Azure HDInsight

hdinsight

For today’s blog entry I will combine both Cloud and Big Data to show you how easy it is to do Big Data on the Cloud. 

Microsoft Azure is a cloud service provided by Microsoft, and one of the services offered is HDInsight, an Apache Hadoop-based distribution running in the cloud on Azure. Another service offered by Azure is blob storage. Azure Storage functions as the default file system and stores data in the cloud and not on-premises or in nodes. This way, when you are done running an HDInsight job, the cluster can be decommissioned (to save you money) while your data remains intact.

8484_110113_2116_TheHDInsigh4

HDInsight provides an excellent scenario for IoT analytics, PoC\development, bursting of on-premises resources to the cloud, etc.

Untitled picture

The first thing you will need, of course, is a Microsoft Azure subscription. A subscription has many moving parts, and http://azure.microsoft.com has interactive pricing pages, a pricing calculator, and plenty of documentation.  If you have an MSDN subscription you will get a limited Azure subscription as well (you just have to enable and link it), or you can set up a trial subscription at https://azure.microsoft.com/en-us/pricing/free-trial/. For this little tutorial I will be using my MSDN subscription.

Assuming you know how to set up your subscription and can log in to the Azure Management Portal, our next step will be to create an Azure storage account. Now I already have a number of storage accounts, but will create one just for this tutorial:

Hit +NEW in lower left side of portal and expand your choices, picking storage:

image

Pick a unique URL and a data center location and hit Create Storage Account. Easy:

image

Once the account is created I will create a storage container within this account. In your account dashboard navigate to the containers tab and hit create new container:

image

Enter a name for the container, and if there is any chance of sensitive or PII data being loaded to this container, choose Private access. Private access requires a key. HDInsight can be configured with that key during creation, or keys can be passed in for individual jobs.

image

This will be the default container for the cluster. If you want to manage your data separately you may want to create additional containers.

Next I will create the Hadoop\HDInsight cluster. Here, as with a lot of things in Azure, I can use either the Quick Create or Custom Create option, the latter giving me more control of course. Let’s create an HDInsight cluster using the Custom Create option.

Back to +New in lower left corner of the portal:

image

Obviously pick a unique name and the storage account that we created in the previous step; you will also need to provide a strong password and finally hit the Create button. It takes a bit to create a cluster (way faster than setting one up on your own, trust me) and while that is happening you will see this:

image

Once that is done I can go to the Query Console from the Dashboard for my cluster:

image

After I enter the user name (admin) and the password I set up on cluster creation, I can see the Query Console:

image

Just for this tutorial I will pick the Twitter Trend Analysis sample. The idea is to learn how to use Hive to get a list of Twitter users that sent the most tweets containing a particular word. The tutorial has a sample data set of tweets housed in Azure Blob Storage (that’s why I created a storage account and pointed my HDInsight cluster there): wasb://gennadykhdp@gennadykhdpstorage.blob.core.windows.net/HdiSamples/TwitterTrendsSampleData/

First I will create raw tweet table from data:

DROP TABLE IF EXISTS HDISample_Tweets_raw;

--create the raw Tweets table on json formatted twitter data
CREATE EXTERNAL TABLE HDISample_Tweets_raw(json_response STRING)
STORED AS TEXTFILE LOCATION 'wasb://gennadykhdp@gennadykhdpstorage.blob.core.windows.net/HdiSamples/TwitterTrendsSampleData/';

The following Hive queries will create a Hive table called HDISample_Tweets where you will store the parsed raw Twitter data. The Create Hive Table query shows the fields that the HDISample_Tweets table will contain. The Load query shows how the HDISample_Tweets_raw table is parsed to be placed into the HDISample_Tweets table.

DROP TABLE IF EXISTS HDISample_Tweets;
CREATE TABLE HDISample_Tweets(
    id BIGINT,
    created_at STRING,
    created_at_date STRING,
    created_at_year STRING,
    created_at_month STRING,
    created_at_day STRING,
    created_at_time STRING,
    in_reply_to_user_id_str STRING,
    text STRING,
    contributors STRING,
    retweeted STRING,
    truncated STRING,
    coordinates STRING,
    source STRING,
    retweet_count INT,
    url STRING,
    hashtags array<string>,
    user_mentions array<string>,
    first_hashtag STRING,
    first_user_mention STRING,
    screen_name STRING,
    name STRING,
    followers_count INT,
    listed_count INT,
    friends_count INT,
    lang STRING,
    user_location STRING,
    time_zone STRING,
    profile_image_url STRING,
    json_response STRING);
FROM HDISample_Tweets_raw
INSERT OVERWRITE TABLE HDISample_Tweets
SELECT
    CAST(get_json_object(json_response, '$.id_str') as BIGINT),
    get_json_object(json_response, '$.created_at'),
    CONCAT(SUBSTR (get_json_object(json_response, '$.created_at'),1,10),' ',
    SUBSTR (get_json_object(json_response, '$.created_at'),27,4)),
    SUBSTR (get_json_object(json_response, '$.created_at'),27,4),
    CASE SUBSTR (get_json_object(json_response, '$.created_at'),5,3)
        WHEN 'Jan' then '01'
        WHEN 'Feb' then '02'
        WHEN 'Mar' then '03'
        WHEN 'Apr' then '04'
        WHEN 'May' then '05'
        WHEN 'Jun' then '06'
        WHEN 'Jul' then '07'
        WHEN 'Aug' then '08'
        WHEN 'Sep' then '09'
        WHEN 'Oct' then '10'
        WHEN 'Nov' then '11'
        WHEN 'Dec' then '12' end,
    SUBSTR (get_json_object(json_response, '$.created_at'),9,2),
    SUBSTR (get_json_object(json_response, '$.created_at'),12,8),
    get_json_object(json_response, '$.in_reply_to_user_id_str'),
    get_json_object(json_response, '$.text'),
    get_json_object(json_response, '$.contributors'),
    get_json_object(json_response, '$.retweeted'),
    get_json_object(json_response, '$.truncated'),
    get_json_object(json_response, '$.coordinates'),
    get_json_object(json_response, '$.source'),
    CAST (get_json_object(json_response, '$.retweet_count') as INT),
    get_json_object(json_response, '$.entities.display_url'),
    ARRAY(  
        TRIM(LOWER(get_json_object(json_response, '$.entities.hashtags[0].text'))),
        TRIM(LOWER(get_json_object(json_response, '$.entities.hashtags[1].text'))),
        TRIM(LOWER(get_json_object(json_response, '$.entities.hashtags[2].text'))),
        TRIM(LOWER(get_json_object(json_response, '$.entities.hashtags[3].text'))),
        TRIM(LOWER(get_json_object(json_response, '$.entities.hashtags[4].text')))),
    ARRAY(
        TRIM(LOWER(get_json_object(json_response, '$.entities.user_mentions[0].screen_name'))),
        TRIM(LOWER(get_json_object(json_response, '$.entities.user_mentions[1].screen_name'))),
        TRIM(LOWER(get_json_object(json_response, '$.entities.user_mentions[2].screen_name'))),
        TRIM(LOWER(get_json_object(json_response, '$.entities.user_mentions[3].screen_name'))),
        TRIM(LOWER(get_json_object(json_response, '$.entities.user_mentions[4].screen_name')))),
    TRIM(LOWER(get_json_object(json_response, '$.entities.hashtags[0].text'))),
    TRIM(LOWER(get_json_object(json_response, '$.entities.user_mentions[0].screen_name'))),
    get_json_object(json_response, '$.user.screen_name'),
    get_json_object(json_response, '$.user.name'),
    CAST (get_json_object(json_response, '$.user.followers_count') as INT),
    CAST (get_json_object(json_response, '$.user.listed_count') as INT),
    CAST (get_json_object(json_response, '$.user.friends_count') as INT),
    get_json_object(json_response, '$.user.lang'),
    get_json_object(json_response, '$.user.location'),
    get_json_object(json_response, '$.user.time_zone'),
    get_json_object(json_response, '$.user.profile_image_url'),
    json_response
WHERE (LENGTH(json_response) > 500);

These queries will show you how to analyze the Tweets Hive table that was created to determine the Twitter users that sent out the most tweets containing the word ‘Azure’. The results are saved into a new table called HDISample_topusers

DROP TABLE IF EXISTS HDISample_topusers;

--create the topusers hive table by selecting from the HDISample_Tweets table
CREATE TABLE IF NOT EXISTS  HDISample_topusers(name STRING, screen_name STRING, tweet_count INT);

INSERT OVERWRITE TABLE  HDISample_topusers
SELECT name, screen_name, count(1) as cc
    FROM HDISample_Tweets
    WHERE text LIKE '%Azure%'
    GROUP BY name, screen_name
    ORDER BY cc DESC LIMIT 10;

Now I will run it all, submit the job and watch the Job History Console:

image

Finally, as data has been inserted into my HDISample_topusers table, I can show this data as well:

select * from HDISample_topusers;

And get results

luciasannino	ciccialucia1960	1
Valerie M	valegabri1	1
Tony Zake	TonyZake	1
Sami Ghazali	SamiGhazali	1
JOSÈ ADARMES	ADARMESJEB	1
Eric Hexter	ehexter	1
Daniel Neumann	neumanndaniel	1
Anthony Bartolo	WirelessLife	1


For more details see – https://azure.microsoft.com/en-us/documentation/articles/hdinsight-use-hive/, https://azure.microsoft.com/en-us/documentation/articles/hdinsight-hadoop-tutorial-get-started-windows/, http://www.developer.com/db/using-hive-in-hdinsight-to-analyze-data.html, http://blogs.msdn.com/b/cindygross/archive/2015/02/26/create-hdinsight-cluster-in-azure-portal.aspx

Dancing With Elephants and Flying With The Bees–Basic Introduction To Apache Hive

hive

Continuing my Big Data\Hadoop blogging journey that I started with the HDFS post and the YARN post, I will move to my favorite Hadoop technology – Apache Hive. At its simplest, Apache Hive is an open-source data warehouse system for querying and analyzing large datasets stored in HDFS files.

Hive has three main functions: data summarization, query and analysis.  It supports queries expressed in a language called HiveQL, which automatically translates SQL-like queries into MapReduce jobs executed on Hadoop. In addition, HiveQL supports custom MapReduce scripts to be plugged into queries. Hive also enables data serialization/deserialization and increases flexibility in schema design by including a system catalog called Hive-Metastore.

image

According to the Apache Hive wiki, “Hive is not designed for OLTP workloads and does not offer real-time queries or row-level updates. It is best used for batch jobs over large sets of append-only data (like web logs).” Hive supports text files (also called flat files), SequenceFiles (flat files consisting of binary key/value pairs) and RCFiles (Record Columnar Files which store columns of a table in a columnar database way.)

Hive isn’t a relational database, as it only maintains metadata information about your Big Data stored on HDFS. However, it allows you to treat your Big Data as tables and perform SQL-like operations using HiveQL.

image

Hive Components.  Components of Hive include HCatalog and WebHCat:

  • HCatalog is a component of Hive. It is a table and storage management layer for Hadoop that enables users with different data processing tools — including Pig and MapReduce — to more easily read and write data on the grid.
  • WebHCat provides a service that you can use to run Hadoop MapReduce (or YARN), Pig, Hive jobs or perform Hive metadata operations using an HTTP (REST style) interface.

Hive Data Model.

Data in Hive is organized into:

  • Tables – These are analogous to Tables in Relational Databases. Tables can be filtered, projected, joined and unioned. Additionally all the data of a table is stored in a directory in HDFS. Hive also supports the notion of external tables wherein a table can be created on preexisting files or directories in HDFS by providing the appropriate location to the table creation DDL. The rows in a table are organized into typed columns similar to Relational Databases.
  • Partitions – Each table can have one or more partition keys which determine how the data is stored; for example, a table T with a date partition column ds has files with data for a particular date stored in the <table location>/ds=<date> directory in HDFS. Partitions allow the system to prune the data to be inspected based on query predicates; for example, a query that is interested in rows from T that satisfy the predicate T.ds = ‘2008-09-01’ would only have to look at files in the <table location>/ds=2008-09-01/ directory in HDFS (see the sketch after this list).
  • Buckets – Data in each partition may in turn be divided into Buckets based on the hash of a column in the table. Each bucket is stored as a file in the partition directory. Bucketing allows the system to efficiently evaluate queries that depend on a sample of data (these are queries that use the SAMPLE clause on the table).
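
To make the Partitions bullet above concrete, here is a minimal sketch of the pruning it describes, using the hypothetical table T and its ds partition column:

-- only the <table location>/ds=2008-09-01 directory is read
SELECT *
FROM T
WHERE ds = '2008-09-01';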

Hive Metastore.  The Metastore provides two important but often overlooked features of a data warehouse: data abstraction and data discovery. Without the data abstractions provided in Hive, a user has to provide information about data formats, extractors and loaders along with the query. In Hive, this information is given during table creation and reused every time the table is referenced. This is very similar to the traditional warehousing systems. The second functionality, data discovery, enables users to discover and explore relevant and specific data in the warehouse.

HiveQL Compiler and Optimizer.  Just like in an RDBMS, HiveQL needs to be compiled and a logical query plan has to be generated. Moreover, the plan will be further optimized via the optimizer. Just as with an RDBMS, the Hive optimizer is moving towards cost-based optimization based on statistics (SQL Server or Oracle, anyone?) – https://cwiki.apache.org/confluence/display/Hive/Cost-based+optimization+in+Hive
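
As a sketch, and assuming a hypothetical customer table, the statistics that feed the cost-based optimizer are gathered with ANALYZE TABLE, and the optimizer itself is toggled with a setting:

-- customer is a hypothetical table name
SET hive.cbo.enable=true;
ANALYZE TABLE customer COMPUTE STATISTICS;
ANALYZE TABLE customer COMPUTE STATISTICS FOR COLUMNS;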

Enough theory – let’s do Hive. Instead of installing Apache Hadoop and Hive for learning’s sake, I would recommend that you download one of the learning VMs provided by both Cloudera and Hortonworks. In my demo I will use the Hortonworks Sandbox; the latest copy of that VM you can download from here – http://hortonworks.com/products/hortonworks-sandbox/#overview. If you prefer Cloudera you can get that from – http://www.cloudera.com/content/cloudera/en/downloads/quickstart_vms/cdh-5-4-x.html

So I will launch the Hortonworks Sandbox on my VirtualBox:

clip_image002

Although I could easily launch Hive from the command line, for the purpose of this tutorial I will go easy and use the very convenient Sandbox web console available at 127.0.0.1:8000. The console gives me a number of nice visual tools, such as a File Browser to easily move files and visibility into Hue, the Pig and Hive editors, Oozie scheduling tooling, etc.

clip_image002[5]

I will use the dataset on baseball statistics available from Sean Lahman – http://www.seanlahman.com/. The dataset can be downloaded from – http://seanlahman.com/files/database/lahman591-csv.zip. Once we have the file we will just unzip it into some temp directory. I am interested in two files in that directory – Master.csv and Batting.csv. I can use the File Explorer utility to upload these files to hue\hdfs. The File Explorer icon on the toolbar is:

clip_image002[8]

Here is the utility:

clip_image002[10]

Once I upload the files I can go to Beeswax – the Hive editor. To do so I will click on another icon on the toolbar (the one with a bee on it):

clip_image002[12]

Beeswax is a pretty nice editor and allows me to type in and edit my HiveQL statements, as well as get explain\execution plans for query tuning purposes (more on that in next posts).

clip_image002[14]
 

Creating Tables.  So the first thing I will do is create a new table in the default database provided to me. The Hive concept of a database is essentially just a catalog or namespace of tables. However, databases are very useful for larger clusters with multiple teams and users, as a way of avoiding table name collisions. It’s also common to use databases to organize production tables into logical groups. If you don’t specify a database, the default database is used.
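
As a quick sketch, creating and switching to a database of our own (the name is arbitrary) looks just like it does in an RDBMS:

CREATE DATABASE IF NOT EXISTS baseball;
USE baseball;
SHOW TABLES;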

The CREATE TABLE statement follows SQL conventions, but Hive’s version offers significant extensions to support a wide range of flexibility in where the data files for tables are stored, the formats used, etc.  That’s what I am doing now first:

create table temp_batting (col_value STRING);

The table we have created so far is called a managed table, or sometimes an internal table, because Hive controls the lifecycle of its data. Hive stores the data for these tables in a subdirectory under the directory defined by hive.metastore.warehouse.dir (e.g., /user/hive/warehouse) by default.  There is also the ability to define an external table, i.e. a table whose data is stored externally to the Hive warehouse directory.
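
For comparison, here is a minimal sketch of an external table over a file already sitting in HDFS (the path is hypothetical); dropping it removes only the metadata, not the data:

CREATE EXTERNAL TABLE temp_batting_ext (col_value STRING)
LOCATION '/user/hue/external/batting';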

We can get all the information we need about a table through a query too. Go back to the query editor and execute:

describe temp_batting;

The result describes the schema of the table and detailed table information.  Similar to RDBMS isn’t it?

Loading data.  Next I will load data into my new table from the file I uploaded. This is pretty much self-explanatory:

LOAD DATA INPATH '/user/hue/Batting.csv' OVERWRITE INTO TABLE temp_batting;

Since Hive has no row-level insert, update, and delete operations, the only way to put data into a table is to use one of the “bulk” load operations.  If the LOCAL keyword is used, the path is assumed to be in the local file system and the data is copied into the final location. If LOCAL is omitted, the path is assumed to be in the distributed file system, and in this case the data is moved from the path to the final location.
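
For instance, a sketch of the LOCAL variant (the local path here is hypothetical) would be:

-- copies the file from the local file system into the table's location
LOAD DATA LOCAL INPATH '/tmp/Batting.csv' OVERWRITE INTO TABLE temp_batting;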

Create another table. So I have a table loaded with batting data, but I need a results table here; after all, I want to get information on the players with the most runs yearly.

create table batting (player_id STRING, year INT, runs INT);

Finally, the query:

insert overwrite table batting  
SELECT  
  regexp_extract(col_value, '^(?:([^,]*)\,?){1}', 1) player_id,  
  regexp_extract(col_value, '^(?:([^,]*)\,?){2}', 1) year,  
  regexp_extract(col_value, '^(?:([^,]*)\,?){9}', 1) run  
from temp_batting;
SELECT a.year, a.player_id, a.runs from batting a  
JOIN (SELECT year, max(runs) runs FROM batting GROUP BY year ) b  
ON (a.year = b.year AND a.runs = b.runs) ;

Result

Some finer points:

  • We are using INSERT and SELECTs here, as well as JOIN
  • SELECT is the projection operator in SQL, WHERE gives the condition of what to filter, GROUP BY gives a list of columns which specify how to aggregate the records
  • INSERT OVERWRITE will overwrite any existing data in the table, whereas INSERT INTO will append data
  • I use online Hive Syntax Checker here – https://sql.treasuredata.com/, to check my syntax

So this is a very basic post and I will go from there into a lot more on Hive. For more information also see – https://github.com/Prokopp/the-free-hive-book, https://altiscale.zendesk.com/hc/en-us/articles/201802346-Simple-Hive-Query-Example, http://hortonworks.com/hadoop-tutorial/hello-world-an-introduction-to-hadoop-hcatalog-hive-and-pig/, http://www.tutorialspoint.com/hive/, tutorial on Amazon – http://docs.aws.amazon.com/gettingstarted/latest/emr/getting-started-emr-load-data.html