本文

Topics

這個模式下 Exchange 會透過 Topic 的方式,將訊息丟給符合的 Queue 傳給相對應的 Reciver。

Topic 會透過我們給予的 Key 來決定要給哪些人,而 Key 可以使用兩種特殊符號:

  • 1、 * (星號):它能夠代表一個萬用詞。
  • 2、 # (井號):則是能夠代表零個或多個萬用詞。

舉例來說上圖我們有一個 *.orange.* 的 Key ,那他就允許兩個*字號替代的單詞,所以它會允許 sweet.orange.juice 通過,如果多加了一個字 sweet.good.orange.juice 則不允許通過。

那麼 lazy.# 則允許零個或多個單詞,Ex:lazy.、lazy.man、lazy.man.with、lazy.man.with.friends … 都允許通過。

接著我們來實作測試看看吧。

Publisher

            var factory = new ConnectionFactory()
            {
                HostName = "localhost",
                UserName = "admin",
                Password = "a1234"
            };

            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                //創建一個Exchange設定為topic_logs 透過Topic方式發送
                channel.ExchangeDeclare(
                    exchange: "topic_logs",
                    ExchangeType.Topic);

                //我們已接收到的第一個參數來當key決定要丟給哪一個Queue
                var key = (args.Length > 0) ? args[0] : "anonymous.info";

                var message = (args.Length > 1)
                              ? string.Join(" ", args.Skip(1).ToArray())
                              : "Hello World!";

                var body = Encoding.UTF8.GetBytes(message);

                channel.BasicPublish(exchange: "topic_logs", //設定我們的Exchange為topic_logs
                                     routingKey: key,
                                     basicProperties: null,
                                     body: body);

                Console.WriteLine($" 送出訊息: '{key}' : '{message}'");
            }

            Console.WriteLine("請按任意鍵離開!");
            Console.ReadLine();

我們的 Reciver1 負責拿取 *.safe的logs。

        static void Main(string[] args)
        {
            ConnectionFactory factory = new ConnectionFactory()
            {
                HostName = "localhost",
                UserName = "admin",
                Password = "a1234"
            };

            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                //一樣是topic_logs Exchange
                channel.ExchangeDeclare(
                    exchange: "topic_logs",
                    ExchangeType.Topic);

                //這方法會隨機建立一個非持久化、自動刪除的、獨立的Queue
                var queueName = channel.QueueDeclare().QueueName;

                channel.QueueBind(queue: queueName,
                                  exchange: "topic_logs", //指定透過我們的logsExchange
                                  routingKey: "*.safe");     //只拿取*.safe的logs

                Console.WriteLine(" 等待拿取log ");

                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body.ToArray();
                    var message = Encoding.UTF8.GetString(body);
                    Console.WriteLine(" 收到訊息: {0}", message);
                };
                channel.BasicConsume(queue: queueName,
                                     autoAck: true,
                                     consumer: consumer);

                Console.WriteLine("請按任意鍵離開!");
                Console.ReadLine();
            }
        }

Reciver2 拿取任何有包含 warning 的 logs。

        static void Main(string[] args)
        {
            ConnectionFactory factory = new ConnectionFactory()
            {
                HostName = "localhost",
                UserName = "admin",
                Password = "a1234"
            };

            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                //一樣是topic_logs Exchange
                channel.ExchangeDeclare(
                    exchange: "topic_logs",
                    ExchangeType.Topic);

                //這方法會隨機建立一個非持久化、自動刪除的、獨立的Queue
                var queueName = channel.QueueDeclare().QueueName;

                channel.QueueBind(queue: queueName,
                                  exchange: "topic_logs", //指定透過我們的logsExchange
                                  routingKey: "#.warning.#");//只拿取warning有關的logs

                Console.WriteLine(" 等待拿取log ");

                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body.ToArray();
                    var message = Encoding.UTF8.GetString(body);
                    Console.WriteLine(" 收到訊息: {0}", message);
                };
                channel.BasicConsume(queue: queueName,
                                     autoAck: true,
                                     consumer: consumer);

                Console.WriteLine("請按任意鍵離開!");
                Console.ReadLine();
            }
        }

我們發送

 dotnet run "Status.safe" "沒事沒事系統還活著繼續睡!"

可以觀察到只會有 Reciver1 拿到訊息。

接著發送

dotnet run "Status.safe.Good!" "沒事沒事系統還活著繼續睡!"

因為沒有任何可對應的 key 則不會有人收到訊息。

接著我們來測試 warning 相關的訊息

dotnet run "Status.warning" "你家系統大爆炸!"

就只有 Reciver2 會拿到訊息。

再試著發送

dotnet run "Status.warning.dead" "你家系統大爆炸!"
dotnet run "Status.warning.dead.enmergency" "你家系統大爆炸!"

Reciver2 都還是會拿到訊息。

RPC

最後一個範例要介紹的是 RPC,此模式最大特點是可以接收 Server 的回傳訊息。

過程使用的方式就是藉由一個 RPC 的協定(類似的有:gRPC),大概的概念就是A服務可以透過 RPC 來呼叫 B 服務的程式,好處就是對 A 服務來說就好像使用了本地端的程式一樣,並不需要做額外的互動處理(對比對象可以想像我們先前常使用的 API 與之差異)。

