拆開Netty,我發現了這個8個從來沒見過的東西?

語言: CN / TW / HK

Netty 概述

1、什麼是 Netty

Netty is an asynchronous event-driven network application framework
for rapid development of maintainable high performance protocol servers & clients.

Netty 是一個非同步的、基於事件驅動的網路應用框架,用於快速開發可維護、高效能的網路伺服器和客戶端

注意:netty的非同步還是基於多路複用的,並沒有實現真正意義上的非同步IO

2、Netty 的優勢

如果使用傳統 NIO,其工作量大,bug 多

  • 需要自己構建協議
  • 解決 TCP 傳輸問題,如粘包、半包
  • 因為 bug 的存在,epoll 空輪詢導致 CPU 100%

Netty 對 API 進行增強,使之更易用,如

  • FastThreadLocal => ThreadLocal
  • ByteBuf => ByteBuffer

3、入門案例

1、伺服器端程式碼

public class HelloServer {
    public static void main(String[] args) {
        // 1、啟動器,負責裝配netty元件,啟動伺服器
        new ServerBootstrap()
                // 2、建立 NioEventLoopGroup,可以簡單理解為 執行緒池 + Selector
                .group(new NioEventLoopGroup())
                // 3、選擇伺服器的 ServerSocketChannel 實現
                .channel(NioServerSocketChannel.class)
                // 4、child 負責處理讀寫,該方法決定了 child 執行哪些操作
            	// ChannelInitializer 處理器(僅執行一次)
            	// 它的作用是待客戶端 SocketChannel 建立連線後,執行 initChannel 以便新增更多的處理器
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                        // 5、SocketChannel的處理器,使用StringDecoder解碼,ByteBuf=>String
                        nioSocketChannel.pipeline().addLast(new StringDecoder());
                        // 6、SocketChannel的業務處理,使用上一個處理器的處理結果
                        nioSocketChannel.pipeline().addLast(new SimpleChannelInboundHandler<String>() {
                            @Override
                            protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
                                System.out.println(s);
                            }
                        });
                    }
                    // 7、ServerSocketChannel繫結8080埠
                }).bind(8080);
    }
}

2、客戶端程式碼

public class HelloClient {
    public static void main(String[] args) throws InterruptedException {
        new Bootstrap()
                .group(new NioEventLoopGroup())
                // 選擇客戶 Socket 實現類,NioSocketChannel 表示基於 NIO 的客戶端實現
                .channel(NioSocketChannel.class)
                // ChannelInitializer 處理器(僅執行一次)
                // 它的作用是待客戶端SocketChannel建立連線後,執行initChannel以便新增更多的處理器
                .handler(new ChannelInitializer<Channel>() {
                    @Override
                    protected void initChannel(Channel channel) throws Exception {
                        // 訊息會經過通道 handler 處理,這裡是將 String => ByteBuf 編碼發出
                        channel.pipeline().addLast(new StringEncoder());
                    }
                })
                // 指定要連線的伺服器和埠
                .connect(new InetSocketAddress("localhost", 8080))
                // Netty 中很多方法都是非同步的,如 connect
                // 這時需要使用 sync 方法等待 connect 建立連線完畢
                .sync()
                // 獲取 channel 物件,它即為通道抽象,可以進行資料讀寫操作
                .channel()
                // 寫入訊息並清空緩衝區
                .writeAndFlush("hello world");
    }
}

3、執行流程

左:客戶端 右:伺服器端

file

元件解釋

  • channel 可以理解為資料的通道
  • msg 理解為流動的資料,最開始輸入是 ByteBuf,但經過 pipeline 中的各個 handler 加工,會變成其它型別物件,最後輸出又變成 ByteBuf
  • handler 可以理解為資料的處理工序
    • 工序有多道,合在一起就是 pipeline(傳遞途徑),pipeline 負責釋出事件(讀、讀取完成…)傳播給每個 handler, handler 對自己感興趣的事件進行處理(重寫了相應事件處理方法)

      • pipeline 中有多個 handler,處理時會依次呼叫其中的 handler
    • handler 分 Inbound 和 Outbound 兩類

      • Inbound 入站
    • Outbound 出站

  • eventLoop 可以理解為處理資料的工人
    • eventLoop 可以管理多個 channel 的 io 操作,並且一旦 eventLoop 負責了某個 channel,就會將其與 channel 進行繫結,以後該 channel 中的 io 操作都由該 eventLoop 負責
    • eventLoop 既可以執行 io 操作,也可以進行任務處理,每個 eventLoop 有自己的任務佇列,佇列裡可以堆放多個 channel 的待處理任務,任務分為普通任務、定時任務
    • eventLoop 按照 pipeline 順序,依次按照 handler 的規劃(程式碼)處理資料,可以為每個 handler 指定不同的 eventLoop

1、EventLoop

事件迴圈物件 EventLoop

EventLoop 本質是一個單執行緒執行器(同時維護了一個 Selector),裡面有 run 方法處理一個或多個 Channel 上源源不斷的 io 事件

