Monday, December 24, 2018

[RabbitMQ] Message acknowledgements and durability


 RabbitMQ   CSharp



(From www.rabbitmq.com)


Introduction


This post covers message acknowledgements and message durability in RabbitMQ.





Environment


RabbitMQ 3.6
.NET Core SDK 2.2.101




Implementation


Message Acknowledgements

To make sure that a message is delivered successfully, the consumer must send a message acknowledgement (ack) back to RabbitMQ, telling it that the particular message has been received and can be deleted from the queue.


In this sample, the consumer on the left fails without sending an ack, so RabbitMQ redelivers its messages to the consumer on the right.
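The redelivery behaviour described above can be sketched with a toy in-memory queue. This is only an illustration and not how RabbitMQ is implemented; `ToyQueue`, `ToyQueueDemo` and the consumer names "left" and "right" are hypothetical names chosen for this sketch.

```csharp
using System.Collections.Generic;
using System.Linq;

// Toy model of ack-based redelivery (an illustration, not RabbitMQ's real code).
public class ToyQueue
{
    private readonly LinkedList<string> _ready = new LinkedList<string>();

    // Consumer name -> messages delivered to it but not yet acked.
    private readonly Dictionary<string, List<string>> _unacked =
        new Dictionary<string, List<string>>();

    public int ReadyCount => _ready.Count;
    public int UnackedCount => _unacked.Values.Sum(list => list.Count);

    public void Publish(string message) => _ready.AddLast(message);

    // Hand the next ready message to a consumer and remember it as unacked.
    public string Deliver(string consumer)
    {
        if (_ready.Count == 0) return null;
        var message = _ready.First.Value;
        _ready.RemoveFirst();
        if (!_unacked.TryGetValue(consumer, out var list))
            _unacked[consumer] = list = new List<string>();
        list.Add(message);
        return message;
    }

    // An ack lets the queue forget the message for good.
    public void Ack(string consumer, string message) => _unacked[consumer].Remove(message);

    // A consumer that dies without acking has its messages put back in the queue.
    public void ConsumerDied(string consumer)
    {
        if (!_unacked.TryGetValue(consumer, out var list)) return;
        for (int i = list.Count - 1; i >= 0; i--) _ready.AddFirst(list[i]);
        _unacked.Remove(consumer);
    }
}

public static class ToyQueueDemo
{
    // Returns the message that the right consumer receives after the left one dies.
    public static string Run()
    {
        var queue = new ToyQueue();
        queue.Publish("m1");
        queue.Publish("m2");
        queue.Deliver("left");                // left receives m1 and never acks it
        var m2 = queue.Deliver("right");      // right receives m2
        queue.Ack("right", m2);               // m2 is gone for good
        queue.ConsumerDied("left");           // m1 goes back to the queue
        return queue.Deliver("right");        // right now receives m1
    }
}
```

In this sketch `ToyQueueDemo.Run()` returns "m1": the message the failed consumer never acknowledged ends up with the surviving consumer.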




You can check the number of unacked messages in the RabbitMQ Management UI or with the following list_queues command:

$ rabbitmqctl list_queues [-p <vhost_name>] name messages_ready messages_unacknowledged


The earlier consumer code acknowledged messages automatically by setting autoAck to true:

var consumer = new EventingBasicConsumer(channel);

channel.BasicConsume(
    queue: QUEUE,
    autoAck: true,
    consumer: consumer);


We can acknowledge manually like this:

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
    // ...

    // Acknowledge the message manually
    channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};

// Start consuming
channel.BasicConsume(
    queue: QUEUE,
    autoAck: false,
    consumer: consumer);
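With manual acks, the consumer also has to decide what to do when processing fails. The following is a minimal sketch, assuming RabbitMQ.Client 5.x (where ea.Body is a byte[]). `AckingConsumer`, `TryProcess` and the `process` callback are hypothetical names for this example; BasicAck, BasicNack and BasicConsume are the client's own calls.

```csharp
using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

public static class AckingConsumer
{
    // Decode and process one message body; returns true when the message may be acked.
    // `process` is a hypothetical application callback.
    public static bool TryProcess(byte[] body, Action<string> process)
    {
        try
        {
            process(Encoding.UTF8.GetString(body));
            return true;
        }
        catch (Exception)
        {
            return false;
        }
    }

    public static void Start(IModel channel, string queue, Action<string> process)
    {
        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += (model, ea) =>
        {
            if (TryProcess(ea.Body, process))
            {
                // Success: RabbitMQ may delete the message.
                channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
            }
            else
            {
                // Failure: reject the message and ask RabbitMQ to requeue it.
                channel.BasicNack(deliveryTag: ea.DeliveryTag, multiple: false, requeue: true);
            }
        };

        // autoAck must be false, otherwise the ack/nack calls above are invalid.
        channel.BasicConsume(queue: queue, autoAck: false, consumer: consumer);
    }
}
```

Note that requeue: true can redeliver a message that always fails over and over again; whether to requeue or drop it is an application decision this sketch does not try to settle.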


Message durability

By default, when RabbitMQ quits or crashes, its queues and messages are lost.
However, we can make the messages survive a restart by marking

1.  the queue as durable
2.  the message as persistent

so that the messages are saved to disk.

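However, this doesn't fully guarantee that a message won't be lost: there is still a short window in which RabbitMQ has accepted a message but has not yet written it to disk. For stronger guarantees, see publisher confirms.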
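As a pointer to what publisher confirms look like in code, here is a minimal sketch, assuming RabbitMQ.Client 5.x or 6.x and an already open channel. `ConfirmedPublisher` and the 5-second timeout are illustrative choices, not part of the original sample.

```csharp
using System;
using System.Text;
using RabbitMQ.Client;

public static class ConfirmedPublisher
{
    // Publishes one persistent message and blocks until the broker confirms it.
    public static void Publish(IModel channel, string queue, string msg)
    {
        // Put the channel into confirm mode.
        channel.ConfirmSelect();

        var properties = channel.CreateBasicProperties();
        properties.Persistent = true;

        channel.BasicPublish(
            exchange: "",
            routingKey: queue,
            basicProperties: properties,
            body: Encoding.UTF8.GetBytes(msg));

        // Throws if the broker nacks the message or the timeout (an assumed value) elapses.
        channel.WaitForConfirmsOrDie(TimeSpan.FromSeconds(5));
    }
}
```

Waiting after every message is the simplest approach but also the slowest. Confirming in batches or asynchronously is a common alternative.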


Declare Queue as durable

using (var conn = connFactory.CreateConnection())
using (var channel = conn.CreateModel())
{
    // Declare the queue (it is only created if it doesn't already exist)
    channel.QueueDeclare(
        queue: QUEUE,
        durable: true,
        exclusive: false,
        autoDelete: false,
        arguments: null);
}

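PS. Update the queue declaration in both the producer and the consumer. RabbitMQ rejects a redeclaration of an existing queue with different parameters, so an existing non-durable queue has to be deleted first, or a new queue name used.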


Publish persistent messages

// Make messages persistent
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
// Or set DeliveryMode directly:
// properties.DeliveryMode = 2; // 1: non-persistent, 2: persistent

channel.BasicPublish(
    exchange: "",
    routingKey: QUEUE,
    basicProperties: properties,
    body: Encoding.UTF8.GetBytes(msg));


Dispatch a new message to a consumer only when it has acknowledged the previous one

With prefetchCount set to 1, RabbitMQ does not dispatch a new message to a worker until it has processed and acknowledged the previous one.


channel.BasicQos(
    prefetchSize: 0,
    prefetchCount: 1,
    global: false);
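To see why this matters, the following toy calculation compares plain round-robin dispatch with the "next message goes to whichever worker is free first" behaviour that prefetchCount = 1 gives. It is a simplified model with made-up processing costs, not a measurement of RabbitMQ itself; `DispatchDemo` and its methods are hypothetical names.

```csharp
using System;
using System.Linq;

public static class DispatchDemo
{
    // Round-robin: message i goes to worker i % workers, however busy that worker is.
    // Returns the total work assigned to each worker.
    public static int[] RoundRobin(int[] costs, int workers)
    {
        var busy = new int[workers];
        for (int i = 0; i < costs.Length; i++)
            busy[i % workers] += costs[i];
        return busy;
    }

    // prefetchCount = 1 (simplified): each message goes to the worker that becomes free first.
    public static int[] PrefetchOne(int[] costs, int workers)
    {
        var busy = new int[workers];
        foreach (var cost in costs)
        {
            int next = Array.IndexOf(busy, busy.Min());
            busy[next] += cost;
        }
        return busy;
    }
}
```

For the costs { 5, 1, 1, 1, 1 } and two workers, RoundRobin returns { 7, 2 } while PrefetchOne returns { 5, 4 }: the worker stuck on the slow message is not handed more work until it is done.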




