本文

Publish/Subscribe

我們來練習使用 Exchange 透過 Fanout 的方式發送訊息給所有 Queue。

現在我們目標是要發送 log 給所有 Queue。
一樣先建立Publisher。

        static void Main(string[] args)
        {
            var factory = new ConnectionFactory()
            {
                HostName = "localhost",
                UserName = "admin",
                Password = "a1234"
            };

            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                //創建一個Exchange設定為logs
                channel.ExchangeDeclare(exchange: "logs", ExchangeType.Fanout);

                var message = GetMessage(args);
                var body = Encoding.UTF8.GetBytes(message);

                channel.BasicPublish(exchange: "logs", //設定我們的Exchange為logs
                                     routingKey: "",   //不指定Queue
                                     basicProperties: null,
                                     body: body);

                Console.WriteLine(" 送出訊息: {0}", message);
            }

            Console.WriteLine("請按任意鍵離開!");
            Console.ReadLine();
        }

        private static string GetMessage(string[] args)
        {
            return ((args.Length > 0)
                   ? string.Join(" ", args)
                   : "info: Hello World!");
        }

一樣會創建兩個 Reciver

        static void Main(string[] args)
        {
        
            var factory = new ConnectionFactory()
            {
                HostName = "localhost",
                UserName = "admin",
                Password = "a1234"
            };

            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: "LogsExchange", ExchangeType.Fanout);

                //這方法會隨機建立一個非持久化、自動刪除的、獨立的Queue
                var queueName = channel.QueueDeclare().QueueName;
                channel.QueueBind(queue: queueName,
                                  exchange: "LogsExchange", //指定透過我們的logsExchange
                                  routingKey: "");

                Console.WriteLine(" 等待拿取log ");

                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body.ToArray();
                    var message = Encoding.UTF8.GetString(body);
                    Console.WriteLine(" 收到訊息: {0}", message);
                };
                channel.BasicConsume(queue: queueName,
                                     autoAck: true,
                                     consumer: consumer);

                Console.WriteLine("請按任意鍵離開!");
                Console.ReadLine();
            }

首先我們先使用 Publisher 發送 log 觀察 RabbitMQ。

看一下 RabbitMQ 目前的狀態,多出了一個 Exchange。

接著就運行我們的兩個 Reciver 看看吧:
打開之後發送兩個訊息

接收結果:

我們再回頭看一下 RabbitMQ 狀態。

看到多出了兩個隨機的 Queue。
接下來我們把應用程式關閉再來觀察看看

Queue 就消失啦~~~

這邊就是簡單的透過 Exchange 發送 Fanout 模式練習。

Routing

接著是透過 Exchange 使用 direct 的方式,直接將 Message 丟給指定的 Queue。

延續上一個範例,雖然都是 log,但我們會想針對某一種型態的 log 傳給其中一個 Reciever 接收就好,這次目標就是把 Warning 的 log 傳給 Reciever2 接收即可。

Publisher:

        static void Main(string[] args)
        {
            var factory = new ConnectionFactory()
            {
                HostName = "localhost",
                UserName = "admin",
                Password = "a1234"
            };

            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                //創建一個Exchange設定為direct_logs 透過direct方式發送
                channel.ExchangeDeclare(
                    exchange: "direct_logs", 
                    ExchangeType.Direct);

                //我們已接收到的第一個參數來當key決定要丟給哪一個Queue
                var key = (args.Length > 0) ? args[0] : "info";

                var message = (args.Length > 1)
                              ? string.Join(" ", args.Skip(1).ToArray())
                              : "Hello World!";

                var body = Encoding.UTF8.GetBytes(message);

                channel.BasicPublish(exchange: "direct_logs", //設定我們的Exchange為logs
                                     routingKey: key,   
                                     basicProperties: null,
                                     body: body);

                Console.WriteLine($" 送出訊息: '{key}' : '{message}'");
            }

            Console.WriteLine("請按任意鍵離開!");
            Console.ReadLine();
        }

Reciever1 負責拿取 Safe 的 log。

        static void Main(string[] args)
        {
            ConnectionFactory factory = new ConnectionFactory()
            {
                HostName = "localhost",
                UserName = "admin",
                Password = "a1234"
            };

            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                //一樣是direct_logs Exchange
                channel.ExchangeDeclare(
                    exchange: "direct_logs",
                    ExchangeType.Direct);

                //這方法會隨機建立一個非持久化、自動刪除的、獨立的Queue
                var queueName = channel.QueueDeclare().QueueName;

                channel.QueueBind(queue: queueName,
                                  exchange: "direct_logs", //指定透過我們的logsExchange
                                  routingKey: "safe");     //只拿取Safe的logs

                Console.WriteLine(" 等待拿取log ");

                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body.ToArray();
                    var message = Encoding.UTF8.GetString(body);
                    Console.WriteLine(" 收到訊息: {0}", message);
                };
                channel.BasicConsume(queue: queueName,
                                     autoAck: true,
                                     consumer: consumer);

                Console.WriteLine("請按任意鍵離開!");
                Console.ReadLine();
            }
        }

Reciver2 負責拿取 Warning 的 log。

        static void Main(string[] args)
        {
            ConnectionFactory factory = new ConnectionFactory()
            {
                HostName = "localhost",
                UserName = "admin",
                Password = "a1234"
            };

            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                //一樣是direct_logs Exchange
                channel.ExchangeDeclare(
                    exchange: "direct_logs",
                    ExchangeType.Direct);

                //這方法會隨機建立一個非持久化、自動刪除的、獨立的Queue
                var queueName = channel.QueueDeclare().QueueName;

                channel.QueueBind(queue: queueName,
                                  exchange: "direct_logs", //指定透過我們的logsExchange
                                  routingKey: "warning");  //只拿取Warning的logs

                Console.WriteLine(" 等待拿取log ");

                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body.ToArray();
                    var message = Encoding.UTF8.GetString(body);
                    Console.WriteLine(" 收到訊息: {0}", message);
                };
                channel.BasicConsume(queue: queueName,
                                     autoAck: true,
                                     consumer: consumer);

                Console.WriteLine("請按任意鍵離開!");
                Console.ReadLine();
            }
        }

接下來我們來進行測試,送出一個 Warning 的訊息。

dotnet run warning "你家系統大爆炸!"

可以觀察到就只有 Reciver2 有收到訊息。

反之如果我們送出一個 safe 的訊息。

dotnet run safe "沒事沒事系統還活著繼續睡!"

只有 Reciver1 會收到訊息。

後記

這次就介紹 Publish/Subscribe、Routing,兩個範例。
程式一樣會放在 GitHub 有需要的歡迎取用:
Publish/Subscribe Sample
Routing Sample

參考連結