(十一)Netty實戰篇:基於Netty框架打造一款高效能的IM即時通訊程式

語言: CN / TW / HK

theme: channing-cyan

引言

   關於Netty網路框架的內容,前面已經講了兩個章節,但總歸來說難以真正掌握,畢竟只是對其中一個個元件進行講解,很難讓諸位將其串起來形成一條線,所以本章中則會結合實戰案例,對Netty進行更深層次的學習與掌握,實戰案例也並不難,一個非常樸素的IM聊天程式。

原本打算做個多人鬥地主,但鬥地主需要織入過多的業務邏輯,因此一方面會帶來不必要的理解難度,讓案例更為複雜化,另一方面程式碼量也會偏多,所以最終依舊選擇實現基本的聊天程式,既簡單,又能加深對Netty的理解。

一、基於Netty設計通訊協議

   協議,這玩意兒相信大家肯定不陌生了,畢竟在《網路程式設計》系列的前兩章,都在圍繞著網路協議展開敘述,再來簡單回顧一下協議的概念:網路協議是指一種通訊雙方都必須遵守的約定,兩個不同的端,按照一定的格式對資料進行“編碼”,同時按照相同的規則進行“解碼”,從而實現兩者之間的資料傳輸與通訊。

當自己想要打造一款IM通訊程式時,對於訊息的封裝、拆分也同樣需要設計一個協議,通訊的兩端都必須遵守該協議工作,這也是實現通訊程式的前提,但為什麼需要通訊協議呢?因為TCP/IP中是基於流的方式傳輸訊息,訊息與訊息之間沒有邊界,而協議的目的則在於約定訊息的樣式、邊界等。

1.1、Redis通訊的RESP協議

   不知大家是否還記得之前在《Redis綜述篇》中聊到的RESP客戶端協議,這是Redis提供的一種客戶端通訊協議,如果想要操作Redis,就必須遵守該協議的格式傳送資料,但這個協議特別簡單,如下: - 首先要求所有命令,都以*開頭,後面跟著具體的子命令數量,接著用換行符分割。 - 接著需要先用$符號宣告每個子命令的長度,然後再用換行符分割。 - 最後再拼接上具體的子命令,同樣用換行符分割。

這樣描述有些令人難懂,那就直接看個案例,例如一條簡單set命令,如下: ```shell 客戶端命令: set name ZhuZi

轉變為RESP指令: *3 $3 set $4 name $5 ZhuZi 按照`Redis`的規定,但凡滿足`RESP`協議的客戶端,都可以直接連線並操作`Redis`服務端,這也就意味著咱們可以直接通過`Netty`來手寫一個`Redis`客戶端,程式碼如下:java // 基於Netty、RESP協議實現的Redis客戶端 public class RedisClient { // 換行符的ASCII碼 static final byte[] LINE = {13, 10};

public static void main(String[] args) {
    EventLoopGroup worker = new NioEventLoopGroup();
    Bootstrap client = new Bootstrap();

    try {
        client.group(worker);
        client.channel(NioSocketChannel.class);
        client.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel socketChannel)
                                                    throws Exception {
                ChannelPipeline pipeline = socketChannel.pipeline();

                pipeline.addLast(new ChannelInboundHandlerAdapter(){

                    // 通道建立成功後呼叫:向Redis傳送一條set命令
                    @Override
                    public void channelActive(ChannelHandlerContext ctx)
                                                        throws Exception {
                        String command = "set name ZhuZi";
                        ByteBuf buffer = respCommand(command);
                        ctx.channel().writeAndFlush(buffer);
                    }

                    // Redis響應資料時觸發:列印Redis的響應結果
                    @Override
                    public void channelRead(ChannelHandlerContext ctx,
                                            Object msg) throws Exception {
                        // 接受Redis服務端執行指令後的結果
                        ByteBuf buffer = (ByteBuf) msg;
                        System.out.println(buffer.toString(CharsetUtil.UTF_8));
                    }
                });
            }
        });

        // 根據IP、埠連線Redis服務端
        client.connect("192.168.12.129", 6379).sync();
    } catch (Exception e){
        e.printStackTrace();
    }
}

private static ByteBuf respCommand(String command){
    // 先對傳入的命令以空格進行分割
    String[] commands = command.split(" ");
    ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();

    // 遵循RESP協議:先寫入指令的個數
    buffer.writeBytes(("*" + commands.length).getBytes());
    buffer.writeBytes(LINE);

    // 接著分別寫入每個指令的長度以及具體值
    for (String s : commands) {
        buffer.writeBytes(("$" + s.length()).getBytes());
        buffer.writeBytes(LINE);
        buffer.writeBytes(s.getBytes());
        buffer.writeBytes(LINE);
    }
    // 把轉換成RESP格式的命令返回
    return buffer;
}

} 在上述這個案例中,也僅僅只是通過`respCommand()`這個方法,對使用者輸入的指令進行了轉換,同時在上面通過`Netty`,與`Redis`的地址、埠建立了連線,在連線建立成功後,就會向`Redis`傳送一條轉換成`RESP`指令的`set`命令,接著等待`Redis`的響應結果並輸出,如下:java +OK `` 因為這是一條寫指令,所以當Redis收到執行完成後,最終就會返回一個OK,大家也可直接去Redis中查詢,也依舊能夠查詢到剛剛寫入的name`這個鍵值。

