KestrelServer詳解[2]: 網絡鏈接的創建

語言: CN / TW / HK

註冊監聽終結點(Endpoint) 》已經詳細講述瞭如何使用KestrelServer,現在我們來簡單聊聊這種處理器的總體設計和實現原理。當KestrelServer啟動的時候,註冊的每個終結點將轉換成對應的“連接監聽器”,後者在監聽到初始請求時會創建“連接”,請求的接收和響應的回覆都在這個連接中完成。本文提供的示例演示已經同步到《 ASP.NET Core 6框架揭祕-實例演示版 》)

一、連接上下文(ConnectionContext )

監聽器創建的連接時一個抽象的概念,我們可以將其視為客户端和服務端完成消息交換而構建的“上下文”,該上下文通過如下這個ConnectionContext類型表示。ConnectionContext派生於抽象基類BaseConnectionContext,後者實現了IAsyncDisposable接口。每個連接具有一個通過ConnectionId屬性表示的ID,它的LocalEndPoint和RemoteEndPoint屬性返回本地(服務端)和遠程(客户端)終結點。服務器提供的特性集合體現在它的Features屬性上,另一個Items提供了一個存放任意屬性的字典。ConnectionClosed屬性提供的CancellationToken可以用來接收連接關閉的通知。Abort方法可以中斷當前連接,這兩個方法在ConnectionContext被重寫。ConnectionContext類型的Transport屬性提供的IDuplexPipe對象是用來對請求和響應進行讀寫的雙向管道。

public abstract class ConnectionContext : BaseConnectionContext
{
    public abstract IDuplexPipe Transport { get; set; }
    public override void Abort(ConnectionAbortedException abortReason);
    public override void Abort();
}

public abstract class BaseConnectionContext : IAsyncDisposable
{
    public virtual EndPoint? LocalEndPoint { get; set; }
    public virtual EndPoint? RemoteEndPoint { get; set; }
    public abstract string ConnectionId { get; set; }
    public abstract IFeatureCollection Features { get; }
    public abstract IDictionary<object, object?> Items { get; set; }
    public virtual CancellationToken ConnectionClosed { get; set; }

    public abstract void Abort();
    public abstract void Abort(ConnectionAbortedException abortReason);
    public virtual ValueTask DisposeAsync();
}

如果採用HTTP 1.X和HTTP 2協議,KestrelServer會採用TCP套接字(Socket)進行通信,對應的連接體現為一個SocketConnection對象。如果採用的是HTTP 3,會採用基於UDP的QUIC協議進行通信,對應的連接體現為一個QuicStreamContext對象。如下面的代碼片段所示,這兩個類型都派生於TransportConnection,後者派生於ConnectionContext。

internal abstract class TransportConnection : ConnectionContext
internal sealed class SocketConnection : TransportConnection
internal sealed class QuicStreamContext : TransportConnection

二、連接監聽器(IConnectionListener )

KestrelServer同時支持三個版本的HTTP協議,HTTP 1.X和HTTP 2建立在TCP協議之上,針對這樣的終結點會轉換成通過如下這個IConnectionListener接口表示的監聽器。它的EndPoint屬性表示監聽器綁定的終結點,當AcceptAsync方法被調用時,監聽器便開始了網絡監聽工作。當來自某個客户端端的初始請求抵達後,它會將創建代表連接的ConnectionContext上下文創建出來。另一個UnbindAsync方法用來解除終結點綁定,並停止監聽。

public interface IConnectionListener : IAsyncDisposable
{
    EndPoint EndPoint { get; }
    ValueTask<ConnectionContext?> AcceptAsync(CancellationToken cancellationToken = default(CancellationToken));
    ValueTask UnbindAsync(CancellationToken cancellationToken = default(CancellationToken));
}

