基於 gRPC 和 .NET Core 的伺服器流

語言: CN / TW / HK

原文:https://bit.ly/3lpz8Ll

作者:Chandan Rauniyar

翻譯:精緻碼農-王亮

早在 2019 年,我寫過《用 Mapbox 繪製位置資料》一文,詳細介紹了我如何通過簡單的檔案上傳,用 Mapbox 繪製約 230 萬個位置點。本文介紹我是如何通過使用 gRPC 和 .NET Core 的伺服器流來快速獲取所有位置歷史資料的。

https://chandankkrr.medium.com/mapping-location-data-with-mapbox-9b256f64d569

什麼是 gRPC

gRPC 是一個現代開源的高效能 RPC 框架,可以在任何環境下執行。它可以有效地連線資料中心內和跨資料中心的服務,並對負載平衡、跟蹤、健康檢查和認證提供可插拔的支援。gRPC 最初是由谷歌建立的,該公司使用一個名為 Stubby 的單一通用 RPC 基礎設施來連線其資料中心內和跨資料中心執行的大量微服務,使用已經超過十年。2015 年 3 月,谷歌決定建立 Stubby 的下一個版本,並將其開源,結果就是現在的 gRPC,被許多企業或組織使用。

https://grpc.io/

gRPC 伺服器流

伺服器流式(Server Streaming)RPC,客戶端向伺服器傳送請求,並獲得一個流來讀取一連串的訊息。客戶端從返回的流中讀取資訊,直到沒有訊息為止。gRPC 保證在單個 RPC 呼叫中的資訊是有序的。

rpc GetLocationData (GetLocationRequest) returns (stream GetLocationResponse);

協議緩衝區(Protobuf)

gRPC 使用協議緩衝區(protocol buffers)作為介面定義語言(IDL)來定義客戶端和伺服器之間的契約。在下面的 proto 檔案中,定義了一個 RPC 方法  GetLocations ,它接收  GetLocationsRequest 訊息型別並返回  GetLocationsResponse 訊息型別。響應訊息型別前面的  stream 關鍵字表示響應是流型別,而不是單個響應。

syntax = "proto3";

option csharp_namespace = "GPRCStreaming";

package location_data;

service LocationData {
  rpc GetLocations (GetLocationsRequest) returns (stream GetLocationsResponse);
}

message GetLocationsRequest {
  int32 dataLimit = 1;
}

message GetLocationsResponse {
  int32 latitudeE7 = 1;
  int32 longitudeE7 = 2;
}

建立 gRPC 服務

我們可以使用 dotnet new grpc -n threemillion 命令輕鬆建立一個 .NET gRPC 服務。更多關於在 ASP.NET Core 中建立 gRPC 伺服器和客戶端的資訊可在微軟文件中找到。

Create a gRPC client and server in ASP.NET Core
https://docs.microsoft.com/en-us/aspnet/core/tutorials/grpc/grpc-start?view=aspnetcore-5.0&tabs=visual-studio-code

在添加了 proto 檔案並生成了 gRPC 服務資原始檔後,接下來我添加了 LocationService 類。在下面的程式碼片段中,我有一個  LocationService 類,它繼承了從  Location.proto 檔案中生成的  LocationDataBase 型別。客戶端可以通過  Startup.cs 檔案中  Configure 方法中的  endpoints.MapGrpcService<LocationService>() 來訪問  LocationService 。當伺服器收到  GetLocations 請求時,它首先通過  GetLocationData 方法呼叫讀取 Data 資料夾中  LocationHistory.json 檔案中的所有資料(未包含在原始碼庫)。該方法返回  RootLocation 型別,其中包含  List<Location> 型別的  Location 屬性。 Location 類由兩個內部屬性  Longitude 和  Latitude 組成。接下來,我迴圈瀏覽每個位置,然後將它們寫入  responseStream 中,返回給客戶端。伺服器將訊息寫入流中,直到客戶在  GetLocationRequest 物件中指定的  dataLimit

using System.Threading.Tasks;
using Grpc.Core;
using Microsoft.Extensions.Logging;
using System.IO;
using System;
using System.Linq;

namespace GPRCStreaming
{
    public class LocationService : LocationData.LocationDataBase
    {
        private readonly FileReader _fileReader;
        private readonly ILogger<LocationService> _logger;

        public LocationService(FileReader fileReader, ILogger<LocationService> logger)
        {
            _fileReader = fileReader;
            _logger = logger;
        }

