多圖詳解 Netty

語言: CN / TW / HK

簡單來說 Netty 就是 JBOSS 開源的一個基於 NIO 的網路程式設計框架。它可以幫助我們快速開發高效能高可靠性的網路 IO 程式。

Netty 在 Java 語言中使用非常廣泛,涉及到網路通訊的基本上都使用 Netty,很少會直接去使用原生的 NIO 元件或者是其他框架。並且像Dubbo、RocketMQ、Zookeeper、ElasticSearch 這些知名的中介軟體所使用的網路通訊框架都是基於 Netty 去實現的。

Netty 是在原生 NIO 的基礎上發展起來的框架,其中的許多理念都非常像,所以學習 Netty 前需要了解一下原生 NIO 程式設計。

原生 NIO 程式設計

在瞭解原生 NIO 程式設計之前需要了解一個基礎概念 Socket。

Socket

Netty 是基於 TCP 協議的,我們知道 TCP 協議三個重要的特點分別是面向連線、可靠的和位元組流。要達成這三點建立連線時需要客戶端與服務端達成三個資訊的共享,分別是:

  • Socket: 包含五個資訊:連線使用的協議、本地主機 IP 地址和埠號、遠端主機的 IP 地址和埠號
  • 序列號: 解決亂序問題
  • 容器大小: 用來做流量控制

Socket 就是兩臺主機之間的邏輯連線的端點,TCP 所說的面向連線,指的就是面向客戶端和服務端兩個 Socket 之間的連線。

這裡要注意的是,服務端會涉及到兩種 socket,一種叫做監聽 socket ,一種叫做已完成連線 socket 。當監聽 Socket 發現連線成功了之後會返回一個已完成連線 socket 檔案描述符,用於後續傳輸資料。

原生 NIO 元件

Netty 底層其實用了很多 Java 原生的 NIO 的元件,Netty 自定義的元件中有些理念也來自於原生的 NIO 元件。因此學習 Netty 之前需要了解一下原生的 NIO 元件的一些知識。

這裡主要講三個非常重要的元件:Channel (通道)、Buffer (緩衝區)、Selector (選擇器)。

下圖展示了這三個元件在 NIO 模型中發揮的作用:

Buffer (緩衝區)

Buffer 本質上就是一塊可以讀寫資料的記憶體塊,我們在使用的時候可以把它理解成一個數組。

下圖是 Buffer 各個類的繼承關係:

這裡著重講一下 ByteBuffer ,ByteBuffer 在原生 NIO 程式設計時使用頻率是最高的。下面主要講一下它的使用。

注意 ByteBuffer 初始化時其實是 建立並返回了一個它的子類 HeapByteBuffer 物件,我們操作的也是它的子類。

首先是初始化,初始化主要通過兩種方式:

  • **allocate(int capacity)**:建立 byte 型別的指定長度的緩衝區;
  • wrap(byte[] array): 建立 byte 型別的有內容的緩衝區。

在學習資料操作之前,有幾個 ByteBuffer 非常重要的引數和方法需要了解一下:

  • position :當前讀取或寫入的起始座標;
  • limite: 最多可以操作到哪個索引;
  • capacity: 緩衝區的總長度;
  • remaining(): 這個方法返回的是 limit - position 的計算值,代表還有多少空間可以操作。

資料操作主要是兩個方法:

  • put(): 插入位元組,它是一個過載方法,可以傳入不同形式的位元組;
  • get(): 讀取位元組,不傳參獲取 position 位置的位元組並讓 position + 1,也可以通過引數讀取指定位置的位元組。

下圖是新增位元組時各屬性值的變化:

ByteBuffer 雖然即支援讀也支援寫,但同一時間只能是其中一種模式,模式切換需要呼叫相應的方法。

下圖是呼叫 flip() 方法將寫模式切換為讀時各屬性的變化:

下圖呼叫 clear() 方法將讀切換為寫時各屬性的變化:

Channel (通道)

通常來說 NIO 所有的操作都是由通道開始的,它跟我們平常使用的流(InputStream,OutputStream)有點類似。但也有些區別:

  • 通道可以讀也可以寫,流是單向的,所以需要輸入流輸出流;
  • 通道可以非同步讀寫
  • 通道總是基於緩衝區來讀寫(將資料從通道讀取到 buffer 或者將資料以 buffer 的形式寫入到通道)

