ASP.NET Core 6框架揭祕例項演示[03]:Dapr初體驗

語言: CN / TW / HK

也許你們可能沒有接觸過Dapr,但是一定對它“有所耳聞”,感覺今年它一下子就火了,一時間很多人都在談論它。我們從其命名(Dapr的全稱是“分散式應用執行時Distributed Application Runtime”)可以看出Dapr的定位,它並不是分散式應用的開發框架,它提供的是更底層的“執行時”。我們可以使用不同的程式語言,採用不同的開發框架在這個由Dapr提供的執行時上面構建分散式應用。我們接下來就來感受一下Dapr在.NET上面的開發體驗,關於Dapr的基本資訊以及環境的安裝,請參閱 官方文件

[117]Dapr-服務呼叫 ( 原始碼

[118]Dapr-狀態管理( 原始碼

[119]Dapr-釋出訂閱( 原始碼

[120]Dapr-Actor模型( 原始碼

[117]Dapr-服務呼叫

Dapr是一個採用Service Mesh設計的分散式微服務執行時。每一個部署在Dapr上的應用例項(獨立程序或者容器)都具有這一個專屬的Sidecar,具體體現為一個獨立的程序(daprd)或者容器。應用例項只會與它專屬的Sidecar進行通訊,跨應用通訊是在兩個應用例項的Sidecar之間進行的,具體的傳輸協議可以採用HTTP或者gRPC。正是因為應用例項和Sidecar是在各自的程序內獨立執行的,所以Dapr才對應用開發採用的技術棧沒有任何限制。我們接下來就通過一個簡單的例子來演示Dapr下的服務呼叫。我們建立瞭如圖1所示的解決方案。App1和App2代表兩個具有依賴關係的應用,App1會呼叫App2提供的服務。Shared是一個類庫專案,提供被App1和App2共享的資料型別。

圖1 Dapr應用解決方案

我們依然沿用上面演示的數學運算應用場景,並在Shared專案中定義瞭如下兩個資料型別。表示輸入的Input型別提供了兩個運算元(X和Y),表示輸出的Output型別除了通過其Result屬性表示運算結果外,還利用Timestamp屬性返回運算時間戳。

  1 public class Input
  2 {
  3     public int X { get; set; }
  4     public int Y { get; set; }
  5 }
  6 
  7 public class Output
  8 {
  9     public int 		Result { get; set; }
 10     public DateTimeOffset 	Timestamp { get; set; } = DateTimeOffset.Now;
 11 }
 12 

App2就是一個簡單的ASP.NET CORE應用,我們採用路由的方式註冊了執行數學運算的終結點。如下面的程式碼片段所示,註冊的終結點採用的路徑模板為“/{method}”,路由引數“{method}”既表示運算操作型別,同時也作為Dapr服務的方法名。在作為終結點處理器的Calculate方法中,請求的主體內容被提取出來,經過反序列化後繫結為input引數。在根據提供的輸入執行對應的運算並生成Output物件後,將其序列化成JSON,並以此作為響應的內容。

  1 using Microsoft.AspNetCore.Mvc;
  2 using Shared;
  3 
  4 var app = WebApplication.Create(args);
  5 app.MapPost("{method}", Calculate);
  6 app.Run("http://localhost:9999");
  7 
  8 static IResult Calculate(string method, [FromBody] Input input)
  9 {
 10     var result = method.ToLower() switch
 11     {
 12         "add" => input.X + input.Y,
 13         "sub" => input.X - input.Y,
 14         "mul" => input.X * input.Y,
 15         "div" => input.X / input.Y,
 16         _ => throw new InvalidOperationException($"Invalid method {method}")
 17     };
 18     return Results.Json(new Output { Result = result });
 19 }

在呼叫WebApplication物件的Run方法啟動應用的時候,我們顯式指定了監聽地址,其目的是為了將埠(9999)固定下來。App2目前實際上與Dapr一點關係都沒有,我們必須以Dapr的方式啟動它才能將它部署到本機的Dapr環境中,具體來說我們可以執行“dapr run --app-id app2 --app-port 9999 -- dotnet run”這個命令來啟動Sidecar的同時以子程序的方式啟動應用。提供的命令列引數除了提供應用的啟動方式(dotnet run)之外,還提供了針對應用的表示(--app-id app2)和監聽的埠(--app-port 9999)。考慮到每次在控制檯輸入這些繁瑣的命令有點麻煩,我們選擇在launchSettings.json檔案中定義如下這個Profile來以Dapr的方式啟動應用。由於這種啟動方式會將輸出目錄作為當前工作目錄,我們選擇指定程式集的方式來啟動應用(dotnet App2.dll)。

  1 {
  2   "profiles": {
  3     "Dapr": {
  4       "commandName": "Executable",
  5       "executablePath": "dapr",
  6       "commandLineArgs": "run --app-id app2 --app-port 9999 -- dotnet App2.dll"
  7     }
  8   }
  9 }
 10 

App1是一個簡單的控制檯應用,為了能夠採用上述這種方式來啟動它,我們還是將SDK從“Microsoft.NET.Sdk”改成“Microsoft.NET.Sdk.Web”。我們在launchSettings.json檔案中定義瞭如下這個類似的Profile,應用的標識被設定成“app1”。由於App1僅僅涉及到對其他應用的呼叫,自身並不提供服務,所以我們不需要設定埠號。

  1 {
  2   "profiles": {
  3     "Dapr": {
  4       "commandName": "Executable",
  5       "executablePath": "dapr",
  6       "commandLineArgs": "run --app-id app1 -- dotnet App1.dll"
  7     }
  8   }
  9 }
 10 

由於App1涉及到針對Dapr服務的呼叫,需要使用到Dapr客戶端SDK提供的API,所以我們為它添加了“Dapr.Client”這個NuGet包的引用。具體的服務呼叫體現在下面的程式中,如程式碼片段所示,我們呼叫DaprClient的靜態方法CreateInvokeHttpClient針對目標服務或者應用的標識“app2”建立了一個HttpClient物件,並利用該它完成了四個服務方法的呼叫。具體的服務呼叫實現在InvokeAsync這個本地方法中,在將作為輸入的Input物件序列化成JSON文字之後,該方法會將其作為請求的主體內容。在一個分散式環境下,我們不需要知道目標服務所在的位置,因為這是不確定的,確定的唯有目標服務/應用的標識,所以我們直接將此標識作為請求的目標地址。在得到呼叫結果之後,我們對它進行了簡單的格式化後直接輸出到控制檯上。

  1 using Dapr.Client;
  2 using Shared;
  3 
  4 HttpClient client = DaprClient.CreateInvokeHttpClient(appId: "app2");
  5 var input = new Input(2, 1);
  6 
  7 await InvokeAsync("add", "+");
  8 await InvokeAsync("sub", "-");
  9 await InvokeAsync("mul", "*");
 10 await InvokeAsync("div", "/");
 11 
 12 async Task InvokeAsync(string method, string @operator)
 13 {
 14     var response = await client.PostAsync(method, JsonContent.Create(input));
 15     var output = await response.Content.ReadFromJsonAsync<Output>();
 16     Console.WriteLine( $"{input.X} {@operator} {input.Y} = {output.Result} ({output.Timestamp})");
 17 }
 18 

我們先後啟動App2和App1後,兩個應用所在的控制檯上會產生如圖2所示的輸出。應用輸出的文字會採用“== App ==”作為字首,其餘內容為Sidecar輸出的日誌。從App2所在控制檯(前面)上輸出可以看出,它成功地完成了基於四種運算的服務呼叫。當筆者以Debug模式啟動App1的時候有時會“閃退”的現象,如果你也出現這樣的情況,可以選擇非Debug模式(在解決方案視窗中右鍵選擇該專案,選擇Debug => Start Without Debuging)啟動它。

圖2基於Dapr的服務呼叫

[118]Dapr-狀態管理

我們可以藉助Dapr提供的狀態管理元件建立“有狀態”的服務。這裡的狀態並不是儲存在應用例項的程序中供其獨享,而是儲存在獨立的儲存中(比如Redis)共所有應用例項共享,所以並不是影響水平伸縮的能力。對於上面演示的例項,假設計算服務提供的是四個耗時的操作,那麼我們就可以將計算結果快取起來避免重複計算,我們現在就來實現這樣的功能。為了能夠使用到針對狀態管理的API,我們為App2新增針對“Dapr.AspNetCore”這個NuGet包的引用。我們將快取相關的三個操作定義在如下這個IResultCache介面中。如程式碼片段所示,該介面定義了三個方法,GetAsync方法根據指定的操作/方法名稱和輸入提取快取的計算結果,SaveAsync方法負責將計算結果根據對應的方法名成和輸入快取起來,ClearAsync方法則將指定方法的所有快取結果全部清除掉。

  1 public interface IResultCache
  2 {
  3     Task<Output> GetAsync(string method, Input input);
  4     Task SaveAsync(string method, Input input, Output output);
  5     Task ClearAsync(params string[] methods);
  6 }
  7 

如下所示的IResultCache介面的實現型別ResultCache的定義。我們在建構函式中注入了DaprClient物件,並利用它來完成狀態管理的相關操作。計算結果快取項的Key由方法名稱和輸入引數以 “Result_{method}_{X}_{Y}”這樣的格式生成,具體的格式化體現在_keyGenerator欄位返回的委託上。由於涉及到對快取計算結果的清除,我們不得不將所有計算結果快取項的Key也一併快取起來,該快取項採用的Key為“ResultKeys”。

  1 public class ResultCache : IResultCache
  2 {
  3     private readonly DaprClient 			_daprClient;
  4     private readonly string 			_keyOfKeys = "ResultKeys";
  5     private readonly string 			_storeName = "statestore";
  6     private readonly Func<string, Input, string> 	_keyGenerator;
  7 
  8     public ResultCache(DaprClient daprClient)
  9     {
 10         _daprClient = daprClient;
 11         _keyGenerator = (method, input) => $"Result_{method}_{input.X}_{input.Y}";
 12 }
 13 
 14     public Task<Output> GetAsync(string method, Input input)
 15     {
 16         var key = _keyGenerator(method, input);
 17         return _daprClient.GetStateAsync<Output>(storeName: _storeName, key: key);
 18     }
 19 
 20     public async Task SaveAsync(string method, Input input, Output output)
 21     {
 22         var key = _keyGenerator(method, input);
 23         var keys = await _daprClient.GetStateAsync<HashSet<string>>(storeName: _storeName, key: _keyOfKeys) ?? new HashSet<string>();
 24         keys.Add(key);
 25 
 26         var operations = new StateTransactionRequest[2];
 27         var value = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(output));
 28         operations[0] = new StateTransactionRequest(key: key, value: value,  operationType: StateOperationType.Upsert);
 29 
 30         value = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(keys));
 31         operations[1] = new StateTransactionRequest(key: _keyOfKeys, value: value, operationType: StateOperationType.Upsert);
 32         await _daprClient.ExecuteStateTransactionAsync(storeName: _storeName,  operations: operations);
 33     }
 34 
 35     public async Task ClearAsync(params string[] methods)
 36     {
 37         var keys = await _daprClient.GetStateAsync<HashSet<string>>(storeName: _storeName, key: _keyOfKeys);
 38         if (keys != null)
 39         {
 40             var selectedKeys = keys .Where(it => methods.Any(m => it.StartsWith($"Result_{m}"))).ToArray();
 41             if (selectedKeys.Length > 0)
 42             {
 43                 var operations = selectedKeys .Select(it => new StateTransactionRequest(key: it, value: null, operationType: StateOperationType.Delete)) .ToList();
 44                 operations.ForEach(it => keys.Remove(it.Key));
 45                 var value = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(keys));
 46                 operations.Add(new StateTransactionRequest(key: _keyOfKeys, value: value, operationType: StateOperationType.Upsert));
 47                 await _daprClient.ExecuteStateTransactionAsync(storeName: _storeName, operations: operations);
 48             }
 49         }
 50     }
 51 }

在實現的GetAsync方法中,我們根據指定的方法名稱和輸入生成對應快取項的Key,並呼叫DaprClient物件的GetStateAsync<TValue>提取對應快取項的值。我們將Key作為該方法的第二個引數,第一個引數代表狀態儲存(State Store)元件的名稱。Dapr在初始化過程中會預設為我們設定一個針對Redis的狀態儲存元件,並將其命名為“statestore”,ResultCache使用正式這個狀態儲存元件。

對單一狀態值進行設定只需要呼叫DaprClient物件的SaveStateAsync<TValue>方法就可以了,但是我們實現的SaveAsync方法除了需要快取計算結果外,還需要修正“ResultKeys”這個快取的值。為了確保兩者的一致性,兩個快取項的更新最好在同一個事務中進行,為此我們呼叫了DaprClient物件的ExecuteStateTransactionAsync方法。我們為此建立了兩個StateTransactionRequest物件來描述對這兩個快取項的更新,具體來說需要設定快取項的Key、Value和操作型別(Upsert)。這裡設定的值必須是最原始的二進位制陣列,由於狀態管理元件預設採用JSON的序列化方式和UTF-8編碼,所以我們按照這樣的規則生成了作為快取值的二進位制陣列。另一個實現的ClearAsync方法採用類似的方式刪除指定方法的計算結果快取,並修正了“ResultKeys”快取項的值。

接下來我們需要對計算服務的處理方法Calculate作必要的修改。如下面的程式碼片段所示,我們直接在該方法中注入了IResultCache物件。如果能夠利用該物件提取出了快取的計算結果,我們會直接將它返回給客戶端。只有在對應計算結果尚未快取的情況下,我們才會真正實施計算。在返回計算結果之前,我們會對計算結果實施快取。該方法中注入IResultCache和DaprClient物件對應的服務在WebApplication被構建之前進行了註冊。

  1 using App2;
  2 using Microsoft.AspNetCore.Mvc;
  3 using Shared;
  4 var builder = WebApplication.CreateBuilder(args);
  5 builder.Services
  6     .AddSingleton<IResultCache, ResultCache>()
  7     .AddDaprClient();
  8 var app = builder.Build();
  9 app.MapPost("/{method}", Calculate);
 10 app.Run("http://localhost:9999");
 11 
 12 static async Task<IResult> Calculate(string method, [FromBody] Input input, IResultCache resultCache)
 13 {
 14     var output = await resultCache.GetAsync(method, input);
 15     if (output == null)
 16     {
 17         var result = method.ToLower() switch
 18         {
 19             "add" => input.X + input.Y,
 20             "sub" => input.X - input.Y,
 21             "mul" => input.X * input.Y,
 22             "div" => input.X / input.Y,
 23             _ => throw new InvalidOperationException($"Invalid operation {method}")
 24         };
 25         output = new Output { Result = result };
 26         await resultCache.SaveAsync(method, input, output);
 27     }
 28     return Results.Json(output);
 29 }
 30 

由於兩輪服務呼叫使用相同的輸入。如果服務端對計算結果進行了快取,那麼針對同一個方法的呼叫就應該具有相同的時間戳,如圖3所示的輸出結果證實了這一點。

圖3利用狀態管理快取計算結果

[119]Dapr-釋出訂閱

Dapr提供了“開箱即用”的釋出訂閱(“Pub/Sub”)模組,我們可以將其作為訊息佇列來用。上面演示的例項利用狀態管理元件快取了計算結果,現在我們採用釋出訂閱的方法將指定方法的計算結果快取清除掉。具體來說,我們在App2中訂閱“刪除快取”的主題(Topic),當接收到釋出的對應主題的訊息時,我們從訊息中提待刪除的方法列表,並將對應的計算結果快取清除掉。至於“刪除快取”的主題的釋出,我們將它交給App1來完成。我們為此對App2再次做出修改。如下面的程式碼片段所示,我們針對路徑“clear”註冊了一個作為“刪除快取”主題的訂閱終結點,它對應的處理方法為ClearAsync。我們通過標註在該方法上的TopicAttribute來對訂閱的主題作相應設定。該特性建構函式的第一個引數為採用的釋出訂閱元件名稱,我們採用的是初始化Dapr是設定的基於Redis的釋出訂閱元件,該元件命名為“pubsub”。第二個引數表示訂閱主題的名稱,我們將其設定為“clearresult”。

  1 using App2;
  2 using Dapr;
  3 using Microsoft.AspNetCore.Mvc;
  4 using Shared;
  5 var builder = WebApplication.CreateBuilder(args);
  6 builder.Services
  7     .AddSingleton<IResultCache, ResultCache>()
  8     .AddDaprClient();
  9 var app = builder.Build();
 10 
 11 app.UseCloudEvents();
 12 app.MapPost("clear", ClearAsync);
 13 app.MapSubscribeHandler();
 14 
 15 app.MapPost("/{method}", Calculate);
 16 app.Run("http://localhost:9999");
 17 
 18 [Topic(pubsubName:"pubsub", name:"clearresult")]
 19 static Task ClearAsync(IResultCache cache, [FromBody] string[] methods) => cache.ClearAsync(methods);
 21 
 22 static async Task<IResult> Calculate(string method, [FromBody]Input input,    IResultCache resultCache) ...

ClearAsync方法定義了兩個引數,第一個引數會預設繫結為註冊的IResultCache服務,第二個引數表示待刪除的方法列表,上面標註的FromBodyAttribute特性將指導路由系統通過提取請求主體內容來繫結對應引數值。但是Dapr的釋出訂閱元件預設採用Cloud Events訊息格式,如果請求的主體為具有如此結構的訊息,按照預設的繫結規則,針對input引數的繫結將會失敗。為此我們呼叫WebApplication物件的UseCloudEvents擴充套件方法額外註冊了一個CloudEventsMiddleware中介軟體,該中介軟體會提取出請求資料部分的內容,並使用它將整個請求主體部分的內容替換掉,那麼針對methods引數的繫結就能成功了。我們還呼叫WebApplication物件的MapSubscribeHandler擴充套件方法註冊了一個額外的終結點。在應用啟動的時候,Sidecar會利用這個終結點收集當前應用提供的所有訂閱處理器的元資料資訊,其中包括髮布訂閱元件和主題名稱,以及呼叫的路由或路徑(對於本例就是“clear”)。當Sidecar接受到釋出訊息後,會根據這組元資料選擇匹配的訂閱處理器,並利用其提供的路徑完成對它的呼叫。

我們針對釋出者的角色對App1做了相應的修改。如下面的程式碼片段所示,我們利用建立的DaprClientBuilder構建了一個DaprClient物件。在兩輪針對計算服務的呼叫之間,我們呼叫了DaprClient的PublishEventAsync方法釋出了一個名為“clearresult”的訊息。從提供的第三個引數可以看出,我們僅僅清除“加法”和“減法”這兩個方法的計算結果快取。圖4所示的是App1執行之後在控制檯上的輸出。對於兩輪間隔為5秒的服務呼叫,加法和減法的結果由於快取被清除,所以它們具有不同的時間戳,但乘法和除法的計算時間依舊是相同的。

圖4利用釋出訂閱元件刪除結果快取

[120]Dapr-Actor模型

如果分散式系統待解決的功能可以分解成若干很小且獨立狀態邏輯單元,我們可以考慮使用Actor模型(Model)進行設計。具體來說,我們將上述這些狀態邏輯單元定義成單個的Actor,並在它們之間採用訊息驅動的通訊方法完成整個工作流程。每個Actor只需要考慮對接收的訊息進行處理,並將後續的操作轉換成訊息分發給另一個Actor就可以了。由於每個Actor以單執行緒模式執行,我們無需考慮多執行緒併發和同步的問題。由於Actor之間的互動是完全無阻塞的,一般能夠提高系統整體的吞吐量。

接下來我們依然通過對上面演示例項的修改來演示Dapr的Actor模型在.NET下的應用。這次我們將一個具有狀態的累加計數器設計成Actor。我們在Shared專案中為這個Actor定義了一個介面,如下面的程式碼片段所示,這個名為IAccumulator的介面派生於IActor,由於後者來源於“Dapr.Actors”這個NuGet包,所以我們需要新增對應的包引用。IAccumulator介面定義了兩個方法,IncreaseAsync方法根據指定的數值進行累加並返回當前的值, ResetAsync方法將累加數值重置歸零。

public interface IAccumulator: IActor
{
    Task<int> IncreaseAsync(int count);
    Task ResetAsync();
}

我們將IAccumulator介面的實現型別Accumulator定義在App2中。如下面的程式碼片段所示,除了實現對應的介面,Accumulator型別還繼承了Actor這個基類。由於每個Actor提供當前累加的值,所以它們是有狀態的。但是不能利用Accumulator例項的屬性來維持這個狀態,我們使用從基類繼承下來的StateManager屬性返回的IActorStateManager物件來管理當前Actor的狀態。具體來說,我們呼叫TryGetStateAsync方法提取當前Actor針對指定名稱(“__counter”)的狀態值,新的狀態值通過呼叫它的SetStateAsync方法進行設定。由於IActorStateManager物件的SetStateAsync方法對狀態所作的更新都是本地操作,我們最終還需要呼叫Actor物件自身的SaveStateAsync方法提交所有的狀態更新。Actor的狀態依舊是通過Dapr的狀態管理元件進行儲存的。

public class Accumulator : Actor, IAccumulator
{
    private readonly string _stateName = "__counter";
    public Accumulator(ActorHost host) : base(host)
    {
    }
    public async Task<int> IncreaseAsync(int count)
    {
        var counter = 0;
        var existing = await StateManager.TryGetStateAsync<int>(stateName: _stateName);
        if(existing.HasValue)
        {
            counter = existing.Value;
        }
        counter+= count;
        await StateManager.SetStateAsync(stateName: _stateName, value:counter);
        await SaveStateAsync();
        return counter;
    }
    public async Task ResetAsync()
    {
        await StateManager.TryRemoveStateAsync(stateName: _stateName);
        await SaveStateAsync();
    }
}

承載Actor相關的API由“Dapr.Actors.AspNetCore”這個NuGet包提供,所以我們需要新增該包的引用。Actor的承載方式與MVC框架類似,它們都是建立在路由系統上,MVC框架將所有Controller型別轉換成註冊的終結點,而Actor的終結點由WebApplication的MapActorsHandlers擴充套件方法進行註冊。在註冊中介軟體之前,我們還需要呼叫IServiceCollection介面的AddActors擴充套件方法將註冊的Actor型別新增到ActorRuntimeOptions配置選項上。

using App2;
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddActors(options => options.Actors.RegisterActor<Accumulator>());
var app = builder.Build();
app.MapActorsHandlers();
app.Run("http://localhost:9999");

我們在App1中編寫了如下的程式來演示針對Actor的呼叫。如程式碼片段所示,我們呼叫ActorProxy的靜態方法Create<TActor>建立了兩個IAccumulator物件。建立Actor物件(其實是呼叫Actor的代理)時需要指定唯一標識Actor的ID(“001”和“002”)和對應的型別(“Accumulator”)。

using Dapr.Actors;
using Dapr.Actors.Client;
using Shared;

var accumulator1 = ActorProxy.Create<IAccumulator>(new ActorId("001"), "Accumulator");
var accumulator2 = ActorProxy.Create<IAccumulator>(new ActorId("002"), "Accumulator");

while (true)
{
    var counter1 = await accumulator1.IncreaseAsync(1);
    var counter2 = await accumulator2.IncreaseAsync(2);
    await Task.Delay(5000);
    Console.WriteLine($"001: {counter1}");
    Console.WriteLine($"002: {counter2}\n");

    if (counter1 > 10)
    {
        await accumulator1.ResetAsync();
    }
    if (counter2 > 20)
    {
        await accumulator2.ResetAsync();
    }
}

Actor物件創建出來後,我們在一個迴圈中採用不同的步長(1和2)呼叫它們的IncreaseAsync實施累加操作。在計數器數值達到上限(10和20)時,我們呼叫它們的ResetAsync方法重置計數器。在先後啟動App2和App1之後,App1所在控制檯上將會以如圖5所示的形式輸出兩個累加計數器提供的計數。

圖5 Actor模式實現的累加計數器