QUIC利用傳輸層的UDP協議實現了真正意義上的“多路複用”,所以它將對應的連接監聽器接口命名為IMultiplexedConnectionListener。它的AcceptAsync方法創建的是代表多路複用連接的MultiplexedConnectionContext對象,後者的AcceptAsync會將ConnectionContext上下文創建出來。QuicConnectionContext 類型是對MultiplexedConnectionContext的具體實現,它的AcceptAsync方法創建的就是上述的QuicStreamContext對象,該類型派生於抽象類TransportMultiplexedConnection。

public interface IMultiplexedConnectionListener : IAsyncDisposable
{
    EndPoint EndPoint { get; }
    ValueTask<MultiplexedConnectionContext?> AcceptAsync(IFeatureCollection? features = null,CancellationToken cancellationToken = default(CancellationToken));
    ValueTask UnbindAsync(CancellationToken cancellationToken = default(CancellationToken));
}

public abstract class MultiplexedConnectionContext : BaseConnectionContext
{
    public abstract ValueTask<ConnectionContext?> AcceptAsync(CancellationToken cancellationToken = default(CancellationToken));
    public abstract ValueTask<ConnectionContext> ConnectAsync(IFeatureCollection? features = null,CancellationToken cancellationToken = default(CancellationToken));
}

internal abstract class TransportMultiplexedConnection : MultiplexedConnectionContext
internal sealed class QuicConnectionContext : TransportMultiplexedConnection

KestrelServer使用的連接監聽器均由對應的工廠來構建。如下所示的IConnectionListenerFactory接口代表用來構建IConnectionListener監聽器的工廠,IMultiplexedConnectionListenerFactory工廠則用來構建IMultiplexedConnectionListener監聽器。

public interface IConnectionListenerFactory
{
    ValueTask<IConnectionListener> BindAsync(EndPoint endpoint,CancellationToken cancellationToken = default(CancellationToken));
}

public interface IMultiplexedConnectionListenerFactory
{
    ValueTask<IMultiplexedConnectionListener> BindAsync(EndPoint endpoint, IFeatureCollection? features = null,CancellationToken cancellationToken = default(CancellationToken));
}

三、總體設計

上面圍繞着“連接”介紹了一系列接口和類型,它們之間的關係體現在如圖1所示的UML中。KestrelServer啟動時會根據每個終結點支持的HTTP協議利用IConnectionListenerFactory或者IMultiplexedConnectionListenerFactory工廠來創建代表連接監聽器的IConnectionListener或者IMultiplexedConnectionListener對象。IConnectionListener監聽器會直接將代表連接的ConnectionContext上下文創建出來,IMultiplexedConnectionListener監聽器創建的則是一個MultiplexedConnectionContext上下文,代表具體連接的ConnectionContext上下文會進一步由該對象進行創建。

圖1 “連接”相關的接口和類型

四、利用連接接收請求和回覆響應

下面演示的實例直接利用IConnectionListenerFactory工廠創建的IConnectionListener監聽器來監聽連接請求,並利用建立的連接來接收請求和回覆響應。由於表示連接的ConnectionContext上下文直接面向傳輸層,接受的請求和回覆的響應都體現為二進制流,解析二進制數據得到請求信息是一件繁瑣的事情。這裏我們借用了“HttpMachine”NuGet包提供的HttpParser組件來完成這個任務,為此我們為它定義瞭如下這個HttpParserHandler類型。如果將這個HttpParserHandler對象傳遞給HttpParser對象,後者在請求解析過程中會調用前者相應的方法,我們利用這些方法利用讀取的內容將描述請求的HttpRequestFeature特性構建出來。源代碼可以從 這裏 查看。

public class HttpParserHandler : IHttpParserHandler
{
    private string? headerName = null;
    public HttpRequestFeature Request { get; } = new HttpRequestFeature();