下圖是 Channel 的繼承關係:

常用的 Channel 主要有四種:

  • FileChannel: 用於檔案資料的讀寫;
  • DatagramChannel: 用於 UDP 資料的讀寫;
  • ServerSocketChannel 和 SocketChannel: 用於 TCP 資料的讀寫,前者代表服務端的通道,後者代表客戶端。

使用 ServerSocketChannel 和 SocketChannel 進行 NIO 程式設計與直接使用 ServerSocket 和 Socket 類似,這裡就不贅述了。

Selector (選擇器)

Selector 是多路複用器的一種,雖然它的效能不是最好的,但它幾乎在所有平臺上都支援,具有良好的跨平臺性。

Selector 是實現一個執行緒處理多個客戶端請求的核心元件, Channel 註冊到 Selector 上之後,如果有就緒事件產生,Selector 就會去獲取事件然後針對事件進行相應的處理。

Selector 常用方法如下:

  • open() : 靜態方法,獲取一個選擇器物件;
  • select(): 呼叫後阻塞執行緒,阻塞期間會監控所有註冊的通道,當有就緒事件需要操作時,會將 SelectionKey 放入集合並返回事件數量;
  • select(1000): 只阻塞 1000 毫秒,阻塞期間與上面的方法相同;
  • selectedKeys(): 返回集合中儲存的全部 SelectionKey 。

這些方法多次提到了 SelectionKey ,那麼 SelectionKey 是什麼呢?

SelectionKey 就是用來描述各種就緒事件的類,通過它能獲取到當前的就緒事件型別。

SelectionKey 通過 4 個常量來定義 4 種不同的就緒事件:

  • OP_READ: 值為 1 << 0,讀就緒事件,表示通道中有可讀資料,可以執行讀操作;
  • OP_WRITE: 值為 1 << 2,寫就緒事件,表示可以向通道寫資料了;
  • OP_CONNECT: 值為 1 << 3,連線就緒事件,代表客戶端與伺服器連線已經建立成功了;
  • OP_ACCEPT: = 1 << 4,接收連線就緒事件,表示伺服器監聽到了客戶端連線。

SelectionKey 通過以下 4 個靜態方法判斷當前是否是對應的就緒事件:

  • isReadable():是否是讀就緒事件;
  • isWritable():是否是寫就緒事件;
  • isConnectable():是否是連線就緒事件;
  • isAcceptable():是否是接收連線就緒事件。

原生 NIO 元件程式設計示例

下面是使用 Selector 、Channel 和 ByteBuffer 進行 NIO 程式設計的示例。

伺服器端程式碼:

package com.zephyr.selector;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Set;

/**
 * 服務端-選擇器 
 */
public class NIOSelectorServer {
    public static void main(String[] args) throws IOException {
        //開啟一個服務端通道
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        //繫結對應的埠號
        serverSocketChannel.bind(new InetSocketAddress(9999));
        //通道預設是阻塞的,需要設定為非阻塞
        serverSocketChannel.configureBlocking(false);
        //建立選擇器
        Selector selector = Selector.open();
        //將服務端通道註冊到選擇器上,並指定註冊監聽的事件為OP_ACCEPT
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        System.out.println("服務端啟動成功...");
        while (true) {
            //檢查選擇器是否有事件
            int select = selector.select(2000);
            if (select == 0) {
                continue;
            }
            //獲取事件集合
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeys.iterator();
            while (iterator.hasNext()) {
                //判斷事件是否是客戶端連線事件 SelectionKey.isAcceptable()
                SelectionKey key = iterator.next();
                //得到客戶端通道,並將通道註冊到選擇器上, 並指定監聽事件為OP_READ
                if (key.isAcceptable()) {
                    SocketChannel socketChannel = serverSocketChannel.accept();
                    System.out.println("客戶端已連線......" + socketChannel);
                    //必須設定通道為非阻塞, 因為selector需要輪詢監聽每個通道的事件
                    socketChannel.configureBlocking(false);
                    //並指定監聽事件為OP_READ
                    socketChannel.register(selector, SelectionKey.OP_READ);
                }
                //判斷是否是客戶端讀就緒事件SelectionKey.isReadable()
                if (key.isReadable()) {
                    //得到客戶端通道,讀取資料到緩衝區
                    SocketChannel socketChannel = (SocketChannel) key.channel();
                    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                    int read = socketChannel.read(byteBuffer);
                    if (read > 0) {
                        System.out.println("客戶端訊息:" +
                                new String(byteBuffer.array(), 0, read,
                                        StandardCharsets.UTF_8));
                        //給客戶端回寫資料
                        socketChannel.write(ByteBuffer.wrap("yo yo yo, hi man".getBytes(StandardCharsets.UTF_8)));
                        socketChannel.close();
                    }
                }
                //從集合中刪除對應的事件, 因為防止二次處理.
                iterator.remove();
            }
        }
    }
}

