前言
前一篇介紹了基本的用法,接下來我們就來研究第一篇有介紹過的各種特殊處理。
本文
這邊主要的練習都是參考以下文件:
RabbitMQ 中文文檔
Work Queues
有多個 Consumer 可以來幫忙消化 Queue 內的任務模式。
首先我們一樣先建立一個 Pubisher,這次改成由外部接收要傳送的訊息再丟給 Queue。
static void Main(string[] args)
{
var factory = new ConnectionFactory()
{
HostName = "localhost",
UserName = "admin",
Password = "a1234"
};
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "WorkeTestQueue",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "",
routingKey: "WorkeTestQueue",
body: body);
Console.WriteLine(" 送出訊息: {0}", message);
}
Console.WriteLine("請按任意鍵離開!");
Console.ReadLine();
}
private static string GetMessage(string[] args)
{
return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");
}
接著我們需要創建兩個 Reciver,這邊特殊的地方在我們藉由訊息傳送的 點號來模擬處理繁忙的情況,每一個點號我們就睡一秒。
var factory = new ConnectionFactory()
{
HostName = "localhost",
UserName = "admin",
Password = "a1234"
};
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.QueueDeclare
(
queue: "WorkeTestQueue",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null
);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" 收到訊息: {0}", message);
var dots = message.Split('.').Length - 1;
Thread.Sleep(dots * 1000);
Console.WriteLine("任務完成");
};
channel.BasicConsume
(
queue: "WorkeTestQueue",
autoAck: true,
consumer: consumer
);
Console.WriteLine("請按任意鍵離開!");
Console.ReadLine();
}
Reciever1 和 Reciever2 程式內容是一樣的。
那麼我們就來進行測試吧,首先我們把專案的起始方式改成多個起始專案。
對解決方案點選右鍵後點選屬性。
將設定改成下圖。
Publisher 的啟動方式我們會改用 PowerShell 執行,先將專案起始後看看執行狀況。
這樣測試環境就準備好了,接著我們可以開始來發布訊息了。
首先打開你的 PowerShell。
有需要修改字型設定的可參考同事這一篇。
Powershell 美化作戰 —— 字型、執行原則和 oh-my-posh
先移動到我們的 Publisher 所在的位子,請視自行狀況修改路徑。
cd D:\Program\RabbitMQSample\WorkQueuesSample\Sample\Publisher
接著就可以開始執行以下測試,我們會依序執行以下指令
dotnet run "First message."
dotnet run "Second message.."
dotnet run "Third message..."
dotnet run "Fourth message...."
dotnet run "Fifth message....."
回頭看一下我們的兩個Reciever。
這邊我們使用基本的設定,RabbitMQ 會將訊息依序發給下一個 Reciever,這種方式被稱作 Round-Robin。
訊息確認方式
在目前使用的方法當我們的 Reciever 接收到訊息後就會自動回復一個 Ack 給 RabbitMQ。
這種情況最大的問題是接收端雖然收到訊息了,但發生了 Crash、或是執行被中斷了,RabbitMQ 並不會知道有這種情形發生,任務就會因此而遺失。
我們可以來測試看看,首先使用 Publisher 發送一個需要長時間執行的任務,當有 Reciever 收到的時候就馬上將他關閉,模擬執行被中斷的情況。
這邊我們先使用 PowerShell 執行 Publisher 觀察 Queue 中的情況就好還先不要執行 Reciever 應用程式。
接著我們執行兩個 Reciever 應用程式,當收到訊息的當下馬上就將該應用程式結束執行。
回去看 RabbitMQ 監控台。
雖然訊息已經發送出去了,但其實只執行到一半沒有完成,為了預防這種情況發生,我們可以使用自行回傳 Act 的方式。
修改一下我們的 Reciever 程式。
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
一樣記得兩個 Reciver 都要修改喔!!
接著我們來測試看看他的效果吧,發送一個需要長時間執行的任務。
執行我們的 Reciver 觀察情況。
可以觀察到當我們把第一個接收到任務執行的 Reciver 關掉,RabbitMQ 就會馬上把任務拿回佇列找尋下一個可以接收任務的 Reciver。
儘管此種方式可以預防 Reicever 執行中斷保存訊息的好方法,但並沒有辦法保證當 RabbitMQ 自身斷線、或是重啟的時候確保訊息是能夠繼續派送的。
可以實驗看看我們先發送一個訊息到 RabbitMQ,確認 Queue 內有資料後我們把 RabbitMQ 重啟再來觀察。
接著把 RabbitMQ 重啟。
馬上就看到連 Queue 都消失啦~~~~
訊息持久化方式
這裡第一步是要先改變我們的 Queue 設定。
channel.QueueDeclare(
queue: "WorkeTestQueue",
durable: true, //持久化設定
exclusive: false,
autoDelete: false,
arguments: null);
接下來我們傳送一個訊息後,確認 Queue 內有資料後再把 RabbitMQ 重啟測試。
這次我們發現 Queue 是保存住了,但是我們還沒被接收的訊息卻消失了,為了解決這點,我們需要多加設定。
修改一下 Publisher
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
接著我們再來重試看看。
這時候就會看到Queue內有一個Persistent的訊息,接著重啟RabbitMQ。
成功保存了未送出的訊息。
平均分配訊息的方式
為了避免我們有許多個 Reicever,但卻把工作都只丟給某一個做,可以加上以下設定讓 RabbitMQ不 要一次給同一個 Reicever 多於一個的任務,在他還沒處理完該任務且回傳 Ack 確認前都不該再派送訊息給他。
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
也是一樣要記得加在兩個 Reiciver 內。
接下來我們來完整測試目前的程式吧。
可以觀察到因為某一個 Reicever 正在執行需要長時間運行的任務,就會交由有空閒時間的 Reicever 來執行剩餘的工作。
後記
這篇介紹就到這邊,一樣會把程式碼放在 GitHub 歡迎取用。
WorkQueue Sample