Azure Service Bus

Azure service bus is used to disassociate application and service from each other. This will be used when large running process is called. Let’s say generate pdf document when submit a request from client. This will take long waiting time for the users. To avoid such situation, we can create a queue and queue will call any other long processing function, application or services. Service queue is reliable and secure platform for asynchronous data and state transfer. Data is transferred between different applications and services using messages. A message is in binary format like JSON, XML, or just text.

After completing this article, you will be familiar with

  1. How to create Service Bus Namespace in Azure
  2. How to create Service Bus Queue
  3. How to send messages to queue programmatically using .NET Core
  4. How to receive messages programmatically

Create Service Bus Namespace

Login to Azure Portal https://portal.azure.com
Select Create a resource and search for “Service Bus”

Enter the Name, Pricing tier (I choose Basic for this tutorial), subscription, Resource group and location.

Click create and wait till provisioning the service bus namespace. This will be available in your resource group once provisioned.

Create Queue

Select Queues under “Entities” from left navigation. Click add Queue. Provide name and tick “Enable Partitioning”

Send messages to the queue programmatically

Open Visual studio and create new console app with .NET Core Project
Install “Microsoft.Azure.ServiceBus” NuGet Package

In Program.cs, add the following namespaces

using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;

Add the following constant variables.

const string serviceBusConnStr = "<your_connection_string>";
const string QueueName = "msaztque";
static IQueueClient queueClient;

You can get the connection string from Service Bus Namespace we created in our previous steps. Refer the screenshot to get the Primary connection string.

Replace the default by this code in Main()

MainAsync().GetAwaiter().GetResult();

Add the Following MainAsync and SendMessagesAsync methods below the Main method to connect Service Bus Queue and Send Messages

static async Task MainAsync()
{
 const int numberOfMessages = 10;
 queueClient = new QueueClient(ServiceBusConnectionString, QueueName);

    Console.WriteLine("=================================================");
Console.WriteLine("Press ENTER key to exit after sending all the messages.");
    Console.WriteLine("=================================================");

    // Send messages.
    await SendMessagesAsync(numberOfMessages);

    Console.ReadKey();

    await queueClient.CloseAsync();
}

static async Task SendMessagesAsync(int numberOfMessagesToSend)
{
    try
    {
        for (var i = 0; i < numberOfMessagesToSend; i++)
        {
            // Create a new message to send to the queue.
            string messageBody = $"Message {i}";
            var message = new Message(Encoding.UTF8.GetBytes(messageBody));

            // Write the body of the message to the console.
            Console.WriteLine($"Sending message: {messageBody}");

            // Send the message to the queue.
            await queueClient.SendAsync(message);
        }
    }
    catch (Exception exception)
    {
        Console.WriteLine($"{DateTime.Now} :: Exception: {exception.Message}");
    }
}

Run the application and check in the Azure Portal. You can see 10 messages created.

Receive messages from the queue programmatically

Receiving message is like Send messages to queue. Create new console app, Install “Microsoft.Azure.ServiceBus” NuGet Package and add the namespaces as described in the previous section.

Replace the following code in Main Method

MainAsync().GetAwaiter().GetResult();

Add the Following MainAsync and RegisterOnMessageHandlerAndReceiveMessages methods below the Main method to connect Service Bus Queue and Receive Messages

static async Task MainAsync()
{
    queueClient = new QueueClient(serviceBusConnStr, QueueName);

    Console.WriteLine("======================================================");
    Console.WriteLine("Press ENTER key to exit after receiving all the messages.");
    Console.WriteLine("======================================================");

    // Register the queue message handler and receive messages in a loop
    RegisterOnMessageHandlerAndReceiveMessages();

    Console.ReadKey();

    await queueClient.CloseAsync();
}

static void RegisterOnMessageHandlerAndReceiveMessages()
{
    // Configure the message handler options in terms of exception handling, number of concurrent messages to deliver, etc.
    var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler)
    {
        // Maximum number of concurrent calls to the callback ProcessMessagesAsync(), set to 1 for simplicity.
        // Set it according to how many messages the application wants to process in parallel.
        MaxConcurrentCalls = 1,

        // Indicates whether the message pump should automatically complete the messages after returning from user callback.
        // False below indicates the complete operation is handled by the user callback as in ProcessMessagesAsync().
        AutoComplete = false
    };

    // Register the function that processes messages.
    queueClient.RegisterMessageHandler(ProcessMessagesAsync, messageHandlerOptions);
}

Add the ProcessMessagesAsync to process each received messages
static async Task ProcessMessagesAsync(Message message, CancellationToken token)
{
    // Process the message.
    Console.WriteLine($"Received message: SequenceNumber:{message.SystemProperties.SequenceNumber} Body:{Encoding.UTF8.GetString(message.Body)}");

// to do tasks using message body here…

    // Complete the message so that it is not received again.
    // This can be done only if the queue Client is created in ReceiveMode.PeekLock mode (which is the default).
    await queueClient.CompleteAsync(message.SystemProperties.LockToken);

    // Note: Use the cancellationToken passed as necessary to determine if the queueClient has already been closed.
    // If queueClient has already been closed, you can choose to not call CompleteAsync() or AbandonAsync() etc.
    // to avoid unnecessary exceptions.
}

Finally, add the following method to handle any exceptions that might occur

static Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
{
    Console.WriteLine($"Message handler encountered an exception {exceptionReceivedEventArgs.Exception}.");
    var context = exceptionReceivedEventArgs.ExceptionReceivedContext;
    Console.WriteLine("Exception context for troubleshooting:");
    Console.WriteLine($"- Endpoint: {context.Endpoint}");
    Console.WriteLine($"- Entity Path: {context.EntityPath}");
    Console.WriteLine($"- Executing Action: {context.Action}");
    return Task.CompletedTask;
}  

Run the application. All your messages created will be closed.

Hope you understand how to create, send and receive messages to queue. You can use Azure functions to receive queue messages and to do any long running operations

Leave a Reply