客戶端程式碼:

package com.zephyr.selector;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;

/**
 * 客戶端 
 */
public class NIOClient {
    public static void main(String[] args) throws IOException {
        //開啟通道
        SocketChannel socketChannel = SocketChannel.open();
        //設定連線IP和埠號
        socketChannel.connect(new InetSocketAddress("127.0.0.1", 9999));
        //寫出資料
        socketChannel.write(ByteBuffer.wrap("What's up.".getBytes(StandardCharsets.UTF_8)));
        //讀取伺服器寫回的資料
        ByteBuffer readBuffer = ByteBuffer.allocate(1024);
        int read=socketChannel.read(readBuffer);
        System.out.println("服務端訊息:" + new String(readBuffer.array(), 0, read, StandardCharsets.UTF_8));
        //釋放資源
        socketChannel.close(); }
}

為什麼需要 Netty

上面講了原生 NIO 相關的知識,那麼問題就來了,既然原生就有完備的 NIO 程式設計的各個元件,為什麼還需要 Netty 呢。

主要原因還是因為原生 NIO 存在一些弊端:

  • NIO 的類庫和 API 繁雜: 開發者需要熟練掌握 Selector、ServerSocketChannel、SocketChannel、ByteBuffer 等原生元件;
  • 有一定的門檻: 必須對多執行緒和網路程式設計非常熟悉,才能編寫出高質量的 NIO 程式;
  • 開發工作量和難度都非常大: 例如客戶端面臨斷連重連、網路閃斷、半包讀寫、失敗快取、網路擁塞和異常流的處理等等;
  • JDK NIO 的 Bug: 臭名昭著的 Epoll Bug,它會導致 Selector 空輪詢,最終導致 CPU 100%。

而 Netty 這個框架就很好地解決了這些問題,前三個比較好理解,簡單講一下第 4 個問題是怎麼被解決的。

第 4 個問題講到了 Selector 空輪詢的 Bug,那麼,什麼是空輪詢呢?

空輪詢是指本來 Selector 呼叫 select() 方法如果沒有就緒事件在設定的時間到之前是阻塞的,但由於 Linux 底層實現有問題,導致在沒有就緒事件時也有概率直接返回,而 select() 方法一般都是放在 while (true) 迴圈裡的,這時就會開始不斷地空輪詢,直到 CPU 使用率飆到 100% 。

Netty 解決這個問題主要分別兩步:

  • 檢測空輪詢: 判斷阻塞時間小於 timeoutMillis (初始化的超時引數),且 select 執行次數大於閾值;
  • 重建 Selector : 新建立一個 Selector 並把舊 Selector 的 Channel 註冊到這個 Selector 上,然後關閉這個 Selector;

Netty 執行緒模型

接著我們學習一下 Netty 的執行緒模型,瞭解了 Netty 的執行緒模型之後我們對 Netty 的整體架構也就有了一個大致的瞭解。

由於 Netty 的執行緒模型是基於 Reactor 模型改進而來的,因此先講講 Reactor 模型,有助於我們對 Netty 執行緒模型的理解 。

Reactor 模型

Reactor 模型是指當伺服器接收到多個請求時,伺服器程式會把它們分派到不同的方法或執行緒去處理。Reactor 模式也被稱作 Dispatcher 模式。它的核心是多路複用器,多路複用器收到事件後會進行分發,這點是網路伺服器高併發的關鍵。

