SwiftNIO 實戰之TCP粘包/拆包問題

語言: CN / TW / HK

在 TCP 程式設計中,無論是伺服器還是客戶端,當我們讀取或者傳送訊息的時候,都需要考慮 TCP 底層的粘包/拆包機制。本文先簡單介紹 TCP 粘包/拆包的基礎知識,然後模擬一個沒有考慮 TCP 粘包/拆包導致功能異常的案例,最後探討 SwiftNIO 是如何解決這個問題的。

TCP 粘包/拆包

在 Socket 通訊過程中,如果通訊的一端一次性連續傳送多條資料包,TCP 協議會將多個數據包打包成一個 TCP 報文傳送出去,這就是所謂的粘包。而如果通訊的一端傳送的資料包超過一次 TCP 報文所能傳輸的最大值時,就會將一個數據包拆成多個最大 TCP 長度的 TCP 報文分開傳輸,這就叫做拆包

一些基本概念

  • MTU(Maximum Transmission Unit):泛指通訊協議中的最大傳輸單元。一般用來說明TCP/IP四層協議中資料鏈路層的最大傳輸單元,不同型別的網路MTU也會不同,我們普遍使用的乙太網的MTU是1500,即最大隻能傳輸 1500 位元組的資料幀。

  • MSS(Maximum Segment Size):指 TCP 建立連線後雙方約定的可傳輸的最大 TCP 報文長度,是 TCP 用來限制應用層可傳送的最大位元組數。如果底層的 MTU 是 1500 byte,則 MSS = 1500 - 20(IP Header) - 20 (TCP Header) = 1460 byte。

    字 word、位元組 byte、位 bit

示意圖

假設客戶端分別傳送了兩個資料包 D1 和 D2 給伺服器,由於伺服器一次讀取的位元組數是不確定的,故可能存在以下 4 中情況:

(1)服務端分兩次讀取到了兩個獨立的資料包,分別是 D1 和 D2,沒有粘包和拆包;

(2)服務端一次接收到了兩個資料包,D1 和 D2 粘合在一起,被稱為 TCP 粘包

(3)服務端分兩次讀取到了兩個資料包,第一次讀取到了完整的 D1 包和 D2 包的部分內容,第二次讀取到了D2 包的剩餘內容,這被稱為 TCP 拆包

(4)服務端分兩次讀取到了兩個資料包,第一次讀取到了 D1 包的部分內容 D1_1,第二次讀取到了 D1 包的剩餘內容 D1_2 和 D2 包的整包。

如果此時服務端 TCP 接收滑窗非常小,而資料包 D1 和 D2 比較大,很有可能會發生第五種可能,即服務端分多次才能將 D1 和 D2 包接收完全,期間發生多次拆包。

TCP 粘包/拆包發生的原因

問題的產生原因有三個:

(1)應用程式 write 寫入的位元組大小大於套介面傳送緩衝區大小; (2)進行 MSS 大小的 TCP 分段; (3)乙太網幀的 payload 大於 MTU 進行 IP 分片。

粘包問題的解決策略

由於底層的TCP無法理解上層的業務資料,所以在底層是無法保證資料包不被拆分和重組的,這個問題只能通過上層的應用協議棧設計來解決,根據業界的主流協議的解決方案,可以歸納如下:

(1)訊息定長,例如每個報文的大小為固定長度 200 位元組,如果不夠,空位補空格;

(2)在包尾增加回車換行符進行分割,例如 FTP 協議;

(3)將訊息分為訊息頭和訊息體,訊息頭中包含表示訊息總長度(或者訊息體長度)的欄位,通常設計思路為訊息頭的第一個欄位使用 int32 來表示訊息的總長度;

(4)更復雜的應用層協議。

沒有考慮 TCP 粘包/拆包的案例

本例子是個回顯伺服器,客戶端傳送 Hello world, oldbird learn swiftNIO, day day up !!! 100 次,服務端原文返回 100 次。我們的期望就是伺服器和客戶端都有 100 次的計數列印。

服務端程式碼

