.Net Core&RabbitMQ基本使用

語言: CN / TW / HK

佇列模式

http://www.rabbitmq.com/getstarted.html

對以上幾種模式進行簡要分類,可以分成如下三類(RPC暫不考慮)

  • 簡單佇列模式,單發單收,一對一模式

  • Worker模式,單發多收(一個訊息一個接收者,多個訊息多個接收者),一對多模式

  • 釋出訂閱模式,包括髮布訂閱、路由、萬用字元模式,這三種只是交換機型別不同

簡單佇列模式

佇列作為中介軟體,在生產者和消費者中間承擔訊息傳遞的通道

建立兩個控制檯專案RabbitMQDemo.Basic.Producer和RabbitMQDemo.Basic.Consumer並安裝Nuget包以支援對RabbitMQ操作

install-package rabbitmq.client

生產者程式碼

  • 通過IConnectionFactory,IConnection和IModel來建立連線和通道。IConnection例項物件負責與RabbitMQ 服務端的連線,通道是在這連線基礎上建立虛擬連線,通過複用來減少效能開銷且便於管理。

  • 通過QueueDeclare方法建立訊息佇列,設定訊息佇列本身的一些屬性資訊。

  • 傳送訊息時呼叫BasicPublish來發送訊息,通過exchange和routingkey引數滿足大部分匹配訊息佇列的場景。

var connFactory = new ConnectionFactory
{
    HostName = "xxx.xxx.xxx.xxx",
    Port = 5672,
    UserName = "rabbitmqdemo",
    Password = "[email protected]",
    VirtualHost = "rabbitmqdemo"};
using (var conn = connFactory.CreateConnection())
{
    using (var channel = conn.CreateModel())
    {
        var queueName = "helloworld";
        channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);

        while (true)
        {
            Console.WriteLine("訊息內容(exit退出):");
            var message = Console.ReadLine();
            if (message.Trim().ToLower() == "exit")
            {
                break;
            }

            var body = Encoding.UTF8.GetBytes(message);
            channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: body);
            Console.WriteLine("訊息內容傳送完畢:" + message);
        }
    }
}

消費者程式碼

  • 消費端同樣先建立連線和通道

  • 同樣在消費端也會進行佇列宣告,以確保當生產者並未建立或是手動沒有建立情況下不會出現佇列不存在的異常。

  • 定義一個EventingBasicConsumer物件的消費者,然後定義接收事件,輸出從訊息佇列中接收的資料,

  • 最後呼叫BasicConsume方法來啟動消費者監聽

var connFactory = new ConnectionFactory
{
    HostName = "xxx.xxx.xxx.xxx",
    Port = 5672,
    UserName = "rabbitmqdemo",
    Password = "[email protected]",
    VirtualHost = "rabbitmqdemo"};
using (var conn = connFactory.CreateConnection())
{
    using (var channel = conn.CreateModel())
    {
        var queueName = "helloworld";
        channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);

        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += (model, ea) =>
        {
            var message = ea.Body;
            Console.WriteLine("接收到資訊為:" + Encoding.UTF8.GetString(message.ToArray()));
            ((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, false);
        };

        channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
        Console.ReadKey();
    }
}

執行過程

執行程式碼,可以在管理頁面中看到佇列宣告好了

生產者傳送訊息,經RabbitMQ,消費者端接收訊息

Worker模式

簡單佇列模式下,當多開消費者時,便演化到了Worker模式,這種情況下不再考慮基礎的怎麼用,而是要如何協調多個消費者的工作。

與簡單佇列模式類似再建立三個控制檯專案RabbitMQDemo.Worker.Producer和RabbitMQDemo.Worker.ConsumerA和RabbitMQDemo.Worker.ConsumerB並安裝Nuget,抄襲第一部分程式碼,更改個佇列名字,然後直接跑起來,其實是一樣的消費模式。

基本使用

