Netty核心概念之ChannelHandler&Pipeline&ChannelHandlerContext
概述
之前学习的Reactor模型只是懂得了服务器端如何把一个连接放到workergroup中处理,那么真正处理某一个具体的请求的过程是什么样的呢
基本概念
- 我们知道数据的读取都是通过channel来完成的,Netty在channel的IO事件中定义了许多的
生命周期函数
,Netty将这些生命周期函数封装在Handler
中,通过责任链的模式封装在了一个pipeline
中。当 channel中触发了对应的IO事件,就会调用pipeline中的头或者尾
的一个handler,至于是否传递到下一级,由对应的handler判断. - Netty把IO事件分成两类,一个是Netty从网络中读取数据,成为InBound。一个是Netty把数据写到网络中,成为OutBound。Handler也被分成
ChannelInBoundHandler和ChannelOutBoundHandler
,他们各自有着不同的生命周期函数 - pipeline是要给双向链表,有着头尾两个指针,
inbound事件是从head节点往后传播,outbound事件是从tail节点往前传播
- 为了能让handler更加的灵活,Netty给handler外面包了一层的
HandlerContext
. context允许handler在生命周期方法中直接改变数据的流向
,比如读完数据就可以让 context在把数据写出去,那么数据的流向就从 inbound变成了outbound
InboundHandler
public interface ChannelInboundHandler extends ChannelHandler {
# channel 注册到eventLoop
void channelRegistered(ChannelHandlerContext ctx) throws Exception;
# channel从eventloop中取消注册
void channelUnregistered(ChannelHandlerContext ctx) throws Exception;
# channel已经建立了连接
void channelActive(ChannelHandlerContext ctx) throws Exception;
# channel结束了连接
void channelInactive(ChannelHandlerContext ctx) throws Exception;
# channel从网络中读到了数据
void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;
# channel读取完数据
void channelReadComplete(ChannelHandlerContext ctx) throws Exception;
# 读取数据中发生了异常调用该方法
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
}
复制代码
我们来看看-生命周期的调用源码。
- channelRegistered: 在AbstractChannel方法中register方法中
private void register0(ChannelPromise promise) {
try {
# 这里就是让 channel注册到eventloop并且注册到selector中的方法
doRegister();
# 注册结束之后就会调用 pipeline的fireChannelRegistered方法
pipeline.fireChannelRegistered();
if (isActive()) {
if (firstRegistration) {
# 如果是激活状态还会调用 fireChannelActive 方法
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
}
}
} catch (Throwable t) {
}
}
复制代码
那么我们来看看pipeline的源码
@Override
public final ChannelPipeline fireChannelRegistered() {
# 传了一个pipeline 的头结点进去,说明是从头开始传播的
AbstractChannelHandlerContext.invokeChannelRegistered(head);
return this;
}
复制代码
pipeline调用了 context的静态方法,下面的都是context内部的调用关系
# AbstractChannelHandlerContext 的 invokeChannelRegistered 方法
static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
# 又调用头结点的 invokeChannelRegistered 方法
next.invokeChannelRegistered();
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRegistered();
}
});
}
}
private void invokeChannelRegistered() {
if (invokeHandler()) {
try {
# 拿到head节点的 handler调用handler的钩子函数
((ChannelInboundHandler) handler()).channelRegistered(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRegistered();
}
}
复制代码
最后调用到了就是我们自定义的handler的钩子函数,特别注意如果我们在这个函数里面没有调用super.channelRegistered(ctx);那么转播就会终止
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelRegistered----Inbound1");
super.channelRegistered(ctx);
}
复制代码
如果我们继续转播的话, context就会找到下一个inbound的context然后再执行方法。
@Override
public ChannelHandlerContext fireChannelRegistered() {
invokeChannelRegistered(findContextInbound());
return this;
}
private AbstractChannelHandlerContext findContextInbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.next;
} while (!ctx.inbound);
return ctx;
}
复制代码
outboundHandler
public interface ChannelOutboundHandler extends ChannelHandler {
# 当绑定端口成功后触发
void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception;
# 客户端连接到服务器触发
void connect(
ChannelHandlerContext ctx, SocketAddress remoteAddress,
SocketAddress localAddress, ChannelPromise promise) throws Exception;
# 断开连接触发
void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
void read(ChannelHandlerContext ctx) throws Exception;
void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception;
void flush(ChannelHandlerContext ctx) throws Exception;
}
复制代码
outbound和inbound就是触发的时机不同,基本上用不太到,以后有用到再来补。
ChannelInitializer
在 BootstrapServer中,我们往往会创建一个 ChannelInitializer的 ChildHandler。然后在 initChannel方法中对新来的 channel的pipeline中添加handler。,我们来看看这个流程
- 在serverBootStrap中存放了ChannelInitializer
public static void main(String[] args) throws Exception {
new ServerBootstrap()
.group(new NioEventLoopGroup(), new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
# 保存childHandler
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new Inbound1());
ch.pipeline().addLast(new OutBound1());
ch.pipeline().addLast(new Inbound2());
ch.pipeline().addLast(new OutBound2());
ch.pipeline().addLast(new Inbound3());
ch.pipeline().addLast(new OutBound3());
}
}).bind(9090).sync().channel().closeFuture().sync();
}
复制代码
- 当客户端连接服务端的时候会触发 ServerBootStrap的 channelRead方法
public void channelRead(ChannelHandlerContext ctx, Object msg) {
# 这就是客户端channel
final Channel child = (Channel) msg;
# 给客户端Channel添加了刚才写的ChannelInitializer
child.pipeline().addLast(childHandler);
}
复制代码
- 然后就是reactor的一系列东西,这个channel被bossgroup添加了一个handler后丢到了workergroup,然后准备注册到workergroup的channel的时候
private void register0(ChannelPromise promise) {
try {
# 这里就是让 channel注册到eventloop并且注册到selector中的方法
doRegister();
# 调用pipeline的添加handler的方法
pipeline.invokeHandlerAddedIfNeeded();
# 注册结束之后就会调用 pipeline的fireChannelRegistered方法
pipeline.fireChannelRegistered();
if (isActive()) {
if (firstRegistration) {
# 如果是激活状态还会调用 fireChannelActive 方法
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
}
}
} catch (Throwable t) {
}
}
复制代码
- 然后最后掉到pipeline的
callHandlerAdded0
方法,里面拿到handler执行handlerAdded方法
5. pipeline中的方法
总结
这几个流程就大体的梳理完了,总结一下
- 在创建ServerBootStrap的时候传了一个initHandler
- 在server端收到客户端连接的时候就把这个initHandler添加到这个客户端channel的pipeline中
- 然后客户端准备注册到eventLoop的时候会去调用一下pipeline中的inithandler,然后这时候客户端的pipeline就添加到了用户自定义的handler。
- 然后把这个初始化的handler给remove掉
最后
如果你觉得此文对你有一丁点帮助,点个赞。或者可以加入我的开发交流群:1025263163相互学习,我们会有专业的技术答疑解惑
如果你觉得这篇文章对你有点用的话,麻烦请给我们的开源项目点点star:http://github.crmeb.net/u/defu不胜感激 !
PHP学习手册:http://doc.crmeb.com
技术交流论坛:http://q.crmeb.com
「其他文章」
- 遵循Promises/A 规范,深入分析Promise实现细节 | 通过872测试样例
- 80 行代码实现简易 RxJS
- 前后端分离项目,如何解决跨域问题?
- springboot中拦截并替换token来简化身份验证
- 15 行代码在 wangEditor v5 使用数学公式
- Java线程池必知必会
- EdgeDB 架构简析
- TS 类型体操:图解一个复杂高级类型
- 基于babel的埋点工具简单实现及思考
- 使用craco对cra项目进行构建优化
- Netty核心概念之ChannelHandler&Pipeline&ChannelHandlerContext
- 理解python异步编程与简单实现asyncio
- Mycat 作为代理服务端的小知识点
- 一文吃透 React Expiration Time
- 前端模块化详解
- Java必备主流技术流程图
- 【建议使用】告别if,Java超好用参数校验工具类
- MySQL模糊查询再也不用like %了
- Java 8 的Stream流那么强大,你知道它的原理吗
- Vue SEO的四种方案