它的繼承關係如下

  • 繼承自 j.u.c.ScheduledExecutorService 因此包含了執行緒池中所有的方法

  • 繼承自 netty 自己的 OrderedEventExecutor

    • 提供了 boolean inEventLoop (Thread thread) 方法判斷一個執行緒是否屬於此 EventLoop

    • 提供了 EventLoopGroup parent () 方法來看看自己屬於哪個 EventLoopGroup

事件迴圈組 EventLoopGroup

EventLoopGroup 是一組 EventLoop,Channel 一般會呼叫 EventLoopGroup 的 register 方法來繫結其中一個 EventLoop,後續這個 Channel 上的 io 事件都由此 EventLoop 來處理(保證了 io 事件處理時的執行緒安全)

  • 繼承自 netty 自己的 EventExecutorGroup
    • 實現了 Iterable 介面提供遍歷 EventLoop 的能力
    • 另有 next 方法獲取集合中下一個 EventLoop

1.1 處理普通與定時任務

public class TestEventLoop {
    public static void main(String[] args) {
        // 建立擁有兩個EventLoop的NioEventLoopGroup,對應兩個執行緒
        EventLoopGroup group = new NioEventLoopGroup(2);
        // 通過next方法可以獲得下一個 EventLoop
        System.out.println(group.next());
        System.out.println(group.next());

        // 通過EventLoop執行普通任務
        group.next().execute(()->{
            System.out.println(Thread.currentThread().getName() + " hello");
        });

        // 通過EventLoop執行定時任務
        group.next().scheduleAtFixedRate(()->{
            System.out.println(Thread.currentThread().getName() + " hello2");
        }, 0, 1, TimeUnit.SECONDS);
        
        // 優雅地關閉
        group.shutdownGracefully();
    }
}

輸出結果如下

io.netty.channel.nio.NioEventLoop@7bb11784
io.netty.channel.nio.NioEventLoop@33a10788
nioEventLoopGroup-2-1 hello
nioEventLoopGroup-2-2 hello2
nioEventLoopGroup-2-2 hello2
nioEventLoopGroup-2-2 hello2

關閉 EventLoopGroup

優雅關閉 shutdownGracefully 方法。該方法會首先切換 EventLoopGroup 到關閉狀態從而拒絕新的任務的加入,然後在任務佇列的任務都處理完成後,停止執行緒的執行。從而確保整體應用是在正常有序的狀態下退出的

1.2 處理 IO 任務

伺服器程式碼

public class MyServer {
    public static void main(String[] args) {
        new ServerBootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                ByteBuf buf = (ByteBuf) msg;
                                System.out.println(Thread.currentThread().getName() + " " + buf.toString(StandardCharsets.UTF_8));

                            }
                        });
                    }
                })
                .bind(8080);
    }
}

客戶端程式碼

public class MyClient {
    public static void main(String[] args) throws IOException, InterruptedException {
        Channel channel = new Bootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(new StringEncoder());
                    }
                })
                .connect(new InetSocketAddress("localhost", 8080))
                .sync()
                .channel();
        System.out.println(channel);
        // 此處打斷點除錯,呼叫 channel.writeAndFlush(...);
        System.in.read();
    }
}

1.3 分工

Bootstrap 的 group () 方法可以傳入兩個 EventLoopGroup 引數,分別負責處理不同的事件

public class MyServer {
    public static void main(String[] args) {
        new ServerBootstrap()
            	// 兩個Group,分別為Boss 負責Accept事件,Worker 負責讀寫事件
                .group(new NioEventLoopGroup(1), new NioEventLoopGroup(2))
            
				...
    }
}

多個客戶端分別傳送 hello 結果

nioEventLoopGroup-3-1 hello1
nioEventLoopGroup-3-2 hello2
nioEventLoopGroup-3-1 hello3
nioEventLoopGroup-3-2 hello4
nioEventLoopGroup-3-2 hello4

可以看出,一個 EventLoop 可以負責多個 Channel,且 EventLoop 一旦與 Channel 繫結,則一直負責處理該 Channel 中的事件

file

增加自定義 EventLoopGroup

當有的任務需要較長的時間處理時,可以使用非 NioEventLoopGroup,避免同一個 NioEventLoop 中的其他 Channel 在較長的時間內都無法得到處理

   public class MyServer {
    public static void main(String[] args) {
        // 增加自定義的非NioEventLoopGroup
        EventLoopGroup group = new DefaultEventLoopGroup();
        
        new ServerBootstrap()
                .group(new NioEventLoopGroup(1), new NioEventLoopGroup(2))
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        // 增加兩個handler,第一個使用NioEventLoopGroup處理,第二個使用自定義EventLoopGroup處理
                        socketChannel.pipeline().addLast("nioHandler",new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                ByteBuf buf = (ByteBuf) msg;
                                System.out.println(Thread.currentThread().getName() + " " + buf.toString(StandardCharsets.UTF_8));
                                // 呼叫下一個handler
                                ctx.fireChannelRead(msg);
                            }
                        })
                        // 該handler繫結自定義的Group
                        .addLast(group, "myHandler", new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                ByteBuf buf = (ByteBuf) msg;
                                System.out.println(Thread.currentThread().getName() + " " + buf.toString(StandardCharsets.UTF_8));
                            }
                        });
                    }
                })
                .bind(8080);
    }
}