Reactor 模型分為三種:單 Reactor 單執行緒、單 Reactor 多執行緒和多 Reactor 多執行緒。

這三種模型按順序來看理解起來複雜度不斷提升,也會更接近 Netty 的執行緒模型,下面來分別看看這三種模型。

單 Reactor 單執行緒

這個最好理解,只有一個執行緒,只是會把建立連線和處理請求這兩種任務分發給不同的類去處理,如下圖所示:

整個流程簡單來講就是 Reactor 通過 Selector 監聽事件,收到事件使用 dispatch 對事件進行分發,如果是連線事件就由 Acceptor 進行處理,處理完成會建立一個 Handler 對後續業務進行處理。後面的資料請求都會由 Handler 進行處理。

優點:

  • 模型簡單,不會有多執行緒的那些問題

缺點:

  • 效能問題:單執行緒無法發揮多核 CPU 的效能
  • 可靠性問題:處理業務時往往容易出問題,當 Handler 出問題了,由於只有一個執行緒,整個節點也掛了

單 Reactor 多執行緒

這個執行緒模型針對前面的問題作出了一定的優化,多出了處理業務的執行緒池,如下圖所示:

前面的流程與單 Reactor 單執行緒是一致的,到 Handler 這一步就不一樣了。這個模型 Handler 只負責讀取資料和傳送資料部分,業務處理交給了 Worker 執行緒,而 Worker 執行緒是由 Worker 執行緒池統一管理的。

優點:

  • 可以充分利用多核 CPU 的處理能力

缺點:

  • 多執行緒資源共享和訪問處理會比較複雜,在主執行緒處理所有的連線、監聽和響應也會出現效能瓶頸

主從 Reactor 多執行緒

主從 Reactor 多執行緒模型又在前面的模型基礎上做了進一步優化,增加了子 Reactor ,如下圖所示:

整個流程大概可以分為以下幾步:

  • 主執行緒的 MainReactor 負責監聽連線請求,收到連線請求會由 Acceptor 進行處理,成功建立連線之後 MainReactor 會把連線分派給 SubReactor ,由 SubReactor 監聽和處理資料請求;
  • SubReactor 監聽到資料請求,會派發給 Handler 處理,Handler 只會處理讀取資料和傳送資料部分,中間業務處理部分也是放線上程池中完成。

優點:

  • MainReactor 與 SubReactor 職責分明,一個處理連線事件,一個處理資料請求;
  • MainReactor 與 SubReactor 互動邏輯比較簡單,MainReactor 單向地將建立好的連線傳遞出去;
  • 多 Reactor 設計能在高併發場景擁有更好的效能。

缺點:

  • 程式設計複雜度較高

主從 Reactor 多執行緒模式是業界非常成熟的伺服器程式設計模式,在很多中介軟體中都使用到了這種模式,像 Nginx、Memcached、Netty 等。這種模式也被稱為 1 + M + N 模式,分別代指相對少的連線執行緒(不一定為 1 ),多個 I/O 執行緒和多個業務處理執行緒。

Netty 執行緒模型

Netty 執行緒模型是基於主從 Reactor 多執行緒模型優化而來的,整體架構如下圖所示:

Netty 的執行緒模型主要分為兩部分,分別是 BossGroup 和 WorkerGroup,它們都分別管理一個或多個 NioEventLoop。每個 NioEventLoop 對應著一個執行緒,一個 Selector,一個 Executor 和一個 TaskQueue。

NioEventLoop 可以理解成一個事件迴圈,當程式啟動後每個 NioEventLoop 都會通過 Executor 啟動一個執行緒,開始執行事件迴圈,在迴圈中 Selector 會通過 select 方法阻塞並監聽就緒事件,當有事件到來時通過 processSeelectedKeys 方法處理 Selector 事件,之後再通過 runAllTasks 方法處理其他的任務。

與前面介紹的 主從 Reactor 多執行緒模型類似,BossGoup 負責連線事件,當建立連線之後會生成一個 NioSocketChannel 並註冊到 WorkGroup 其中一個 NioEventLoop 的 Selector 上。WokerGroup 中的 NioEventLoop 負責處理資料請求,當請求到來時會呼叫 processSelectedKeys 方法,其中的業務處理會依次經過 Pipeline 中的多個 Handler。

