Microsoft Azure Service Bus-Topic

Recap

In our previous article, I have covered about the overview of service bus especially on queue & topics and finally we saw how queue works from the coding perspective.

In case, you have not read about Service Bus Queues, I highly recommend you to go through the article by referring to https://anuphosur.wordpress.com/2020/05/17/microsoft-service-bus-queue/

In case, you are not using Azure, I would recommend you to go for free trail by using https://portal.azure.com

Configuring Topic in Azure Portal

Let’s jump directly on how to create an service bus-topic from the Azure portal.

I’m preferring to stay with default setting while creating the topic i.e. partitioning and duplicate detection are set to disabled.

Here, I’m creating two subscriptions. one with Order subscription and other one with Product subscription. In both the subscriptions, we have solicited service bus to have maximum deliveries of message to 5, however, remaining setting are set to default. In the future articles, we’ll cover about sessions, dead lettering etc.

I have created two shared access policies, one for send and other one for listen i.e. we’ll use send policy in the publisher whereas listen policy for consumers(products and orders consumers).

Coding

I have created 4 project for this article

  • Asp.net core web api-> Producer
  • 2 Worker Service projects-> 1 for products consumer and other one for orders consumer.
  • .net core class library-> Common classes

Add Microsoft.Azure.ServiceBus nuget package in producer and consumer(products and orders) projects

Firstly, let’s tackle with Producer functionality

public interface IMessagePublisher
    {
        Task PublisherAsync<T>(T request); 
    }
public class MessagePublisher : IMessagePublisher
    {
        private readonly ITopicClient topicClient;

        public MessagePublisher(ITopicClient topicClient)
        {
            this.topicClient = topicClient;
        }
        public async Task PublisherAsync<T>(T request)
        {
            var message = new Message
            {
                MessageId = Guid.NewGuid().ToString(),
                Body = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(request))
            };
            message.UserProperties.Add("MessageType", typeof(T).Name);
            await topicClient.SendAsync(message);
        }
    }

Message Publisher class is pretty much similar to what we have implemented in the previous article. Instead of IQueueClient, we have to use ITopicClient for topic; moreover, I have added MessageId and custom property(MessageType) to the message header. Later, we will discuss about why I added custom property in detail.

[Route("api/[controller]")]
    [ApiController]
    public class TopicController : ControllerBase
    {
        private readonly IMessagePublisher messagePublisher;

        public TopicController(IMessagePublisher messagePublisher)
        {
            this.messagePublisher = messagePublisher;
        }

        // POST api/values
        [HttpPost(template:"product")]
        public async Task SendToProduct([FromBody]Product product)
        {
            await messagePublisher.PublisherAsync(product);
        }

       [HttpPost(template:"order")]
       public async Task SentToOrder([FromBody]Order order)
        {
            await messagePublisher.PublisherAsync(order);
        }
    }

I have created a pretty straightforward controller with http verb as post for both product and order.

public void ConfigureServices(IServiceCollection services)
        {
            services.AddControllers();
            services.AddSingleton<ITopicClient>(serviceProvider => new TopicClient(
                connectionString: Configuration.GetValue<string>("servicebus:connectionstring"),
                entityPath: Configuration.GetValue<string>("serviceBus:topicname")));
            services.AddSingleton<IMessagePublisher, MessagePublisher>();
        }

In the startup class, i have created a dependency injection for ITopicClient and IMessagePublisher.

{
    "servicebus": {
        "connectionstring": "<ConnectionString Here>",
        "topicname": "<Topic name here>"
    },
    "Logging": {
        "LogLevel": {
            "Default": "Information",
            "Microsoft": "Warning",
            "Microsoft.Hosting.Lifetime": "Information"
        }
    },
    "AllowedHosts": "*"
}

Here are the domain class defined in common project

public class Order
    {
        public int Id { get; set; }

        public string Name { get; set; }

        public int Quantity { get; set; }
    }
public class Product
    {
        public int Id { get; set; }

        public string Name { get; set; }

        public int  Price { get; set; }

        public string ProductStatus { get; set; }
    }