        public override async Task GetLocations(
          GetLocationsRequest request,
          IServerStreamWriter<GetLocationsResponse> responseStream,
          ServerCallContext context)
        {
            try
            {
                _logger.LogInformation("Incoming request for GetLocationData");

                var locationData = await GetLocationData();
                var locationDataCount = locationData.Locations.Count;

                var dataLimit = request.DataLimit > locationDataCount ? locationDataCount : request.DataLimit;

                for (var i = 0; i <= dataLimit - 1; i++)
                {
                   var item = locationData.Locations[i];

                    await responseStream.WriteAsync(new GetLocationsResponse
                    {
                        LatitudeE7 = item.LatitudeE7,
                        LongitudeE7 = item.LongitudeE7
                    });
                }
            }
            catch (Exception exception)
            {
                _logger.LogError(exception, "Error occurred");
                throw;
            }
        }

        private async Task<RootLocation> GetLocationData()
        {
            var currentDirectory = Directory.GetCurrentDirectory();
            var filePath = $"{currentDirectory}/Data/Location_History.json";

            var locationData = await _fileReader.ReadAllLinesAsync(filePath);

            return locationData;
        }
    }
}

現在,讓我們執行該服務併發送一個請求。我將使用一個叫 grpcurl 的命令列工具,它可以讓你與 gRPC 伺服器互動。它基本上是針對 gRPC 伺服器的  curl

https://github.com/fullstorydev/grpcurl

通過 grpcurl 與 gRPC 端點(endpoint)互動只有在 gRPC 反射服務被啟用時才可用。這允許服務可以被查詢,以發現伺服器上的 gRPC 服務。擴充套件方法  MapGrpcReflectionService 需要引入  Microsoft.AspNetCore.Builder 的名稱空間:

public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
{
    app.UseEndpoints(endpoints =>
    {
        endpoints.MapGrpcService<LocationService>();

        if (env.IsDevelopment())
        {
            endpoints.MapGrpcReflectionService();
        }

        endpoints.MapGet("/", async context =>
        {
            await context.Response.WriteAsync("Communication with gRPC endpoints must be made through a gRPC client. To learn how to create a client, visit: https://go.microsoft.com/fwlink/?linkid=2086909");
        });
    });
}
grpcurl -plaintext -d '{"dataLimit": "100000"}' localhost:80 location_data.LocationData/GetLocations

一旦伺服器收到請求,它就會讀取檔案,然後在位置列表中迴圈,直到達到 dataLimit 計數,並將位置資料返回給客戶端。

接下來,讓我們建立一個 Blazor 客戶端來呼叫 gRPC 服務。我們可以使用 IServiceCollection 介面上的  AddGrpcClient 擴充套件方法設定一個 gRPC 客戶端:

public void ConfigureServices(IServiceCollection services)
{
    services.AddRazorPages();
    services.AddServerSideBlazor();
    services.AddSingleton<WeatherForecastService>();

    services.AddGrpcClient<LocationData.LocationDataClient>(client =>
    {
        client.Address = new Uri("http://localhost:80");
    });
}

我使用 Virtualize Blazor 元件來渲染這些位置。 Virtualize 元件不是一次性渲染列表中的每個專案,只有當前可見的專案才會被渲染。

ASP.NET Core Blazor component virtualization
https://docs.microsoft.com/en-us/aspnet/core/blazor/components/virtualization?view=aspnetcore-5.0

相關程式碼:

@page "/locationdata"

@using Grpc.Core
@using GPRCStreaming
@using threemillion.Data
@using System.Diagnostics
@using Microsoft.AspNetCore.Components.Web.Virtualization

@inject IJSRuntime JSRuntime;
@inject System.Net.Http.IHttpClientFactory _clientFactory
@inject GPRCStreaming.LocationData.LocationDataClient _locationDataClient

<table class="tableAction">
    <tbody>
        <tr>
            <td>
                <div class="data-input">
                    <label for="dataLimit">No of records to fetch</label>
                    <input id="dataLimit" type="number" @bind="_dataLimit" />
                    <button @onclick="FetchData" class="btn-submit">Call gRPC</button>
                </div>
            </td>
            <td>
                <p class="info">
                    Total records: <span class="count">@_locations.Count</span>
                </p>
                <p class="info">
                    Time taken: <span class="time">@_stopWatch.ElapsedMilliseconds</span> milliseconds
                </p>
            </td>
        </tr>
    </tbody>
</table>

<div class="tableFixHead">
    <table class="table">
        <thead>
            <tr>
                <th>Longitude</th>
                <th>Latitude</th>
            </tr>
        </thead>
        <tbody>
            <Virtualize Items="@_locations" Context="locations">
                <tr>
                    <td>@locations.LongitudeE7</td>
                    <td>@locations.LatitudeE7</td>
                </tr>
            </Virtualize>
        </tbody>
    </table>
