如何在C#中使用Channels進行非同步排隊

語言: CN / TW / HK

本篇文章是對如何在asp.net core 的web api中使用Channels的簡單介紹

引子

設想一下我們有這樣一個典型的生產者/消費者問題,我在controller 上收到一個請求並希望通過後臺服務執行任務。

相信你的第一個想法是使用一些外部的訊息佇列來實現,如 RabbitMQ 或Kafka,這對於這樣的一個簡單任務來說這些元件都有點重了。所以我決定在不使用任何外部訊息佇列的情況下實現它。

所以我的第一次嘗試是使用ConcurrentQueue和一些鎖,但它變得很複雜。所以我嘗試搜尋一些更簡單的方式,直到我找到了“Channels”。

Channels是什麼

一個 Channel 是一個 . NET 資料結構或集合,我們可以在其中儲存來自生產者的資料,同時消費者可以檢索它,而無需從我們這邊進行任何額外的同步。

生產者/消費者描述了生產者釋出訊息的行為,並且有一個或多個消費者可以對該訊息進行處理,但每條訊息只能讀取一次。它不會對每個訂閱者進行重複消費。換句話說就是,消費者之間是競爭消費。

例子

當我搜索示例時,我發現了很多控制檯應用程式示例,但沒有針對 Web API的,因此我決定嘗試以 ASP.Net Core Web API 專案為例來進行演示

所以讓我們開始吧。

在 Visual Studio 中建立一個 Web API 專案,或者您可以使用下面的 cmd命令進行建立

dotnet new webapi -o LearningChannels

現在為了簡單起見,我使用兩個後臺任務,一個用於生產者,一個用於消費者。

WriterService

ReaderService

而對於ASP.NET Core Web API,最重要的部分是定義通道和依賴注入。

我們已經完成了更改。

讓我們測試一下。執行應用程式

正如我們在輸出視窗中看到的那樣,一旦資料寫入Channels,消費者就會讀取它。

現在,如果您在我的示例中注意到我使用了無界Channels。

那是什麼,還有什麼其他選擇。讓我們看看他們。

有界Channels和無界Channels

CreateUnbounded 方法建立一個對可以儲存的專案數量沒有限制的channel。當然在某些時候它可能會達到記憶體的限制,這時候你會得到記憶體不足的異常。

另一方面, CreateBounded 方法可以建立一個具有在建立期間提供的顯式數量限制的通道。

有界Channels有一些用於指示不同行為的選項, BoundedChannelFullMode

public enum BoundedChannelFullMode
{
    Wait,
    DropNewest,
    DropOldest,
    DropWrite
}

預設為 Wait ,等待佇列中有空間時寫入。因此TryWrite將為此類channel返回 false

DropOldest 將刪除“最舊”的資料。

DropNewest 將刪除最新的資料。

DropWrite 刪除當前正在寫入的資料。

因此,您可以根據你的場景選擇適合的選項。

總結

在本教程中,我們已經瞭解了通道如何在應用程式中處理非同步任務時發揮作用。

希望你喜歡閱讀它。

快樂編碼並繼續學習..!

參考閱讀:

https://medium.com/@niteshsinghal85/using-channels-for-asynchronous-queuing-in-c-ed96c51d4576