1.2、HTTP超文字傳輸協議

   前面咱們自己針對於RedisRESP協議,對使用者指令進行了封裝,然後發往Redis執行,但對於這些常用的協議,Netty早已提供好了現成的處理器,想要使用時無需從頭開發,可以直接使用現成的處理器來實現,比如現在咱們可以基於Netty提供的處理器,實現一個簡單的HTTP伺服器,程式碼如下: ```java // 基於Netty提供的處理器實現HTTP伺服器 public class HttpServer { public static void main(String[] args) throws InterruptedException { EventLoopGroup boss = new NioEventLoopGroup(); EventLoopGroup worker = new NioEventLoopGroup(); ServerBootstrap server = new ServerBootstrap(); server .group(boss,worker) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer() { @Override protected void initChannel(NioSocketChannel ch) { ChannelPipeline pipeline = ch.pipeline();

                // 新增一個Netty提供的HTTP處理器
                pipeline.addLast(new HttpServerCodec());
                pipeline.addLast(new ChannelInboundHandlerAdapter() {
                    @Override
                    public void channelRead(ChannelHandlerContext ctx,
                                            Object msg) throws Exception {
                        // 在這裡輸出一下訊息的型別
                        System.out.println("訊息型別:" + msg.getClass());
                        super.channelRead(ctx, msg);
                    }
                });
                pipeline.addLast(new SimpleChannelInboundHandler<HttpRequest>() {
                    @Override
                    protected void channelRead0(ChannelHandlerContext ctx,
                                                HttpRequest msg) throws Exception {
                        System.out.println("客戶端的請求路徑:" + msg.uri());

                        // 建立一個響應物件,版本號與客戶端保持一致,狀態碼為OK/200
                        DefaultFullHttpResponse response =
                                new DefaultFullHttpResponse(
                                        msg.protocolVersion(),
                                        HttpResponseStatus.OK);

                        // 構造響應內容
                        byte[] content = "<h1>Hi, ZhuZi!</h1>".getBytes();

                        // 設定響應頭:告訴客戶端本次響應的資料長度
                        response.headers().setInt(
                            HttpHeaderNames.CONTENT_LENGTH,content.length);
                        // 設定響應主體
                        response.content().writeBytes(content);

                        // 向客戶端寫入響應資料
                        ctx.writeAndFlush(response);
                    }
                });
            }
        })
        .bind("127.0.0.1",8888)
        .sync();
}

} 在該案例中,咱們就未曾手動對`HTTP`的資料包進行拆包處理了,而是在服務端的`pipeline`上添加了一個`HttpServerCodec`處理器,這個處理器是`Netty`官方提供的,其類繼承關係如下:java public final class HttpServerCodec extends CombinedChannelDuplexHandler implements SourceCodec { // ...... } `` 觀察會發現,該類繼承自CombinedChannelDuplexHandler這個組合類,它組合了編碼器、解碼器,這也就意味著HttpServerCodec`即可以對客戶端的資料做解碼,也可以對服務端響應的資料做編碼,同時除開添加了這個處理器外,在第二個處理器中列印了一下客戶端的訊息型別,最後一個處理器中,對客戶端的請求做出了響應,其實也就是返回了一句話而已。

此時在瀏覽器輸入http://127.0.0.1:8888/index.html,結果如下: java 訊息型別:class io.netty.handler.codec.http.DefaultHttpRequest 訊息型別:class io.netty.handler.codec.http.LastHttpContent$1 客戶端的請求路徑:/index.html 此時來看結果,客戶端的請求會被解析成兩個部分,第一個是請求資訊,第二個是主體資訊,但按理來說瀏覽器發出的請求,屬於GET型別的請求,GET請求是沒有請求體資訊的,但Netty依舊會解析成兩部分~,只不過GET請求的第二部分是空的。

在第三個處理器中,咱們直接向客戶端返回了一個h1標籤,同時也要記得在響應頭裡面,加上響應內容的長度資訊,否則瀏覽器的載入圈,會一直不同的轉動,畢竟瀏覽器也不知道內容有多長,就會一直反覆載入,嘗試等待更多的資料。

1.3、自定義訊息傳輸協議

   Netty除開提供了HTTP協議的處理器外,還提供了DNS、HaProxy、MemCache、MQTT、Protobuf、Redis、SCTP、RTSP.....一系列協議的實現,具體定義位於io.netty.handler.codec這個包下,當然,咱們也可以自己實現自定義協議,按照自己的邏輯對資料進行編解碼處理。

很多基於Netty開發的中介軟體/元件,其內部基本上都開發了專屬的通訊協議,以此來作為不同節點間通訊的基礎,所以解下來咱們基於Netty也來自己設計一款通訊協議,這也會作為後續實現聊天程式時的基礎。

但所謂的協議設計,其實僅僅只需要按照一定約束,實現編碼器與解碼器即可,傳送方在發出資料之前,會經過編碼器對資料進行處理,而接收方在收到資料之前,則會由解碼器對資料進行處理。

1.3.1、自定義協議的要素

在自定義傳輸協議時,咱們必然需要考慮幾個因素,如下: - 魔數:用來第一時間判斷是否為自己需要的資料包。 - 版本號:提高協議的拓展性,方便後續對協議進行升級。 - 序列化演算法:訊息正文具體該使用哪種方式進行序列化傳輸,例如Json、ProtoBuf、JDK...。 - 訊息型別:第一時間判斷出當前訊息的型別。 - 訊息序號:為了實現雙工通訊,客戶端和服務端之間收/發訊息不會相互阻塞。 - 正文長度:提供給LTC解碼器使用,防止解碼時出現粘包、半包的現象。 - 訊息正文:本次訊息要傳輸的具體資料。

在設計協議時,一個完整的協議應該涵蓋上述所說的幾方面,這樣才能提供雙方通訊時的基礎,基於上述幾個欄位,能夠在第一時間內判斷出:訊息是否可用、當前協議版本、訊息的具體型別、訊息的長度等各類資訊,從而給後續處理器使用(自定義的協議規則本身就是一個編解碼處理器而已)。

1.3.2、自定義協議實戰