</div>

@code {
    private int _dataLimit = 1000;

    private List<Location> _locations = new List<Location>();

    private Stopwatch _stopWatch = new Stopwatch();

    protected override async Task OnInitializedAsync()
    {
        await FetchData();
    }

    private async Task FetchData()
    {
        ResetState();

        _stopWatch.Start();

        using (var call = _locationDataClient.GetLocations(new GetLocationsRequest { DataLimit = _dataLimit }))
        {
            await foreach (var response in call.ResponseStream.ReadAllAsync())
            {
                _locations.Add(new Location { LongitudeE7 = response.LongitudeE7, LatitudeE7 = response.LatitudeE7 });

                StateHasChanged();
            }
        }

        _stopWatch.Stop();
    }

    private void ResetState()
    {
        _locations.Clear();

        _stopWatch.Reset();

        StateHasChanged();
    }
}

通過在本地執行的流呼叫,從 gRPC 伺服器接收 2,876,679 個單獨的響應大約需要 8 秒鐘。讓我們也在 Mapbox 中載入資料:

@page "/mapbox"

@using Grpc.Core
@using GPRCStreaming
@using System.Diagnostics

@inject IJSRuntime JSRuntime;
@inject System.Net.Http.IHttpClientFactory _clientFactory
@inject GPRCStreaming.LocationData.LocationDataClient LocationDataClient

<table class="tableAction">
    <tbody>
        <tr>
            <td>
                <div class="data-input">
                    <label for="dataLimit">No of records to fetch</label>
                    <input id="dataLimit" type="number" @bind="_dataLimit" />
                    <button @onclick="LoadMap" class="btn-submit">Load data</button>
                </div>
            </td>
            <td>
                <p class="info">
                    Total records: <span class="count">@_locations.Count</span>
                </p>
                <p class="info">
                    Time taken: <span class="time">@_stopWatch.ElapsedMilliseconds</span> milliseconds
                </p>
            </td>
        </tr>
    </tbody>
</table>

<div id='map' style="width: 100%; height: 90vh;"></div>

@code {
    private int _dataLimit = 100;

    private List<object> _locations = new List<object>();

    private Stopwatch _stopWatch = new Stopwatch();

    protected override async Task OnAfterRenderAsync(bool firstRender)
    {
        if (!firstRender)
        {
            return;
        }

        await JSRuntime.InvokeVoidAsync("mapBoxFunctions.initMapBox");
    }

    private async Task LoadMap()
    {
        ResetState();

        _stopWatch.Start();

        using (var call = LocationDataClient.GetLocations(new GetLocationsRequest { DataLimit = _dataLimit }))
        {

            await foreach (var response in call.ResponseStream.ReadAllAsync())
            {
                var pow = Math.Pow(10, 7);
                var longitude = response.LongitudeE7 / pow;
                var latitude = response.LatitudeE7 / pow;

                _locations.Add(new
                {
                    type = "Feature",
                    geometry = new
                    {
                        type = "Point",
                        coordinates = new double[] { longitude, latitude }
                    }
                });

                StateHasChanged();
            }

            _stopWatch.Stop();

            await JSRuntime.InvokeVoidAsync("mapBoxFunctions.addClusterData", _locations);
        }
    }

    private void ResetState()
    {
        JSRuntime.InvokeVoidAsync("mapBoxFunctions.clearClusterData");

        _locations.Clear();

        _stopWatch.Reset();

        StateHasChanged();
    }
}

原始碼在我的 GitHub 上 :pray::

https://github.com/Chandankkrr/threemillion

往期 精彩 回顧

【推薦】.NET Core開發實戰視訊課程   ★★★

.NET Core實戰專案之CMS 第一章 入門篇-開篇及總體規劃

【.NET Core微服務實戰-統一身份認證】開篇及目錄索引

Redis基本使用及百億資料量中的使用技巧分享(附視訊地址及觀看指南)

.NET Core中的一個介面多種實現的依賴注入與動態選擇看這篇就夠了

10個小技巧助您寫出高效能的ASP.NET Core程式碼

用abp vNext快速開發Quartz.NET定時任務管理介面

在ASP.NET Core中建立基於Quartz.NET託管服務輕鬆實現作業排程

現身說法:實際業務出發分析百億資料量下的多表查詢優化

關於C#非同步程式設計你應該瞭解的幾點建議

C#非同步程式設計看這篇就夠了