Unlike some of my colleagues, I have never been much of an integration guy; my keen interest has always been in all things database: NoSQL, analytics, and data storage. However, my customers are currently Azure Service Bus users, so I had to understand Service Bus, topics, and queues, and learn how to develop for, troubleshoot, and monitor this technology.
An Enterprise Service Bus (ESB) is a software architecture model used for designing and implementing communication between mutually interacting software applications in a service-oriented architecture (SOA). As a software architectural model for distributed computing, it is a specialty variant of the more general client-server model and promotes agility and flexibility with regard to communication between applications. Its primary use is in enterprise application integration (EAI) of heterogeneous and complex landscapes.
Azure Service Bus is a multi-tenant cloud service, which means that the service is shared by multiple users. Each user, such as an application developer, creates a namespace, then defines the communication mechanisms they need within that namespace.
Within a namespace, you can use one or more instances of four different communication mechanisms, each of which connects applications in a different way. The choices are:
- Queues, which allow one-directional communication. Each queue acts as an intermediary (sometimes called a broker) that stores sent messages until they are received. Each message is received by a single recipient.
- Topics, which provide one-directional communication using subscriptions; a single topic can have multiple subscriptions. Like a queue, a topic acts as a broker, but each subscription can optionally use a filter to receive only messages that match specific criteria.
- Relays, which provide bi-directional communication. Unlike queues and topics, a relay doesn’t store in-flight messages; it’s not a broker. Instead, it just passes them on to the destination application.
- Event Hubs, which provide event and telemetry ingress to the cloud at massive scale, with low latency and high reliability.
In this tutorial we will create a topic and a sample subscription, and will even attempt to set up some counters to monitor them.
A topic is similar in many ways to a queue. Senders submit messages to a topic in the same way that they submit messages to a queue, and those messages look the same as with queues. The big difference is that topics let each receiving application create its own subscription by defining a filter. A subscriber will then see only the messages that match that filter.
As in the picture from the Azure Service Bus docs above:
- Subscriber 1 receives only messages that contain the property Seller=”Ava”.
- Subscriber 2 receives messages that contain the property Seller=”Ruby” and/or contain an Amount property whose value is greater than 100,000. Perhaps Ruby is the sales manager, and so she wants to see both her own sales and all big sales regardless of who makes them.
- Subscriber 3 has set its filter to True, which means that it receives all messages. For example, this application might be responsible for maintaining an audit trail and therefore it needs to see all the messages.
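The three subscribers above could be sketched in code roughly as follows. This is only an illustration under a few assumptions: the subscription names (AvaSales, RubyAndBigSales, AuditTrail) and the helper class are made up for this example, the connection string must carry the Manage right, and the filters are evaluated against the user properties set on each BrokeredMessage, not against the message body.

```csharp
using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;

// Hypothetical helper; names are illustrative, not part of the sample solution.
static class FilteredSubscriptionsSketch
{
    public static void CreateSubscriptions(string connectionString, string topicPath)
    {
        // Creating subscriptions requires a connection string with Manage rights.
        NamespaceManager ns = NamespaceManager.CreateFromConnectionString(connectionString);

        // Subscriber 1: only messages whose Seller property equals 'Ava'.
        if (!ns.SubscriptionExists(topicPath, "AvaSales"))
        {
            ns.CreateSubscription(topicPath, "AvaSales", new SqlFilter("Seller = 'Ava'"));
        }

        // Subscriber 2: Ruby's own sales, plus any sale over 100,000.
        if (!ns.SubscriptionExists(topicPath, "RubyAndBigSales"))
        {
            ns.CreateSubscription(topicPath, "RubyAndBigSales",
                new SqlFilter("Seller = 'Ruby' OR Amount > 100000"));
        }

        // Subscriber 3: a filter that is always true, so every message is delivered.
        if (!ns.SubscriptionExists(topicPath, "AuditTrail"))
        {
            ns.CreateSubscription(topicPath, "AuditTrail", new TrueFilter());
        }
    }

    // The sender has to set the properties the filters look at.
    public static BrokeredMessage CreateSaleMessage(string seller, int amount)
    {
        BrokeredMessage message = new BrokeredMessage();
        message.Properties["Seller"] = seller;
        message.Properties["Amount"] = amount;
        return message;
    }
}
```

With these filters, a message created by CreateSaleMessage("Ava", 150000) would be copied to all three subscriptions: it matches the Seller filter of the first, the Amount clause of the second, and the always-true filter of the third.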
For more on Service Bus and Topics see – https://msdn.microsoft.com/en-us/library/azure/hh367516.aspx, http://azure.microsoft.com/en-us/documentation/services/service-bus/, http://azure.microsoft.com/en-us/documentation/articles/service-bus-dotnet-how-to-use-topics-subscriptions/.
So let's move from theory to practice here.
The first thing I need to do is log in to the Azure Portal to define a Service Bus namespace. Once in, I pick Service Bus in the left bar and click Add New Namespace. You then pick a name, tier, and region for your namespace.
Next I need to understand and configure security for my Service Bus namespace.
Applications can authenticate to the Service Bus using either Shared Access Signature (SAS) authentication, or by authenticating through the Access Control Service (ACS).
- Shared Access Signature authentication enables applications to authenticate to the Service Bus using an access key configured on the namespace, or on the entity with which specific rights are associated. SAS is more applicable in scenarios in which applications do not need to manage the notion of an authorized “user”.
- Access Control Service provides federation with various standards-based identity providers, e.g. Active Directory Federation Services (ADFS), Microsoft Account, Google, Yahoo!, and Facebook. ACS is more applicable in scenarios where applications require a rich, identity-based authentication.
In this demo I will use SAS, as it is the simplest way to authenticate.
Below are the steps for setting up the relevant configuration:
Click the service namespace we created earlier, and then click the Configure tab. Add the following policies:
- Name: PublisherAccessKey, Permission: Send. This key will be used by the Publisher sample to send messages to the topic. Any attempt to use this policy to listen to messages or to modify any attributes of the topic or subscriptions will result in an exception.
- Name: SubscriberAccessKey, Permission: Send, Listen. This key will be used by the Subscriber sample to receive messages from the subscriptions configured on the topic; the Send right also lets it send messages to the topic.
Now I can open Visual Studio 2013 and see my Service Bus namespace in Server Explorer:
In this case I will create a new solution in Visual Studio that will have three items:
- Console application called SampleTopicPublisher
- Console application called SampleTopicSubscriber
- Library application called SampleSBLibrary
Our message here will represent an order. Therefore I will define the following data contract:
    using System.Text;
    using System.Threading.Tasks;
    using System.Runtime.Serialization;
    using System.Xml;

    namespace SampleSBLibrary
    {
        public class Order
        {
            public string ID { get; set; }
            public string Region { get; set; }
            public string Product { get; set; }
            public int Quantity { get; set; }
            public double PricePerUnit { get; set; }
        }
    }
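When an object is passed to the BrokeredMessage constructor, the body is serialized with DataContractSerializer, so it is worth checking that Order survives a round trip before involving Service Bus at all. The sketch below does that locally; the OrderRoundTrip class is a hypothetical helper for this check only, and the Order class is repeated here just to keep the example self-contained.

```csharp
using System.IO;
using System.Runtime.Serialization;

// Copy of the Order contract from the library, repeated so this compiles on its own.
public class Order
{
    public string ID { get; set; }
    public string Region { get; set; }
    public string Product { get; set; }
    public int Quantity { get; set; }
    public double PricePerUnit { get; set; }
}

// Hypothetical helper, not part of the sample solution.
public static class OrderRoundTrip
{
    public static Order RoundTrip(Order original)
    {
        // A public class with a parameterless constructor and public
        // read/write properties can be serialized without attributes.
        DataContractSerializer serializer = new DataContractSerializer(typeof(Order));
        using (MemoryStream stream = new MemoryStream())
        {
            serializer.WriteObject(stream, original);
            stream.Position = 0; // rewind before reading back
            return (Order)serializer.ReadObject(stream);
        }
    }
}
```

Calling OrderRoundTrip.RoundTrip on an order and comparing the properties of the result with the original is a quick sanity check that nothing is lost in serialization.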
Now I am going to create a Service Bus topic. A topic can be created in several ways:
- Windows Azure Management Portal: New -> App Services -> Service Bus -> Topic -> Quick Create or Custom Create.
- Visual Studio: Server Explorer -> Windows Azure -> Service Bus -> Topics -> Create New Topic
- Programmatically: using the CreateTopic() method of Microsoft.ServiceBus.NamespaceManager class
As it doesn’t really matter here I will use Visual Studio.
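For completeness, the programmatic route might look roughly like the sketch below. It assumes a connection string with the Manage right; the TopicSetupSketch class and method name are made up for this illustration.

```csharp
using Microsoft.ServiceBus;

// Hypothetical helper, not part of the sample solution.
static class TopicSetupSketch
{
    public static void EnsureTopic(string connectionString, string topicPath)
    {
        // The connection string must use a policy with the Manage right.
        NamespaceManager ns = NamespaceManager.CreateFromConnectionString(connectionString);

        // CreateTopic throws if the topic already exists, so check first.
        if (!ns.TopicExists(topicPath))
        {
            ns.CreateTopic(topicPath);
        }
    }
}
```

Checking TopicExists first makes the call safe to run on every start of an application, at the cost of one extra round trip to the service.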
The next thing we will do is set up the Service Bus connection string in the appSettings section of the App.config file. The format we will use is:
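A typical SAS-based entry looks like the sketch below. The values in square brackets are placeholders to replace with your own namespace name and key; the key name here assumes the PublisherAccessKey policy defined earlier, and the appSettings key matches the one the publisher code reads.

```xml
<configuration>
  <appSettings>
    <!-- [your-namespace] and [your-key] are placeholders, not real values -->
    <add key="Microsoft.ServiceBus.ConnectionString"
         value="Endpoint=sb://[your-namespace].servicebus.windows.net/;SharedAccessKeyName=PublisherAccessKey;SharedAccessKey=[your-key]" />
  </appSettings>
</configuration>
```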
Next I will add the Service Bus NuGet package to my solution via the Visual Studio NuGet Package Manager, searching for Service Bus online:
Now I pick and install the Microsoft Azure Service Bus package, accepting all of the legalese as necessary.
Next comes my Publisher. I will write a method that creates a list of sample data (Orders) to send to Service Bus:
    public static List<Order> GenerateSampleOrders(int NumOfOrders)
    {
        string[] products = new string[] { "Bicycle", "Motorcycle", "Kids Bicycle", "Skateboard" };
        string[] regions = new string[] { "North Central", "South Central", "North East", "South East", "South West" };
        Random r = new Random();
        List<Order> orders = new List<Order>();
        for (int i = 0; i < NumOfOrders; i++)
        {
            orders.Add(new Order
            {
                // Ticks elapsed since the start of 2015, offset by the loop index;
                // Order.ID is a string, so convert explicitly
                ID = (DateTime.Now.Ticks - new DateTime(2015, 1, 1).Ticks + i).ToString(),
                Region = regions[r.Next(regions.Length)],
                Product = products[r.Next(products.Length)],
                Quantity = r.Next(10),
                PricePerUnit = r.Next(100)
            });
        }
        return orders;
    }
Inside the Program class for the Publisher, define a SendOrdersToTopic() method that takes the strongly-typed List of Order objects. Next, we read the Service Bus connection string stored in the App.config file. We will use the connection string and topic name to instantiate a TopicClient object:
    // Requires: using System.Configuration; using Microsoft.ServiceBus.Messaging;
    public static void SendOrdersToTopic(List<Order> orders_list)
    {
        // read connection string
        String connectionString = ConfigurationManager.AppSettings["Microsoft.ServiceBus.ConnectionString"];
        // create topic client from connection string
        TopicClient myTopicClient = TopicClient.CreateFromConnectionString(connectionString, "orderstopic");
        // let's send test messages to the topic
        foreach (Order ord in orders_list)
        {
            // create brokered message with the order as its body
            BrokeredMessage orderToBeSent = new BrokeredMessage(ord);
            orderToBeSent.Label = ord.ID.ToString();
            orderToBeSent.CorrelationId = ord.ID.ToString();
            orderToBeSent.MessageId = ord.ID.ToString();
            myTopicClient.Send(orderToBeSent);
            Console.WriteLine("Sent " + orderToBeSent.MessageId.ToString());
        }
        // release the connection once all orders are sent
        myTopicClient.Close();
    }
Finally, let's put it all together in the Publisher's Main method:
    static void Main(string[] args)
    {
        while (true)
        {
            Console.Write("Enter number of Orders to Send to SB. Enter 0 to Exit");
            int input;
            // anything that is not a number, or 0, ends the loop
            if (!Int32.TryParse(Console.ReadLine(), out input) || input == 0)
            {
                break;
            }
            List<Order> orders = GenerateSampleOrders(input);
            SendOrdersToTopic(orders);
        }
    }
Now we will create a Topic Subscriber. Below is the code for that Console Application:
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading.Tasks;
    using Microsoft.ServiceBus;
    using Microsoft.ServiceBus.Messaging;
    using SampleSBLibrary;
    using System.Configuration;

    namespace SampleTopicSubscriber
    {
        class Program
        {
            private static NamespaceManager _nsManager;
            private static MessagingFactory _factory;

            static void Initialize()
            {
                // create uri
                Uri uri = ServiceBusEnvironment.CreateServiceUri("sb", "gennadyktest", String.Empty);

                // create manager level permissions using SAS token
                string KeyName = "RootManageSharedAccessKey";
                string KeyValue = ""; // key value goes here
                TokenProvider creds = TokenProvider.CreateSharedAccessSignatureTokenProvider(KeyName, KeyValue);
                // create namespace client
                _nsManager = new NamespaceManager(uri, creds);

                // receiving needs the Listen right, which the Send-only
                // PublisherAccessKey policy does not have
                KeyName = "SubscriberAccessKey";
                KeyValue = ""; // key value goes here
                creds = TokenProvider.CreateSharedAccessSignatureTokenProvider(KeyName, KeyValue);
                // create messaging factory
                _factory = MessagingFactory.Create(uri, creds);
            }

            static void CleanUp()
            {
                if (!(_factory.IsClosed))
                {
                    _factory.Close();
                }
            }

            private static void CreateTopicSubscriptions(string topic)
            {
                // add subscription for all orders
                if (!(_nsManager.SubscriptionExists(topic, "AllOrders")))
                {
                    _nsManager.CreateSubscription(topic, "AllOrders");
                }
            }

            static void ReceiveOrdersFromSubscription(string topic, string subscription)
            {
                List<Order> orders = new List<Order>();
                Console.WriteLine("Receiving Messages for '{0}'", subscription);
                // create subscription client
                SubscriptionClient mySubClient = _factory.CreateSubscriptionClient(topic, subscription, ReceiveMode.PeekLock);
                Console.WriteLine("Receiving Messages");
                BrokeredMessage msg;
                // wait up to one second per message; null means nothing is left
                while ((msg = mySubClient.Receive(new TimeSpan(0, 0, 1))) != null)
                {
                    Order order = msg.GetBody<Order>();
                    orders.Add(order);
                    Console.WriteLine("Received Message {0}", msg.MessageId.ToString());
                    // second stage of PeekLock: remove the message from the subscription
                    msg.Complete();
                }
                mySubClient.Close();
            }

            static void Main(string[] args)
            {
                string TopicPath = "orderstopic";
                Initialize();
                CreateTopicSubscriptions(TopicPath);
                while (true)
                {
                    ReceiveOrdersFromSubscription(TopicPath, "AllOrders");
                    Console.Write("All Done. Press Y/y to check again....");
                    string input = Console.ReadLine();
                    if (!input.Equals("Y", StringComparison.InvariantCultureIgnoreCase))
                    {
                        break;
                    }
                }
                CleanUp();
            }
        }
    }
The recommended way to receive messages from a subscription is to use a SubscriptionClient object. SubscriptionClient objects can work in two different modes: ReceiveAndDelete and PeekLock.
When using the ReceiveAndDelete mode, receive is a single-shot operation – that is, when Service Bus receives a read request for a message in a subscription, it marks the message as being consumed and returns it to the application. ReceiveAndDelete mode is the simplest model and works best for scenarios in which an application can tolerate not processing a message in the event of a failure. To understand this, consider a scenario in which the consumer issues the receive request and then crashes before processing it. Because Service Bus will have marked the message as consumed, when the application restarts and begins consuming messages again, it will have missed the message that was consumed prior to the crash.
In PeekLock mode (which is the default mode), the receive process becomes a two-stage operation which makes it possible to support applications that cannot tolerate missing messages. When Service Bus receives a request, it finds the next message to be consumed, locks it to prevent other consumers receiving it, and then returns it to the application. After the application finishes processing the message (or stores it reliably for future processing), it completes the second stage of the receive process by calling Complete on the received message. When the Service Bus sees the Complete call, it marks the message as being consumed and removes it from the subscription.
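To make the two-stage PeekLock flow a bit more concrete, here is a minimal sketch of a receive loop that completes a message only after processing succeeds. The PeekLockReceiverSketch class and the handle callback are hypothetical stand-ins for real processing code, and the call to Abandon is my addition: it releases the lock so the message becomes available for delivery again.

```csharp
using System;
using Microsoft.ServiceBus.Messaging;

// Hypothetical helper, not part of the sample solution.
static class PeekLockReceiverSketch
{
    public static void Drain(string connectionString, string topic,
                             string subscription, Action<BrokeredMessage> handle)
    {
        // Stage one happens inside Receive: the message is locked, not deleted.
        SubscriptionClient client = SubscriptionClient.CreateFromConnectionString(
            connectionString, topic, subscription, ReceiveMode.PeekLock);
        try
        {
            BrokeredMessage msg;
            // Receive returns null when nothing arrives within the timeout.
            while ((msg = client.Receive(TimeSpan.FromSeconds(1))) != null)
            {
                try
                {
                    handle(msg);     // application-specific processing
                    msg.Complete();  // stage two: remove the message for good
                }
                catch (Exception ex)
                {
                    Console.WriteLine("Processing failed: {0}", ex.Message);
                    msg.Abandon();   // release the lock so it can be redelivered
                }
            }
        }
        finally
        {
            client.Close();
        }
    }
}
```

Note that an abandoned message can come straight back on the next Receive call, so a message that always fails will be retried until the subscription's maximum delivery count is reached, at which point Service Bus moves it to the dead-letter queue.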
So here are two console apps in action – one sending Orders to Service Bus and another one receiving them.
So that’s all, folks? Well, I built all this for a different purpose: I need to monitor it. Some time ago, while looking for monitoring solutions for Service Bus, I came across a long-deleted page that was still indexed and cached by Google. It told me that there is a NuGet package of client performance counters for Service Bus. And sure enough, I see it in Package Manager in VS:
Installing this package will add the following files to your project.
- Performance\Microsoft.ServiceBus.MessagingPerformanceCounters.man – Manifest file defining performance counters for the Azure Service Bus Queues, Topics and Event Hubs
- Tracing\Microsoft.ServiceBus.EventDefinitions.man – Adds event log tracing for Service Bus Queues and Topics
- RegisterMessagingPerfCounter.cmd – Command script to install the counters
As you will probably not be using a console application as your Service Bus client, but more likely a Windows Azure Worker Role, you can add the following to that role's Wad.config to collect these counters:
Then the final piece of the puzzle is to add “RegisterMessagingPerfCounter.cmd” as a startup task within the service definition file. This command will automatically install the performance counters on deployment. The startup task needs to be run with elevated privileges.
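In ServiceDefinition.csdef, such a startup task could be declared roughly as in the fragment below. The role name SampleWorkerRole is a placeholder of mine, and the sketch assumes the .cmd file is copied to the role's output directory so it can be found at deployment time.

```xml
<ServiceDefinition name="SampleService" xmlns="http://schemas.microsoft.com/ServiceHosting/2008/10/ServiceDefinition">
  <!-- SampleService and SampleWorkerRole are placeholder names -->
  <WorkerRole name="SampleWorkerRole">
    <Startup>
      <!-- elevated: registering performance counters needs admin rights -->
      <Task commandLine="RegisterMessagingPerfCounter.cmd"
            executionContext="elevated"
            taskType="simple" />
    </Startup>
  </WorkerRole>
</ServiceDefinition>
```

With taskType set to simple, the role waits for the script to finish before it starts, so the counters are registered before any messages flow.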
Now you’re all done. Once your app is deployed, start putting some messages through and you’ll see the counters capturing this information. This little package makes it incredibly simple to add performance monitoring to Azure Service Bus. In addition, the package provides support for a lot of tracing information, which can be surfaced from the event log.