let group = MultiThreadedEventLoopGroup(numberOfThreads: System.coreCount)
let bootstrap = ServerBootstrap(group: group)
    .serverChannelOption(ChannelOptions.backlog, value: 256)
    .serverChannelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1)
    .childChannelInitializer { channel in
        channel.pipeline.addHandlers([ServerEchoHandler()])
    }
    .childChannelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1)
    .childChannelOption(ChannelOptions.maxMessagesPerRead, value: 16)
    .childChannelOption(ChannelOptions.recvAllocator, value: AdaptiveRecvByteBufferAllocator())


let defaultHost = "::1" // ipv6
let defaultPort = 8899

let channel = try bootstrap.bind(host: defaultHost, port: defaultPort).wait()
print("Server started and listening on \(channel.localAddress!)")

try channel.closeFuture.wait()
print("Server closed")

final class ServerEchoHandler: ChannelInboundHandler {
    typealias InboundIn =  ByteBuffer
    typealias OutboundOut = ByteBuffer

    private var counter: Int = 0

    func channelRead(context: ChannelHandlerContext, data: NIOAny) {
        let inBuf = self.unwrapInboundIn(data)
        let str = inBuf.getString(at: 0, length: inBuf.readableBytes) ?? ""
        counter += 1
        print("Server receive:\(str), counter=\(counter)")
        /// data 是沒有換行符號的,我們需要給資料新增換行符
        let body = "\(str)\r\n"

        var buffOut = context.channel.allocator.buffer(capacity: body.count)
        buffOut.writeString(body)
        context.writeAndFlush(self.wrapOutboundOut(buffOut), promise: nil)
    }

    func channelReadComplete(context: ChannelHandlerContext) {
        context.flush()
    }

    func errorCaught(context: ChannelHandlerContext, error: Error) {
        context.close(promise: nil)
    }
}
複製程式碼

客戶端程式碼

let group = MultiThreadedEventLoopGroup(numberOfThreads: System.coreCount)

//        defer {
//            try! group.syncShutdownGracefully()
//        }

let bootstrap = ClientBootstrap(group: group)
    .channelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1)
    .channelInitializer { channel in
        channel.pipeline.addHandler(EchoHandler())
    }

let defaultHost = "::1"
let defaultPort = 8899

let channel = try bootstrap.connect(host: defaultHost, port: defaultPort).wait()

try channel.closeFuture.wait()

print("client closed")


final class EchoHandler: ChannelInboundHandler {
    typealias InboundIn = ByteBuffer
    typealias OutboundOut = ByteBuffer

    private var count = 0

    func channelActive(context: ChannelHandlerContext) {
        var i: Int = 0
        repeat {
            let buffer = context.channel.allocator.buffer(string: "Hello world, oldbird learn swiftNIO, day day up !!!\r\n")
            context.writeAndFlush(self.wrapOutboundOut(buffer), promise: nil)
            i += 1;
        } while ( i < 100 )
    }

    func channelRead(context: ChannelHandlerContext, data: NIOAny) {
        var buffer = unwrapInboundIn(data)
        if let received = buffer.readString(length: buffer.readableBytes) {
            count += 1
            print("client received: \(received), count=\(count)")
        }
    }

    func errorCaught(context: ChannelHandlerContext, error: Error) {
        context.close(promise: nil)
    }
}
複製程式碼

分別執行客戶端和服務端程式碼,當客戶端程式碼連線完成後,context.writeAndFlush 呼叫 100 次. 輸出結果過長,用 ... 代表輸出多個完整的 oldbird learn swiftNIO, day day up !!!

在伺服器端的輸出:

Server started and listening on [IPv6]::1/::1:8899
...Hello wo, counter=1
...Hello world, old, counter=2
oldbird learn swiftNIO, day day up !!!, counter=3
複製程式碼

客戶端的輸出:

[2/2] Merging module niots
...Hello wo, count=1
...Hello world, old, count=2
...Hello world, oldbird learn swiftNIO, day day up !!!, count=3
複製程式碼

從結果來看,很明顯,次數不是我們期待的 100 次,而且從這幾次的結尾資料,很容易知道發生了粘包現象,以及半包的情況。那如何才能達到我們想要功能?

使用 SwiftNIO 解決讀半包問題

首先我們需要引入 swift-nio-extras 依賴,在 Package.swift 中:


