Netty核心概念之ChannelHandler&Pipeline&ChannelHandlerContext
概述
之前學習的Reactor模型只是懂得了伺服器端如何把一個連線放到workergroup中處理,那麼真正處理某一個具體的請求的過程是什麼樣的呢
基本概念
- 我們知道資料的讀取都是通過channel來完成的,Netty在channel的IO事件中定義了許多的
生命週期函式
,Netty將這些生命週期函式封裝在Handler
中,通過責任鏈的模式封裝在了一個pipeline
中。當 channel中觸發了對應的IO事件,就會呼叫pipeline中的頭或者尾
的一個handler,至於是否傳遞到下一級,由對應的handler判斷. - Netty把IO事件分成兩類,一個是Netty從網路中讀取資料,成為InBound。一個是Netty把資料寫到網路中,成為OutBound。Handler也被分成
ChannelInBoundHandler和ChannelOutBoundHandler
,他們各自有著不同的生命週期函式 - pipeline是要給雙向連結串列,有著頭尾兩個指標,
inbound事件是從head節點往後傳播,outbound事件是從tail節點往前傳播
- 為了能讓handler更加的靈活,Netty給handler外面包了一層的
HandlerContext
. context允許handler在生命週期方法中直接改變資料的流向
,比如讀完資料就可以讓 context在把資料寫出去,那麼資料的流向就從 inbound變成了outbound
InboundHandler
public interface ChannelInboundHandler extends ChannelHandler {
# channel 註冊到eventLoop
void channelRegistered(ChannelHandlerContext ctx) throws Exception;
# channel從eventloop中取消註冊
void channelUnregistered(ChannelHandlerContext ctx) throws Exception;
# channel已經建立了連線
void channelActive(ChannelHandlerContext ctx) throws Exception;
# channel結束了連線
void channelInactive(ChannelHandlerContext ctx) throws Exception;
# channel從網路中讀到了資料
void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;
# channel讀取完資料
void channelReadComplete(ChannelHandlerContext ctx) throws Exception;
# 讀取資料中發生了異常呼叫該方法
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
}
複製程式碼
我們來看看-生命週期的呼叫原始碼。
- channelRegistered: 在AbstractChannel方法中register方法中
private void register0(ChannelPromise promise) {
try {
# 這裡就是讓 channel註冊到eventloop並且註冊到selector中的方法
doRegister();
# 註冊結束之後就會呼叫 pipeline的fireChannelRegistered方法
pipeline.fireChannelRegistered();
if (isActive()) {
if (firstRegistration) {
# 如果是啟用狀態還會呼叫 fireChannelActive 方法
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
}
}
} catch (Throwable t) {
}
}
複製程式碼
那麼我們來看看pipeline的原始碼
@Override
public final ChannelPipeline fireChannelRegistered() {
# 傳了一個pipeline 的頭結點進去,說明是從頭開始傳播的
AbstractChannelHandlerContext.invokeChannelRegistered(head);
return this;
}
複製程式碼
pipeline呼叫了 context的靜態方法,下面的都是context內部的呼叫關係
# AbstractChannelHandlerContext 的 invokeChannelRegistered 方法
static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
# 又呼叫頭結點的 invokeChannelRegistered 方法
next.invokeChannelRegistered();
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRegistered();
}
});
}
}
private void invokeChannelRegistered() {
if (invokeHandler()) {
try {
# 拿到head節點的 handler呼叫handler的鉤子函式
((ChannelInboundHandler) handler()).channelRegistered(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRegistered();
}
}
複製程式碼
最後呼叫到了就是我們自定義的handler的鉤子函式,特別注意如果我們在這個函式裡面沒有呼叫super.channelRegistered(ctx);那麼轉播就會終止
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelRegistered----Inbound1");
super.channelRegistered(ctx);
}
複製程式碼
如果我們繼續轉播的話, context就會找到下一個inbound的context然後再執行方法。
@Override
public ChannelHandlerContext fireChannelRegistered() {
invokeChannelRegistered(findContextInbound());
return this;
}
private AbstractChannelHandlerContext findContextInbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.next;
} while (!ctx.inbound);
return ctx;
}
複製程式碼
outboundHandler
public interface ChannelOutboundHandler extends ChannelHandler {
# 當繫結埠成功後觸發
void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception;
# 客戶端連線到伺服器觸發
void connect(
ChannelHandlerContext ctx, SocketAddress remoteAddress,
SocketAddress localAddress, ChannelPromise promise) throws Exception;
# 斷開連線觸發
void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
void read(ChannelHandlerContext ctx) throws Exception;
void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception;
void flush(ChannelHandlerContext ctx) throws Exception;
}
複製程式碼
outbound和inbound就是觸發的時機不同,基本上用不太到,以後有用到再來補。
ChannelInitializer
在 BootstrapServer中,我們往往會建立一個 ChannelInitializer的 ChildHandler。然後在 initChannel方法中對新來的 channel的pipeline中新增handler。,我們來看看這個流程
- 在serverBootStrap中存放了ChannelInitializer
public static void main(String[] args) throws Exception {
new ServerBootstrap()
.group(new NioEventLoopGroup(), new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
# 儲存childHandler
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new Inbound1());
ch.pipeline().addLast(new OutBound1());
ch.pipeline().addLast(new Inbound2());
ch.pipeline().addLast(new OutBound2());
ch.pipeline().addLast(new Inbound3());
ch.pipeline().addLast(new OutBound3());
}
}).bind(9090).sync().channel().closeFuture().sync();
}
複製程式碼
- 當客戶端連線服務端的時候會觸發 ServerBootStrap的 channelRead方法
public void channelRead(ChannelHandlerContext ctx, Object msg) {
# 這就是客戶端channel
final Channel child = (Channel) msg;
# 給客戶端Channel添加了剛才寫的ChannelInitializer
child.pipeline().addLast(childHandler);
}
複製程式碼
- 然後就是reactor的一系列東西,這個channel被bossgroup添加了一個handler後丟到了workergroup,然後準備註冊到workergroup的channel的時候
private void register0(ChannelPromise promise) {
try {
# 這裡就是讓 channel註冊到eventloop並且註冊到selector中的方法
doRegister();
# 呼叫pipeline的新增handler的方法
pipeline.invokeHandlerAddedIfNeeded();
# 註冊結束之後就會呼叫 pipeline的fireChannelRegistered方法
pipeline.fireChannelRegistered();
if (isActive()) {
if (firstRegistration) {
# 如果是啟用狀態還會呼叫 fireChannelActive 方法
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
}
}
} catch (Throwable t) {
}
}
複製程式碼
- 然後最後掉到pipeline的
callHandlerAdded0
方法,裡面拿到handler執行handlerAdded方法
5. pipeline中的方法
總結
這幾個流程就大體的梳理完了,總結一下
- 在建立ServerBootStrap的時候傳了一個initHandler
- 在server端收到客戶端連線的時候就把這個initHandler新增到這個客戶端channel的pipeline中
- 然後客戶端準備註冊到eventLoop的時候會去呼叫一下pipeline中的inithandler,然後這時候客戶端的pipeline就新增到了使用者自定義的handler。
- 然後把這個初始化的handler給remove掉
最後
如果你覺得此文對你有一丁點幫助,點個贊。或者可以加入我的開發交流群:1025263163相互學習,我們會有專業的技術答疑解惑
如果你覺得這篇文章對你有點用的話,麻煩請給我們的開源專案點點star:http://github.crmeb.net/u/defu不勝感激 !
PHP學習手冊:https://doc.crmeb.com
技術交流論壇:https://q.crmeb.com
「其他文章」
- 遵循Promises/A 規範,深入分析Promise實現細節 | 通過872測試樣例
- 80 行程式碼實現簡易 RxJS
- 前後端分離專案,如何解決跨域問題?
- springboot中攔截並替換token來簡化身份驗證
- 15 行程式碼在 wangEditor v5 使用數學公式
- Java執行緒池必知必會
- EdgeDB 架構簡析
- TS 型別體操:圖解一個複雜高階型別
- 基於babel的埋點工具簡單實現及思考
- 使用craco對cra專案進行構建優化
- Netty核心概念之ChannelHandler&Pipeline&ChannelHandlerContext
- 理解python非同步程式設計與簡單實現asyncio
- Mycat 作為代理服務端的小知識點
- 一文吃透 React Expiration Time
- 前端模組化詳解
- Java必備主流技術流程圖
- 【建議使用】告別if,Java超好用引數校驗工具類
- MySQL模糊查詢再也不用like %了
- Java 8 的Stream流那麼強大,你知道它的原理嗎
- Vue SEO的四種方案