當傳送多條訊息,兩個消費者都能夠展示訊息,並且,其中的訊息總是隻會被一個消費者所擁有,預設分配方式是輪詢。

消費能力

現在,思考下如何能夠各消費者的消費能力,來消費訊息,這更側重於消費端了。

將ConsumerA、B在消費時各增加Sleep 1秒和10秒,以區分消費能力的不同。當再次傳送訊息時,因消費端出現著處理訊息耗時的不同,展示資料的時間也不同,但是訊息的分配卻沒有變化。

需要進一步均衡的分配任務,按照消費能力高的分配多,消費能力低的分配少。

當消費能力不同時,可以將消費的任務均衡分配,這樣來使得整體消費端的能力充分發揮。

負載能力

RabbitMQ提供了一個屬性可以設定各消費端的能力,以此可以根據能力分配訊息。

在消費端程式碼中更改下Qos(quality-of-service),即消費端最多接收未被ack的訊息個數,舉個例子:

  • 如果輸入1,當消費端接收到一個訊息,但是沒有應答(活還在幹別再分配任務了),則消費端不會收到下一個訊息,訊息只會在佇列中阻塞。

  • 如果輸入3,那麼可以最多有3個訊息不應答(可以同時幹三個活),如果到達了3個,則當有分配給這個消費端的訊息時只會在阻塞到佇列中,消費端不會接收到訊息。

    對ConsumerA、B分別設定值prefetchCount為10和1。

ConsumerA: channel.BasicQos(0, prefetchCount:10, false);
ConsumerB: channel.BasicQos(0, prefetchCount:1, false);

當再次傳送訊息時,會因為因為A、B兩端的消費能力不同而出現訊息聚集側重於一端

生產者傳送一堆訊息,兩個消費者自身的消費能力不同,設定的能夠消費的容量不同,這樣分配得到的訊息數量也不同。

生產者程式碼

此處並未做任何大的改變,只是將佇列名更改為當前模式的名字以示區分。

var connFactory = new ConnectionFactory
{
    HostName = "xxx.xxx.xxx.xxx",
    Port = 5672,
    UserName = "rabbitmqdemo",
    Password = "[email protected]",
    VirtualHost = "rabbitmqdemo"};
using (var conn = connFactory.CreateConnection())
{
    using (var channel = conn.CreateModel())
    {
        var queueName = "worker";
        channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);

        while (true)
        {
            Console.WriteLine("訊息內容(exit退出):");
            var message = Console.ReadLine();
            if (message.Trim().ToLower() == "exit")
            {
                break;
            }

            var body = Encoding.UTF8.GetBytes(message);
            channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: body);
            Console.WriteLine("訊息內容傳送完畢:" + message);
        }
    }
}

消費者程式碼

var connFactory = new ConnectionFactory
{
    HostName = "xxx.xxx.xxx.xxx",
    Port = 5672,
    UserName = "rabbitmqdemo",
    Password = "[email protected]",
    VirtualHost = "rabbitmqdemo"};