Now run the publisher with either products or orders. In my case, I’m running with products

Now verify the message count in the subscription

Good part is that we are getting the message count as 1 but wait we have the message count as 1 for orders too. Weird, we are sending the message for products but we have received message for orders as well. Any guess?

The answer is, we don’t have any filters for the topic.

Let’s understand, what is filter in topic. Subscribers has to define which message they want from the topic is called filter. Each of the newly created topic subscription has an initial default subscription rule. If in case, you don’t explicitly specify a filter condition for the rule, the applied filter is 1=1 that enables all messages to be selected into the subscription.

There are 2 types of filters in topics

  • SQL Filters- SQL filters holds SQL like conditional expressions against system and user properties(custom properties). Here is the answer, why we used custom property in the message.
  • Correlational Filters- Correlation filter are used to match against one or more message’s system or user properties.

In this articles, we’ll be focus on both the filter.

By default, the default filter is applied i.e. 1=1 or it will accept all the incoming messages.

Let’s add a new filter

For products, I have used SQL filter. In order to receive message from topic, I have set MessageType=’Product’

For orders, I have used Correlation filter. I have specified the custom properties to receive a message from topic.

Now run the postman and see the result. Now I’m running for products

Here we go, the message has been received for product subscription. Publisher seems to be working fine.

Now, let’s move our focus on both the consumers. Firstly, let’s begin with products consumer.

 public static IHostBuilder CreateHostBuilder(string[] args) =>
            Host.CreateDefaultBuilder(args)
                .ConfigureServices((hostContext, services) =>
                {
                    services.AddHostedService<Worker>();
                    services.AddSingleton<ISubscriptionClient>(serviceProvider => new SubscriptionClient(
    connectionString: "<ConnectionString Here",
    topicPath: "<Topic Name here>", subscriptionName: "ProductSubscription"));
                });

We have created dependency injection for ISubscriptionClient by passing connection string, topic name and subscription name.

Note: You have to use listen key of the shared access policy for the connection string.

public class Worker : BackgroundService
    {
        private readonly ILogger<Worker> _logger;
        private readonly ISubscriptionClient subscriptionClient;

        public Worker(ILogger<Worker> logger, ISubscriptionClient subscriptionClient)
        {
            _logger = logger;
            this.subscriptionClient = subscriptionClient;
        }

        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            subscriptionClient.RegisterMessageHandler((message, token) =>
            {
                _logger.LogInformation($"message id:{message.MessageId}");
                _logger.LogInformation($"message body:{Encoding.UTF8.GetString( message.Body)}");
                var product = JsonConvert.DeserializeObject<Common.Product>(Encoding.UTF8.GetString(message.Body));
                //Perform operation here ex: DB operation etc
                return subscriptionClient.CompleteAsync(message.SystemProperties.LockToken);
            }, new MessageHandlerOptions(ex =>
            {
                _logger.LogError(ex.Exception, ex.Exception.Message);
                return Task.FromException(ex.Exception);
            })
            {
                AutoComplete = false,
                MaxConcurrentCalls = 1
            });
        }

You need to register the message handler in order to receive messages from topic. In the above snippet, we are converting the message body from bytes to product object. Don’t forget to specify AutoComplete=false other you will end up with exception.

Similarly, we have to create for Order subscription as well. There are absolutely no change in terms of code other than specifying subscription name in the program class.

public static IHostBuilder CreateHostBuilder(string[] args) =>
            Host.CreateDefaultBuilder(args)
                .ConfigureServices((hostContext, services) =>
                {
                    services.AddHostedService<Worker>();
                    services.AddSingleton<ISubscriptionClient>(serviceProvider => new SubscriptionClient(
    connectionString: "<ConnectionString Here",
    topicPath: "<Topic Name here>", subscriptionName: "ProductSubscription"));
                });

Finally, we have managed to put all the code changes in place. Run the application and verify for both the products and orders.

I hope you like the article. In case, you find the article as interesting then kindly like and share it.

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Google photo

You are commenting using your Google account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s