啟動四個客戶端傳送資料

nioEventLoopGroup-4-1 hello1
defaultEventLoopGroup-2-1 hello1
nioEventLoopGroup-4-2 hello2
defaultEventLoopGroup-2-2 hello2
nioEventLoopGroup-4-1 hello3
defaultEventLoopGroup-2-3 hello3
nioEventLoopGroup-4-2 hello4
defaultEventLoopGroup-2-4 hello4

可以看出,客戶端與伺服器之間的事件,被 nioEventLoopGroup 和 defaultEventLoopGroup 分別處理

file

切換的實現

不同的 EventLoopGroup 切換的實現原理如下

由上面的圖可以看出,當 handler 中繫結的 Group 不同時,需要切換 Group 來執行不同的任務

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    // 獲得下一個EventLoop, excutor 即為 EventLoopGroup
    EventExecutor executor = next.executor();
    
    // 如果下一個EventLoop 在當前的 EventLoopGroup中
    if (executor.inEventLoop()) {
        // 使用當前 EventLoopGroup 中的 EventLoop 來處理任務
        next.invokeChannelRead(m);
    } else {
        // 否則讓另一個 EventLoopGroup 中的 EventLoop 來建立任務並執行
        executor.execute(new Runnable() {
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }
}

  • 如果兩個 handler 繫結的是同一個 EventLoopGroup,那麼就直接呼叫
  • 否則,把要呼叫的程式碼封裝為一個任務物件,由下一個 handler 的 EventLoopGroup 來呼叫

2、Channel

Channel 的常用方法

  • close () 可以用來關閉 Channel
  • closeFuture () 用來處理 Channel 的關閉
    • sync 方法作用是同步等待 Channel 關閉
    • 而 addListener 方法是非同步等待 Channel 關閉
  • pipeline () 方法用於新增處理器
  • write () 方法將資料寫入
    • 因為緩衝機制,資料被寫入到 Channel 中以後,不會立即被髮送
    • 只有當緩衝滿了或者呼叫了 flush () 方法後,才會將資料通過 Channel 傳送出去
  • writeAndFlush () 方法將資料寫入並立即傳送(刷出)

2.1 ChannelFuture

連線問題

拆分客戶端程式碼

public class MyClient {
    public static void main(String[] args) throws IOException, InterruptedException {
        ChannelFuture channelFuture = new Bootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(new StringEncoder());
                    }
                })
                // 該方法為非同步非阻塞方法,主執行緒呼叫後不會被阻塞,真正去執行連線操作的是NIO執行緒
            	// NIO執行緒:NioEventLoop 中的執行緒
                .connect(new InetSocketAddress("localhost", 8080));
        
        // 該方法用於等待連線真正建立
        channelFuture.sync();
        
        // 獲取客戶端-伺服器之間的Channel物件
        Channel channel = channelFuture.channel();
        channel.writeAndFlush("hello world");
        System.in.read();
    }
}

如果我們去掉 channelFuture.sync() 方法,會伺服器無法收到 hello world

這是因為建立連線 (connect) 的過程是 非同步非阻塞 的,若不通過 sync() 方法阻塞主執行緒,等待連線真正建立,這時通過 channelFuture.channel () 拿到的 Channel 物件,並不是真正與伺服器建立好連線的 Channel,也就沒法將資訊正確的傳輸給伺服器端

所以需要通過 channelFuture.sync() 方法,阻塞主執行緒,同步處理結果,等待連線真正建立好以後,再去獲得 Channel 傳遞資料。使用該方法,獲取 Channel 和傳送資料的執行緒 都是主執行緒

下面還有一種方法,用於 非同步 獲取建立連線後的 Channel 和傳送資料,使得執行這些操作的執行緒是 NIO 執行緒(去執行 connect 操作的執行緒)

addListener 方法

通過這種方法可以在 NIO 執行緒中獲取 Channel 併發送資料,而不是在主執行緒中執行這些操作

public class MyClient {
    public static void main(String[] args) throws IOException, InterruptedException {
        ChannelFuture channelFuture = new Bootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(new StringEncoder());
                    }
                })
                // 該方法為非同步非阻塞方法,主執行緒呼叫後不會被阻塞,真正去執行連線操作的是NIO執行緒
                // NIO執行緒:NioEventLoop 中的執行緒
                .connect(new InetSocketAddress("localhost", 8080));
        
		// 當connect方法執行完畢後,也就是連線真正建立後
        // 會在NIO執行緒中呼叫operationComplete方法
        channelFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                Channel channel = channelFuture.channel();
                channel.writeAndFlush("hello world");
            }
        });
        System.in.read();
    }
}

處理關閉