using (var conn = connFactory.CreateConnection())
{
    using (var channel = conn.CreateModel())
    {
        var queueName = "worker";
        channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
        channel.BasicQos(0, 1, false);

        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += (model, ea) =>
        {
            Thread.Sleep(10000);
            var message = ea.Body;
            Console.WriteLine("接收到資訊為:" + Encoding.UTF8.GetString(message.ToArray()) + DateTime.Now.ToString("hh:mm:ss"));
            ((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, false);
        };

        channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
        Console.ReadKey();
    }
}

Exchange模式

釋出訂閱,路由和萬用字元這三種可以算為一種模式,區別僅僅是互動機型別不同。

  • 釋出訂閱模式:使用fanout型別交換機

  • 路由模式:使用direct型別交換機

  • 萬用字元模式:使用topic型別交換機

生產者將訊息及RoutingKey傳送到指定交換機,消費者建立各自的訊息佇列並繫結到交換機,交換機根據路由規則匹配生產者傳送的RoutingKey轉發訊息到相應佇列中,其本身不儲存訊息。

Exchange型別

簡要介紹這幾種交換機型別,本身只是對路由規則的匹配方式不同。

  • fanout: 把所有傳送到該交換機的訊息路由到 所有與該交換機繫結的佇列 中。

  • direct: 把訊息路由到那些 BindingKey和RoutingKey完全匹配的佇列 中。

  • topic: 把訊息路由到那些 BindingKey和RoutingKey相匹配的佇列 中。

  • header: 不依賴RoutingKey的匹配規則,而是 根據訊息內容中的headers屬性匹配 (效能差,不實用,使用少)。

注意:BindingKey為交換機和佇列繫結時指定的RoutingKey,傳送訊息時也會給定一個RoutingKey,兩者會按照交換機型別的不同而匹配

釋出訂閱模式(fanout)

fanout模式下會把所有傳送到該交換機的訊息路由到 所有與該交換機繫結的佇列 中。

當生產者傳送訊息到指定交換機,該交換機會將訊息路由到繫結的Queue1和Queue2,兩個佇列分別轉發給其下繫結的消費者,從單個佇列視角來看,便是Worker模式了。

生產者程式碼

var connFactory = new ConnectionFactory
{
    HostName = "xxx.xxx.xxx.xxx",
    Port = 5672,
    UserName = "rabbitmqdemo",
    Password = "[email protected]",
    VirtualHost = "rabbitmqdemo"};
using (var conn = connFactory.CreateConnection())
{
    using (var channel = conn.CreateModel())
    {
        var exchangeName = "publishsubscribe_exchange";
        channel.ExchangeDeclare(exchange: exchangeName, type: "fanout");
        while (true)
        {
            Console.WriteLine("訊息內容(exit退出):");
            var message = Console.ReadLine();
            if (message.Trim().ToLower() == "exit")
            {
                break;
            }

            var body = Encoding.UTF8.GetBytes(message);
            channel.BasicPublish(exchange: exchangeName, routingKey: "", basicProperties: null, body: body);
            Console.WriteLine("訊息內容傳送完畢:" + message);
        }
    }
}

如上生產者端在worker模式的基礎上,改動了幾處

  • 訊息佇列宣告變成了交換機宣告,其型別為fanout。

  • 傳送訊息時由指定相應的佇列名改成了空,而指定了交換機名稱。

  • routingKey留空,fanout無需關心routingKey。

消費者程式碼

此處設定兩個queue,分別為publishsubscribe_exchange_worker_1和publishsubscribe_exchange_worker_2

var connFactory = new ConnectionFactory
{
    HostName = "xxx.xxx.xxx.xxx",
    Port = 5672,
    UserName = "rabbitmqdemo",
    Password = "[email protected]",
    VirtualHost = "rabbitmqdemo"};
using (var conn = connFactory.CreateConnection())
{
    using (var channel = conn.CreateModel())
    {
        var exchangeName = "publishsubscribe_exchange";
        channel.ExchangeDeclare(exchange: exchangeName, type: "fanout");
        var queueName = exchangeName + "_worker_1";
        channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
        channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: "");

        channel.BasicQos(0, 10, false);

        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += (model, ea) =>
        {
            Thread.Sleep(1000);
            var message = ea.Body;
            Console.WriteLine("接收到資訊為:" + Encoding.UTF8.GetString(message.ToArray()) + DateTime.Now.ToString("hh:mm:ss"));
            ((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, false);
        };

        channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
        Console.ReadKey();
    }
}

與Worker的消費者端相比,程式碼上也做了些調整,其餘是保持一致的。

  • 增加了交換機名稱並聲明瞭交換機且型別為fanout,這和生產者端保持一致了

  • 將佇列名和交換機名進行了繫結

  • routingKey留空,fanout無需關心routingKey。

執行過程

當啟動生產者端和消費者端時,交換機和兩個佇列都宣告完畢

同時,點入交換機中,可以看到該交換機下綁定了兩個佇列

這樣一來,當有訊息到達交換機,交換機可以根據訊息名來路由到相應的佇列。因此處設定的routekey是空的,兩個佇列繫結時用的routekey也是空的,因此兩個佇列都符合路由規則,則訊息會同時存在於兩個佇列中。

路由模式(direct)

direct模式下會把訊息路由到那些 BindingKey和RoutingKey完全匹配的佇列 中。

當生產者傳送了一個訊息且傳送的RoutingKey為Warning時,交換機會根據該RoutingKey匹配並轉發訊息到Queue1和Queue2,兩個佇列都滿足了路由規則,當RoutingKey為Info是,僅Queue2滿足,則將訊息轉發給Queue2。

生產者程式碼

生產者端在worker模式的基礎上,只需改動幾處

  • 交換機型別從fanout變更為direct

  • 生產者傳送訊息時指定RoutingKey,原先是留空

var connFactory = new ConnectionFactory
{
    HostName = "xxx.xxx.xxx.xxx",
    Port = 5672,
    UserName = "rabbitmqdemo",
    Password = "[email protected]",
    VirtualHost = "rabbitmqdemo"};
using (var conn = connFactory.CreateConnection())
{
    using (var channel = conn.CreateModel())
    {
        var exchangeName = "routing_exchange";
        channel.ExchangeDeclare(exchange: exchangeName, type: "direct");
        while (true)
        {
            Console.WriteLine("訊息RoutingKey(warning or info):");
            var routingKey = Console.ReadLine();

            Console.WriteLine("訊息內容(exit退出):");
            var message = Console.ReadLine();
            if (message.Trim().ToLower() == "exit")
            {
                break;
            }

            var body = Encoding.UTF8.GetBytes(message);
            channel.BasicPublish(exchange: exchangeName, routingKey: routingKey, basicProperties: null, body: body);
            Console.WriteLine("訊息內容傳送完畢:" + message);
        }
    }
}

消費者程式碼

接收者端在釋出訂閱模式的基礎上增加了交換機和佇列時繫結的key,用於交換機路由規則時選擇佇列。

如下為Queue2下的消費者,為Queue2設定了info和warning兩個RoutingKey用於交換機和佇列繫結。

var connFactory = new ConnectionFactory
{
    HostName = "xxx.xxx.xxx.xxx",
    Port = 5672,
    UserName = "rabbitmqdemo",
    Password = "[email protected]",
    VirtualHost = "rabbitmqdemo"};
using (var conn = connFactory.CreateConnection())
{
    using (var channel = conn.CreateModel())
    {
        var exchangeName = "routing_exchange";
        channel.ExchangeDeclare(exchange: exchangeName, type: "direct");
        var queueName = exchangeName + "_worker_1";
        channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
                    
        var routingKey1 = "warning";
        channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routingKey1);
        var routingKey2 = "info";
        channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routingKey2);

        channel.BasicQos(0, 10, false);

        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += (model, ea) =>
        {
            Thread.Sleep(1000);
            var message = ea.Body;
            Console.WriteLine("接收到資訊為:" + Encoding.UTF8.GetString(message.ToArray()) + DateTime.Now.ToString("hh:mm:ss"));
            ((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, false);
        };

        channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
        Console.ReadKey();
    }
}