前面簡單聊到過,所謂的自定義協議就是自己規定訊息格式,以及自己實現編/解碼器對訊息實現封裝/拆解,所以這裡想要自定義一個訊息協議,就只需要滿足前面兩個條件即可,因此實現如下: ```java @ChannelHandler.Sharable public class ChatMessageCodec extends MessageToMessageCodec {

// 訊息出站時會經過的編碼方法(將原生訊息物件封裝成自定義協議的訊息格式)
@Override
protected void encode(ChannelHandlerContext ctx, Message msg,
                      List<Object> list) throws Exception {
    ByteBuf outMsg = ctx.alloc().buffer();
    // 前五個位元組作為魔數
    byte[] magicNumber = new byte[]{'Z','h','u','Z','i'};
    outMsg.writeBytes(magicNumber);
    // 一個位元組作為版本號
    outMsg.writeByte(1);
    // 一個位元組表示序列化方式  0:JDK、1:Json、2:ProtoBuf.....
    outMsg.writeByte(0);
    // 一個位元組用於表示訊息型別
    outMsg.writeByte(msg.getMessageType());
    // 四個位元組表示訊息序號
    outMsg.writeInt(msg.getSequenceId());

    // 使用Java-Serializable的方式對訊息物件進行序列化
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    ObjectOutputStream oos = new ObjectOutputStream(bos);
    oos.writeObject(msg);
    byte[] msgBytes = bos.toByteArray();

    // 使用四個位元組描述訊息正文的長度
    outMsg.writeInt(msgBytes.length);
    // 將序列化後的訊息物件作為訊息正文
    outMsg.writeBytes(msgBytes);

    // 將封裝好的資料傳遞給下一個處理器
    list.add(outMsg);
}

// 訊息入站時會經過的解碼方法(將自定義格式的訊息轉變為具體的訊息物件)
@Override
protected void decode(ChannelHandlerContext ctx,
                      ByteBuf inMsg, List<Object> list) throws Exception {
    // 讀取前五個位元組得到魔數
    byte[] magicNumber = new byte[5];
    inMsg.readBytes(magicNumber,0,5);
    // 再讀取一個位元組得到版本號
    byte version = inMsg.readByte();
    // 再讀取一個位元組得到序列化方式
    byte serializableType = inMsg.readByte();
    // 再讀取一個位元組得到訊息型別
    byte messageType = inMsg.readByte();
    // 再讀取四個位元組得到訊息序號
    int sequenceId = inMsg.readInt();
    // 再讀取四個位元組得到訊息正文長度
    int messageLength = inMsg.readInt();

    // 再根據正文長度讀取序列化後的位元組正文資料
    byte[] msgBytes = new byte[messageLength];
    inMsg.readBytes(msgBytes,0,messageLength);

    // 對於讀取到的訊息正文進行反序列化,最終得到具體的訊息物件
    ByteArrayInputStream bis = new ByteArrayInputStream(msgBytes);
    ObjectInputStream ois = new ObjectInputStream(bis);
    Message message = (Message) ois.readObject();

    // 最終把反序列化得到的訊息物件傳遞給後續的處理器
    list.add(message);
}

} `` 上面自定義的處理器中,繼承了MessageToMessageCodec類,主要負責將資料在原生ByteBufMessage之間進行相互轉換,而Message物件是自定義的訊息物件,這裡暫且無需過多關心。其中主要實現了兩個方法: -encode():出站時會經過的編碼方法,會將原生訊息物件按自定義的協議封裝成對應的位元組資料。 -decode()`:入站時會經過的解碼方法,會將協議格式的位元組資料,轉變為具體的訊息物件。

上述自定義的協議,也就是一定規則的位元組資料,每條訊息資料的組成如下: - 魔數:使用第1~5個位元組來描述,這個魔數值可以按自己的想法自定義。 - 版本號:使用第6個位元組來描述,不同數字表示不同版本。 - 序列化演算法:使用第7個位元組來描述,不同數字表示不同序列化方式。 - 訊息型別:使用第8個位元組來描述,不同的訊息型別使用不同數字表示。 - 訊息序號:使用第9~12個位元組來描述,其實就是一個四位元組的整數。 - 正文長度:使用第13~16個位元組來描述,也是一個四位元組的整數。 - 訊息正文:長度不固定,根據每次具體傳送的資料來決定。

在其中,為了實現簡單,這裡的序列化方式,則採用的是JDK預設的Serializable介面方式,但這種方式生成的物件位元組較大,實際情況中最好還是選擇谷歌的ProtoBuf方式,這種演算法屬於序列化演算法中,效能最佳的一種落地實現。

當然,這個自定義的協議是提供給後續的聊天業務使用的,但這種實戰型的內容分享,基本上程式碼量較高,所以大家看起來會有些枯燥,而本文所使用的聊天室案例,是基於《B站-黑馬Netty視訊教程》二次改良的,因此如若感覺文字描述較為枯燥,可直接點選前面給出的連結,觀看P101~P121視訊進行學習。

最後來觀察一下,大家會發現,在咱們定義的這個協議編解碼處理器上,存在著一個@ChannelHandler.Sharable註解,這個註解的作用是幹嗎的呢?其實很簡單,用來標識當前處理器是否可在多執行緒環境下使用,如果帶有該註解的處理器,則表示可以在多個通道間共用,因此只需要建立一個即可,反之同理,如果不帶有該註解的處理器,則每個通道需要單獨建立使用。

二、基於Netty打造IM聊天程式

   前面簡單過了一下自定義協議後,接著來基於Netty框架上手一個真正的實戰專案,那也就是基於Netty打造一款IM即時通訊的聊天程式,這裡在實現過程中,僅僅只會給出核心實現,但最後會提供完整程式碼的Github連結,因此大家重點理解核心即可。

2.1、IM程式的使用者模組

   聊天、聊天,自然是得先有人,然後才能進行聊天溝通,與QQ、微信類似,如果你想要使用某款聊天程式時,前提都得是先具備一個對應的賬戶才行,因此在咱們設計IM系統之處,那也需要對應的使用者功能實現,但這裡為了簡單,同樣不再結合資料庫實現完整的使用者模組了,而是基於記憶體實現使用者的管理,如下: java public interface UserService { boolean login(String username, String password); } 這是使用者模組的頂層介面,僅僅只提供了一個登入介面,關於註冊、鑑權、等級.....等一系列功能,大家感興趣的可在後續進行拓展實現,接著來看看該介面的實現類,如下: ```java public class UserServiceMemoryImpl implements UserService { private Map allUserMap = new ConcurrentHashMap<>();

{
    // 在程式碼塊中對使用者列表進行初始化,向其中添加了兩個使用者資訊
    allUserMap.put("ZhuZi", "123");
    allUserMap.put("XiongMao", "123");
}

@Override
public boolean login(String username, String password) {
    String pass = allUserMap.get(username);
    if (pass == null) {
        return false;
    }
    return pass.equals(password);
}

} `` 這個實現類並未結合資料庫來實現,而是僅僅在程式啟動時,通過程式碼塊的方式,載入了ZhuZi、XiongMao兩個使用者資訊並放入記憶體的Map容器中,這裡有興趣的小夥伴,可自行將Map`容器換成資料庫的表即可。