public class ReadClient {
    public static void main(String[] args) throws InterruptedException {
        // 建立EventLoopGroup,使用完畢後關閉
        NioEventLoopGroup group = new NioEventLoopGroup();
        
        ChannelFuture channelFuture = new Bootstrap()
                .group(group)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(new StringEncoder());
                    }
                })
                .connect(new InetSocketAddress("localhost", 8080));
        channelFuture.sync();

        Channel channel = channelFuture.channel();
        Scanner scanner = new Scanner(System.in);

        // 建立一個執行緒用於輸入並向伺服器傳送
        new Thread(()->{
            while (true) {
                String msg = scanner.next();
                if ("q".equals(msg)) {
                    // 關閉操作是非同步的,在NIO執行緒中執行
                    channel.close();
                    break;
                }
                channel.writeAndFlush(msg);
            }
        }, "inputThread").start();

        // 獲得closeFuture物件
        ChannelFuture closeFuture = channel.closeFuture();
        System.out.println("waiting close...");
        
        // 同步等待NIO執行緒執行完close操作
        closeFuture.sync();
        
        // 關閉之後執行一些操作,可以保證執行的操作一定是在channel關閉以後執行的
        System.out.println("關閉之後執行一些額外操作...");
        
        // 關閉EventLoopGroup
        group.shutdownGracefully();
    }
}

關閉channel

當我們要關閉 channel 時,可以呼叫 channel.close () 方法進行關閉。但是該方法也是一個非同步方法。真正的關閉操作並不是在呼叫該方法的執行緒中執行的,而是在 NIO 執行緒中執行真正的關閉操作

如果我們想在 channel 真正關閉以後,執行一些額外的操作,可以選擇以下兩種方法來實現

  • 通過 channel.closeFuture () 方法獲得對應的 ChannelFuture 物件,然後呼叫 sync () 方法阻塞執行操作的執行緒,等待 channel 真正關閉後,再執行其他操作
// 獲得closeFuture物件
ChannelFuture closeFuture = channel.closeFuture();

// 同步等待NIO執行緒執行完close操作
closeFuture.sync();
  • 呼叫 closeFuture.addListener 方法,新增 close 的後續操作
closeFuture.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture channelFuture) throws Exception {
        // 等待channel關閉後才執行的操作
        System.out.println("關閉之後執行一些額外操作...");
        // 關閉EventLoopGroup
        group.shutdownGracefully();
    }
});

3、Future 與 Promise

3.1 概念

netty 中的 Future 與 jdk 中的 Future 同名,但是是兩個介面

netty 的 Future 繼承自 jdk 的 Future,而 Promise 又對 netty Future 進行了擴充套件

  • jdk Future 只能同步等待任務結束(或成功、或失敗)才能得到結果

  • netty Future 可以同步等待任務結束得到結果,也可以非同步方式得到結果,但都是要等任務結束

  • netty Promise 不僅有 netty Future 的功能,而且脫離了任務獨立存在,只作為兩個執行緒間傳遞結果的容器

file

3.2 JDK Future

public class JdkFuture {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ThreadFactory factory = new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "JdkFuture");
            }
        };
        // 建立執行緒池
        ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10,10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), factory);

        // 獲得Future物件
        Future<Integer> future = executor.submit(new Callable<Integer>() {

            @Override
            public Integer call() throws Exception {
                TimeUnit.SECONDS.sleep(1);
                return 50;
            }
        });

        // 通過阻塞的方式,獲得執行結果
        System.out.println(future.get());
    }
}

3.3 Netty Future

public class NettyFuture {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        NioEventLoopGroup group = new NioEventLoopGroup();

        // 獲得 EventLoop 物件
        EventLoop eventLoop = group.next();
        Future<Integer> future = eventLoop.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                return 50;
            }
        });

        // 主執行緒中獲取結果
        System.out.println(Thread.currentThread().getName() + " 獲取結果");
        System.out.println("getNow " + future.getNow());
        System.out.println("get " + future.get());

        // NIO執行緒中非同步獲取結果
        future.addListener(new GenericFutureListener<Future<? super Integer>>() {
            @Override
            public void operationComplete(Future<? super Integer> future) throws Exception {
                System.out.println(Thread.currentThread().getName() + " 獲取結果");
                System.out.println("getNow " + future.getNow());
            }
        });
    }
}

執行結果

main 獲取結果
getNow null
get 50
nioEventLoopGroup-2-1 獲取結果
getNow 50

Netty 中的 Future 物件,可以通過 EventLoop 的 sumbit () 方法得到

  • 可以通過 Future 物件的 get 方法,阻塞地獲取返回結果

  • 也可以通過 getNow 方法,獲取結果,若還沒有結果,則返回 null,該方法是非阻塞的

  • 還可以通過 future.addListener 方法,在 Callable 方法執行的執行緒中,非同步獲取返回結果

3.4 Netty Promise

Promise 相當於一個容器,可以用於存放各個執行緒中的結果,然後讓其他執行緒去獲取該結果

public class NettyPromise {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 建立EventLoop
        NioEventLoopGroup group = new NioEventLoopGroup();
        EventLoop eventLoop = group.next();

        // 建立Promise物件,用於存放結果
        DefaultPromise<Integer> promise = new DefaultPromise<>(eventLoop);

        new Thread(()->{
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // 自定義執行緒向Promise中存放結果
            promise.setSuccess(50);
        }).start();