Netty 程式設計

學習完 Netty 執行緒模型,我們來看一下使用 Netty 寫出來的程式大概是什麼樣的。

服務端程式碼

Nettry 伺服器:

public class NettyServer {

    public static void main(String[] args) throws InterruptedException {
      	// 建立 BossGroup
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        // 建立 WorkerGroup
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        // 建立伺服器啟動類
        ServerBootstrap bootstrap = new ServerBootstrap();
        // 新增配置
        bootstrap.group(bossGroup, workerGroup) // 設定 BossGroup 和 ChildGroup
                .channel(NioServerSocketChannel.class) // 設定 Channel 具體類
                .option(ChannelOption.SO_BACKLOG, 128) // 設定連線佇列
                .childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE) // 設定開啟保活機制
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                      	// 把自定義 Handler 新增到 pipeline
                        socketChannel.pipeline().addLast(new NettyServerHandler()); 
                    }
                });
        // 繫結埠號
        ChannelFuture channelFuture = bootstrap.bind(new InetSocketAddress(9999)).sync();
        System.out.println("伺服器啟動成功!");
        // 阻塞直到通道關閉
        channelFuture.channel().closeFuture().sync();
        // 優雅地關閉 BossGroup
        bossGroup.shutdownGracefully();
        // 優雅地關閉 WorkerGroup
        workerGroup.shutdownGracefully();
    }

}

自定義伺服器端 ChannelHandler 程式碼,只列出了主要幾個方法的實現:

public class NettyServerHandler implements ChannelInboundHandler {

    @Override
    public void channelRead(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
        ByteBuf byteBuf = (ByteBuf) o;
        System.out.println(byteBuf.toString(CharsetUtil.UTF_8));
        Channel channel = channelHandlerContext.pipeline().channel();
        System.out.println(channel);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext channelHandlerContext) throws Exception {
        channelHandlerContext.writeAndFlush(Unpooled.copiedBuffer("這是伺服器的響應資訊...".getBytes(CharsetUtil.UTF_8)));
    }

    @Override
    public void channelRegistered(ChannelHandlerContext channelHandlerContext) throws Exception {
        System.out.println("通道註冊");
    }

    ...
      
}

客戶端程式碼

Netty 客戶端:

public class NettyClient {

    public static void main(String[] args) throws InterruptedException {
        // 建立 EventLoopGroup
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup(1);
        // 建立啟動類
        Bootstrap bootstrap = new Bootstrap();
        // 設定引數
        bootstrap.group(eventLoopGroup)
                .channel(NioSocketChannel.class) // 設定 Channel 的類
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        // 新增自定義 Handler
                        socketChannel.pipeline().addLast(new NettyClientHandler());
                    }
                });
        // 連線伺服器
        ChannelFuture channelFuture = bootstrap.connect(new InetSocketAddress("127.0.0.1", 9999)).sync();
        System.out.println("客戶端啟動成功!");
        // 阻塞直到通道判斷
        channelFuture.channel().closeFuture().sync();
        // 優雅地關閉 EventLoopGroup
        eventLoopGroup.shutdownGracefully();
    }

}

自定義客戶端 ChannelHandler 程式碼:

public class NettyClientHandler implements ChannelInboundHandler {

    @Override
    public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
        channelHandlerContext.writeAndFlush(Unpooled.copiedBuffer("這是客戶端發來的訊息", CharsetUtil.UTF_8));
    }

    @Override
    public void channelRead(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
        ByteBuf byteBuf = (ByteBuf) o;
        System.out.println(byteBuf.toString(CharsetUtil.UTF_8));
    }
  
  	@Override
    public void channelRegistered(ChannelHandlerContext channelHandlerContext) throws Exception {
				System.out.println("通道註冊");
    }

  	...  
  
}

如果對原生 NIO 程式設計比較熟悉理解上面的程式碼應該比較容易,同時也能看出使用 Netty 框架程式設計的難度是遠遠小於原生 NIO 的。

