RabbitMQ CSharp
(From www.rabbitmq.com)
▌Introduction
▌Environment
▋RabbitMQ 3.6
▋.NET Core 2.2.101
▌Implement
▋Message Acknowledgements
To make sure that a message is delivered successfully, the consumer must send a message acknowledgement (ack) back to RabbitMQ, telling it that the particular message has been received and can be deleted from the queue.
Here is a sample in which the left-side consumer fails without sending an ack, so RabbitMQ redelivers its messages to the right-side consumer.
You can check the number of unacked messages in the RabbitMQ Management UI or with the following list_queues command,
$ rabbitmqctl list_queues [-p <vhost_name>] name messages_ready messages_unacknowledged
The previous consumer code acks automatically because it sets autoAck=true,
var consumer = new EventingBasicConsumer(channel);
channel.BasicConsume(
    queue: QUEUE,
    autoAck: true,
    consumer: consumer);
To ack manually instead, set autoAck=false and call BasicAck inside the handler, like this,
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
    …
    // Acknowledge the message manually
    channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
// Start consuming
channel.BasicConsume(
    queue: QUEUE,
    autoAck: false,
    consumer: consumer);
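The handler above acks every delivery. As a minimal sketch of acking only after successful processing, the consumer below returns a failed message to the queue with BasicNack. It assumes RabbitMQ.Client 5.x (where ea.Body is a byte[]), a broker on localhost with default credentials, and a hypothetical queue name and ProcessMessage helper that are not part of the original sample.

```csharp
using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

class ManualAckConsumer
{
    // Hypothetical queue name, used only for this illustration
    const string QUEUE = "ack_demo";

    static void Main()
    {
        // Assumption: broker on localhost with default credentials
        var connFactory = new ConnectionFactory { HostName = "localhost" };
        using (var conn = connFactory.CreateConnection())
        using (var channel = conn.CreateModel())
        {
            channel.QueueDeclare(
                queue: QUEUE,
                durable: false,
                exclusive: false,
                autoDelete: false,
                arguments: null);

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                try
                {
                    // RabbitMQ.Client 5.x: Body is byte[].
                    // On 6.x it is ReadOnlyMemory<byte>, so use ea.Body.ToArray().
                    var msg = Encoding.UTF8.GetString(ea.Body);
                    ProcessMessage(msg);

                    // Processing succeeded: the message can be deleted from the queue
                    channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                }
                catch (Exception)
                {
                    // Processing failed: put the message back so it can be redelivered
                    channel.BasicNack(deliveryTag: ea.DeliveryTag, multiple: false, requeue: true);
                }
            };

            channel.BasicConsume(queue: QUEUE, autoAck: false, consumer: consumer);

            Console.WriteLine("Press [enter] to exit.");
            Console.ReadLine();
        }
    }

    // Hypothetical stand-in for the real message handling
    static void ProcessMessage(string msg)
    {
        Console.WriteLine($"Received: {msg}");
    }
}
```

Note that requeue: true will redeliver a message that always fails over and over, so a real worker would probably cap retries or route such messages elsewhere.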
▋Message durability
By default, when RabbitMQ quits or crashes, its queues and messages are lost.
However, we can have the messages saved to disk by marking both
1. the queue as durable
2. the messages as persistent
Even so, this doesn't fully guarantee that a message won't be lost. For stronger guarantees, see publisher confirms.
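As a rough sketch of what publisher confirms could look like with the .NET client, the publisher below enables confirm mode on the channel and blocks until the broker confirms the message. It assumes RabbitMQ.Client 5.x or 6.x (the IModel API), a broker on localhost, and a hypothetical queue name and 5-second timeout chosen only for illustration.

```csharp
using System;
using System.Text;
using RabbitMQ.Client;

class ConfirmedPublisher
{
    // Hypothetical queue name, used only for this illustration
    const string QUEUE = "confirm_demo";

    static void Main()
    {
        // Assumption: broker on localhost with default credentials
        var connFactory = new ConnectionFactory { HostName = "localhost" };
        using (var conn = connFactory.CreateConnection())
        using (var channel = conn.CreateModel())
        {
            channel.QueueDeclare(
                queue: QUEUE,
                durable: true,
                exclusive: false,
                autoDelete: false,
                arguments: null);

            // Put the channel into confirm mode
            channel.ConfirmSelect();

            var properties = channel.CreateBasicProperties();
            properties.Persistent = true;

            channel.BasicPublish(
                exchange: "",
                routingKey: QUEUE,
                basicProperties: properties,
                body: Encoding.UTF8.GetBytes("hello"));

            // Block until the broker confirms the message;
            // throws if the message is nacked or the timeout elapses
            channel.WaitForConfirmsOrDie(TimeSpan.FromSeconds(5));
            Console.WriteLine("Message confirmed by the broker.");
        }
    }
}
```

Waiting after every single publish is simple but slow. Publishing a batch and then waiting once is a common trade-off.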
▋Declare Queue as durable
using (var conn = connFactory.CreateConnection())
using (var channel = conn.CreateModel())
{
    // Declare queue (it will only be created if it doesn't exist already)
    channel.QueueDeclare(
        queue: QUEUE,
        durable: true,
        exclusive: false,
        autoDelete: false,
        arguments: null);
}
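PS. Apply this change on both the Producer and the Consumer. RabbitMQ does not allow an existing queue to be redeclared with different parameters, so both sides must declare it the same way.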
▋Publish persistent messages
// Make messages persistent
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
// Or by setting DeliveryMode
// properties.DeliveryMode = 2; // 1: non-persistent, 2: persistent
channel.BasicPublish(
    exchange: "",
    routingKey: QUEUE,
    basicProperties: properties,
    body: Encoding.UTF8.GetBytes(msg)
);
▋Dispatch a new message to consumer only when it has acknowledged the previous one
With prefetchCount=1, RabbitMQ dispatches a new message to a worker only after it has processed and acknowledged the previous one.
channel.BasicQos(
    prefetchSize: 0,
    prefetchCount: 1,
    global: false);
▋Sample code
▌Reference