        // 主執行緒從Promise中獲取結果
        System.out.println(Thread.currentThread().getName() + " " + promise.get());
    }
}

4、Handler 與 Pipeline

4.1 Pipeline

public class PipeLineServer {
    public static void main(String[] args) {
        new ServerBootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        // 在socketChannel的pipeline中新增handler
                        // pipeline中handler是帶有head與tail節點的雙向連結串列,的實際結構為
    				 	// head <-> handler1 <-> ... <-> handler4 <->tail
                        // Inbound主要處理入站操作,一般為讀操作,發生入站操作時會觸發Inbound方法
                        // 入站時,handler是從head向後呼叫的
                        socketChannel.pipeline().addLast("handler1" ,new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                System.out.println(Thread.currentThread().getName() + " Inbound handler 1");
                                // 父類該方法內部會呼叫fireChannelRead
                                // 將資料傳遞給下一個handler
                                super.channelRead(ctx, msg);
                            }
                        });
                        socketChannel.pipeline().addLast("handler2", new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                System.out.println(Thread.currentThread().getName() + " Inbound handler 2");
                                // 執行write操作,使得Outbound的方法能夠得到呼叫
          socketChannel.writeAndFlush(ctx.alloc().buffer().writeBytes("Server...".getBytes(StandardCharsets.UTF_8)));
                                super.channelRead(ctx, msg);
                            }
                        });
                        // Outbound主要處理出站操作,一般為寫操作,發生出站操作時會觸發Outbound方法
                        // 出站時,handler的呼叫是從tail向前呼叫的
                        socketChannel.pipeline().addLast("handler3" ,new ChannelOutboundHandlerAdapter(){
                            @Override
                            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                                System.out.println(Thread.currentThread().getName() + " Outbound handler 1");
                                super.write(ctx, msg, promise);
                            }
                        });
                        socketChannel.pipeline().addLast("handler4" ,new ChannelOutboundHandlerAdapter(){
                            @Override
                            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                                System.out.println(Thread.currentThread().getName() + " Outbound handler 2");
                                super.write(ctx, msg, promise);
                            }
                        });
                    }
                })
                .bind(8080);
    }
}

執行結果如下

nioEventLoopGroup-2-2 Inbound handler 1
nioEventLoopGroup-2-2 Inbound handler 2
nioEventLoopGroup-2-2 Outbound handler 2
nioEventLoopGroup-2-2 Outbound handler 1

通過 channel.pipeline ().addLast (name, handler) 新增 handler 時,記得給 handler 取名字。這樣可以呼叫 pipeline 的 addAfter、addBefore 等方法更靈活地向 pipeline 中新增 handler

handler 需要放入通道的 pipeline 中,才能根據放入順序來使用 handler

  • pipeline 是結構是一個帶有 head 與 tail 指標的雙向連結串列,其中的節點為 handler
    • 要通過 ctx.fireChannelRead (msg) 等方法,將當前 handler 的處理結果傳遞給下一個 handler
  • 當有 **入站(Inbound)**操作時,會從 head 開始向後 呼叫 handler,直到 handler 不是處理 Inbound 操作為止
  • 當有 **出站(Outbound)**操作時,會從 tail 開始向前 呼叫 handler,直到 handler 不是處理 Outbound 操作為止

具體結構如下

file

呼叫順序如下

file

4.2 OutboundHandler

socketChannel.writeAndFlush()

當 handler 中呼叫該方法進行寫操作時,會觸發 Outbound 操作,此時是從 tail 向前尋找 OutboundHandler

file

ctx.writeAndFlush()

當 handler 中呼叫該方法進行寫操作時,會觸發 Outbound 操作,此時是從當前 handler 向前尋找 OutboundHandler

file

4.3 EmbeddedChannel

EmbeddedChannel 可以用於測試各個 handler,通過其建構函式按順序傳入需要測試 handler,然後呼叫對應的 Inbound 和 Outbound 方法即可

public class TestEmbeddedChannel {
    public static void main(String[] args) {
        ChannelInboundHandlerAdapter h1 = new ChannelInboundHandlerAdapter() {
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                System.out.println("1");
                super.channelRead(ctx, msg);
            }
        };

        ChannelInboundHandlerAdapter h2 = new ChannelInboundHandlerAdapter() {
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                System.out.println("2");
                super.channelRead(ctx, msg);
            }
        };

        ChannelOutboundHandlerAdapter h3 = new ChannelOutboundHandlerAdapter() {
            @Override
            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                System.out.println("3");
                super.write(ctx, msg, promise);
            }
        };

        ChannelOutboundHandlerAdapter h4 = new ChannelOutboundHandlerAdapter() {
            @Override
            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                System.out.println("4");
                super.write(ctx, msg, promise);
            }
        };

        // 用於測試Handler的Channel
        EmbeddedChannel channel = new EmbeddedChannel(h1, h2, h3, h4);
        
        // 執行Inbound操作 
        channel.writeInbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes(StandardCharsets.UTF_8)));
        // 執行Outbound操作
        channel.writeOutbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes(StandardCharsets.UTF_8)));
    }
}

5、ByteBuf

