Bees and Elephants In The Clouds – Using Hive with Azure HDInsight


For today’s blog entry I will combine Cloud and Big Data to show you how easy it is to do Big Data in the cloud.

Microsoft Azure is a cloud service provided by Microsoft, and one of the services offered is HDInsight, an Apache Hadoop-based distribution running in the cloud on Azure. Another service offered by Azure is blob storage. Azure Storage functions as the default file system and stores data in the cloud and not on-premises or in nodes. This way, when you are done running an HDInsight job, the cluster can be decommissioned (to save you money) while your data remains intact.


HDInsight is an excellent fit for scenarios such as IoT analytics, proof-of-concept and development work, bursting on-premises workloads to the cloud, and so on.


The first thing you will need, of course, is a Microsoft Azure subscription. A subscription has many moving parts, and http://azure.microsoft.com has interactive pricing pages, a pricing calculator, and plenty of documentation. If you have an MSDN subscription you get a limited Azure subscription as well (you just have to enable and link it), or you can set up a trial subscription at https://azure.microsoft.com/en-us/pricing/free-trial/. For this tutorial I will be using my MSDN subscription.

Assuming you know how to set up your subscription and can log in to the Azure Management Portal, our next step is to create an Azure storage account. I already have a number of storage accounts, but I will create one just for this tutorial:

Hit +NEW in the lower-left corner of the portal, expand your choices, and pick Storage:


Pick a unique URL and a data center location, then hit Create Storage Account. Easy:


Once the account is created I will create a storage container within it. In your account dashboard, navigate to the Containers tab and hit Create New Container:


Enter a name for the container, and if there is any chance of sensitive or PII data being loaded into it, choose Private access. Private access requires a key; HDInsight can be configured with that key during cluster creation, or keys can be passed in for individual jobs.


This will be the default container for the cluster. If you want to manage your data separately you may want to create additional containers.

Next I will create a Hadoop\HDInsight cluster. Here, as with a lot of things in Azure, I can use either the Quick Create or Custom Create option, the latter giving me more control of course. Let’s create an HDInsight cluster using the Custom Create option.

Back to +NEW in the lower-left corner of the portal:


Pick a unique name and the storage account we created in the previous step; you will also need to provide a strong password, and finally hit the Create button. It takes a bit to create a cluster (way faster than setting one up on your own, trust me), and while that is happening you will see this:


Once that is done I can go to the Query Console from the Dashboard for my cluster:


After I enter the user name (admin) and the password I set up during cluster creation, I can see the Query Console:


For this tutorial I will pick the Twitter Trend Analysis sample. The idea is to learn how to use Hive to get a list of the Twitter users that sent the most tweets containing a particular word. The tutorial ships with a sample data set of tweets housed in Azure Blob Storage (that’s why I created a storage account and pointed my HDInsight cluster there) at wasb://gennadykhdp@gennadykhdpstorage.blob.core.windows.net/HdiSamples/TwitterTrendsSampleData/.

First I will create the raw tweets table from the data:

DROP TABLE IF EXISTS HDISample_Tweets_raw;

--create the raw Tweets table on json formatted twitter data
CREATE EXTERNAL TABLE HDISample_Tweets_raw(json_response STRING)
STORED AS TEXTFILE LOCATION 'wasb://gennadykhdp@gennadykhdpstorage.blob.core.windows.net/HdiSamples/TwitterTrendsSampleData/';

The following Hive queries will create a Hive table called HDISample_Tweets where you will store the parsed raw Twitter data. The Create Hive Table query shows the fields that the HDISample_Tweets table will contain. The Load query shows how the HDISample_Tweets_raw table is parsed to be placed into the HDISample_Tweets table.

DROP TABLE IF EXISTS HDISample_Tweets;
CREATE TABLE HDISample_Tweets(
    id BIGINT,
    created_at STRING,
    created_at_date STRING,
    created_at_year STRING,
    created_at_month STRING,
    created_at_day STRING,
    created_at_time STRING,
    in_reply_to_user_id_str STRING,
    text STRING,
    contributors STRING,
    retweeted STRING,
    truncated STRING,
    coordinates STRING,
    source STRING,
    retweet_count INT,
    url STRING,
    hashtags array<string>,
    user_mentions array<string>,
    first_hashtag STRING,
    first_user_mention STRING,
    screen_name STRING,
    name STRING,
    followers_count INT,
    listed_count INT,
    friends_count INT,
    lang STRING,
    user_location STRING,
    time_zone STRING,
    profile_image_url STRING,
    json_response STRING);
FROM HDISample_Tweets_raw
INSERT OVERWRITE TABLE HDISample_Tweets
SELECT
    CAST(get_json_object(json_response, '$.id_str') as BIGINT),
    get_json_object(json_response, '$.created_at'),
    CONCAT(SUBSTR (get_json_object(json_response, '$.created_at'),1,10),' ',
    SUBSTR (get_json_object(json_response, '$.created_at'),27,4)),
    SUBSTR (get_json_object(json_response, '$.created_at'),27,4),
    CASE SUBSTR (get_json_object(json_response, '$.created_at'),5,3)
        WHEN 'Jan' then '01'
        WHEN 'Feb' then '02'
        WHEN 'Mar' then '03'
        WHEN 'Apr' then '04'
        WHEN 'May' then '05'
        WHEN 'Jun' then '06'
        WHEN 'Jul' then '07'
        WHEN 'Aug' then '08'
        WHEN 'Sep' then '09'
        WHEN 'Oct' then '10'
        WHEN 'Nov' then '11'
        WHEN 'Dec' then '12' end,
    SUBSTR (get_json_object(json_response, '$.created_at'),9,2),
    SUBSTR (get_json_object(json_response, '$.created_at'),12,8),
    get_json_object(json_response, '$.in_reply_to_user_id_str'),
    get_json_object(json_response, '$.text'),
    get_json_object(json_response, '$.contributors'),
    get_json_object(json_response, '$.retweeted'),
    get_json_object(json_response, '$.truncated'),
    get_json_object(json_response, '$.coordinates'),
    get_json_object(json_response, '$.source'),
    CAST (get_json_object(json_response, '$.retweet_count') as INT),
    get_json_object(json_response, '$.entities.display_url'),
    ARRAY(  
        TRIM(LOWER(get_json_object(json_response, '$.entities.hashtags[0].text'))),
        TRIM(LOWER(get_json_object(json_response, '$.entities.hashtags[1].text'))),
        TRIM(LOWER(get_json_object(json_response, '$.entities.hashtags[2].text'))),
        TRIM(LOWER(get_json_object(json_response, '$.entities.hashtags[3].text'))),
        TRIM(LOWER(get_json_object(json_response, '$.entities.hashtags[4].text')))),
    ARRAY(
        TRIM(LOWER(get_json_object(json_response, '$.entities.user_mentions[0].screen_name'))),
        TRIM(LOWER(get_json_object(json_response, '$.entities.user_mentions[1].screen_name'))),
        TRIM(LOWER(get_json_object(json_response, '$.entities.user_mentions[2].screen_name'))),
        TRIM(LOWER(get_json_object(json_response, '$.entities.user_mentions[3].screen_name'))),
        TRIM(LOWER(get_json_object(json_response, '$.entities.user_mentions[4].screen_name')))),
    TRIM(LOWER(get_json_object(json_response, '$.entities.hashtags[0].text'))),
    TRIM(LOWER(get_json_object(json_response, '$.entities.user_mentions[0].screen_name'))),
    get_json_object(json_response, '$.user.screen_name'),
    get_json_object(json_response, '$.user.name'),
    CAST (get_json_object(json_response, '$.user.followers_count') as INT),
    CAST (get_json_object(json_response, '$.user.listed_count') as INT),
    CAST (get_json_object(json_response, '$.user.friends_count') as INT),
    get_json_object(json_response, '$.user.lang'),
    get_json_object(json_response, '$.user.location'),
    get_json_object(json_response, '$.user.time_zone'),
    get_json_object(json_response, '$.user.profile_image_url'),
    json_response
WHERE (LENGTH(json_response) > 500);
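The 1-based SUBSTR offsets in the query above are easy to misread, so here is a quick sanity check of the same slicing in Python, using a hypothetical created_at value in Twitter’s “ddd MMM dd HH:mm:ss +0000 yyyy” format:

```python
# Verify the 1-based SUBSTR offsets used in the Hive query against a
# hypothetical Twitter created_at value.
created_at = "Thu Jun 18 12:34:56 +0000 2015"

def substr(s, start, length):
    """Hive-style SUBSTR: start index is 1-based."""
    return s[start - 1:start - 1 + length]

# Same mapping as the CASE expression in the Hive query.
months = {'Jan': '01', 'Feb': '02', 'Mar': '03', 'Apr': '04',
          'May': '05', 'Jun': '06', 'Jul': '07', 'Aug': '08',
          'Sep': '09', 'Oct': '10', 'Nov': '11', 'Dec': '12'}

created_at_date  = substr(created_at, 1, 10) + ' ' + substr(created_at, 27, 4)
created_at_year  = substr(created_at, 27, 4)
created_at_month = months[substr(created_at, 5, 3)]
created_at_day   = substr(created_at, 9, 2)
created_at_time  = substr(created_at, 12, 8)

print(created_at_date)                                    # Thu Jun 18 2015
print(created_at_month, created_at_day, created_at_time)  # 06 18 12:34:56
```

Each offset lands exactly where the Hive query expects: positions 1–10 are the day-of-week, month, and day, position 27 onward is the year, and position 12 starts the time.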

These queries show how to analyze the HDISample_Tweets table to determine the Twitter users that sent the most tweets containing the word ‘Azure’. The results are saved into a new table called HDISample_topusers.

DROP TABLE IF EXISTS HDISample_topusers;

--create the topusers hive table by selecting from the HDISample_Tweets table
CREATE TABLE IF NOT EXISTS  HDISample_topusers(name STRING, screen_name STRING, tweet_count INT);

INSERT OVERWRITE TABLE  HDISample_topusers
SELECT name, screen_name, count(1) as cc
    FROM HDISample_Tweets
    WHERE text LIKE '%Azure%'
    GROUP BY name, screen_name
    ORDER BY cc DESC LIMIT 10;
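To make the aggregation concrete, here is a minimal Python sketch of what that Hive query computes, run over a few hypothetical tweets: filter for the word ‘Azure’, count per user, and take the top users by count.

```python
from collections import Counter

# Hypothetical sample tweets standing in for the HDISample_Tweets table.
tweets = [
    {"name": "Valerie M", "screen_name": "valegabri1", "text": "Loving Azure HDInsight"},
    {"name": "Tony Zake", "screen_name": "TonyZake",   "text": "Azure rocks"},
    {"name": "Valerie M", "screen_name": "valegabri1", "text": "More Azure tips"},
    {"name": "Someone",   "screen_name": "someone",    "text": "Unrelated tweet"},
]

# WHERE text LIKE '%Azure%' ... GROUP BY name, screen_name
counts = Counter(
    (t["name"], t["screen_name"]) for t in tweets if "Azure" in t["text"]
)

# ORDER BY cc DESC LIMIT 10
top_users = [(name, screen, n) for (name, screen), n in counts.most_common(10)]
print(top_users)  # [('Valerie M', 'valegabri1', 2), ('Tony Zake', 'TonyZake', 1)]
```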

Now I will run it all, submit the job and watch the Job History Console:


Finally, as the data has been inserted into my HDISample_topusers table, I can query it as well:

select * from HDISample_topusers;

And get the results:

luciasannino	ciccialucia1960	1
Valerie M	valegabri1	1
Tony Zake	TonyZake	1
Sami Ghazali	SamiGhazali	1
JOSÈ ADARMES	ADARMESJEB	1
Eric Hexter	ehexter	1
Daniel Neumann	neumanndaniel	1
Anthony Bartolo	WirelessLife	1


For more details see – https://azure.microsoft.com/en-us/documentation/articles/hdinsight-use-hive/, https://azure.microsoft.com/en-us/documentation/articles/hdinsight-hadoop-tutorial-get-started-windows/, http://www.developer.com/db/using-hive-in-hdinsight-to-analyze-data.html, http://blogs.msdn.com/b/cindygross/archive/2015/02/26/create-hdinsight-cluster-in-azure-portal.aspx

Running To Stand Still – Adventures in .NET Parallelism

Last winter I was fortunate enough to be involved in a performance tuning engagement at a customer site. While testing and tuning the customer’s ASP.NET web application, we noted that running certain functions and tasks synchronously was, surprisingly, somewhat faster and more stable than making these tasks asynchronous via the Task Parallel Library (TPL).

We saw no CPU contention or extensive context switching. CPU averaged 25–35% and maxed out around 55–60%. Moreover, the async change did not cause noticeably higher CPU; we saw only a 2 to 4% increase on average. The low CPU didn’t surprise me, since the threads (if I understand correctly) are simply blocked for a while waiting for a network call to complete. So no high CPU, no thread switching, no easily visible contention – and yet asynchronous methods that should be running faster were not. Moreover, we observed pretty bad “jitteriness” in application response times, with significant latency spikes.

Looking at ANTS Profiler, the picture is easy to understand (time is spent waiting on async tasks to complete), but it doesn’t really answer the question as to why time is spent in the ThreadPool here:


Grabbing a couple of hang dumps and looking into them actually started explaining things and uncovered a rather interesting picture. I will scrub the stacks to “protect the innocent” here, but you will get the idea nevertheless.

Looking for managed thread pool contention, you will not notice any:

0:032> !threadpool
CPU utilization: 17%
Worker Thread: Total: 22 Running: 5 Idle: 17 MaxLimit: 32767 MinLimit: 4
Work Request in Queue: 0
--------------------------------------
Number of Timers: 2
--------------------------------------
Completion Port Thread:Total: 9 Free: 8 MaxFree: 8 CurrentLimit: 9 MaxLimit: 1000 MinLimit: 4

But in addition to clearly showing no thread contention here (i.e. I am not running out of worker threads), something else becomes really apparent. As we can see, there are 17 idle threads with only 5 doing work. The thread pool removes an idle thread after it has been idle for 10 seconds, so this means the number of threads we are using is bursty. The way Tasks are being used here isn’t really the best way to use them. There are two reasons for using tasks:

  • First, if you have a CPU-intensive operation you can put that work on another thread so that you can either return (e.g. in a GUI app, to keep the UI responsive) or do something else on the current thread.
  • Second, as a way to execute an asynchronous operation such as network or disk I/O and easily queue up the work that needs to be done after the asynchronous operation has completed.

In addition, looking at the stacks in the dump, the majority are waiting in stacks like the following:

OS Thread Id: 0x40 (109)
Child SP               IP Call Site
000000bbdf8bd9f8 000007fd958b315b [GCFrame: 000000bbdf8bd9f8] 
000000bbdf8bdac8 000007fd958b315b [HelperMethodFrame_1OBJ: 000000bbdf8bdac8] System.Threading.Monitor.ObjWait(Boolean, Int32, System.Object)
000000bbdf8bdbe0 000007fd8a8f0d6c System.Threading.ManualResetEventSlim.Wait(Int32, System.Threading.CancellationToken) 
000000bbdf8bdca0 000007fd8a8f078b System.Threading.Tasks.Task.SpinThenBlockingWait(Int32, System.Threading.CancellationToken) 
000000bbdf8bdd30 000007fd8b1bb9da System.Threading.Tasks.Task.InternalWait(Int32, System.Threading.CancellationToken) 
000000bbdf8bde30 000007fd8b1c14f6 System.Threading.Tasks.Task`1[[System.__Canon, mscorlib]].GetResultCore(Boolean)
000000bbdf8bde70 000007fd2db17606 CustomerApp.SubmitRequest[[System.__Canon, mscorlib]](System.__Canon, System.Net.Http.HttpClient, System.Uri, System.Net.Http.Formatting.MediaTypeFormatter)

Looking at the dump stacks, we are synchronously waiting on a task to complete, which occupies a thread. We have offloaded the network call to a Task running on another thread, but we aren’t gaining any benefit from doing so, as we are just blocking while waiting for it to come back. Given the large number of idle threads (compared with running ones), the suspicion is that our off-box calls are made using synchronous APIs, which would mean yet another thread blocking while doing nothing.
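The anti-pattern in that stack is not specific to .NET; here is a minimal Python sketch of the same sync-over-async shape (the network call is a hypothetical stand-in): work is offloaded to a pool thread, then the caller immediately blocks on the result, tying up two threads where true async I/O would use none while waiting.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fake_network_call():
    """Stand-in for a synchronous off-box call."""
    time.sleep(0.1)
    return "response"

pool = ThreadPoolExecutor(max_workers=4)

def handle_request():
    # Offload to another thread ("Task")...
    future = pool.submit(fake_network_call)
    # ...then synchronously block on it, gaining nothing: the caller
    # thread parks here exactly like Task.GetResultCore in the dump.
    return future.result()

start = time.monotonic()
result = handle_request()
elapsed = time.monotonic() - start
print(result, round(elapsed, 1))  # same latency as calling it directly
```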

The jitter in the latency times is most likely caused by resource contention: a new thread must be created for each of these dispatched tasks that doesn’t have a thread to work on. The Worker Thread MinLimit is 4, and since our number of threads is greater than that, the ThreadPool will only give us a new thread once every 500 ms. Once you’ve created enough threads, the next request should run quicker, until you leave the threads idle long enough that they are cleaned up.

The basic operation of the thread pool is actually pretty easy to explain. The thread pool starts with 0 threads and immediately creates new threads to serve work requests until the number of running threads reaches a configurable minimum (the default minimum is the number of CPUs on the machine).

While the number of running threads is equal to or greater than that minimum, the thread pool creates new threads at the rate of 1 thread per 0.5 seconds. This means that if your application is running on a dual-core machine, and 16 work requests that each span 10 seconds are scheduled together against an empty thread pool, the first two requests will be served immediately, the third after 0.5 seconds, the fourth after 1 second, and the 16th after 7 seconds. In addition, to avoid unnecessary starvation a daemon thread runs in the background and periodically monitors the CPU – in case of low CPU utilization it creates new threads as appropriate.
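That injection-rate arithmetic can be sketched as a back-of-the-envelope model (a simplification that ignores the CPU-monitoring daemon thread):

```python
def start_delay(request_index, min_threads=2, injection_interval=0.5):
    """Seconds before request `request_index` (1-based) gets a thread,
    assuming all requests arrive together against an empty pool on a
    machine with `min_threads` CPUs, and each request runs ~10s."""
    if request_index <= min_threads:
        return 0.0  # served immediately by the minimum threads
    # Beyond the minimum, the pool injects one thread per interval.
    return (request_index - min_threads) * injection_interval

for i in (1, 2, 3, 4, 16):
    print(i, start_delay(i))
# requests 1-2 start immediately; request 3 waits 0.5s,
# request 4 waits 1.0s, request 16 waits 7.0s
```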

The thread pool will not create new threads after it reaches a configurable maximum. The default maximum is 250 * the number of CPUs, which is 10 times more than in version 1.0 of the .NET Framework. The default maximum was increased in order to reduce the chance of a possible deadlock that can occur when callbacks rely on each other in order to complete.

After a thread finishes its work it is not destroyed immediately; rather, it stays in the pool waiting for more work to arrive. Once new work arrives, it is served immediately by one of the waiting threads. Waiting threads are destroyed only after spending 10 seconds (it used to be 40 seconds) in the pool doing nothing.


So what is the lesson here? If you are using the TPL, make sure you use asynchronous code through the complete call chain. If you are calling Redis, for example, use the asynchronous Redis client, and so on. Otherwise, you may sometimes get more consistent and stable results by simply staying synchronous in applications that use the ASP.NET thread pool.
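The “async all the way” idea can be sketched in a few lines of Python asyncio (the network call is again a hypothetical stand-in): an awaiting coroutine releases its thread while the I/O is in flight, so many requests overlap on a single thread instead of each one parking a pool thread.

```python
import asyncio
import time

async def fake_async_network_call():
    """Non-blocking stand-in for a truly asynchronous off-box call."""
    await asyncio.sleep(0.1)
    return "response"

async def main():
    start = time.monotonic()
    # 50 "requests" awaited concurrently: no thread blocks while each
    # call is in flight, so total time is ~0.1s, not 50 * 0.1s.
    results = await asyncio.gather(
        *(fake_async_network_call() for _ in range(50))
    )
    elapsed = time.monotonic() - start
    print(len(results), round(elapsed, 1))  # 50 calls overlap in ~0.1s
    return results, elapsed

results, elapsed = asyncio.run(main())
```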

Hope this helps.

For more see – http://blogs.msdn.com/b/tmarq/archive/2010/04/14/performing-asynchronous-work-or-tasks-in-asp-net-applications.aspx, http://blog.stephencleary.com/2013/11/taskrun-etiquette-examples-dont-use.html