(十一)Netty實戰篇:基於Netty框架打造一款高效能的IM即時通訊程式
theme: channing-cyan
引言
關於Netty
網路框架的內容,前面已經講了兩個章節,但總歸來說難以真正掌握,畢竟只是對其中一個個元件進行講解,很難讓諸位將其串起來形成一條線,所以本章中則會結合實戰案例,對Netty
進行更深層次的學習與掌握,實戰案例也並不難,一個非常樸素的IM
聊天程式。
原本打算做個多人鬥地主,但鬥地主需要織入過多的業務邏輯,因此一方面會帶來不必要的理解難度,讓案例更為複雜化,另一方面程式碼量也會偏多,所以最終依舊選擇實現基本的聊天程式,既簡單,又能加深對
Netty
的理解。
一、基於Netty設計通訊協議
協議,這玩意兒相信大家肯定不陌生了,畢竟在《網路程式設計》系列的前兩章,都在圍繞著網路協議展開敘述,再來簡單回顧一下協議的概念:網路協議是指一種通訊雙方都必須遵守的約定,兩個不同的端,按照一定的格式對資料進行“編碼”,同時按照相同的規則進行“解碼”,從而實現兩者之間的資料傳輸與通訊。
當自己想要打造一款IM
通訊程式時,對於訊息的封裝、拆分也同樣需要設計一個協議,通訊的兩端都必須遵守該協議工作,這也是實現通訊程式的前提,但為什麼需要通訊協議呢?因為TCP/IP
中是基於流的方式傳輸訊息,訊息與訊息之間沒有邊界,而協議的目的則在於約定訊息的樣式、邊界等。
1.1、Redis通訊的RESP協議
不知大家是否還記得之前在《Redis綜述篇》中聊到的RESP
客戶端協議,這是Redis
提供的一種客戶端通訊協議,如果想要操作Redis
,就必須遵守該協議的格式傳送資料,但這個協議特別簡單,如下:
- 首先要求所有命令,都以*
開頭,後面跟著具體的子命令數量,接著用換行符分割。
- 接著需要先用$
符號宣告每個子命令的長度,然後再用換行符分割。
- 最後再拼接上具體的子命令,同樣用換行符分割。
這樣描述有些令人難懂,那就直接看個案例,例如一條簡單set
命令,如下:
```shell
客戶端命令:
set name ZhuZi
轉變為RESP指令:
*3
$3
set
$4
name
$5
ZhuZi
按照`Redis`的規定,但凡滿足`RESP`協議的客戶端,都可以直接連線並操作`Redis`服務端,這也就意味著咱們可以直接通過`Netty`來手寫一個`Redis`客戶端,程式碼如下:
java
// 基於Netty、RESP協議實現的Redis客戶端
public class RedisClient {
// 換行符的ASCII碼
static final byte[] LINE = {13, 10};
public static void main(String[] args) {
EventLoopGroup worker = new NioEventLoopGroup();
Bootstrap client = new Bootstrap();
try {
client.group(worker);
client.channel(NioSocketChannel.class);
client.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel)
throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new ChannelInboundHandlerAdapter(){
// 通道建立成功後呼叫:向Redis傳送一條set命令
@Override
public void channelActive(ChannelHandlerContext ctx)
throws Exception {
String command = "set name ZhuZi";
ByteBuf buffer = respCommand(command);
ctx.channel().writeAndFlush(buffer);
}
// Redis響應資料時觸發:列印Redis的響應結果
@Override
public void channelRead(ChannelHandlerContext ctx,
Object msg) throws Exception {
// 接受Redis服務端執行指令後的結果
ByteBuf buffer = (ByteBuf) msg;
System.out.println(buffer.toString(CharsetUtil.UTF_8));
}
});
}
});
// 根據IP、埠連線Redis服務端
client.connect("192.168.12.129", 6379).sync();
} catch (Exception e){
e.printStackTrace();
}
}
private static ByteBuf respCommand(String command){
// 先對傳入的命令以空格進行分割
String[] commands = command.split(" ");
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
// 遵循RESP協議:先寫入指令的個數
buffer.writeBytes(("*" + commands.length).getBytes());
buffer.writeBytes(LINE);
// 接著分別寫入每個指令的長度以及具體值
for (String s : commands) {
buffer.writeBytes(("$" + s.length()).getBytes());
buffer.writeBytes(LINE);
buffer.writeBytes(s.getBytes());
buffer.writeBytes(LINE);
}
// 把轉換成RESP格式的命令返回
return buffer;
}
}
在上述這個案例中,也僅僅只是通過`respCommand()`這個方法,對使用者輸入的指令進行了轉換,同時在上面通過`Netty`,與`Redis`的地址、埠建立了連線,在連線建立成功後,就會向`Redis`傳送一條轉換成`RESP`指令的`set`命令,接著等待`Redis`的響應結果並輸出,如下:
java
+OK
``
因為這是一條寫指令,所以當
Redis收到執行完成後,最終就會返回一個
OK,大家也可直接去
Redis中查詢,也依舊能夠查詢到剛剛寫入的
name`這個鍵值。
1.2、HTTP超文字傳輸協議
前面咱們自己針對於Redis
的RESP
協議,對使用者指令進行了封裝,然後發往Redis
執行,但對於這些常用的協議,Netty
早已提供好了現成的處理器,想要使用時無需從頭開發,可以直接使用現成的處理器來實現,比如現在咱們可以基於Netty
提供的處理器,實現一個簡單的HTTP
伺服器,程式碼如下:
```java
// 基於Netty提供的處理器實現HTTP伺服器
public class HttpServer {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup boss = new NioEventLoopGroup();
EventLoopGroup worker = new NioEventLoopGroup();
ServerBootstrap server = new ServerBootstrap();
server
.group(boss,worker)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer
// 新增一個Netty提供的HTTP處理器
pipeline.addLast(new HttpServerCodec());
pipeline.addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx,
Object msg) throws Exception {
// 在這裡輸出一下訊息的型別
System.out.println("訊息型別:" + msg.getClass());
super.channelRead(ctx, msg);
}
});
pipeline.addLast(new SimpleChannelInboundHandler<HttpRequest>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx,
HttpRequest msg) throws Exception {
System.out.println("客戶端的請求路徑:" + msg.uri());
// 建立一個響應物件,版本號與客戶端保持一致,狀態碼為OK/200
DefaultFullHttpResponse response =
new DefaultFullHttpResponse(
msg.protocolVersion(),
HttpResponseStatus.OK);
// 構造響應內容
byte[] content = "<h1>Hi, ZhuZi!</h1>".getBytes();
// 設定響應頭:告訴客戶端本次響應的資料長度
response.headers().setInt(
HttpHeaderNames.CONTENT_LENGTH,content.length);
// 設定響應主體
response.content().writeBytes(content);
// 向客戶端寫入響應資料
ctx.writeAndFlush(response);
}
});
}
})
.bind("127.0.0.1",8888)
.sync();
}
}
在該案例中,咱們就未曾手動對`HTTP`的資料包進行拆包處理了,而是在服務端的`pipeline`上添加了一個`HttpServerCodec`處理器,這個處理器是`Netty`官方提供的,其類繼承關係如下:
java
public final class HttpServerCodec
extends CombinedChannelDuplexHandler``
觀察會發現,該類繼承自
CombinedChannelDuplexHandler這個組合類,它組合了編碼器、解碼器,這也就意味著
HttpServerCodec`即可以對客戶端的資料做解碼,也可以對服務端響應的資料做編碼,同時除開添加了這個處理器外,在第二個處理器中列印了一下客戶端的訊息型別,最後一個處理器中,對客戶端的請求做出了響應,其實也就是返回了一句話而已。
此時在瀏覽器輸入http://127.0.0.1:8888/index.html
,結果如下:
java
訊息型別:class io.netty.handler.codec.http.DefaultHttpRequest
訊息型別:class io.netty.handler.codec.http.LastHttpContent$1
客戶端的請求路徑:/index.html
此時來看結果,客戶端的請求會被解析成兩個部分,第一個是請求資訊,第二個是主體資訊,但按理來說瀏覽器發出的請求,屬於GET
型別的請求,GET
請求是沒有請求體資訊的,但Netty
依舊會解析成兩部分~,只不過GET
請求的第二部分是空的。
在第三個處理器中,咱們直接向客戶端返回了一個h1
標籤,同時也要記得在響應頭裡面,加上響應內容的長度資訊,否則瀏覽器的載入圈,會一直不同的轉動,畢竟瀏覽器也不知道內容有多長,就會一直反覆載入,嘗試等待更多的資料。
1.3、自定義訊息傳輸協議
Netty
除開提供了HTTP
協議的處理器外,還提供了DNS、HaProxy、MemCache、MQTT、Protobuf、Redis、SCTP、RTSP.....
一系列協議的實現,具體定義位於io.netty.handler.codec
這個包下,當然,咱們也可以自己實現自定義協議,按照自己的邏輯對資料進行編解碼處理。
很多基於Netty
開發的中介軟體/元件,其內部基本上都開發了專屬的通訊協議,以此來作為不同節點間通訊的基礎,所以解下來咱們基於Netty
也來自己設計一款通訊協議,這也會作為後續實現聊天程式時的基礎。
但所謂的協議設計,其實僅僅只需要按照一定約束,實現編碼器與解碼器即可,傳送方在發出資料之前,會經過編碼器對資料進行處理,而接收方在收到資料之前,則會由解碼器對資料進行處理。
1.3.1、自定義協議的要素
在自定義傳輸協議時,咱們必然需要考慮幾個因素,如下:
- 魔數:用來第一時間判斷是否為自己需要的資料包。
- 版本號:提高協議的拓展性,方便後續對協議進行升級。
- 序列化演算法:訊息正文具體該使用哪種方式進行序列化傳輸,例如Json、ProtoBuf、JDK...
。
- 訊息型別:第一時間判斷出當前訊息的型別。
- 訊息序號:為了實現雙工通訊,客戶端和服務端之間收/發訊息不會相互阻塞。
- 正文長度:提供給LTC
解碼器使用,防止解碼時出現粘包、半包的現象。
- 訊息正文:本次訊息要傳輸的具體資料。
在設計協議時,一個完整的協議應該涵蓋上述所說的幾方面,這樣才能提供雙方通訊時的基礎,基於上述幾個欄位,能夠在第一時間內判斷出:訊息是否可用、當前協議版本、訊息的具體型別、訊息的長度等各類資訊,從而給後續處理器使用(自定義的協議規則本身就是一個編解碼處理器而已)。
1.3.2、自定義協議實戰
前面簡單聊到過,所謂的自定義協議就是自己規定訊息格式,以及自己實現編/解碼器對訊息實現封裝/拆解,所以這裡想要自定義一個訊息協議,就只需要滿足前面兩個條件即可,因此實現如下:
```java
@ChannelHandler.Sharable
public class ChatMessageCodec extends MessageToMessageCodec
// 訊息出站時會經過的編碼方法(將原生訊息物件封裝成自定義協議的訊息格式)
@Override
protected void encode(ChannelHandlerContext ctx, Message msg,
List<Object> list) throws Exception {
ByteBuf outMsg = ctx.alloc().buffer();
// 前五個位元組作為魔數
byte[] magicNumber = new byte[]{'Z','h','u','Z','i'};
outMsg.writeBytes(magicNumber);
// 一個位元組作為版本號
outMsg.writeByte(1);
// 一個位元組表示序列化方式 0:JDK、1:Json、2:ProtoBuf.....
outMsg.writeByte(0);
// 一個位元組用於表示訊息型別
outMsg.writeByte(msg.getMessageType());
// 四個位元組表示訊息序號
outMsg.writeInt(msg.getSequenceId());
// 使用Java-Serializable的方式對訊息物件進行序列化
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos);
oos.writeObject(msg);
byte[] msgBytes = bos.toByteArray();
// 使用四個位元組描述訊息正文的長度
outMsg.writeInt(msgBytes.length);
// 將序列化後的訊息物件作為訊息正文
outMsg.writeBytes(msgBytes);
// 將封裝好的資料傳遞給下一個處理器
list.add(outMsg);
}
// 訊息入站時會經過的解碼方法(將自定義格式的訊息轉變為具體的訊息物件)
@Override
protected void decode(ChannelHandlerContext ctx,
ByteBuf inMsg, List<Object> list) throws Exception {
// 讀取前五個位元組得到魔數
byte[] magicNumber = new byte[5];
inMsg.readBytes(magicNumber,0,5);
// 再讀取一個位元組得到版本號
byte version = inMsg.readByte();
// 再讀取一個位元組得到序列化方式
byte serializableType = inMsg.readByte();
// 再讀取一個位元組得到訊息型別
byte messageType = inMsg.readByte();
// 再讀取四個位元組得到訊息序號
int sequenceId = inMsg.readInt();
// 再讀取四個位元組得到訊息正文長度
int messageLength = inMsg.readInt();
// 再根據正文長度讀取序列化後的位元組正文資料
byte[] msgBytes = new byte[messageLength];
inMsg.readBytes(msgBytes,0,messageLength);
// 對於讀取到的訊息正文進行反序列化,最終得到具體的訊息物件
ByteArrayInputStream bis = new ByteArrayInputStream(msgBytes);
ObjectInputStream ois = new ObjectInputStream(bis);
Message message = (Message) ois.readObject();
// 最終把反序列化得到的訊息物件傳遞給後續的處理器
list.add(message);
}
}
``
上面自定義的處理器中,繼承了
MessageToMessageCodec類,主要負責將資料在原生
ByteBuf與
Message之間進行相互轉換,而
Message物件是自定義的訊息物件,這裡暫且無需過多關心。其中主要實現了兩個方法:
-
encode():出站時會經過的編碼方法,會將原生訊息物件按自定義的協議封裝成對應的位元組資料。
-
decode()`:入站時會經過的解碼方法,會將協議格式的位元組資料,轉變為具體的訊息物件。
上述自定義的協議,也就是一定規則的位元組資料,每條訊息資料的組成如下:
- 魔數:使用第1~5
個位元組來描述,這個魔數值可以按自己的想法自定義。
- 版本號:使用第6
個位元組來描述,不同數字表示不同版本。
- 序列化演算法:使用第7
個位元組來描述,不同數字表示不同序列化方式。
- 訊息型別:使用第8
個位元組來描述,不同的訊息型別使用不同數字表示。
- 訊息序號:使用第9~12
個位元組來描述,其實就是一個四位元組的整數。
- 正文長度:使用第13~16
個位元組來描述,也是一個四位元組的整數。
- 訊息正文:長度不固定,根據每次具體傳送的資料來決定。
在其中,為了實現簡單,這裡的序列化方式,則採用的是JDK
預設的Serializable
介面方式,但這種方式生成的物件位元組較大,實際情況中最好還是選擇谷歌的ProtoBuf
方式,這種演算法屬於序列化演算法中,效能最佳的一種落地實現。
當然,這個自定義的協議是提供給後續的聊天業務使用的,但這種實戰型的內容分享,基本上程式碼量較高,所以大家看起來會有些枯燥,而本文所使用的聊天室案例,是基於《B站-黑馬Netty視訊教程》二次改良的,因此如若感覺文字描述較為枯燥,可直接點選前面給出的連結,觀看
P101~P121
視訊進行學習。
最後來觀察一下,大家會發現,在咱們定義的這個協議編解碼處理器上,存在著一個@ChannelHandler.Sharable
註解,這個註解的作用是幹嗎的呢?其實很簡單,用來標識當前處理器是否可在多執行緒環境下使用,如果帶有該註解的處理器,則表示可以在多個通道間共用,因此只需要建立一個即可,反之同理,如果不帶有該註解的處理器,則每個通道需要單獨建立使用。
二、基於Netty打造IM聊天程式
前面簡單過了一下自定義協議後,接著來基於Netty
框架上手一個真正的實戰專案,那也就是基於Netty
打造一款IM
即時通訊的聊天程式,這裡在實現過程中,僅僅只會給出核心實現,但最後會提供完整程式碼的Github
連結,因此大家重點理解核心即可。
2.1、IM程式的使用者模組
聊天、聊天,自然是得先有人,然後才能進行聊天溝通,與QQ、微信類似,如果你想要使用某款聊天程式時,前提都得是先具備一個對應的賬戶才行,因此在咱們設計IM
系統之處,那也需要對應的使用者功能實現,但這裡為了簡單,同樣不再結合資料庫實現完整的使用者模組了,而是基於記憶體實現使用者的管理,如下:
java
public interface UserService {
boolean login(String username, String password);
}
這是使用者模組的頂層介面,僅僅只提供了一個登入介面,關於註冊、鑑權、等級.....等一系列功能,大家感興趣的可在後續進行拓展實現,接著來看看該介面的實現類,如下:
```java
public class UserServiceMemoryImpl implements UserService {
private Map
{
// 在程式碼塊中對使用者列表進行初始化,向其中添加了兩個使用者資訊
allUserMap.put("ZhuZi", "123");
allUserMap.put("XiongMao", "123");
}
@Override
public boolean login(String username, String password) {
String pass = allUserMap.get(username);
if (pass == null) {
return false;
}
return pass.equals(password);
}
}
``
這個實現類並未結合資料庫來實現,而是僅僅在程式啟動時,通過程式碼塊的方式,載入了
ZhuZi、XiongMao兩個使用者資訊並放入記憶體的
Map容器中,這裡有興趣的小夥伴,可自行將
Map`容器換成資料庫的表即可。
其中實現的login()
登入介面尤為簡單,僅僅只是判斷了一下有沒有對應使用者,如果有的話則看看密碼是否正確,正確返回true
,密碼錯誤則返回false
,是的,我所寫的登入功能就是這麼簡單,走個簡單的過場,哈哈哈~
2.1.1、服務端、客戶端的基礎架構
基本的使用者模組有了,但這裡還未曾套入具體實現,因此先簡單的搭建出服務端、客戶端的架構,然後再基於構建好的架構實現基礎的使用者登入功能,服務端的基礎搭建如下: ```java public class ChatServer { public static void main(String[] args) { NioEventLoopGroup boss = new NioEventLoopGroup(); NioEventLoopGroup worker = new NioEventLoopGroup();
ChatMessageCodec MESSAGE_CODEC = new ChatMessageCodec();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.group(boss, worker);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(MESSAGE_CODEC);
}
});
Channel channel = serverBootstrap.bind(8888).sync().channel();
channel.closeFuture().sync();
} catch (InterruptedException e) {
System.out.println("服務端出現錯誤:" + e);
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
服務端的程式碼目前很簡單,僅僅只是裝載了一個自己的協議編/解碼處理器,然後就是一些老步驟,不再過多的重複贅述,接著再來搭建一個簡單的客戶端,程式碼實現如下:
java
public class ChatClient {
public static void main(String[] args) {
NioEventLoopGroup group = new NioEventLoopGroup();
ChatMessageCodec MESSAGE_CODEC = new ChatMessageCodec();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(group);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(MESSAGE_CODEC);
}
});
Channel channel = bootstrap.connect("localhost", 8888).sync().channel();
channel.closeFuture().sync();
} catch (Exception e) {
System.out.println("客戶端出現錯誤:" + e);
} finally {
group.shutdownGracefully();
}
}
} ``` 目前僅僅只是與服務端建立了連線,然後裝載了一個自定義的編解碼器,到這裡就搭建了最基本的服務端、客戶端的基礎架構,接著來基於它實現簡單的登入功能。
2.1.2、使用者登入功能的實現
對於登入功能,由於需要在服務端與客戶端之間傳輸資料,因此咱們可以設計一個訊息物件,但由於後續單聊、群聊都需要傳送不同的訊息格式,因此先設計出一個父類,如下: ```java public abstract class Message implements Serializable {
private int sequenceId;
private int messageType;
@Override
public String toString() {
return "Message{" +
"sequenceId=" + sequenceId +
", messageType=" + messageType +
'}';
}
public int getSequenceId() {
return sequenceId;
}
public void setSequenceId(int sequenceId) {
this.sequenceId = sequenceId;
}
public void setMessageType(int messageType) {
this.messageType = messageType;
}
public abstract int getMessageType();
public static final int LoginRequestMessage = 0;
public static final int LoginResponseMessage = 1;
public static final int ChatRequestMessage = 2;
public static final int ChatResponseMessage = 3;
public static final int GroupCreateRequestMessage = 4;
public static final int GroupCreateResponseMessage = 5;
public static final int GroupJoinRequestMessage = 6;
public static final int GroupJoinResponseMessage = 7;
public static final int GroupQuitRequestMessage = 8;
public static final int GroupQuitResponseMessage = 9;
public static final int GroupChatRequestMessage = 10;
public static final int GroupChatResponseMessage = 11;
public static final int GroupMembersRequestMessage = 12;
public static final int GroupMembersResponseMessage = 13;
public static final int PingMessage = 14;
public static final int PongMessage = 15;
}
在這個訊息父類中,定義了多種訊息型別的狀態碼,不同的訊息型別對應不同數字,同時其中還設計了一個抽象方法,即`getMessageType()`,該方法交給具體的子類實現,每個子類返回各自的訊息型別,為了方便後續拓展,這裡又建立了一個抽象類作為中間類,如下:
java
public abstract class AbstractResponseMessage extends Message {
private boolean success;
private String reason;
public AbstractResponseMessage() {
}
public AbstractResponseMessage(boolean success, String reason) {
this.success = success;
this.reason = reason;
}
@Override
public String toString() {
return "AbstractResponseMessage{" +
"success=" + success +
", reason='" + reason + '\'' +
'}';
}
public boolean isSuccess() {
return success;
}
public void setSuccess(boolean success) {
this.success = success;
}
public String getReason() {
return reason;
}
public void setReason(String reason) {
this.reason = reason;
}
}
這個類主要是提供給響應時使用的,其中包含了響應狀態以及響應資訊,接著再設計兩個登入時會用到的訊息物件,如下:
java
public class LoginRequestMessage extends Message {
private String username;
private String password;
public LoginRequestMessage() {
}
@Override
public String toString() {
return "LoginRequestMessage{" +
"username='" + username + '\'' +
", password='" + password + '\'' +
'}';
}
public String getUsername() {
return username;
}
public void setUsername(String username) {
this.username = username;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
public LoginRequestMessage(String username, String password) {
this.username = username;
this.password = password;
}
@Override
public int getMessageType() {
return LoginRequestMessage;
}
}
上述這個訊息類,主要是提供給客戶端登入時使用,本質上也就是一個涵蓋使用者名稱、使用者密碼的物件而已,同時還有一個用來給服務端響應時的響應類,如下:
java
public class LoginResponseMessage extends AbstractResponseMessage {
public LoginResponseMessage(boolean success, String reason) {
super(success, reason);
}
@Override
public int getMessageType() {
return LoginResponseMessage;
}
} ``` 登入響應類的實現十分簡單,由登入狀態和登入訊息組成,OK,接著來看看登入的具體實現。
首先在客戶端中,再通過pipeline
新增一個處理器,如下:
```java
CountDownLatch WAIT_FOR_LOGIN = new CountDownLatch(1);
AtomicBoolean LOGIN = new AtomicBoolean(false);
AtomicBoolean EXIT = new AtomicBoolean(false);
Scanner scanner = new Scanner(System.in);
ch.pipeline().addLast("client handler", new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 負責接收使用者在控制檯的輸入,負責向伺服器傳送各種訊息
new Thread(() -> {
System.out.println("請輸入使用者名稱:");
String username = scanner.nextLine();
if(EXIT.get()){
return;
}
System.out.println("請輸入密碼:");
String password = scanner.nextLine();
if(EXIT.get()){
return;
}
// 構造訊息物件
LoginRequestMessage message = new LoginRequestMessage(username, password);
System.out.println(message);
// 傳送訊息
ctx.writeAndFlush(message);
System.out.println("等待後續操作...");
try {
WAIT_FOR_LOGIN.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
// 如果登入失敗
if (!LOGIN.get()) {
ctx.channel().close();
return;
}
}).start();
}
在與服務端建立連線成功之後,就提示使用者需要登入,接著接收使用者輸入的使用者名稱、密碼,然後構建出一個`LoginRequestMessage`訊息物件,接著將其傳送給服務端,由於前面裝載了自定義的協議編解碼器,所以訊息在出站時,這個`Message`物件會被序列化成位元組碼,接著再服務端入站時,又會被反序列化成訊息物件,接著來看看服務端的實現,如下:
java
@ChannelHandler.Sharable
public class LoginRequestMessageHandler
extends SimpleChannelInboundHandler``
在服務端中,新增了一個處理器類,繼承自
SimpleChannelInboundHandler這個處理器,其中指定的泛型為
LoginRequestMessage,這表示當前處理器只關注這個型別的訊息,當出現登入型別的訊息時,會進入該處理器並觸發內部的
channelRead0()`方法。
在該方法中,獲取了登入訊息中的使用者名稱、密碼,接著對其做了基本的登入效驗,如果使用者名稱存在並且密碼正確,就會返回登入成功,否則會返回登入失敗,最終登入後的狀態會被封裝成一個LoginResponseMessage
物件,然後寫回客戶端的通道中。
當然,為了該處理器能夠成功生效,這裡需要將其裝載到服務端的pipeline
上,如下:
java
LoginRequestMessageHandler LOGIN_HANDLER = new LoginRequestMessageHandler();
ch.pipeline().addLast(LOGIN_HANDLER);
裝載好登入處理器後,接著分別啟動服務端、客戶端,測試結果如下:
從圖中的效果來看,這裡實現了最基本的登入功能,估計有些小夥伴看到這裡就有些暈了,但其實非常簡單,僅僅只是通過Netty
在做資料互動而已,客戶端則提供輸入使用者名稱、密碼的功能,然後將使用者輸入的名稱、密碼傳送給服務端,服務端提供登入判斷的功能,最終根據判斷結果再向客戶端返回資料罷了。
2.2、基於Netty實現點對點單聊
有了基本的使用者登入功能後,接著來看看如何實現點對點的單聊功能呢?首先我定義了一個會話介面,如下:
java
public interface Session {
void bind(Channel channel, String username);
void unbind(Channel channel);
Channel getChannel(String username);
}
這個介面中依舊只有三個方法,釋義如下:
- bind()
:傳入一個使用者名稱和Socket
通道,讓兩者之間的產生繫結關係。
- unbind()
:取消一個使用者與某個Socket
通道的繫結關係。
- getChannel()
:根據一個使用者名稱,獲取與其存在繫結關係的通道。
該介面的實現類如下: ```java public class SessionMemoryImpl implements Session {
private final Map<String, Channel> usernameChannelMap = new ConcurrentHashMap<>();
private final Map<Channel, String> channelUsernameMap = new ConcurrentHashMap<>();
@Override
public void bind(Channel channel, String username) {
usernameChannelMap.put(username, channel);
channelUsernameMap.put(channel, username);
channelAttributesMap.put(channel, new ConcurrentHashMap<>());
}
@Override
public void unbind(Channel channel) {
String username = channelUsernameMap.remove(channel);
usernameChannelMap.remove(username);
channelAttributesMap.remove(channel);
}
@Override
public Channel getChannel(String username) {
return usernameChannelMap.get(username);
}
@Override
public String toString() {
return usernameChannelMap.toString();
}
}
``
該實現類最關鍵的是其中的兩個
Map容器,
usernameChannelMap用來儲存所有使用者名稱與
Socket通道的繫結關係,而
channelUsernameMap則是反過來的順序,這主要是為了方便,即可以通過使用者名稱獲得對應通道,也可以通過通道判斷出使用者名稱,實際上一個
Map`也能搞定,但還是那句話,主要為了簡單嘛~
有了上述這個最簡單的會話管理功能後,就要著手實現具體的功能了,其實在前面實現登入功能的時候,就用過這其中的bind()
方法,也就是當登入成功之後,就會將當前傳送登入訊息的通道,與正在登入的使用者名稱產生繫結關係,這樣就方便後續實現單聊、群聊的功能。
2.2.1、定義單聊的訊息物件
與登入時相同,由於需要在服務端和客戶端之間實現資料的轉發,因此這裡也需要兩個訊息物件,用來作為資料互動的訊息格式,如下: ```java public class ChatRequestMessage extends Message { private String content; private String to; private String from;
public ChatRequestMessage() {
}
public ChatRequestMessage(String from, String to, String content) {
this.from = from;
this.to = to;
this.content = content;
}
// 省略Get/Setting、toString()方法.....
}
上述這個類,是提供給客戶端用來發送訊息資料的,其中主要包含了三個值,聊天的訊息內容、傳送人與接收人,因為這裡是需要實現一個`IM`聊天程式,所以並不是客戶端與服務端進行資料互動,而是客戶端與客戶端之間進行資料互動,服務端僅僅只提供訊息轉發的功能,接著再構建一個訊息類,如下:
java
public class ChatResponseMessage extends AbstractResponseMessage {
private String from;
private String content;
@Override
public String toString() {
return "ChatResponseMessage{" +
"from='" + from + '\'' +
", content='" + content + '\'' +
'}';
}
public ChatResponseMessage(boolean success, String reason) {
super(success, reason);
}
public ChatResponseMessage(String from, String content) {
this.from = from;
this.content = content;
}
@Override
public int getMessageType() {
return ChatResponseMessage;
}
// 省略Get/Setting、toString()方法.....
} ``` 這個類是提供給服務端用來轉發的,當服務端收到一個聊天訊息後,因為聊天訊息中包含了接收人,所以可以先根據接收人的使用者名稱,找到對應的客戶端通道,然後再封裝成一個響應訊息,轉發給對應的客戶端即可,下面來做具體實現。
2.2.2、實現點對點單聊功能
由於聊天功能是提供給客戶端使用的,所以當一個客戶端登入成功之後,應該暴露給使用者一個操作選單,所以直接在原本客戶端的channelActive()
方法中,登入成功之後繼續加程式碼即可,程式碼如下:
java
while (true) {
System.out.println("==================================");
System.out.println("\t1、傳送單聊訊息");
System.out.println("\t2、傳送群聊訊息");
System.out.println("\t3、建立一個群聊");
System.out.println("\t4、獲取群聊成員");
System.out.println("\t5、加入一個群聊");
System.out.println("\t6、退出一個群聊");
System.out.println("\t7、退出聊天系統");
System.out.println("==================================");
String command = scanner.nextLine();
}
首先會開啟一個死迴圈,然後不斷接收使用者的操作,接著使用switch
語法來對具體的選單功能進行實現,先實現單聊功能,如下:
java
switch (command){
case "1":
System.out.print("請選擇你要傳送訊息給誰:");
String toUserName = scanner.nextLine();
System.out.print("請輸入你要傳送的訊息內容:");
String content = scanner.nextLine();
ctx.writeAndFlush(new ChatRequestMessage(username, toUserName, content));
break;
}
如果使用者選擇了單聊,接著會提示使用者選擇要傳送訊息給誰,這裡也就是讓使用者輸入對方的使用者名稱,實際上如果有介面的話,這一步是並不需要使用者自己輸入的,而是提供視窗讓使用者點選,比如QQ、微信一樣,想要給某個人傳送訊息時,只需要點選“他”的頭像私聊即可。
等使用者選擇了聊天目標,並且輸入了訊息內容後,接著會構建一個ChatRequestMessage
訊息物件,然後會發送給服務端,但這裡先不看服務端的實現,客戶端這邊還需要重寫一個方法,如下:
java
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("收到訊息:" + msg);
if ((msg instanceof LoginResponseMessage)) {
LoginResponseMessage response = (LoginResponseMessage) msg;
if (response.isSuccess()) {
// 如果登入成功
LOGIN.set(true);
}
// 喚醒 system in 執行緒
WAIT_FOR_LOGIN.countDown();
}
}
前面的邏輯是在channelActive()
方法中完成的,也就是連線建立成功後,就會讓使用者登入,接著登入成功之後會給使用者一個選單欄,提供給使用者進行操作,但前面的邏輯中一直沒有對服務端響應的訊息進行處理,因此channelRead()
方法中會對服務端響應的資料進行處理。
channelRead()
方法會在有資料可讀時被觸發,所以當服務端響應資料時,首先會判斷一下:目前服務端響應的是不是登入訊息,如果是的話,則需要根據登入的結果來喚醒前面channelActive()
方法中的執行緒。如果目前服務端響應的不是登入訊息,這也就意味著客戶端前面已經登入成功了,所以接著會直接列印一下收到的資料。
OK,有了上述客戶端的程式碼實現後,接著再來服務端多建立一個處理器,如下:
java
@ChannelHandler.Sharable
public class ChatRequestMessageHandler
extends SimpleChannelInboundHandler<ChatRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx,
ChatRequestMessage msg) throws Exception {
String to = msg.getTo();
Channel channel = SessionFactory.getSession().getChannel(to);
// 線上
if (channel != null) {
channel.writeAndFlush(new ChatResponseMessage(
msg.getFrom(), msg.getContent()));
}
// 不線上
else {
ctx.writeAndFlush(new ChatResponseMessage(
false, "對方使用者不存在或者不線上"));
}
}
}
這裡依舊通過繼承SimpleChannelInboundHandler
類的形式,來特別關注ChatRequestMessage
單聊型別的訊息,如果目前服務端收到的是單聊訊息,則會進入觸發該處理器的channelRead0()
方法,該處理器內部的邏輯也並不複雜,首先根據單聊訊息的接收人,去找一下與之對應的通道:
- 如果根據使用者名稱查到了通道,表示接收人目前是登入線上狀態。
- 反之,如果無法根據使用者名稱找到通道,表示對應的使用者不存在或者沒有登入。
接著會根據上面的查詢結果,進行對應的結果返回: - 如果線上:把要傳送的單聊訊息,直接寫入至找到的通道中。 - 如果不線上:向傳送單聊訊息的客戶端,返回使用者不存在或使用者不線上。
有了這個處理器之後,接著還需要把該處理器裝載到服務端上,如下:
java
ChatRequestMessageHandler CHAT_HANDLER = new ChatRequestMessageHandler();
ch.pipeline().addLast(CHAT_HANDLER);
裝載好單聊處理器後,接著分別啟動一個服務端、兩個客戶端,測試結果如下:
從測試結果中可以明顯看出效果,其中的單聊功能的確已經實現,可以實現A→B
使用者之間的單聊功能,兩者之間藉助伺服器轉發,可以實現兩人私聊的功能。
2.3、基於Netty打造多人聊天室
前面實現了兩個使用者之間的私聊功能,接著再來實現一個多人聊天室的功能,畢竟像QQ、微信、釘釘....等任何通訊軟體,都支援多人建立群聊的功能,但多人聊天室的功能,實現之前還需要先完成建群的功能,畢竟如果群都沒建立,自然無法向某個群內傳送資料。
實現拉群也好,群聊也罷,其實現步驟依舊和前面相同,如下: - ①先定義對應的訊息物件。 - ②實現客戶端傳送對應訊息資料的功能。 - ③再寫一個服務端的群聊處理器,然後裝載到服務端上。
2.3.1、定義拉群的訊息體
首先來定義兩個拉群時用的訊息體,如下:
```java
public class GroupCreateRequestMessage extends Message {
private String groupName;
private Set
public GroupCreateRequestMessage(String groupName, Set<String> members) {
this.groupName = groupName;
this.members = members;
}
@Override
public int getMessageType() {
return GroupCreateRequestMessage;
}
// 省略其他Get/Settings、toString()方法.....
}
上述這個訊息體是提供給客戶端使用的,其中主要存在兩個成員,也就是群名稱與群成員列表,存放所有群成員的容器選用了`Set`集合,因為`Set`集合具備不可重複性,因此可以有效的避免同一使用者多次進群,接著再來看看服務端響應時用的訊息體,如下:
java
public class GroupCreateResponseMessage extends AbstractResponseMessage {
public GroupCreateResponseMessage(boolean success, String reason) {
super(success, reason);
}
@Override
public int getMessageType() {
return GroupCreateResponseMessage;
}
} ``` 這個訊息體的實現尤為簡單,僅僅只是給客戶端返回了拉群狀態以及拉群的附加資訊。
2.3.2、定義群聊會話管理
前面單聊有單聊的會話管理機制,而實現多人群聊時,依舊需要有群聊的會話管理機制,首先封裝了一個群聊實體類,如下:
```java
public class Group {
// 聊天室名稱
private String name;
// 聊天室成員
private Set
public static final Group EMPTY_GROUP = new Group("empty", Collections.emptySet());
public Group(String name, Set<String> members) {
this.name = name;
this.members = members;
}
// 省略其他Get/Settings、toString()方法.....
}
接著定義了一個群聊會話的頂級介面,如下:
java
public interface GroupSession {
// 建立一個群聊
Group createGroup(String name, Set上述介面中,提供了幾個介面方法,其實也主要是群聊系統中的一些日常操作,如創群、加群、踢人、解散群、檢視群成員....等功能,接著來看看該介面的實現者,如下:
java
public class GroupSessionMemoryImpl implements GroupSession {
private final Map
@Override
public Group createGroup(String name, Set<String> members) {
Group group = new Group(name, members);
return groupMap.putIfAbsent(name, group);
}
@Override
public Group joinMember(String name, String member) {
return groupMap.computeIfPresent(name, (key, value) -> {
value.getMembers().add(member);
return value;
});
}
@Override
public Group removeMember(String name, String member) {
return groupMap.computeIfPresent(name, (key, value) -> {
value.getMembers().remove(member);
return value;
});
}
@Override
public Group removeGroup(String name) {
return groupMap.remove(name);
}
@Override
public Set<String> getMembers(String name) {
return groupMap.getOrDefault(name, Group.EMPTY_GROUP).getMembers();
}
@Override
public List<Channel> getMembersChannel(String name) {
return getMembers(name).stream()
.map(member -> SessionFactory.getSession().getChannel(member))
.filter(Objects::nonNull)
.collect(Collectors.toList());
}
}
這個實現類沒啥好說的,重點記住裡面有個`Map`容器即可,這個容器主要負責儲存所有群名稱與`Group`群聊物件的關係,後續可以通過群聊名稱,在這個容器中找到一個對應群聊物件。同時為了方便後續呼叫這些介面,還提供了一個工具類,如下:
java
public abstract class GroupSessionFactory {
private static GroupSession session = new GroupSessionMemoryImpl();
public static GroupSession getGroupSession() {
return session;
}
}
``
很簡單,僅僅只例項化了一個群聊會話管理的實現類,因為這裡沒有結合
Spring來實現,所以並不能依靠
IOC技術來自動管理
Bean`,因此咱們需要手動創建出一個例項,以供於後續使用。
2.3.3、實現拉群功能
前面客戶端的功能選單中,3
對應著拉群功能,所以咱們需要對3
做具體的功能實現,邏輯如下:
java
case "3":
System.out.print("請輸入你要建立的群聊暱稱:");
String newGroupName = scanner.nextLine();
System.out.print("請選擇你要邀請的群成員(不同成員用、分割):");
String members = scanner.nextLine();
Set<String> memberSet = new HashSet<>(Arrays.asList(members.split("、")));
memberSet.add(username); // 加入自己
ctx.writeAndFlush(new GroupCreateRequestMessage(newGroupName, memberSet));
break;
在該分支實現中,首先會要求使用者輸入一個群聊暱稱,接著需要輸入需要拉入群聊的使用者名稱稱,多個使用者之間使用、
分割,接著會把使用者輸入的群成員以及自己,全部放入到一個Set
集合中,最終組裝成一個拉群訊息體,傳送給服務端處理,服務端的處理器如下:
java
@ChannelHandler.Sharable
public class GroupCreateRequestMessageHandler
extends SimpleChannelInboundHandler<GroupCreateRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx,
GroupCreateRequestMessage msg) throws Exception {
String groupName = msg.getGroupName();
Set<String> members = msg.getMembers();
// 群管理器
GroupSession groupSession = GroupSessionFactory.getGroupSession();
Group group = groupSession.createGroup(groupName, members);
if (group == null) {
// 發生成功訊息
ctx.writeAndFlush(new GroupCreateResponseMessage(true,
groupName + "建立成功"));
// 傳送拉群訊息
List<Channel> channels = groupSession.getMembersChannel(groupName);
for (Channel channel : channels) {
channel.writeAndFlush(new GroupCreateResponseMessage(
true, "您已被拉入" + groupName));
}
} else {
ctx.writeAndFlush(new GroupCreateResponseMessage(
false, groupName + "已經存在"));
}
}
}
這裡依舊繼承了SimpleChannelInboundHandler
類,只關心拉群的訊息,當客戶端出現拉群訊息時,首先會獲取使用者輸入的群暱稱和群成員,接著通過前面提供的創群介面,嘗試建立一個群聊,如果群聊已經存在,則會建立失敗,反之則會建立成功,在建立群聊成功的情況下,會給所有的群成員傳送一條“你已被拉入[XXX]”的訊息。
最後,同樣需要將該處理器裝載到服務端上,如下:
java
GroupCreateRequestMessageHandler GROUP_CREATE_HANDLER =
new GroupCreateRequestMessageHandler();
ch.pipeline().addLast(GROUP_CREATE_HANDLER);
最後分別啟動一個服務端、兩個客戶端進行效果測試,如下:
從上圖的測試結果來看,的確實現了咱們的拉群效果,一個使用者拉群之後,被邀請的成員都會收到來自於服務端的拉群提醒,這也就為後續群聊功能奠定了基礎。
2.3.4、定義群聊的訊息體
這裡就不重複贅述了,還是之前的套路,定義一個客戶端用的訊息體,如下: ```java public class GroupChatRequestMessage extends Message { private String content; private String groupName; private String from;
public GroupChatRequestMessage(String from, String groupName, String content) {
this.content = content;
this.groupName = groupName;
this.from = from;
}
@Override
public int getMessageType() {
return GroupChatRequestMessage;
}
// 省略其他Get/Settings、toString()方法.....
}
這個是客戶端用來發送群聊訊息的訊息體,其中存在三個成員,傳送人、群聊暱稱、訊息內容,通過這三個成員,可以描述清楚任何一條群聊記錄,接著來看看服務端響應時用的訊息體,如下:
java
public class GroupChatResponseMessage extends AbstractResponseMessage {
private String from;
private String content;
public GroupChatResponseMessage(boolean success, String reason) {
super(success, reason);
}
public GroupChatResponseMessage(String from, String content) {
this.from = from;
this.content = content;
}
@Override
public int getMessageType() {
return GroupChatResponseMessage;
}
// 省略其他Get/Settings、toString()方法.....
} ``` 在這個訊息體中,就省去了群聊暱稱這個成員,因為這個訊息體的用處,主要是給服務端轉發給客戶端時使用的,因此不需要群聊暱稱,當然,要也可以,我這裡就直接省去了。
2.3.5、實現群聊功能
依舊先來做客戶端的實現,實現了客戶端之後再去完成服務端的實現,客戶端實現如下:
java
case "2":
System.out.print("請選擇你要傳送訊息的群聊:");
String groupName = scanner.nextLine();
System.out.print("請輸入你要傳送的訊息內容:");
String groupContent = scanner.nextLine();
ctx.writeAndFlush(new GroupChatRequestMessage(username, groupName, groupContent));
break;
因為傳送群聊訊息對應著之前選單中的2
,所以這裡對該分支進行實現,當用戶選擇傳送群聊訊息時,首先會讓使用者自己先選擇一個群聊,接著輸入要傳送的訊息內容,接著組裝成一個群聊訊息物件,傳送給服務端處理,服務端的實現如下:
```java
@ChannelHandler.Sharable
public class GroupChatRequestMessageHandler
extends SimpleChannelInboundHandler
for (Channel channel : channels) {
channel.writeAndFlush(new GroupChatResponseMessage(
msg.getFrom(), msg.getContent()));
}
}
}
``
這裡依舊定義了一個處理器,關於原因就不再重複囉嗦了,服務端對於群聊訊息的實現額外簡單,也就是先根據使用者選擇的群暱稱,找到該群所有的群成員,然後依次遍歷成員列表,獲取對應的
Socket`通道,轉發訊息即可。
接著將該處理器裝載到服務端pipeline
上,然後分別啟動一個服務端、兩個客戶端,進行效果測試,如下:
效果如上圖的註釋,基於上述的程式碼測試,效果確實達到了咱們需要的群聊效果~
2.3.6、聊天室的其他功能實現
到這裡為止,實現了最基本的建群、群聊的功能,但對於踢人、加群、解散群....等一系列群聊功能還未曾實現,但我這裡就不繼續重複了,畢竟還是那個套路: - ①定義對應功能的訊息體。 - ②客戶端向服務端傳送對應格式的訊息。 - ③服務端編寫處理器,對特定的訊息進行處理。
所以大家感興趣的情況下,可以根據上述步驟繼續進行實現,實現的過程沒有任何難度,重點就是時間問題罷了。
三、Netty實戰篇總結
看到這裡,其實Netty
實戰篇的內容也就大致結束了,個人對於實戰篇的內容並不怎麼滿意,因為與最初設想的實現存在很大偏差,這是由於近期工作、生活狀態不對,所以內容輸出也沒那麼夯實,對於這篇中的完整程式碼實現,也包括前面兩篇中的一些程式碼實現,這裡給出完整的GitHub
連結:>>>戳我訪問<<<,大家感興趣可以自行Down
下去玩玩。
在我所撰寫的案例中,自定義協議可以繼續優化,選擇效能更強的序列化方式,而聊天室也可以進一步拓展,比如將使用者資訊、群聊資訊、聯絡人資訊都結合資料庫實現,進一步實現離線訊息功能,但由於該案例的設計之初就有問題,所以是存在效能問題的,想要打造一款真正高效能的IM
程式,那諸位可參考《計算機網路綜述-騰訊QQ原理》其中的內容。
- 全解MySQL終章:這份爆肝30W字的資料庫寶典贈與有緣的你!
- 追憶四年前:一段關於我被外企CTO用登入註冊吊打的不堪往事
- (十一)Netty實戰篇:基於Netty框架打造一款高效能的IM即時通訊程式
- (四)MySQL之索引初識篇:索引機制、索引分類、索引使用與管理綜述
- (九)MySQL之MVCC機制:為什麼你改了的資料我還看不見?
- (十)全解MySQL之死鎖問題分析、事務隔離與鎖機制的底層原理剖析
- (二十八)MySQL面試通關祕籍:這次你也可以在簡歷寫上精通MySQL!
- (一)全解MySQL之架構篇:自頂向下深入剖析MySQL整體架構!
- (九)Java網路程式設計無冕之王-這回把大名鼎鼎的Netty框架一網打盡!
- (八)MySQL鎖機制:高併發場景下該如何保證資料讀寫的安全性?
- (十五)MySQL命令大全:以後再也不用擔心忘記SQL該怎麼寫啦~
- (七)MySQL事務篇:ACID原則、事務隔離級別及事務機制原理剖析
- 深入理解SpringMVC工作原理,像大牛一樣手寫SpringMVC框架
- (三)MySQL之庫表設計篇:一、二、三、四、五正規化、BC正規化與反正規化詳解!
- (五)MySQL索引應用篇:建立索引的正確姿勢與使用索引的最佳指南!
- (六)MySQL索引原理篇:深入資料庫底層揭開索引機制的神祕面紗!
- (五)網路程式設計之流量接入層設計:基於效能怪獸從零構建日均億級吞吐量的閘道器架構!
- (四)網路程式設計之請求分發篇:負載均衡靜態排程演算法、平滑輪詢加權、一致性雜湊、最小活躍數演算法實踐!
- (三)Nginx一網打盡:動靜分離、壓縮、快取、黑白名單、跨域、高可用、效能優化...想要的這都有!
- Redis綜述篇:與面試官徹夜長談Redis快取、持久化、淘汰機制、哨兵、叢集底層原理!