Netty核心概念之ChannelHandler&Pipeline&ChannelHandlerContext

语言: CN / TW / HK

概述

之前学习的Reactor模型只是懂得了服务器端如何把一个连接放到workergroup中处理,那么真正处理某一个具体的请求的过程是什么样的呢

基本概念

  1. 我们知道数据的读取都是通过channel来完成的,Netty在channel的IO事件中定义了许多的生命周期函数,Netty将这些生命周期函数封装在Handler中,通过责任链的模式封装在了一个 pipeline中。当 channel中触发了对应的IO事件,就会调用pipeline中的头或者尾的一个handler,至于是否传递到下一级,由对应的handler判断.
  2. Netty把IO事件分成两类,一个是Netty从网络中读取数据,成为InBound。一个是Netty把数据写到网络中,成为OutBound。Handler也被分成ChannelInBoundHandler和ChannelOutBoundHandler,他们各自有着不同的生命周期函数
  3. pipeline是要给双向链表,有着头尾两个指针,inbound事件是从head节点往后传播,outbound事件是从tail节点往前传播
  4. 为了能让handler更加的灵活,Netty给handler外面包了一层的 HandlerContext. context允许handler在生命周期方法中直接改变数据的流向,比如读完数据就可以让 context在把数据写出去,那么数据的流向就从 inbound变成了outbound

InboundHandler

public interface ChannelInboundHandler extends ChannelHandler {

    # channel 注册到eventLoop
    void channelRegistered(ChannelHandlerContext ctx) throws Exception;

    # channel从eventloop中取消注册
    void channelUnregistered(ChannelHandlerContext ctx) throws Exception;

    # channel已经建立了连接
    void channelActive(ChannelHandlerContext ctx) throws Exception;

    # channel结束了连接
    void channelInactive(ChannelHandlerContext ctx) throws Exception;

    # channel从网络中读到了数据
    void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;

    # channel读取完数据
    void channelReadComplete(ChannelHandlerContext ctx) throws Exception;
    
    # 读取数据中发生了异常调用该方法
    void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
}
复制代码

我们来看看-生命周期的调用源码。

  1. channelRegistered: 在AbstractChannel方法中register方法中
private void register0(ChannelPromise promise) {
    try {
        # 这里就是让 channel注册到eventloop并且注册到selector中的方法
        doRegister();
        # 注册结束之后就会调用 pipeline的fireChannelRegistered方法
        pipeline.fireChannelRegistered();
      
        if (isActive()) {
            if (firstRegistration) {
                # 如果是激活状态还会调用 fireChannelActive 方法
                pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {

            }
        }
    } catch (Throwable t) {
      
    }
}
复制代码

那么我们来看看pipeline的源码

@Override
public final ChannelPipeline fireChannelRegistered() {
    # 传了一个pipeline 的头结点进去,说明是从头开始传播的
    AbstractChannelHandlerContext.invokeChannelRegistered(head);
    return this;
}
复制代码

pipeline调用了 context的静态方法,下面的都是context内部的调用关系

# AbstractChannelHandlerContext 的 invokeChannelRegistered 方法
static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        # 又调用头结点的 invokeChannelRegistered 方法
        next.invokeChannelRegistered();
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRegistered();
            }
        });
    }
}