let package = Package(
    name: "niots",
    dependencies: [
        .package(url: "https://github.com/apple/swift-nio.git", from: "2.0.0"),
        // 加入依賴
        .package(url: "https://github.com/apple/swift-nio-extras.git", from: "1.0.0"),
    ],
    targets: [
        .target(
            name: "niots",
            dependencies: [
                .product(name:"NIO", package: "swift-nio"),
                // 加入依賴
                .product(name: "NIOExtras", package: "swift-nio-extras")
            ]),
        .testTarget(
            name: "niotsTests",
            dependencies: ["niots"]),
    ]
)
複製程式碼

服務端進行修改:


final class EchoServer {
    static func run() throws {
        ...
        .childChannelInitializer { channel in
            // ByteToMessageHandler(LineBasedFrameDecoder())
            channel.pipeline.addHandlers([ByteToMessageHandler(LineBasedFrameDecoder()),ServerEchoHandler()])
        }
        .childChannelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1)
        ...
    }
}
複製程式碼

新增了 ByteToMessageHandler(LineBasedFrameDecoder()),然後重新啟動客戶端,伺服器端輸出符合我們的需求:

Server receive:Hello world, oldbird learn swiftNIO, day day up !!!, counter=1
...
Server receive:Hello world, oldbird learn swiftNIO, day day up !!!, counter=99
Server receive:Hello world, oldbird learn swiftNIO, day day up !!!, counter=100
複製程式碼

客戶端的程式碼我們並沒有進行修改,它的輸出:

client received: Hello world, oldbird learn swiftNIO, day day up !!!...Hello wo, count=1
client received: rld, oldbird learn swiftNIO, day day up !!!...Hello world, old, count=2
client received: bird learn swiftNIO, day day up !!!..., count=3
複製程式碼

客戶端依舊是列印了 3 次,且發生粘包現象,不滿足預期。

然後修改客戶端程式碼如下:

final class EchoClient {
    static func run() throws {
        ...
        let bootstrap = ClientBootstrap(group: group)
            .channelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1)
            .channelInitializer { channel in
                channel.pipeline.addHandlers([ByteToMessageHandler(LineBasedFrameDecoder()),EchoHandler()])
            }

        ...
    }
}
複製程式碼

執行客戶端,輸出結果跟伺服器端一樣,100 次!

程式的執行結果完全符合預期,說明通過使用 ByteToMessageHandler(LineBasedFrameDecoder()) 成功解決了 TCP 粘包導致的讀半包問題。對於使用者來說,只要將這個 handler 新增到 ChannnelPipeline 中即可,不需要寫額外的程式碼,使用非常簡單。

LineBasedFrameDecoder 的工作原理是它依次遍歷 ByteBuf 中的可讀位元組,判斷是否有 “\n” 或者 “\r\n”,如果有,就以此位置為結束位置,從可讀的索引到結束的位置區間的位元組就組成一行。它是以換行符為結束標誌的解碼器

可能你會提出新的疑問:如果傳送的訊息不是以換行符結束怎麼辦?或者沒有回車換行符,靠訊息頭中的長度欄位來分包怎麼辦?是不是需要自己寫半解碼器?答案是否定的,swift-nio-extras 提供了多種解碼器,用來滿足不同的訴求:

  • FixedLengthFrameDecoder, 按固定的位元組數分割傳入的 ByteBuffer
  • LengthFieldBasedFrameDecoder,將傳入的 ByteBuffer 按報頭中指定的長度進行分割
  • ...

總結

本文首先對 TCP 的粘包和拆包進行了講解,給出瞭解決這個問題的通用做法,然後我們提供個回顯的例子進行驗證沒有考慮 TCP 粘包/拆包導致的問題,然後給出了一個解決方案,即利用 ByteToMessageHandler(LineBasedFrameDecoder()) 來解決 TCP 的粘包/拆包問題。

通用做法:

(1)訊息定長;

(2)在包尾增加回車換行符進行分割;

(3)將訊息分為訊息頭和訊息體,訊息頭中包含表示訊息總長度(或者訊息體長度)的欄位,通常設計思路為訊息頭的第一個欄位使用 int32 來表示訊息的總長度;

(4)更復雜的應用層協議;

更多閱讀請關注官方微信公眾號: Oldbirds