本文
Publish/Subscribe
我們來練習使用 Exchange 透過 Fanout 的方式發送訊息給所有 Queue。
現在我們目標是要發送 log 給所有 Queue。
一樣先建立Publisher。
static void Main(string[] args)
{
var factory = new ConnectionFactory()
{
HostName = "localhost",
UserName = "admin",
Password = "a1234"
};
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
//創建一個Exchange設定為logs
channel.ExchangeDeclare(exchange: "logs", ExchangeType.Fanout);
var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "logs", //設定我們的Exchange為logs
routingKey: "", //不指定Queue
basicProperties: null,
body: body);
Console.WriteLine(" 送出訊息: {0}", message);
}
Console.WriteLine("請按任意鍵離開!");
Console.ReadLine();
}
private static string GetMessage(string[] args)
{
return ((args.Length > 0)
? string.Join(" ", args)
: "info: Hello World!");
}
一樣會創建兩個 Reciver
static void Main(string[] args)
{
var factory = new ConnectionFactory()
{
HostName = "localhost",
UserName = "admin",
Password = "a1234"
};
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: "LogsExchange", ExchangeType.Fanout);
//這方法會隨機建立一個非持久化、自動刪除的、獨立的Queue
var queueName = channel.QueueDeclare().QueueName;
channel.QueueBind(queue: queueName,
exchange: "LogsExchange", //指定透過我們的logsExchange
routingKey: "");
Console.WriteLine(" 等待拿取log ");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" 收到訊息: {0}", message);
};
channel.BasicConsume(queue: queueName,
autoAck: true,
consumer: consumer);
Console.WriteLine("請按任意鍵離開!");
Console.ReadLine();
}
首先我們先使用 Publisher 發送 log 觀察 RabbitMQ。
看一下 RabbitMQ 目前的狀態,多出了一個 Exchange。
接著就運行我們的兩個 Reciver 看看吧:
打開之後發送兩個訊息
接收結果:
我們再回頭看一下 RabbitMQ 狀態。
看到多出了兩個隨機的 Queue。
接下來我們把應用程式關閉再來觀察看看
Queue 就消失啦~~~
這邊就是簡單的透過 Exchange 發送 Fanout 模式練習。
Routing
接著是透過 Exchange 使用 direct 的方式,直接將 Message 丟給指定的 Queue。
延續上一個範例,雖然都是 log,但我們會想針對某一種型態的 log 傳給其中一個 Reciever 接收就好,這次目標就是把 Warning 的 log 傳給 Reciever2 接收即可。
Publisher:
static void Main(string[] args)
{
var factory = new ConnectionFactory()
{
HostName = "localhost",
UserName = "admin",
Password = "a1234"
};
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
//創建一個Exchange設定為direct_logs 透過direct方式發送
channel.ExchangeDeclare(
exchange: "direct_logs",
ExchangeType.Direct);
//我們已接收到的第一個參數來當key決定要丟給哪一個Queue
var key = (args.Length > 0) ? args[0] : "info";
var message = (args.Length > 1)
? string.Join(" ", args.Skip(1).ToArray())
: "Hello World!";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "direct_logs", //設定我們的Exchange為logs
routingKey: key,
basicProperties: null,
body: body);
Console.WriteLine($" 送出訊息: '{key}' : '{message}'");
}
Console.WriteLine("請按任意鍵離開!");
Console.ReadLine();
}
Reciever1 負責拿取 Safe 的 log。
static void Main(string[] args)
{
ConnectionFactory factory = new ConnectionFactory()
{
HostName = "localhost",
UserName = "admin",
Password = "a1234"
};
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
//一樣是direct_logs Exchange
channel.ExchangeDeclare(
exchange: "direct_logs",
ExchangeType.Direct);
//這方法會隨機建立一個非持久化、自動刪除的、獨立的Queue
var queueName = channel.QueueDeclare().QueueName;
channel.QueueBind(queue: queueName,
exchange: "direct_logs", //指定透過我們的logsExchange
routingKey: "safe"); //只拿取Safe的logs
Console.WriteLine(" 等待拿取log ");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" 收到訊息: {0}", message);
};
channel.BasicConsume(queue: queueName,
autoAck: true,
consumer: consumer);
Console.WriteLine("請按任意鍵離開!");
Console.ReadLine();
}
}
Reciver2 負責拿取 Warning 的 log。
static void Main(string[] args)
{
ConnectionFactory factory = new ConnectionFactory()
{
HostName = "localhost",
UserName = "admin",
Password = "a1234"
};
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
//一樣是direct_logs Exchange
channel.ExchangeDeclare(
exchange: "direct_logs",
ExchangeType.Direct);
//這方法會隨機建立一個非持久化、自動刪除的、獨立的Queue
var queueName = channel.QueueDeclare().QueueName;
channel.QueueBind(queue: queueName,
exchange: "direct_logs", //指定透過我們的logsExchange
routingKey: "warning"); //只拿取Warning的logs
Console.WriteLine(" 等待拿取log ");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" 收到訊息: {0}", message);
};
channel.BasicConsume(queue: queueName,
autoAck: true,
consumer: consumer);
Console.WriteLine("請按任意鍵離開!");
Console.ReadLine();
}
}
接下來我們來進行測試,送出一個 Warning 的訊息。
dotnet run warning "你家系統大爆炸!"
可以觀察到就只有 Reciver2 有收到訊息。
反之如果我們送出一個 safe 的訊息。
dotnet run safe "沒事沒事系統還活著繼續睡!"
只有 Reciver1 會收到訊息。
後記
這次就介紹 Publish/Subscribe、Routing,兩個範例。
程式一樣會放在 GitHub 有需要的歡迎取用:
Publish/Subscribe Sample
Routing Sample