下面我們就詳細瞭解一下上面程式碼涉及的這些 Netty 元件。

Netty 的核心元件

ChannelHandler

ChannelHandler 是一個介面,繼承於它的兩個介面 ChannelInboundHandler 和 ChannelOutboundHandler 定義了很多事件處理方法,我們可以通過實現這些方法或者重寫子類的方法的來實現相應的業務邏輯。

ChannelHandler 的繼承關係如圖所示:

如果通過實現上述介面來開發,需要實現的方法中常用的有以下幾個:

public void channelActive(ChannelHandlerContext ctx)
public void channelRead(ChannelHandlerContext ctx, Object msg)
public void channelReadComplete(ChannelHandlerContext ctx) 
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)

但一般開發中自定義 Handler 會直接繼承 SimpleChannelInboundHandler ,我們自己必須要實現的就只有

protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) 這個方法,這種開發方式在繼承的時候傳入泛型指定出入站訊息型別,配合編解碼器使用會非常的方便。程式碼如下:

public class NettyChatRoomServerHandler extends SimpleChannelInboundHandler<String> {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Channel active");
    }

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception {
        System.out.println(msg);
    }
}

ChannelHandlerContext

ChannelHandlerContext 是 ChannelHandler 的上下文,它的核心就是 ChannelHandler ,它同時也儲存了 Channel、Pipeline、Executor (NioEventLoop) 等資訊。

它的繼承關係如下圖所示:

Netty 中的 Context 分為三種: HeadContext 、TailContext 和 DefaultChannelHandlerContext 。

HeadContext 和 TailContext 比較特殊,它既是 ChannelHandlerContext 也是 ChannelHandler (實現了 Handler 的介面)。

我們通過 ChannelPipeline 的 addLast() 方法新增的 Handler 都會封裝成 DefaultChannelHandlerContext 。

ChannelPipeline

ChannelPipeline 是一個介面,我們平常程式設計用到的一般是它的實現類 DefaultChannelPipeline 。

Pipeline 佇列

DefaultChannelPipline 其實就是一個管道,它維護了一個 ChannelHandlerContext 的雙鏈表佇列。

在 Pipeline 初始化時會建立頭節點和尾節點,它們的型別分別是 HeadContext 和 TailContext,所以整個連結串列至少有兩個節點。

中間的節點型別都是 DefaultChannelHandlerContext 。

連結串列如圖所示:

ChannelHandler 的傳遞性

前面說過 Handler 分為 InboundHander 和 OutboundHandler ,訊息入站時只會訪問 InboundHander ,訊息出站時只會訪問 OutboundHander 。如果既是 InboundHandler 又是 OutboundHandler 出站入站都會訪問。

而 InboundHandler 與 OutboundHandler 都具有傳遞性,不過傳遞方法有些區別:

  • InboundHander 是向後傳遞,需要呼叫 ChannelHandlerContext 的 fireChannel…() ,比如如果是傳遞 ChannelRead() 方法就要呼叫 fireChannelRead() ,那麼下一個節點的 ChannelRead() 方法就會被呼叫;
  • OutboundHandler 是向前傳遞,需要呼叫 ChannelHanderContext 的同名方法,比如如果是傳遞 write() 方法呼叫的也是 write() ,這裡下一個節點的 write() 方法就會被呼叫。

正常我們在開發中對資料的讀寫使用一個節點就夠了,不需要使用這種傳遞性,這種傳遞性一般用在編解碼器上。

無論是我們寫子類自定義的編解碼器還是使用 Netty 提供的編解碼器,它們內部都會自動呼叫這些傳遞方法,開發者對這些是無感知的。

我們瞭解這些傳遞性的最大意義在於確定在新增 Handler 到 pipeline 中時(Handle 會被封裝成 DefaultChannelHandlerContext 然後新增到佇列中去)的順序:

  • 先新增編解碼器,並且解碼器在前,編碼器在後;
  • 先新增 OutboundHandler ,後新增 InboundHandler。

Pipeline 訊息入站

訊息入站首先是 Selector 監聽到讀就緒事件,接著判斷就緒事件如果是讀事件就呼叫通道的 read() 方法,通道會把訊息讀到 ByteBuf 裡,然後把 ByteBuf 傳遞給 Pipeline 自已去處理。