除錯工具方法

private static void log(ByteBuf buffer) {
    int length = buffer.readableBytes();
    int rows = length / 16 + (length % 15 == 0 ? 0 : 1) + 4;
    StringBuilder buf = new StringBuilder(rows * 80 * 2)
        .append("read index:").append(buffer.readerIndex())
        .append(" write index:").append(buffer.writerIndex())
        .append(" capacity:").append(buffer.capacity())
        .append(NEWLINE);
    appendPrettyHexDump(buf, buffer);
    System.out.println(buf.toString());
}

該方法可以幫助我們更為詳細地檢視 ByteBuf 中的內容

5.1 建立

public class ByteBufStudy {
    public static void main(String[] args) {
        // 建立ByteBuf
        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16);
        ByteBufUtil.log(buffer);

        // 向buffer中寫入資料
        StringBuilder sb = new StringBuilder();
        for(int i = 0; i < 20; i++) {
            sb.append("a");
        }
        buffer.writeBytes(sb.toString().getBytes(StandardCharsets.UTF_8));

        // 檢視寫入結果
        ByteBufUtil.log(buffer);
    }
}

執行結果

read index:0 write index:0 capacity:16

read index:0 write index:20 capacity:64
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 |aaaaaaaaaaaaaaaa|
|00000010| 61 61 61 61                                     |aaaa            |
+--------+-------------------------------------------------+----------------+
  • ByteBuf 通過 ByteBufAllocator 選擇 allocator 並呼叫對應的 buffer () 方法來建立的 ,預設使用 直接記憶體 作為 ByteBuf,容量為 256 個位元組,可以指定初始容量的大小
  • 當 ByteBuf 的容量無法容納所有資料時,ByteBuf 會進行擴容操作
  • 如果在 handler 中建立 ByteBuf,建議使用 ChannelHandlerContext ctx.alloc ().buffer () 來建立

5.2 直接記憶體與堆記憶體

通過該方法建立的 ByteBuf,使用的是基於直接記憶體的 ByteBuf

ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16);

可以使用下面的程式碼來建立池化 基於堆 的 ByteBuf

ByteBuf buffer = ByteBufAllocator.DEFAULT.heapBuffer(16);

也可以使用下面的程式碼來建立池化基於直接記憶體的 ByteBuf

ByteBuf buffer = ByteBufAllocator.DEFAULT.directBuffer(16);
  • 直接記憶體建立和銷燬的代價昂貴,但讀寫效能高(少一次記憶體複製),適合配合池化功能一起用
  • 直接記憶體對 GC 壓力小,因為這部分記憶體不受 JVM 垃圾回收的管理,但也要注意及時主動釋放

驗證

public class ByteBufStudy {
    public static void main(String[] args) {
        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16);
        System.out.println(buffer.getClass());

        buffer = ByteBufAllocator.DEFAULT.heapBuffer(16);
        System.out.println(buffer.getClass());

        buffer = ByteBufAllocator.DEFAULT.directBuffer(16);
        System.out.println(buffer.getClass());
    }
}
// 使用池化的直接記憶體
class io.netty.buffer.PooledUnsafeDirectByteBuf
    
// 使用池化的堆記憶體    
class io.netty.buffer.PooledUnsafeHeapByteBuf
    
// 使用池化的直接記憶體    
class io.netty.buffer.PooledUnsafeDirectByteBuf

5.3 池化與非池化

池化的最大意義在於可以重用 ByteBuf,優點有

  • 沒有池化,則每次都得建立新的 ByteBuf 例項,這個操作對直接記憶體代價昂貴,就算是堆記憶體,也會增加 GC 壓力
  • 有了池化,則可以重用池中 ByteBuf 例項,並且採用了與 jemalloc 類似的記憶體分配演算法提升分配效率
  • 高併發時,池化功能更節約記憶體,減少記憶體溢位的可能

池化功能是否開啟,可以通過下面的系統環境變數來設定

-Dio.netty.allocator.type={unpooled|pooled}
  • 4.1 以後,非 Android 平臺預設啟用池化實現,Android 平臺啟用非池化實現
  • 4.1 之前,池化功能還不成熟,預設是非池化實現

5.4 組成

ByteBuf 主要有以下幾個組成部分

  • 最大容量與當前容量
    • 在構造 ByteBuf 時,可傳入兩個引數,分別代表初始容量和最大容量,若未傳入第二個引數(最大容量),最大容量預設為 Integer.MAX_VALUE
    • 當 ByteBuf 容量無法容納所有資料時,會進行擴容操作,若超出最大容量,會丟擲 java.lang.IndexOutOfBoundsException 異常
  • 讀寫操作不同於 ByteBuffer 只用 position 進行控制,ByteBuf 分別由讀指標和寫指標兩個指標控制。進行讀寫操作時,無需進行模式的切換
    • 讀指標前的部分被稱為廢棄部分,是已經讀過的內容
  • 讀指標與寫指標之間的空間稱為可讀部分
    • 寫指標與當前容量之間的空間稱為可寫部分

file

5.5 寫入

常用方法如下

file