執行過程

執行程式碼,交換機、佇列及兩者的繫結先完成,可以在管理頁面中看到宣告的資訊。

當生產者傳送訊息且RoutingKey為warning,兩個佇列都滿足路由條件接收到訊息,兩個消費者都展示了訊息。

當傳送訊息且RoutingKey為info,queue2佇列滿足路由條件接收了訊息,一個消費者展示了訊息。

萬用字元模式(topic)

topic模式會把訊息路由到那些 BindingKey和RoutingKey相匹配的佇列 中。

topic型別與direct型別相似,但匹配規則上有所不同,direct需要完全匹配,topic可以設定萬用字元以達到區域性匹配即可滿足。

和direct不同的是,topic設定的RoutingKey(不論是BindingKey還是RoutingKey)都需要為帶"."的字串。比如a.b、c.d.e、fff.gggg.hhhh等,最多為 255 個位元組。

在交換機和佇列繫結時,給定的RoutingKey可以依照如下來設定。

  • #:匹配0~N個單詞

  • *:匹配1個單詞

    舉例說明下,比如兩個RoutingKey分別為#.created和index. ,當生產者傳送訊息時給定的RoutingKey為a.created、aa.created或是b.created等都滿足#.created的規則,index.a、index.b或index.c等都滿足index. 的規則。

