(九)Java網路程式設計無冕之王-這回把大名鼎鼎的Netty框架一網打盡!

語言: CN / TW / HK

theme: channing-cyan

本文正在參加「金石計劃 . 瓜分6萬現金大獎」

引言

   現如今的開發環境中,分散式/微服務架構大行其道,而分散式/微服務的根基在於網路程式設計,而Netty恰恰是Java網路程式設計領域的無冕之王。Netty這個框架相信大家定然聽說過,其在Java網路程式設計中的地位,好比JavaEE中的Spring

   當然,這樣去聊它大家可能無法實際感受出它的重要性,那先來看看基於Netty構建的應用:
基於Netty構建的開源元件
觀察上述列出的開源元件,一眼望去幾乎全是各個領域中大名鼎鼎的框架,而這些元件都是基於Netty構建的,涵蓋中介軟體、大資料、離線計算、分散式、RPC、No-SQL等各個方向....,很明顯的可感知出Netty的地位之高,因此如果要打造一款Java高效能的網路通訊程式、想要真正熟知分散式架構的底層原理,Netty成為了每個Java開發進階必須要掌握的核心技術之一。

Netty的重要性不言而知,但網上相關的大部分視訊、文章、書籍等資料卻五花八門,很難真正幫助大家構建出一套完整的體系,本文的目的就是帶諸位走入基於Netty的網路世界,在真正意義上為諸君構建一套Netty的知識儲備。

本文會先從概念開始,到基礎入門、核心元件依次展開,後續結合多個實戰案例全面詳解Netty的應用。但在學習之前,大家最好有Java-IO體系、多路複用模型等相關知識的儲備,如若未曾具備請先移步:《Java-IO機制全解》《多路複用模型剖析》兩文,前者是必須,後者則暫且無需掌握,因為在後續的《Netty原始碼篇》中才會涉及。

一、初識Netty的基礎概念與快速入門

作者
   注意看:上圖中右邊這位黑眼圈堪比熊貓眼的哥們,從他頭頂的髮量就能明顯感受出其技術強度,他!!!旁邊的這位才是Netty框架的原作者Trustin Lee(韓國人),同時他也是另一個著名網路框架Mina的核心主程之一,現任職於Apple蘋果集團......,不過多介紹作者了,總之是一位網路方面的大牛。

   重點來聊聊我們的主角:Netty框架,其實這個框架是基於Java原生NIO技術的進一步封裝,在其中對Java-NIO技術做了進一步增強,作者充分結合了Reactor執行緒模型,將Netty變為了一個基於非同步事件驅動的網路框架,Netty從誕生至今共釋出了五個大版本,但目前最常用的反而並非是最新的5.x系列,而是4.x系列的版本,原因在於Netty本身就是基於Java-NIO封裝的,而JDK本身又很穩定,再加上5.x版本並未有太大的效能差異,因此4.x系列才是主流。

   再回過頭來思考一個問題:為什麼Netty要二次封裝原生NIO呢?相信看過NIO原始碼的小夥伴都清楚,原生的NIO設計的特別繁瑣,而且還存在一系列安全隱患,因此Netty則是抱著簡化NIO、解決隱患、提升效能等目的而研發的。

不過有意思的一點在於:Netty雖然是基於Java-NIO封裝的框架,但實際使用起來卻跟之前聊到的Java-AIO(NIO2)技術有些相似。

