Monday, December 24, 2018

[RabbitMQ] Message acknowledgements and durability


 RabbitMQ   CSharp



(From www.rabbitmq.com)


Introduction


This post covers RabbitMQ's message acknowledgements and message durability.





Environment


RabbitMQ 3.6
Dotnet core 2.2.101




Implementation


Message Acknowledgements

To make sure that a message is delivered successfully, the consumer must send a message acknowledgement (ack) back to RabbitMQ, telling it that the particular message has been received and can be deleted from the queue.


In the following sample, the left-side consumer fails without sending an ack. When its channel or connection closes, RabbitMQ requeues the unacknowledged messages and the right-side consumer takes care of them.




You can check the number of unacked messages in the RabbitMQ Management UI or with rabbitmqctl list_queues:

$ rabbitmqctl list_queues [-p <vhost_name>] name messages_ready messages_unacknowledged


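The consumer code shown so far acknowledges automatically by setting autoAck: true. In this mode RabbitMQ treats a message as delivered as soon as it has been sent to the consumer: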

var consumer = new EventingBasicConsumer(channel);

channel.BasicConsume(
    queue: QUEUE,
    autoAck: true,
    consumer: consumer);


We can acknowledge manually like this:

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
    // ... (process the message)

    // Acknowledge manually once processing is done
    channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};

// Start consuming with automatic acks turned off
channel.BasicConsume(
    queue: QUEUE,
    autoAck: false,
    consumer: consumer);
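
A handler can also report failure explicitly instead of simply not acking. The following is a minimal sketch, assuming RabbitMQ.Client 5.x (where ea.Body is a byte[]); AckDemo, TryHandle, and Attach are hypothetical names introduced here for illustration. It acks when processing succeeds and otherwise calls BasicNack with requeue: true so that RabbitMQ can redeliver the message.

```csharp
using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

// Hypothetical helper class, not part of RabbitMQ.Client
static class AckDemo
{
    // Runs the work on the decoded body; returns true when the message can be acked.
    public static bool TryHandle(byte[] body, Action<string> work)
    {
        try
        {
            work(Encoding.UTF8.GetString(body));
            return true;
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Processing failed: {ex.Message}");
            return false;
        }
    }

    // Wires the handler to a consumer and starts consuming with manual acks.
    public static void Attach(IModel channel, string queue, Action<string> work)
    {
        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += (model, ea) =>
        {
            if (TryHandle(ea.Body, work))
            {
                channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
            }
            else
            {
                // requeue: true puts the message back on the queue for redelivery
                channel.BasicNack(deliveryTag: ea.DeliveryTag, multiple: false, requeue: true);
            }
        };

        channel.BasicConsume(queue: queue, autoAck: false, consumer: consumer);
    }
}
```

Requeueing is only useful for transient failures. A message that always fails would be redelivered over and over, so a real worker would probably pass requeue: false after some number of attempts.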


Message durability

When RabbitMQ quits or crashes, queues and messages are lost by default.
However, we can make the messages persistent by marking

1.  the queue as durable
2.  the message as persistent

so that they are saved to disk.

However, this doesn't fully guarantee that a message won't be lost, because there is a short window between RabbitMQ accepting a message and writing it to disk. For stronger guarantees, see publisher confirms.
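
As a rough illustration of publisher confirms, here is a minimal sketch, assuming RabbitMQ.Client 5.x and an already opened channel. ConfirmedPublisher and Publish are hypothetical names, and the 5-second timeout is an arbitrary choice. ConfirmSelect and WaitForConfirmsOrDie are the client's synchronous confirm calls in that version line.

```csharp
using System;
using System.Text;
using RabbitMQ.Client;

// Hypothetical helper class, not part of RabbitMQ.Client
static class ConfirmedPublisher
{
    // Publishes one persistent message and blocks until the broker confirms it.
    public static void Publish(IModel channel, string queue, string msg)
    {
        // Put the channel into confirm mode (needed once per channel)
        channel.ConfirmSelect();

        var properties = channel.CreateBasicProperties();
        properties.Persistent = true;

        channel.BasicPublish(
            exchange: "",
            routingKey: queue,
            basicProperties: properties,
            body: Encoding.UTF8.GetBytes(msg));

        // Throws if the broker nacks the message or the timeout elapses
        channel.WaitForConfirmsOrDie(TimeSpan.FromSeconds(5));
    }
}
```

Waiting after every single message is the simplest approach but also the slowest. Publishing a batch and then waiting once is a common trade-off.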


Declare Queue as durable

using (var conn = connFactory.CreateConnection())
using (var channel = conn.CreateModel())
{
    // Declare queue (it will only be created if it doesn't exist already)
    channel.QueueDeclare(
        queue: QUEUE,
        durable: true,
        exclusive: false,
        autoDelete: false,
        arguments: null);
}

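PS. Update the code in both the Producer and the Consumer. RabbitMQ does not allow an existing queue to be redeclared with different parameters, so if the queue already exists as non-durable, delete it first or use a different queue name.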


Publish persistent messages

// Make messages persistent
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
// Or by setting DeliveryMode
// properties.DeliveryMode = 2; // 1: non-persistent, 2: persistent

channel.BasicPublish(
    exchange: "",
    routingKey: QUEUE,
    basicProperties: properties,
    body: Encoding.UTF8.GetBytes(msg));


Dispatch a new message to consumer only when it has acknowledged the previous one

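By setting prefetchCount=1, RabbitMQ dispatches a new message to a worker only after it has processed and acknowledged the previous one. This only takes effect with manual acknowledgements (autoAck: false), and BasicQos should be called before BasicConsume.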


channel.BasicQos(
    prefetchSize: 0,
    prefetchCount: 1,
    global: false);









Saturday, December 22, 2018

[RabbitMQ] Hello world


 RabbitMQ   CSharp





Introduction


RabbitMQ is an open-source message broker.



RabbitMQ was originally developed to support AMQP (Advanced Message Queuing Protocol).
Below are the main concepts and roles in RabbitMQ.



Name | Description
-----|------------
Producer | Sends messages.
Consumer | Receives messages.
Virtual Host (vhost) | Provides a way to segregate applications using the same RabbitMQ instance.
Connection | A TCP connection between your application and the RabbitMQ broker.
Channel | A virtual connection inside a connection. Publishing and consuming messages are both done over a channel. A channel is like a session on a connection.
Exchange | Receives messages from producers and pushes them to queues according to rules defined by the exchange type.
Queue | A buffer that stores messages.




This article is based on the official tutorial "Hello World! The simplest thing that does something", and builds the following sample:






Environment


RabbitMQ 3.6
Dotnet core 2.2.101




Implementation


Install
I installed RabbitMQ as a Docker container with the following commands:

$ docker pull rabbitmq:3.6
$ docker run -d -p 5672:5672 -p 15672:15672 --name <your_container_name> rabbitmq:3.6
$ docker start <your_container_name>
$ docker exec -it <your_container_name> rabbitmq-plugins enable rabbitmq_management

Note that the last command enables the Management Plugin, which provides an HTTP-based API for management and monitoring of RabbitMQ nodes and clusters, along with a browser-based UI and a command line tool, rabbitmqadmin.

The default port for RabbitMQ is 5672, and the default port for the Management UI/API is 15672.

--
We can also enable the RabbitMQ Management plugin later and list the enabled plugins with:

$ rabbitmq-plugins enable rabbitmq_management
$ rabbitmq-plugins list -e
 Configured: E = explicitly enabled; e = implicitly enabled
 | Status:   * = running on rabbit@1f3f73f8659e
 |/
[e*] amqp_client               3.6.16
[e*] cowboy                    1.0.4
[e*] cowlib                    1.0.2
[E*] rabbitmq_management       3.6.16
[e*] rabbitmq_management_agent 3.6.16
[e*] rabbitmq_web_dispatch     3.6.16

--

Note that to skip the step of enabling the RabbitMQ Management plugin, you can use the Docker image with the tag name "<version>-management".


Go to http://localhost:15672 for the RabbitMQ Management UI.
The default user/pwd is guest/guest, which is only allowed to log in through localhost.
However, it's recommended to delete "guest" or change its password and permissions.

We are going to create a new virtual host and a new user.



Create new Virtual host

Create a new virtual host named "vhost_demo" by

$ rabbitmqctl add_vhost vhost_demo
$ rabbitmqctl list_vhosts
/
vhost_demo



Create new User (as Administrator)

Create a new user with id/pwd: rabbitmquser/rabbitmqpwd

$ rabbitmqctl add_user rabbitmquser rabbitmqpwd
$ rabbitmqctl set_user_tags rabbitmquser administrator
$ rabbitmqctl set_permissions -p vhost_demo rabbitmquser ".*" ".*" ".*"

$ rabbitmqctl list_users

Listing users
rabbitmquser    [administrator]
guest   [administrator]


Remove the queue

If you would like to delete a queue, go to the RabbitMQ Management UI,





Or use rabbitmqadmin:

$ rabbitmqadmin delete queue name=<queue_name>




Implementing Hello World!

Create two .NET Core console application projects as follows,



Install the NuGet package RabbitMQ.Client in both projects.




Producer

Let's implement the Producer, which publishes a message to the queue every 2 seconds.

using System;
using System.Text;
using System.Threading;
using RabbitMQ.Client;

class Program
{
    const string VHOST = "vhost_demo";
    const string QUEUE = "hello";

    static void Main(string[] args)
    {
        string msg = "Hello, world!";

        if (args != null && args.Length > 0)
        {
            msg = args[0];
        }

        // Declare the RabbitMQ connection factory
        var connFactory = new ConnectionFactory()
        {
            HostName = "jb.com",
            Port = 5672,
            VirtualHost = VHOST,
            // The user created earlier with "rabbitmqctl add_user"
            UserName = "rabbitmquser",
            Password = "rabbitmqpwd"
        };

        // Create connection and channel
        using (var conn = connFactory.CreateConnection())
        using (var channel = conn.CreateModel())
        {
            // Declare queue (it will only be created if it doesn't exist already)
            channel.QueueDeclare(queue: QUEUE, durable: false,
                exclusive: false, autoDelete: false, arguments: null);

            for (int i = 0; i < 100; i++)
            {
                // Publish a message every 2 seconds.
                // Build the text in a separate variable so the counters
                // do not pile up on msg, e.g. "Hello, world!(0)(1)(2)".
                var text = $"{msg}({i})";
                channel.BasicPublish(
                    exchange: "",
                    routingKey: QUEUE,
                    basicProperties: null,
                    body: Encoding.UTF8.GetBytes(text));

                Console.WriteLine($"{DateTime.Now} Message sent : {text}");

                Thread.Sleep(2000);
            }
        }

        Console.ReadKey();
    }
}



Consumer

To consume from the queue, we define a callback on EventingBasicConsumer.Received.

using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

class Program
{
    const string VHOST = "vhost_demo";
    const string QUEUE = "hello";

    static void Main(string[] args)
    {
        // Declare the RabbitMQ connection factory
        var connFactory = new ConnectionFactory()
        {
            HostName = "jb.com",
            Port = 5672,
            VirtualHost = VHOST,
            // The user created earlier with "rabbitmqctl add_user"
            UserName = "rabbitmquser",
            Password = "rabbitmqpwd"
        };

        // Create connection and channel
        var conn = connFactory.CreateConnection();
        var channel = conn.CreateModel();

        // Declare queue (it will only be created if it doesn't exist already)
        channel.QueueDeclare(
            queue: QUEUE,
            durable: false,
            exclusive: false,
            autoDelete: false,
            arguments: null
        );

        // Listen for and receive the messages.
        // Define the callback for received messages with "EventingBasicConsumer.Received"
        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += (model, ea) =>
        {
            // ea.Body is a byte[] in RabbitMQ.Client 5.x
            var msg = Encoding.UTF8.GetString(ea.Body);
            Console.WriteLine($"{DateTime.Now} Message received : {msg}");
        };

        // Start consuming
        channel.BasicConsume(queue: QUEUE, autoAck: true, consumer: consumer);

        // Keep the connection and channel open until a key is pressed
        Console.ReadKey();
        channel.Close();
        conn.Close();
    }
}

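Note that since we want to keep consuming messages asynchronously until the application terminates, the connection and channel must stay open. DO NOT dispose them early, for example by letting a using block end while the application should still be consuming,

using (var conn = connFactory.CreateConnection())
using (var channel = conn.CreateModel())
{
    //...skip
}
// The connection and channel are already disposed here, so no more messages arrive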



Here is the demo,




If you start two consumers, each of them receives roughly the same number of messages, dispatched in a round-robin way.
For example,
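
To make the dispatch order concrete, here is a small self-contained simulation of round-robin dispatch. It is not RabbitMQ code and does not talk to a broker. RoundRobinDemo and Dispatch are hypothetical names, and the sketch only models the idealized case where every consumer is always ready to receive.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical simulation, not part of RabbitMQ.Client
static class RoundRobinDemo
{
    // Assigns message i to consumer (i % consumerCount), in arrival order.
    public static List<int>[] Dispatch(int messageCount, int consumerCount)
    {
        var received = new List<int>[consumerCount];
        for (int c = 0; c < consumerCount; c++)
        {
            received[c] = new List<int>();
        }

        for (int i = 0; i < messageCount; i++)
        {
            received[i % consumerCount].Add(i);
        }

        return received;
    }
}
```

With 6 messages and 2 consumers, Dispatch(6, 2) gives the first consumer messages 0, 2, 4 and the second consumer messages 1, 3, 5.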




We can set the consumer's priority (default: 0) by passing the "x-priority" argument as follows,

var consumeArgs = new Dictionary<string, object>();
consumeArgs.Add("x-priority", priority);

channel.BasicConsume(
    queue: QUEUE,
    autoAck: true,
    consumer: consumer,
    arguments: consumeArgs);



And here is a demo in which I gave the two consumers different priorities, 10 and 5.
Notice that every message was pushed to the consumer with priority 10, and the other consumer only started to receive messages after the consumer with priority 10 was terminated.
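
The behaviour in the demo can be modelled with a few lines of plain C#. This is a simplified simulation rather than broker code: PriorityDemo and PickConsumer are hypothetical names, and the sketch assumes that an active consumer is never blocked, so it always accepts the next message.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical simulation, not part of RabbitMQ.Client
static class PriorityDemo
{
    // Returns the name of the active consumer with the highest priority,
    // or null when no consumer is active.
    public static string PickConsumer(IDictionary<string, int> activeConsumers)
    {
        if (activeConsumers.Count == 0)
        {
            return null;
        }

        return activeConsumers.OrderByDescending(c => c.Value).First().Key;
    }
}
```

With consumers "A" (priority 10) and "B" (priority 5) both active, PickConsumer returns "A". Once "A" is removed from the dictionary, it returns "B", which mirrors what happened when the higher-priority consumer was terminated.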