生產者程式碼

在路由模式的基礎上更改交換機型別為topic

var connFactory = new ConnectionFactory
{
    HostName = "xxx.xxx.xxx.xxx",
    Port = 5672,
    UserName = "rabbitmqdemo",
    Password = "[email protected]",
    VirtualHost = "rabbitmqdemo"};
using (var conn = connFactory.CreateConnection())
{
    using (var channel = conn.CreateModel())
    {
        var exchangeName = "topics_exchange";
        channel.ExchangeDeclare(exchange: exchangeName, type: "topic");
        while (true)
        {
            Console.WriteLine("訊息RoutingKey:");
            var routingKey = Console.ReadLine();

            Console.WriteLine("訊息內容(exit退出):");
            var message = Console.ReadLine();
            if (message.Trim().ToLower() == "exit")
            {
                break;
            }

            var body = Encoding.UTF8.GetBytes(message);
            channel.BasicPublish(exchange: exchangeName, routingKey: routingKey, basicProperties: null, body: body);
            Console.WriteLine("訊息內容傳送完畢:" + message);
        }
    }
}

消費者程式碼

接收者端在路由模式的基礎上更改了交換機和佇列繫結的key,可以方便滿足多種情況下的需要。

如下為Queue2下的消費者,為Queue2設定了index.*和#.created.#兩個RoutingKey用於交換機和佇列繫結。

var connFactory = new ConnectionFactory
{
    HostName = "xxx.xxx.xxx.xxx",
    Port = 5672,
    UserName = "rabbitmqdemo",
    Password = "[email protected]",
    VirtualHost = "rabbitmqdemo"};
using (var conn = connFactory.CreateConnection())
{
    using (var channel = conn.CreateModel())
    {
        var exchangeName = "topics_exchange";
        channel.ExchangeDeclare(exchange: exchangeName, type: "topic");
        var queueName = exchangeName + "_worker_2";
        channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);

        var routingKey1 = "index.*";
        channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routingKey1);
        var routingKey2 = "#.created.#";
        channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routingKey2);

        channel.BasicQos(0, 1, false);

        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += (model, ea) =>
        {
            Thread.Sleep(10000);
            var message = ea.Body;
            Console.WriteLine("接收到資訊為:" + Encoding.UTF8.GetString(message.ToArray()) + DateTime.Now.ToString("hh:mm:ss"));
            ((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, false);
        };

        channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
        Console.ReadKey();
    }
}

執行過程

執行程式碼,交換機、佇列及繫結關係,相應RoutingKey都在管理頁面中展示

當生產者傳送訊息且RoutingKey為#.created,兩個佇列都滿足路由條件接收到訊息,兩個消費者都展示了訊息。

當生產者傳送訊息且RoutingKey為#.created.#,queue2佇列滿足了路由條件接收了訊息,一個消費者展示了訊息。

總結

對於在生產者和消費者間解耦,完成非同步協作,訊息佇列可太方便了,暫不深入考慮三者間如何可靠傳輸,僅看訊息佇列提供的多種交換機模式,很大程度上滿足實際使用中需要用到的很多功能。

2022-07-29,望技術有成後能回來看見自己的腳步