1.1、Netty的入門例項

   上面扯了不少Netty的概念,現在就直接先實操一番快速入門,畢竟程式設計講究施展出真理,首先第一步則是新增對應的依賴,如下: xml <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.43.Final</version> </dependency> 然後先建立NettyServer服務端,程式碼如下: java public class NettyServer { public static void main(String[] args) throws InterruptedException { // 建立兩個EventLoopGroup,boss:處理連線事件,worker處理I/O事件 EventLoopGroup boss = new NioEventLoopGroup(); EventLoopGroup worker = new NioEventLoopGroup(); // 建立一個ServerBootstrap服務端(同之前的ServerSocket類似) ServerBootstrap server = new ServerBootstrap(); try { // 將前面建立的兩個EventLoopGroup繫結在server上 server.group(boss,worker) // 指定服務端的通道為Nio型別 .channel(NioServerSocketChannel.class) // 為到來的客戶端Socket新增處理器 .childHandler(new ChannelInitializer<NioSocketChannel>() { // 這個只會執行一次(主要是用於新增更多的處理器) @Override protected void initChannel(NioSocketChannel ch) { // 新增一個字元解碼處理器:對客戶端的資料解碼 ch.pipeline().addLast( new StringDecoder(CharsetUtil.UTF_8)); // 新增一個入站處理器,對收到的資料進行處理 ch.pipeline().addLast( new SimpleChannelInboundHandler<String>() { // 讀取事件的回撥方法 @Override protected void channelRead0(ChannelHandlerContext ctx,String msg) { System.out.println("收到客戶端資訊:" + msg); } }); } }); // 為當前服務端繫結IP與埠地址(sync是同步阻塞至連線成功為止) ChannelFuture cf = server.bind("127.0.0.1",8888).sync(); // 關閉服務端的方法(之後不會在這裡關閉) cf.channel().closeFuture().sync(); }finally { // 優雅停止之前建立的兩個Group boss.shutdownGracefully(); worker.shutdownGracefully(); } } } 緊接著再構建一個NettyClient客戶端,程式碼如下: java public class NettyClient { public static void main(String[] args) { // 由於無需處理連線事件,所以只需要建立一個EventLoopGroup EventLoopGroup worker = new NioEventLoopGroup(); // 建立一個客戶端(同之前的Socket、SocketChannel) Bootstrap client = new Bootstrap(); try { client.group(worker) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { // 新增一個編碼處理器,對資料編碼為UTF-8格式 sc.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8)); } }); // 與指定的地址建立連線 ChannelFuture cf = client.connect("127.0.0.1", 8888).sync(); // 建立連線成功後,向服務端傳送資料 System.out.println("正在向服務端傳送資訊......"); cf.channel().writeAndFlush("我是<竹子愛熊貓>!"); } catch (Exception e){ e.printStackTrace(); } finally { worker.shutdownGracefully(); } } } 先看執行結果吧,控制檯輸出如下: ```java NettyServer控制檯輸出: 收到客戶端資訊:我是<竹子愛熊貓>!

NettyClient控制檯輸出: 正在向服務端傳送資訊...... `` 從結果中很容易看出這個案例中做了什麼事情,其實無非就是利用Netty實現了簡單的對端通訊,實現的功能很簡單,但對於未學習過Netty`技術的小夥伴,在程式碼方面估計有些許懵,那麼接下來簡單的解釋一下程式碼。

但在此之前先宣告一點:Netty是支援鏈式程式設計的一個框架,也就是如上述中的程式碼呼叫,所有的方法都可以一直用.連下去,所以在Netty的應用中會見到大量的這類寫法。

上述案例的程式碼,說複雜呢也其實並不難,相信認真看完了之前《Java-IO篇》的小夥伴多少都能看懂程式碼,不理解的地方估計就在於其中出現的幾個新的概念:EventLoopGroup、ServerBootstrap、childHandler,我們先對於這些概念做簡單解釋,後續會重點剖析: - EventLoopGroup:可以理解成之前的Selector選擇器,但結合了執行緒池(後續詳細分析)。 - ServerBootstrap/Bootstrap:類似於之前的ServerSocketChannel/SocketChannel。 - childHandler:這個是新概念,可以理解成過濾器,在之前的Servlet程式設計中,新請求到來都會經過一個個的過濾器,而這個處理器也類似於之前的過濾器,新連線到來時,也會經過新增好的一系列處理器。

OK~,對於上述幾個新概念有了簡單認知後,接著把上面案例的完整流程分析一下: - ①先建立兩個EventLoopGroup事件組,然後建立一個ServerBootstrap服務端。 - ②將建立的兩個事件組boss、worker繫結在服務端上,並指定服務端通道為NIO型別。 - ③在server上新增處理器,對新到來的Socket連線進行處理,在這裡主要分為兩類: - ChannelInitializer:連線到來時執行,主要是用於新增更多的處理器(只觸發一次)。 - addLast():通過該方式新增的處理器不會立馬執行,而是根據處理器型別擇機執行。 - ④為建立好的服務端繫結IP及埠號,呼叫sync()意思是阻塞至繫結成功為止。 - ⑤再建立一個EventLoopGroup事件組,並建立一個Bootstrap客戶端。 - ⑥將事件組繫結在客戶端上,由於無需處理連線事件,所以只需要一個事件組。 - ⑦指定Channel通道型別為NIO、新增處理器.....(同服務端類似) - ⑧與前面服務端繫結的地址建立連線,由於預設是非同步的,也要呼叫sync()阻塞。 - ⑨建立連線後,客戶端將資料寫入到通道準備傳送,首先會先經過新增好的編碼處理器,將資料的格式設為UTF-8。 - ⑩伺服器收到資料後,會先經過解碼處理器,然後再去到入站處理,執行對應的Read()方法邏輯。 - ⑪客戶端完成資料傳送後,先關閉通道,再優雅關閉建立好的事件組。 - ⑫同理,服務端工作完成後,先關閉通道再停止事件組。

結合上述的流程,再去看一遍給出的案例原始碼,相信諸位應該可以徹底理解。不過需要注意的一點是:Netty的大部分操作都是非同步的,比如地址繫結、客戶端連線等。好比呼叫connect()方法與服務端建立連線時,主執行緒會把這個工作交給事件組中的執行緒去完成,所以此刻如果主執行緒直接去向通道中寫入資料,有機率會出現報錯,因為實際生產環境中,可能由於網路延遲導致連線建立的時間有些長,此時通道並未建立成功,因此嘗試傳送資料時就會有問題,這點與之前的Java-AIO通訊案例中,客戶端建立連線要呼叫.get()方法是同理。

到這裡,你對Netty框架已經入門了,接著咱們重點聊聊Netty中的一些核心元件。

二、Netty框架核心元件:啟動器與事件組

   對於Netty有了基本的認知後,接下來慢慢的熟悉這個框架吧,先依次來看看其中的一些核心元件,瞭解這些元件及作用後,才能真正意義上的“玩轉Netty”。

2.1、啟動器-ServerBootstrap、Bootstrap

   ServerBootstrap、Bootstrap這兩個元件應該無需過多解釋,上個表格對比大家就理解了: 對比項 | 服務端 | 客戶端 :-:|:-:|:-: BIO | ServerSocket | Socket NIO | ServerSocketChannel | SocketChannel AIO | AsynchronousServerSocketChannel | AsynchronousSocketChannel Netty | ServerBootstrap | Bootstrap

從上表中能明顯感覺出它倆在Netty中的作用,無非就是服務端與客戶端換了個叫法而已。

2.2、事件組-EventLoopGroup、EventLoop

   這兩個東西比較重要,但同時也比較抽象,EventLoop這東西翻譯過來就是事件迴圈的意思,你可以把它理解成NIO中的Selector選擇器,實際它本質上就是這玩意兒,因為內部會維護一個Selector,然後由一條執行緒會迴圈處理Channel通道上發生的所有事件,所以每個EventLoop物件都可以看成一個單執行緒執行器。

   EventLoopGroup可以將其理解成AIO中的AsynchronousChannelGroup可能會更合適,在AIOACG(前面那玩意兒的縮寫)中,我們需要手動指定一個執行緒池,然後AIO的所有客戶端工作都會使用執行緒池中的執行緒進行管理,而Netty中的EventLoopGroup就類似於AIO-ACG這玩意兒,只不過不需要我們管理執行緒池了,而是Netty內部維護。

EventLoopGroup、EventLoop有了基本認知後,你再點進它們的原始碼實現,其實能夠觀測到:其實它們繼承了兩個類,一個是Netty自己實現的有序執行緒池OrderedEventExecutor類,另一個則JDK提供的原生定時排程執行緒池ScheduledExecutorService類(原始碼篇會詳細分析,這裡先簡單瞭解)。

看過之前關於《JDK執行緒池》文章的小夥伴應該清楚,既然EventLoop/EventLoopGroup繼承自JDK原生的定時執行緒池,那也就代表著:它擁有JDK執行緒池中所有提供的方法,同時也應該會支援執行非同步任務、定時任務的功能。那麼實際情況是這樣嗎?答案是Yes,如下: ```java public static void main(String[] args) { EventLoopGroup threadPool = new NioEventLoopGroup(); // 遞交Runnable型別的普通非同步任務 threadPool.execute(()->{ System.out.println("execute()方法提交的任務...."); }); // 遞交Callable型別的有返回非同步任務 threadPool.submit(() -> { System.out.println("submit()方法提交的任務...."); return "我是執行結果噢!"; }); // 遞交Callable型別的延時排程任務 threadPool.schedule(()->{ System.out.println("schedule()方法提交的任務,三秒後執行...."); return "排程執行後我會返回噢!"; },3,TimeUnit.SECONDS); // 遞交Runnable型別的延遲間隔排程任務 threadPool.scheduleAtFixedRate(()->{ System.out.println("scheduleAtFixedRate()方法提交的任務...."); },3,1,TimeUnit.SECONDS); }

/ ~~~~~~~~~~~~~~~~~~我是性感的分割線~~~~~~~~~~~~~~~~~~ / 執行結果如下: 立即執行: execute()方法提交的任務.... submit()方法提交的任務....

延時三秒後執行:
    schedule()方法提交的任務....
    scheduleAtFixedRate()方法提交的任務....

之後沒間隔一秒執行:
    scheduleAtFixedRate()方法提交的任務....
    scheduleAtFixedRate()方法提交的任務....

`` 上述我們建立了一個EventLoopGroup事件迴圈組,然後通過之前JDK執行緒池提供的一系列的提交任務的方法,向其遞交了幾個非同步任務,然後執行該程式,答案顯而易見,EventLoopGroup確實可以當做JDK`原生執行緒池來使用。

當然,這些並非分析的重點,重點來看看EventLoopGroup如何在Netty中合理運用。

在瞭解它們的Netty用法之前,先來看看除原生執行緒池之外所提供的方法: - EventLoop.inEventLoop(Thread):判斷一個執行緒是否屬於當前EventLoop。 - EventLoop.parent():判斷當前EventLoop屬於哪一個事件迴圈組。 - EventLoopGroup.next():獲取當前事件組中的下一個EventLoop(執行緒)。

這些方法我們簡單瞭解即可,因為大多數情況下在Netty原始碼中才會用到,暫且無需關注太多,我們先把目光移到前面給出的Netty使用案例中,還記得最開始定義的兩個事件組嗎? java EventLoopGroup boss = new NioEventLoopGroup(); EventLoopGroup worker = new NioEventLoopGroup(); 為什麼在服務端要定義兩個組呢?一個難道不行嗎?其實也是可以的,但定義兩個組的好處在於:可以讓Group中的每個EventLoop分工更加明確,不同的Group分別處理不同型別的事件,各司其職。

在前面案例中,為服務端綁定了兩個事件迴圈組,也就代表著會根據ServerSocketChannel上觸發的不同事件,將對應的工作分發到這兩個Group中處理,其中boss主要負責客戶端的連線事件,而worker大多數情況下負責處理客戶端的IO讀寫事件。

當客戶端的SocketChannel連線到來時,首先會將這個註冊事件的工作交給boss處理,boss會呼叫worker.register()方法,將這條客戶端連線註冊到worker工作組中的一個EventLoop上。前面提到過:EventLoop內部會維護一個Selector選擇器,因此實際上也就是將客戶端通道註冊到其內部中的選擇器上。

注意:將一個Socket連線註冊到一個EventLoop上之後,這個客戶端連線則會和這個EventLoop繫結,以後這條通道上發生的所有事件,都會交由這個EventLoop處理。

到這裡大家應該也理解了為何要拆出兩個EventLoopGroup,主要目的就在於分工更為明細。當然,由於EventLoopGroup本質上可以理解成一個執行緒池,其中存在的執行緒資源自然是有限的,那此時如果到來的客戶端連線大於執行緒數量怎麼辦呢?這是不影響的,因為Netty本身是基於Java-NIO封裝的,而NIO底層又是基於多路複用模型實現的,天生就能實現一條執行緒管理多個連線的功能,所以就算連線數大於執行緒數,也完全可以Hold住。

OK~,除開可以根據事件型別劃分Group之外,也可以根據為每個處理器劃分不同的事件組,如下: // 建立EventLoopGroup和JDK原生的執行緒池一樣,可以指定執行緒數量 EventLoopGroup extra = new NioEventLoopGroup(2); sc.pipeline().addLast(extra, new xxxChannelHandler()); 這樣做的好處在於什麼呢?因為前面提到過:一個連線註冊到EventLoop,之後所有的工作都會由這個EventLoop處理,而一個EventLoop又有可能同時管理多個連線,因此假設一條連線上的某個處理器,執行過程非常耗時,此時必然就會影響到這個EventLoop管理的其他連線,因此對於一些較為耗時的Handler,可以專門指派給一個額外的extra事件組處理,這樣就不會影響到所管理的其他連線。

當然,這個功能其實也略微有些雞肋,一般多個Handler之間都會存在耦合關係,下一個Handler需要依賴上一個Handler的處理結果執行,因此也很難拆出來單獨放到另一個事件組中執行。

看到這裡,相信你對於EventLoopGroup、EventLoop這兩個元件應該有了基本認知,簡單來說可以EventLoop理解成有一條執行緒專門維護的Selector選擇器,而EventLoopGroup則可以理解成一個有序的定時排程執行緒池,負責管理所有的EventLoop。舉個生活案例來加深印象:

現在有個工廠,其中分為了不同的片區,一個片區中有很多條流水線,由每個工人負責一部分流水線的作業。開始工作後,流水線的傳輸帶會源源不斷的將貨物傳遞過來,這些貨物最終會等待工人進行加工。

例子
在上述這個例子中,工廠就是ServerBootstrap服務端,而一個個片區就是不同的EventLoopGroup事件組,一條流水線則可以理解成一個SocketChannel客戶端通道,而負責多條流水線的工人就是EventLoop單執行緒執行器,加工的動作其實就是處理通道上發生的事件。

大家可以將這個例子套進去想象一下,相信這會讓你印象更加深刻。

三、Netty中的增強版通道(ChannelFuture)

   對於通道這個概念,相信諸位都不陌生,這也是Java-NIO、AIO中的核心元件之一,而在Netty中也對其做了增強和拓展。首先來看看通道型別,Netty根據不同的多路複用函式,分別拓展出了不同的通道型別:
Channel拓展
- NioServerSocketChannel:通用的NIO通道模型,也是Netty的預設通道。 - EpollServerSocketChannel:對應Linux系統下的epoll多路複用函式。 - KQueueServerSocketChannel:對應Mac系統下的kqueue多路複用函式。 - OioServerSocketChannel:對應原本的BIO模型,用的較少,一般用原生的。

當然,對於客戶端的通道也可以選擇TCP、UDP...型別的,就不再介紹了,重點來看看Netty中是如何對於通道類做的增強。

其實在Netty中,主要結合了JDK提供的Future介面,對通道類做了進一步增強。

增強的方面主要是支援了非同步,但並非Future那種偽非同步,而是跟之前聊到過的《CompletableFuture》有些類似,支援非同步回撥處理結果。還記得之前客戶端如何連線服務端的嘛?如下:
java Bootstrap client = new Bootstrap(); client.connect("127.0.0.1", 8888); 但這個connect()連線方法,本質上是一個非同步方法,返回的並不是Channel物件,而是一個ChannelFuture物件,如下: java public ChannelFuture connect(String inetHost, int inetPort); 也包括ServerBootstrap繫結地址的bind()也相同,返回的並非ServerChannel,也是一個ChannelFuture物件。這是因為在Netty的機制中,繫結/連線工作都是非同步的,因此如果要用Netty建立一個客戶端連線,為了確保連線建立成功後再操作,通常情況下都會再呼叫.sync()方法同步阻塞,直到連線建立成功後再使用通道寫入資料,如下: java // 與服務端建立連線 ChannelFuture cf = client.connect("127.0.0.1", 8888); // 同步阻塞至連線建立成功為止 cf.sync(); // 連線建立成功後再獲取對應的Socket通道寫入資料 cf.channel().writeAndFlush("..."); 上述這種方式能夠確保連線建立成功後再寫資料,但既然Netty中的繫結、連線等這些操作都是非同步的,有沒有辦法讓整個過程都是非同步的呢?

答案是當然有,如何操作呢?

我們可以向ChannelFuture中添加回調處理器,然後非同步處理,如下: java ChannelFuture cf = client.connect("127.0.0.1", 8888); cf.addListener((ChannelFutureListener) cfl -> { // 這裡可以用cf,也可以用cfl,返回的都是同一個channel通道 cf.channel().writeAndFlush("..."); }); 當通過connect()方法與服務端建立連線時,Netty會將這個任務交給當前Bootstrap繫結的EventLoopGroup中的執行緒執行,因此建立連線的過程是非同步的,所以會返還一個ChannelFuture物件給我們,而此時可以通過該物件的addListener()方法編寫成功回撥邏輯,當連線建立成功後,會由對應的執行緒來執行其中的程式碼,因此可以實現全過程的非同步操作。

這樣做,似乎的確實現了整個過程的非同步,甚至關閉通道的過程也可以換成非同步的,如下:

java // 非同步關閉Channel通道 ChannelFuture closeCF = cf.channel().closeFuture(); // 通道關閉後,新增對應的回撥函式 closeCF.addListener((ChannelFutureListener) cfl -> { // 關閉前面建立的EventLoopGroup事件組,也可以在這裡做其他善後工作 worker.shutdownGracefully(); });Netty中為何要將大量的操作都抽象成非同步執行呢?這不是反而讓邏輯更加複雜化嗎?讓發起連線、建立連線、傳送資料、接收資料、關閉連線等一系列操作,全部交由呼叫的那條執行緒執行不可以嗎?答案是可以的,但非同步能在一定程度上提升效能,尤其是併發越高,帶來的優勢更為明顯。

對於這段話大家估計會有疑惑,為什麼能提升效能呢?下面舉個例子理解。

3.1、為何Netty所有API都是非同步式操作?

相信大家一定在生活中見過這樣的場景:醫院看病/體檢、銀行開戶、政府辦事、法院起訴、保險公司買保險等等,各類辦理業務的地方,都會拿號辦理,然後經過一個個的視窗辦理不同的業務,那為什麼要這麼做呢?就拿常規的醫院看病來說,為什麼會分為如下步驟呢? - 導診處:先說明大致情況,導診人員根據你的病理,指導你掛什麼科的號。 - 掛號處:去到對應的病理科排隊掛號(暫且不考慮繳費,假設網上繳掛號費)。 - 診斷室:跟著掛的號找到對應的科室,醫生根據你的情況進行診斷。 - 化驗處:從你身上提取一些標本,然後去到化驗處等待化驗結果。 - 繳費處:醫生根據化驗結果分析病情,然後給出具體的治療方案,讓你來繳費。 - 拿藥/治療處:交完相關的費用後,根據治療方案進行拿藥/治療等處理措施。

有上述這些步驟實際上並不奇怪,問題是在於每個步驟都分為了專門的科室處理,因此以上述流程為例,至少需要有六個醫生提供服務,那麼為什麼不專門由這六位醫生專門提供全系列服務呢?如下: 看病流程
我們分析一下,假設此時每個步驟平均要五分鐘,一個病人的完整流程下來就需要半小時,而下一批預約看病的其他病人,則需要等待半小時後才能被受理,而把這些步驟拆開之後再來看看:
非同步拆分
此時有六位醫生各司其職,每位醫生負責單一的工作,這樣做的好處在於:每個掛號的病人只需要等待五分鐘,就能夠被受理,通過這種方式就將之前批次式看病,轉變為了流水線式看病。

Netty框架中的非同步處理方式,也具備異曲同工之妙,將API的操作從批處理轉變成了流式處理。套入實際的業務中,也就是主執行緒(呼叫API的執行緒)無需等待操作完成後再執行,而是呼叫某個API後可繼續往下執行,相較而言,在併發情況下能很大程度上提升程式效能。

但上述這個例子估計有些小夥伴照舊會犯迷糊,那接著再舉個更加形象化的例子,好比快遞小哥送貨,如果以同步模式工作,將一個貨物送達指定地點後,需要等待客戶簽收才能去送下個貨物,這無疑會讓下個客戶等很久很久,並且也極其影響快遞小哥的工作效率。

而採用非同步模式工作,快遞小哥將一個貨物送達指定地點後,給對應客戶發個資訊後,就立馬趕往下個客戶的貨物地點,前面的客戶拿到貨物後,再給快遞小哥回個資訊即可。在這種非同步工作模式中,小哥無需在原地“阻塞”等待客戶簽收,只需要將手中一個個貨物送達指定地點就行,這在很大程度上提升了整體工作效率,每個客戶之間拿到貨物的時間也大大縮短了,Netty框架中的非同步思想也是同理。

3.2、ChannelFuture、Netty-Future、JDK-Future的關係

當大家試圖翻閱ChannelFuture的實現時,會發現該類繼承了Future介面: java public interface ChannelFuture extends Future<Void> { // 省略內部方法..... } 但要注意,這個Future介面並非是JDK原生的Future介面,而是Netty框架中的Future介面: ```java package io.netty.util.concurrent;

public interface Future extends java.util.concurrent.Future { // 省略內部方法..... } `` 此時會發現,Netty-Future又繼承自JDK-Future介面,這也就意味著Netty-Future拓展了JDK-Future介面的功能,在之前[《併發程式設計-非同步任務》](http://juejin.cn/post/7038471861860040741#heading-2)中,咱們曾詳細聊到過JDK原生的Future類,雖說基於Future+Callable可以實現非同步回撥,但這種方式實現的非同步回撥則是一種“偽非同步”,為啥呢?先來看看JDK-Future提供的核心方法: 方法名 | 方法作用 :-:|:-:isDone()| 判斷當前非同步任務是否結束cancel()| 取消當前非同步任務isCancel()| 判斷當前非同步任務是否被取消get()` | 阻塞等待當前非同步任務執行完成

JDK-Future介面中,想要獲取一個非同步任務的執行結果,此時只能呼叫get()方法,但該方法是一個阻塞方法,呼叫後會阻塞主執行緒直到任務結束為止,這顯然依舊會導致非同步變為同步執行,所以這種方式是一種“偽非同步”,此時再來看看Netty-Future中增強的核心方法:
方法名 | 方法作用 :-:|:-: getNow() | 非阻塞式獲取任務結果,任務未執行完成時返回null sync() | 阻塞等待至非同步任務執行結束,執行出錯時會丟擲異常 await() | 阻塞等待至非同步任務執行結束,執行出錯時不會丟擲異常 isSuccess() | 判斷任務是否執行成功,如果為true代表執行成功 cause() | 獲取任務執行出錯時的報錯資訊,如果執行未出錯,則返回null addLinstener() | 添加回調方法,非同步任務執行完成後會主動執行回撥方法中的程式碼

在原生JDK-Future的基礎上,Netty-Future新增了一個異常檢測機制,當非同步任務執行出錯時,可以通過cause()方法處理異常,同時也基於回撥模式,可通過addLinstener()方法新增非同步執行後的回撥邏輯,從而讓主執行緒建立任務後永遠不會阻塞,做到了真正意義上的非同步執行。

當然,除開基本的Future介面外,Netty框架中還有一個Promise介面,該介面繼承自Netty-Future介面: java public interface Promise<V> extends Future<V> { // 省略內部方法..... } 這個介面中主要多拓展了兩個方法: 方法名 | 方法作用 :-:|:-: setSuccess() | 設定任務的執行狀態為成功 setFailure() | 設定任務的執行狀態為失敗

這兩個方法可以用來設定非同步任務的執行狀態,因此Promise介面除開具備Netty-Future的功能外,還能作為多個執行緒之間傳遞非同步任務結果的容器。

3.3、不同Future的效果測試

```java public class FutureDemo {

// 測試JDK-Future的方法
public static void jdkFuture() throws Exception {
    System.out.println("--------JDK-Future測試--------");
    // 建立一個JDK執行緒池用於執行非同步任務
    ExecutorService threadPool = Executors.newSingleThreadExecutor();

    System.out.println("主執行緒:步驟①");

    // 向執行緒池提交一個帶有返回值的Callable任務
    java.util.concurrent.Future<String> task =
            threadPool.submit(() ->
                "我是JDK-Future任務.....");
    // 輸出獲取到的任務執行結果(阻塞式獲取)
    System.out.println(task.get());

    System.out.println("主執行緒:步驟②");
    // 關閉執行緒池
    threadPool.shutdownNow();
}

// 測試Netty-Future的方法
public static void nettyFuture(){
    System.out.println("--------Netty-Future測試--------");
    // 建立一個Netty中的事件迴圈組(本質是執行緒池)
    NioEventLoopGroup group = new NioEventLoopGroup();
    EventLoop eventLoop = group.next();

    System.out.println("主執行緒:步驟①");

    // 向執行緒池中提交一個帶有返回值的Callable任務
    io.netty.util.concurrent.Future<String> task =
            eventLoop.submit(() ->
                "我是Netty-Future任務.....");

    // 新增一個非同步任務執行完成之後的回撥方法
    task.addListener(listenerTask ->
            System.out.println(listenerTask.getNow()));

    System.out.println("主執行緒:步驟②");
    // 關閉事件組(執行緒池)
    group.shutdownGracefully();
}

// 測試Netty-Promise的方法
public static void nettyPromise() throws Exception {
    System.out.println("--------Netty-Promise測試--------");
    // 建立一個Netty中的事件迴圈組(本質是執行緒池)
    NioEventLoopGroup group = new NioEventLoopGroup();
    EventLoop eventLoop = group.next();

    // 主動建立一個傳遞非同步任務結果的容器
    DefaultPromise<String> promise = new DefaultPromise<>(eventLoop);
    // 建立一條執行緒執行,往結果中新增資料
    new Thread(() -> {
        try {
            // 主動丟擲一個異常
            int i = 100 / 0;
            // 如果非同步任務執行成功,向容器中新增資料
            promise.setSuccess("我是Netty-Promise容器:執行成功!");
        }catch (Throwable throwable){
            // 如果任務執行失敗,將異常資訊放入容器中
            promise.setFailure(throwable);
        }
    }).start();
    // 輸出容器中的任務結果
    System.out.println(promise.get());
}

public static void main(String[] args) throws Exception {
    jdkFuture();
    nettyFuture();
    nettyPromise();
}

} `` 在上述的測試類中,存在三個測試方法: -jdkFuture():測試JDK-Future的方法。 -nettyFuture():測試Netty-Future的方法。 -nettyPromise():測試Netty-Promise`的方法。

接著啟動對應的類,來看看控制檯的輸出結果: ```log --------JDK-Future測試-------- 主執行緒:步驟① 我是JDK-Future任務..... 主執行緒:步驟②

--------Netty-Future測試-------- 主執行緒:步驟① 主執行緒:步驟② 我是Netty-Future任務.....

--------Netty-Promise測試-------- Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero ........ 首先來對比一下`JDK-Future、Netty-Future`兩者之間的差別,在使用`JDK-Future`時,想要獲取非同步任務的執行結果,呼叫`get()`方法後會阻塞主執行緒,也就是主執行緒的步驟②,需要等到非同步任務執行完成後才會繼續執行,因此輸出結果為:log --------JDK-Future測試-------- 主執行緒:步驟① 我是JDK-Future任務..... 主執行緒:步驟② 但此時再來看看`Netty-Future`,因為在內部咱們提交非同步任務後,就立即通過`addListener()`添加了一個回撥,這個回撥方法會在非同步任務執行結束後呼叫,咱們將獲取任務結果的工作,放入到了回撥方法中完成,此時會觀測到,獲取`Netty-Future`的執行結果並不會阻塞主執行緒:log --------Netty-Future測試-------- 主執行緒:步驟① 主執行緒:步驟② 我是Netty-Future任務..... `` 而對於Netty-Promise的使用就無需過多講解,也就是可以根據非同步任務的執行狀態,向Promise物件中設定不同的結果,在前面的多執行緒中,由於主動製造了異常,所以最終會進入catch程式碼塊,執行setFailure()`向容器中填充異常資訊。

四、核心元件 - 通道處理器(Handler)

   Handler可謂是整個Netty框架中最為重要的一部分,它的職責主要是用於處理Channel通道上的各種事件,所有的處理器都可被大體分為兩類: - 入站處理器:一般都是ChannelInboundHandlerAdapter以及它的子類實現。 - 出站處理器:一般都是ChannelOutboundHandlerAdapter以及它的子類實現。

在系統中網路操作都通常會分為入站和出站兩種,所謂的入站即是指接收請求,反之,所謂的出站則是指返回響應,而Netty中的入站處理器,會在客戶端訊息到來時被觸發,而出站處理器則會在服務端返回資料時被觸發,接著來展開聊一聊。

4.1、入站處理器與出站處理器

前面講明白了入站、出站的基本概念,接著來簡單認識一下Netty中的入站處理器,這裡先上個案例: ```java // 服務端 public class HandlerServer { public static void main(String[] args) { // 0.準備工作:建立一個事件迴圈組、一個ServerBootstrap服務端 EventLoopGroup group = new NioEventLoopGroup(); ServerBootstrap server = new ServerBootstrap();

    server
        // 1.繫結前面建立的事件迴圈組
        .group(group)
        // 2.宣告通道型別為服務端NIO通道
        .channel(NioServerSocketChannel.class)
        // 3.通過ChannelInitializer完成通道的初始化工作
        .childHandler(new ChannelInitializer<NioSocketChannel>() {
            @Override
            protected void initChannel(NioSocketChannel nsc) throws Exception {
                // 4.獲取通道的ChannelPipeline處理器連結串列
                ChannelPipeline pipeline = nsc.pipeline();
                // 5.基於pipeline連結串列向通道上新增入站處理器
                pipeline.addLast("In-①",new ChannelInboundHandlerAdapter(){
                    @Override
                    public void channelRead(ChannelHandlerContext ctx, Object msg)
                                            throws Exception {
                        System.out.println("俺是第一個入站處理器...");
                        super.channelRead(ctx, msg);
                    }
                });
                pipeline.addLast("In-②",new ChannelInboundHandlerAdapter(){
                    @Override
                    public void channelRead(ChannelHandlerContext ctx, Object msg)
                                            throws Exception {
                        System.out.println("我是第二個入站處理器...");
                        super.channelRead(ctx, msg);
                    }
                });
                pipeline.addLast("In-③",new ChannelInboundHandlerAdapter(){
                    @Override
                    public void channelRead(ChannelHandlerContext ctx, Object msg)
                                            throws Exception {
                        System.out.println("朕是第三個入站處理器...");
                    }
                });
            }
        })
        // 為當前啟動的服務端繫結IP和埠地址
        .bind("127.0.0.1",8888);
}

}

// 客戶端 public class HandlerClient { public static void main(String[] args) { // 0.準備工作:建立一個事件迴圈組、一個Bootstrap啟動器 EventLoopGroup group = new NioEventLoopGroup(); Bootstrap client = new Bootstrap(); try { client // 1.繫結事件迴圈組 .group(group) // 2.宣告通道型別為NIO客戶端通道 .channel(NioSocketChannel.class) // 3.初始化通道,新增一個UTF-8的編碼器 .handler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel sc) throws Exception { // 新增一個編碼處理器,對資料編碼為UTF-8格式 ChannelPipeline pipeline = sc.pipeline(); pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8)); } });

        // 4.與指定的地址建立連線
        ChannelFuture cf = client.connect("127.0.0.1", 8888).sync();
        // 5.建立連線成功後,向服務端傳送資料
        System.out.println("正在向服務端傳送資訊......");
        cf.channel().writeAndFlush("我是<竹子愛熊貓>!");
    } catch (Exception e){
        e.printStackTrace();
    } finally {
        // 6.最後關閉事件迴圈組
        group.shutdownGracefully();
    }
}

} 在上述案例的服務端程式碼中,啟動服務端時為其添加了`In-①、In-②、In-③`這三個入站處理器,接著編寫了一個客戶端,其內部主要是向服務端傳送了一條資料,執行結果如下:log 俺是In-①入站處理器... 我是In-②入站處理器... 朕是In-③入站處理器... `` 此時大家觀察結果會發現,入站處理器的執行順序,會按照新增的順序執行,兩個過濾器之間,依靠super.channelRead(ctx, msg);這行程式碼來實現向下呼叫的邏輯,這和之前Servlet`中的過濾器相差無幾。

除開上述重寫的channelRead()方法外,入站處理器中還有很多其他方法可以重寫,每個方法都對應著一種事件,會在不同時機下被觸發,如下: java // 會在當前Channel通道註冊到選擇器時觸發(與EventLoop繫結時觸發) public void channelRegistered(ChannelHandlerContext ctx) ... // 會在選擇器移除當前Channel通道時觸發(與EventLoop解除繫結時觸發) public void channelUnregistered(ChannelHandlerContext ctx) ... // 會在通道準備就緒後觸發(Pipeline處理器新增完成、繫結EventLoop後觸發) public void channelActive(ChannelHandlerContext ctx) ... // 會在通道關閉時觸發 public void channelInactive(ChannelHandlerContext ctx) ... // 會在收到客戶端資料時觸發(每當有資料時都會呼叫該方法,表示有資料可讀) public void channelRead(ChannelHandlerContext ctx, Object msg) ... // 會在一次資料讀取完成後觸發 public void channelReadComplete(ChannelHandlerContext ctx) ... // 當通道上的某個事件被觸發時,這個方法會被呼叫 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) ... // 當通道的可寫狀態發生改變時被呼叫(一般在傳送緩衝區超出限制時呼叫) public void channelWritabilityChanged(ChannelHandlerContext ctx) ... // 當通道在讀取過程中丟擲異常時,當前方法會被觸發呼叫 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) ...

接著再來看看出站處理器,這回基於上述案例做些許改造即可,也就是再通過pipeline.addLast()方法多新增幾個處理器,但處理器的型別為ChannelOutboundHandlerAdapter,如下: java // 基於pipeline連結串列向通道上添加出站處理器 pipeline.addLast("Out-A",new ChannelOutboundHandlerAdapter(){ @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { System.out.println("在下是Out-A出站處理器..."); super.write(ctx, msg, promise); } }); pipeline.addLast("Out-B",new ChannelOutboundHandlerAdapter(){ @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { System.out.println("鄙人是Out-B出站處理器..."); super.write(ctx, msg, promise); } }); pipeline.addLast("Out-C",new ChannelOutboundHandlerAdapter(){ @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { System.out.println("寡人是Out-C出站處理器..."); super.write(ctx, msg, promise); } }); 根據原本入站處理器的執行邏輯,是不是理論上執行順序為Out-A、Out-B、Out-C?先看執行結果:
執行截圖
此時觀察結果可明顯看到,在通道上新增的出站處理器壓根沒被觸發呀,這是為何呢?這要說回前面聊到的出站概念,出站是指響應過程,也意味著出站處理器是在服務端返回資料時被觸發的,而案例中並未向客戶端返回資料,顯然就不會觸發出站處理器,所以此時咱們在In-③入站處理器中,多加幾行程式碼: java // 利用通道向客戶端返回資料 ByteBuf resultMsg = ctx.channel().alloc().buffer(); resultMsg.writeBytes("111".getBytes()); nsc.writeAndFlush(resultMsg); 此時再執行案例,就會看到如下結果: log 俺是In-①入站處理器... 我是In-②入站處理器... 朕是In-③入站處理器... 寡人是Out-C出站處理器... 鄙人是Out-B出站處理器... 在下是Out-A出站處理器... 此時注意看,結果和預料的不同,呈現的順序並非Out-A、Out-B、Out-C,而是Out-C、Out-B、Out-A,這是啥原因呢?為什麼與新增順序反過來了?這其實跟pipeline處理器連結串列有關,等會兒再聊聊pipeline這個概念,先來看看出站處理器中的其他方法: java // 當通道呼叫bind()方法時觸發(當Channel繫結埠地址時被呼叫,一般用於客戶端通道) public void bind(...) ... // 當通道呼叫connect()方法,連線到遠端節點/服務端時觸發(一般也用於客戶端通道) public void connect(...) ... // 當客戶端通道呼叫disconnect()方法,與服務端斷開連線時觸發 public void disconnect(...) ... // 當客戶端通道呼叫close()方法,關閉連線時觸發 public void close(...) ... // 當通道與EventLoop解除繫結時觸發 public void deregister(...) ... // 當通道中讀取多次資料時被呼叫觸發 public void read(...) ... // 當通道中寫入資料時觸發 public void write(...) ... // 當通道中的資料被Flush給對端節點時呼叫 public void flush(...) ... 對於出站/入站處理器的這些其他方法/事件,大家可根據業務的不同,選擇重寫不同的方法,其中每個不同的方法,其觸發時機也不同,因此可以在適當的位置重寫方法,作為業務程式碼的切入點。

4.2、pipeline處理器連結串列

如果接觸Netty框架的小夥伴應該對這玩意兒不陌生,如果沒接觸過也無關緊要,其實它也並非是特別難懂的概念,一個處理器被稱為Handler,而一個Handler新增到一個通道上之後,則被稱之為ChannelHandler,而一個通道上的所有ChannelHandler全部連線起來,則被稱之為ChannelPipeline處理器連結串列。

以上述給出的案例來說,其內部形成的ChannelPipeline連結串列如下:
處理器連結串列
pipeline本質上是一個雙向連結串列,同時具備head、tail頭尾節點,每當呼叫pipeline.addLast()方法新增一個處理器時,就會將處理器封裝成一個節點,然後加入pipeline連結串列中: - 當接收到客戶端的資料時,Netty會從Head節點開始依次往後執行所有入站處理器。 - 而當服務端返回資料時,Netty會從Tail節點開始依次向前執行所有入站處理器。

理解上述過程後,大家應該就理解了之前出站處理器的執行順序,為何是Out-C、Out-B、Out-A,因為出站處理器是以Tail尾節點開始,向前依次執行的原因造成的,那處理器的作用是幹嘛的呢?舉個例子大家就懂了。

這裡假設Netty的服務端是一個飼料加工廠,客戶端則是原料供應商,連線兩者之間的通道就相當於一條條的流水線,而客戶端傳送的資料相當於原料。
在一條流水線上,玉米、豆粕、小麥....等原料不可能啥也不幹,直接從頭傳到尾,如果原料想要加工成某款私聊,顯然需要經過一道道工序,而處理器則是這一道道工序。
比如原料剛傳進來時,首先要將其粉碎成顆粒,接著需要將其碾壓成粉末,最後需要按照配方比例進行混合,才能形成按配方製成的飼料。在這個過程中,原料進入加工廠後,經過的一道道工序則可以被稱為入站處理器。
而原料被加工成飼料後,想要對外出售,還需要先裝入一個個的飼料袋,然後將飼料袋進行封口,最後印上生產日期與廠家,才能打包成最終的商用飼料對外出售。而該過程中的一道道工序,則可被理解成是一個個出站處理器。

在上述的例子中,一個加工廠的流水線上,存在著一道道工序,經過依次處理後,能夠將原料加工成最終商品。Netty中亦是同理,對於客戶端和服務端之間的資料,可以通過處理器,完成一系列核心處理,如轉換編碼格式、對資料進行序列化、對資料進行加/解密等操作。

4.3、自定義出/入站處理器

前面簡單講明白了一些關於Netty處理器的知識,但實際開發過程中,為了更好的程式碼閱讀性,以及程式碼的維護性,通常pipeline.addLast並不會直接new介面,而是自己定義處理器類,然後繼承對應的父類,如下: ```java // 自定義的入站處理器 public class ZhuziHandler extends ChannelInboundHandlerAdapter { public ZhuziHandler() { super(); }

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    // 在這裡面編寫處理入站msg的核心程式碼.....
    // (如果要自定義msg的處理邏輯,請記住去掉下面這行程式碼)
    super.channelRead(ctx, msg);
}

} 對於入站處理器而言,主要重寫其`channelRead()`方法即可,該方法會在訊息入站時被呼叫,可以在其中完成對資料的複雜處理,而自定義處理器完成後,想要讓該處理器生效,請記得將其繫結到對應的通道上,如下:java pipeline.addLast("In-X", new ZhuziHandler()); `` 與入站處理器相反的出站處理器亦是同理,只不過將父類實現換成ChannelInboundHandlerAdapter,並且重寫其write()`方法即可,這樣所有訊息(資料)出站時,都會呼叫該方法。

最後,不僅僅處理器可以單獨抽出來實現,而且對於通道的初始化器,也可以單獨抽出來實現,如下: java // 自定義的通道初始化器 public class ServerInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { // 設定編碼器、解碼器、處理器 ChannelPipeline pipeline = socketChannel.pipeline(); pipeline.addLast("decoder", new StringDecoder()); pipeline.addLast("encoder", new StringEncoder()); pipeline.addLast("handler", new ZhuziHandler()); } } 這樣寫能夠讓程式碼的整潔性更強,並且可以統一管理通道上的所有出/入站處理器,而服務端的程式碼改成下述方式即可: java server .group(group) .channel(NioServerSocketChannel.class) .childHandler(new ServerInitializer()) .bind("127.0.0.1",8888);

五、Netty重構後的緩衝區(ByteBuf)

   在之前講《JavaIO體系-NIO》的時候曾聊到過它的三大件,其中就包含了ByteBuffer,其作用主要是用來作為服務端和客戶端之間傳輸資料的容器,NIO中的ByteBuffer支援使用堆記憶體、本地(直接)記憶體來建立,而Netty-ByteBuf也同樣如此,如下: - ByteBufAllocator.DEFAULT.heapBuffer(cap):使用堆記憶體來建立ByteBuf物件。 - ByteBufAllocator.DEFAULT.directBuffer(cap):使用本地記憶體來建立ByteBuf物件。

基於堆記憶體建立的ByteBuf物件會受到GC機制管理,在發生GC時需要來回移動Buffer物件,同時之前在NIO中也聊到過堆、本地記憶體的區別,如下:
堆記憶體VS本地記憶體
通常本地記憶體的讀寫效率都會比堆記憶體高,因為OS可以直接操作本地記憶體,而堆記憶體在讀寫資料時,則需要多出一步記憶體拷貝的動作,總結如下: - 堆記憶體因為直接受到JVM管理,所以在Java程式中建立時,分配效率較高,但讀寫效率低。 - 本地記憶體因為OS可直接操作,所以讀寫效率高,但由於建立時,需要向OS額外申請,分配效率低。

但上述聊到的這些特徵,NIOBuffer也具備,那Netty對於Buffer緩衝區到底增強了什麼呢?主要是三方面:Buffer池化技術、動態擴容機制、零拷貝實現。

5.1、ByteBuf緩衝區池化技術

池化這個詞彙大家應該都不陌生,Java執行緒池、資料庫連線池,這些都是池化思想的產物,一般系統中較為珍貴的資源,都會採用池化技術來快取,以便於下次需要時可直接使用,而無需經過繁瑣的建立過程。

前面聊到過,Netty預設會採用本地記憶體建立ByteBuf物件,而本地記憶體因為不是作業系統分配給Java程式使用的,所以基於本地記憶體建立物件時,則需要額外單獨向OS申請,這個過程自然開銷較大,在高併發情況下,頻繁的建立、銷燬ByteBuf物件,一方面會導致效能降低,同時還有可能造成OOM的風險(使用完沒及時釋放,記憶體未歸還給OS的情況下會出現記憶體溢位)。

而使用池化技術後,一方面能有效避免OOM問題產生,同時還可以省略等待建立緩衝區的時間,那Netty中的池化技術,什麼時候會開啟呢?這個要分平臺! - Android系統預設會採用非池化技術,而其他系統,如Linux、Mac、Windows等會預設啟用。

但上述這條原則是Netty4.1版本之後才加入的,因為4.1之前的版本,其內部的池化技術還不夠完善,所以4.1之前的版本預設會禁用池化技術。當然,如果你在某些平臺下想自行決定是否開啟池化,可通過下述引數控制: - -Dio.netty.allocator.type=unpooled:關閉池化技術。 - -Dio.netty.allocator.type=pooled:開啟池化技術。

這兩個引數直接通過JVM引數的形式,在啟動Java程式時指定即可。如果你想要檢視自己建立的ByteBuf物件,是否使用了池化技術,可直接列印物件的Class即可,如下: ```java // 檢視建立的緩衝區是否使用了池化技術 private static void byteBufferIsPooled(){ ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16); System.out.println(buffer.getClass()); }

public static void main(String[] args) { byteBufferIsPooled(); }

/ * * 輸出結果: * class io.netty.buffer.PooledUnsafeDirectByteBuf * / `` 從輸出的結果中的類名可看出,如果是以Pooled開頭的類名,則表示當前ByteBuf物件使用池化技術,如若是以Unpooled`開頭的類名,則表示未使用池化技術。

5.2、ByteBuf動態擴容機制

在前面聊《JavaNIO-Buffer緩衝區》的時候曾簡單聊過NIOBuffer原始碼,其內部的實現有些傻,每個Buffer物件都擁有一根limit指標,這根指標用於控制讀取/寫入模式,因此在使用NIO-Buffer時,每次寫完緩衝區後,都需要呼叫flip()方法來反轉指標,以此來確保NIO-Buffer的正常讀寫。

由於Java-NIO中的Buffer設計有些缺德,因此在使用NIO的原生Buffer物件時,就顯得額外麻煩,必須要遵從如下步驟: - ①先建立對應型別的緩衝區 - ②通過put這類方法往緩衝區中寫入資料 - ③呼叫flip()方法將緩衝區轉換為讀模式 - ④通過get這類方法從緩衝區中讀取資料 - ⑤呼叫clear()、compact()方法清空緩衝區資料

而正是由於Java-NIO原生的Buffer設計的不合理,因此Netty中直接重構了整個緩衝區元件,在Netty-ByteBuf中,存在四個核心屬性: - initialCapacity:初始容量,建立緩衝區時指定的容量大小,預設為256位元組。 - maxCapacity:最大容量,當初始容量不足以供給使用時,ByteBuf的最大擴容限制。 - readerIndex:讀取指標,預設為0,當讀取一部分資料時,指標會隨之移動。 - writerIndex:寫入指標,預設為0,當寫入一部分資料時,指標會隨之移動。

首先來說說和NIO-Buffer的兩個主要區別:首先將原本一根指標變為了兩根,分別對應讀/寫操作,這樣就保障了使用ByteBuf時,無需每次讀寫資料時手動翻轉模式。同時加入了一個最大容量限制,在建立的ByteBuf無法存下資料時,允許在最大容量的範圍內,對ByteBuf進行自動擴容,下面上個圖理解:
ByteBuf
上圖中模擬了使用ByteBuf緩衝區的過程,在建立時會先分配一個初始容量,這個容量可以自己指定,不指定預設為256,接著會去創建出對應容量的緩衝區,最初讀寫指標都為0,後續會隨著使用情況不斷變化。

這裡重點觀察最後一個狀態,在真正使用過程中,一個ByteBuf會被分為四個區域: - 已廢棄區域:這是指已經被讀取過的資料區域,因為其中的資料已被使用,所以屬於廢棄區域。 - 可讀取區域:這主要是指被寫入過資料,但還未讀取的區域,這塊區域的資料都可被讀取使用。 - 可寫入區域:這主要是指寫入指標和容量之間的區域,意味著這塊區域是可以被寫入資料的。 - 可擴容區域:這主要是指容量和最大容量之間的區域,代表當前緩衝區可擴容的範圍。

ByteBuf的主要實現位於AbstractByteBuf這個子類中,但內部還有兩根markedReaderIndex、markedWriterIndex標記指標,這兩根指標就類似於NIO-Buffer中的mark指標,這裡就不做重複贅述。下面上個案例簡單實驗一下BtyeBuf的自動擴容特性,程式碼如下: java // 測試Netty-ByteBuf自動擴容機制 private static void byteBufCapacityExpansion() { // 不指定預設容量大小為16 ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16); System.out.println("測試前的Buffer容量:" + buffer); // 使用StringBuffer來測試ByteBuf的自動擴容特性 StringBuffer sb = new StringBuffer(); // 往StringBuffer中插入17個位元組的資料 for (int i = 0; i < 17; i++) { sb.append("6"); } // 將17個位元組大小的資料寫入緩衝區 buffer.writeBytes(sb.toString().getBytes()); printBuffer(buffer); } 在這個測試自動擴容的方法中,最後用到了一個printBuffer()方法來列印緩衝區,這是自定義的一個輸出方法,也就基於Netty自身提供的Dump方法實現的,如下: ```java // 列印ByteBuf中資料的方法 private static void printBuffer(ByteBuf buffer) { // 讀取ByteBuffer已使用的位元組數 int byteSize = buffer.readableBytes(); // 基於byteSize來計算顯示的行數 int rows = byteSize / 16 + (byteSize % 15 == 0 ? 0 : 1) + 4; // 建立一個StringBuilder用來顯示輸出 StringBuilder sb = new StringBuilder(rows * 80 * 2); // 獲取緩衝區的容量、讀/寫指標資訊放入StringBuilder sb.append("ByteBuf緩衝區資訊:{"); sb.append("讀取指標=").append(buffer.readerIndex()).append(", "); sb.append("寫入指標=").append(buffer.writerIndex()).append(", "); sb.append("容量大小=").append(buffer.capacity()).append("}");

// 利用Netty框架自帶的格式化方法、Dump方法輸出緩衝區資料
sb.append(StringUtil.NEWLINE);
ByteBufUtil.appendPrettyHexDump(sb, buffer);
System.out.println(sb.toString());

} 接著在`main`方法中呼叫並執行,如下:java public static void main(String[] args) { byteBufCapacityExpansion(); }

/ * 執行結果: * * 測試前的Buffer容量:PooledUnsafeDirectByteBuf(ridx: 0, widx: 0, cap: 16) * ByteBuf緩衝區資訊:{讀取指標=0, 寫入指標=17, 容量大小=64} * +-------------------------------------------------+ * | 0 1 2 3 4 5 6 7 8 9 a b c d e f | * +--------+-------------------------------------------------+----------------+ * |00000000| 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 |6666666666666666| * |00000010| 36 |6 | * +--------+-------------------------------------------------+----------------+ * / `` 先來觀察最初的容量:cap=16,因為這是咱們顯示指定的初始容量,接著向該ByteBuf中插入17個位元組資料後,會發現容量自動擴充套件到了64,但如果使用NIO-Buffer來進行這樣的操作,則會丟擲異常。同時最後還把緩衝區中具體的資料打印出來了,這個是利用Netty自帶的appendPrettyHexDump()`方法實現的,中間是位元組值,後面是具體的值,這裡就不做過多闡述~

5.3、 Netty中的讀寫API

首先在講述Netty-ByteBuf的讀寫API之前,咱們再說清楚一點與NIO-Buffer的區別,不知大家是否還記得我在之前NIO中聊到的一點:
NIO-Buffer
其實這也是NIO-Buffer設計不合理的一個地方,當你想要向緩衝區中寫入不同型別的資料,要麼得自己手動轉換成Byte位元組型別,要麼得new一個對應的子實現,所以整個實現就較為臃腫,大家可以點進Java.nio包看一下,你會看到下述場景:
Java-NIO包
這裡的類關係,大家一眼看過去明顯會感覺頭大,基本上實現都大致相同,但針對於每個資料型別,都編寫了對應的實現類,而Netty的作者顯然意識到了這點,因此並未提供多種資料型別的緩衝區,僅提供了ByteBuf這一種緩衝區,Why

其實道理十分簡單,因為計算機上的所有資料資源,在底層本質上都是0、1形成的位元組資料,所以只提供Byte型別的ByteBuf緩衝區就夠了,畢竟它能夠儲存所有型別的資料,同時為了便於寫入其他型別的資料,如Int、boolean、long....Netty框架中也對外提供了相關的寫入API,接著一起來看看。

```java // Netty-ByteBuf抽象類 public abstract class ByteBuf implements ReferenceCounted, Comparable { // 寫入boolean資料的方法,內部使用一個位元組表示,0=false、1=true public abstract ByteBuf writeBoolean(boolean var1); // 寫入位元組資料的方法 public abstract ByteBuf writeByte(int var1); // 大端寫入Short資料的方法 public abstract ByteBuf writeShort(int var1); // 小端寫入Short資料的方法 public abstract ByteBuf writeShortLE(int var1);

// 下述方法和寫Short型別的方法僅型別不同,都區分了大小端,不再重複註釋
public abstract ByteBuf writeMedium(int var1);
public abstract ByteBuf writeMediumLE(int var1);
public abstract ByteBuf writeInt(int var1);
public abstract ByteBuf writeIntLE(int var1);
public abstract ByteBuf writeLong(long var1);
public abstract ByteBuf writeLongLE(long var1);
public abstract ByteBuf writeChar(int var1);
public abstract ByteBuf writeFloat(float var1);
public ByteBuf writeFloatLE(float value) {
    return this.writeIntLE(Float.floatToRawIntBits(value));
}
public abstract ByteBuf writeDouble(double var1);
public ByteBuf writeDoubleLE(double value) {
    return this.writeLongLE(Double.doubleToRawLongBits(value));
}

// 將另一個ByteBuf物件寫入到當前緩衝區
public abstract ByteBuf writeBytes(ByteBuf var1);
// 將另一個ByteBuf物件的前N個長度的資料,寫入到當前緩衝區
public abstract ByteBuf writeBytes(ByteBuf var1, int var2);
// 將另一個ByteBuf物件的指定範圍資料,寫入到當前緩衝區
public abstract ByteBuf writeBytes(ByteBuf var1, int var2, int var3);
// 向緩衝區中寫入一個位元組陣列
public abstract ByteBuf writeBytes(byte[] var1);
// 向緩衝區中寫入一個位元組陣列中,指定範圍的資料
public abstract ByteBuf writeBytes(byte[] var1, int var2, int var3);
// 將一個NIO的ByteBuffer資料寫入到當前ByteBuf物件
public abstract ByteBuf writeBytes(ByteBuffer var1);
// 將一個輸入流中的資料寫入到當前緩衝區
public abstract int writeBytes(InputStream var1, int var2) 
                                            throws IOException;
// 將一個NIO的ScatteringByteChannel通道中的資料寫入當前緩衝區
public abstract int writeBytes(ScatteringByteChannel var1, int var2)
                                                    throws IOException;
// 將一個NIO的檔案通道中的資料寫入當前緩衝區
public abstract int writeBytes(FileChannel var1, long var2, int var4)
                                                    throws IOException;
// 將一個任意字元型別的資料寫入緩衝區(CharSequence是所有字元型別的老大)
public abstract int writeCharSequence(CharSequence var1, Charset var2);

// 省略其他寫入資料的API方法........

} `` 上面列出了Netty-ByteBuf中常用的寫入方法,其實大家在這裡就能明顯觀察出與NIO的區別,NIO是為不同資料型別提供了不同的實現類,而Netty則僅僅只是為不同型別,提供了不同的API`方法,顯然後者的做法更佳,因為整體的程式碼結構會更為優雅。

這裡主要說一下大端寫入和小端寫入的區別,從前面的API列表中,大家可以看到,Netty為每種資料型別,都提供了一個結尾帶LE的寫入方法,這個帶LE的方法則是小端寫入方法,那麼大小端之間有何差異呢?

大小端寫入是網路程式設計中的通用概念,因為網路資料傳輸過程中,所有的資料都是以二進位制的位元組格式傳輸的,而所謂的大端(Big Endian)寫入,是指先寫高位,再寫低位,高低位又是什麼意思呢? - 高位寫入:指從前往後寫,例如1這個數字,位元位形式為000...001。 - 低位寫入:指從後往前寫,依舊是1這個數字,位元位形式為100...000

這裡不瞭解的小夥伴又會疑惑:為啥高位寫入時,1在最後面呀?這是因為要先寫0,再寫1的原因導致的。而反過來。所謂的小端(Little Endian)寫入,也就是指先寫低位,再寫高位。預設情況下,網路通訊會採用大端寫入的模式。

簡單瞭解Netty-ByteBuf寫入資料的API後,接著再來看一些讀取資料的API方法,如下: ```java // 一系列read開頭的讀取方法,這種方式會改變讀取指標(區分大小端) public abstract boolean readBoolean(); public abstract byte readByte(); public abstract short readUnsignedByte(); public abstract short readShort(); public abstract short readShortLE(); public abstract int readUnsignedShort(); public abstract int readUnsignedShortLE(); public abstract int readMedium(); public abstract int readMediumLE(); public abstract int readUnsignedMedium(); public abstract int readUnsignedMediumLE(); public abstract int readInt(); public abstract int readIntLE(); public abstract long readUnsignedInt(); public abstract long readUnsignedIntLE(); public abstract long readLong(); public abstract long readLongLE(); public abstract char readChar(); public abstract float readFloat(); // 省略其他的read方法.....

// 一系列get開頭的讀取方法,這種方式不會改變讀取指標(區分大小端) public abstract boolean getBoolean(int var1); public abstract byte getByte(int var1); public abstract short getUnsignedByte(int var1); public abstract short getShort(int var1); public abstract short getShortLE(int var1); public abstract int getUnsignedShort(int var1); public abstract int getUnsignedShortLE(int var1); public abstract int getMedium(int var1); public abstract int getMediumLE(int var1); public abstract int getUnsignedMedium(int var1); public abstract int getUnsignedMediumLE(int var1); public abstract int getInt(int var1); public abstract int getIntLE(int var1); public abstract long getUnsignedInt(int var1); public abstract long getUnsignedIntLE(int var1); public abstract long getLong(int var1); public abstract long getLongLE(int var1); public abstract char getChar(int var1); public abstract float getFloat(int var1); // 省略其他的get方法..... `` 在上面列出的一系列讀取方法中,主要可分為read、get兩大類方法: -readXXX():這種方式讀取資料後,會導致ByteBuf內部的讀取指標隨之移動。 -getXXX():這種方式讀取資料後,不會改變ByteBuf`內部的讀取指標。

那麼讀取指標改變之後會出現什麼影響呢?大家還記得前面聊到的ByteBuf的四部分嘛?前面講過,讀取指標之前的資料部分,都會被標記為廢棄部分,這也就意味著通過read系列的方式讀取一段資料後,會導致這些資料無法再次被讀取到,這裡來做個實驗: ```java // 測試ByteBuf的read、get、mark功能 private static void bufferReader(){ // 分配一個初始容量為10的緩衝區 ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(10);

// 向緩衝區中寫入10個字元(佔位十個位元組)
StringBuffer sb = new StringBuffer();
for (int i = 0; i < 10; i++) {
    sb.append(i);
}
buffer.writeBytes(sb.toString().getBytes());

// 使用read方法讀取前5個位元組資料
printBuffer(buffer);
buffer.readBytes(5);
printBuffer(buffer);

// 再使用get方法讀取後五個位元組資料
buffer.getByte(5);
printBuffer(buffer);

}

public static void main(String[] args) { bufferReader(); } 在上面的迴圈中,我是通過`StringBuffer`來作為緩衝區的資料,但為何不直接寫入`int`資料呢?這是因為`int`預設會佔四個位元組,而`StringBuffer`底層是`char`,一個字元只佔用一個位元組~,這裡是一個小細節,接著來看看執行結果:log ByteBuf緩衝區資訊:{讀取指標=0, 寫入指標=10, 容量大小=10} +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 30 31 32 33 34 35 36 37 38 39 |0123456789 | +--------+-------------------------------------------------+----------------+ ByteBuf緩衝區資訊:{讀取指標=5, 寫入指標=10, 容量大小=10} +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 35 36 37 38 39 |56789 | +--------+-------------------------------------------------+----------------+

ByteBuf緩衝區資訊:{讀取指標=5, 寫入指標=10, 容量大小=10} +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 35 36 37 38 39 |56789 | +--------+-------------------------------------------------+----------------+ `` 從上述結果中可看出,使用readBytes()方法讀取五個位元組後,讀取指標會隨之移動到5,接著看看前後的資料變化,此時會發現資料從0123456789變成了56789,這是因為前面五個位元組的資料,已經屬於廢棄部分了,所以printBuffer()`方法無法讀取顯示。

接著再看看後面,通過getByte()讀取五個位元組後,此時ByteBuf物件的讀取指標,顯然不會隨之移動,也就是通過get系列方法讀取緩衝區資料,並不會導致讀過的資料廢棄。

那如果使用read系列方法讀取資料後,後續依舊想要讀取資料該怎麼辦呢?這裡可以使用ByteBuf內部的標記指標實現,如下: ```java // 在上述方法的最後繼續追加下述程式碼:

// 使用mark標記一下讀取指標,然後再使用read方法讀取資料 buffer.markReaderIndex(); buffer.readBytes(5); printBuffer(buffer);

// 此時再通過reset方法,使讀取指標恢復到前面的標記位置 buffer.resetReaderIndex(); printBuffer(buffer); 此時再次查詢執行結果,如下:log ByteBuf緩衝區資訊:{讀取指標=10, 寫入指標=10, 容量大小=10}

ByteBuf緩衝區資訊:{讀取指標=5, 寫入指標=10, 容量大小=10} +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 35 36 37 38 39 |56789 | +--------+-------------------------------------------------+----------------+ `` 從結果中可以明顯看到,對讀取指標做了標記後,再次使用read系列方法讀取資料,依舊會導致讀過的部分變為廢棄資料,但後續可以通過reset`方法,將讀取指標恢復到前面的標記位置,然後再次檢視緩衝區的資料,就會發現資料又可以重複被讀取啦~

其實除開可以通過markReaderIndex()、resetReaderIndex()方法標記、恢復讀取指標外,還可以通過markWriterIndex()、resetWriterIndex()方法來標記、恢復寫入指標。標記讀取指標後,可以讓緩衝區中的一段資料被多次read讀取,而標記寫入指標後,可以讓緩衝區的一段區間被反覆寫入,但每次後面的寫入會覆蓋前面寫入的資料。

OK~,對於ByteBufAPI操作就介紹到這裡,其實內部提供了一百多個API方法,但我就不一一去做說明啦,大家點進原始碼後就能看到,感興趣的小夥伴可以自行除錯!

5.4、ByteBuf的記憶體回收

在前面聊到過,Netty-ByteBuf在除安卓平臺外,都會使用池化技術來建立,那一個已創建出的ByteBuf物件,其佔用的記憶體在什麼情況下會歸還給記憶體池呢?想要聊明白這點,得先理解ByteBuf的引用釋放。

學習過JVM-GC機制的小夥伴應該知道,JVM中使用的物件存活判定法是根可達演算法,而在此之前的一種常用演算法被稱之為《引用計數法》,但由於該演算法存在迴圈引用的問題,所以並不適合作為自動判定存活的演算法,但Netty-ByteBuf中恰恰使用了這種演算法。

首先來看看Netty-ByteBuf的類關係: java public abstract class ByteBuf implements ReferenceCounted, Comparable<ByteBuf> 從上面的類定義中可明顯看到,ByteBuf實現了ReferenceCounted介面,該介面翻譯過來的含義則是引用計數,該介面中提供的方法列表如下: ```java public interface ReferenceCounted { // 檢視一個物件的引用計數統計值 int refCnt();

// 對一個物件的引用計數+1
ReferenceCounted retain();
// 對一個物件的引用計數+n
ReferenceCounted retain(int var1);

// 記錄當前物件的當前訪問位置,記憶體洩漏時會返回該方法記錄的值
ReferenceCounted touch();
ReferenceCounted touch(Object var1);

// 對一個物件的引用計數-1
boolean release();
// 對一個物件的引用計數-n
boolean release(int var1);

} `` 重點關注retain()、release()方法,這兩個方法分別對應加/減一個物件的引用計數,把ByteBuf套入進來,當一個緩衝區物件的引用計數為0時,會清空當前緩衝區中的資料,並且將佔用的記憶體歸還給記憶體池,所有嘗試再次訪問該ByteBuf物件的操作,都會被拒絕。簡單來說,一句話總結就是:**當一個ByteBuf物件的引用計數變為0`時,該緩衝區就會變為外部不可訪問的狀態**。

綜上所述,在使用完一個ByteBuf物件後,明確後續不會用到該物件時,一定要記得手動呼叫release()清空引用計數,否則會導致該緩衝區長久佔用記憶體,最終引發記憶體洩漏。

這裡拓展一點小細節,似乎在Netty-Channel中,都會採用ByteBuf來發送/接收資料,那這些通道傳輸資料用的ByteBuf物件,其佔用的記憶體會在何時回收呢?這會牽扯到前面的ChannelPipeline連結串列。

處理器連結串列
還記得這幅通道處理器連結串列圖嘛?在其中有兩個特殊的處理器,即Head、Tail處理器: - Head處理器: - 如果通道上只有入站處理器,它會作為整個處理器連結串列的第一個處理器呼叫。 - 如果通道上只有出站處理器,它會作為整個處理器連結串列的最後一個處理器呼叫。 - 如果通道上入/出站處理器都有,它會作為入站的第一個處理呼叫,出站的最後一個處理器呼叫。 - Tail處理器: - 如果通道上只有入站處理器,Tail節點會作為整個連結串列的最後一個處理器呼叫。 - 如果通道上只有出站處理器,Tail節點會作為整個連結串列的第一個處理器呼叫。 - 如果通道上入/出站處理器都有,它會作為出站的第一個呼叫、入站的最後一個呼叫。

結合上面所說的內容,Head、Tail處理器在任何情況下,其中至少會有一個,作為通道上的最後一個處理器呼叫,而在這兩個頭尾處理器中,會自動釋放ByteBuf的工作,先來看看Head處理器,原始碼如下: ```java // ChannelPipeline處理器連結串列的預設實現類 public class DefaultChannelPipeline implements ChannelPipeline { // Head處理器的實現類:同時實現了入站、出站處理器介面 final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler {

    // 作為入站連結串列第一個處理器時,會呼叫的方法
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // 繼續往下呼叫其他自定義的入站處理器
        ctx.fireChannelRead(msg);
    }

    // 作為出站連結串列的最後一個處理器時,會呼叫的方法
    public void write(ChannelHandlerContext ctx, Object msg, 
                                        ChannelPromise promise) {
        // unsafe.write()最終會呼叫到AbstractUnsafe.write()方法
        this.unsafe.write(msg, promise);
    }
}
// 省略其他方法....

}

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel { protected abstract class AbstractUnsafe implements Unsafe { public final void write(Object msg, ChannelPromise promise) { this.assertEventLoop(); ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;

        // 這裡先不需要理解,後續原始碼篇會聊
        if (outboundBuffer == null) {
            this.safeSetFailure(promise, this.newClosedChannelException(AbstractChannel.
                            this.initialCloseCause));

            // 最終在這裡,依舊呼叫了引用計數工具類的release方法
            ReferenceCountUtil.release(msg);
        } else {
            int size;
            try {
                msg = AbstractChannel.this.filterOutboundMessage(msg);
                size = AbstractChannel.this.pipeline.estimatorHandle().size(msg);
                if (size < 0) {
                    size = 0;
                }
            } catch (Throwable var6) {
                this.safeSetFailure(promise, var6);
                // 這裡也會呼叫了引用計數工具類的release方法
                ReferenceCountUtil.release(msg);
                return;
            }

            outboundBuffer.addMessage(msg, size, promise);
        }
    }
    // 省略其他方法....
}
// 省略其他類與方法....

}

// 引用計數工具類 public final class ReferenceCountUtil { public static boolean release(Object msg) { // 這裡會先判斷一下對應的msg物件是否實現了引用計數介面, // 只有對應的msg實現了ReferenceCounted介面時,才會釋放引用 return msg instanceof ReferenceCounted ? ((ReferenceCounted)msg).release() : false; }

// 省略其他方法.....

} ``Head節點會作為出站連結串列的最後一個處理器呼叫,因此在所有自定義出站處理器執行完成後,最終呼叫該節點的write()方法,在這個方法內部,最終呼叫了AbstractUnsafe.write()方法,對應的方法實現中,咱們僅需關注ReferenceCountUtil.release(msg)這行程式碼即可,最終會在該工具類中釋放msg`物件的引用計數。

接著再來看看Tail節點的實現原始碼: ```java // ChannelPipeline處理器連結串列的預設實現類 public class DefaultChannelPipeline implements ChannelPipeline {

// Tail處理器的實現類:實現了入站處理器介面,作為入站呼叫鏈最後的處理器
final class TailContext extends AbstractChannelHandlerContext 
                                implements ChannelInboundHandler {
    // 所有自定義的入站處理器執行完成後,會呼叫的方法
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        DefaultChannelPipeline.this.onUnhandledInboundMessage(ctx, msg);
    }

    // 省略其他方法.....
}

// 前面Tail、Head呼叫的釋放方法
protected void onUnhandledInboundMessage(ChannelHandlerContext ctx, Object msg) {
    // 呼叫釋放ByteBuf緩衝區的方法
    this.onUnhandledInboundMessage(msg);
    // 記錄日誌
    if (logger.isDebugEnabled()) {
        logger.debug("Discarded message pipeline :" + 
            "{}. Channel : {}.", ctx.pipeline().names(), ctx.channel());
    }
}
protected void onUnhandledInboundMessage(Object msg) {
    try {
        logger.debug("Discarded inbound message {} that reached " + 
            "at the tail of the pipeline. Please check your pipeline" + 
            "configuration.", msg);
    } finally {
        // 最終呼叫了引用計數工具類的release方法
        ReferenceCountUtil.release(msg);
    }
}

} ``Tail節點會作為入站連結串列的最後一個處理器呼叫,所以在執行Tail處理器時,最終會呼叫它的channelRead()方法,而在相應的方法內部,呼叫了onUnhandledInboundMessage()方法,跟著原始碼繼續走,此時也會發現,最終也呼叫了ReferenceCountUtil.release(msg)`方法來釋放引用。

根據原始碼中的推斷,似乎Netty框架傳送/接收資料用的ByteBuf,都會由頭尾處理器來釋放,但答案確實如此嗎?NO,為什麼呢?再次將目光放到ReferenceCountUtil.release(msg)這處程式碼:

```java // 引用計數工具類 public final class ReferenceCountUtil { public static boolean release(Object msg) { // 這裡會先判斷一下對應的msg物件是否實現了引用計數介面, // 只有對應的msg實現了ReferenceCounted介面時,才會釋放引用 return msg instanceof ReferenceCounted ? ((ReferenceCounted)msg).release() : false; }

// 省略其他方法.....

}

// ByteBuf的類定義 public abstract class ByteBuf implements ReferenceCounted, Comparable 此時大家注意看,`ReferenceCountUtil.release()`在執行前,會先判斷一下當前的`msg`是否實現了`ReferenceCounted`介面,而`ByteBuf`是實現了的,因此如果執行到`Head/Tail`處理器時,`msg`資料依舊為`ByteBuf`型別,頭尾處理器自然可以完成回收工作,但如若是下面這種情況呢?java pipeline.addLast("In-①",new ChannelInboundHandlerAdapter(){ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("俺是In-①入站處理器...");

    // 在第一個入站處理器中,將接收到的ByteBuf資料轉換為String向下傳遞
    ByteBuf buffer = (ByteBuf) msg;
    String message = buffer.toString(Charset.defaultCharset());

    super.channelRead(ctx, message);
}

}); pipeline.addLast("In-②",new ChannelInboundHandlerAdapter(){ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("我是In-②入站處理器..."); super.channelRead(ctx, msg); } }); `` 在上述這個案例中,咱們在第一個入站處理器中,將接收到的ByteBuf資料轉換為String向下傳遞,也就意味著從In-②處理器開始,後面所有的處理器收到的msg都為String型別,當自定義的兩個處理器執行完成後,最終會呼叫Tail`處理器完成收尾工作,但問題來了!

因為在In-①msg型別發生了改變,所以當Tail處理器中呼叫ReferenceCountUtil.release()時,由於String並未實現ReferenceCounted介面,所以Tail無法對該msg進行釋放,最終就會造成記憶體洩漏問題。

但此時記憶體洩漏,發生在哪個位置呢?答案是位於In-①中,因為In-①處理器中就已經將ByteBuf用完了,將其中的資料轉換成了String型別,而ByteBuf後續處理器都不會用到,因此該ByteBuf佔用的記憶體永遠不會被釋放,所以一定要注意:在使用處理器的過程中,如果明確ByteBuf不會繼續使用,那請一定要記得手動呼叫release()方法釋放引用,以上述案例說明: java ByteBuf buffer = (ByteBuf) msg; String message = buffer.toString(Charset.defaultCharset()); buffer.release(); 當明確不使用該ByteBuf值時,請記住呼叫對應的release()方法釋放引用!這樣能夠有效避免記憶體洩漏的問題出現,有人也許會說,JVM不是有GC機制嗎?為什麼會出現記憶體洩漏呀?

關於上述問題的道理十分簡單,因為Netty預設採用本地記憶體來建立緩衝區,並且會利用池化技術管理所有緩衝區,如果一個ByteBuf物件的引用不為0,那麼該ByteBuf會永久的佔用記憶體資源,Netty無法主動將其佔用的記憶體回收到池中。

5.5、 Netty中的零拷貝技術

想要講清楚Netty-ByteBuf中的零拷貝技術,那首先得先明白零拷貝到底是個啥,因此咱們先講明白零拷貝的概念,再講清楚作業系統的零拷貝技術,然後再說說Java-NIO中的零拷貝體現,最後再來聊Netty-ByteBuf中的零拷貝技術。

六、隨處可見的零拷貝技術

   零拷貝這個詞,在很多地方都有出現,例如Kafka、Nginx、Tomcat、RocketMQ...的底層都使用了零拷貝的技術,那究竟什麼叫做零拷貝呢?其實所謂的零拷貝,並不是不需要經過資料拷貝,而是減少記憶體拷貝的次數,上個例子來理解,比如Nginx向客戶端提供檔案下載的功能。

客戶端要下載的檔案都位於Nginx所在的伺服器磁碟中,如果當一個客戶端請求下載某個資原始檔時,這時需要經過的步驟如下:
檔案下載的拷貝過程
先來簡單聊一聊檔案下載時,Nginx伺服器內部的資料傳輸過程: - ①客戶端請求下載伺服器上的某個資源,Nginx解析請求並得知客戶端要下載的具體檔案。 - ②NginxOS發起系統IO呼叫,呼叫核心read(fd)函式,應用上下文切態至核心空間。 - ③read()函式通過DMA控制器,將目標檔案的資料從磁碟讀取至核心緩衝區。 - ④DMA傳輸資料完成後,CPU將資料從核心緩衝區拷貝至使用者緩衝區(程式的記憶體空間)。 - ⑤CPU拷貝資料完成後,read()呼叫結束並返回,上下文從核心態切回用戶態。 - ⑥Nginx再次向OS發起核心write(fd)函式的系統呼叫,應用上下文再次切到核心態。 - ⑦接著CPU將使用者緩衝區中的資料,寫入到Socket網路套接字的緩衝區。 - ⑧資料複製到Socket緩衝區後,DMA控制器將Socket緩衝區的資料傳輸到網絡卡裝置。 - ⑨DMA控制器將資料拷貝至網絡卡裝置後,write()函式呼叫結束,再次切回用戶態。 - ⑩檔案資料抵達網絡卡後,Nginx準備向客戶端響應資料,組裝報文返回資料......

從上述流程大家可得知,一次檔案下載傳統的IO流程,需要經過四次切態,四次資料拷貝(CPU、DMA各兩次),而所謂的零拷貝,並不是指不需要經過資料拷貝,而是指減少其中的資料拷貝次數。

6.1、作業系統中的零拷貝技術

我這裡指的作業系統預設是Linux,因為MacOS、Windows系統相對閉源,因此對於這兩個作業系統中的零拷貝技術個人並不熟悉。在Linux中提供了多種零拷貝的實現: - ①MMAP共享記憶體 + write()系統函式。 - ②sendfile()核心函式。 - ③結合DMA-Scatter/Gather Copy收集拷貝功能實現的sendfile()函式。 - ④splice()核心函式。

6.1.1、MMAP共享記憶體

先來聊聊第①種吧,MMAP共享記憶體這個概念,在上篇關於《Linux-IO多路複用模型:select、poll、epoll原始碼分析》的文章結尾提到過,MMAP共享記憶體是指:在核心空間和使用者空間之間的一塊共享記憶體,這塊記憶體可被使用者態和核心態直接訪問,結構如下:
MMAP共享記憶體
先看左邊的圖,這也是眾多資料中流傳的圖,共享記憶體位於使用者態和核心態之間,這樣理解其實也並無大礙,但右邊的圖才更為準確,因為核心態和使用者態本身是兩個空間,各自之間並不存在真正的共享區域,MMAP共享記憶體是通過虛擬記憶體機制實現的,也就是通過記憶體對映技術實現的。

什麼又叫做記憶體對映技術呢?這個其實很好理解,就好比Linux中的軟連結、Windows中的快捷方式一樣,拿大家熟悉的Windows系統來說,一般在安裝一個程式後,為了方便後續使用,通常都會預設在桌面上生成快捷方式(圖示),這個快捷方式其實並不是一個真正的程式,而是指向安裝目錄下xxx.exe的連結。

Windows系統上安裝一個程式後,咱們可以通過點選桌面圖示開啟,亦可雙擊安裝目錄下的xxx.exe檔案啟動,而作業系統中的共享記憶體也是同樣的思路。

在主流作業系統中都有一種名為虛擬記憶體的機制,這是指可以分配多個虛擬記憶體地址,指向同一個實體記憶體地址,此時核心態程式和使用者態程式,可以通過不同的虛擬地址,來操縱同一塊實體記憶體,這也就是MMAP共享記憶體技術的真正實現。

MMAP的系統定義如下:
c void *mmap(void *addr, size_t length, int prot, int flags, int fd, off_t offset);

  • addr:指定對映的虛擬記憶體地址。
  • length:對映的記憶體空間長度。
  • prot:對映記憶體的保護模式。
  • flags:指定對映的型別。
  • fd:進行對映的檔案控制代碼。
  • offset:檔案偏移量。

還是之前那幅圖(不要問我為什麼,因為懶的畫~),重點看圖中圈出來的區域:
使用MMAP
如果核心緩衝區和使用者緩衝區使用了MMAP共享記憶體,那當DMA控制器將資料拷貝至核心緩衝區時,因為這裡的核心緩衝區,本質是一個虛擬記憶體地址指向使用者緩衝區,所以DMA會直接將磁碟資料拷貝至使用者緩衝區,這就減少了一次核心緩衝區到使用者緩衝區的CPU拷貝過程,後續直接呼叫write()函式把資料寫到Socket緩衝區即可,因此這也是一種零拷貝的體現。

6.1.2、sendfile()核心函式

sendfile()Linux2.1版本中推出的一個核心函式,系統呼叫的原型如下: c ssize_t sendfile(int fd_in, int fd_out, off_t *offset, size_t count); - fd_in:待寫入資料的檔案描述符(一般為Socket網路套接字的描述符)。 - fd_out:待讀取資料的檔案描述符(一般為磁碟檔案的描述符)。 - offset:磁碟檔案的檔案偏移量。 - count:宣告在fd_outfd_in之間,要傳輸的位元組數。

對於啥是檔案描述符我就不重複贅述了,這依舊在上篇的《Linux多路複用函式原始碼分析-FD檔案描述符》中聊到過,當呼叫sendfile()函式傳輸資料時,將out_fd指定為等待寫入資料的網路套接字,將in_fd指定為待讀取資料的磁碟檔案,就可以直接在核心緩衝區中完成傳輸過程,無需經過使用者緩衝區,如下:
sendfile函式流程
依舊以前面Nginx下載檔案的過程為例,完整流程如下: - ①客戶端請求下載伺服器上的某個資源,Nginx解析請求並得知客戶端要下載的具體檔案。 - ②NginxOS發起系統IO呼叫,呼叫核心sendfile()函式,上下文切態至核心空間。 - ③sendfile()函式通過DMA控制器,將目標檔案的資料從磁碟讀取至核心緩衝區。 - ④DMA傳輸資料完成後,CPU將資料從核心緩衝區拷貝至Socket緩衝區。 - ⑤CPU拷貝資料完成後,DMA控制器將資料從Socket緩衝區拷貝至網絡卡裝置。 - ⑥資料拷貝到網絡卡後,sendfile()呼叫結束,應用上下文切回用戶態空間。 - ⑦Nginx準備向客戶端響應資料,組裝報文返回資料......

相較於原本的MMAP+write()的方式,使用sendfile()函式來處理IO請求,這顯然效能更佳,因為這裡不僅僅減少了一次CPU拷貝,而且還減少了兩次切態的過程。

6.1.3、DMA-Scatter/Gather Copy - sendfile()函式

前面聊了Linux2.1版本中的sendfile()函式,而到了Linux2.4版本中,又對sendfile()做了升級,引入了S/G-DMA技術支援,也就是在DMA拷貝階段,如果硬體支援的情況下,會加入Scatter/Gather操作,這樣就省去了僅有的一次CPU拷貝過程,如下:
S/G-sendfile
優化後的sendfile()函式,拷貝資料時只需要告知out_fd、in_fd、count即可,然後DMA控制器會直接將資料從磁碟拷貝至網絡卡,而無需經過CPU將資料拷貝至Socket緩衝區這一步。

6.1.4、splice()核心函式

前面聊到的sendfile()函式只適用於將資料從磁碟檔案拷貝到Socket套接字或網絡卡上,所以這也限制了它的使用範圍,因此在Linux2.6版本中,引入了splice()函式,其系統呼叫的原型如下: c ssize_t splice(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags); - fd_in:等待寫入資料的檔案描述符。 - off_in:如果fd_in是一個管道檔案(如Socket),該值必須為NULL,否則為檔案的偏移量。 - fd_out:等待讀取資料的檔案描述符。 - off_out:作用同off_in引數。 - len:指定fd_in、fd_out之間傳輸資料的長度。 - flags:控制資料傳輸的模式: - SPLICE_F_MOVE:如果資料合適,按標準頁大小移動資料(2.6.21版本後被廢棄)。 - SPLICE_F_NONBLOCK:以非阻塞式模式執行splice(),實際依舊會受FD狀態影響。 - SPLICE_F_MORE:給核心一個提示,後續splice()還會繼續傳輸更多的資料。 - SPLICE_F_GIFT:沒有效果的選項。

使用splice函式時,fd_in、fd_out中必須至少有一個是管道檔案描述符,套到網路程式設計中的含義即是指:必須要有一個檔案描述符是Socket型別,如果兩個磁碟檔案進行復制,則無法使用splice函式。

splice()函式的作用和DMA-Scatter/Gather版的sendfile()函式完全相同,但與其不同的是:splice()函式不僅不需要硬體支援,而且能夠做到兩個檔案描述符之間的資料零拷貝,實現的過程是基於一端的管道檔案描述符,在兩個FD之間搭建pipeline管道,從而實現兩個FD之間的資料零拷貝。

6.2、另類的零拷貝技術

前面聊到了四種Linux系統中的零拷貝技術,而除開Linux系統中的零拷貝技術外,還有一些另類的零拷貝實現,先來聊一聊緩衝區共享技術,然後再聊聊應用程式中的零拷貝體現。

6.2.1、緩衝區共享

緩衝區共享技術類似於Linux中的MMAP共享記憶體,但緩衝區共享則是真正意義上的記憶體共享技術,核心緩衝區和使用者緩衝區共享同一塊記憶體,如下:
共享緩衝區
作業系統一般為了系統的安全性,在執行期間都會分為使用者態和核心態,無法直接訪問使用者態程式核心態空間,所以Linux中的MMAP是基於虛擬記憶體實現的,而想要實現真正意義上的記憶體共享,這也就意味著需要重寫核心結構,目前比較成熟的只有Solaris系統上的Fast Buffer技術,但大家只需瞭解即可,因為這個也很少用到。

6.2.2、程式資料的零拷貝

前面聊到的零拷貝技術,都是在減少磁碟檔案和網路套接字之間的資料拷貝次數,而程式中也會存在很多的資料拷貝過程,比如將一個大集合拆分為兩個小集合、將多個小集合合併成一個大集合等等,傳統的做法如下: ```java List a = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); List b = new ArrayList<>(); List c = new ArrayList<>();

for (Integer num : a) { int index = a.indexOf(num); if (index < 5){ b.add(num); } else { c.add(num); } } `` 而這種做法顯然會牽扯到資料拷貝,但上述這個做法,會從a中將資料拷貝到b、c集合中,而所謂的零拷貝,即是無需發生拷貝動作,也能夠將a拆分成b、c`兩個集合。

關於具體如何實現,這點待會兒在Netty-ByteBuf中演示,因為Netty中的零拷貝技術,也實現了程式資料的零拷貝。

6.3、Java-NIO中的零拷貝體現

Java-NIO中,主要有三個方面用到了零拷貝技術: - MappedByteBuffer.map():底層呼叫了作業系統的mmap()核心函式。 - DirectByteBuffer.allocateDirect():可以直接建立基於本地記憶體的緩衝區。 - FileChannel.transferFrom()/transferTo():底層呼叫了sendfile()核心函式。

觀察上述給出的三處位置,其實本質也就是在呼叫作業系統核心提供的零拷貝函式,以此減少資料的拷貝次數。

6.4、再聊Netty中的零拷貝體現

Netty中的零拷貝與前面作業系統層面的零拷貝不同,它是一種使用者程序級別的零拷貝體現,主要也包含三方面:

Netty的傳送、接收資料的ByteBuf緩衝區,預設會使用堆外本地記憶體建立,採用直接記憶體進行Socket讀寫,資料傳輸時無需經過二次拷貝。如果使用傳統的堆記憶體進行Socket網路資料讀寫,JVM需要先將堆記憶體中的資料拷貝一份到直接記憶體,然後才寫入Socket緩衝區中,相較於堆外直接記憶體,訊息在傳送過程中多了一次緩衝區的記憶體拷貝。

Netty的檔案傳輸採用了transferTo()/transferFrom()方法,它可以直接將檔案緩衝區的資料傳送到目標Channel(Socket),底層就是呼叫了sendfile()核心函式,避免了檔案資料的CPU拷貝過程。

Netty提供了組合、拆解ByteBuf物件的API,咱們可以基於一個ByteBuf物件,對資料進行拆解,也可以基於多個ByteBuf物件進行資料合併,這個過程中不會出現資料拷貝,下面重點聊一聊這個!

其中前兩條就不過多贅述了,畢竟前面都嘮叨過好幾次,重點說說第三種零拷貝技術,這是一種Java級別的零拷貝技術,ByteBuf中主要有slice()、composite()這兩個方法,用於拆分、合併緩衝區,先來聊聊拆分緩衝區的方法,案例如下: ```java // 測試Netty-ByteBuf的slice零拷貝方法 private static void sliceZeroCopy(){ // 分配一個初始容量為10的緩衝區 ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(10);

// 寫入0~9十個位元組資料
byte[] numData = {'0','1','2','3','4','5','6','7','8','9'};
buffer.writeBytes(numData);
printBuffer(buffer);

// 從下標0開始,向後擷取五個位元組,拆分成一個新ByteBuf物件
ByteBuf b1 = buffer.slice(0, 5);
printBuffer(b1);
// 從下標5開始,向後擷取五個位元組,拆分成一個新ByteBuf物件
ByteBuf b2 = buffer.slice(5, 5);
printBuffer(b2);

// 證明切割出的兩個ByteBuf物件,是共享第一個ByteBuf物件資料的
// 這裡修改擷取後的b1物件,然後檢視最初的buffer物件
b1.setByte(0,'a');
printBuffer(buffer);

}

public static void main(String[] args) { sliceZeroCopy(); } 在上述方法中,首先建立了一個`buffer`物件,往其中寫入了`0~9`這十個字元,接著將其拆分成了`b1、b2`這兩個`ByteBuf`物件,`b1、b2`都具備獨立的讀寫指標,但卻並未真正的從`buffer`中拷貝新的資料出來,而是基於`buffer`這個物件,進行了資料擷取,執行結果如下:log ByteBuf緩衝區資訊:{讀取指標=0, 寫入指標=10, 容量大小=10} +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 30 31 32 33 34 35 36 37 38 39 |0123456789 | +--------+-------------------------------------------------+----------------+

ByteBuf緩衝區資訊:{讀取指標=0, 寫入指標=5, 容量大小=5} +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 30 31 32 33 34 |01234 | +--------+-------------------------------------------------+----------------+ ByteBuf緩衝區資訊:{讀取指標=0, 寫入指標=5, 容量大小=5} +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 35 36 37 38 39 |56789 | +--------+-------------------------------------------------+----------------+

ByteBuf緩衝區資訊:{讀取指標=0, 寫入指標=10, 容量大小=10} +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 61 31 32 33 34 35 36 37 38 39 |a123456789 | +--------+-------------------------------------------------+----------------+ `` 觀察上述第二、三個ByteBuf緩衝區資訊,與前面說的毫無差異,明顯都具備獨立的讀寫指標,但我為什麼說:b1、b2沒有拷貝資料呢?接著看方法中的最後一步,我對b1的第一個元素做了修改,然後輸出了buffer物件,看上述結果中的第四個ByteBuf緩衝區資訊,其實會發現:**buffer物件中下標為0的資料,也被改成了a`**!由此即可證明前面的觀點。

不過這種零拷貝方式,雖然減少了資料複製次數,但也會有一定的侷限性:
①使用slice()方法拆分出的ByteBuf物件,不支援擴容,也就是切割的長度為5,最大長度也只能是5,超出長度時會丟擲下標越界異常。
②由於拆分出的ByteBuf物件,其資料依賴於原ByteBuf物件,因此當原始ByteBuf物件被釋放時,拆分出的緩衝區也會不可用,所以在使用slice()方法時,要手動呼叫retain()/release()來增加引用計數(這個後面細聊)。

除開上述的slice()方法外,還有其他一個叫做duplicate()的零拷貝方法,它的作用是完全克隆原有ByteBuf物件,但讀寫指標都是獨立的,並且支援自動擴容,大家感興趣可以自行實驗。

接著聊一聊合併ByteBuf緩衝區的零拷貝方法,該方法的使用方式與前面的方法並不同,如下: ```java // 測試Netty-ByteBuf的composite零拷貝方法 private static void compositeZeroCopy(){ // 建立兩個小的ByteBuf緩衝區,並往兩個緩衝區中插入資料 ByteBuf b1 = ByteBufAllocator.DEFAULT.buffer(5); ByteBuf b2 = ByteBufAllocator.DEFAULT.buffer(5); byte[] data1 = {'a','b','c','d','e'}; byte[] data2 = {'n','m','x','y','z'}; b1.writeBytes(data1); b2.writeBytes(data2);

// 建立一個合併緩衝區的CompositeByteBuf物件
CompositeByteBuf buffer = ByteBufAllocator.DEFAULT.compositeBuffer();
// 將前面兩個小的緩衝區,合併成一個大的緩衝區
buffer.addComponents(true,b1,b2);
printBuffer(buffer);

}

public static void main(String[] args) { compositeZeroCopy(); }

/ * 執行結果: ByteBuf緩衝區資訊:{讀取指標=0, 寫入指標=10, 容量大小=10} +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 61 62 63 64 65 6e 6d 78 79 7a |abcdenmxyz | +--------+-------------------------------------------------+----------------+ * / `` 案例中,想要將多個緩衝區合併成一個大的緩衝區,需要先建立一個CompositeByteBuf物件,接著呼叫它的addComponent()/addComponents()方法,將小的緩衝區新增進去即可。但在合併多個緩衝區時,addComponents()方法中的第一個引數必須為true`,否則不會自動增長讀寫指標。

其實說到底,Netty-ByteBuf緩衝區的零拷貝方法,實際上也可以被稱之為“一種特殊的淺拷貝”,與之對應的是“深拷貝”,而ByteBuf中的“深拷貝”,則是一系列以Copy開頭的方法,通過這類方法複製緩衝區,會完全分配新的記憶體地址、讀寫指標。

最後,在Netty內部還提供了一個名為Unpooled的工具類,這主要是針對於非池化緩衝區的工具類,內部也提供了一系列wrappend開頭的方法,可以用來組合、包裝多個ByteBuf物件或位元組陣列,呼叫對應方法時,內部也不會發生拷貝動作,這也是一類零拷貝的方法。

七、Netty入門篇小結

   經過上述一系列的叨叨絮絮後,對於Netty框架的基本概念,以及Netty框架中大多數核心元件做了介紹,但對於一些粘包、半包、解碼器、長連線、心跳機制等內容未闡述,原本打算將這些內容一篇寫完,但本章的字數實在太多,嚴重超出單章限制:
字數
因此對於後續一些進階的知識,會再開設一篇講述,預計Netty的文章會有4~5篇左右,大體順序為《Netty入門篇》、《Netty進階篇》、《Netty實戰篇》、《Netty應用篇》、《Netty原始碼篇》,但具體的篇幅會在後續適當調整。

本篇的內容就到這裡啦,如若對你有幫助,請記得點個小贊支援一下~

「其他文章」