Data Cargo In The Clouds – Data and Containerization On The Cloud Platforms

container

This topic is where I planned to pivot for quite a while, but I either had no bandwidth/time with the move to Seattle or some other excuse like that. It is interesting to me because it sits at the intersection of technologies where I have spent quite a bit of time, including:

  • Data, including both traditional RDBMS databases and NoSQL designs
  • Cloud, including Microsoft Azure, AWS and GCP
  • Containers and container orchestration (an area that is new to me)

Those who have worked with me have displayed various degrees of agreement, but more often annoyance, at me pushing this topic as a growing part of the data engineering discipline, but I truly believe in it. Before we start combining all these areas and doing something useful in code, I will spend some time here on the most basic concepts of the space I am about to enter.

I will skip an introduction to the basic concepts of RDBMS, NoSQL, or Big Data/Hadoop technologies; if you haven't been hiding under a rock for the last five years, you should be quite aware of those. That brings us straight to containers:

As definition states – “A container image is a lightweight, stand-alone, executable package of a piece of software that includes everything needed to run it: code, runtime, system tools, system libraries, settings. Available for both Linux and Windows based apps, containerized software will always run the same, regardless of the environment. Containers isolate software from its surroundings, for example differences between development and staging environments and help reduce conflicts between teams running different software on the same infrastructure.”

Containers are a solution to the problem of how to get software to run reliably when moved from one computing environment to another. This could be from a developer’s laptop to a test environment, from a staging environment into production, and perhaps from a physical machine in a data center to a virtual machine in a private or public cloud.

container.jpg

But why containers if I already have VMs? 

VMs take up a lot of system resources. Each VM runs not just a full copy of an operating system, but a virtual copy of all the hardware that the operating system needs to run. This quickly adds up to a lot of RAM and CPU cycles. In contrast, all that a container requires is enough of an operating system, supporting programs and libraries, and system resources to run a specific program.
What this means in practice is that you can put two to three times as many applications on a single server with containers as you can with VMs. In addition, with containers you can create a portable, consistent operating environment for development, testing, and deployment.

On Linux, containers run on top of LXC, a userspace interface for the Linux kernel's containment features. It includes an application programming interface (API) that enables Linux users to create and manage system or application containers.

Docker is an open platform that makes it easier to create, deploy, and run applications using containers.

  • The heart of Docker is the Docker Engine, the part of Docker that creates and runs Docker containers. A Docker container is a live, running instance of a Docker image. Docker Engine is a client-server application with the following components:
    • A server, which is a continuously running service called a daemon process (dockerd).
    • A REST API, which specifies the interfaces programs can use to talk to the daemon and instruct it what to do.
    • A command line interface (CLI) client.

             docker_engine 

  • The command line interface client uses the Docker REST API to interact with the Docker daemon via CLI commands. Many other Docker applications also use the API and CLI. The daemon process creates and manages Docker images, containers, networks, and volumes.
  • The Docker client is the primary way Docker users communicate with Docker. When you run a command such as docker run, the client sends it to dockerd, which carries it out.

You build Docker images using Docker and publish them into what are known as Docker registries. When you run the docker pull and docker run commands, the required images are pulled from your configured registry. Using the docker push command, an image can be uploaded to your configured registry.

Finally, by deploying instances of images from your registry, you get containers. You can create, run, stop, or delete a container using the Docker CLI, connect a container to one or more networks, or even create a new image based on its current state. By default, a container is well isolated from other containers and from its host machine. A container is defined by its image plus any configuration options you provide when you create or run it.
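To make that flow concrete, here is a minimal sketch of the image-to-container workflow using the Docker CLI. I am using the public redis image purely as an example, and the registry host name is hypothetical:

# pull an image from the configured registry (Docker Hub by default)
docker pull redis

# run a container from that image, detached, with the port published to the host
docker run --name my-redis -d -p 6379:6379 redis

# list running containers, then stop and remove the one we started
docker ps
docker stop my-redis
docker rm my-redis

# tag and push the image to your own registry (registry name is illustrative)
docker tag redis myregistry.example.com/redis:test
docker push myregistry.example.com/redis:test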

For more info on containers, LXC, and Docker see – https://blog.scottlowe.org/2013/11/25/a-brief-introduction-to-linux-containers-with-lxc/, https://www.docker.com/what-container#/virtual_machines, http://searchservervirtualization.techtarget.com/definition/container-based-virtualization-operating-system-level-virtualization

That brings us to container orchestration engines. While the CLI meets the needs of managing one container on one host, it falls short when it comes to managing multiple containers deployed on multiple hosts. To go beyond the management of individual containers, we must turn to orchestration tools. Orchestration tools extend lifecycle management capabilities to complex, multi-container workloads deployed on a cluster of machines.

Some of the well known orchestration engines include:

Kubernetes. Almost a standard nowadays, originally developed at Google. Kubernetes' architecture is based on a master server with multiple worker nodes (historically called minions). The command line tool, kubectl, connects to the API endpoint of the master to manage and orchestrate the nodes. The service definition, along with the rules and constraints, is described in a JSON or YAML file. For service discovery, Kubernetes provides a stable IP address and DNS name that corresponds to a dynamic set of pods. When a container running in a Kubernetes pod connects to this address, the connection is forwarded by a local agent (the kube-proxy) running on the source machine to one of the corresponding backend containers. Kubernetes supports user-implemented application health checks. These checks are performed by the kubelet running on each node to ensure that the application is operating correctly.

kub

For more see – https://kubernetes.io/docs/concepts/overview/what-is-kubernetes/#kubernetes-is
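To give a feel for the Kubernetes workflow described above, here is a minimal sketch using kubectl. It assumes kubectl is already configured against a running cluster; the deployment name and image are illustrative:

# create a deployment running a single redis container
kubectl create deployment my-redis --image=redis

# expose it inside the cluster as a service on the redis port
kubectl expose deployment my-redis --port=6379

# scale it out and check where the pods landed
kubectl scale deployment my-redis --replicas=3
kubectl get pods -o wide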

Apache Mesos. This is an open source cluster manager that simplifies the complexity of running tasks on a shared pool of servers. A typical Mesos cluster consists of one or more servers running the mesos-master and a cluster of servers running the mesos-slave component. Each slave is registered with the master to offer resources. The master interacts with deployed frameworks to delegate tasks to slaves. Unlike other tools, Mesos ensures high availability of the master nodes using Apache ZooKeeper, which replicates the masters to form a quorum. A high availability deployment requires at least three master nodes. All nodes in the system, including masters and slaves, communicate with ZooKeeper to determine which master is the current leading master. The leader performs health checks on all the slaves and proactively deactivates any that fail.

 

Over the next couple of posts I will create containers running SQL Server and other data stores, both RDBMS and NoSQL, deploy these to the cloud, and finally attempt to orchestrate them as well. So let's move from theory and pictures into the world of practical data engine deployments.

Hope you will find this detour interesting.


Forecast Cloudy – Using Azure Blob Storage with Apache Hive on HDInsight

The beauty of working with Big Data in Azure is that you can manage (create/delete) the compute resources of your HDInsight cluster independently of your data, which is stored in either Azure Data Lake or Azure Blob storage. In this post I will concentrate on using Azure Blob storage/WASB as the data store for HDInsight, Azure's PaaS Hadoop service.

With a typical Hadoop installation you load your data to a staging location and then import it into the Hadoop Distributed File System (HDFS) within a single Hadoop cluster. That data is manipulated, massaged, and transformed. Then you may export some or all of the data back as a result set for consumption by other systems (think Power BI, Tableau, etc.).
Windows Azure Storage Blob (WASB) is an extension built on top of the HDFS APIs. The WASBS variation uses SSL certificates for improved security. In many ways it “is” HDFS. However, WASB creates a layer of abstraction that enables separation of storage from compute. This separation is what enables your data to persist even when no clusters currently exist, and enables multiple clusters plus other applications to access a single piece of data at the same time. This increases functionality and flexibility while reducing costs and the time from question to insight.

hdinsight

In Azure you store blobs on containers within Azure storage accounts. You grant access to a storage account, you create collections at the container level, and you place blobs (files of any format) inside the containers. This illustration from Microsoft’s documentation helps to show the structure:

blob1

Hold on, isn’t the whole selling point of Hadoop the proximity of data to compute? Yes, and just as with any on-premises Hadoop system, data is loaded into memory on the individual nodes at compute time. With the Azure data infrastructure and a data center backbone built for performance, your job performance is generally the same as or better than if you used disks locally attached to the VMs.

Below is diagram of HDInsight data storage architecture:

hdi.wasb.arch

HDInsight provides access to the distributed file system that is locally attached to the compute nodes. This file system can be accessed by using the fully qualified URI, for example:

hdfs://<namenodehost>/<path>

More important is the ability to access data that is stored in Azure Storage. The syntax is:

wasb[s]://<BlobStorageContainerName>@<StorageAccountName>.blob.core.windows.net/<path>

As per https://docs.microsoft.com/en-us/azure/hdinsight/hdinsight-hadoop-use-blob-storage you need to be aware of the following:

  • Container security for WASB storage. For containers in storage accounts that are connected to the cluster, because the account name and key are associated with the cluster during creation, you have full access to the blobs in those containers. For public containers that are not connected to the cluster, you have read-only permission to the blobs in those containers. For private containers in storage accounts that are not connected to the cluster, you can't access the blobs in those containers unless you define the storage account when you submit the WebHCat jobs.
  • The storage accounts that are defined in the creation process, along with their keys, are stored in %HADOOP_HOME%/conf/core-site.xml on the cluster nodes. The default behavior of HDInsight is to use the storage accounts defined in the core-site.xml file. It is not recommended to edit the core-site.xml file directly, because the cluster head node (master) may be reimaged or migrated at any time, and any changes to this file are not persisted.

You can create a new storage account or point an existing one to your HDInsight cluster easily via the portal, as I show below:

Capture

You can point your HDInsight cluster to multiple storage accounts as well, as explained here – https://blogs.msdn.microsoft.com/mostlytrue/2014/09/03/hdinsight-working-with-different-storage-accounts/

You can also create a storage account and container via Azure PowerShell, as in this sample:

$SubscriptionID = "<Your Azure Subscription ID>"
$ResourceGroupName = "<New Azure Resource Group Name>"
$Location = "EAST US 2"

$StorageAccountName = "<New Azure Storage Account Name>"
$ContainerName = "<New Azure Blob Container Name>"

Add-AzureRmAccount
Select-AzureRmSubscription -SubscriptionId $SubscriptionID

# Create resource group
New-AzureRmResourceGroup -Name $ResourceGroupName -Location $Location

# Create default storage account
New-AzureRmStorageAccount -ResourceGroupName $ResourceGroupName -Name $StorageAccountName -Location $Location -Type Standard_LRS

# Create default blob container
$StorageAccountKey = (Get-AzureRmStorageAccountKey -ResourceGroupName $ResourceGroupName -StorageAccountName $StorageAccountName)[0].Value
$DestContext = New-AzureStorageContext -StorageAccountName $StorageAccountName -StorageAccountKey $StorageAccountKey
New-AzureStorageContainer -Name $ContainerName -Context $DestContext

 

The URI scheme for accessing files in Azure storage from HDInsight is:

wasb[s]://<BlobStorageContainerName>@<StorageAccountName>.blob.core.windows.net/<path>

The URI scheme provides unencrypted access (with the wasb: prefix) and SSL encrypted access (with wasbs). Microsoft recommends using wasbs wherever possible, even when accessing data that lives inside the same region in Azure.

The <BlobStorageContainerName> identifies the name of the blob container in Azure storage. The <StorageAccountName> identifies the Azure Storage account name. A fully qualified domain name (FQDN) is required.

I ran into a rather crazy little limitation/issue when working with WASB and HDInsight. Hive expects a valid folder hierarchy to import data files, whereas WASB does not support a true folder hierarchy, i.e. all blobs are listed flat under a container. The workaround is to open an SSH session to the cluster head node and use the mkdir command to manually create such a directory via the driver.

The SSH Procedure with HDInsight can be found here – https://docs.microsoft.com/en-us/azure/hdinsight/hdinsight-hadoop-linux-use-ssh-unix
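For illustration, once connected to the head node over SSH, the missing directory can be created with the HDFS client, which goes through the WASB driver. The container, account, and path names below are hypothetical:

# create the folder hierarchy Hive expects, then verify it
hdfs dfs -mkdir -p wasbs://mycontainer@myaccount.blob.core.windows.net/data/input
hdfs dfs -ls wasbs://mycontainer@myaccount.blob.core.windows.net/data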

Another workaround recommended to me is that the "/" character can be used within the key name to make it appear as if a file is stored within a directory structure. HDInsight sees these as if they were actual directories. For example, a blob's key may be input/log1.txt. No actual "input" directory exists, but due to the presence of the "/" character in the key name, it has the appearance of a file path.

For more see – https://social.technet.microsoft.com/wiki/contents/articles/6621.azure-hdinsight-faq.aspx, https://docs.microsoft.com/en-us/azure/hdinsight/hdinsight-hadoop-use-blob-storage, https://www.codeproject.com/articles/597940/understandingpluswindowsplusazureplusblobplusstora

Hope this helps.

Meet Redis – Setting Up Redis On Ubuntu Linux

redis_thumb.jpg

I have been asked by a few folks for a quick tutorial on setting up Redis under systemd on Ubuntu Linux 16.04.

I have blogged quite a bit about Redis in general – https://gennadny.wordpress.com/category/redis/ – but here is a quick line on Redis itself. Redis is an in-memory key-value store known for its flexibility, performance, and wide language support. That makes Redis one of the most popular key-value data stores in existence today. Below are the steps to install and configure it to run under systemd on Ubuntu 16.04 and above.

Here are the prerequisites:

  • An Ubuntu 16.04 (or later) server
  • A non-root user account with sudo privileges (this is the account you will log in with below)

Next steps are:

  • Login into your Ubuntu server with this user account
  • Update and install prerequisites via apt-get
             $ sudo apt-get update
             $ sudo apt-get install build-essential tcl
    
  • Now we can download and extract Redis to the tmp directory
              $ cd /tmp
              $ curl -O http://download.redis.io/redis-stable.tar.gz
              $ tar xzvf redis-stable.tar.gz
              $ cd redis-stable
    
  • Next we can build Redis
        $ make
    
  • After the binaries are compiled, run the test suite to make sure everything was built correctly. You can do this by typing:
       $ make test
    
  • This will typically take a few minutes to run. Once it is complete, you can install the binaries onto the system by typing:
    $ sudo make install
    

Now we need to configure Redis to run under systemd. Systemd is an init system used in Linux distributions to bootstrap the user space and manage all processes subsequently, instead of the UNIX System V or Berkeley Software Distribution (BSD) init systems. As of 2016, most Linux distributions have adopted systemd as their default init system.

  • To start off, we need to create a configuration directory. We will use the conventional /etc/redis directory, which can be created by typing
    $ sudo mkdir /etc/redis
    
  • Now, copy over the sample Redis configuration file included in the Redis source archive:
         $ sudo cp /tmp/redis-stable/redis.conf /etc/redis
    
  • Next, we can open the file to adjust a few items in the configuration:
    $ sudo nano /etc/redis/redis.conf
    
  • In the file, find the supervised directive. Currently, this is set to no. Since we are running an operating system that uses the systemd init system, we can change this to systemd:
    . . .
    
    # If you run Redis from upstart or systemd, Redis can interact with your
    # supervision tree. Options:
    #   supervised no      - no supervision interaction
    #   supervised upstart - signal upstart by putting Redis into SIGSTOP mode
    #   supervised systemd - signal systemd by writing READY=1 to $NOTIFY_SOCKET
    #   supervised auto    - detect upstart or systemd method based on
    #                        UPSTART_JOB or NOTIFY_SOCKET environment variables
    # Note: these supervision methods only signal "process is ready."
    #       They do not enable continuous liveness pings back to your supervisor.
    supervised systemd
    
    . . .
    
  • Next, find the dir directive. This option specifies the directory that Redis will use to dump its persistent data. We need to pick a location where Redis will have write permission and that isn’t viewable by normal users.
    We will use the /var/lib/redis directory for this, which we will create below.

    . . .
    
    
    # The working directory.
    #
    # The DB will be written inside this directory, with the filename specified
    # above using the 'dbfilename' configuration directive.
    #
    # The Append Only File will also be created inside this directory.
    #
    # Note that you must specify a directory here, not a file name.
    dir /var/lib/redis
    
    . . .
    

    Save and close the file when you are finished

  • Next, we can create a systemd unit file so that the init system can manage the Redis process.
    Create and open the /etc/systemd/system/redis.service file to get started:

    $ sudo nano /etc/systemd/system/redis.service
    
  • The file should look like this; create the sections below:
    [Unit]
    Description=Redis In-Memory Data Store
    After=network.target
    
    [Service]
    User=redis
    Group=redis
    ExecStart=/usr/local/bin/redis-server /etc/redis/redis.conf
    ExecStop=/usr/local/bin/redis-cli shutdown
    Restart=always
    
    [Install]
    WantedBy=multi-user.target
    
  • Save and close file when you are finished

Now, we just have to create the user, group, and directory that we referenced in the previous two files.
Begin by creating the redis user and group (a single adduser command handles both), then create the data directory and give the new user ownership of it:

$ sudo adduser --system --group --no-create-home redis
$ sudo mkdir /var/lib/redis
$ sudo chown redis:redis /var/lib/redis

Now we can start Redis:

  $ sudo systemctl start redis

Check that the service had no errors by running:

$ sudo systemctl status redis

And Eureka – here is the response

redis.service - Redis Server
   Loaded: loaded (/etc/systemd/system/redis.service; enabled; vendor preset: enabled)
   Active: active (running) since Wed 2016-05-11 14:38:08 EDT; 1min 43s ago
  Process: 3115 ExecStop=/usr/local/bin/redis-cli shutdown (code=exited, status=0/SUCCESS)
 Main PID: 3124 (redis-server)
    Tasks: 3 (limit: 512)
   Memory: 864.0K
      CPU: 179ms
   CGroup: /system.slice/redis.service
           └─3124 /usr/local/bin/redis-server 127.0.0.1:6379    
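
If the unit does not already show as enabled, you will also want Redis to start automatically at boot. This is a standard systemd step, not anything Redis-specific:

$ sudo systemctl enable redis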

Congrats! You can now start learning Redis. Connect to the Redis CLI by typing:

$ redis-cli
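
As a quick smoke test (the key name here is just an example), you can also drive redis-cli straight from the shell:

$ redis-cli ping
PONG
$ redis-cli set test "hello"
OK
$ redis-cli get test
"hello"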

Now you can follow these Redis tutorials

Hope this was helpful

Dancing with Elephants and Flying with The Bees–Using ORC File Format with Apache Hive

hive

 

When you start with Hive on Hadoop, the clear majority of samples and tutorials will have you work with text files. However, some time ago the disadvantages of text files as a file format, in terms of both storage efficiency and performance, became clear to the Hive community.

The first move toward better columnar storage was the introduction of the Hive RCFile format. RCFile (Record Columnar File) is a data placement structure designed for MapReduce-based data warehouse systems. Hive added the RCFile format in version 0.6.0. RCFile stores table data in a flat file consisting of binary key/value pairs. It first partitions rows horizontally into row splits, and then vertically partitions each row split in a columnar way. RCFile stores the metadata of a row split as the key part of a record, and all the data of a row split as the value part. Internals of the RCFile format can be found in the JavaDoc here – http://hive.apache.org/javadocs/r1.0.1/api/org/apache/hadoop/hive/ql/io/RCFile.html. What is important to note is why it was introduced, i.e. its advantages:

  • As row-store, RCFile guarantees that data in the same row are located in the same node
  • As column-store, RCFile can exploit column-wise data compression and skip unnecessary column reads.

As time passed, the explosion of data and the need for faster HiveQL queries pushed the need for further optimized columnar storage file formats. Therefore, the ORC file format was introduced. The Optimized Row Columnar (ORC) file format provides a highly efficient way to store Hive data. It was designed to overcome limitations of the other Hive file formats. Using ORC files improves performance when Hive is reading, writing, and processing data.

ORC has the following advantages over the RCFile format:

  • a single file as the output of each task, which reduces the NameNode’s load
  • light-weight indexes stored within the file, allowing to skip row groups that don’t pass predicate filtering and do seek to a given row
  • block-mode compression based on data type

An ORC file contains groups of row data called stripes, along with auxiliary information in a file footer. At the end of the file a postscript holds compression parameters and the size of the compressed footer.

The default stripe size is 250 MB. Large stripe sizes enable large, efficient reads from HDFS.

The file footer contains a list of stripes in the file, the number of rows per stripe, and each column’s data type. It also contains column-level aggregates count, min, max, and sum.

image

What does it all mean for me?

For us as implementers it means the following:

  • Better read performance due to compression. Streams are compressed using a codec, which is specified as a table property for all streams in that table. To optimize memory use, compression is done incrementally as each block is produced. Compressed blocks can be jumped over without first having to be decompressed for scanning. Positions in the stream are represented by a block start location and an offset into the block.
  • Column-level statistics for optimization, a feature that has long existed in pretty much all commercial RDBMS packages (Oracle, SQL Server, etc.). For each column, the writer records the count and, depending on the type, other useful fields. For most of the primitive types, it records the minimum and maximum values, and for numeric types it additionally stores the sum. From Hive 1.1.0 onwards, the column statistics also record whether there are any null values within the row group by setting the hasNull flag.
  • Lightweight indexing.
  • Larger blocks, 256 MB by default.

Here is a good Hive file format comparison from Hortonworks:

image

Using ORC – create a table with ORC format:

The simplest way to create an ORC-formatted Hive table is to add STORED AS ORC to the Hive CREATE TABLE statement, like:

CREATE TABLE my_table  (
column1 STRING,
column2 STRING,
column3 INT,
column4 INT
) STORED AS ORC;
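
The CREATE TABLE statement only defines the storage format; to actually get data into the ORC table, a common pattern is an INSERT … SELECT from an existing text-format staging table. A minimal sketch run through the Hive CLI follows (my_table_text is a hypothetical staging table with the same columns):

# copy rows from a text-format staging table into the ORC-backed table
hive -e "INSERT OVERWRITE TABLE my_table
         SELECT column1, column2, column3, column4
         FROM my_table_text;"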

ORC File Format can be used together with Hive Partitioning, which I explained in my previous post.  Here is an example of using Hive partitioning with ORC File Format:

CREATE  TABLE airanalytics 
(flightdate date ,dayofweek int,depttime int,crsdepttime int,arrtime int,crsarrtime int,uniquecarrier varchar(10),flightno int,tailnum int,aet int,cet int,airtime int,arrdelay int,depdelay int,origin varchar(5),dest varchar(5),distance int,taxin int,taxout int,cancelled int,cancelcode int,diverted string,carrdelay string,weatherdelay string,securtydelay string,cadelay string,lateaircraft string) 
 PARTITIONED BY (flight_year String)
 clustered BY (uniquecarrier)
 sorted BY (flightdate)
 INTO 24 buckets
 stored AS orc tblproperties ("orc.compress"="NONE","orc.stripe.size"="67108864", "orc.row.index.stride"="25000")

The parameters that can be added at the table level are, as per the docs:

Key                        Default      Notes

orc.compress               ZLIB         high-level compression (one of NONE, ZLIB, SNAPPY)
orc.compress.size          262,144      number of bytes in each compression chunk
orc.stripe.size            268435456    number of bytes in each stripe
orc.row.index.stride       10,000       number of rows between index entries (must be >= 1000)
orc.create.index           true         whether to create row indexes
orc.bloom.filter.columns   ""           comma-separated list of column names for which bloom filters should be created
orc.bloom.filter.fpp       0.05         false positive probability for bloom filter (must be > 0.0 and < 1.0)

If you have an existing Hive table, it can be moved to ORC via one of the following (a hedged example of the first option follows the list):

    • ALTER TABLE … [PARTITION partition_spec] SET FILEFORMAT ORC
    • SET hive.default.fileformat=Orc
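
Here is a hedged sketch of the first option via the Hive CLI. As far as I understand it, ALTER TABLE … SET FILEFORMAT only changes the table metadata: data written afterwards is stored as ORC, but existing files are not rewritten, so they either need to already be in a readable format or be reloaded.

# switch an existing table's storage format to ORC (metadata-only change)
hive -e "ALTER TABLE my_table SET FILEFORMAT ORC;"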

For more information see – https://en.wikipedia.org/wiki/RCFile, https://cwiki.apache.org/confluence/display/Hive/LanguageManual+ORC, https://cwiki.apache.org/confluence/display/Hive/FileFormats, https://orc.apache.org/docs/hive-config.html

Hope this helps.

Dancing With Elephants and Flying With The Bees–Apache Hive Scaling Out with Partitions and Buckets

hive

In my previous post some time ago I introduced Apache Hive technology on Hadoop. Coming from SQL and RDBMS this was bound to be my favorite Hadoop technology.  Apache Hive is an open-source data warehouse system for querying and analyzing large datasets stored in HDFS files.

Today, unlike the previous basics post, I will concentrate on Hive partitions and buckets. A simple query in Hive reads the entire dataset, even with a WHERE clause filter. This becomes a bottleneck for running MapReduce jobs over a large table. We can overcome this issue by implementing partitions in Hive. Hive makes it very easy to implement partitions by using the automatic partition scheme when the table is created.

image

Partitions. Just like in an RDBMS, data within a Hive table can be split across multiple partitions. Each partition corresponds to a particular value (or values) of the partition column(s) and is stored as a sub-directory within the table’s directory on HDFS. When the table is queried, where applicable, only the required partitions of the table are read, thereby reducing the I/O and time required by the query. An example of creating a partitioned table in Hive is below; note you can do this with both internal and external tables:

CREATE TABLE CUSTOMER (
    userid      BIGINT,
    First_Name  STRING,
    Last_Name   STRING,
    address1    STRING,
    address2    STRING,
    city        STRING,
    zip_code    STRING,
    state       STRING
)
PARTITIONED BY (
    COUNTRY     STRING
);

As you can see, I am using COUNTRY as the partition column. The partition statement lets Hive alter the way it manages the underlying structures of the table’s data directory. If you browse the location of the data directory for a non-partitioned table, it will look like this: <dbname>.db/<tablename>. All the data files are written directly to this directory. In the case of partitioned tables, subdirectories are created under the table’s data directory for each unique value of a partition column. If the table is partitioned on multiple columns, Hive creates nested subdirectories based on the order of the partition columns in the table definition. Example with the table above:

/gennadyk.db/customer/country=US

/gennadyk.db/customer/country=UK

When a partitioned table is queried with one or both partition columns in the criteria or in the WHERE clause, what Hive effectively does is partition elimination: it scans only those data directories that are needed. If no partition columns are used in the filter, then all the directories are scanned (a full table scan) and partitioning has no effect.

With a few quick changes it’s easy to configure Hive to support dynamic partition creation. Just as SQL Server has a SET command to change database options, Hive lets us change settings for a session using the SET command. Changing these settings permanently would require editing the configuration file and restarting the Hive cluster. An example load that uses dynamic partitions follows the settings below.

SET hive.exec.dynamic.partition = true;
SET hive.exec.dynamic.partition.mode = nonstrict;
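
With those two settings in place, a dynamic-partition load from a staging table looks roughly like the sketch below. Here customer_staging is a hypothetical non-partitioned staging table with the same columns plus country; note that the partition column must come last in the SELECT list:

hive -e "
SET hive.exec.dynamic.partition = true;
SET hive.exec.dynamic.partition.mode = nonstrict;
-- Hive routes each row to its country=<value> partition based on the last column
INSERT OVERWRITE TABLE customer PARTITION (country)
SELECT userid, first_name, last_name, address1, address2, city, zip_code, state, country
FROM customer_staging;"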

Important notes on best practices

  • Be careful using dynamic partitions. Hive has some built-in limits on the number of partitions that can be dynamically created as well as limits on the total number of files that can exist within Hive. Creating many partitions at once will create a lot of files and creating a lot of files will use up memory in Hadoop Name Node
  • If your partitions are relatively small then the expense of recursing directories becomes more expensive than simply scanning the data. Likewise, partitions should be roughly similar in size to prevent a single long running thread from holding things up

Buckets. Much like partitioning, bucketing is a technique that allows you to cluster or segment large sets of data to optimize query performance. As you create a Hive table, you can use the CLUSTERED BY keyword to define buckets:

CREATE TABLE CUSTOMER (
    userid      BIGINT,
    First_Name  STRING,
    Last_Name   STRING,
    address1    STRING,
    address2    STRING,
    city        STRING,
    zip_code    STRING,
    state       STRING
)
PARTITIONED BY (
    COUNTRY     STRING
)
CLUSTERED BY (userid) INTO 10 BUCKETS;

In general, the bucket number is determined by the expression hash_function(bucketing_column) mod num_buckets. The hash_function depends on the type of the bucketing column. For an int, as in our example, it’s easy: hash_int(i) == i. In our example we can expect all userids that end in 0 to be in bucket 1, all userids that end in 1 to be in bucket 2, and so on. For other datatypes it unfortunately gets a lot more tricky; the hash of a string or a complex datatype will be some number derived from the value, but not anything humanly recognizable. In general, distributing rows based on the hash will give you an even distribution across the buckets, and that is your goal here. Bucketing is entirely dependent on the data being loaded correctly, so loading the table properly is critical when bucketing.
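Two hedged examples of what that means in practice: on older Hive versions you typically set hive.enforce.bucketing before loading a bucketed table so that Hive produces one file per bucket, and once the data is bucketed you can sample individual buckets with TABLESAMPLE (customer_staging is again a hypothetical staging table):

# make Hive honor the bucket definition while loading (needed on pre-2.x Hive)
hive -e "SET hive.enforce.bucketing = true;
         INSERT OVERWRITE TABLE customer PARTITION (country = 'US')
         SELECT userid, first_name, last_name, address1, address2, city, zip_code, state
         FROM customer_staging
         WHERE country = 'US';"

# read just one of the ten userid buckets instead of scanning the whole table
hive -e "SELECT * FROM customer TABLESAMPLE(BUCKET 1 OUT OF 10 ON userid) s;"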

For more see – http://www.bidn.com/blogs/cprice1979/ssas/4646/partitions-amp-buckets-in-hive, https://www.qubole.com/blog/big-data/5-tips-for-efficient-hive-queries/, http://www.brentozar.com/archive/2013/03/introduction-to-hive-partitioning/, http://blog.cloudera.com/blog/2014/08/improving-query-performance-using-partitioning-in-apache-hive/, http://archive.cloudera.com/cdh/3/hive/language_manual/working_with_bucketed_tables.html

Forecast Cloudy – Going Big Data With Google Big Query

google-cloud-platform

Although I am mainly a Microsoft and Azure centric guy, reading a bit about Google BigQuery got me interested. Therefore I decided to explore it a bit more here as well.

BigQuery is a RESTful web service that enables interactive analysis of massively large datasets working in conjunction with Google Storage. It is an Infrastructure as a Service (IaaS) that may be used complementarily with MapReduce. BigQuery (BQ) is reportedly based on Dremel, a scalable, interactive ad hoc query system for analysis of read-only nested data. To use the data in BigQuery, it first must be uploaded to Google Storage and in a second step imported using the BigQuery HTTP API. BigQuery requires all requests to be authenticated, supporting a number of Google-proprietary mechanisms as well as OAuth.

Let’s start with what BigQuery is not. It is neither an RDBMS nor a MapReduce technology. As I stated, BigQuery is based on an internal Google technology called Dremel. Dremel is a query service that allows you to run SQL-like queries against very, very large data sets and get accurate results in mere seconds. You just need a basic knowledge of SQL to query extremely large datasets in an ad hoc manner. At Google, engineers and non-engineers alike, including analysts, tech support staff and technical account managers, use this technology many times a day. BigQuery provides the core set of features available in Dremel to third party developers. It does so via a REST API, a command line interface, a Web UI, access control and more, while maintaining the unprecedented query performance of Dremel.

Why is BigQuery so fast? The answer can be found in the two core technologies that give BQ this unprecedented performance:

  • Columnar Storage. Data is stored in a columnar fashion, which makes it possible to achieve a very high compression ratio and scan throughput.
  • Tree Architecture is used for dispatching queries and aggregating results across thousands of machines in a few seconds.

BigQuery stores data in columnar storage, which means it separates a record into column values and stores each value on a different storage volume, whereas a traditional RDBMS normally stores the whole record on one volume.

image

Actually, this technology isn’t that new for anyone who has dealt with DW technologies for a while; it’s a fairly hot topic today, with SQL Server columnstore indexes and the SAP HANA in-memory column store, for example. As you may know, column storage has the following advantages:

  • Traffic minimization. Only required column values on each query are scanned and transferred on query execution. For example, a query “SELECT top(title) FROM foo” would access the title column values only.
  • Higher compression ratio.  One study reports that columnar storage can achieve a compression ratio of 1:10, whereas ordinary row-based storage can compress at roughly 1:3. Because each column would have similar values, especially if the cardinality of the column (variation of possible column values) is low, it’s easier to gain higher compression ratios than row-based storage.

Columnar storage has the disadvantage of not working efficiently when updating existing records. In the case of BigQuery, it simply doesn’t support any update operations. Thus the technique has been used mainly in read-only OLAP/BI type of usage. Although the technology has been popular as a data warehouse database design, Dremel\BigQuery is one of the first implementations of a columnar storage-based analytics system that harnesses the computing power of many thousands of servers and is delivered as a cloud service.

One of the challenges Google had in designing Dremel\BigQuery was how to dispatch queries and collect results across tens of thousands of machines in a matter of seconds. The challenge was resolved by using the Tree architecture. The architecture forms a massively parallel distributed tree for pushing down a query to the tree and then aggregating the results from the leaves at a blazingly fast speed.

image    

By leveraging this architecture, Google was able to implement the distributed design for Dremel\BigQuery and realize the vision of the massively parallel columnar-based database on the cloud platform.

image

     

BigQuery provides the core set of features available in Dremel to third party developers. It does so via a REST API, command line interface, Web UI,access control, data schema management and the integration with Google Cloud Storage. BigQuery and Dremel share the same underlying architecture and performance characteristics. Users can fully utilize the power of Dremel by using BigQuery to take advantage of Google’s massive computational infrastructure. This incorporates valuable benefits like multiple replication across regions and high data center scalability. Most importantly, this infrastructure requires no management by the developer.

So why BigQuery over MapReduce? The difference is that MapReduce is a batch-based programming framework for very large datasets, whereas BigQuery is an interactive query tool for large datasets.

image

Ok, how do I use it? Assuming you already have a Google Cloud account, you will have to create a new project from the dropdown in your Google Cloud Console.

image 

Once that is done, you can navigate to project console and enable BigQuery APIs for use with your project:

image

Now in the left-side menu you can pick BigQuery from the Big Data offerings.

image

 

Before we can run any queries, we need some data! There are a couple of options here:

  • Load you own data
  • Use Google provided public data

For now I will settle on the second choice. I will pick the Shakespeare dataset here. This dataset contains the words in Shakespeare’s works, the word_count for each word, which corpus the word appears in, and the date the corpus was written. First I will issue a simple SELECT. SELECT is the most basic clause and specifies what it is that you want returned by the query. FROM specifies which dataset we are using.

SELECT corpus
FROM (publicdata:samples.shakespeare)
GROUP BY corpus

image

Let’s switch gears. Say we want to count something – say, the number of words in Shakespeare’s works. Luckily, we have word_count, which represents how many times a particular word appeared in a particular corpus. We can just sum all of these values, and we are left with the total number of words that he wrote.

SELECT SUM(word_count) AS count
FROM (publicdata:samples.shakespeare)

image

945,845 words! Pretty good – but there must surely be some duplicates. How would we query the number of unique words that he used?

SELECT COUNT(word) AS count, word
FROM publicdata:samples.shakespeare
GROUP BY word
ORDER BY count Desc

Here we use the COUNT function to count how many words there are, and group by word so as not to show duplicates: 32,786 unique words. Moreover, I order these by the most-used words in descending order.

image

Ok, let’s finally add a WHERE clause. For example, I wonder how many times Shakespeare used the word “Sir”:

SELECT word, SUM(word_count) as count
FROM (publicdata:samples.shakespeare)
WHERE word = "Sir"
GROUP BY word
ORDER BY count DESC

From the results I can state that Shakespeare was a really polite guy:

image
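
The same query can also be run without the web UI, using the bq command-line tool that ships with the Google Cloud SDK. This assumes the SDK is installed and you are authenticated; in the tool’s legacy SQL dialect the public table is referenced with square brackets:

# count how many times the word "Sir" appears across Shakespeare's works
bq query "SELECT word, SUM(word_count) AS count
          FROM [publicdata:samples.shakespeare]
          WHERE word = 'Sir'
          GROUP BY word"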

This of course is pretty basic. However, BigQuery does allow for joins. I could take my large table and join it to a smaller lookup table using standard ANSI SQL join syntax.

image

There are also some complex functions it supports like:

image

In the next part I am planning to upload a dataset and do some joins and more complex processing.

For more on BigQuery see – https://cloud.google.com/bigquery/web-ui-quickstart, http://martinfowler.com/articles/bigQueryPOC.html, https://support.google.com/analytics/answer/4419694?hl=en, http://googlecode.blogspot.com/2011/11/google-bigquery-service-big-data.html

Bees and Elephants In The Clouds – Using Hive with Azure HDInsight

hdinsight

For today’s blog entry I will combine both Cloud and Big Data to show you how easy it is to do Big Data on the Cloud. 

Microsoft Azure is a cloud platform provided by Microsoft, and one of the services offered is HDInsight, an Apache Hadoop-based distribution running in the cloud on Azure. Another service offered by Azure is Blob storage. Azure Storage functions as the default file system and stores data in the cloud rather than on-premises or on the cluster nodes. This way, when you are done running an HDInsight job, the cluster can be decommissioned (to save you money) while your data remains intact.

8484_110113_2116_TheHDInsigh4

HDInsight is an excellent fit for scenarios such as IoT analytics, PoC/development work, bursting on-premises resources to the cloud, and so on.

Untitled picture

The first thing you will need, of course, is a Microsoft Azure subscription. A subscription has many moving parts, and http://azure.microsoft.com has interactive pricing pages, a pricing calculator, and plenty of documentation. If you have an MSDN subscription you get a limited Azure subscription as well (you just have to enable and link it), or you can set up a trial subscription at https://azure.microsoft.com/en-us/pricing/free-trial/. For this little tutorial I will be using my MSDN subscription.

Assuming you know how to set up your subscription and can log in to the Azure Management Portal, our next step will be to create an Azure storage account. I already have a number of storage accounts, but I will create one just for this tutorial:

Hit +NEW in lower left side of portal and expand your choices, picking storage:

image

Pick unique URL , data center location and hit Create Storage Account. Easy:

image

Once the account is created I will create storage container within this account. In your account dashboard navigate to containers tab and hit create new container:

image

Enter a name for the container and, if there is any chance of sensitive or PII data being loaded to this container, choose Private access. Private access requires a key. HDInsight can be configured with that key during creation, or keys can be passed in for individual jobs.

image

This will be the default container for the cluster. If you want to manage your data separately you may want to create additional containers.

Next I will create the Hadoop/HDInsight cluster. Here, as with a lot of things in Azure, I can use either the Quick Create or Custom Create option, the latter giving me more control of course. Let’s create an HDInsight cluster using the Custom Create option.

Back to +New in lower left corner of the portal:

image

Obviously pick a unique name and the storage account we created in the previous step; you will also need to provide a strong password and finally hit the Create button. It takes a bit to create a cluster (way faster than setting one up on your own, trust me) and while that is happening you will see this:

image

Once that is done I can go to the Query Console from the Dashboard for my cluster:

image

After I enter the user name (admin) and password I set up on cluster creation, I can see the Query Console:

image

Just for this tutorial I will pick the Twitter Trend Analysis sample. The idea is to learn how to use Hive to get a list of Twitter users that sent the most tweets containing a particular word. The tutorial comes with a sample data set of tweets housed in Azure Blob Storage (that’s why I created a storage account and pointed my HDInsight cluster there) – wasb://gennadykhdp@gennadykhdpstorage.blob.core.windows.net/HdiSamples/TwitterTrendsSampleData/

First I will create raw tweet table from data:

DROP TABLE IF EXISTS HDISample_Tweets_raw;

--create the raw Tweets table on json formatted twitter data
CREATE EXTERNAL TABLE HDISample_Tweets_raw(json_response STRING)
STORED AS TEXTFILE LOCATION 'wasb://gennadykhdp@gennadykhdpstorage.blob.core.windows.net/HdiSamples/TwitterTrendsSampleData/';

The following Hive queries will create a Hive table called HDISample_Tweets where you will store the parsed raw Twitter data. The Create Hive Table query shows the fields that the HDISample_Tweets table will contain. The Load query shows how the HDISample_Tweets_raw table is parsed to be placed into the HDISample_Tweets table.

DROP TABLE IF EXISTS HDISample_Tweets;
CREATE TABLE HDISample_Tweets(
    id BIGINT,
    created_at STRING,
    created_at_date STRING,
    created_at_year STRING,
    created_at_month STRING,
    created_at_day STRING,
    created_at_time STRING,
    in_reply_to_user_id_str STRING,
    text STRING,
    contributors STRING,
    retweeted STRING,
    truncated STRING,
    coordinates STRING,
    source STRING,
    retweet_count INT,
    url STRING,
    hashtags array<string>,
    user_mentions array<string>,
    first_hashtag STRING,
    first_user_mention STRING,
    screen_name STRING,
    name STRING,
    followers_count INT,
    listed_count INT,
    friends_count INT,
    lang STRING,
    user_location STRING,
    time_zone STRING,
    profile_image_url STRING,
    json_response STRING);
FROM HDISample_Tweets_raw
INSERT OVERWRITE TABLE HDISample_Tweets
SELECT
    CAST(get_json_object(json_response, '$.id_str') as BIGINT),
    get_json_object(json_response, '$.created_at'),
    CONCAT(SUBSTR (get_json_object(json_response, '$.created_at'),1,10),' ',
    SUBSTR (get_json_object(json_response, '$.created_at'),27,4)),
    SUBSTR (get_json_object(json_response, '$.created_at'),27,4),
    CASE SUBSTR (get_json_object(json_response, '$.created_at'),5,3)
        WHEN 'Jan' then '01'
        WHEN 'Feb' then '02'
        WHEN 'Mar' then '03'
        WHEN 'Apr' then '04'
        WHEN 'May' then '05'
        WHEN 'Jun' then '06'
        WHEN 'Jul' then '07'
        WHEN 'Aug' then '08'
        WHEN 'Sep' then '09'
        WHEN 'Oct' then '10'
        WHEN 'Nov' then '11'
        WHEN 'Dec' then '12' end,
    SUBSTR (get_json_object(json_response, '$.created_at'),9,2),
    SUBSTR (get_json_object(json_response, '$.created_at'),12,8),
    get_json_object(json_response, '$.in_reply_to_user_id_str'),
    get_json_object(json_response, '$.text'),
    get_json_object(json_response, '$.contributors'),
    get_json_object(json_response, '$.retweeted'),
    get_json_object(json_response, '$.truncated'),
    get_json_object(json_response, '$.coordinates'),
    get_json_object(json_response, '$.source'),
    CAST (get_json_object(json_response, '$.retweet_count') as INT),
    get_json_object(json_response, '$.entities.display_url'),
    ARRAY(  
        TRIM(LOWER(get_json_object(json_response, '$.entities.hashtags[0].text'))),
        TRIM(LOWER(get_json_object(json_response, '$.entities.hashtags[1].text'))),
        TRIM(LOWER(get_json_object(json_response, '$.entities.hashtags[2].text'))),
        TRIM(LOWER(get_json_object(json_response, '$.entities.hashtags[3].text'))),
        TRIM(LOWER(get_json_object(json_response, '$.entities.hashtags[4].text')))),
    ARRAY(
        TRIM(LOWER(get_json_object(json_response, '$.entities.user_mentions[0].screen_name'))),
        TRIM(LOWER(get_json_object(json_response, '$.entities.user_mentions[1].screen_name'))),
        TRIM(LOWER(get_json_object(json_response, '$.entities.user_mentions[2].screen_name'))),
        TRIM(LOWER(get_json_object(json_response, '$.entities.user_mentions[3].screen_name'))),
        TRIM(LOWER(get_json_object(json_response, '$.entities.user_mentions[4].screen_name')))),
    TRIM(LOWER(get_json_object(json_response, '$.entities.hashtags[0].text'))),
    TRIM(LOWER(get_json_object(json_response, '$.entities.user_mentions[0].screen_name'))),
    get_json_object(json_response, '$.user.screen_name'),
    get_json_object(json_response, '$.user.name'),
    CAST (get_json_object(json_response, '$.user.followers_count') as INT),
    CAST (get_json_object(json_response, '$.user.listed_count') as INT),
    CAST (get_json_object(json_response, '$.user.friends_count') as INT),
    get_json_object(json_response, '$.user.lang'),
    get_json_object(json_response, '$.user.location'),
    get_json_object(json_response, '$.user.time_zone'),
    get_json_object(json_response, '$.user.profile_image_url'),
    json_response
WHERE (LENGTH(json_response) > 500);

These queries show how to analyze the Tweets Hive table to determine the Twitter users that sent out the most tweets containing the word ‘Azure’. The results are saved into a new table called HDISample_topusers.

DROP TABLE IF EXISTS HDISample_topusers;

--create the topusers hive table by selecting from the HDISample_Tweets table
CREATE TABLE IF NOT EXISTS  HDISample_topusers(name STRING, screen_name STRING, tweet_count INT);

INSERT OVERWRITE TABLE  HDISample_topusers
SELECT name, screen_name, count(1) as cc
    FROM HDISample_Tweets
    WHERE text LIKE '%Azure%'
    GROUP BY name, screen_name
    ORDER BY cc DESC LIMIT 10;
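
As an aside, the same statements could also be saved to a file and run from an SSH session on the cluster head node with the Hive client (a hedged sketch; the file name is illustrative), though for this walkthrough I will stick with the Query Console:

# run the saved HiveQL script from the head node
hive -f twitter_topusers.hql

# or run a single statement inline
hive -e "SELECT * FROM HDISample_topusers;"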

Now I will run it all, submit the job and watch the Job History Console:

image

Finally as data has been inserted into my HDISample_topusers table I can show this data as well:

select * from HDISample_topusers;

And get results

luciasannino	ciccialucia1960	1
Valerie M	valegabri1	1
Tony Zake	TonyZake	1
Sami Ghazali	SamiGhazali	1
JOSÈ ADARMES	ADARMESJEB	1
Eric Hexter	ehexter	1
Daniel Neumann	neumanndaniel	1
Anthony Bartolo	WirelessLife	1


For more details see – https://azure.microsoft.com/en-us/documentation/articles/hdinsight-use-hive/, https://azure.microsoft.com/en-us/documentation/articles/hdinsight-hadoop-tutorial-get-started-windows/, http://www.developer.com/db/using-hive-in-hdinsight-to-analyze-data.html, http://blogs.msdn.com/b/cindygross/archive/2015/02/26/create-hdinsight-cluster-in-azure-portal.aspx