Pipeline 會直接把 ByteBuf 交給 HeadContext 去處理,而 HeadContext 沒有具體的處理邏輯,會直接傳遞給下一個節點去處理。

下圖就是 Pipeline 節點的處理順序:

Pipeline 訊息出站

訊息出站與入站最大的不同是發起方。入站的訊息是通過 Selector 監聽到的。而出站是程式主動發起的。

對外寫訊息有三種方式:

  • 呼叫 channel 的 writeAndFlush(),它內部會直接呼叫 pipeline.writeAndFlush(msg),最終會從佇列尾部開始呼叫;
  • 呼叫 pipeline 的 writeAndFlush(),它內部會直接呼叫 tail.writeAndFlush(msg),最終也是從佇列尾部開始呼叫;
  • 呼叫 channelHandlerContext 的 writeAndFlush(),它內部會以當前節點為起點找到下一個 OutboundHandler 讓它去處理,最終就是從這個節點的下一個 OutboundHander 開始處理。

下圖展示了各個節點處理順序:

NioEventLoop

NioEventLoop 就是一個事件迴圈類,幾乎所有事件處理都會經過這個類,它的繼承關係如下:

NioEventLoopGroup

NioEventLoopGroup 就是 NioEventLoop 組,負責管理 NioEventLoop,當有 Channel 需要註冊的時候,NioEventLoopGroup 會輪詢找到下一個 NioEventLoop 註冊上去。在 NioEventLoopGroup 上作出的配置最終都會作用到 NioEventLoop 上。

ChannelOption

在程式初始化的時候我們可以通過 ChannelOption 對 Channel 設定一些引數,常用的引數有兩個:SO_BACKLOG 和 SO_KEEPALIVE。

下面分別講講這兩個引數 :

SO_BACKLOG

這個引數主要是用來控制 Accept 佇列的大小的 (早期的 Linux 核心是控制的 SYN 佇列的大小)。

這裡展開說一下這兩個佇列,它們都是由 Linux 核心維護的。一個是儲存第一次握手的 SYN 的佇列,系統會依次從這個佇列取出 SYN 並進行響應,一個是儲存三次握手完成後的 Accept 佇列,呼叫 accept 方法就能拿到已完成連線的 socket,反應在 Netty 裡面就是返回一個新的 Channel。

SO_KEEPALIVE

這個引數對應的是連線的保活機制 ,如果不設定這個引數,請求完成連線就會被關閉。設定了這個引數之後,連線關閉的條件變成了如果客戶端與伺服器 2 個小時沒有資料互動,那麼客戶端就會開始發探活資料報文,如果多次傳送都沒有響應,就斷開連線。

ServerBootstrap 和 Bootstrap

服務端和客戶端的啟動類,負責對 Netty 的各個元件進行配置。

伺服器端配置程式碼如下:

bootstrap.group(bossGroup, workerGroup) // 設定 BossGroup 和 ChildGroup
  .channel(NioServerSocketChannel.class) // 設定 Channel 具體類
  .option(ChannelOption.SO_BACKLOG, 128) // 設定連線佇列
  .childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE) // 設定開啟保活機制
  .childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
      // 把自定義 Handler 新增到 pipeline
      socketChannel.pipeline().addLast(new NettyServerHandler()); 
    }
	});

ChannelFuture

下圖是 ChannelFuture 的繼承關係

從圖中可以看出,它繼承的 Future 介面是 Netty 自定義的介面,這個介面同時也繼承自 Java 原生的 Future 介面。

在 Netty 中最常用的是 ChannelFuture 的子類 DefaultChannelPromise ,而這個類大部分功能都是由 DefaultPromise 實現的。

DefaultPromise 阻塞執行緒使用的是 Object 的 wait() 方法,而原生 Future 的子類 FutureTask 阻塞執行緒使用的是 LockSupport 的 park() 方法。

ChannelFuture 支援新增 ChannelFutureListener ,監聽各種事件。

Unpooled

這個類如果我們在使用 Netty 程式設計時不使用編解碼器就會經常用到,它可以通過傳入的字串快速生成一個 ByteBuf (Netty 獨有的類,類似於原生的 ByteBuffer,只是它在 ByteBuffer 的基礎上做了封裝) 物件。常用的方法如下:

