(九)Java網路程式設計無冕之王-這回把大名鼎鼎的Netty框架一網打盡!
theme: channing-cyan
本文正在參加「金石計劃 . 瓜分6萬現金大獎」
引言
現如今的開發環境中,分散式/微服務架構大行其道,而分散式/微服務的根基在於網路程式設計,而Netty
恰恰是Java網路程式設計領域的無冕之王。Netty
這個框架相信大家定然聽說過,其在Java網路程式設計中的地位,好比JavaEE
中的Spring
。
當然,這樣去聊它大家可能無法實際感受出它的重要性,那先來看看基於Netty
構建的應用:
觀察上述列出的開源元件,一眼望去幾乎全是各個領域中大名鼎鼎的框架,而這些元件都是基於Netty
構建的,涵蓋中介軟體、大資料、離線計算、分散式、RPC、No-SQL
等各個方向....,很明顯的可感知出Netty
的地位之高,因此如果要打造一款Java
高效能的網路通訊程式、想要真正熟知分散式架構的底層原理,Netty
成為了每個Java
開發進階必須要掌握的核心技術之一。
Netty
的重要性不言而知,但網上相關的大部分視訊、文章、書籍等資料卻五花八門,很難真正幫助大家構建出一套完整的體系,本文的目的就是帶諸位走入基於Netty
的網路世界,在真正意義上為諸君構建一套Netty
的知識儲備。
本文會先從概念開始,到基礎入門、核心元件依次展開,後續結合多個實戰案例全面詳解
Netty
的應用。但在學習之前,大家最好有Java-IO
體系、多路複用模型等相關知識的儲備,如若未曾具備請先移步:《Java-IO機制全解》、《多路複用模型剖析》兩文,前者是必須,後者則暫且無需掌握,因為在後續的《Netty
原始碼篇》中才會涉及。
一、初識Netty的基礎概念與快速入門
注意看:上圖中右邊這位黑眼圈堪比熊貓眼的哥們,從他頭頂的髮量就能明顯感受出其技術強度,他!!!旁邊的這位才是Netty
框架的原作者Trustin Lee
(韓國人),同時他也是另一個著名網路框架Mina
的核心主程之一,現任職於Apple
蘋果集團......,不過多介紹作者了,總之是一位網路方面的大牛。
重點來聊聊我們的主角:Netty
框架,其實這個框架是基於Java原生NIO
技術的進一步封裝,在其中對Java-NIO
技術做了進一步增強,作者充分結合了Reactor
執行緒模型,將Netty
變為了一個基於非同步事件驅動的網路框架,Netty
從誕生至今共釋出了五個大版本,但目前最常用的反而並非是最新的5.x
系列,而是4.x
系列的版本,原因在於Netty
本身就是基於Java-NIO
封裝的,而JDK
本身又很穩定,再加上5.x
版本並未有太大的效能差異,因此4.x
系列才是主流。
再回過頭來思考一個問題:為什麼Netty
要二次封裝原生NIO
呢?相信看過NIO
原始碼的小夥伴都清楚,原生的NIO
設計的特別繁瑣,而且還存在一系列安全隱患,因此Netty
則是抱著簡化NIO
、解決隱患、提升效能等目的而研發的。
不過有意思的一點在於:
Netty
雖然是基於Java-NIO
封裝的框架,但實際使用起來卻跟之前聊到的Java-AIO(NIO2)
技術有些相似。
1.1、Netty的入門例項
上面扯了不少Netty
的概念,現在就直接先實操一番快速入門,畢竟程式設計講究施展出真理,首先第一步則是新增對應的依賴,如下:
xml
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.43.Final</version>
</dependency>
然後先建立NettyServer
服務端,程式碼如下:
java
public class NettyServer {
public static void main(String[] args) throws InterruptedException {
// 建立兩個EventLoopGroup,boss:處理連線事件,worker處理I/O事件
EventLoopGroup boss = new NioEventLoopGroup();
EventLoopGroup worker = new NioEventLoopGroup();
// 建立一個ServerBootstrap服務端(同之前的ServerSocket類似)
ServerBootstrap server = new ServerBootstrap();
try {
// 將前面建立的兩個EventLoopGroup繫結在server上
server.group(boss,worker)
// 指定服務端的通道為Nio型別
.channel(NioServerSocketChannel.class)
// 為到來的客戶端Socket新增處理器
.childHandler(new ChannelInitializer<NioSocketChannel>() {
// 這個只會執行一次(主要是用於新增更多的處理器)
@Override
protected void initChannel(NioSocketChannel ch) {
// 新增一個字元解碼處理器:對客戶端的資料解碼
ch.pipeline().addLast(
new StringDecoder(CharsetUtil.UTF_8));
// 新增一個入站處理器,對收到的資料進行處理
ch.pipeline().addLast(
new SimpleChannelInboundHandler<String>() {
// 讀取事件的回撥方法
@Override
protected void channelRead0(ChannelHandlerContext
ctx,String msg) {
System.out.println("收到客戶端資訊:" + msg);
}
});
}
});
// 為當前服務端繫結IP與埠地址(sync是同步阻塞至連線成功為止)
ChannelFuture cf = server.bind("127.0.0.1",8888).sync();
// 關閉服務端的方法(之後不會在這裡關閉)
cf.channel().closeFuture().sync();
}finally {
// 優雅停止之前建立的兩個Group
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
緊接著再構建一個NettyClient
客戶端,程式碼如下:
java
public class NettyClient {
public static void main(String[] args) {
// 由於無需處理連線事件,所以只需要建立一個EventLoopGroup
EventLoopGroup worker = new NioEventLoopGroup();
// 建立一個客戶端(同之前的Socket、SocketChannel)
Bootstrap client = new Bootstrap();
try {
client.group(worker)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sc)
throws Exception {
// 新增一個編碼處理器,對資料編碼為UTF-8格式
sc.pipeline().addLast(new
StringEncoder(CharsetUtil.UTF_8));
}
});
// 與指定的地址建立連線
ChannelFuture cf = client.connect("127.0.0.1", 8888).sync();
// 建立連線成功後,向服務端傳送資料
System.out.println("正在向服務端傳送資訊......");
cf.channel().writeAndFlush("我是<竹子愛熊貓>!");
} catch (Exception e){
e.printStackTrace();
} finally {
worker.shutdownGracefully();
}
}
}
先看執行結果吧,控制檯輸出如下:
```java
NettyServer控制檯輸出:
收到客戶端資訊:我是<竹子愛熊貓>!
NettyClient控制檯輸出:
正在向服務端傳送資訊......
``
從結果中很容易看出這個案例中做了什麼事情,其實無非就是利用
Netty實現了簡單的對端通訊,實現的功能很簡單,但對於未學習過
Netty`技術的小夥伴,在程式碼方面估計有些許懵,那麼接下來簡單的解釋一下程式碼。
但在此之前先宣告一點:
Netty
是支援鏈式程式設計的一個框架,也就是如上述中的程式碼呼叫,所有的方法都可以一直用.
連下去,所以在Netty
的應用中會見到大量的這類寫法。
上述案例的程式碼,說複雜呢也其實並不難,相信認真看完了之前《Java-IO篇》的小夥伴多少都能看懂程式碼,不理解的地方估計就在於其中出現的幾個新的概念:EventLoopGroup、ServerBootstrap、childHandler
,我們先對於這些概念做簡單解釋,後續會重點剖析:
- EventLoopGroup
:可以理解成之前的Selector
選擇器,但結合了執行緒池(後續詳細分析)。
- ServerBootstrap/Bootstrap
:類似於之前的ServerSocketChannel/SocketChannel
。
- childHandler
:這個是新概念,可以理解成過濾器,在之前的Servlet
程式設計中,新請求到來都會經過一個個的過濾器,而這個處理器也類似於之前的過濾器,新連線到來時,也會經過新增好的一系列處理器。
OK~,對於上述幾個新概念有了簡單認知後,接著把上面案例的完整流程分析一下:
- ①先建立兩個EventLoopGroup
事件組,然後建立一個ServerBootstrap
服務端。
- ②將建立的兩個事件組boss、worker
繫結在服務端上,並指定服務端通道為NIO
型別。
- ③在server
上新增處理器,對新到來的Socket
連線進行處理,在這裡主要分為兩類:
- ChannelInitializer
:連線到來時執行,主要是用於新增更多的處理器(只觸發一次)。
- addLast()
:通過該方式新增的處理器不會立馬執行,而是根據處理器型別擇機執行。
- ④為建立好的服務端繫結IP
及埠號,呼叫sync()
意思是阻塞至繫結成功為止。
- ⑤再建立一個EventLoopGroup
事件組,並建立一個Bootstrap
客戶端。
- ⑥將事件組繫結在客戶端上,由於無需處理連線事件,所以只需要一個事件組。
- ⑦指定Channel
通道型別為NIO
、新增處理器.....(同服務端類似)
- ⑧與前面服務端繫結的地址建立連線,由於預設是非同步的,也要呼叫sync()
阻塞。
- ⑨建立連線後,客戶端將資料寫入到通道準備傳送,首先會先經過新增好的編碼處理器,將資料的格式設為UTF-8
。
- ⑩伺服器收到資料後,會先經過解碼處理器,然後再去到入站處理,執行對應的Read()
方法邏輯。
- ⑪客戶端完成資料傳送後,先關閉通道,再優雅關閉建立好的事件組。
- ⑫同理,服務端工作完成後,先關閉通道再停止事件組。
結合上述的流程,再去看一遍給出的案例原始碼,相信諸位應該可以徹底理解。不過需要注意的一點是:Netty
的大部分操作都是非同步的,比如地址繫結、客戶端連線等。好比呼叫connect()
方法與服務端建立連線時,主執行緒會把這個工作交給事件組中的執行緒去完成,所以此刻如果主執行緒直接去向通道中寫入資料,有機率會出現報錯,因為實際生產環境中,可能由於網路延遲導致連線建立的時間有些長,此時通道並未建立成功,因此嘗試傳送資料時就會有問題,這點與之前的Java-AIO
通訊案例中,客戶端建立連線要呼叫.get()
方法是同理。
到這裡,你對
Netty
框架已經入門了,接著咱們重點聊聊Netty
中的一些核心元件。
二、Netty框架核心元件:啟動器與事件組
對於Netty
有了基本的認知後,接下來慢慢的熟悉這個框架吧,先依次來看看其中的一些核心元件,瞭解這些元件及作用後,才能真正意義上的“玩轉Netty
”。
2.1、啟動器-ServerBootstrap、Bootstrap
ServerBootstrap、Bootstrap
這兩個元件應該無需過多解釋,上個表格對比大家就理解了:
對比項 | 服務端 | 客戶端
:-:|:-:|:-:
BIO
| ServerSocket
| Socket
NIO
| ServerSocketChannel
| SocketChannel
AIO
| AsynchronousServerSocketChannel
| AsynchronousSocketChannel
Netty
| ServerBootstrap
| Bootstrap
從上表中能明顯感覺出它倆在Netty
中的作用,無非就是服務端與客戶端換了個叫法而已。
2.2、事件組-EventLoopGroup、EventLoop
這兩個東西比較重要,但同時也比較抽象,EventLoop
這東西翻譯過來就是事件迴圈的意思,你可以把它理解成NIO
中的Selector
選擇器,實際它本質上就是這玩意兒,因為內部會維護一個Selector
,然後由一條執行緒會迴圈處理Channel
通道上發生的所有事件,所以每個EventLoop
物件都可以看成一個單執行緒執行器。
EventLoopGroup
可以將其理解成AIO
中的AsynchronousChannelGroup
可能會更合適,在AIO
的ACG
(前面那玩意兒的縮寫)中,我們需要手動指定一個執行緒池,然後AIO
的所有客戶端工作都會使用執行緒池中的執行緒進行管理,而Netty
中的EventLoopGroup
就類似於AIO-ACG
這玩意兒,只不過不需要我們管理執行緒池了,而是Netty
內部維護。
對
EventLoopGroup、EventLoop
有了基本認知後,你再點進它們的原始碼實現,其實能夠觀測到:其實它們繼承了兩個類,一個是Netty
自己實現的有序執行緒池OrderedEventExecutor
類,另一個則JDK
提供的原生定時排程執行緒池ScheduledExecutorService
類(原始碼篇會詳細分析,這裡先簡單瞭解)。
看過之前關於《JDK執行緒池》文章的小夥伴應該清楚,既然EventLoop/EventLoopGroup
繼承自JDK
原生的定時執行緒池,那也就代表著:它擁有JDK
執行緒池中所有提供的方法,同時也應該會支援執行非同步任務、定時任務的功能。那麼實際情況是這樣嗎?答案是Yes
,如下:
```java
public static void main(String[] args) {
EventLoopGroup threadPool = new NioEventLoopGroup();
// 遞交Runnable型別的普通非同步任務
threadPool.execute(()->{
System.out.println("execute()方法提交的任務....");
});
// 遞交Callable型別的有返回非同步任務
threadPool.submit(() -> {
System.out.println("submit()方法提交的任務....");
return "我是執行結果噢!";
});
// 遞交Callable型別的延時排程任務
threadPool.schedule(()->{
System.out.println("schedule()方法提交的任務,三秒後執行....");
return "排程執行後我會返回噢!";
},3,TimeUnit.SECONDS);
// 遞交Runnable型別的延遲間隔排程任務
threadPool.scheduleAtFixedRate(()->{
System.out.println("scheduleAtFixedRate()方法提交的任務....");
},3,1,TimeUnit.SECONDS);
}
/ ~~~~~~~~~~~~~~~~~~我是性感的分割線~~~~~~~~~~~~~~~~~~ / 執行結果如下: 立即執行: execute()方法提交的任務.... submit()方法提交的任務....
延時三秒後執行:
schedule()方法提交的任務....
scheduleAtFixedRate()方法提交的任務....
之後沒間隔一秒執行:
scheduleAtFixedRate()方法提交的任務....
scheduleAtFixedRate()方法提交的任務....
``
上述我們建立了一個
EventLoopGroup事件迴圈組,然後通過之前
JDK執行緒池提供的一系列的提交任務的方法,向其遞交了幾個非同步任務,然後執行該程式,答案顯而易見,
EventLoopGroup確實可以當做
JDK`原生執行緒池來使用。
當然,這些並非分析的重點,重點來看看
EventLoopGroup
如何在Netty
中合理運用。
在瞭解它們的Netty
用法之前,先來看看除原生執行緒池之外所提供的方法:
- EventLoop.inEventLoop(Thread)
:判斷一個執行緒是否屬於當前EventLoop
。
- EventLoop.parent()
:判斷當前EventLoop
屬於哪一個事件迴圈組。
- EventLoopGroup.next()
:獲取當前事件組中的下一個EventLoop
(執行緒)。
這些方法我們簡單瞭解即可,因為大多數情況下在Netty
原始碼中才會用到,暫且無需關注太多,我們先把目光移到前面給出的Netty
使用案例中,還記得最開始定義的兩個事件組嗎?
java
EventLoopGroup boss = new NioEventLoopGroup();
EventLoopGroup worker = new NioEventLoopGroup();
為什麼在服務端要定義兩個組呢?一個難道不行嗎?其實也是可以的,但定義兩個組的好處在於:可以讓Group
中的每個EventLoop
分工更加明確,不同的Group
分別處理不同型別的事件,各司其職。
在前面案例中,為服務端綁定了兩個事件迴圈組,也就代表著會根據
ServerSocketChannel
上觸發的不同事件,將對應的工作分發到這兩個Group
中處理,其中boss
主要負責客戶端的連線事件,而worker
大多數情況下負責處理客戶端的IO
讀寫事件。
當客戶端的SocketChannel
連線到來時,首先會將這個註冊事件的工作交給boss
處理,boss
會呼叫worker.register()
方法,將這條客戶端連線註冊到worker
工作組中的一個EventLoop
上。前面提到過:EventLoop
內部會維護一個Selector
選擇器,因此實際上也就是將客戶端通道註冊到其內部中的選擇器上。
注意:將一個
Socket
連線註冊到一個EventLoop
上之後,這個客戶端連線則會和這個EventLoop
繫結,以後這條通道上發生的所有事件,都會交由這個EventLoop
處理。
到這裡大家應該也理解了為何要拆出兩個EventLoopGroup
,主要目的就在於分工更為明細。當然,由於EventLoopGroup
本質上可以理解成一個執行緒池,其中存在的執行緒資源自然是有限的,那此時如果到來的客戶端連線大於執行緒數量怎麼辦呢?這是不影響的,因為Netty
本身是基於Java-NIO
封裝的,而NIO
底層又是基於多路複用模型實現的,天生就能實現一條執行緒管理多個連線的功能,所以就算連線數大於執行緒數,也完全可以Hold
住。
OK~,除開可以根據事件型別劃分Group
之外,也可以根據為每個處理器劃分不同的事件組,如下:
// 建立EventLoopGroup和JDK原生的執行緒池一樣,可以指定執行緒數量
EventLoopGroup extra = new NioEventLoopGroup(2);
sc.pipeline().addLast(extra, new xxxChannelHandler());
這樣做的好處在於什麼呢?因為前面提到過:一個連線註冊到EventLoop
,之後所有的工作都會由這個EventLoop
處理,而一個EventLoop
又有可能同時管理多個連線,因此假設一條連線上的某個處理器,執行過程非常耗時,此時必然就會影響到這個EventLoop
管理的其他連線,因此對於一些較為耗時的Handler
,可以專門指派給一個額外的extra
事件組處理,這樣就不會影響到所管理的其他連線。
當然,這個功能其實也略微有些雞肋,一般多個
Handler
之間都會存在耦合關係,下一個Handler
需要依賴上一個Handler
的處理結果執行,因此也很難拆出來單獨放到另一個事件組中執行。
看到這裡,相信你對於EventLoopGroup、EventLoop
這兩個元件應該有了基本認知,簡單來說可以EventLoop
理解成有一條執行緒專門維護的Selector
選擇器,而EventLoopGroup
則可以理解成一個有序的定時排程執行緒池,負責管理所有的EventLoop
。舉個生活案例來加深印象:
現在有個工廠,其中分為了不同的片區,一個片區中有很多條流水線,由每個工人負責一部分流水線的作業。開始工作後,流水線的傳輸帶會源源不斷的將貨物傳遞過來,這些貨物最終會等待工人進行加工。
在上述這個例子中,工廠就是ServerBootstrap
服務端,而一個個片區就是不同的EventLoopGroup
事件組,一條流水線則可以理解成一個SocketChannel
客戶端通道,而負責多條流水線的工人就是EventLoop
單執行緒執行器,加工的動作其實就是處理通道上發生的事件。
大家可以將這個例子套進去想象一下,相信這會讓你印象更加深刻。
三、Netty中的增強版通道(ChannelFuture)
對於通道這個概念,相信諸位都不陌生,這也是Java-NIO、AIO
中的核心元件之一,而在Netty
中也對其做了增強和拓展。首先來看看通道型別,Netty
根據不同的多路複用函式,分別拓展出了不同的通道型別:
- NioServerSocketChannel
:通用的NIO
通道模型,也是Netty
的預設通道。
- EpollServerSocketChannel
:對應Linux
系統下的epoll
多路複用函式。
- KQueueServerSocketChannel
:對應Mac
系統下的kqueue
多路複用函式。
- OioServerSocketChannel
:對應原本的BIO
模型,用的較少,一般用原生的。
當然,對於客戶端的通道也可以選擇TCP、UDP...
型別的,就不再介紹了,重點來看看Netty
中是如何對於通道類做的增強。
其實在
Netty
中,主要結合了JDK
提供的Future
介面,對通道類做了進一步增強。
增強的方面主要是支援了非同步,但並非Future
那種偽非同步,而是跟之前聊到過的《CompletableFuture》有些類似,支援非同步回撥處理結果。還記得之前客戶端如何連線服務端的嘛?如下:
java
Bootstrap client = new Bootstrap();
client.connect("127.0.0.1", 8888);
但這個connect()
連線方法,本質上是一個非同步方法,返回的並不是Channel
物件,而是一個ChannelFuture
物件,如下:
java
public ChannelFuture connect(String inetHost, int inetPort);
也包括ServerBootstrap
繫結地址的bind()
也相同,返回的並非ServerChannel
,也是一個ChannelFuture
物件。這是因為在Netty
的機制中,繫結/連線工作都是非同步的,因此如果要用Netty
建立一個客戶端連線,為了確保連線建立成功後再操作,通常情況下都會再呼叫.sync()
方法同步阻塞,直到連線建立成功後再使用通道寫入資料,如下:
java
// 與服務端建立連線
ChannelFuture cf = client.connect("127.0.0.1", 8888);
// 同步阻塞至連線建立成功為止
cf.sync();
// 連線建立成功後再獲取對應的Socket通道寫入資料
cf.channel().writeAndFlush("...");
上述這種方式能夠確保連線建立成功後再寫資料,但既然Netty
中的繫結、連線等這些操作都是非同步的,有沒有辦法讓整個過程都是非同步的呢?
答案是當然有,如何操作呢?
我們可以向ChannelFuture
中添加回調處理器,然後非同步處理,如下:
java
ChannelFuture cf = client.connect("127.0.0.1", 8888);
cf.addListener((ChannelFutureListener) cfl -> {
// 這裡可以用cf,也可以用cfl,返回的都是同一個channel通道
cf.channel().writeAndFlush("...");
});
當通過connect()
方法與服務端建立連線時,Netty
會將這個任務交給當前Bootstrap
繫結的EventLoopGroup
中的執行緒執行,因此建立連線的過程是非同步的,所以會返還一個ChannelFuture
物件給我們,而此時可以通過該物件的addListener()
方法編寫成功回撥邏輯,當連線建立成功後,會由對應的執行緒來執行其中的程式碼,因此可以實現全過程的非同步操作。
這樣做,似乎的確實現了整個過程的非同步,甚至關閉通道的過程也可以換成非同步的,如下:
java
// 非同步關閉Channel通道
ChannelFuture closeCF = cf.channel().closeFuture();
// 通道關閉後,新增對應的回撥函式
closeCF.addListener((ChannelFutureListener) cfl -> {
// 關閉前面建立的EventLoopGroup事件組,也可以在這裡做其他善後工作
worker.shutdownGracefully();
});
那Netty
中為何要將大量的操作都抽象成非同步執行呢?這不是反而讓邏輯更加複雜化嗎?讓發起連線、建立連線、傳送資料、接收資料、關閉連線等一系列操作,全部交由呼叫的那條執行緒執行不可以嗎?答案是可以的,但非同步能在一定程度上提升效能,尤其是併發越高,帶來的優勢更為明顯。
對於這段話大家估計會有疑惑,為什麼能提升效能呢?下面舉個例子理解。
3.1、為何Netty所有API都是非同步式操作?
相信大家一定在生活中見過這樣的場景:醫院看病/體檢、銀行開戶、政府辦事、法院起訴、保險公司買保險等等,各類辦理業務的地方,都會拿號辦理,然後經過一個個的視窗辦理不同的業務,那為什麼要這麼做呢?就拿常規的醫院看病來說,為什麼會分為如下步驟呢? - 導診處:先說明大致情況,導診人員根據你的病理,指導你掛什麼科的號。 - 掛號處:去到對應的病理科排隊掛號(暫且不考慮繳費,假設網上繳掛號費)。 - 診斷室:跟著掛的號找到對應的科室,醫生根據你的情況進行診斷。 - 化驗處:從你身上提取一些標本,然後去到化驗處等待化驗結果。 - 繳費處:醫生根據化驗結果分析病情,然後給出具體的治療方案,讓你來繳費。 - 拿藥/治療處:交完相關的費用後,根據治療方案進行拿藥/治療等處理措施。
有上述這些步驟實際上並不奇怪,問題是在於每個步驟都分為了專門的科室處理,因此以上述流程為例,至少需要有六個醫生提供服務,那麼為什麼不專門由這六位醫生專門提供全系列服務呢?如下:
我們分析一下,假設此時每個步驟平均要五分鐘,一個病人的完整流程下來就需要半小時,而下一批預約看病的其他病人,則需要等待半小時後才能被受理,而把這些步驟拆開之後再來看看:
此時有六位醫生各司其職,每位醫生負責單一的工作,這樣做的好處在於:每個掛號的病人只需要等待五分鐘,就能夠被受理,通過這種方式就將之前批次式看病,轉變為了流水線式看病。
而
Netty
框架中的非同步處理方式,也具備異曲同工之妙,將API
的操作從批處理轉變成了流式處理。套入實際的業務中,也就是主執行緒(呼叫API
的執行緒)無需等待操作完成後再執行,而是呼叫某個API
後可繼續往下執行,相較而言,在併發情況下能很大程度上提升程式效能。
但上述這個例子估計有些小夥伴照舊會犯迷糊,那接著再舉個更加形象化的例子,好比快遞小哥送貨,如果以同步模式工作,將一個貨物送達指定地點後,需要等待客戶簽收才能去送下個貨物,這無疑會讓下個客戶等很久很久,並且也極其影響快遞小哥的工作效率。
而採用非同步模式工作,快遞小哥將一個貨物送達指定地點後,給對應客戶發個資訊後,就立馬趕往下個客戶的貨物地點,前面的客戶拿到貨物後,再給快遞小哥回個資訊即可。在這種非同步工作模式中,小哥無需在原地“阻塞”等待客戶簽收,只需要將手中一個個貨物送達指定地點就行,這在很大程度上提升了整體工作效率,每個客戶之間拿到貨物的時間也大大縮短了,
Netty
框架中的非同步思想也是同理。
3.2、ChannelFuture、Netty-Future、JDK-Future的關係
當大家試圖翻閱ChannelFuture
的實現時,會發現該類繼承了Future
介面:
java
public interface ChannelFuture extends Future<Void> {
// 省略內部方法.....
}
但要注意,這個Future
介面並非是JDK
原生的Future
介面,而是Netty
框架中的Future
介面:
```java
package io.netty.util.concurrent;
public interface Future``
此時會發現,
Netty-Future又繼承自
JDK-Future介面,這也就意味著
Netty-Future拓展了
JDK-Future介面的功能,在之前[《併發程式設計-非同步任務》](https://juejin.cn/post/7038471861860040741#heading-2)中,咱們曾詳細聊到過
JDK原生的
Future類,雖說基於
Future+Callable可以實現非同步回撥,但這種方式實現的非同步回撥則是一種“偽非同步”,為啥呢?先來看看
JDK-Future提供的核心方法:
方法名 | 方法作用
:-:|:-:
isDone()| 判斷當前非同步任務是否結束
cancel()| 取消當前非同步任務
isCancel()| 判斷當前非同步任務是否被取消
get()` | 阻塞等待當前非同步任務執行完成
在JDK-Future
介面中,想要獲取一個非同步任務的執行結果,此時只能呼叫get()
方法,但該方法是一個阻塞方法,呼叫後會阻塞主執行緒直到任務結束為止,這顯然依舊會導致非同步變為同步執行,所以這種方式是一種“偽非同步”,此時再來看看Netty-Future
中增強的核心方法:
方法名 | 方法作用
:-:|:-:
getNow()
| 非阻塞式獲取任務結果,任務未執行完成時返回null
sync()
| 阻塞等待至非同步任務執行結束,執行出錯時會丟擲異常
await()
| 阻塞等待至非同步任務執行結束,執行出錯時不會丟擲異常
isSuccess()
| 判斷任務是否執行成功,如果為true
代表執行成功
cause()
| 獲取任務執行出錯時的報錯資訊,如果執行未出錯,則返回null
addLinstener()
| 添加回調方法,非同步任務執行完成後會主動執行回撥方法中的程式碼
在原生JDK-Future
的基礎上,Netty-Future
新增了一個異常檢測機制,當非同步任務執行出錯時,可以通過cause()
方法處理異常,同時也基於回撥模式,可通過addLinstener()
方法新增非同步執行後的回撥邏輯,從而讓主執行緒建立任務後永遠不會阻塞,做到了真正意義上的非同步執行。
當然,除開基本的Future
介面外,Netty
框架中還有一個Promise
介面,該介面繼承自Netty-Future
介面:
java
public interface Promise<V> extends Future<V> {
// 省略內部方法.....
}
這個介面中主要多拓展了兩個方法:
方法名 | 方法作用
:-:|:-:
setSuccess()
| 設定任務的執行狀態為成功
setFailure()
| 設定任務的執行狀態為失敗
這兩個方法可以用來設定非同步任務的執行狀態,因此Promise
介面除開具備Netty-Future
的功能外,還能作為多個執行緒之間傳遞非同步任務結果的容器。
3.3、不同Future的效果測試
```java public class FutureDemo {
// 測試JDK-Future的方法
public static void jdkFuture() throws Exception {
System.out.println("--------JDK-Future測試--------");
// 建立一個JDK執行緒池用於執行非同步任務
ExecutorService threadPool = Executors.newSingleThreadExecutor();
System.out.println("主執行緒:步驟①");
// 向執行緒池提交一個帶有返回值的Callable任務
java.util.concurrent.Future<String> task =
threadPool.submit(() ->
"我是JDK-Future任務.....");
// 輸出獲取到的任務執行結果(阻塞式獲取)
System.out.println(task.get());
System.out.println("主執行緒:步驟②");
// 關閉執行緒池
threadPool.shutdownNow();
}
// 測試Netty-Future的方法
public static void nettyFuture(){
System.out.println("--------Netty-Future測試--------");
// 建立一個Netty中的事件迴圈組(本質是執行緒池)
NioEventLoopGroup group = new NioEventLoopGroup();
EventLoop eventLoop = group.next();
System.out.println("主執行緒:步驟①");
// 向執行緒池中提交一個帶有返回值的Callable任務
io.netty.util.concurrent.Future<String> task =
eventLoop.submit(() ->
"我是Netty-Future任務.....");
// 新增一個非同步任務執行完成之後的回撥方法
task.addListener(listenerTask ->
System.out.println(listenerTask.getNow()));
System.out.println("主執行緒:步驟②");
// 關閉事件組(執行緒池)
group.shutdownGracefully();
}
// 測試Netty-Promise的方法
public static void nettyPromise() throws Exception {
System.out.println("--------Netty-Promise測試--------");
// 建立一個Netty中的事件迴圈組(本質是執行緒池)
NioEventLoopGroup group = new NioEventLoopGroup();
EventLoop eventLoop = group.next();
// 主動建立一個傳遞非同步任務結果的容器
DefaultPromise<String> promise = new DefaultPromise<>(eventLoop);
// 建立一條執行緒執行,往結果中新增資料
new Thread(() -> {
try {
// 主動丟擲一個異常
int i = 100 / 0;
// 如果非同步任務執行成功,向容器中新增資料
promise.setSuccess("我是Netty-Promise容器:執行成功!");
}catch (Throwable throwable){
// 如果任務執行失敗,將異常資訊放入容器中
promise.setFailure(throwable);
}
}).start();
// 輸出容器中的任務結果
System.out.println(promise.get());
}
public static void main(String[] args) throws Exception {
jdkFuture();
nettyFuture();
nettyPromise();
}
}
``
在上述的測試類中,存在三個測試方法:
-
jdkFuture():測試
JDK-Future的方法。
-
nettyFuture():測試
Netty-Future的方法。
-
nettyPromise():測試
Netty-Promise`的方法。
接著啟動對應的類,來看看控制檯的輸出結果: ```log --------JDK-Future測試-------- 主執行緒:步驟① 我是JDK-Future任務..... 主執行緒:步驟②
--------Netty-Future測試-------- 主執行緒:步驟① 主執行緒:步驟② 我是Netty-Future任務.....
--------Netty-Promise測試--------
Exception in thread "main" java.util.concurrent.ExecutionException:
java.lang.ArithmeticException: / by zero
........
首先來對比一下`JDK-Future、Netty-Future`兩者之間的差別,在使用`JDK-Future`時,想要獲取非同步任務的執行結果,呼叫`get()`方法後會阻塞主執行緒,也就是主執行緒的步驟②,需要等到非同步任務執行完成後才會繼續執行,因此輸出結果為:
log
--------JDK-Future測試--------
主執行緒:步驟①
我是JDK-Future任務.....
主執行緒:步驟②
但此時再來看看`Netty-Future`,因為在內部咱們提交非同步任務後,就立即通過`addListener()`添加了一個回撥,這個回撥方法會在非同步任務執行結束後呼叫,咱們將獲取任務結果的工作,放入到了回撥方法中完成,此時會觀測到,獲取`Netty-Future`的執行結果並不會阻塞主執行緒:
log
--------Netty-Future測試--------
主執行緒:步驟①
主執行緒:步驟②
我是Netty-Future任務.....
``
而對於
Netty-Promise的使用就無需過多講解,也就是可以根據非同步任務的執行狀態,向
Promise物件中設定不同的結果,在前面的多執行緒中,由於主動製造了異常,所以最終會進入
catch程式碼塊,執行
setFailure()`向容器中填充異常資訊。
四、核心元件 - 通道處理器(Handler)
Handler
可謂是整個Netty
框架中最為重要的一部分,它的職責主要是用於處理Channel
通道上的各種事件,所有的處理器都可被大體分為兩類:
- 入站處理器:一般都是ChannelInboundHandlerAdapter
以及它的子類實現。
- 出站處理器:一般都是ChannelOutboundHandlerAdapter
以及它的子類實現。
在系統中網路操作都通常會分為入站和出站兩種,所謂的入站即是指接收請求,反之,所謂的出站則是指返回響應,而Netty
中的入站處理器,會在客戶端訊息到來時被觸發,而出站處理器則會在服務端返回資料時被觸發,接著來展開聊一聊。
4.1、入站處理器與出站處理器
前面講明白了入站、出站的基本概念,接著來簡單認識一下Netty
中的入站處理器,這裡先上個案例:
```java
// 服務端
public class HandlerServer {
public static void main(String[] args) {
// 0.準備工作:建立一個事件迴圈組、一個ServerBootstrap服務端
EventLoopGroup group = new NioEventLoopGroup();
ServerBootstrap server = new ServerBootstrap();
server
// 1.繫結前面建立的事件迴圈組
.group(group)
// 2.宣告通道型別為服務端NIO通道
.channel(NioServerSocketChannel.class)
// 3.通過ChannelInitializer完成通道的初始化工作
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nsc) throws Exception {
// 4.獲取通道的ChannelPipeline處理器連結串列
ChannelPipeline pipeline = nsc.pipeline();
// 5.基於pipeline連結串列向通道上新增入站處理器
pipeline.addLast("In-①",new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
System.out.println("俺是第一個入站處理器...");
super.channelRead(ctx, msg);
}
});
pipeline.addLast("In-②",new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
System.out.println("我是第二個入站處理器...");
super.channelRead(ctx, msg);
}
});
pipeline.addLast("In-③",new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
System.out.println("朕是第三個入站處理器...");
}
});
}
})
// 為當前啟動的服務端繫結IP和埠地址
.bind("127.0.0.1",8888);
}
}
// 客戶端
public class HandlerClient {
public static void main(String[] args) {
// 0.準備工作:建立一個事件迴圈組、一個Bootstrap啟動器
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap client = new Bootstrap();
try {
client
// 1.繫結事件迴圈組
.group(group)
// 2.宣告通道型別為NIO客戶端通道
.channel(NioSocketChannel.class)
// 3.初始化通道,新增一個UTF-8的編碼器
.handler(new ChannelInitializer
// 4.與指定的地址建立連線
ChannelFuture cf = client.connect("127.0.0.1", 8888).sync();
// 5.建立連線成功後,向服務端傳送資料
System.out.println("正在向服務端傳送資訊......");
cf.channel().writeAndFlush("我是<竹子愛熊貓>!");
} catch (Exception e){
e.printStackTrace();
} finally {
// 6.最後關閉事件迴圈組
group.shutdownGracefully();
}
}
}
在上述案例的服務端程式碼中,啟動服務端時為其添加了`In-①、In-②、In-③`這三個入站處理器,接著編寫了一個客戶端,其內部主要是向服務端傳送了一條資料,執行結果如下:
log
俺是In-①入站處理器...
我是In-②入站處理器...
朕是In-③入站處理器...
``
此時大家觀察結果會發現,入站處理器的執行順序,會按照新增的順序執行,兩個過濾器之間,依靠
super.channelRead(ctx, msg);這行程式碼來實現向下呼叫的邏輯,這和之前
Servlet`中的過濾器相差無幾。
除開上述重寫的channelRead()
方法外,入站處理器中還有很多其他方法可以重寫,每個方法都對應著一種事件,會在不同時機下被觸發,如下:
java
// 會在當前Channel通道註冊到選擇器時觸發(與EventLoop繫結時觸發)
public void channelRegistered(ChannelHandlerContext ctx) ...
// 會在選擇器移除當前Channel通道時觸發(與EventLoop解除繫結時觸發)
public void channelUnregistered(ChannelHandlerContext ctx) ...
// 會在通道準備就緒後觸發(Pipeline處理器新增完成、繫結EventLoop後觸發)
public void channelActive(ChannelHandlerContext ctx) ...
// 會在通道關閉時觸發
public void channelInactive(ChannelHandlerContext ctx) ...
// 會在收到客戶端資料時觸發(每當有資料時都會呼叫該方法,表示有資料可讀)
public void channelRead(ChannelHandlerContext ctx, Object msg) ...
// 會在一次資料讀取完成後觸發
public void channelReadComplete(ChannelHandlerContext ctx) ...
// 當通道上的某個事件被觸發時,這個方法會被呼叫
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) ...
// 當通道的可寫狀態發生改變時被呼叫(一般在傳送緩衝區超出限制時呼叫)
public void channelWritabilityChanged(ChannelHandlerContext ctx) ...
// 當通道在讀取過程中丟擲異常時,當前方法會被觸發呼叫
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) ...
接著再來看看出站處理器,這回基於上述案例做些許改造即可,也就是再通過pipeline.addLast()
方法多新增幾個處理器,但處理器的型別為ChannelOutboundHandlerAdapter
,如下:
java
// 基於pipeline連結串列向通道上添加出站處理器
pipeline.addLast("Out-A",new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
throws Exception {
System.out.println("在下是Out-A出站處理器...");
super.write(ctx, msg, promise);
}
});
pipeline.addLast("Out-B",new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
throws Exception {
System.out.println("鄙人是Out-B出站處理器...");
super.write(ctx, msg, promise);
}
});
pipeline.addLast("Out-C",new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
throws Exception {
System.out.println("寡人是Out-C出站處理器...");
super.write(ctx, msg, promise);
}
});
根據原本入站處理器的執行邏輯,是不是理論上執行順序為Out-A、Out-B、Out-C
?先看執行結果:
此時觀察結果可明顯看到,在通道上新增的出站處理器壓根沒被觸發呀,這是為何呢?這要說回前面聊到的出站概念,出站是指響應過程,也意味著出站處理器是在服務端返回資料時被觸發的,而案例中並未向客戶端返回資料,顯然就不會觸發出站處理器,所以此時咱們在In-③
入站處理器中,多加幾行程式碼:
java
// 利用通道向客戶端返回資料
ByteBuf resultMsg = ctx.channel().alloc().buffer();
resultMsg.writeBytes("111".getBytes());
nsc.writeAndFlush(resultMsg);
此時再執行案例,就會看到如下結果:
log
俺是In-①入站處理器...
我是In-②入站處理器...
朕是In-③入站處理器...
寡人是Out-C出站處理器...
鄙人是Out-B出站處理器...
在下是Out-A出站處理器...
此時注意看,結果和預料的不同,呈現的順序並非Out-A、Out-B、Out-C
,而是Out-C、Out-B、Out-A
,這是啥原因呢?為什麼與新增順序反過來了?這其實跟pipeline
處理器連結串列有關,等會兒再聊聊pipeline
這個概念,先來看看出站處理器中的其他方法:
java
// 當通道呼叫bind()方法時觸發(當Channel繫結埠地址時被呼叫,一般用於客戶端通道)
public void bind(...) ...
// 當通道呼叫connect()方法,連線到遠端節點/服務端時觸發(一般也用於客戶端通道)
public void connect(...) ...
// 當客戶端通道呼叫disconnect()方法,與服務端斷開連線時觸發
public void disconnect(...) ...
// 當客戶端通道呼叫close()方法,關閉連線時觸發
public void close(...) ...
// 當通道與EventLoop解除繫結時觸發
public void deregister(...) ...
// 當通道中讀取多次資料時被呼叫觸發
public void read(...) ...
// 當通道中寫入資料時觸發
public void write(...) ...
// 當通道中的資料被Flush給對端節點時呼叫
public void flush(...) ...
對於出站/入站處理器的這些其他方法/事件,大家可根據業務的不同,選擇重寫不同的方法,其中每個不同的方法,其觸發時機也不同,因此可以在適當的位置重寫方法,作為業務程式碼的切入點。
4.2、pipeline處理器連結串列
如果接觸Netty
框架的小夥伴應該對這玩意兒不陌生,如果沒接觸過也無關緊要,其實它也並非是特別難懂的概念,一個處理器被稱為Handler
,而一個Handler
新增到一個通道上之後,則被稱之為ChannelHandler
,而一個通道上的所有ChannelHandler
全部連線起來,則被稱之為ChannelPipeline
處理器連結串列。
以上述給出的案例來說,其內部形成的ChannelPipeline
連結串列如下:
pipeline
本質上是一個雙向連結串列,同時具備head、tail
頭尾節點,每當呼叫pipeline.addLast()
方法新增一個處理器時,就會將處理器封裝成一個節點,然後加入pipeline
連結串列中:
- 當接收到客戶端的資料時,Netty
會從Head
節點開始依次往後執行所有入站處理器。
- 而當服務端返回資料時,Netty
會從Tail
節點開始依次向前執行所有入站處理器。
理解上述過程後,大家應該就理解了之前出站處理器的執行順序,為何是Out-C、Out-B、Out-A
,因為出站處理器是以Tail
尾節點開始,向前依次執行的原因造成的,那處理器的作用是幹嘛的呢?舉個例子大家就懂了。
這裡假設
Netty
的服務端是一個飼料加工廠,客戶端則是原料供應商,連線兩者之間的通道就相當於一條條的流水線,而客戶端傳送的資料相當於原料。
在一條流水線上,玉米、豆粕、小麥....等原料不可能啥也不幹,直接從頭傳到尾,如果原料想要加工成某款私聊,顯然需要經過一道道工序,而處理器則是這一道道工序。
比如原料剛傳進來時,首先要將其粉碎成顆粒,接著需要將其碾壓成粉末,最後需要按照配方比例進行混合,才能形成按配方製成的飼料。在這個過程中,原料進入加工廠後,經過的一道道工序則可以被稱為入站處理器。
而原料被加工成飼料後,想要對外出售,還需要先裝入一個個的飼料袋,然後將飼料袋進行封口,最後印上生產日期與廠家,才能打包成最終的商用飼料對外出售。而該過程中的一道道工序,則可被理解成是一個個出站處理器。
在上述的例子中,一個加工廠的流水線上,存在著一道道工序,經過依次處理後,能夠將原料加工成最終商品。Netty
中亦是同理,對於客戶端和服務端之間的資料,可以通過處理器,完成一系列核心處理,如轉換編碼格式、對資料進行序列化、對資料進行加/解密等操作。
4.3、自定義出/入站處理器
前面簡單講明白了一些關於Netty
處理器的知識,但實際開發過程中,為了更好的程式碼閱讀性,以及程式碼的維護性,通常pipeline.addLast
並不會直接new
介面,而是自己定義處理器類,然後繼承對應的父類,如下:
```java
// 自定義的入站處理器
public class ZhuziHandler extends ChannelInboundHandlerAdapter {
public ZhuziHandler() {
super();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 在這裡面編寫處理入站msg的核心程式碼.....
// (如果要自定義msg的處理邏輯,請記住去掉下面這行程式碼)
super.channelRead(ctx, msg);
}
}
對於入站處理器而言,主要重寫其`channelRead()`方法即可,該方法會在訊息入站時被呼叫,可以在其中完成對資料的複雜處理,而自定義處理器完成後,想要讓該處理器生效,請記得將其繫結到對應的通道上,如下:
java
pipeline.addLast("In-X", new ZhuziHandler());
``
與入站處理器相反的出站處理器亦是同理,只不過將父類實現換成
ChannelInboundHandlerAdapter,並且重寫其
write()`方法即可,這樣所有訊息(資料)出站時,都會呼叫該方法。
最後,不僅僅處理器可以單獨抽出來實現,而且對於通道的初始化器,也可以單獨抽出來實現,如下:
java
// 自定義的通道初始化器
public class ServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// 設定編碼器、解碼器、處理器
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
pipeline.addLast("handler", new ZhuziHandler());
}
}
這樣寫能夠讓程式碼的整潔性更強,並且可以統一管理通道上的所有出/入站處理器,而服務端的程式碼改成下述方式即可:
java
server
.group(group)
.channel(NioServerSocketChannel.class)
.childHandler(new ServerInitializer())
.bind("127.0.0.1",8888);
五、Netty重構後的緩衝區(ByteBuf)
在之前講《JavaIO體系-NIO》的時候曾聊到過它的三大件,其中就包含了ByteBuffer
,其作用主要是用來作為服務端和客戶端之間傳輸資料的容器,NIO
中的ByteBuffer
支援使用堆記憶體、本地(直接)記憶體來建立,而Netty-ByteBuf
也同樣如此,如下:
- ByteBufAllocator.DEFAULT.heapBuffer(cap)
:使用堆記憶體來建立ByteBuf
物件。
- ByteBufAllocator.DEFAULT.directBuffer(cap)
:使用本地記憶體來建立ByteBuf
物件。
基於堆記憶體建立的ByteBuf
物件會受到GC
機制管理,在發生GC
時需要來回移動Buffer
物件,同時之前在NIO
中也聊到過堆、本地記憶體的區別,如下:
通常本地記憶體的讀寫效率都會比堆記憶體高,因為OS
可以直接操作本地記憶體,而堆記憶體在讀寫資料時,則需要多出一步記憶體拷貝的動作,總結如下:
- 堆記憶體因為直接受到JVM
管理,所以在Java
程式中建立時,分配效率較高,但讀寫效率低。
- 本地記憶體因為OS
可直接操作,所以讀寫效率高,但由於建立時,需要向OS
額外申請,分配效率低。
但上述聊到的這些特徵,NIO
的Buffer
也具備,那Netty
對於Buffer
緩衝區到底增強了什麼呢?主要是三方面:Buffer
池化技術、動態擴容機制、零拷貝實現。
5.1、ByteBuf緩衝區池化技術
池化這個詞彙大家應該都不陌生,Java
執行緒池、資料庫連線池,這些都是池化思想的產物,一般系統中較為珍貴的資源,都會採用池化技術來快取,以便於下次需要時可直接使用,而無需經過繁瑣的建立過程。
前面聊到過,
Netty
預設會採用本地記憶體建立ByteBuf
物件,而本地記憶體因為不是作業系統分配給Java
程式使用的,所以基於本地記憶體建立物件時,則需要額外單獨向OS
申請,這個過程自然開銷較大,在高併發情況下,頻繁的建立、銷燬ByteBuf
物件,一方面會導致效能降低,同時還有可能造成OOM
的風險(使用完沒及時釋放,記憶體未歸還給OS
的情況下會出現記憶體溢位)。
而使用池化技術後,一方面能有效避免OOM
問題產生,同時還可以省略等待建立緩衝區的時間,那Netty
中的池化技術,什麼時候會開啟呢?這個要分平臺!
- Android
系統預設會採用非池化技術,而其他系統,如Linux、Mac、Windows
等會預設啟用。
但上述這條原則是Netty4.1
版本之後才加入的,因為4.1
之前的版本,其內部的池化技術還不夠完善,所以4.1
之前的版本預設會禁用池化技術。當然,如果你在某些平臺下想自行決定是否開啟池化,可通過下述引數控制:
- -Dio.netty.allocator.type=unpooled
:關閉池化技術。
- -Dio.netty.allocator.type=pooled
:開啟池化技術。
這兩個引數直接通過JVM
引數的形式,在啟動Java
程式時指定即可。如果你想要檢視自己建立的ByteBuf
物件,是否使用了池化技術,可直接列印物件的Class
即可,如下:
```java
// 檢視建立的緩衝區是否使用了池化技術
private static void byteBufferIsPooled(){
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16);
System.out.println(buffer.getClass());
}
public static void main(String[] args) { byteBufferIsPooled(); }
/ *
* 輸出結果:
* class io.netty.buffer.PooledUnsafeDirectByteBuf
* /
``
從輸出的結果中的類名可看出,如果是以
Pooled開頭的類名,則表示當前
ByteBuf物件使用池化技術,如若是以
Unpooled`開頭的類名,則表示未使用池化技術。
5.2、ByteBuf動態擴容機制
在前面聊《JavaNIO-Buffer緩衝區》的時候曾簡單聊過NIO
的Buffer
原始碼,其內部的實現有些傻,每個Buffer
物件都擁有一根limit
指標,這根指標用於控制讀取/寫入模式,因此在使用NIO-Buffer
時,每次寫完緩衝區後,都需要呼叫flip()
方法來反轉指標,以此來確保NIO-Buffer
的正常讀寫。
由於Java-NIO
中的Buffer
設計有些缺德,因此在使用NIO
的原生Buffer
物件時,就顯得額外麻煩,必須要遵從如下步驟:
- ①先建立對應型別的緩衝區
- ②通過put
這類方法往緩衝區中寫入資料
- ③呼叫flip()
方法將緩衝區轉換為讀模式
- ④通過get
這類方法從緩衝區中讀取資料
- ⑤呼叫clear()、compact()
方法清空緩衝區資料
而正是由於Java-NIO
原生的Buffer
設計的不合理,因此Netty
中直接重構了整個緩衝區元件,在Netty-ByteBuf
中,存在四個核心屬性:
- initialCapacity
:初始容量,建立緩衝區時指定的容量大小,預設為256
位元組。
- maxCapacity
:最大容量,當初始容量不足以供給使用時,ByteBuf
的最大擴容限制。
- readerIndex
:讀取指標,預設為0
,當讀取一部分資料時,指標會隨之移動。
- writerIndex
:寫入指標,預設為0
,當寫入一部分資料時,指標會隨之移動。
首先來說說和NIO-Buffer
的兩個主要區別:首先將原本一根指標變為了兩根,分別對應讀/寫操作,這樣就保障了使用ByteBuf
時,無需每次讀寫資料時手動翻轉模式。同時加入了一個最大容量限制,在建立的ByteBuf
無法存下資料時,允許在最大容量的範圍內,對ByteBuf
進行自動擴容,下面上個圖理解:
上圖中模擬了使用ByteBuf
緩衝區的過程,在建立時會先分配一個初始容量,這個容量可以自己指定,不指定預設為256
,接著會去創建出對應容量的緩衝區,最初讀寫指標都為0
,後續會隨著使用情況不斷變化。
這裡重點觀察最後一個狀態,在真正使用過程中,一個ByteBuf
會被分為四個區域:
- 已廢棄區域:這是指已經被讀取過的資料區域,因為其中的資料已被使用,所以屬於廢棄區域。
- 可讀取區域:這主要是指被寫入過資料,但還未讀取的區域,這塊區域的資料都可被讀取使用。
- 可寫入區域:這主要是指寫入指標和容量之間的區域,意味著這塊區域是可以被寫入資料的。
- 可擴容區域:這主要是指容量和最大容量之間的區域,代表當前緩衝區可擴容的範圍。
ByteBuf
的主要實現位於AbstractByteBuf
這個子類中,但內部還有兩根markedReaderIndex、markedWriterIndex
標記指標,這兩根指標就類似於NIO-Buffer
中的mark
指標,這裡就不做重複贅述。下面上個案例簡單實驗一下BtyeBuf
的自動擴容特性,程式碼如下:
java
// 測試Netty-ByteBuf自動擴容機制
private static void byteBufCapacityExpansion() {
// 不指定預設容量大小為16
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16);
System.out.println("測試前的Buffer容量:" + buffer);
// 使用StringBuffer來測試ByteBuf的自動擴容特性
StringBuffer sb = new StringBuffer();
// 往StringBuffer中插入17個位元組的資料
for (int i = 0; i < 17; i++) {
sb.append("6");
}
// 將17個位元組大小的資料寫入緩衝區
buffer.writeBytes(sb.toString().getBytes());
printBuffer(buffer);
}
在這個測試自動擴容的方法中,最後用到了一個printBuffer()
方法來列印緩衝區,這是自定義的一個輸出方法,也就基於Netty
自身提供的Dump
方法實現的,如下:
```java
// 列印ByteBuf中資料的方法
private static void printBuffer(ByteBuf buffer) {
// 讀取ByteBuffer已使用的位元組數
int byteSize = buffer.readableBytes();
// 基於byteSize來計算顯示的行數
int rows = byteSize / 16 + (byteSize % 15 == 0 ? 0 : 1) + 4;
// 建立一個StringBuilder用來顯示輸出
StringBuilder sb = new StringBuilder(rows * 80 * 2);
// 獲取緩衝區的容量、讀/寫指標資訊放入StringBuilder
sb.append("ByteBuf緩衝區資訊:{");
sb.append("讀取指標=").append(buffer.readerIndex()).append(", ");
sb.append("寫入指標=").append(buffer.writerIndex()).append(", ");
sb.append("容量大小=").append(buffer.capacity()).append("}");
// 利用Netty框架自帶的格式化方法、Dump方法輸出緩衝區資料
sb.append(StringUtil.NEWLINE);
ByteBufUtil.appendPrettyHexDump(sb, buffer);
System.out.println(sb.toString());
}
接著在`main`方法中呼叫並執行,如下:
java
public static void main(String[] args) {
byteBufCapacityExpansion();
}
/ * 執行結果:
*
* 測試前的Buffer容量:PooledUnsafeDirectByteBuf(ridx: 0, widx: 0, cap: 16)
* ByteBuf緩衝區資訊:{讀取指標=0, 寫入指標=17, 容量大小=64}
* +-------------------------------------------------+
* | 0 1 2 3 4 5 6 7 8 9 a b c d e f |
* +--------+-------------------------------------------------+----------------+
* |00000000| 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 |6666666666666666|
* |00000010| 36 |6 |
* +--------+-------------------------------------------------+----------------+
* /
``
先來觀察最初的容量:
cap=16,因為這是咱們顯示指定的初始容量,接著向該
ByteBuf中插入
17個位元組資料後,會發現容量自動擴充套件到了
64,但如果使用
NIO-Buffer來進行這樣的操作,則會丟擲異常。同時最後還把緩衝區中具體的資料打印出來了,這個是利用
Netty自帶的
appendPrettyHexDump()`方法實現的,中間是位元組值,後面是具體的值,這裡就不做過多闡述~
5.3、 Netty中的讀寫API
首先在講述Netty-ByteBuf
的讀寫API
之前,咱們再說清楚一點與NIO-Buffer
的區別,不知大家是否還記得我在之前NIO
中聊到的一點:
其實這也是NIO-Buffer
設計不合理的一個地方,當你想要向緩衝區中寫入不同型別的資料,要麼得自己手動轉換成Byte
位元組型別,要麼得new
一個對應的子實現,所以整個實現就較為臃腫,大家可以點進Java.nio
包看一下,你會看到下述場景:
這裡的類關係,大家一眼看過去明顯會感覺頭大,基本上實現都大致相同,但針對於每個資料型別,都編寫了對應的實現類,而Netty
的作者顯然意識到了這點,因此並未提供多種資料型別的緩衝區,僅提供了ByteBuf
這一種緩衝區,Why
?
其實道理十分簡單,因為計算機上的所有資料資源,在底層本質上都是
0、1
形成的位元組資料,所以只提供Byte
型別的ByteBuf
緩衝區就夠了,畢竟它能夠儲存所有型別的資料,同時為了便於寫入其他型別的資料,如Int、boolean、long....
,Netty
框架中也對外提供了相關的寫入API
,接著一起來看看。
```java
// Netty-ByteBuf抽象類
public abstract class ByteBuf implements ReferenceCounted, Comparable
// 下述方法和寫Short型別的方法僅型別不同,都區分了大小端,不再重複註釋
public abstract ByteBuf writeMedium(int var1);
public abstract ByteBuf writeMediumLE(int var1);
public abstract ByteBuf writeInt(int var1);
public abstract ByteBuf writeIntLE(int var1);
public abstract ByteBuf writeLong(long var1);
public abstract ByteBuf writeLongLE(long var1);
public abstract ByteBuf writeChar(int var1);
public abstract ByteBuf writeFloat(float var1);
public ByteBuf writeFloatLE(float value) {
return this.writeIntLE(Float.floatToRawIntBits(value));
}
public abstract ByteBuf writeDouble(double var1);
public ByteBuf writeDoubleLE(double value) {
return this.writeLongLE(Double.doubleToRawLongBits(value));
}
// 將另一個ByteBuf物件寫入到當前緩衝區
public abstract ByteBuf writeBytes(ByteBuf var1);
// 將另一個ByteBuf物件的前N個長度的資料,寫入到當前緩衝區
public abstract ByteBuf writeBytes(ByteBuf var1, int var2);
// 將另一個ByteBuf物件的指定範圍資料,寫入到當前緩衝區
public abstract ByteBuf writeBytes(ByteBuf var1, int var2, int var3);
// 向緩衝區中寫入一個位元組陣列
public abstract ByteBuf writeBytes(byte[] var1);
// 向緩衝區中寫入一個位元組陣列中,指定範圍的資料
public abstract ByteBuf writeBytes(byte[] var1, int var2, int var3);
// 將一個NIO的ByteBuffer資料寫入到當前ByteBuf物件
public abstract ByteBuf writeBytes(ByteBuffer var1);
// 將一個輸入流中的資料寫入到當前緩衝區
public abstract int writeBytes(InputStream var1, int var2)
throws IOException;
// 將一個NIO的ScatteringByteChannel通道中的資料寫入當前緩衝區
public abstract int writeBytes(ScatteringByteChannel var1, int var2)
throws IOException;
// 將一個NIO的檔案通道中的資料寫入當前緩衝區
public abstract int writeBytes(FileChannel var1, long var2, int var4)
throws IOException;
// 將一個任意字元型別的資料寫入緩衝區(CharSequence是所有字元型別的老大)
public abstract int writeCharSequence(CharSequence var1, Charset var2);
// 省略其他寫入資料的API方法........
}
``
上面列出了
Netty-ByteBuf中常用的寫入方法,其實大家在這裡就能明顯觀察出與
NIO的區別,
NIO是為不同資料型別提供了不同的實現類,而
Netty則僅僅只是為不同型別,提供了不同的
API`方法,顯然後者的做法更佳,因為整體的程式碼結構會更為優雅。
這裡主要說一下大端寫入和小端寫入的區別,從前面的
API
列表中,大家可以看到,Netty
為每種資料型別,都提供了一個結尾帶LE
的寫入方法,這個帶LE
的方法則是小端寫入方法,那麼大小端之間有何差異呢?
大小端寫入是網路程式設計中的通用概念,因為網路資料傳輸過程中,所有的資料都是以二進位制的位元組格式傳輸的,而所謂的大端(Big Endian
)寫入,是指先寫高位,再寫低位,高低位又是什麼意思呢?
- 高位寫入:指從前往後寫,例如1
這個數字,位元位形式為000...001
。
- 低位寫入:指從後往前寫,依舊是1
這個數字,位元位形式為100...000
。
這裡不瞭解的小夥伴又會疑惑:為啥高位寫入時,1
在最後面呀?這是因為要先寫0
,再寫1
的原因導致的。而反過來。所謂的小端(Little Endian
)寫入,也就是指先寫低位,再寫高位。預設情況下,網路通訊會採用大端寫入的模式。
簡單瞭解Netty-ByteBuf
寫入資料的API
後,接著再來看一些讀取資料的API
方法,如下:
```java
// 一系列read開頭的讀取方法,這種方式會改變讀取指標(區分大小端)
public abstract boolean readBoolean();
public abstract byte readByte();
public abstract short readUnsignedByte();
public abstract short readShort();
public abstract short readShortLE();
public abstract int readUnsignedShort();
public abstract int readUnsignedShortLE();
public abstract int readMedium();
public abstract int readMediumLE();
public abstract int readUnsignedMedium();
public abstract int readUnsignedMediumLE();
public abstract int readInt();
public abstract int readIntLE();
public abstract long readUnsignedInt();
public abstract long readUnsignedIntLE();
public abstract long readLong();
public abstract long readLongLE();
public abstract char readChar();
public abstract float readFloat();
// 省略其他的read方法.....
// 一系列get開頭的讀取方法,這種方式不會改變讀取指標(區分大小端)
public abstract boolean getBoolean(int var1);
public abstract byte getByte(int var1);
public abstract short getUnsignedByte(int var1);
public abstract short getShort(int var1);
public abstract short getShortLE(int var1);
public abstract int getUnsignedShort(int var1);
public abstract int getUnsignedShortLE(int var1);
public abstract int getMedium(int var1);
public abstract int getMediumLE(int var1);
public abstract int getUnsignedMedium(int var1);
public abstract int getUnsignedMediumLE(int var1);
public abstract int getInt(int var1);
public abstract int getIntLE(int var1);
public abstract long getUnsignedInt(int var1);
public abstract long getUnsignedIntLE(int var1);
public abstract long getLong(int var1);
public abstract long getLongLE(int var1);
public abstract char getChar(int var1);
public abstract float getFloat(int var1);
// 省略其他的get方法.....
``
在上面列出的一系列讀取方法中,主要可分為
read、get兩大類方法:
-
readXXX():這種方式讀取資料後,會導致
ByteBuf內部的讀取指標隨之移動。
-
getXXX():這種方式讀取資料後,不會改變
ByteBuf`內部的讀取指標。
那麼讀取指標改變之後會出現什麼影響呢?大家還記得前面聊到的ByteBuf
的四部分嘛?前面講過,讀取指標之前的資料部分,都會被標記為廢棄部分,這也就意味著通過read
系列的方式讀取一段資料後,會導致這些資料無法再次被讀取到,這裡來做個實驗:
```java
// 測試ByteBuf的read、get、mark功能
private static void bufferReader(){
// 分配一個初始容量為10的緩衝區
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(10);
// 向緩衝區中寫入10個字元(佔位十個位元組)
StringBuffer sb = new StringBuffer();
for (int i = 0; i < 10; i++) {
sb.append(i);
}
buffer.writeBytes(sb.toString().getBytes());
// 使用read方法讀取前5個位元組資料
printBuffer(buffer);
buffer.readBytes(5);
printBuffer(buffer);
// 再使用get方法讀取後五個位元組資料
buffer.getByte(5);
printBuffer(buffer);
}
public static void main(String[] args) {
bufferReader();
}
在上面的迴圈中,我是通過`StringBuffer`來作為緩衝區的資料,但為何不直接寫入`int`資料呢?這是因為`int`預設會佔四個位元組,而`StringBuffer`底層是`char`,一個字元只佔用一個位元組~,這裡是一個小細節,接著來看看執行結果:
log
ByteBuf緩衝區資訊:{讀取指標=0, 寫入指標=10, 容量大小=10}
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 30 31 32 33 34 35 36 37 38 39 |0123456789 |
+--------+-------------------------------------------------+----------------+
ByteBuf緩衝區資訊:{讀取指標=5, 寫入指標=10, 容量大小=10}
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 35 36 37 38 39 |56789 |
+--------+-------------------------------------------------+----------------+
ByteBuf緩衝區資訊:{讀取指標=5, 寫入指標=10, 容量大小=10}
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 35 36 37 38 39 |56789 |
+--------+-------------------------------------------------+----------------+
``
從上述結果中可看出,使用
readBytes()方法讀取五個位元組後,讀取指標會隨之移動到
5,接著看看前後的資料變化,此時會發現資料從
0123456789變成了
56789,這是因為前面五個位元組的資料,已經屬於廢棄部分了,所以
printBuffer()`方法無法讀取顯示。
接著再看看後面,通過
getByte()
讀取五個位元組後,此時ByteBuf
物件的讀取指標,顯然不會隨之移動,也就是通過get
系列方法讀取緩衝區資料,並不會導致讀過的資料廢棄。
那如果使用read
系列方法讀取資料後,後續依舊想要讀取資料該怎麼辦呢?這裡可以使用ByteBuf
內部的標記指標實現,如下:
```java
// 在上述方法的最後繼續追加下述程式碼:
// 使用mark標記一下讀取指標,然後再使用read方法讀取資料 buffer.markReaderIndex(); buffer.readBytes(5); printBuffer(buffer);
// 此時再通過reset方法,使讀取指標恢復到前面的標記位置
buffer.resetReaderIndex();
printBuffer(buffer);
此時再次查詢執行結果,如下:
log
ByteBuf緩衝區資訊:{讀取指標=10, 寫入指標=10, 容量大小=10}
ByteBuf緩衝區資訊:{讀取指標=5, 寫入指標=10, 容量大小=10}
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 35 36 37 38 39 |56789 |
+--------+-------------------------------------------------+----------------+
``
從結果中可以明顯看到,對讀取指標做了標記後,再次使用
read系列方法讀取資料,依舊會導致讀過的部分變為廢棄資料,但後續可以通過
reset`方法,將讀取指標恢復到前面的標記位置,然後再次檢視緩衝區的資料,就會發現資料又可以重複被讀取啦~
其實除開可以通過
markReaderIndex()、resetReaderIndex()
方法標記、恢復讀取指標外,還可以通過markWriterIndex()、resetWriterIndex()
方法來標記、恢復寫入指標。標記讀取指標後,可以讓緩衝區中的一段資料被多次read
讀取,而標記寫入指標後,可以讓緩衝區的一段區間被反覆寫入,但每次後面的寫入會覆蓋前面寫入的資料。
OK~,對於ByteBuf
的API
操作就介紹到這裡,其實內部提供了一百多個API
方法,但我就不一一去做說明啦,大家點進原始碼後就能看到,感興趣的小夥伴可以自行除錯!
5.4、ByteBuf的記憶體回收
在前面聊到過,Netty-ByteBuf
在除安卓平臺外,都會使用池化技術來建立,那一個已創建出的ByteBuf
物件,其佔用的記憶體在什麼情況下會歸還給記憶體池呢?想要聊明白這點,得先理解ByteBuf
的引用釋放。
學習過
JVM-GC
機制的小夥伴應該知道,JVM
中使用的物件存活判定法是根可達演算法,而在此之前的一種常用演算法被稱之為《引用計數法》,但由於該演算法存在迴圈引用的問題,所以並不適合作為自動判定存活的演算法,但Netty-ByteBuf
中恰恰使用了這種演算法。
首先來看看Netty-ByteBuf
的類關係:
java
public abstract class ByteBuf implements ReferenceCounted, Comparable<ByteBuf>
從上面的類定義中可明顯看到,ByteBuf
實現了ReferenceCounted
介面,該介面翻譯過來的含義則是引用計數,該介面中提供的方法列表如下:
```java
public interface ReferenceCounted {
// 檢視一個物件的引用計數統計值
int refCnt();
// 對一個物件的引用計數+1
ReferenceCounted retain();
// 對一個物件的引用計數+n
ReferenceCounted retain(int var1);
// 記錄當前物件的當前訪問位置,記憶體洩漏時會返回該方法記錄的值
ReferenceCounted touch();
ReferenceCounted touch(Object var1);
// 對一個物件的引用計數-1
boolean release();
// 對一個物件的引用計數-n
boolean release(int var1);
}
``
重點關注
retain()、release()方法,這兩個方法分別對應加/減一個物件的引用計數,把
ByteBuf套入進來,當一個緩衝區物件的引用計數為
0時,會清空當前緩衝區中的資料,並且將佔用的記憶體歸還給記憶體池,所有嘗試再次訪問該
ByteBuf物件的操作,都會被拒絕。簡單來說,一句話總結就是:**當一個
ByteBuf物件的引用計數變為
0`時,該緩衝區就會變為外部不可訪問的狀態**。
綜上所述,在使用完一個ByteBuf
物件後,明確後續不會用到該物件時,一定要記得手動呼叫release()
清空引用計數,否則會導致該緩衝區長久佔用記憶體,最終引發記憶體洩漏。
這裡拓展一點小細節,似乎在
Netty-Channel
中,都會採用ByteBuf
來發送/接收資料,那這些通道傳輸資料用的ByteBuf
物件,其佔用的記憶體會在何時回收呢?這會牽扯到前面的ChannelPipeline
連結串列。
還記得這幅通道處理器連結串列圖嘛?在其中有兩個特殊的處理器,即Head、Tail
處理器:
- Head
處理器:
- 如果通道上只有入站處理器,它會作為整個處理器連結串列的第一個處理器呼叫。
- 如果通道上只有出站處理器,它會作為整個處理器連結串列的最後一個處理器呼叫。
- 如果通道上入/出站處理器都有,它會作為入站的第一個處理呼叫,出站的最後一個處理器呼叫。
- Tail
處理器:
- 如果通道上只有入站處理器,Tail
節點會作為整個連結串列的最後一個處理器呼叫。
- 如果通道上只有出站處理器,Tail
節點會作為整個連結串列的第一個處理器呼叫。
- 如果通道上入/出站處理器都有,它會作為出站的第一個呼叫、入站的最後一個呼叫。
結合上面所說的內容,Head、Tail
處理器在任何情況下,其中至少會有一個,作為通道上的最後一個處理器呼叫,而在這兩個頭尾處理器中,會自動釋放ByteBuf
的工作,先來看看Head
處理器,原始碼如下:
```java
// ChannelPipeline處理器連結串列的預設實現類
public class DefaultChannelPipeline implements ChannelPipeline {
// Head處理器的實現類:同時實現了入站、出站處理器介面
final class HeadContext extends AbstractChannelHandlerContext
implements ChannelOutboundHandler, ChannelInboundHandler {
// 作為入站連結串列第一個處理器時,會呼叫的方法
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 繼續往下呼叫其他自定義的入站處理器
ctx.fireChannelRead(msg);
}
// 作為出站連結串列的最後一個處理器時,會呼叫的方法
public void write(ChannelHandlerContext ctx, Object msg,
ChannelPromise promise) {
// unsafe.write()最終會呼叫到AbstractUnsafe.write()方法
this.unsafe.write(msg, promise);
}
}
// 省略其他方法....
}
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel { protected abstract class AbstractUnsafe implements Unsafe { public final void write(Object msg, ChannelPromise promise) { this.assertEventLoop(); ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
// 這裡先不需要理解,後續原始碼篇會聊
if (outboundBuffer == null) {
this.safeSetFailure(promise, this.newClosedChannelException(AbstractChannel.
this.initialCloseCause));
// 最終在這裡,依舊呼叫了引用計數工具類的release方法
ReferenceCountUtil.release(msg);
} else {
int size;
try {
msg = AbstractChannel.this.filterOutboundMessage(msg);
size = AbstractChannel.this.pipeline.estimatorHandle().size(msg);
if (size < 0) {
size = 0;
}
} catch (Throwable var6) {
this.safeSetFailure(promise, var6);
// 這裡也會呼叫了引用計數工具類的release方法
ReferenceCountUtil.release(msg);
return;
}
outboundBuffer.addMessage(msg, size, promise);
}
}
// 省略其他方法....
}
// 省略其他類與方法....
}
// 引用計數工具類 public final class ReferenceCountUtil { public static boolean release(Object msg) { // 這裡會先判斷一下對應的msg物件是否實現了引用計數介面, // 只有對應的msg實現了ReferenceCounted介面時,才會釋放引用 return msg instanceof ReferenceCounted ? ((ReferenceCounted)msg).release() : false; }
// 省略其他方法.....
}
``
Head節點會作為出站連結串列的最後一個處理器呼叫,因此在所有自定義出站處理器執行完成後,最終呼叫該節點的
write()方法,在這個方法內部,最終呼叫了
AbstractUnsafe.write()方法,對應的方法實現中,咱們僅需關注
ReferenceCountUtil.release(msg)這行程式碼即可,最終會在該工具類中釋放
msg`物件的引用計數。
接著再來看看Tail
節點的實現原始碼:
```java
// ChannelPipeline處理器連結串列的預設實現類
public class DefaultChannelPipeline implements ChannelPipeline {
// Tail處理器的實現類:實現了入站處理器介面,作為入站呼叫鏈最後的處理器
final class TailContext extends AbstractChannelHandlerContext
implements ChannelInboundHandler {
// 所有自定義的入站處理器執行完成後,會呼叫的方法
public void channelRead(ChannelHandlerContext ctx, Object msg) {
DefaultChannelPipeline.this.onUnhandledInboundMessage(ctx, msg);
}
// 省略其他方法.....
}
// 前面Tail、Head呼叫的釋放方法
protected void onUnhandledInboundMessage(ChannelHandlerContext ctx, Object msg) {
// 呼叫釋放ByteBuf緩衝區的方法
this.onUnhandledInboundMessage(msg);
// 記錄日誌
if (logger.isDebugEnabled()) {
logger.debug("Discarded message pipeline :" +
"{}. Channel : {}.", ctx.pipeline().names(), ctx.channel());
}
}
protected void onUnhandledInboundMessage(Object msg) {
try {
logger.debug("Discarded inbound message {} that reached " +
"at the tail of the pipeline. Please check your pipeline" +
"configuration.", msg);
} finally {
// 最終呼叫了引用計數工具類的release方法
ReferenceCountUtil.release(msg);
}
}
}
``
Tail節點會作為入站連結串列的最後一個處理器呼叫,所以在執行
Tail處理器時,最終會呼叫它的
channelRead()方法,而在相應的方法內部,呼叫了
onUnhandledInboundMessage()方法,跟著原始碼繼續走,此時也會發現,最終也呼叫了
ReferenceCountUtil.release(msg)`方法來釋放引用。
根據原始碼中的推斷,似乎
Netty
框架傳送/接收資料用的ByteBuf
,都會由頭尾處理器來釋放,但答案確實如此嗎?NO
,為什麼呢?再次將目光放到ReferenceCountUtil.release(msg)
這處程式碼:
```java // 引用計數工具類 public final class ReferenceCountUtil { public static boolean release(Object msg) { // 這裡會先判斷一下對應的msg物件是否實現了引用計數介面, // 只有對應的msg實現了ReferenceCounted介面時,才會釋放引用 return msg instanceof ReferenceCounted ? ((ReferenceCounted)msg).release() : false; }
// 省略其他方法.....
}
// ByteBuf的類定義
public abstract class ByteBuf implements ReferenceCounted, Comparable此時大家注意看,`ReferenceCountUtil.release()`在執行前,會先判斷一下當前的`msg`是否實現了`ReferenceCounted`介面,而`ByteBuf`是實現了的,因此如果執行到`Head/Tail`處理器時,`msg`資料依舊為`ByteBuf`型別,頭尾處理器自然可以完成回收工作,但如若是下面這種情況呢?
java
pipeline.addLast("In-①",new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
System.out.println("俺是In-①入站處理器...");
// 在第一個入站處理器中,將接收到的ByteBuf資料轉換為String向下傳遞
ByteBuf buffer = (ByteBuf) msg;
String message = buffer.toString(Charset.defaultCharset());
super.channelRead(ctx, message);
}
});
pipeline.addLast("In-②",new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
System.out.println("我是In-②入站處理器...");
super.channelRead(ctx, msg);
}
});
``
在上述這個案例中,咱們在第一個入站處理器中,將接收到的
ByteBuf資料轉換為
String向下傳遞,也就意味著從
In-②處理器開始,後面所有的處理器收到的
msg都為
String型別,當自定義的兩個處理器執行完成後,最終會呼叫
Tail`處理器完成收尾工作,但問題來了!
因為在
In-①
中msg
型別發生了改變,所以當Tail
處理器中呼叫ReferenceCountUtil.release()
時,由於String
並未實現ReferenceCounted
介面,所以Tail
無法對該msg
進行釋放,最終就會造成記憶體洩漏問題。
但此時記憶體洩漏,發生在哪個位置呢?答案是位於In-①
中,因為In-①
處理器中就已經將ByteBuf
用完了,將其中的資料轉換成了String
型別,而ByteBuf
後續處理器都不會用到,因此該ByteBuf
佔用的記憶體永遠不會被釋放,所以一定要注意:在使用處理器的過程中,如果明確ByteBuf
不會繼續使用,那請一定要記得手動呼叫release()
方法釋放引用,以上述案例說明:
java
ByteBuf buffer = (ByteBuf) msg;
String message = buffer.toString(Charset.defaultCharset());
buffer.release();
當明確不使用該ByteBuf
值時,請記住呼叫對應的release()
方法釋放引用!這樣能夠有效避免記憶體洩漏的問題出現,有人也許會說,JVM
不是有GC
機制嗎?為什麼會出現記憶體洩漏呀?
關於上述問題的道理十分簡單,因為
Netty
預設採用本地記憶體來建立緩衝區,並且會利用池化技術管理所有緩衝區,如果一個ByteBuf
物件的引用不為0
,那麼該ByteBuf
會永久的佔用記憶體資源,Netty
無法主動將其佔用的記憶體回收到池中。
5.5、 Netty中的零拷貝技術
想要講清楚Netty-ByteBuf
中的零拷貝技術,那首先得先明白零拷貝到底是個啥,因此咱們先講明白零拷貝的概念,再講清楚作業系統的零拷貝技術,然後再說說Java-NIO
中的零拷貝體現,最後再來聊Netty-ByteBuf
中的零拷貝技術。
六、隨處可見的零拷貝技術
零拷貝這個詞,在很多地方都有出現,例如Kafka、Nginx、Tomcat、RocketMQ...
的底層都使用了零拷貝的技術,那究竟什麼叫做零拷貝呢?其實所謂的零拷貝,並不是不需要經過資料拷貝,而是減少記憶體拷貝的次數,上個例子來理解,比如Nginx
向客戶端提供檔案下載的功能。
客戶端要下載的檔案都位於Nginx
所在的伺服器磁碟中,如果當一個客戶端請求下載某個資原始檔時,這時需要經過的步驟如下:
先來簡單聊一聊檔案下載時,Nginx
伺服器內部的資料傳輸過程:
- ①客戶端請求下載伺服器上的某個資源,Nginx
解析請求並得知客戶端要下載的具體檔案。
- ②Nginx
向OS
發起系統IO
呼叫,呼叫核心read(fd)
函式,應用上下文切態至核心空間。
- ③read()
函式通過DMA
控制器,將目標檔案的資料從磁碟讀取至核心緩衝區。
- ④DMA
傳輸資料完成後,CPU
將資料從核心緩衝區拷貝至使用者緩衝區(程式的記憶體空間)。
- ⑤CPU
拷貝資料完成後,read()
呼叫結束並返回,上下文從核心態切回用戶態。
- ⑥Nginx
再次向OS
發起核心write(fd)
函式的系統呼叫,應用上下文再次切到核心態。
- ⑦接著CPU
將使用者緩衝區中的資料,寫入到Socket
網路套接字的緩衝區。
- ⑧資料複製到Socket
緩衝區後,DMA
控制器將Socket
緩衝區的資料傳輸到網絡卡裝置。
- ⑨DMA
控制器將資料拷貝至網絡卡裝置後,write()
函式呼叫結束,再次切回用戶態。
- ⑩檔案資料抵達網絡卡後,Nginx
準備向客戶端響應資料,組裝報文返回資料......
從上述流程大家可得知,一次檔案下載傳統的IO
流程,需要經過四次切態,四次資料拷貝(CPU、DMA
各兩次),而所謂的零拷貝,並不是指不需要經過資料拷貝,而是指減少其中的資料拷貝次數。
6.1、作業系統中的零拷貝技術
我這裡指的作業系統預設是Linux
,因為MacOS、Windows
系統相對閉源,因此對於這兩個作業系統中的零拷貝技術個人並不熟悉。在Linux
中提供了多種零拷貝的實現:
- ①MMAP
共享記憶體 + write()
系統函式。
- ②sendfile()
核心函式。
- ③結合DMA-Scatter/Gather Copy
收集拷貝功能實現的sendfile()
函式。
- ④splice()
核心函式。
6.1.1、MMAP共享記憶體
先來聊聊第①種吧,MMAP
共享記憶體這個概念,在上篇關於《Linux-IO多路複用模型:select、poll、epoll原始碼分析》的文章結尾提到過,MMAP
共享記憶體是指:在核心空間和使用者空間之間的一塊共享記憶體,這塊記憶體可被使用者態和核心態直接訪問,結構如下:
先看左邊的圖,這也是眾多資料中流傳的圖,共享記憶體位於使用者態和核心態之間,這樣理解其實也並無大礙,但右邊的圖才更為準確,因為核心態和使用者態本身是兩個空間,各自之間並不存在真正的共享區域,MMAP
共享記憶體是通過虛擬記憶體機制實現的,也就是通過記憶體對映技術實現的。
什麼又叫做記憶體對映技術呢?這個其實很好理解,就好比
Linux
中的軟連結、Windows
中的快捷方式一樣,拿大家熟悉的Windows
系統來說,一般在安裝一個程式後,為了方便後續使用,通常都會預設在桌面上生成快捷方式(圖示),這個快捷方式其實並不是一個真正的程式,而是指向安裝目錄下xxx.exe
的連結。
在Windows
系統上安裝一個程式後,咱們可以通過點選桌面圖示開啟,亦可雙擊安裝目錄下的xxx.exe
檔案啟動,而作業系統中的共享記憶體也是同樣的思路。
在主流作業系統中都有一種名為虛擬記憶體的機制,這是指可以分配多個虛擬記憶體地址,指向同一個實體記憶體地址,此時核心態程式和使用者態程式,可以通過不同的虛擬地址,來操縱同一塊實體記憶體,這也就是
MMAP
共享記憶體技術的真正實現。
MMAP
的系統定義如下:
c
void *mmap(void *addr, size_t length, int prot, int flags, int fd, off_t offset);
addr
:指定對映的虛擬記憶體地址。length
:對映的記憶體空間長度。prot
:對映記憶體的保護模式。flags
:指定對映的型別。fd
:進行對映的檔案控制代碼。offset
:檔案偏移量。
還是之前那幅圖(不要問我為什麼,因為懶的畫~),重點看圖中圈出來的區域:
如果核心緩衝區和使用者緩衝區使用了MMAP
共享記憶體,那當DMA
控制器將資料拷貝至核心緩衝區時,因為這裡的核心緩衝區,本質是一個虛擬記憶體地址指向使用者緩衝區,所以DMA
會直接將磁碟資料拷貝至使用者緩衝區,這就減少了一次核心緩衝區到使用者緩衝區的CPU
拷貝過程,後續直接呼叫write()
函式把資料寫到Socket
緩衝區即可,因此這也是一種零拷貝的體現。
6.1.2、sendfile()核心函式
sendfile()
是Linux2.1
版本中推出的一個核心函式,系統呼叫的原型如下:
c
ssize_t sendfile(int fd_in, int fd_out, off_t *offset, size_t count);
- fd_in
:待寫入資料的檔案描述符(一般為Socket
網路套接字的描述符)。
- fd_out
:待讀取資料的檔案描述符(一般為磁碟檔案的描述符)。
- offset
:磁碟檔案的檔案偏移量。
- count
:宣告在fd_out
和fd_in
之間,要傳輸的位元組數。
對於啥是檔案描述符我就不重複贅述了,這依舊在上篇的《Linux多路複用函式原始碼分析-FD檔案描述符》中聊到過,當呼叫sendfile()
函式傳輸資料時,將out_fd
指定為等待寫入資料的網路套接字,將in_fd
指定為待讀取資料的磁碟檔案,就可以直接在核心緩衝區中完成傳輸過程,無需經過使用者緩衝區,如下:
依舊以前面Nginx
下載檔案的過程為例,完整流程如下:
- ①客戶端請求下載伺服器上的某個資源,Nginx
解析請求並得知客戶端要下載的具體檔案。
- ②Nginx
向OS
發起系統IO
呼叫,呼叫核心sendfile()
函式,上下文切態至核心空間。
- ③sendfile()
函式通過DMA
控制器,將目標檔案的資料從磁碟讀取至核心緩衝區。
- ④DMA
傳輸資料完成後,CPU
將資料從核心緩衝區拷貝至Socket
緩衝區。
- ⑤CPU
拷貝資料完成後,DMA
控制器將資料從Socket
緩衝區拷貝至網絡卡裝置。
- ⑥資料拷貝到網絡卡後,sendfile()
呼叫結束,應用上下文切回用戶態空間。
- ⑦Nginx
準備向客戶端響應資料,組裝報文返回資料......
相較於原本的MMAP+write()
的方式,使用sendfile()
函式來處理IO
請求,這顯然效能更佳,因為這裡不僅僅減少了一次CPU
拷貝,而且還減少了兩次切態的過程。
6.1.3、DMA-Scatter/Gather Copy - sendfile()函式
前面聊了Linux2.1
版本中的sendfile()
函式,而到了Linux2.4
版本中,又對sendfile()
做了升級,引入了S/G-DMA
技術支援,也就是在DMA
拷貝階段,如果硬體支援的情況下,會加入Scatter/Gather
操作,這樣就省去了僅有的一次CPU
拷貝過程,如下:
優化後的sendfile()
函式,拷貝資料時只需要告知out_fd、in_fd、count
即可,然後DMA
控制器會直接將資料從磁碟拷貝至網絡卡,而無需經過CPU
將資料拷貝至Socket
緩衝區這一步。
6.1.4、splice()核心函式
前面聊到的sendfile()
函式只適用於將資料從磁碟檔案拷貝到Socket
套接字或網絡卡上,所以這也限制了它的使用範圍,因此在Linux2.6
版本中,引入了splice()
函式,其系統呼叫的原型如下:
c
ssize_t splice(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out,
size_t len, unsigned int flags);
- fd_in
:等待寫入資料的檔案描述符。
- off_in
:如果fd_in
是一個管道檔案(如Socket
),該值必須為NULL
,否則為檔案的偏移量。
- fd_out
:等待讀取資料的檔案描述符。
- off_out
:作用同off_in
引數。
- len
:指定fd_in、fd_out
之間傳輸資料的長度。
- flags
:控制資料傳輸的模式:
- SPLICE_F_MOVE
:如果資料合適,按標準頁大小移動資料(2.6.21
版本後被廢棄)。
- SPLICE_F_NONBLOCK
:以非阻塞式模式執行splice()
,實際依舊會受FD
狀態影響。
- SPLICE_F_MORE
:給核心一個提示,後續splice()
還會繼續傳輸更多的資料。
- SPLICE_F_GIFT
:沒有效果的選項。
使用splice
函式時,fd_in、fd_out
中必須至少有一個是管道檔案描述符,套到網路程式設計中的含義即是指:必須要有一個檔案描述符是Socket
型別,如果兩個磁碟檔案進行復制,則無法使用splice
函式。
splice()
函式的作用和DMA-Scatter/Gather
版的sendfile()
函式完全相同,但與其不同的是:splice()
函式不僅不需要硬體支援,而且能夠做到兩個檔案描述符之間的資料零拷貝,實現的過程是基於一端的管道檔案描述符,在兩個FD
之間搭建pipeline
管道,從而實現兩個FD
之間的資料零拷貝。
6.2、另類的零拷貝技術
前面聊到了四種Linux
系統中的零拷貝技術,而除開Linux
系統中的零拷貝技術外,還有一些另類的零拷貝實現,先來聊一聊緩衝區共享技術,然後再聊聊應用程式中的零拷貝體現。
6.2.1、緩衝區共享
緩衝區共享技術類似於Linux
中的MMAP
共享記憶體,但緩衝區共享則是真正意義上的記憶體共享技術,核心緩衝區和使用者緩衝區共享同一塊記憶體,如下:
作業系統一般為了系統的安全性,在執行期間都會分為使用者態和核心態,無法直接訪問使用者態程式核心態空間,所以Linux
中的MMAP
是基於虛擬記憶體實現的,而想要實現真正意義上的記憶體共享,這也就意味著需要重寫核心結構,目前比較成熟的只有Solaris
系統上的Fast Buffer
技術,但大家只需瞭解即可,因為這個也很少用到。
6.2.2、程式資料的零拷貝
前面聊到的零拷貝技術,都是在減少磁碟檔案和網路套接字之間的資料拷貝次數,而程式中也會存在很多的資料拷貝過程,比如將一個大集合拆分為兩個小集合、將多個小集合合併成一個大集合等等,傳統的做法如下:
```java
List
for (Integer num : a) {
int index = a.indexOf(num);
if (index < 5){
b.add(num);
} else {
c.add(num);
}
}
``
而這種做法顯然會牽扯到資料拷貝,但上述這個做法,會從
a中將資料拷貝到
b、c集合中,而所謂的零拷貝,即是無需發生拷貝動作,也能夠將
a拆分成
b、c`兩個集合。
關於具體如何實現,這點待會兒在
Netty-ByteBuf
中演示,因為Netty
中的零拷貝技術,也實現了程式資料的零拷貝。
6.3、Java-NIO中的零拷貝體現
Java-NIO
中,主要有三個方面用到了零拷貝技術:
- MappedByteBuffer.map()
:底層呼叫了作業系統的mmap()
核心函式。
- DirectByteBuffer.allocateDirect()
:可以直接建立基於本地記憶體的緩衝區。
- FileChannel.transferFrom()/transferTo()
:底層呼叫了sendfile()
核心函式。
觀察上述給出的三處位置,其實本質也就是在呼叫作業系統核心提供的零拷貝函式,以此減少資料的拷貝次數。
6.4、再聊Netty中的零拷貝體現
Netty
中的零拷貝與前面作業系統層面的零拷貝不同,它是一種使用者程序級別的零拷貝體現,主要也包含三方面:
①
Netty
的傳送、接收資料的ByteBuf
緩衝區,預設會使用堆外本地記憶體建立,採用直接記憶體進行Socket
讀寫,資料傳輸時無需經過二次拷貝。如果使用傳統的堆記憶體進行Socket
網路資料讀寫,JVM
需要先將堆記憶體中的資料拷貝一份到直接記憶體,然後才寫入Socket
緩衝區中,相較於堆外直接記憶體,訊息在傳送過程中多了一次緩衝區的記憶體拷貝。②
Netty
的檔案傳輸採用了transferTo()/transferFrom()
方法,它可以直接將檔案緩衝區的資料傳送到目標Channel(Socket)
,底層就是呼叫了sendfile()
核心函式,避免了檔案資料的CPU
拷貝過程。③
Netty
提供了組合、拆解ByteBuf
物件的API
,咱們可以基於一個ByteBuf
物件,對資料進行拆解,也可以基於多個ByteBuf
物件進行資料合併,這個過程中不會出現資料拷貝,下面重點聊一聊這個!
其中前兩條就不過多贅述了,畢竟前面都嘮叨過好幾次,重點說說第三種零拷貝技術,這是一種Java
級別的零拷貝技術,ByteBuf
中主要有slice()、composite()
這兩個方法,用於拆分、合併緩衝區,先來聊聊拆分緩衝區的方法,案例如下:
```java
// 測試Netty-ByteBuf的slice零拷貝方法
private static void sliceZeroCopy(){
// 分配一個初始容量為10的緩衝區
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(10);
// 寫入0~9十個位元組資料
byte[] numData = {'0','1','2','3','4','5','6','7','8','9'};
buffer.writeBytes(numData);
printBuffer(buffer);
// 從下標0開始,向後擷取五個位元組,拆分成一個新ByteBuf物件
ByteBuf b1 = buffer.slice(0, 5);
printBuffer(b1);
// 從下標5開始,向後擷取五個位元組,拆分成一個新ByteBuf物件
ByteBuf b2 = buffer.slice(5, 5);
printBuffer(b2);
// 證明切割出的兩個ByteBuf物件,是共享第一個ByteBuf物件資料的
// 這裡修改擷取後的b1物件,然後檢視最初的buffer物件
b1.setByte(0,'a');
printBuffer(buffer);
}
public static void main(String[] args) {
sliceZeroCopy();
}
在上述方法中,首先建立了一個`buffer`物件,往其中寫入了`0~9`這十個字元,接著將其拆分成了`b1、b2`這兩個`ByteBuf`物件,`b1、b2`都具備獨立的讀寫指標,但卻並未真正的從`buffer`中拷貝新的資料出來,而是基於`buffer`這個物件,進行了資料擷取,執行結果如下:
log
ByteBuf緩衝區資訊:{讀取指標=0, 寫入指標=10, 容量大小=10}
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 30 31 32 33 34 35 36 37 38 39 |0123456789 |
+--------+-------------------------------------------------+----------------+
ByteBuf緩衝區資訊:{讀取指標=0, 寫入指標=5, 容量大小=5} +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 30 31 32 33 34 |01234 | +--------+-------------------------------------------------+----------------+ ByteBuf緩衝區資訊:{讀取指標=0, 寫入指標=5, 容量大小=5} +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 35 36 37 38 39 |56789 | +--------+-------------------------------------------------+----------------+
ByteBuf緩衝區資訊:{讀取指標=0, 寫入指標=10, 容量大小=10}
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 61 31 32 33 34 35 36 37 38 39 |a123456789 |
+--------+-------------------------------------------------+----------------+
``
觀察上述第二、三個
ByteBuf緩衝區資訊,與前面說的毫無差異,明顯都具備獨立的讀寫指標,但我為什麼說:
b1、b2沒有拷貝資料呢?接著看方法中的最後一步,我對
b1的第一個元素做了修改,然後輸出了
buffer物件,看上述結果中的第四個
ByteBuf緩衝區資訊,其實會發現:**
buffer物件中下標為
0的資料,也被改成了
a`**!由此即可證明前面的觀點。
不過這種零拷貝方式,雖然減少了資料複製次數,但也會有一定的侷限性:
①使用slice()
方法拆分出的ByteBuf
物件,不支援擴容,也就是切割的長度為5
,最大長度也只能是5
,超出長度時會丟擲下標越界異常。
②由於拆分出的ByteBuf
物件,其資料依賴於原ByteBuf
物件,因此當原始ByteBuf
物件被釋放時,拆分出的緩衝區也會不可用,所以在使用slice()
方法時,要手動呼叫retain()/release()
來增加引用計數(這個後面細聊)。
除開上述的slice()
方法外,還有其他一個叫做duplicate()
的零拷貝方法,它的作用是完全克隆原有ByteBuf
物件,但讀寫指標都是獨立的,並且支援自動擴容,大家感興趣可以自行實驗。
接著聊一聊合併ByteBuf
緩衝區的零拷貝方法,該方法的使用方式與前面的方法並不同,如下:
```java
// 測試Netty-ByteBuf的composite零拷貝方法
private static void compositeZeroCopy(){
// 建立兩個小的ByteBuf緩衝區,並往兩個緩衝區中插入資料
ByteBuf b1 = ByteBufAllocator.DEFAULT.buffer(5);
ByteBuf b2 = ByteBufAllocator.DEFAULT.buffer(5);
byte[] data1 = {'a','b','c','d','e'};
byte[] data2 = {'n','m','x','y','z'};
b1.writeBytes(data1);
b2.writeBytes(data2);
// 建立一個合併緩衝區的CompositeByteBuf物件
CompositeByteBuf buffer = ByteBufAllocator.DEFAULT.compositeBuffer();
// 將前面兩個小的緩衝區,合併成一個大的緩衝區
buffer.addComponents(true,b1,b2);
printBuffer(buffer);
}
public static void main(String[] args) { compositeZeroCopy(); }
/ * 執行結果:
ByteBuf緩衝區資訊:{讀取指標=0, 寫入指標=10, 容量大小=10}
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 61 62 63 64 65 6e 6d 78 79 7a |abcdenmxyz |
+--------+-------------------------------------------------+----------------+
* /
``
案例中,想要將多個緩衝區合併成一個大的緩衝區,需要先建立一個
CompositeByteBuf物件,接著呼叫它的
addComponent()/addComponents()方法,將小的緩衝區新增進去即可。但在合併多個緩衝區時,
addComponents()方法中的第一個引數必須為
true`,否則不會自動增長讀寫指標。
其實說到底,
Netty-ByteBuf
緩衝區的零拷貝方法,實際上也可以被稱之為“一種特殊的淺拷貝”,與之對應的是“深拷貝”,而ByteBuf
中的“深拷貝”,則是一系列以Copy
開頭的方法,通過這類方法複製緩衝區,會完全分配新的記憶體地址、讀寫指標。
最後,在Netty
內部還提供了一個名為Unpooled
的工具類,這主要是針對於非池化緩衝區的工具類,內部也提供了一系列wrappend
開頭的方法,可以用來組合、包裝多個ByteBuf
物件或位元組陣列,呼叫對應方法時,內部也不會發生拷貝動作,這也是一類零拷貝的方法。
七、Netty入門篇小結
經過上述一系列的叨叨絮絮後,對於Netty
框架的基本概念,以及Netty
框架中大多數核心元件做了介紹,但對於一些粘包、半包、解碼器、長連線、心跳機制等內容未闡述,原本打算將這些內容一篇寫完,但本章的字數實在太多,嚴重超出單章限制:
因此對於後續一些進階的知識,會再開設一篇講述,預計Netty
的文章會有4~5
篇左右,大體順序為《Netty
入門篇》、《Netty
進階篇》、《Netty
實戰篇》、《Netty
應用篇》、《Netty
原始碼篇》,但具體的篇幅會在後續適當調整。
本篇的內容就到這裡啦,如若對你有幫助,請記得點個小贊支援一下~
- 全解MySQL終章:這份爆肝30W字的資料庫寶典贈與有緣的你!
- 追憶四年前:一段關於我被外企CTO用登入註冊吊打的不堪往事
- (十一)Netty實戰篇:基於Netty框架打造一款高效能的IM即時通訊程式
- (四)MySQL之索引初識篇:索引機制、索引分類、索引使用與管理綜述
- (九)MySQL之MVCC機制:為什麼你改了的資料我還看不見?
- (十)全解MySQL之死鎖問題分析、事務隔離與鎖機制的底層原理剖析
- (二十八)MySQL面試通關祕籍:這次你也可以在簡歷寫上精通MySQL!
- (一)全解MySQL之架構篇:自頂向下深入剖析MySQL整體架構!
- (九)Java網路程式設計無冕之王-這回把大名鼎鼎的Netty框架一網打盡!
- (八)MySQL鎖機制:高併發場景下該如何保證資料讀寫的安全性?
- (十五)MySQL命令大全:以後再也不用擔心忘記SQL該怎麼寫啦~
- (七)MySQL事務篇:ACID原則、事務隔離級別及事務機制原理剖析
- 深入理解SpringMVC工作原理,像大牛一樣手寫SpringMVC框架
- (三)MySQL之庫表設計篇:一、二、三、四、五正規化、BC正規化與反正規化詳解!
- (五)MySQL索引應用篇:建立索引的正確姿勢與使用索引的最佳指南!
- (六)MySQL索引原理篇:深入資料庫底層揭開索引機制的神祕面紗!
- (五)網路程式設計之流量接入層設計:基於效能怪獸從零構建日均億級吞吐量的閘道器架構!
- (四)網路程式設計之請求分發篇:負載均衡靜態排程演算法、平滑輪詢加權、一致性雜湊、最小活躍數演算法實踐!
- (三)Nginx一網打盡:動靜分離、壓縮、快取、黑白名單、跨域、高可用、效能優化...想要的這都有!
- Redis綜述篇:與面試官徹夜長談Redis快取、持久化、淘汰機制、哨兵、叢集底層原理!