前言

前一篇介紹了基本的用法,接下來我們就來研究第一篇有介紹過的各種特殊處理。

本文

這邊主要的練習都是參考以下文件:
RabbitMQ 中文文檔

Work Queues

有多個 Consumer 可以來幫忙消化 Queue 內的任務模式。

首先我們一樣先建立一個 Pubisher,這次改成由外部接收要傳送的訊息再丟給 Queue。

        static void Main(string[] args)
        {
            var factory = new ConnectionFactory()
            {
                HostName = "localhost",
                UserName = "admin",
                Password = "a1234"
            };

            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare(queue: "WorkeTestQueue",
                                     durable: false,
                                     exclusive: false,
                                     autoDelete: false,
                                     arguments: null);

                var message = GetMessage(args);
                var body = Encoding.UTF8.GetBytes(message);
                channel.BasicPublish(exchange: "",
                                     routingKey: "WorkeTestQueue",
                                     body: body);
                Console.WriteLine(" 送出訊息: {0}", message);
            }

            Console.WriteLine("請按任意鍵離開!");
            Console.ReadLine();
        }

        private static string GetMessage(string[] args)
        {
            return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");
        }

接著我們需要創建兩個 Reciver,這邊特殊的地方在我們藉由訊息傳送的 點號來模擬處理繁忙的情況,每一個點號我們就睡一秒。

        var factory = new ConnectionFactory()
			{
				HostName = "localhost",
				UserName = "admin",
				Password = "a1234"
			};

			using (var connection = factory.CreateConnection())
			using (var channel = connection.CreateModel())
			{

				channel.QueueDeclare
					(
						queue: "WorkeTestQueue",
						durable: false,
						exclusive: false,
						autoDelete: false,
						arguments: null
					);

				var consumer = new EventingBasicConsumer(channel);
				consumer.Received += (model, ea) =>
				{
					var body = ea.Body.ToArray();
					var message = Encoding.UTF8.GetString(body);
					Console.WriteLine(" 收到訊息: {0}", message);

					var dots = message.Split('.').Length - 1;
					Thread.Sleep(dots * 1000);

					Console.WriteLine("任務完成");
				};
				channel.BasicConsume
					(
						queue: "WorkeTestQueue",
						autoAck: true,
						consumer: consumer
					);

				Console.WriteLine("請按任意鍵離開!");
				Console.ReadLine();
			}

Reciever1 和 Reciever2 程式內容是一樣的。

那麼我們就來進行測試吧,首先我們把專案的起始方式改成多個起始專案。

對解決方案點選右鍵後點選屬性。

將設定改成下圖。

Publisher 的啟動方式我們會改用 PowerShell 執行,先將專案起始後看看執行狀況。

這樣測試環境就準備好了,接著我們可以開始來發布訊息了。

首先打開你的 PowerShell。
有需要修改字型設定的可參考同事這一篇。
Powershell 美化作戰 —— 字型、執行原則和 oh-my-posh

先移動到我們的 Publisher 所在的位子,請視自行狀況修改路徑。

cd D:\Program\RabbitMQSample\WorkQueuesSample\Sample\Publisher

接著就可以開始執行以下測試,我們會依序執行以下指令

dotnet run "First message."
dotnet run "Second message.."
dotnet run "Third message..."
dotnet run "Fourth message...."
dotnet run "Fifth message....."

回頭看一下我們的兩個Reciever。

這邊我們使用基本的設定,RabbitMQ 會將訊息依序發給下一個 Reciever,這種方式被稱作 Round-Robin。

訊息確認方式

在目前使用的方法當我們的 Reciever 接收到訊息後就會自動回復一個 Ack 給 RabbitMQ。

這種情況最大的問題是接收端雖然收到訊息了,但發生了 Crash、或是執行被中斷了,RabbitMQ 並不會知道有這種情形發生,任務就會因此而遺失。

我們可以來測試看看,首先使用 Publisher 發送一個需要長時間執行的任務,當有 Reciever 收到的時候就馬上將他關閉,模擬執行被中斷的情況。

這邊我們先使用 PowerShell 執行 Publisher 觀察 Queue 中的情況就好還先不要執行 Reciever 應用程式。

接著我們執行兩個 Reciever 應用程式,當收到訊息的當下馬上就將該應用程式結束執行。

回去看 RabbitMQ 監控台。

雖然訊息已經發送出去了,但其實只執行到一半沒有完成,為了預防這種情況發生,我們可以使用自行回傳 Act 的方式。
修改一下我們的 Reciever 程式。

channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);

一樣記得兩個 Reciver 都要修改喔!!

接著我們來測試看看他的效果吧,發送一個需要長時間執行的任務。

執行我們的 Reciver 觀察情況。

可以觀察到當我們把第一個接收到任務執行的 Reciver 關掉,RabbitMQ 就會馬上把任務拿回佇列找尋下一個可以接收任務的 Reciver。

儘管此種方式可以預防 Reicever 執行中斷保存訊息的好方法,但並沒有辦法保證當 RabbitMQ 自身斷線、或是重啟的時候確保訊息是能夠繼續派送的。

可以實驗看看我們先發送一個訊息到 RabbitMQ,確認 Queue 內有資料後我們把 RabbitMQ 重啟再來觀察。

接著把 RabbitMQ 重啟。

馬上就看到連 Queue 都消失啦~~~~

訊息持久化方式

這裡第一步是要先改變我們的 Queue 設定。

channel.QueueDeclare(
queue: "WorkeTestQueue",
durable: true, //持久化設定
exclusive: false,
autoDelete: false,
arguments: null);

接下來我們傳送一個訊息後,確認 Queue 內有資料後再把 RabbitMQ 重啟測試。

這次我們發現 Queue 是保存住了,但是我們還沒被接收的訊息卻消失了,為了解決這點,我們需要多加設定。

修改一下 Publisher

var properties = channel.CreateBasicProperties();
properties.Persistent = true;

接著我們再來重試看看。

這時候就會看到Queue內有一個Persistent的訊息,接著重啟RabbitMQ。

成功保存了未送出的訊息。

平均分配訊息的方式

為了避免我們有許多個 Reicever,但卻把工作都只丟給某一個做,可以加上以下設定讓 RabbitMQ不 要一次給同一個 Reicever 多於一個的任務,在他還沒處理完該任務且回傳 Ack 確認前都不該再派送訊息給他。

channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

也是一樣要記得加在兩個 Reiciver 內。

接下來我們來完整測試目前的程式吧。

可以觀察到因為某一個 Reicever 正在執行需要長時間運行的任務,就會交由有空閒時間的 Reicever 來執行剩餘的工作。

後記

這篇介紹就到這邊,一樣會把程式碼放在 GitHub 歡迎取用。
WorkQueue Sample

參考連結