Saturday, December 22, 2018

[RabbitMQ] Hello world


 RabbitMQ   CSharp





Introduction


RabbitMQ is an open-source message broker. It was originally developed to support AMQP (Advanced Message Queuing Protocol).
The main concepts and roles in RabbitMQ are listed below.



Producer: Sends messages.
Consumer: Receives messages.
Virtual Host (vhost): Provides a way to segregate applications that use the same RabbitMQ instance.
Connection: A TCP connection between your application and the RabbitMQ broker.
Channel: A virtual connection inside a connection. Publishing to and consuming from a queue are both done over a channel. Channels are like sessions on a connection.
Exchange: Receives messages from producers and pushes them to queues according to rules defined by the exchange type.
Queue: A buffer that stores messages.
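To make the Connection/Channel relationship concrete, here is a minimal sketch that opens one TCP connection and two channels on top of it. It assumes the RabbitMQ.Client NuGet package (5.x) and a broker on localhost that accepts the default guest account; the class name and host are placeholders for illustration only.

```csharp
using System;
using RabbitMQ.Client;

class ChannelSketch
{
    static void Main()
    {
        // Assumption: a local broker with the default guest/guest account.
        var factory = new ConnectionFactory() { HostName = "localhost" };

        // One TCP connection to the broker...
        using (var conn = factory.CreateConnection())
        // ...carrying two independent channels (virtual connections).
        using (var publishChannel = conn.CreateModel())
        using (var consumeChannel = conn.CreateModel())
        {
            Console.WriteLine($"Connection open: {conn.IsOpen}");
            Console.WriteLine($"Channel numbers: {publishChannel.ChannelNumber}, {consumeChannel.ChannelNumber}");
        }
    }
}
```

Both channels share the single TCP connection, which is why an application typically keeps one connection and opens channels as needed.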




This article follows the official tutorial "Hello World! The simplest thing that does something" and builds the following sample,






Environment


RabbitMQ 3.6
.NET Core SDK 2.2.101




Implement


Install
I installed RabbitMQ as a Docker container with

$ docker pull rabbitmq:3.6
$ docker run -d -p 5672:5672 -p 15672:15672 --name <your_container_name> rabbitmq:3.6
$ docker start <your_container_name>
$ docker exec -it <your_container_name> rabbitmq-plugins enable rabbitmq_management

Notice that the last command enables the Management Plugin, which provides an HTTP-based API for managing and monitoring RabbitMQ nodes and clusters, along with a browser-based UI and a command-line tool, rabbitmqadmin.

The default port for RabbitMQ (AMQP) is 5672, and the Management UI/API listens on 15672.

--
The Management plugin can also be enabled later, and the enabled plugins listed, with

$ rabbitmq-plugins enable rabbitmq_management
$ rabbitmq-plugins list -e
 Configured: E = explicitly enabled; e = implicitly enabled
 | Status:   * = running on rabbit@1f3f73f8659e
 |/
[e*] amqp_client               3.6.16
[e*] cowboy                    1.0.4
[e*] cowlib                    1.0.2
[E*] rabbitmq_management       3.6.16
[e*] rabbitmq_management_agent 3.6.16
[e*] rabbitmq_web_dispatch     3.6.16

--

Note that to skip the step of enabling the Management plugin, you can use the Docker image tagged "<version>-management".


Go to http://localhost:15672 for the RabbitMQ Management UI.
The default user/password is guest/guest, which is only allowed to log in from localhost.
However, it's recommended to delete "guest" or change its password/permissions.

We are going to create a new virtual host and a new user.



Create new Virtual host

Create a new virtual host named "vhost_demo" with

$ rabbitmqctl add_vhost vhost_demo
$ rabbitmqctl list_vhosts
/
vhost_demo



Create new User (as Administrator)

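Create a new user with id/pwd rabbitmquser/rabbitmqpwd, tag it as administrator, and grant it configure, write, and read permissions (the three ".*" patterns) on vhost_demo: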

$ rabbitmqctl add_user rabbitmquser rabbitmqpwd
$ rabbitmqctl set_user_tags rabbitmquser administrator
$ rabbitmqctl set_permissions -p vhost_demo rabbitmquser ".*" ".*" ".*"

$ rabbitmqctl list_users

Listing users
rabbitmquser    [administrator]
guest   [administrator]


Remove the queue

To delete a queue, use the RabbitMQ Management UI, or rabbitmqadmin:

$ rabbitmqadmin delete queue name=<queue_name>




Implementing Hello World!

Create two .NET Core console application projects, one for the producer and one for the consumer.



Install the NuGet package RabbitMQ.Client in both projects.




Producer

Let's implement the Producer, which publishes a message to the queue every 2 seconds.

using System;
using System.Text;
using System.Threading;
using RabbitMQ.Client;

class Program
{
    const string VHOST = "vhost_demo";
    const string QUEUE = "hello";

    static void Main(string[] args)
    {
        string msg = "Hello, world!";

        if (args != null && args.Length > 0)
        {
            msg = args[0];
        }

        // Declare the RabbitMQ connection factory
        var connFactory = new ConnectionFactory()
        {
            HostName = "jb.com", // broker host; use "localhost" for a local Docker container
            Port = 5672,
            VirtualHost = VHOST,
            UserName = "rabbitmquser", // the user created above
            Password = "rabbitmqpwd"
        };

        // Create the connection and channel
        using (var conn = connFactory.CreateConnection())
        using (var channel = conn.CreateModel())
        {
            // Declare the queue (it is only created if it doesn't exist already)
            channel.QueueDeclare(queue: QUEUE, durable: false,
                exclusive: false, autoDelete: false, arguments: null);

            for (int i = 0; i < 100; i++)
            {
                // Build the body from the base message and the counter
                string body = $"{msg}({i})";

                // Publish to the default exchange, routed by queue name
                channel.BasicPublish(
                    exchange: "",
                    routingKey: QUEUE,
                    basicProperties: null,
                    body: Encoding.UTF8.GetBytes(body));

                Console.WriteLine($"{DateTime.Now} Message sent : {body}");

                // Wait 2 seconds before publishing the next message
                Thread.Sleep(2000);
            }
        }

        Console.ReadKey();
    }
}



Consumer

To consume from the queue, we register a callback on the EventingBasicConsumer.Received event.

using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

class Program
{
    const string VHOST = "vhost_demo";
    const string QUEUE = "hello";

    static void Main(string[] args)
    {
        // Declare the RabbitMQ connection factory
        var connFactory = new ConnectionFactory()
        {
            HostName = "jb.com", // broker host; use "localhost" for a local Docker container
            Port = 5672,
            VirtualHost = VHOST,
            UserName = "rabbitmquser", // the user created above
            Password = "rabbitmqpwd"
        };

        // Create the connection and channel
        var conn = connFactory.CreateConnection();
        var channel = conn.CreateModel();

        // Declare the queue (it is only created if it doesn't exist already)
        channel.QueueDeclare(
            queue: QUEUE,
            durable: false,
            exclusive: false,
            autoDelete: false,
            arguments: null
        );

        // Define the callback for received messages via "EventingBasicConsumer.Received"
        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += (model, ea) =>
        {
            // ea.Body is a byte[] in RabbitMQ.Client 5.x (6.0+ needs ea.Body.ToArray())
            var msg = Encoding.UTF8.GetString(ea.Body);
            Console.WriteLine($"{DateTime.Now} Message received : {msg}");
        };

        // Start consuming
        channel.BasicConsume(queue: QUEUE, autoAck: true, consumer: consumer);

        // Keep the application alive until a key is pressed, then clean up
        Console.ReadKey();
        channel.Close();
        conn.Close();
    }
}

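Note that the consumer receives messages asynchronously until the application is terminated, so the connection and channel must stay open in the meantime. DO NOT let them be disposed right after setup, which is what happens if the using blocks end before the application has finished waiting,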

using (var conn = connFactory.CreateConnection())
using (var channel = conn.CreateModel())
 {
      //...skip
 }
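The using blocks remain usable as long as the blocking wait sits inside them, so that disposal only happens once a key is pressed. Below is a minimal sketch of that variant, assuming RabbitMQ.Client 5.x (where ea.Body is a byte[]) and a local broker with the default guest account; the host name is a placeholder rather than a value from this article.

```csharp
using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

class Program
{
    const string QUEUE = "hello";

    static void Main(string[] args)
    {
        // Assumption: a local broker with the default guest/guest account.
        var connFactory = new ConnectionFactory() { HostName = "localhost" };

        using (var conn = connFactory.CreateConnection())
        using (var channel = conn.CreateModel())
        {
            channel.QueueDeclare(queue: QUEUE, durable: false,
                exclusive: false, autoDelete: false, arguments: null);

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                var msg = Encoding.UTF8.GetString(ea.Body);
                Console.WriteLine($"Message received : {msg}");
            };

            channel.BasicConsume(queue: QUEUE, autoAck: true, consumer: consumer);

            // Block inside the using blocks so the connection and channel stay open.
            Console.ReadKey();
        } // the channel and the connection are disposed here, after the key press
    }
}
```

Either style works; what matters is that nothing is disposed while the callback is still expected to fire.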



Here is the demo,




If you start two consumers, the messages are dispatched in a round-robin way, so each consumer receives about the same number of messages.
For example,
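The round-robin distribution can be pictured with a small stand-alone simulation. This is not RabbitMQ code and involves no broker: it is an idealized sketch in which message i is simply handed to consumer i mod N, and the class and method names are hypothetical.

```csharp
using System;
using System.Collections.Generic;

public static class RoundRobinDemo
{
    // Hand message i to consumer (i mod consumerCount), as an idealized
    // round-robin dispatcher would.
    public static List<int>[] Dispatch(int consumerCount, int messageCount)
    {
        var received = new List<int>[consumerCount];
        for (int c = 0; c < consumerCount; c++)
        {
            received[c] = new List<int>();
        }

        for (int i = 0; i < messageCount; i++)
        {
            received[i % consumerCount].Add(i);
        }

        return received;
    }

    public static void Main()
    {
        var received = Dispatch(consumerCount: 2, messageCount: 6);
        for (int c = 0; c < received.Length; c++)
        {
            Console.WriteLine($"consumer {c}: {string.Join(", ", received[c])}");
        }
        // prints:
        // consumer 0: 0, 2, 4
        // consumer 1: 1, 3, 5
    }
}
```

With the producer above numbering its messages, one consumer would see the even counters and the other the odd ones.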




We can set the consumer's priority (default: 0) by passing "x-priority" in the consume arguments, as follows,

var consumeArgs = new Dictionary<string, object>();
consumeArgs.Add("x-priority", priority);

channel.BasicConsume(
    queue: QUEUE,
    autoAck: true,
    consumer: consumer,
    arguments: consumeArgs);
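The snippet above leaves open where the priority value comes from. One possible approach, sketched here as an assumption rather than the article's own code, is to read it from the first command-line argument and fall back to 0 when it is missing or not a number. The ConsumerArgs class and Build method are hypothetical helpers.

```csharp
using System;
using System.Collections.Generic;

public static class ConsumerArgs
{
    // Build the arguments dictionary to pass to BasicConsume.
    // The first command-line argument is taken as the priority; a missing
    // or non-numeric value falls back to 0, the default priority.
    public static Dictionary<string, object> Build(string[] args)
    {
        int priority = 0;
        if (args != null && args.Length > 0)
        {
            // TryParse leaves priority at 0 when parsing fails.
            int.TryParse(args[0], out priority);
        }

        return new Dictionary<string, object> { { "x-priority", priority } };
    }

    public static void Main(string[] args)
    {
        var consumeArgs = Build(args);
        Console.WriteLine($"x-priority = {consumeArgs["x-priority"]}");
    }
}
```

Two consumers could then be started with, for instance, `dotnet run -- 10` and `dotnet run -- 5`, each passing the resulting dictionary as the arguments parameter of BasicConsume.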



Here is a demo in which I gave the two consumers different priorities, 10 and 5.
Notice that every message was pushed to the consumer with priority 10, and the other consumer only started to receive messages after the consumer with priority 10 was terminated.



