從Redis、HTTP協議,看Nett協議設計,我發現了個驚天大祕密
1. 協議的作用
TCP/IP 中訊息傳輸基於流的方式,沒有邊界
協議的目的就是劃定訊息的邊界,制定通訊雙方要共同遵守的通訊規則
2. Redis 協議
如果我們要向 Redis 伺服器傳送一條 set name Nyima 的指令,需要遵守如下協議
// 該指令一共有3部分,每條指令之後都要添加回車與換行符
*3\r\n
// 第一個指令的長度是3
$3\r\n
// 第一個指令是set指令
set\r\n
// 下面的指令以此類推
$4\r\n
name\r\n
$5\r\n
Nyima\r\n
客戶端程式碼如下
public class RedisClient {
static final Logger log = LoggerFactory.getLogger(StudyServer.class);
public static void main(String[] args) {
NioEventLoopGroup group = new NioEventLoopGroup();
try {
ChannelFuture channelFuture = new Bootstrap()
.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
// 列印日誌
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 回車與換行符
final byte[] LINE = {'\r','\n'};
// 獲得ByteBuf
ByteBuf buffer = ctx.alloc().buffer();
// 連線建立後,向Redis中傳送一條指令,注意添加回車與換行
// set name Nyima
buffer.writeBytes("*3".getBytes());
buffer.writeBytes(LINE);
buffer.writeBytes("$3".getBytes());
buffer.writeBytes(LINE);
buffer.writeBytes("set".getBytes());
buffer.writeBytes(LINE);
buffer.writeBytes("$4".getBytes());
buffer.writeBytes(LINE);
buffer.writeBytes("name".getBytes());
buffer.writeBytes(LINE);
buffer.writeBytes("$5".getBytes());
buffer.writeBytes(LINE);
buffer.writeBytes("Nyima".getBytes());
buffer.writeBytes(LINE);
ctx.writeAndFlush(buffer);
}
});
}
})
.connect(new InetSocketAddress("localhost", 6379));
channelFuture.sync();
// 關閉channel
channelFuture.channel().close().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 關閉group
group.shutdownGracefully();
}
}
}
控制檯列印結果
1600 [nioEventLoopGroup-2-1] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x28c994f1, L:/127.0.0.1:60792 - R:localhost/127.0.0.1:6379] WRITE: 34B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 2a 33 0d 0a 24 33 0d 0a 73 65 74 0d 0a 24 34 0d |*3..$3..set..$4.|
|00000010| 0a 6e 61 6d 65 0d 0a 24 35 0d 0a 4e 79 69 6d 61 |.name..$5..Nyima|
|00000020| 0d 0a |.. |
+--------+-------------------------------------------------+----------------+
Redis 中查詢執行結果
3. HTTP 協議
HTTP 協議在請求行請求頭中都有很多的內容,自己實現較為困難,可以使用 HttpServerCodec 作為伺服器端的解碼器與編碼器,來處理 HTTP 請求
// HttpServerCodec 中既有請求的解碼器 HttpRequestDecoder 又有響應的編碼器 HttpResponseEncoder
// Codec(CodeCombine) 一般代表該類既作為 編碼器 又作為 解碼器
public final class HttpServerCodec extends CombinedChannelDuplexHandler<HttpRequestDecoder, HttpResponseEncoder>
implements HttpServerUpgradeHandler.SourceCodec
伺服器程式碼
public class HttpServer {
static final Logger log = LoggerFactory.getLogger(StudyServer.class);
public static void main(String[] args) {
NioEventLoopGroup group = new NioEventLoopGroup();
new ServerBootstrap()
.group(group)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
// 作為伺服器,使用 HttpServerCodec 作為編碼器與解碼器
ch.pipeline().addLast(new HttpServerCodec());
// 伺服器只處理HTTPRequest
ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpRequest msg) {
// 獲得請求uri
log.debug(msg.uri());
// 獲得完整響應,設定版本號與狀態碼
DefaultFullHttpResponse response = new DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK);
// 設定響應內容
byte[] bytes = "<h1>Hello, World!</h1>".getBytes(StandardCharsets.UTF_8);
// 設定響應體長度,避免瀏覽器一直接收響應內容
response.headers().setInt(CONTENT_LENGTH, bytes.length);
// 設定響應體
response.content().writeBytes(bytes);
// 寫回響應
ctx.writeAndFlush(response);
}
});
}
})
.bind(8080);
}
}
伺服器負責處理請求並響應瀏覽器。所以只需要處理 HTTP 請求即可
// 伺服器只處理HTTPRequest
ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>()
獲得請求後,需要返回響應給瀏覽器。需要建立響應物件 DefaultFullHttpResponse,設定 HTTP 版本號及狀態碼,為避免瀏覽器獲得響應後,因為獲得 CONTENT_LENGTH 而一直空轉,需要新增 CONTENT_LENGTH 欄位,表明響應體中資料的具體長度
// 獲得完整響應,設定版本號與狀態碼
DefaultFullHttpResponse response = new DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK);
// 設定響應內容
byte[] bytes = "<h1>Hello, World!</h1>".getBytes(StandardCharsets.UTF_8);
// 設定響應體長度,避免瀏覽器一直接收響應內容
response.headers().setInt(CONTENT_LENGTH, bytes.length);
// 設定響應體
response.content().writeBytes(bytes);
執行結果
瀏覽器
控制檯
// 請求內容
1714 [nioEventLoopGroup-2-2] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x72630ef7, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:55503] READ: 688B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 47 45 54 20 2f 66 61 76 69 63 6f 6e 2e 69 63 6f |GET /favicon.ico|
|00000010| 20 48 54 54 50 2f 31 2e 31 0d 0a 48 6f 73 74 3a | HTTP/1.1..Host:|
|00000020| 20 6c 6f 63 61 6c 68 6f 73 74 3a 38 30 38 30 0d | localhost:8080.|
|00000030| 0a 43 6f 6e 6e 65 63 74 69 6f 6e 3a 20 6b 65 65 |.Connection: kee|
|00000040| 70 2d 61 6c 69 76 65 0d 0a 50 72 61 67 6d 61 3a |p-alive..Pragma:|
....
// 響應內容
1716 [nioEventLoopGroup-2-2] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x72630ef7, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:55503] WRITE: 61B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 48 54 54 50 2f 31 2e 31 20 32 30 30 20 4f 4b 0d |HTTP/1.1 200 OK.|
|00000010| 0a 43 6f 6e 74 65 6e 74 2d 4c 65 6e 67 74 68 3a |.Content-Length:|
|00000020| 20 32 32 0d 0a 0d 0a 3c 68 31 3e 48 65 6c 6c 6f | 22....<h1>Hello|
|00000030| 2c 20 57 6f 72 6c 64 21 3c 2f 68 31 3e |, World!</h1> |
+--------+-------------------------------------------------+----------------+
4. 自定義協議
組成要素
-
魔數:用來在第一時間判定接收的資料是否為無效資料包
-
版本號:可以支援協議的升級
-
序列化演算法
:訊息正文到底採用哪種序列化反序列化方式
- 如:json、protobuf、hessian、jdk
-
指令型別:是登入、註冊、單聊、群聊… 跟業務相關
-
請求序號:為了雙工通訊,提供非同步能力
-
正文長度
-
訊息正文
編碼器與解碼器
public class MessageCodec extends ByteToMessageCodec<Message> {
@Override
protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {
// 設定魔數 4個位元組
out.writeBytes(new byte[]{'N','Y','I','M'});
// 設定版本號 1個位元組
out.writeByte(1);
// 設定序列化方式 1個位元組
out.writeByte(1);
// 設定指令型別 1個位元組
out.writeByte(msg.getMessageType());
// 設定請求序號 4個位元組
out.writeInt(msg.getSequenceId());
// 為了補齊為16個位元組,填充1個位元組的資料
out.writeByte(0xff);
// 獲得序列化後的msg
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos);
oos.writeObject(msg);
byte[] bytes = bos.toByteArray();
// 獲得並設定正文長度 長度用4個位元組標識
out.writeInt(bytes.length);
// 設定訊息正文
out.writeBytes(bytes);
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
// 獲取魔數
int magic = in.readInt();
// 獲取版本號
byte version = in.readByte();
// 獲得序列化方式
byte seqType = in.readByte();
// 獲得指令型別
byte messageType = in.readByte();
// 獲得請求序號
int sequenceId = in.readInt();
// 移除補齊位元組
in.readByte();
// 獲得正文長度
int length = in.readInt();
// 獲得正文
byte[] bytes = new byte[length];
in.readBytes(bytes, 0, length);
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
Message message = (Message) ois.readObject();
// 將資訊放入List中,傳遞給下一個handler
out.add(message);
// 列印獲得的資訊正文
System.out.println("===========魔數===========");
System.out.println(magic);
System.out.println("===========版本號===========");
System.out.println(version);
System.out.println("===========序列化方法===========");
System.out.println(seqType);
System.out.println("===========指令型別===========");
System.out.println(messageType);
System.out.println("===========請求序號===========");
System.out.println(sequenceId);
System.out.println("===========正文長度===========");
System.out.println(length);
System.out.println("===========正文===========");
System.out.println(message);
}
}
- 編碼器與解碼器方法源於父類 ByteToMessageCodec,通過該類可以自定義編碼器與解碼器, 泛型型別為被編碼與被解碼的類。此處使用了自定義類 Message,代表訊息
public class MessageCodec extends ByteToMessageCodec<Message>
-
編碼器負責將附加資訊與正文資訊寫入到 ByteBuf 中,其中附加資訊總位元組數最好為 2n,不足需要補齊。正文內容如果為物件,需要通過序列化將其放入到 ByteBuf 中
-
解碼器負責將 ByteBuf 中的資訊取出,並放入 List 中,該 List 用於將資訊傳遞給下一個 handler
編寫測試類
public class TestCodec {
static final org.slf4j.Logger log = LoggerFactory.getLogger(StudyServer.class);
public static void main(String[] args) throws Exception {
EmbeddedChannel channel = new EmbeddedChannel();
// 新增解碼器,避免粘包半包問題
channel.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 12, 4, 0, 0));
channel.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
channel.pipeline().addLast(new MessageCodec());
LoginRequestMessage user = new LoginRequestMessage("Nyima", "123");
// 測試編碼與解碼
ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer();
new MessageCodec().encode(null, user, byteBuf);
channel.writeInbound(byteBuf);
}
}
- 測試類中用到了 LengthFieldBasedFrameDecoder,避免粘包半包問題
- 通過 MessageCodec 的 encode 方法將附加資訊與正文寫入到 ByteBuf 中,通過 channel 執行入站操作。入站時會呼叫 decode 方法進行解碼
執行結果
@Sharable 註解
為了提高 handler 的複用率,可以將 handler 建立為 handler 物件,然後在不同的 channel 中使用該 handler 物件進行處理操作
LoggingHandler loggingHandler = new LoggingHandler(LogLevel.DEBUG);
// 不同的channel中使用同一個handler物件,提高複用率
channel1.pipeline().addLast(loggingHandler);
channel2.pipeline().addLast(loggingHandler);
但是並不是所有的 handler 都能通過這種方法來提高複用率的,例如 LengthFieldBasedFrameDecoder。如果多個 channel 中使用同一個 LengthFieldBasedFrameDecoder 物件,則可能發生如下問題
-
channel1 中收到了一個半包,LengthFieldBasedFrameDecoder 發現不是一條完整的資料,則沒有繼續向下傳播
-
此時 channel2 中也收到了一個半包,因為兩個 channel 使用了同一個 LengthFieldBasedFrameDecoder,存入其中的資料剛好拼湊成了一個完整的資料包。LengthFieldBasedFrameDecoder 讓該資料包繼續向下傳播,最終引發錯誤
為了提高 handler 的複用率,同時又避免出現一些併發問題,Netty 中原生的 handler 中用 @Sharable 註解來標明,該 handler 能否在多個 channel 中進行共享。
只有帶有該註解,才能通過物件的方式被共享,否則無法被共享
自定義編解碼器能否使用 @Sharable 註解
這需要根據自定義的 handler 的處理邏輯進行分析
我們的 MessageCodec 本身接收的是 LengthFieldBasedFrameDecoder 處理之後的資料,那麼資料肯定是完整的,按分析來說是可以新增 @Sharable 註解的
但是實際情況我們並不能新增該註解,會丟擲異常資訊 ChannelHandler cn.nyimac.study.day8.protocol.MessageCodec is not allowed to be shared
-
因為 MessageCodec 繼承自 ByteToMessageCodec,ByteToMessageCodec 類的註解如下
這就意味著 ByteToMessageCodec 不能被多個 channel 所共享的
- 原因:因為該類的目標是:將 ByteBuf 轉化為 Message,意味著傳進該 handler 的資料還未被處理過。所以傳過來的 ByteBuf 可能並不是完整的資料,如果共享則會出現問題
如果想要共享,需要怎麼辦呢?
繼承 MessageToMessageDecoder 即可。 該類的目標是:將已經被處理的完整資料再次被處理。傳過來的 Message 如果是被處理過的完整資料,那麼被共享也就不會出現問題了,也就可以使用 @Sharable 註解了。實現方式與 ByteToMessageCodec 類似
@ChannelHandler.Sharable
public class MessageSharableCodec extends MessageToMessageCodec<ByteBuf, Message> {
@Override
protected void encode(ChannelHandlerContext ctx, Message msg, List<Object> out) throws Exception {
...
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
...
}
}
本文由
傳智教育博學谷
教研團隊釋出。如果本文對您有幫助,歡迎
關注
和點贊
;如果您有任何建議也可留言評論
或私信
,您的支援是我堅持創作的動力。轉載請註明出處!
- 大哥,這是併發不是並行,Are You Ok?
- 為啥要重學Tomcat?| 博學谷狂野架構師
- 這是一篇純講SQL語句優化的文章!!!| 博學谷狂野架構師
- 捲起來!!!看了這篇文章我才知道MySQL事務&MVCC到底是啥?
- 為什麼99%的程式設計師都做不好SQL優化?
- 如何搞定MySQL鎖(全域性鎖、表級鎖、行級鎖)?這篇文章告訴你答案!太TMD詳細了!!!
- 【建議收藏】超詳細的Canal入門,看這篇就夠了!!!
- 從菜鳥程式設計師到高階架構師,竟然是因為這個字final
- 為什麼95%的Java程式設計師,都是用不好Synchronized?
- 99%的Java程式設計師者,都敗給這一個字!
- 8000 字,就說一個字Volatile
- 98%的程式設計師,都沒有研究過JVM重排序和順序一致性
- 來一波騷操作,Java記憶體模型
- 時隔多年,這次我終於把動態代理的原始碼翻了個地兒朝天
- 再有人問你分散式事務,把這篇文章砸過去給他
- 大哥,這是併發不是並行,Are You Ok?
- 我是如何用CAP和BASE兩個基礎理論卷死其他組員的?
- Java裡面為什麼搞了雙重檢查鎖,寫完這篇文章終於真相大白了
- 分享會上狂吹MySQL的4大索引結構
- 為什麼大家都說 SELECT * 效率低?