本文

架設RabbitMQ

首先我們使用 docker 來架設練習用的 RabbitMQ。

我們要使用 rabbitmq:management的image ,除了會幫我們架設起一個 RabbitMQ 外,也包含了可以管理的 Web 介面。

使用以下語法,並給予預設帳號的帳號密碼。

docker run --name rabbitmq -d -p 15672:15672 -p 5672:5672 -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=a1234 rabbitmq:management

可以看到輸入後就算目前沒有該 Image 也會自動幫你下載(懶人的福音R)

接下來可以到 docker 確認是否已經啟用。

那麼就可以輸入以下網址登入測試了。

http://localhost:15672/

連上後就會看到登入畫面。

輸入我們剛剛設定的預設值即可。

Account: admin
Password: a1234

成功進入後就會看到初始畫面儀錶板,一樣可以在 Overview 看到 RabbitMQ 目前有哪些東西在作用。

這邊就只挑比較特別的地方做介紹。

Exchange

這邊可以看到目前有建立了哪些 Exchange,畫面上看到的都是預設的。

使用 Web 介面的好處是可以直接透過介面建立。

點選 Exchange 可以看到細部的設定。

Queue

一樣是可以透過介面觀看有哪些佇列、與其細項內容,並可以直接透過介面新增。

Admin

這邊就是管理者的介面可以設定帳號,增加使用者等。

右方還有更細部的設定,比如可以設定一些連接數、政策規範等。

實作練習

這次主要範例會使用 RabbitMQ 官方的 Nuget。
RabbitMQ.Client

讓我們先建立 ConsoleApp。

需要建立一個 Publisher、一個 Reciever。

接下來先安裝 RabbitMQ.Client。

我們就可以來開始先做我們的 Publisher發布訊息內容。

        static void Main(string[] args)
        {
            //設定主機位置以及帳號密碼
            var factory = new ConnectionFactory()
            {
                HostName = "localhost",
                UserName = "admin",
                Password = "a1234"
            };

            //連線設定
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                //Queue的設定
                channel.QueueDeclare
                    (
                        queue: "hello",    //Queue的名稱
                        durable: false,    //Queue是否持久化保存,是的話會寫入硬碟即便RabbitMQ重啟也不會遺失。
                        exclusive: false,  //Queue是否為私有訪問機制,是的話當有一個Reciever在訪問該Queue就會對其Lock。
                        autoDelete: false  //Queue是否自動刪除,是的話當最後一個Reciever段開連結後會自動刪除Queue。
                    );

                //設定我們要傳送的訊息
                string message = "Hello World!";
                var body = Encoding.UTF8.GetBytes(message);

                //設定Publisher
                channel.BasicPublish
                    (
                        exchange: "",
                        routingKey: "hello",
                        basicProperties: null,
                        body: body
                    );
                Console.WriteLine(" 送出訊息: {0}", message);
            }

            Console.WriteLine("請按任意鍵離開!");
            Console.ReadLine();
        }

接著製作 Reciever。

        static void Main(string[] args)
        {
            //設定主機位置以及帳號密碼
            var factory = new ConnectionFactory()
            {
                HostName = "localhost",
                UserName = "admin",
                Password = "a1234"
            };

            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {

                //Queue的設定
                channel.QueueDeclare
                    (
                        queue: "hello",    //Queue的名稱。
                        durable: false,    //Queue是否持久化保存,是的話會寫入硬碟即便RabbitMQ重啟也不會遺失。
                        exclusive: false,  //Queue是否為私有訪問機制,是的話當有一個Reciever在訪問該Queue就會對其Lock。
                        autoDelete: false  //Queue是否自動刪除,是的話當最後一個Reciever段開連結後會自動刪除Queue。
                    );

                //Comsumer主要處理訊息內容
                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var message = ea.Body.ToArray();
                    Console.WriteLine(" 收到訊息: {0}", Encoding.UTF8.GetString(message));
                };

                //建立Consumer接收
                channel.BasicConsume
                    (
                        queue: "hello",    //指定Queue的名稱。
                        autoAck: true,     //True的話會自動發送有無接收到訊息給RabbitMQ做確認。
                        consumer: consumer //Consumer內容
                    );

                Console.WriteLine("請按任意鍵離開!");
                Console.ReadLine();
            }
        }

我們來測試看看,首先執行 Publisher。

送出後我們可以到管理介面確認一下。

接下來我們執行 Reciever。

現在到管理介面看 Connections,就會看到目前有一個連線。

Channel 的地方也會看到目前使用中的有誰。

基本的範例做完後,我們來試試看調用 Exchange 來幫我們決定要發送到哪個 Queue。

Publisher

        static void Main(string[] args)
        {
            //設定主機位置以及帳號密碼
            var factory = new ConnectionFactory()
            {
                HostName = "localhost",
                UserName = "admin",
                Password = "a1234"
            };

            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                //Exchange設定
                channel.ExchangeDeclare
                    (
                        "testExchange",
                        ExchangeType.Direct  //使用Direct方式
                    );

                channel.QueueDeclare
                    (
                        queue: "HelloExchange",
                        durable: false,
                        exclusive: false,
                        autoDelete: false
                    );

                //Queue 與 Exchange繫結處理
                channel.QueueBind
                    (
                        queue: "HelloExchange",
                        exchange: "testExchange",
                        routingKey: "hello",
                        null
                    );

                string message = "Hello World!";
                var body = Encoding.UTF8.GetBytes(message);

                channel.BasicPublish
                    (
                        exchange: "testExchange",
                        routingKey: "hello",
                        basicProperties: null,
                        body: body
                    );

                Console.WriteLine(" 送出訊息: {0}", message);
            }

            Console.WriteLine("請按任意鍵離開!");
            Console.ReadLine();
        }

Publisher

    class Program
    {
        static void Main(string[] args)
        {
            //設定主機位置以及帳號密碼
            var factory = new ConnectionFactory()
            {
                HostName = "localhost",
                UserName = "admin",
                Password = "a1234"
            };

            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare
                    (
                        queue: "HelloExchange",
                        durable: false,
                        exclusive: false,
                        autoDelete: false
                    );

                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var message = ea.Body.ToArray();
                    Console.WriteLine(" 收到訊息: {0}", Encoding.UTF8.GetString(message));
                };
                channel.BasicConsume(queue: "HelloExchange",
                                         autoAck: true,
                                         consumer: consumer);
                //建立Consumer接收
                channel.BasicConsume
                    (
                        queue: "HelloExchange",    //指定Queue的名稱。
                        autoAck: true,     //True的話會自動發送有無接收到訊息給RabbitMQ做確認。
                        consumer: consumer //Consumer內容
                    );

                Console.WriteLine("請按任意鍵離開!");
                Console.ReadLine();
            }
        }

我們一樣先執行 Publisher 進行測試。

執行完後首先到管理介面看 Exchange 就會看到多出了一個 testExchange。

Queue 的地方一樣會看到新的 HelloExchange 有一個項目正在等待接收。

接著執行 Reciever,成功收到訊息。

後記

本篇基本用法就介紹到這邊,下篇我們再來研究有哪些特殊的使用方式。

一樣會把 Code 放在 Github 上有需要的歡迎取用。

參考連結