其中實現的login()登入介面尤為簡單,僅僅只是判斷了一下有沒有對應使用者,如果有的話則看看密碼是否正確,正確返回true,密碼錯誤則返回false,是的,我所寫的登入功能就是這麼簡單,走個簡單的過場,哈哈哈~

2.1.1、服務端、客戶端的基礎架構

基本的使用者模組有了,但這裡還未曾套入具體實現,因此先簡單的搭建出服務端、客戶端的架構,然後再基於構建好的架構實現基礎的使用者登入功能,服務端的基礎搭建如下: ```java public class ChatServer { public static void main(String[] args) { NioEventLoopGroup boss = new NioEventLoopGroup(); NioEventLoopGroup worker = new NioEventLoopGroup();

    ChatMessageCodec MESSAGE_CODEC = new ChatMessageCodec();

    try {
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.channel(NioServerSocketChannel.class);
        serverBootstrap.group(boss, worker);
        serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast(MESSAGE_CODEC);
            }
        });

        Channel channel = serverBootstrap.bind(8888).sync().channel();
        channel.closeFuture().sync();
    } catch (InterruptedException e) {
        System.out.println("服務端出現錯誤:" + e);
    } finally {
        boss.shutdownGracefully();
        worker.shutdownGracefully();
    }
}

} 服務端的程式碼目前很簡單,僅僅只是裝載了一個自己的協議編/解碼處理器,然後就是一些老步驟,不再過多的重複贅述,接著再來搭建一個簡單的客戶端,程式碼實現如下:java public class ChatClient { public static void main(String[] args) { NioEventLoopGroup group = new NioEventLoopGroup();

    ChatMessageCodec MESSAGE_CODEC = new ChatMessageCodec();

    try {
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.channel(NioSocketChannel.class);
        bootstrap.group(group);
        bootstrap.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast(MESSAGE_CODEC);
            }
        });
        Channel channel = bootstrap.connect("localhost", 8888).sync().channel();
        channel.closeFuture().sync();
    } catch (Exception e) {
        System.out.println("客戶端出現錯誤:" + e);
    } finally {
        group.shutdownGracefully();
    }
}

} ``` 目前僅僅只是與服務端建立了連線,然後裝載了一個自定義的編解碼器,到這裡就搭建了最基本的服務端、客戶端的基礎架構,接著來基於它實現簡單的登入功能。

2.1.2、使用者登入功能的實現

對於登入功能,由於需要在服務端與客戶端之間傳輸資料,因此咱們可以設計一個訊息物件,但由於後續單聊、群聊都需要傳送不同的訊息格式,因此先設計出一個父類,如下: ```java public abstract class Message implements Serializable {

private int sequenceId;
private int messageType;


@Override
public String toString() {
    return "Message{" +
            "sequenceId=" + sequenceId +
            ", messageType=" + messageType +
            '}';
}

public int getSequenceId() {
    return sequenceId;
}

public void setSequenceId(int sequenceId) {
    this.sequenceId = sequenceId;
}

public void setMessageType(int messageType) {
    this.messageType = messageType;
}

public abstract int getMessageType();

public static final int LoginRequestMessage = 0;
public static final int LoginResponseMessage = 1;
public static final int ChatRequestMessage = 2;
public static final int ChatResponseMessage = 3;
public static final int GroupCreateRequestMessage = 4;
public static final int GroupCreateResponseMessage = 5;
public static final int GroupJoinRequestMessage = 6;
public static final int GroupJoinResponseMessage = 7;
public static final int GroupQuitRequestMessage = 8;
public static final int GroupQuitResponseMessage = 9;
public static final int GroupChatRequestMessage = 10;
public static final int GroupChatResponseMessage = 11;
public static final int GroupMembersRequestMessage = 12;
public static final int GroupMembersResponseMessage = 13;
public static final int PingMessage = 14;
public static final int PongMessage = 15;

} 在這個訊息父類中,定義了多種訊息型別的狀態碼,不同的訊息型別對應不同數字,同時其中還設計了一個抽象方法,即`getMessageType()`,該方法交給具體的子類實現,每個子類返回各自的訊息型別,為了方便後續拓展,這裡又建立了一個抽象類作為中間類,如下:java public abstract class AbstractResponseMessage extends Message { private boolean success; private String reason;

public AbstractResponseMessage() {
}

public AbstractResponseMessage(boolean success, String reason) {
    this.success = success;
    this.reason = reason;
}

@Override
public String toString() {
    return "AbstractResponseMessage{" +
            "success=" + success +
            ", reason='" + reason + '\'' +
            '}';
}

public boolean isSuccess() {
    return success;
}

public void setSuccess(boolean success) {
    this.success = success;
}

public String getReason() {
    return reason;
}

public void setReason(String reason) {
    this.reason = reason;
}

} 這個類主要是提供給響應時使用的,其中包含了響應狀態以及響應資訊,接著再設計兩個登入時會用到的訊息物件,如下:java public class LoginRequestMessage extends Message { private String username; private String password;

public LoginRequestMessage() {
}

@Override
public String toString() {
    return "LoginRequestMessage{" +
            "username='" + username + '\'' +
            ", password='" + password + '\'' +
            '}';
}

public String getUsername() {
    return username;
}

public void setUsername(String username) {
    this.username = username;
}

public String getPassword() {
    return password;
}

public void setPassword(String password) {
    this.password = password;
}

public LoginRequestMessage(String username, String password) {
    this.username = username;
    this.password = password;
}

@Override
public int getMessageType() {
    return LoginRequestMessage;
}

} 上述這個訊息類,主要是提供給客戶端登入時使用,本質上也就是一個涵蓋使用者名稱、使用者密碼的物件而已,同時還有一個用來給服務端響應時的響應類,如下:java public class LoginResponseMessage extends AbstractResponseMessage { public LoginResponseMessage(boolean success, String reason) { super(success, reason); }

@Override
public int getMessageType() {
    return LoginResponseMessage;
}

} ``` 登入響應類的實現十分簡單,由登入狀態和登入訊息組成,OK,接著來看看登入的具體實現。