注意

  • 這些方法的未指明返回值的,其返回值都是 ByteBuf,意味著可以鏈式呼叫來寫入不同的資料
  • 網路傳輸中,預設習慣是 Big Endian,使用 writeInt (int value)

使用方法

public class ByteBufStudy {
    public static void main(String[] args) {
        // 建立ByteBuf
        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16, 20);
        ByteBufUtil.log(buffer);

        // 向buffer中寫入資料
        buffer.writeBytes(new byte[]{1, 2, 3, 4});
        ByteBufUtil.log(buffer);

        buffer.writeInt(5);
        ByteBufUtil.log(buffer);

        buffer.writeIntLE(6);
        ByteBufUtil.log(buffer);

        buffer.writeLong(7);
        ByteBufUtil.log(buffer);
    }
}

執行結果

read index:0 write index:0 capacity:16

read index:0 write index:4 capacity:16
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04                                     |....            |
+--------+-------------------------------------------------+----------------+

read index:0 write index:8 capacity:16
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 00 00 00 05                         |........        |
+--------+-------------------------------------------------+----------------+

read index:0 write index:12 capacity:16
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 00 00 00 05 06 00 00 00             |............    |
+--------+-------------------------------------------------+----------------+

read index:0 write index:20 capacity:20
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 00 00 00 05 06 00 00 00 00 00 00 00 |................|
|00000010| 00 00 00 07                                     |....            |
+--------+-------------------------------------------------+----------------+

還有一類方法是 set 開頭的一系列方法,也可以寫入資料,但不會改變寫指標位置

5.6 擴容

當 ByteBuf 中的容量無法容納寫入的資料時,會進行擴容操作

buffer.writeLong(7);
ByteBufUtil.log(buffer);
// 擴容前
read index:0 write index:12 capacity:16
...

// 擴容後
read index:0 write index:20 capacity:20
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 00 00 00 05 06 00 00 00 00 00 00 00 |................|
|00000010| 00 00 00 07                                     |....            |
+--------+-------------------------------------------------+----------------+

擴容規則

  • 如何寫入後資料大小未超過 512 位元組,則選擇下一個 16 的整數倍進行擴容

    • 例如寫入後大小為 12 位元組,則擴容後 capacity 是 16 位元組
  • 如果寫入後資料大小超過 512 位元組,則選擇下一個 2^n

  • 例如寫入後大小為 513 位元組,則擴容後 capacity 是 210=1024 位元組(29=512 已經不夠了)

  • 擴容不能超過 maxCapacity,否則會丟擲 java.lang.IndexOutOfBoundsException 異常

Exception in thread "main" java.lang.IndexOutOfBoundsException: writerIndex(20) + minWritableBytes(8) exceeds maxCapacity(20): PooledUnsafeDirectByteBuf(ridx: 0, widx: 20, cap: 20/20)

5.7 讀取

讀取主要是通過一系列 read 方法進行讀取,讀取時會根據讀取資料的位元組數移動讀指標

如果需要 重複讀取 ,需要呼叫 buffer.markReaderIndex() 對讀指標進行標記,並通過 buffer.resetReaderIndex() 將讀指標恢復到 mark 標記的位置

public class ByteBufStudy {
    public static void main(String[] args) {
        // 建立ByteBuf
        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16, 20);

        // 向buffer中寫入資料
        buffer.writeBytes(new byte[]{1, 2, 3, 4});
        buffer.writeInt(5);

        // 讀取4個位元組
        System.out.println(buffer.readByte());
        System.out.println(buffer.readByte());
        System.out.println(buffer.readByte());
        System.out.println(buffer.readByte());
        ByteBufUtil.log(buffer);

        // 通過mark與reset實現重複讀取
        buffer.markReaderIndex();
        System.out.println(buffer.readInt());
        ByteBufUtil.log(buffer);

        // 恢復到mark標記處
        buffer.resetReaderIndex();
        ByteBufUtil.log(buffer);
    }
}
read index:4 write index:8 capacity:16
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 00 00 05                                     |....            |
+--------+-------------------------------------------------+----------------+
5
read index:8 write index:8 capacity:16

read index:4 write index:8 capacity:16
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 00 00 05                                     |....            |
+--------+-------------------------------------------------+----------------+

還有以 get 開頭的一系列方法,這些方法不會改變讀指標的位置

5.8 釋放

由於 Netty 中有堆外記憶體(直接記憶體)的 ByteBuf 實現,堆外記憶體最好是手動來釋放,而不是等 GC 垃圾回收。

  • UnpooledHeapByteBuf 使用的是 JVM 記憶體,只需等 GC 回收記憶體即可
  • UnpooledDirectByteBuf 使用的就是直接記憶體了,需要特殊的方法來回收記憶體
  • PooledByteBuf 和它的子類使用了池化機制,需要更復雜的規則來回收記憶體

Netty 這裡採用了引用計數法來控制回收記憶體,每個 ByteBuf 都實現了 ReferenceCounted 介面

  • 每個 ByteBuf 物件的初始計數為 1
  • 呼叫 release 方法計數減 1,如果計數為 0,ByteBuf 記憶體被回收
  • 呼叫 retain 方法計數加 1,表示呼叫者沒用完之前,其它 handler 即使呼叫了 release 也不會造成回收
  • 當計數為 0 時,底層記憶體會被回收,這時即使 ByteBuf 物件還在,其各個方法均無法正常使用