    public void OnBody(HttpParser parser, ArraySegment<byte> data) => Request.Body = new MemoryStream(data.Array!, data.Offset, data.Count);
    public void OnFragment(HttpParser parser, string fragment) { }
    public void OnHeaderName(HttpParser parser, string name) => headerName = name;
    public void OnHeadersEnd(HttpParser parser) { }
    public void OnHeaderValue(HttpParser parser, string value) => Request.Headers[headerName!] = value;
    public void OnMessageBegin(HttpParser parser) { }
    public void OnMessageEnd(HttpParser parser) { }
    public void OnMethod(HttpParser parser, string method) => Request.Method = method;
    public void OnQueryString(HttpParser parser, string queryString) => Request.QueryString = queryString;
    public void OnRequestUri(HttpParser parser, string requestUri) => Request.Path = requestUri;
}

如下所示的演示程序利用WebApplication對象的Services屬性提供的IServicePovider對象來提供IConnectionListenerFactory工廠。我們調用該工廠的BindAsync方法創建了一個連接監聽器並將其綁定到採用5000端口本地終結點。在一個無限循環中,我們調用監聽器的AcceptAsync方法開始監聽連接請求,並最終將代表連接的ConnectionContext上下文創建出來。

using App;
using HttpMachine;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Http.Features;
using System.Buffers;
using System.IO.Pipelines;
using System.Net;
using System.Text;

var factory = WebApplication.Create().Services.GetRequiredService<IConnectionListenerFactory>();
var listener = await factory.BindAsync(new IPEndPoint(IPAddress.Any, 5000));
while (true)
{
    var context = await listener.AcceptAsync();
    _ = HandleAsync(context!);

    static async Task HandleAsync(ConnectionContext connection)
    {
        var reader = connection!.Transport.Input;
        while (true)
        {
            var result = await reader.ReadAsync();
            var request = ParseRequest(result);
            reader.AdvanceTo(result.Buffer.End);
            Console.WriteLine("[{0}]Receive request: {1} {2} Connection:{3}",connection.ConnectionId, request.Method, request.Path, request.Headers?["Connection"] ?? "N/A");

            var response = @"HTTP/1.1 200 OK
Content-Type: text/plain; charset=utf-8
Content-Length: 12

Hello World!";
            await connection.Transport.Output.WriteAsync(Encoding.UTF8.GetBytes(response));
            if (request.Headers.TryGetValue("Connection", out var value) && string.Compare(value, "close", true) == 0)
            {
                await connection.DisposeAsync();
                return;
            }
            if (result.IsCompleted)
            {
                break;
            }
        }
    }

    static  HttpRequestFeature ParseRequest(ReadResult result)
    {
        var handler = new HttpParserHandler();
        var parserHandler = new HttpParser(handler);
        parserHandler.Execute(new ArraySegment<byte>(result.Buffer.ToArray()));
        return handler.Request;
    }
}

針對連接的處理實現在HandleAsync方法中。HTTP 1.1默認會採用長連接,多個請求會使用同一個連接發送過來,所以針對單個請求的接收和處理會放在一個循環中,直到連接被關閉。請求的接收利用ConnectionContext對象的Transport屬性返回的IDuplexPipe對象來完成。簡單起見,我們假設每個請求的讀取剛好能夠一次完成,所以每次讀取的二進制剛好是一個完整的請求。讀取的二進制內容利用ParseRequest方法藉助於HttpParser對象轉換成HttpRequestFeature對象後,我們直接生成一個表示響應報文的字符串並採用UTF-8對其編碼,編碼後的響應利用上述的IDuplexPipe對象發送出去。這份手工生成的“Hello World!”響應將以圖18-5的形式呈現在瀏覽器上。

圖2 面向“連接”編程

按照HTTP 1.1規範的約定,如果客户端希望關閉默認開啟的長連接,可以在請求中添加“Connection:Close”報頭。HandleAsync方法在處理每個請求時會確定是否攜帶了此報頭,並在需要的時候調用ConnectionContext上下文的 DisposeAsync方法關閉並釋放當前連接。該方法在對請求進行處理時會將此報頭和連接的ID輸出到控制枱上。圖2所示的控制枱輸出是先後接收到三次請求的結果,後面兩次顯式添加了“Connection:Close”,可以看出前兩次複用同一個連接。