首先在客戶端中,再通過pipeline新增一個處理器,如下: ```java CountDownLatch WAIT_FOR_LOGIN = new CountDownLatch(1); AtomicBoolean LOGIN = new AtomicBoolean(false); AtomicBoolean EXIT = new AtomicBoolean(false); Scanner scanner = new Scanner(System.in);

ch.pipeline().addLast("client handler", new ChannelInboundHandlerAdapter() { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // 負責接收使用者在控制檯的輸入,負責向伺服器傳送各種訊息 new Thread(() -> { System.out.println("請輸入使用者名稱:"); String username = scanner.nextLine(); if(EXIT.get()){ return; } System.out.println("請輸入密碼:"); String password = scanner.nextLine(); if(EXIT.get()){ return; } // 構造訊息物件 LoginRequestMessage message = new LoginRequestMessage(username, password); System.out.println(message); // 傳送訊息 ctx.writeAndFlush(message); System.out.println("等待後續操作..."); try { WAIT_FOR_LOGIN.await(); } catch (InterruptedException e) { e.printStackTrace(); } // 如果登入失敗 if (!LOGIN.get()) { ctx.channel().close(); return; } }).start(); } 在與服務端建立連線成功之後,就提示使用者需要登入,接著接收使用者輸入的使用者名稱、密碼,然後構建出一個`LoginRequestMessage`訊息物件,接著將其傳送給服務端,由於前面裝載了自定義的協議編解碼器,所以訊息在出站時,這個`Message`物件會被序列化成位元組碼,接著再服務端入站時,又會被反序列化成訊息物件,接著來看看服務端的實現,如下:java @ChannelHandler.Sharable public class LoginRequestMessageHandler extends SimpleChannelInboundHandler { @Override protected void channelRead0(ChannelHandlerContext ctx, LoginRequestMessage msg) throws Exception { String username = msg.getUsername(); String password = msg.getPassword(); boolean login = UserServiceFactory.getUserService().login(username, password); LoginResponseMessage message; if (login) { SessionFactory.getSession().bind(ctx.channel(), username); message = new LoginResponseMessage(true, "登入成功"); } else { message = new LoginResponseMessage(false, "使用者名稱或密碼不正確"); } ctx.writeAndFlush(message); } } `` 在服務端中,新增了一個處理器類,繼承自SimpleChannelInboundHandler這個處理器,其中指定的泛型為LoginRequestMessage,這表示當前處理器只關注這個型別的訊息,當出現登入型別的訊息時,會進入該處理器並觸發內部的channelRead0()`方法。

在該方法中,獲取了登入訊息中的使用者名稱、密碼,接著對其做了基本的登入效驗,如果使用者名稱存在並且密碼正確,就會返回登入成功,否則會返回登入失敗,最終登入後的狀態會被封裝成一個LoginResponseMessage物件,然後寫回客戶端的通道中。

當然,為了該處理器能夠成功生效,這裡需要將其裝載到服務端的pipeline上,如下: java LoginRequestMessageHandler LOGIN_HANDLER = new LoginRequestMessageHandler(); ch.pipeline().addLast(LOGIN_HANDLER); 裝載好登入處理器後,接著分別啟動服務端、客戶端,測試結果如下:
登入測試結果
從圖中的效果來看,這裡實現了最基本的登入功能,估計有些小夥伴看到這裡就有些暈了,但其實非常簡單,僅僅只是通過Netty在做資料互動而已,客戶端則提供輸入使用者名稱、密碼的功能,然後將使用者輸入的名稱、密碼傳送給服務端,服務端提供登入判斷的功能,最終根據判斷結果再向客戶端返回資料罷了。

2.2、基於Netty實現點對點單聊

   有了基本的使用者登入功能後,接著來看看如何實現點對點的單聊功能呢?首先我定義了一個會話介面,如下: java public interface Session { void bind(Channel channel, String username); void unbind(Channel channel); Channel getChannel(String username); } 這個介面中依舊只有三個方法,釋義如下: - bind():傳入一個使用者名稱和Socket通道,讓兩者之間的產生繫結關係。 - unbind():取消一個使用者與某個Socket通道的繫結關係。 - getChannel():根據一個使用者名稱,獲取與其存在繫結關係的通道。

該介面的實現類如下: ```java public class SessionMemoryImpl implements Session {

private final Map<String, Channel> usernameChannelMap = new ConcurrentHashMap<>();
private final Map<Channel, String> channelUsernameMap = new ConcurrentHashMap<>();

@Override
public void bind(Channel channel, String username) {
    usernameChannelMap.put(username, channel);
    channelUsernameMap.put(channel, username);
    channelAttributesMap.put(channel, new ConcurrentHashMap<>());
}

@Override
public void unbind(Channel channel) {
    String username = channelUsernameMap.remove(channel);
    usernameChannelMap.remove(username);
    channelAttributesMap.remove(channel);
}

@Override
public Channel getChannel(String username) {
    return usernameChannelMap.get(username);
}

@Override
public String toString() {
    return usernameChannelMap.toString();
}

} `` 該實現類最關鍵的是其中的兩個Map容器,usernameChannelMap用來儲存所有使用者名稱與Socket通道的繫結關係,而channelUsernameMap則是反過來的順序,這主要是為了方便,即可以通過使用者名稱獲得對應通道,也可以通過通道判斷出使用者名稱,實際上一個Map`也能搞定,但還是那句話,主要為了簡單嘛~

有了上述這個最簡單的會話管理功能後,就要著手實現具體的功能了,其實在前面實現登入功能的時候,就用過這其中的bind()方法,也就是當登入成功之後,就會將當前傳送登入訊息的通道,與正在登入的使用者名稱產生繫結關係,這樣就方便後續實現單聊、群聊的功能。

2.2.1、定義單聊的訊息物件