釋放規則

因為 pipeline 的存在,一般需要將 ByteBuf 傳遞給下一個 ChannelHandler,如果在每個 ChannelHandler 中都去呼叫 release ,就失去了傳遞性(如果在這個 ChannelHandler 內這個 ByteBuf 已完成了它的使命,那麼便無須再傳遞)

基本規則是,誰是最後使用者,誰負責 release

  • 起點,對於 NIO 實現來講,在 io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe.read 方法中首次建立 ByteBuf 放入 pipeline(line 163 pipeline.fireChannelRead (byteBuf))

  • 入站 ByteBuf 處理原則

    • 對原始 ByteBuf 不做處理,呼叫 ctx.fireChannelRead (msg) 向後傳遞,這時無須 release
    • 將原始 ByteBuf 轉換為其它型別的 Java 物件,這時 ByteBuf 就沒用了,必須 release
    • 如果不呼叫 ctx.fireChannelRead (msg) 向後傳遞,那麼也必須 release
    • 注意各種異常,如果 ByteBuf 沒有成功傳遞到下一個 ChannelHandler,必須 release
    • 假設訊息一直向後傳,那麼 TailContext 會負責釋放未處理訊息(原始的 ByteBuf)
  • 出站 ByteBuf 處理原則

    • 出站訊息最終都會轉為 ByteBuf 輸出,一直向前傳,由 HeadContext flush 後 release
  • 異常處理原則

    • 有時候不清楚 ByteBuf 被引用了多少次,但又必須徹底釋放,可以迴圈呼叫 release 直到返回 true
while (!buffer.release()) {}

當 ByteBuf 被傳到了 pipeline 的 head 與 tail 時,ByteBuf 會被其中的方法徹底釋放,但前提是 ByteBuf 被傳遞到了 head 與 tail 中

TailConext 中釋放 ByteBuf 的原始碼

protected void onUnhandledInboundMessage(Object msg) {
    try {
        logger.debug("Discarded inbound message {} that reached at the tail of the pipeline. Please check your pipeline configuration.", msg);
    } finally {
        // 具體的釋放方法
        ReferenceCountUtil.release(msg);
    }
}

判斷傳過來的是否為 ByteBuf,是的話才需要釋放

public static boolean release(Object msg) {
	return msg instanceof ReferenceCounted ? ((ReferenceCounted)msg).release() : false;
}

5.9 切片

ByteBuf 切片是【零拷貝】的體現之一,對原始 ByteBuf 進行切片成多個 ByteBuf,切片後的 ByteBuf 並沒有發生記憶體複製,還是使用原始 ByteBuf 的記憶體,切片後的 ByteBuf 維護獨立的 read,write 指標

得到分片後的 buffer 後,要呼叫其 retain 方法,使其內部的引用計數加一。避免原 ByteBuf 釋放,導致切片 buffer 無法使用修改原 ByteBuf 中的值,也會影響切片後得到的 ByteBuf

file

public class TestSlice {
    public static void main(String[] args) {
        // 建立ByteBuf
        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16, 20);

        // 向buffer中寫入資料
        buffer.writeBytes(new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10});

        // 將buffer分成兩部分
        ByteBuf slice1 = buffer.slice(0, 5);
        ByteBuf slice2 = buffer.slice(5, 5);

        // 需要讓分片的buffer引用計數加一
        // 避免原Buffer釋放導致分片buffer無法使用
        slice1.retain();
        slice2.retain();
        
        ByteBufUtil.log(slice1);
        ByteBufUtil.log(slice2);

        // 更改原始buffer中的值
        System.out.println("===========修改原buffer中的值===========");
        buffer.setByte(0,5);

        System.out.println("===========列印slice1===========");
        ByteBufUtil.log(slice1);
    }
}

執行結果

read index:0 write index:5 capacity:5
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 05                                  |.....           |
+--------+-------------------------------------------------+----------------+
read index:0 write index:5 capacity:5
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 06 07 08 09 0a                                  |.....           |
+--------+-------------------------------------------------+----------------+
===========修改原buffer中的值===========
===========列印slice1===========
read index:0 write index:5 capacity:5
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 05 02 03 04 05                                  |.....           |
+--------+-------------------------------------------------+----------------+

5.10 優勢

  • 池化思想 - 可以重用池中 ByteBuf 例項,更節約記憶體,減少記憶體溢位的可能
  • 讀寫指標分離,不需要像 ByteBuffer 一樣切換讀寫模式
  • 可以自動擴容
  • 支援鏈式呼叫,使用更流暢
  • 很多地方體現零拷貝,例如
    • slice、duplicate、CompositeByteBuf

本文由傳智教育博學谷教研團隊釋出。

如果本文對您有幫助,歡迎關注點贊;如果您有任何建議也可留言評論私信,您的支援是我堅持創作的動力。

轉載請註明出處!