RabbitMQ CSharp
▌Introduction
RabbitMQ is an open-source message broker that supports multiple messaging protocols.
It was originally developed to support AMQP (Advanced Message Queuing Protocol).
Below are the main concepts and roles in RabbitMQ.
Name | Description
Producer | The application that sends messages.
Consumer | The application that receives messages.
Virtual Host (vhost) | Provides a way to segregate applications that use the same RabbitMQ instance.
Connection | A TCP connection between your application and the RabbitMQ broker.
Channel | A virtual connection inside a connection. Publishing messages to a queue and consuming messages from it are both done over a channel; channels are like sessions on a connection.
Exchange | Receives messages from producers and routes them to queues according to rules defined by the exchange type.
Queue | A buffer that stores messages.
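As a rough illustration of how these roles map onto the .NET client, the sketch below opens a connection, creates a channel on it, declares an exchange and a queue, binds them, and publishes one message. It assumes the RabbitMQ.Client 5.x NuGet package and a broker on localhost with the default guest account; the exchange name "demo.direct", the queue name "demo.queue" and the routing key "demo" are made up for illustration.

```csharp
using System.Text;
using RabbitMQ.Client;

class ConceptsSketch
{
    static void Main()
    {
        // Assumption: broker on localhost, default vhost "/" and the guest/guest account.
        var factory = new ConnectionFactory { HostName = "localhost", VirtualHost = "/" };

        using (var connection = factory.CreateConnection()) // Connection: one TCP connection
        using (var channel = connection.CreateModel())      // Channel: virtual connection inside it
        {
            // Exchange: a direct exchange routes by exact routing-key match.
            channel.ExchangeDeclare(exchange: "demo.direct", type: ExchangeType.Direct);

            // Queue: the buffer that holds messages until a consumer takes them.
            channel.QueueDeclare(queue: "demo.queue", durable: false,
                exclusive: false, autoDelete: false, arguments: null);

            // Binding: messages published with routing key "demo" go to demo.queue.
            channel.QueueBind(queue: "demo.queue", exchange: "demo.direct", routingKey: "demo");

            // Producer side: publish to the exchange, not directly to the queue.
            channel.BasicPublish(exchange: "demo.direct", routingKey: "demo",
                basicProperties: null, body: Encoding.UTF8.GetBytes("hi"));
        }
    }
}
```

The Hello World sample in this article skips the explicit exchange and binding by publishing to the default exchange (""), which routes a message to the queue whose name equals the routing key.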
This article is based on the official tutorial "Hello World! The simplest thing that does something".
We will build the same sample: one producer that sends messages and one consumer that receives them.
▌Environment
▋RabbitMQ 3.6
▋.NET Core 2.2.101
▌Implement
▋Install
I installed RabbitMQ as a Docker container with:
$ docker pull rabbitmq:3.6
$ docker run -d -p 5672:5672 -p 15672:15672 --name <your_container_name> rabbitmq:3.6
$ docker start <your_container_name>
$ docker exec -it <your_container_name> rabbitmq-plugins enable rabbitmq_management
Notice that the last command enables the Management Plugin, which provides an HTTP-based API for management and monitoring of RabbitMQ nodes and clusters, along with a browser-based UI and a command line tool, rabbitmqadmin.
The default port for RabbitMQ is 5672, and the default port for the Management UI/API is 15672.
--
We can also enable the RabbitMQ Management plugin later with:
$ rabbitmq-plugins enable rabbitmq_management
$ rabbitmq-plugins list -e
Configured: E = explicitly enabled; e = implicitly enabled
| Status: * = running on rabbit@1f3f73f8659e
|/
[e*] amqp_client 3.6.16
[e*] cowboy 1.0.4
[e*] cowlib 1.0.2
[E*] rabbitmq_management 3.6.16
[e*] rabbitmq_management_agent 3.6.16
[e*] rabbitmq_web_dispatch 3.6.16
--
To skip the step of enabling the RabbitMQ Management plugin, use the Docker image with the tag "<version>-management".
Go to http://localhost:15672 for the RabbitMQ Management UI.
There is a default user/password, guest/guest, which is only allowed to log in from localhost.
However, it's recommended to delete "guest" or to change its password and permissions.
We are going to create a new virtual host and a new user.
▋Create new Virtual host
Create a new virtual host named "vhost_demo" with:
$ rabbitmqctl add_vhost vhost_demo
$ rabbitmqctl list_vhosts
/
vhost_demo
▋Create new User (as Administrator)
Create a new user with id/pwd rabbitmquser/rabbitmqpwd:
$ rabbitmqctl add_user rabbitmquser rabbitmqpwd
$ rabbitmqctl set_user_tags rabbitmquser administrator
$ rabbitmqctl set_permissions -p vhost_demo rabbitmquser ".*" ".*" ".*"
$ rabbitmqctl list_users
Listing users
rabbitmquser [administrator]
guest [administrator]
▋Remove the queue
If you would like to delete a queue, use the RabbitMQ Management UI or rabbitmqadmin:
$ rabbitmqadmin delete queue name=<queue_name>
▋Implementing Hello World!
Create two .NET Core console application projects, one for the producer and one for the consumer, and install the NuGet package RabbitMQ.Client in both projects.
▋Producer
Let's implement the producer, which publishes a message to the queue every 2 seconds.
using System;
using System.Text;
using System.Threading;
using RabbitMQ.Client;

class Program
{
    const string VHOST = "vhost_demo";
    const string QUEUE = "hello";

    static void Main(string[] args)
    {
        string msg = "Hello, world!";
        if (args != null && args.Length > 0)
        {
            msg = args[0];
        }

        // Declare the RabbitMQ connection factory
        var connFactory = new ConnectionFactory()
        {
            HostName = "jb.com", // replace with your broker's host
            Port = 5672,
            VirtualHost = VHOST,
            UserName = "rabbitmquser", // the user created above
            Password = "rabbitmqpwd"
        };

        // Create the connection and channel
        using (var conn = connFactory.CreateConnection())
        using (var channel = conn.CreateModel())
        {
            // Declare the queue (it is only created if it doesn't already exist)
            channel.QueueDeclare(queue: QUEUE, durable: false, exclusive: false, autoDelete: false, arguments: null);

            for (int i = 0; i < 100; i++)
            {
                // Publish a message every 2 seconds. Build the numbered text from
                // the original msg so the "(i)" suffixes don't pile up.
                var text = $"{msg}({i})";
                channel.BasicPublish(
                    exchange: "",
                    routingKey: QUEUE,
                    basicProperties: null,
                    body: Encoding.UTF8.GetBytes(text)
                );
                Console.WriteLine($"{DateTime.Now} Message sent : {text}");
                Thread.Sleep(2000);
            }
        }
        Console.ReadKey();
    }
}
▋Consumer
To consume from the queue, we register a callback on EventingBasicConsumer.Received.
using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

class Program
{
    const string VHOST = "vhost_demo";
    const string QUEUE = "hello";

    static void Main(string[] args)
    {
        // Declare the RabbitMQ connection factory
        var connFactory = new ConnectionFactory()
        {
            HostName = "jb.com", // replace with your broker's host
            Port = 5672,
            VirtualHost = VHOST,
            UserName = "rabbitmquser", // the user created above
            Password = "rabbitmqpwd"
        };

        // Create the connection and channel
        var conn = connFactory.CreateConnection();
        var channel = conn.CreateModel();

        // Declare the queue (it is only created if it doesn't already exist)
        channel.QueueDeclare(
            queue: QUEUE,
            durable: false,
            exclusive: false,
            autoDelete: false,
            arguments: null
        );

        // Define the callback that handles each received message
        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += (model, ea) =>
        {
            var msg = Encoding.UTF8.GetString(ea.Body);
            Console.WriteLine($"{DateTime.Now} Message received : {msg}");
        };

        // Start consuming
        channel.BasicConsume(queue: QUEUE, autoAck: true, consumer: consumer);

        // Keep consuming until a key is pressed, then close the channel and connection
        Console.ReadKey();
        channel.Close();
        conn.Close();
    }
}
Notice that messages are delivered asynchronously for as long as the application runs, so the connection and channel must stay open until we are done consuming. Do NOT let them be disposed early, for example by letting a using block like this one end right after BasicConsume is called:
using (var conn = connFactory.CreateConnection())
using (var channel = conn.CreateModel())
{
    //...skip
}
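Using blocks are still fine as long as the wait for the exit key happens inside them, so that disposal only runs after we stop consuming. Here is a minimal sketch of that variant, assuming the RabbitMQ.Client 5.x package (where ea.Body is a byte[]), a broker on localhost (an assumption, adjust HostName as needed), and the vhost and user created earlier in this article. The class name UsingConsumer is made up for illustration.

```csharp
using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

class UsingConsumer
{
    static void Main()
    {
        var connFactory = new ConnectionFactory()
        {
            HostName = "localhost", // assumption: the broker runs locally
            Port = 5672,
            VirtualHost = "vhost_demo",
            UserName = "rabbitmquser",
            Password = "rabbitmqpwd"
        };

        using (var conn = connFactory.CreateConnection())
        using (var channel = conn.CreateModel())
        {
            channel.QueueDeclare(queue: "hello", durable: false,
                exclusive: false, autoDelete: false, arguments: null);

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
                Console.WriteLine($"{DateTime.Now} Message received : {Encoding.UTF8.GetString(ea.Body)}");

            channel.BasicConsume(queue: "hello", autoAck: true, consumer: consumer);

            // Block here, inside the using blocks, so the connection and channel
            // stay open while messages keep arriving in the callback.
            Console.ReadKey();
        } // the channel and connection are disposed only after a key is pressed
    }
}
```

With this layout the explicit channel.Close() and conn.Close() calls are not needed, because leaving the using blocks disposes both objects.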
If you start two consumers, the messages are distributed between them in a round-robin fashion, so each consumer receives the same number of messages.
We can set a consumer's priority (default: 0) by passing the "x-priority" argument to BasicConsume, as follows:
var consumeArgs = new Dictionary<string, object>();
consumeArgs.Add("x-priority", priority);
channel.BasicConsume(
    queue: QUEUE,
    autoAck: true,
    consumer: consumer,
    arguments: consumeArgs);
In a demo where I gave the two consumers different priorities, 10 and 5, every message was pushed to the consumer with priority 10. The other consumer started to receive messages only after the consumer with priority 10 was terminated.
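The two dispatch behaviours described in this section (round-robin between equal consumers, and preference for the higher priority) can be imitated without a broker. The sketch below is only a simplified in-memory model, not RabbitMQ's actual algorithm: it ignores acknowledgements, prefetch limits and blocked consumers. The names SimConsumer and DispatchSim are hypothetical and are not part of RabbitMQ.Client.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for a consumer: a name, an "x-priority" value,
// a flag for whether it is still connected, and the messages it received.
class SimConsumer
{
    public string Name { get; set; } = "";
    public int Priority { get; set; }
    public bool Active { get; set; } = true;
    public List<int> Received { get; } = new List<int>();
}

static class DispatchSim
{
    // Deliver message numbers first .. first + count - 1 to the active consumers
    // that share the highest priority, rotating among them in round-robin order.
    public static void Dispatch(List<SimConsumer> consumers, int first, int count)
    {
        int next = 0;
        for (int m = first; m < first + count; m++)
        {
            var active = consumers.Where(c => c.Active).ToList();
            if (active.Count == 0) return; // nobody left to deliver to
            int top = active.Max(c => c.Priority);
            var candidates = active.Where(c => c.Priority == top).ToList();
            candidates[next % candidates.Count].Received.Add(m);
            next++;
        }
    }
}

class Program
{
    static void Main()
    {
        // Equal priorities: messages alternate between the two consumers.
        var equal = new List<SimConsumer>
        {
            new SimConsumer { Name = "A", Priority = 0 },
            new SimConsumer { Name = "B", Priority = 0 }
        };
        DispatchSim.Dispatch(equal, 0, 6);

        // Priorities 10 and 5: "High" gets everything until it is terminated.
        var ranked = new List<SimConsumer>
        {
            new SimConsumer { Name = "High", Priority = 10 },
            new SimConsumer { Name = "Low", Priority = 5 }
        };
        DispatchSim.Dispatch(ranked, 0, 3);
        ranked[0].Active = false; // terminate the priority-10 consumer
        DispatchSim.Dispatch(ranked, 3, 3);

        foreach (var c in equal.Concat(ranked))
            Console.WriteLine($"{c.Name}: {string.Join(",", c.Received)}");
    }
}
```

In this model A receives 0,2,4 and B receives 1,3,5, while High receives 0,1,2 and Low receives 3,4,5 only after High is deactivated, which mirrors what the two demos showed.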
▋Sample code
▌Reference