與登入時相同,由於需要在服務端和客戶端之間實現資料的轉發,因此這裡也需要兩個訊息物件,用來作為資料互動的訊息格式,如下: ```java public class ChatRequestMessage extends Message { private String content; private String to; private String from;

public ChatRequestMessage() {
}

public ChatRequestMessage(String from, String to, String content) {
    this.from = from;
    this.to = to;
    this.content = content;
}
// 省略Get/Setting、toString()方法.....

} 上述這個類,是提供給客戶端用來發送訊息資料的,其中主要包含了三個值,聊天的訊息內容、傳送人與接收人,因為這裡是需要實現一個`IM`聊天程式,所以並不是客戶端與服務端進行資料互動,而是客戶端與客戶端之間進行資料互動,服務端僅僅只提供訊息轉發的功能,接著再構建一個訊息類,如下:java public class ChatResponseMessage extends AbstractResponseMessage {

private String from;
private String content;

@Override
public String toString() {
    return "ChatResponseMessage{" +
            "from='" + from + '\'' +
            ", content='" + content + '\'' +
            '}';
}

public ChatResponseMessage(boolean success, String reason) {
    super(success, reason);
}

public ChatResponseMessage(String from, String content) {
    this.from = from;
    this.content = content;
}

@Override
public int getMessageType() {
    return ChatResponseMessage;
}
// 省略Get/Setting、toString()方法.....

} ``` 這個類是提供給服務端用來轉發的,當服務端收到一個聊天訊息後,因為聊天訊息中包含了接收人,所以可以先根據接收人的使用者名稱,找到對應的客戶端通道,然後再封裝成一個響應訊息,轉發給對應的客戶端即可,下面來做具體實現。

2.2.2、實現點對點單聊功能

由於聊天功能是提供給客戶端使用的,所以當一個客戶端登入成功之後,應該暴露給使用者一個操作選單,所以直接在原本客戶端的channelActive()方法中,登入成功之後繼續加程式碼即可,程式碼如下: java while (true) { System.out.println("=================================="); System.out.println("\t1、傳送單聊訊息"); System.out.println("\t2、傳送群聊訊息"); System.out.println("\t3、建立一個群聊"); System.out.println("\t4、獲取群聊成員"); System.out.println("\t5、加入一個群聊"); System.out.println("\t6、退出一個群聊"); System.out.println("\t7、退出聊天系統"); System.out.println("=================================="); String command = scanner.nextLine(); } 首先會開啟一個死迴圈,然後不斷接收使用者的操作,接著使用switch語法來對具體的選單功能進行實現,先實現單聊功能,如下: java switch (command){ case "1": System.out.print("請選擇你要傳送訊息給誰:"); String toUserName = scanner.nextLine(); System.out.print("請輸入你要傳送的訊息內容:"); String content = scanner.nextLine(); ctx.writeAndFlush(new ChatRequestMessage(username, toUserName, content)); break; } 如果使用者選擇了單聊,接著會提示使用者選擇要傳送訊息給誰,這裡也就是讓使用者輸入對方的使用者名稱,實際上如果有介面的話,這一步是並不需要使用者自己輸入的,而是提供視窗讓使用者點選,比如QQ、微信一樣,想要給某個人傳送訊息時,只需要點選“他”的頭像私聊即可。

