Microsoft Service Bus-Queue

Overview

Microsoft Azure Service Bus is a fully managed enterprise integration message broker that decouples applications and services. It offers secure communication through asynchronous transfer of data and state.

Data is transferred from a sender to a receiver in units called messages. A message body is binary and can carry JSON, XML, or plain text.
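Since a message body is just bytes, any structured payload has to be encoded before sending and decoded after receiving. The sketch below shows one way to do that round trip with UTF-8 encoded JSON. PayloadCodec is a hypothetical helper name, and it uses System.Text.Json only to stay dependency-free (the article's own code uses Newtonsoft.Json further down).

```csharp
using System.Text;
using System.Text.Json;

// Hypothetical helper (not part of the Service Bus SDK): JSON <-> UTF-8 bytes.
public static class PayloadCodec
{
    // Serialize any object to UTF-8 encoded JSON bytes, suitable for a message body.
    public static byte[] ToJsonBytes<T>(T value) =>
        Encoding.UTF8.GetBytes(JsonSerializer.Serialize(value));

    // Decode UTF-8 JSON bytes back into an object of type T.
    public static T FromJsonBytes<T>(byte[] body) =>
        JsonSerializer.Deserialize<T>(Encoding.UTF8.GetString(body));
}
```

The same two steps (serialize to text, encode to bytes) apply to XML or plain text; only the serializer changes.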

Service Bus offers two kinds of messaging entities:

  • Queues
  • Topics

Queues

Senders put messages on a queue and receivers take them off; the queue stores each message until a receiving application is available to process it.

Queues provide point-to-point communication between the sender and the receiver: each message is processed by a single receiver.
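To make the store-and-forward behaviour concrete, here is a minimal in-memory sketch. InMemoryQueue is a hypothetical stand-in, not the Service Bus API; it only illustrates that messages wait in order until someone receives them, and that each message is handed out once.

```csharp
using System.Collections.Generic;

// Hypothetical in-memory stand-in for a queue; not the Service Bus API.
public sealed class InMemoryQueue
{
    private readonly Queue<string> _messages = new Queue<string>();

    // The sender does not need a receiver to be online; the message is stored.
    public void Send(string message) => _messages.Enqueue(message);

    // A receiver takes the oldest message; once taken, no other receiver sees it.
    public bool TryReceive(out string message) => _messages.TryDequeue(out message);

    public int Count => _messages.Count;
}
```

A real Service Bus queue adds durability, locking, and retries on top of this idea, which the in-memory version deliberately leaves out.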

Topics

Topics can also be used to send and receive messages, but instead of point-to-point communication they use a publish/subscribe mechanism (the Observer pattern).

A topic can have multiple independent subscriptions, and each subscriber usually wants only the messages relevant to it. For example, consider an e-commerce application that handles orders and products separately: you would not want product-related messages delivered to the orders subscription, or vice versa. To solve this problem, Service Bus lets you attach filters to topic subscriptions.

This article covers only queues.

If you are not already using Azure, I recommend signing up for the free trial at https://portal.azure.com
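The idea of filtered subscriptions can be sketched in memory as follows. InMemoryTopic and its label-based filter are hypothetical and are not how the Service Bus SDK exposes filters; the sketch only shows that every matching subscription receives its own copy of a message.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory stand-in for a topic; not the Service Bus API.
public sealed class InMemoryTopic
{
    private readonly List<(Func<string, bool> Filter, List<string> Inbox)> _subscriptions =
        new List<(Func<string, bool> Filter, List<string> Inbox)>();

    // Create a subscription that only accepts messages whose label passes the filter.
    public IReadOnlyList<string> Subscribe(Func<string, bool> filter)
    {
        var inbox = new List<string>();
        _subscriptions.Add((filter, inbox));
        return inbox;
    }

    // Deliver a copy of the message to every subscription whose filter matches.
    public void Publish(string label, string body)
    {
        foreach (var (filter, inbox) in _subscriptions)
        {
            if (filter(label))
            {
                inbox.Add(body);
            }
        }
    }
}
```

With an "order" subscription and a "product" subscription, a message published with the label "order" lands only in the first inbox.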

Service bus creation in Azure Portal

Click Create a resource and search for Service Bus.

I have created a Service Bus namespace named ServiceBus. Let’s go through the attributes involved in creating it.

Attribute | Description
Resource Group | A resource group in Azure is a container in which the metadata for a logical group of resources is stored.
Namespace name | Name of the Service Bus instance.
Location | Location in which the Service Bus namespace is deployed. For me, the closest location is South India.
Pricing tier | Pricing tier of the Service Bus namespace. I’ll be using Standard as it supports both queues and topics.

Regarding pricing: the Basic tier supports only queues, but I will be creating topics in the next article, so I chose Standard.

Now, let’s create a queue named testqueue (you can choose any name).

Coding

Here comes the interesting part. I’ve created two projects, a publisher (sender) and a consumer (receiver), because queues support point-to-point communication. The publisher is an ASP.NET Core Web API and the consumer is a worker service.

Add the Microsoft.Azure.ServiceBus NuGet package to both the consumer and publisher projects.

Let’s start with the publisher project, specifically the Startup.cs file.

// Register one shared queue client, built from the "servicebus" configuration section.
services.AddSingleton<IQueueClient>(serviceProvider => new QueueClient(
    connectionString: Configuration.GetValue<string>("servicebus:connectionstring"),
    entityPath: Configuration.GetValue<string>("servicebus:queuename")));

Now add a servicebus section to appsettings.json

{
    "servicebus": {
        "connectionstring": "<Connection string of service bus>",
        "queuename": "testqueue"
    },
    "Logging": {
        "LogLevel": {
            "Default": "Information",
            "Microsoft": "Warning",
            "Microsoft.Hosting.Lifetime": "Information"
        }
    },
    "AllowedHosts": "*"
}

You can get the connection string in the Azure portal under Shared access policies of the Service Bus namespace.

Next, let’s create a message publisher service.

using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;
using Newtonsoft.Json;

public interface IMessagePublisher
{
    Task Publish<T>(T value);

    Task Publish(string raw);
}

public class MessagePublisher : IMessagePublisher
{
    private readonly IQueueClient _queueClient;

    public MessagePublisher(IQueueClient queueClient)
    {
        _queueClient = queueClient;
    }

    // Serialize the object to JSON and send it as UTF-8 bytes.
    public Task Publish<T>(T value)
    {
        var message = new Message
        {
            Body = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(value))
        };
        return _queueClient.SendAsync(message);
    }

    // Send a raw string as UTF-8 bytes without any serialization.
    public Task Publish(string raw)
    {
        var message = new Message(Encoding.UTF8.GetBytes(raw));
        return _queueClient.SendAsync(message);
    }
}

The publisher has two methods: a generic one that serializes an object to JSON, and one that sends a raw string. The IQueueClient interface is used to send messages to and receive messages from the Service Bus queue. The Message class carries the content and metadata of a message, e.g. Body, TimeToLive, ReplyTo. As you might have already noticed, Body expects a byte array.

Let’s create a controller named QueueController.
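Besides the body, a Message can carry metadata such as an identifier, a content type, and a time-to-live. The following is a minimal sketch of setting a few of these properties; MessageFactory is a hypothetical helper, and the values chosen (a generated GUID as the id, application/json as the content type) are illustrative assumptions rather than anything the article's publisher requires.

```csharp
using System;
using System.Text;
using Microsoft.Azure.ServiceBus;

// Hypothetical helper that builds a message with some common metadata set.
public static class MessageFactory
{
    public static Message CreateJsonMessage(string json, TimeSpan timeToLive)
    {
        return new Message(Encoding.UTF8.GetBytes(json))
        {
            // Unique id, useful for tracing and for duplicate detection if enabled.
            MessageId = Guid.NewGuid().ToString("N"),
            // Tells the receiver how to interpret the bytes in Body.
            ContentType = "application/json",
            // After this period the message expires if nobody has received it.
            TimeToLive = timeToLive
        };
    }
}
```

A publisher could then call something like MessageFactory.CreateJsonMessage(json, TimeSpan.FromMinutes(5)) and pass the result to SendAsync.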

[Route("api/[controller]")]
public class QueueController : Controller
{
    private readonly IMessagePublisher _messagePublisher;

    public QueueController(IMessagePublisher messagePublisher)
    {
        _messagePublisher = messagePublisher;
    }

    // POST api/Queue
    [HttpPost]
    public async Task<ActionResult> PublishToQueue([FromBody] Employee employee)
    {
        await _messagePublisher.Publish(employee);
        return Ok();
    }
}

This is a plain POST action that publishes the employee details to the queue through the MessagePublisher class.

public class Employee
{
    public Employee()
    {
    }

    public int Id { get; set; }

    public string Name { get; set; }
}

Now register IMessagePublisher for dependency injection in Startup.cs.

public void ConfigureServices(IServiceCollection services)
{
    services.AddControllers();
    services.AddSingleton<IQueueClient>(serviceProvider => new QueueClient(
        connectionString: Configuration.GetValue<string>("servicebus:connectionstring"),
        entityPath: Configuration.GetValue<string>("servicebus:queuename")));
    services.AddSingleton<IMessagePublisher, MessagePublisher>();
}

Now, let’s move on to the consumer project. It has a Worker class that derives from BackgroundService.

.NET Core 2.0 introduced IHostedService, which lets developers run a background service whose lifetime is managed by its host, whether that is an ASP.NET Core application or a console application. .NET Core 2.1 added the BackgroundService base class on top of it.

public interface IHostedService
{
    Task StartAsync(CancellationToken cancellationToken);
    Task StopAsync(CancellationToken cancellationToken);
}

BackgroundService is a base class for implementing a long-running IHostedService. It implements the interface for you and leaves a single ExecuteAsync method to override.

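public abstract class BackgroundService : IDisposable, Microsoft.Extensions.Hosting.IHostedService

The consumer’s Worker class derives from it:

public class Worker : BackgroundService
{
    private readonly ILogger<Worker> _logger;
    private readonly IQueueClient _queueClient;

    public Worker(ILogger<Worker> logger, IQueueClient queueClient)
    {
        _logger = logger;
        _queueClient = queueClient;
    }

    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // Messages are completed explicitly below, so auto-completion is turned off.
        var options = new MessageHandlerOptions(args =>
        {
            _logger.LogError(args.Exception, "Error while receiving from the queue");
            return Task.CompletedTask;
        })
        {
            AutoComplete = false
        };

        // Register the handler once; registering again on the same client throws.
        // The client then pumps messages to the handler in the background.
        _queueClient.RegisterMessageHandler(async (Message message, CancellationToken cancellationToken) =>
        {
            var employee = JsonConvert.DeserializeObject<Employee>(Encoding.UTF8.GetString(message.Body));
            _logger.LogInformation("Received employee {Id} {Name}", employee.Id, employee.Name);
            // Implement DB/other logic here
            await _queueClient.CompleteAsync(message.SystemProperties.LockToken);
        }, options);

        _logger.LogInformation("Worker running at: {time}", DateTimeOffset.Now);
        return Task.CompletedTask;
    }

    public override async Task StopAsync(CancellationToken cancellationToken)
    {
        // Stop the message pump before the host shuts down.
        await _queueClient.CloseAsync();
        await base.StopAsync(cancellationToken);
    }
}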

When you work with a microservices architecture, I recommend implementing queue, topic, and cache processing as a BackgroundService, because such background workers can be scaled up or down as needed, or constrained to run as a single instance of that microservice process/container.

message.SystemProperties.LockToken identifies the lock that Service Bus places on a message when it is received in peek-lock mode. Passing it to CompleteAsync tells Service Bus that processing succeeded and the message can be removed from the queue.
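The lock token is what every settlement call takes: CompleteAsync on success, or AbandonAsync to release the lock so the message can be delivered again. Below is a hedged sketch of that pattern, assuming manual settlement (AutoComplete turned off). SettlementExample and the process delegate are hypothetical names introduced for illustration; whether to abandon or dead-letter a failing message depends on your scenario.

```csharp
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;

// Hypothetical helper showing explicit settlement with the lock token.
public static class SettlementExample
{
    public static void Register(IQueueClient queueClient, Func<string, Task> process)
    {
        var options = new MessageHandlerOptions(args =>
        {
            // Errors raised by the message pump itself end up here.
            Console.Error.WriteLine(args.Exception);
            return Task.CompletedTask;
        })
        {
            AutoComplete = false,   // we settle each message ourselves
            MaxConcurrentCalls = 1  // process one message at a time
        };

        queueClient.RegisterMessageHandler(async (Message message, CancellationToken token) =>
        {
            var lockToken = message.SystemProperties.LockToken;
            try
            {
                await process(Encoding.UTF8.GetString(message.Body));
                // Success: remove the message from the queue.
                await queueClient.CompleteAsync(lockToken);
            }
            catch (Exception)
            {
                // Failure: release the lock so the message is redelivered.
                await queueClient.AbandonAsync(lockToken);
            }
        }, options);
    }
}
```

A message that keeps being abandoned is eventually moved to the dead-letter queue once the queue's maximum delivery count is exceeded.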

    public class Program
    {
        public static void Main(string[] args)
        {
            CreateHostBuilder(args).Build().Run();
        }

        public static IHostBuilder CreateHostBuilder(string[] args) =>
            Host.CreateDefaultBuilder(args)
                .ConfigureServices((hostContext, services) =>
                {
                    services.AddHostedService<Worker>();
                    services.AddSingleton<IQueueClient>
                    (new QueueClient
                    ("<ConnectionString Here>",
                    "<Queue Name>"));
                });
    }

Once you have implemented the background service, register it in the ConfigureServices method. Along with the hosted service, IQueueClient needs to be registered as well.

That completes the implementation. I recommend using the Service Bus Explorer tool to inspect the actual messages being sent to the queue; you can download the standalone tool or add an extension to VS Code.

I hope you liked the article. If you found it interesting, kindly like and share it.
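Rather than hard-coding the connection string in Program.cs, the consumer could read the same servicebus section that the publisher uses. This is a sketch under that assumption: AddServiceBusQueue is a hypothetical extension method name, and it expects the consumer's appsettings.json to contain servicebus:connectionstring and servicebus:queuename.

```csharp
using Microsoft.Azure.ServiceBus;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;

// Hypothetical extension method that registers IQueueClient from configuration.
public static class ServiceBusRegistration
{
    public static IServiceCollection AddServiceBusQueue(
        this IServiceCollection services, IConfiguration configuration)
    {
        var section = configuration.GetSection("servicebus");

        // The factory runs lazily, the first time IQueueClient is resolved.
        services.AddSingleton<IQueueClient>(_ => new QueueClient(
            section["connectionstring"],
            section["queuename"]));

        return services;
    }
}
```

Inside ConfigureServices of the worker this would be called as services.AddServiceBusQueue(hostContext.Configuration); next to services.AddHostedService<Worker>();.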