public static ByteBuf copiedBuffer(CharSequence string, Charset charset)

StringDecoder 和 StringEncoder

這兩個類分別是 Netty 提供的解碼器和編碼器,它們同時也是 ChannelHandler 的子類。有了這兩個編解碼器,就不再需要與 ByteBuf 打交道,程式碼寫起來也更簡潔方便。

StringDecoder

下圖是解碼器類 StringDecoder 的繼承關係,注意它的父類是實現了 ChannelInboundHandler 介面的,作用在訊息入站的時候:

如果有特殊需求需要自定義解碼器也是可以的,只要實現 MessageToMessageDecoder 介面就可以了。

寫法如下:

public class MessageDecoder extends MessageToMessageDecoder<ByteBuf> {
    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
        list.add(byteBuf.toString(CharsetUtil.UTF_8));
    }
}

StringEncoder

下圖是編碼器類 StringEncoder 的繼承關係,注意它的父類是實現了 ChannelOutboundHandler 介面的,作用在訊息出站:

如果要自定義編碼器,實現 MessageToMessageEncoder 介面就行了。

寫法如下:

public class MessageEncoder extends MessageToMessageEncoder<String> {
    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, String s, List<Object> list) throws Exception {
        list.add(Unpooled.copiedBuffer(s, CharsetUtil.UTF_8));
    }
}

如果嫌為自定義編碼器和自定義解碼器分別建立一個類太麻煩,還可以直接繼承 MessageToMessageCodec 介面。

這個介面繼承關係如下,注意它的父類同時實現了 ChannelInboundHandler 和 ChannelOutboundHandler ,作用在訊息入站和出站:

寫法如下:

public class MessageCodec extends MessageToMessageCodec<ByteBuf, String> {

    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, String s, List<Object> list) throws Exception {
        list.add(Unpooled.copiedBuffer(s, CharsetUtil.UTF_8));
    }

    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
        list.add(byteBuf.toString(CharsetUtil.UTF_8));
    }

}

LineBasedFrameDecoder 與 DelimiterBasedFrameDecoder

這兩個類也都是解碼器,但它們解決的問題與上面所講的編解碼器不同,這兩個類主要是解決粘包拆包的問題。

那麼問題來了,什麼是粘包和拆包?為什麼會出現粘包和拆包呢?

首先來說說什麼是粘包和拆包:

在文章開始講了 TCP 的三個重要的特點:面向連線、可靠的和位元組流。而 Netty 底層是基於 TCP 的,它的客戶端與服務端互動時傳送的資料在傳輸層都是通過位元組流傳輸的,位元組流是沒有界線的概念的,這時伺服器在讀取資料時就可能在一次讀取中讀取到到客戶端分幾次發的資料,這就叫粘包。如果客戶端傳送一次資料,伺服器分幾次才能完整讀到,這就是拆包。

粘包拆包大致如下圖所示:

粘包拆包大致有以下幾個原因:

  • socket緩衝區與滑動視窗: 在傳送資料的時,傳送方必須要先確認接收方的視窗沒有被填充滿,如果沒有填滿,則可以傳送
  • MSS/MTU限制
  • Nagle演算法: Nagle演算法是為了儘可能傳送大塊資料,避免網路中充斥著許多小資料塊。

Netty 中解決粘包拆包的方法:

  • FixedLengthFrameDecoder:固定長度拆包器,使用固定長度進行拆分;
  • LineBasedFrameDecoder: 行拆包器,使用換行符進行拆分;
  • DelimiterBasedFrameDecoder: 分隔符拆包器,使用自定義的分隔符進行拆分;
  • LengthFieldBasedFrameDecoder:基於資料包長度的拆包器,基於應用層協議中傳過來的長度進行拆分。

最常用的就是中間兩個 LineBasedFrameDecoder 和 DelimiterBasedFrameDecoder。

總結

以上就是 Netty 程式設計相關的知識點。Netty 的元件非常多,可以自定義的地方也非常多,但熟悉這些元件之後使用它們程式設計會非常方便快捷。