缺點的話就是 debug 不容易,問題出現的時候不容易發覺到底是我們本地程式有問題還是呼叫 RPC 服務過程有狀況,所以文件上也有提供了三個建議:

  • 確保能夠清楚看出哪一個方法是本地執行,哪一個方法是透過 RPC 服務執行的。
  • 建立完整的文件,可以清楚看出服務之間的依賴。
  • 要有錯誤處理機制,如果 RPC 服務反應時間過長,Client 該如何做應對處理。

那我們就來做個簡單的範例,Client 透過 RPC 呼叫一個兩數相加的方法。

Server

        static void Main(string[] args)
        {
            var factory = new ConnectionFactory()
            {
                HostName = "localhost",
                UserName = "admin",
                Password = "a1234"
            };

            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                //設定RPC Queue
                channel.QueueDeclare
                    (
                        queue: "rpc_queue",
                        durable: false,
                        exclusive: false,
                        autoDelete: false,
                        arguments: null
                    );
                //訊息分配設定
                channel.BasicQos
                    (
                        prefetchSize: 0,
                        prefetchCount: 1,
                        global: false
                    );

                var consumer = new EventingBasicConsumer(channel);
                channel.BasicConsume
                    (
                        queue: "rpc_queue",
                        autoAck: false,
                        consumer: consumer
                    );

                Console.WriteLine(" 等待RPC請求中 ");

                consumer.Received += (model, ea) =>
                {
                    string response = null;

                    var body = ea.Body.ToArray();
                    var props = ea.BasicProperties;
                    var replyProps = channel.CreateBasicProperties();
                    //藉由CorrelationId來匹配請求
                    replyProps.CorrelationId = props.CorrelationId;

                    try
                    {
                        var message = Encoding.UTF8.GetString(body);
                        Console.WriteLine($"兩數相加:{message}");
                        response = AddNumber(message);
                    }
                    catch (Exception ex)
                    {
                        Console.WriteLine("請確認錯誤訊息:" + ex.Message);
                        response = string.Empty;
                    }
                    finally
                    {
                        var responseBytes = Encoding.UTF8.GetBytes(response);

                        channel.BasicPublish
                        (
                            exchange: "",
                            routingKey: props.ReplyTo,
                            basicProperties: replyProps,
                            body: responseBytes
                         );

                        channel.BasicAck
                        (
                            deliveryTag: ea.DeliveryTag,
                            multiple: false
                        );

                        Console.WriteLine("回傳訊息成功!");
                    }
                };

                Console.WriteLine(" 請按任意鍵離開! ");
                Console.ReadLine();
            }
        }

        /// <summary>
        /// Adds the number.
        /// </summary>
        /// <param name="message">The message.</param>
        /// <returns></returns>
        private static string AddNumber(string message)
        {
            var strArry = message.Split(',');
            if (string.IsNullOrWhiteSpace(strArry.FirstOrDefault()) &&
                string.IsNullOrWhiteSpace(strArry.LastOrDefault()))
            {
                throw new Exception("請確認傳入參數");
            }

            if (int.TryParse(strArry[0], out int firstNumber).Equals(false))
            {
                throw new Exception("第一個數字轉換出錯");
            }

            if (int.TryParse(strArry[1], out int secondNumber).Equals(false))
            {
                throw new Exception("第二個數字轉換出錯");
            }

            var result = firstNumber + secondNumber;

            return result.ToString();
        }

Client

        static void Main(string[] args)
        {
            var factory = new ConnectionFactory()
            {
                HostName = "localhost",
                UserName = "admin",
                Password = "a1234"
            };

            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                var replyQueueName = channel.QueueDeclare().QueueName;
                var consumer = new EventingBasicConsumer(channel);

                var props = channel.CreateBasicProperties();
                //使用Guid創建一組獨一無二的CorrelationId
                props.CorrelationId = Guid.NewGuid().ToString();
                props.ReplyTo = replyQueueName;

                var message = GetMessage(args);
                Console.WriteLine(" 兩數相加請求 ");
                var messageBytes = Encoding.UTF8.GetBytes(message);
                channel.BasicPublish
                    (
                        exchange: "",
                        routingKey: "rpc_queue",
                        basicProperties: props,
                        body: messageBytes
                    );

                channel.BasicConsume
                    (
                        consumer: consumer,
                        queue: replyQueueName,
                        autoAck: true
                    );

                //接收回傳訊息
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body.ToArray();
                    var response = Encoding.UTF8.GetString(body);
                    //只接收CorrelationId與我們發送出去相同的訊息其餘忽略
                    if (ea.BasicProperties.CorrelationId == props.CorrelationId)
                    {
                        Console.WriteLine($" 兩數相加結果:{response}");
                    }
                };
                Console.WriteLine(" 請按任意鍵離開! ");
                Console.ReadLine();
            }
        }

        /// <summary>
        /// Gets the message.
        /// </summary>
        /// <param name="args">The arguments.</param>
        /// <returns></returns>
        private static string GetMessage(string[] args)
        {
            return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");
        }

測試:
首先執行我們 Server

接著透過 PowerShell 執行我們的 Client

dotnet run "1,2"

也可以觀察一下 Server 端的訊息

後記

那麼 RabbitMQ 基本使用就介紹到這邊,這邊都是直接參照官方文件做的練習,若需要更深入了解還是建議看一下官方文件細節。

一樣會把程式放在 GitHub 上有需要歡迎取用。

Topics Sample

RPC Sample

參考連結