等使用者選擇了聊天目標,並且輸入了訊息內容後,接著會構建一個ChatRequestMessage訊息物件,然後會發送給服務端,但這裡先不看服務端的實現,客戶端這邊還需要重寫一個方法,如下: java @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("收到訊息:" + msg); if ((msg instanceof LoginResponseMessage)) { LoginResponseMessage response = (LoginResponseMessage) msg; if (response.isSuccess()) { // 如果登入成功 LOGIN.set(true); } // 喚醒 system in 執行緒 WAIT_FOR_LOGIN.countDown(); } } 前面的邏輯是在channelActive()方法中完成的,也就是連線建立成功後,就會讓使用者登入,接著登入成功之後會給使用者一個選單欄,提供給使用者進行操作,但前面的邏輯中一直沒有對服務端響應的訊息進行處理,因此channelRead()方法中會對服務端響應的資料進行處理。

channelRead()方法會在有資料可讀時被觸發,所以當服務端響應資料時,首先會判斷一下:目前服務端響應的是不是登入訊息,如果是的話,則需要根據登入的結果來喚醒前面channelActive()方法中的執行緒。如果目前服務端響應的不是登入訊息,這也就意味著客戶端前面已經登入成功了,所以接著會直接列印一下收到的資料。

OK,有了上述客戶端的程式碼實現後,接著再來服務端多建立一個處理器,如下: java @ChannelHandler.Sharable public class ChatRequestMessageHandler extends SimpleChannelInboundHandler<ChatRequestMessage> { @Override protected void channelRead0(ChannelHandlerContext ctx, ChatRequestMessage msg) throws Exception { String to = msg.getTo(); Channel channel = SessionFactory.getSession().getChannel(to); // 線上 if (channel != null) { channel.writeAndFlush(new ChatResponseMessage( msg.getFrom(), msg.getContent())); } // 不線上 else { ctx.writeAndFlush(new ChatResponseMessage( false, "對方使用者不存在或者不線上")); } } } 這裡依舊通過繼承SimpleChannelInboundHandler類的形式,來特別關注ChatRequestMessage單聊型別的訊息,如果目前服務端收到的是單聊訊息,則會進入觸發該處理器的channelRead0()方法,該處理器內部的邏輯也並不複雜,首先根據單聊訊息的接收人,去找一下與之對應的通道: - 如果根據使用者名稱查到了通道,表示接收人目前是登入線上狀態。 - 反之,如果無法根據使用者名稱找到通道,表示對應的使用者不存在或者沒有登入。

接著會根據上面的查詢結果,進行對應的結果返回: - 如果線上:把要傳送的單聊訊息,直接寫入至找到的通道中。 - 如果不線上:向傳送單聊訊息的客戶端,返回使用者不存在或使用者不線上。

有了這個處理器之後,接著還需要把該處理器裝載到服務端上,如下: java ChatRequestMessageHandler CHAT_HANDLER = new ChatRequestMessageHandler(); ch.pipeline().addLast(CHAT_HANDLER); 裝載好單聊處理器後,接著分別啟動一個服務端、兩個客戶端,測試結果如下:
單聊測試
從測試結果中可以明顯看出效果,其中的單聊功能的確已經實現,可以實現A→B使用者之間的單聊功能,兩者之間藉助伺服器轉發,可以實現兩人私聊的功能。

2.3、基於Netty打造多人聊天室

   前面實現了兩個使用者之間的私聊功能,接著再來實現一個多人聊天室的功能,畢竟像QQ、微信、釘釘....等任何通訊軟體,都支援多人建立群聊的功能,但多人聊天室的功能,實現之前還需要先完成建群的功能,畢竟如果群都沒建立,自然無法向某個群內傳送資料。

實現拉群也好,群聊也罷,其實現步驟依舊和前面相同,如下: - ①先定義對應的訊息物件。 - ②實現客戶端傳送對應訊息資料的功能。 - ③再寫一個服務端的群聊處理器,然後裝載到服務端上。

2.3.1、定義拉群的訊息體

首先來定義兩個拉群時用的訊息體,如下: ```java public class GroupCreateRequestMessage extends Message { private String groupName; private Set members;

public GroupCreateRequestMessage(String groupName, Set<String> members) {
    this.groupName = groupName;
    this.members = members;
}

@Override
public int getMessageType() {
    return GroupCreateRequestMessage;
}

// 省略其他Get/Settings、toString()方法.....

} 上述這個訊息體是提供給客戶端使用的,其中主要存在兩個成員,也就是群名稱與群成員列表,存放所有群成員的容器選用了`Set`集合,因為`Set`集合具備不可重複性,因此可以有效的避免同一使用者多次進群,接著再來看看服務端響應時用的訊息體,如下:java public class GroupCreateResponseMessage extends AbstractResponseMessage { public GroupCreateResponseMessage(boolean success, String reason) { super(success, reason); }

@Override
public int getMessageType() {
    return GroupCreateResponseMessage;
}

} ``` 這個訊息體的實現尤為簡單,僅僅只是給客戶端返回了拉群狀態以及拉群的附加資訊。

2.3.2、定義群聊會話管理

前面單聊有單聊的會話管理機制,而實現多人群聊時,依舊需要有群聊的會話管理機制,首先封裝了一個群聊實體類,如下: ```java public class Group { // 聊天室名稱 private String name; // 聊天室成員 private Set members;

public static final Group EMPTY_GROUP = new Group("empty", Collections.emptySet());

public Group(String name, Set<String> members) {
    this.name = name;
    this.members = members;
}

// 省略其他Get/Settings、toString()方法.....

} 接著定義了一個群聊會話的頂級介面,如下:java public interface GroupSession { // 建立一個群聊 Group createGroup(String name, Set members); // 加入某個群聊 Group joinMember(String name, String member); // 移除群聊中的某個成員 Group removeMember(String name, String member); // 解散一個群聊 Group removeGroup(String name); // 獲取一個群聊的成員列表 Set getMembers(String name); // 獲取一個群聊所有線上使用者的Channel通道 List getMembersChannel(String name); } 上述介面中,提供了幾個介面方法,其實也主要是群聊系統中的一些日常操作,如創群、加群、踢人、解散群、檢視群成員....等功能,接著來看看該介面的實現者,如下:java public class GroupSessionMemoryImpl implements GroupSession { private final Map groupMap = new ConcurrentHashMap<>();

@Override
public Group createGroup(String name, Set<String> members) {
    Group group = new Group(name, members);
    return groupMap.putIfAbsent(name, group);
}

@Override
public Group joinMember(String name, String member) {
    return groupMap.computeIfPresent(name, (key, value) -> {
        value.getMembers().add(member);
        return value;
    });
}

@Override
public Group removeMember(String name, String member) {
    return groupMap.computeIfPresent(name, (key, value) -> {
        value.getMembers().remove(member);
        return value;
    });
}

@Override
public Group removeGroup(String name) {
    return groupMap.remove(name);
}

@Override
public Set<String> getMembers(String name) {
    return groupMap.getOrDefault(name, Group.EMPTY_GROUP).getMembers();
}

@Override
public List<Channel> getMembersChannel(String name) {
    return getMembers(name).stream()
            .map(member -> SessionFactory.getSession().getChannel(member))
            .filter(Objects::nonNull)
            .collect(Collectors.toList());
}

} 這個實現類沒啥好說的,重點記住裡面有個`Map`容器即可,這個容器主要負責儲存所有群名稱與`Group`群聊物件的關係,後續可以通過群聊名稱,在這個容器中找到一個對應群聊物件。同時為了方便後續呼叫這些介面,還提供了一個工具類,如下:java public abstract class GroupSessionFactory { private static GroupSession session = new GroupSessionMemoryImpl();

public static GroupSession getGroupSession() {
    return session;
}

} `` 很簡單,僅僅只例項化了一個群聊會話管理的實現類,因為這裡沒有結合Spring來實現,所以並不能依靠IOC技術來自動管理Bean`,因此咱們需要手動創建出一個例項,以供於後續使用。

2.3.3、實現拉群功能

前面客戶端的功能選單中,3對應著拉群功能,所以咱們需要對3做具體的功能實現,邏輯如下: java case "3": System.out.print("請輸入你要建立的群聊暱稱:"); String newGroupName = scanner.nextLine(); System.out.print("請選擇你要邀請的群成員(不同成員用、分割):"); String members = scanner.nextLine(); Set<String> memberSet = new HashSet<>(Arrays.asList(members.split("、"))); memberSet.add(username); // 加入自己 ctx.writeAndFlush(new GroupCreateRequestMessage(newGroupName, memberSet)); break; 在該分支實現中,首先會要求使用者輸入一個群聊暱稱,接著需要輸入需要拉入群聊的使用者名稱稱,多個使用者之間使用分割,接著會把使用者輸入的群成員以及自己,全部放入到一個Set集合中,最終組裝成一個拉群訊息體,傳送給服務端處理,服務端的處理器如下: java @ChannelHandler.Sharable public class GroupCreateRequestMessageHandler extends SimpleChannelInboundHandler<GroupCreateRequestMessage> { @Override protected void channelRead0(ChannelHandlerContext ctx, GroupCreateRequestMessage msg) throws Exception { String groupName = msg.getGroupName(); Set<String> members = msg.getMembers(); // 群管理器 GroupSession groupSession = GroupSessionFactory.getGroupSession(); Group group = groupSession.createGroup(groupName, members); if (group == null) { // 發生成功訊息 ctx.writeAndFlush(new GroupCreateResponseMessage(true, groupName + "建立成功")); // 傳送拉群訊息 List<Channel> channels = groupSession.getMembersChannel(groupName); for (Channel channel : channels) { channel.writeAndFlush(new GroupCreateResponseMessage( true, "您已被拉入" + groupName)); } } else { ctx.writeAndFlush(new GroupCreateResponseMessage( false, groupName + "已經存在")); } } } 這裡依舊繼承了SimpleChannelInboundHandler類,只關心拉群的訊息,當客戶端出現拉群訊息時,首先會獲取使用者輸入的群暱稱和群成員,接著通過前面提供的創群介面,嘗試建立一個群聊,如果群聊已經存在,則會建立失敗,反之則會建立成功,在建立群聊成功的情況下,會給所有的群成員傳送一條“你已被拉入[XXX]”的訊息。

最後,同樣需要將該處理器裝載到服務端上,如下: java GroupCreateRequestMessageHandler GROUP_CREATE_HANDLER = new GroupCreateRequestMessageHandler(); ch.pipeline().addLast(GROUP_CREATE_HANDLER); 最後分別啟動一個服務端、兩個客戶端進行效果測試,如下:
拉群測試
從上圖的測試結果來看,的確實現了咱們的拉群效果,一個使用者拉群之後,被邀請的成員都會收到來自於服務端的拉群提醒,這也就為後續群聊功能奠定了基礎。

2.3.4、定義群聊的訊息體

這裡就不重複贅述了,還是之前的套路,定義一個客戶端用的訊息體,如下: ```java public class GroupChatRequestMessage extends Message { private String content; private String groupName; private String from;

public GroupChatRequestMessage(String from, String groupName, String content) {
    this.content = content;
    this.groupName = groupName;
    this.from = from;
}

@Override
public int getMessageType() {
    return GroupChatRequestMessage;
}
// 省略其他Get/Settings、toString()方法.....

}
這個是客戶端用來發送群聊訊息的訊息體,其中存在三個成員,傳送人、群聊暱稱、訊息內容,通過這三個成員,可以描述清楚任何一條群聊記錄,接著來看看服務端響應時用的訊息體,如下:java public class GroupChatResponseMessage extends AbstractResponseMessage { private String from; private String content;

public GroupChatResponseMessage(boolean success, String reason) {
    super(success, reason);
}

public GroupChatResponseMessage(String from, String content) {
    this.from = from;
    this.content = content;
}
@Override
public int getMessageType() {
    return GroupChatResponseMessage;
}
// 省略其他Get/Settings、toString()方法.....

} ``` 在這個訊息體中,就省去了群聊暱稱這個成員,因為這個訊息體的用處,主要是給服務端轉發給客戶端時使用的,因此不需要群聊暱稱,當然,要也可以,我這裡就直接省去了。

2.3.5、實現群聊功能

依舊先來做客戶端的實現,實現了客戶端之後再去完成服務端的實現,客戶端實現如下: java case "2": System.out.print("請選擇你要傳送訊息的群聊:"); String groupName = scanner.nextLine(); System.out.print("請輸入你要傳送的訊息內容:"); String groupContent = scanner.nextLine(); ctx.writeAndFlush(new GroupChatRequestMessage(username, groupName, groupContent)); break; 因為傳送群聊訊息對應著之前選單中的2,所以這裡對該分支進行實現,當用戶選擇傳送群聊訊息時,首先會讓使用者自己先選擇一個群聊,接著輸入要傳送的訊息內容,接著組裝成一個群聊訊息物件,傳送給服務端處理,服務端的實現如下: ```java @ChannelHandler.Sharable public class GroupChatRequestMessageHandler extends SimpleChannelInboundHandler { @Override protected void channelRead0(ChannelHandlerContext ctx, GroupChatRequestMessage msg) throws Exception { List channels = GroupSessionFactory.getGroupSession() .getMembersChannel(msg.getGroupName());

    for (Channel channel : channels) {
        channel.writeAndFlush(new GroupChatResponseMessage(
                        msg.getFrom(), msg.getContent()));
    }
}

} `` 這裡依舊定義了一個處理器,關於原因就不再重複囉嗦了,服務端對於群聊訊息的實現額外簡單,也就是先根據使用者選擇的群暱稱,找到該群所有的群成員,然後依次遍歷成員列表,獲取對應的Socket`通道,轉發訊息即可。

接著將該處理器裝載到服務端pipeline上,然後分別啟動一個服務端、兩個客戶端,進行效果測試,如下:
群聊測試
效果如上圖的註釋,基於上述的程式碼測試,效果確實達到了咱們需要的群聊效果~

2.3.6、聊天室的其他功能實現

到這裡為止,實現了最基本的建群、群聊的功能,但對於踢人、加群、解散群....等一系列群聊功能還未曾實現,但我這裡就不繼續重複了,畢竟還是那個套路: - ①定義對應功能的訊息體。 - ②客戶端向服務端傳送對應格式的訊息。 - ③服務端編寫處理器,對特定的訊息進行處理。

所以大家感興趣的情況下,可以根據上述步驟繼續進行實現,實現的過程沒有任何難度,重點就是時間問題罷了。

三、Netty實戰篇總結

   看到這裡,其實Netty實戰篇的內容也就大致結束了,個人對於實戰篇的內容並不怎麼滿意,因為與最初設想的實現存在很大偏差,這是由於近期工作、生活狀態不對,所以內容輸出也沒那麼夯實,對於這篇中的完整程式碼實現,也包括前面兩篇中的一些程式碼實現,這裡給出完整的GitHub連結:>>>戳我訪問<<<,大家感興趣可以自行Down下去玩玩。

在我所撰寫的案例中,自定義協議可以繼續優化,選擇效能更強的序列化方式,而聊天室也可以進一步拓展,比如將使用者資訊、群聊資訊、聯絡人資訊都結合資料庫實現,進一步實現離線訊息功能,但由於該案例的設計之初就有問題,所以是存在效能問題的,想要打造一款真正高效能的IM程式,那諸位可參考《計算機網路綜述-騰訊QQ原理》其中的內容。

「其他文章」