private void invokeChannelRegistered() {
    if (invokeHandler()) {
        try {
            # 拿到head节点的 handler调用handler的钩子函数
            ((ChannelInboundHandler) handler()).channelRegistered(this);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else {
        fireChannelRegistered();
    }
}
复制代码

最后调用到了就是我们自定义的handler的钩子函数,特别注意如果我们在这个函数里面没有调用super.channelRegistered(ctx);那么转播就会终止

@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
    System.out.println("channelRegistered----Inbound1");
    super.channelRegistered(ctx);
}
复制代码

如果我们继续转播的话, context就会找到下一个inbound的context然后再执行方法。

@Override
public ChannelHandlerContext fireChannelRegistered() {
    invokeChannelRegistered(findContextInbound());
    return this;
}

private AbstractChannelHandlerContext findContextInbound() {
    AbstractChannelHandlerContext ctx = this;
    do {
        ctx = ctx.next;
    } while (!ctx.inbound);
    return ctx;
}
复制代码

outboundHandler

public interface ChannelOutboundHandler extends ChannelHandler {
    # 当绑定端口成功后触发
    void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception;
    # 客户端连接到服务器触发
    void connect(
            ChannelHandlerContext ctx, SocketAddress remoteAddress,
            SocketAddress localAddress, ChannelPromise promise) throws Exception;

    # 断开连接触发
    void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;

    void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
   
    void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
    
    void read(ChannelHandlerContext ctx) throws Exception;

    void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception;

    void flush(ChannelHandlerContext ctx) throws Exception;
}
复制代码

outbound和inbound就是触发的时机不同,基本上用不太到,以后有用到再来补。

ChannelInitializer

在 BootstrapServer中,我们往往会创建一个 ChannelInitializer的 ChildHandler。然后在 initChannel方法中对新来的 channel的pipeline中添加handler。,我们来看看这个流程

  1. 在serverBootStrap中存放了ChannelInitializer
public static void main(String[] args) throws Exception {

    new ServerBootstrap()
            .group(new NioEventLoopGroup(), new NioEventLoopGroup())
            .channel(NioServerSocketChannel.class)
            # 保存childHandler
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new Inbound1());
                    ch.pipeline().addLast(new OutBound1());
                    ch.pipeline().addLast(new Inbound2());
                    ch.pipeline().addLast(new OutBound2());
                    ch.pipeline().addLast(new Inbound3());
                    ch.pipeline().addLast(new OutBound3());
                }
            }).bind(9090).sync().channel().closeFuture().sync();
}
复制代码
  1. 当客户端连接服务端的时候会触发 ServerBootStrap的 channelRead方法
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    # 这就是客户端channel
    final Channel child = (Channel) msg;
    # 给客户端Channel添加了刚才写的ChannelInitializer
    child.pipeline().addLast(childHandler);
}
复制代码
  1. 然后就是reactor的一系列东西,这个channel被bossgroup添加了一个handler后丢到了workergroup,然后准备注册到workergroup的channel的时候
private void register0(ChannelPromise promise) {
    try {
        # 这里就是让 channel注册到eventloop并且注册到selector中的方法
        doRegister();
        # 调用pipeline的添加handler的方法
        pipeline.invokeHandlerAddedIfNeeded();

        # 注册结束之后就会调用 pipeline的fireChannelRegistered方法
        pipeline.fireChannelRegistered();
      
        if (isActive()) {
            if (firstRegistration) {
                # 如果是激活状态还会调用 fireChannelActive 方法
                pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {

            }
        }
    } catch (Throwable t) {
      
    }
}
复制代码
  1. 然后最后掉到pipeline的 callHandlerAdded0方法,里面拿到handler执行handlerAdded方法

5. pipeline中的方法

总结

这几个流程就大体的梳理完了,总结一下

  1. 在创建ServerBootStrap的时候传了一个initHandler
  2. 在server端收到客户端连接的时候就把这个initHandler添加到这个客户端channel的pipeline中
  3. 然后客户端准备注册到eventLoop的时候会去调用一下pipeline中的inithandler,然后这时候客户端的pipeline就添加到了用户自定义的handler。
  4. 然后把这个初始化的handler给remove掉

最后

如果你觉得此文对你有一丁点帮助,点个赞。或者可以加入我的开发交流群:1025263163相互学习,我们会有专业的技术答疑解惑

如果你觉得这篇文章对你有点用的话,麻烦请给我们的开源项目点点star:http://github.crmeb.net/u/defu不胜感激 !

PHP学习手册:http://doc.crmeb.com
技术交流论坛:http://q.crmeb.com