RocketMQ源码系列(3) — 基于Netty的网络通信
theme: cyanosis
对 Netty 网络通信不熟悉的可以先看下 Netty 系列
RocketMQ 网络通信
远程通信模块
RocketMQ 中 NameServer 与 Broker、Broker 与 Client 间网络通信相关代码集中在 rocketmq-remoting
模块下,模块主要包含基于 Netty 封装的服务器、客户端、网络通信协议等。这篇文章就来分析下 RocketMQ 是如何基于 Netty 来实现网络通信的,以后在开发类似的Client/Server通信时可以依样画葫芦。
因为是基于 Netty 的封装,从模块的类命名很容易理解其中的一些核心组件:
-
NettyRemotingServer:Netty 服务器
-
NettyRemotingClient:Netty 客户端
-
NettyRequestProcessor:Netty 网络请求处理器
-
NettyEncoder/NettyDecoder:编码器/解码器
-
NettyServerConfig/NettyClientConfig:Netty 服务端/客户端配置
-
RemotingCommand:通信协议封装
-
RocketMQSerializable:序列化
远程服务器
NamesrvController 中初始化创建了远程服务器 RemotingServer
,实现类为 NettyRemotingServer
,并注册了默认的请求处理器 DefaultRequestProcessor
,从这可以看出 RocketMQ 是基于 Netty 进行RPC网络通信的,这两个组件就是 NameServer 处理客户端请求的核心所在。
```java // 创建 Netty 远程通信服务器,就是初始化 ServerBootstrap this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
// 业务处理线程池,默认线程数 8 个 this.remotingExecutor = Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
// 注册默认处理器和线程池 this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor); ```
从类结构图来看,Netty 服务端为 NettyRemotingServer
,客户端为 NettyRemotingClient
,他们都继承自 NettyRemotingAbstract
抽象基类,实现了 RemotingService
接口,NettyRemotingServer 还实现了 RemotingServer
接口。
RemotingService 主要提供了远程服务的启动、停止、注册RPC钩子函数三个接口。
java
public interface RemotingService {
// 启动
void start();
// 关闭
void shutdown();
// 注册钩子函数
void registerRPCHook(RPCHook rpcHook);
}
RemotingServer 继承自 RemotingService,主要提供了注册请求处理器、接口调用等基础接口。
从接口定义可以看出,NamesrvController 中向 RemotingServer 注册了默认的请求处理器,还支持针对某个请求注册特定的处理器和线程池,这样可以针对不同的业务场景使用不同的处理器和线程池。
从 invokeXxx 方法可以看出,请求的执行支持三种模式:
- invokeSync:同步调用,同步等待请求的结果,直到超时
- invokeAsync:异步调用,注册一个回调,请求完成后执行回调
- invokeOneway:直接发送一个请求而不关心响应,没有回调
```java public interface RemotingServer extends RemotingService { // 针对某个请求,注册处理器以及线程池 void registerProcessor(final int requestCode, final NettyRequestProcessor processor, final ExecutorService executor);
// 针对所有请求注册默认处理器
void registerDefaultProcessor(final NettyRequestProcessor processor, final ExecutorService executor);
// 返回本地监听的端口号
int localListenPort();
// 根据请求码获取对应的处理器和线程池
Pair<NettyRequestProcessor, ExecutorService> getProcessorPair(final int requestCode);
// 同步调用
RemotingCommand invokeSync(final Channel channel, final RemotingCommand request, final long timeoutMillis);
// 异步调用,并注册一个执行回调
void invokeAsync(final Channel channel, final RemotingCommand request, final long timeoutMillis, final InvokeCallback invokeCallback);
// 仅发送一个请求,不关心响应
void invokeOneway(final Channel channel, final RemotingCommand request, final long timeoutMillis);
} ```
网络通信抽象
RocketMQ 抽象了一个 Netty 远程通信抽象基类 NettyRemotingAbstract
,它对 Netty 服务器程序进行了封装,负责请求和响应的处理分发、执行,下面就先来看看这个基类的功能。
NettyRemotingAbstract
1、成员属性
NettyRemotingAbstract 有如下属性:
- semaphoreOneway:
Oneway
请求类型的信号量,用来限制 Oneway 请求类型的并发度 - semaphoreAsync:
Async
请求类型的信号量,用来限制 Async 请求类型的并发度 - responseTable:发起请求之后,将等待请求以及回调等操作封装到
ResponseFuture
中,在响应回来之后可以接着进行后续操作。responseTable 就存储了请求编码与 ResponseFuture 的关系。 - processorTable:这个表存储了请求编码与之对应的处理器和线程池
- nettyEventExecutor:Netty 事件执行器
- defaultRequestProcessor:请求默认处理器,如果 processorTable 没有特定的处理器,就使用这个默认处理器。
- sslContext:SSL 上下文。
- rpcHooks:RPC 回调钩子函数。
注意这些成员属性都是 protected
范围的,也就是子类可见,有些组件的设置是在子类进行的,比如注册处理器和线程池,注册RPC钩子函数、加载 SslContext 等。
NettyRemotingAbstract 默认构造方法需传入 Oneway 和 Async 请求类型的许可证数量,用来构造 semaphoreOneway 和 semaphoreAsync 两个信号量。
```java package org.apache.rocketmq.remoting.netty;
public abstract class NettyRemotingAbstract {
// 限制正在进行的 OneWay 请求数,保护系统内存占用
protected final Semaphore semaphoreOneway;
// 限制正在进行的异步请求数,保护系统内存占用
protected final Semaphore semaphoreAsync;
// 缓存所有正在进行的请求,等待请求的响应
protected final ConcurrentMap
/**
* @param permitsOneway one-way 请求的许可证数量.
* @param permitsAsync 异步请求的许可证数量.
*/
public NettyRemotingAbstract(final int permitsOneway, final int permitsAsync) {
// 创建两个信号量
this.semaphoreOneway = new Semaphore(permitsOneway, true);
this.semaphoreAsync = new Semaphore(permitsAsync, true);
}
} ```
2、抽象方法
NettyRemotingAbstract 有两个抽象方法:
- getChannelEventListener:子类返回一个 Channel 的事件监听器
- getCallbackExecutor:子类返回一个回调执行的线程池
```java public abstract class NettyRemotingAbstract {
// 抽象方法:获取网络连接事件监听器
public abstract ChannelEventListener getChannelEventListener();
// 抽象方法:获取回调的线程池
public abstract ExecutorService getCallbackExecutor();
} ```
3、处理消息接收
processMessageReceived 方法处理消息请求和响应,入参有两个:
- ChannelHandlerContext:就是 Netty 中的连接上下文,通过它可以拿到当前的 Channel、触发管道读写事件、写回响应数据等。
- RemotingCommand:请求数据的封装,是 RocketMQ 中的通信协议。其在 Netty 网络通道中传输时,会进行序列化、反序列化以及编解码。
通过请求的类型来看,有 REQUEST_COMMAND
、RESPONSE_COMMAND
两种,就是请求和响应。
java
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
final RemotingCommand cmd = msg;
if (cmd != null) {
switch (cmd.getType()) {
case REQUEST_COMMAND: // 处理请求
processRequestCommand(ctx, cmd);
break;
case RESPONSE_COMMAND: // 处理响应
processResponseCommand(ctx, cmd);
break;
default:
break;
}
}
}
RemotingCommand
RemotingCommand 是 RocketMQ 请求、响应的对象载体,可以理解成是 RocketMQ 定义的网络通信协议。在发送请求时,会根据当前请求和数据构建一个 RemotingCommand 对象,然后经过序列化、编码成字节码,再经过 Netty 的 Channel 发送出去,另一端则从 Channel 读取字节码,经过解码、反序列化成 RemotingCommand。
RemotingCommand 主要包含如下属性:
- code:RocketMQ 中每个请求都会有一个对应的编码,请求码的枚举定义在
RequestCode
中。 - opaque:每个请求的ID,通过内存的一个原子计数器 requestId 自增。
- flag:类型标识,有 请求、响应、Oneway 三种类型
- extFields:扩展字段
- body:请求主体数据的字节数组
可以通过它的静态方法 createRequestCommand 和 createResponseCommand 来分别创建请求和响应类型的 RemotingCommand。
```java public class RemotingCommand { private static AtomicInteger requestId = new AtomicInteger(0);
// 请求编号
private int code;
// 编程语言
private LanguageCode language = LanguageCode.JAVA;
// 版本号
private int version = 0;
// 请求ID
private int opaque = requestId.getAndIncrement();
// 类型
private int flag = 0;
// 备注
private String remark;
// 扩展字段
private HashMap<String, String> extFields;
// 自定义header
private transient CommandCustomHeader customHeader;
// 请求的消息体序列化成字节
private transient byte[] body;
// 创建请求对象
public static RemotingCommand createRequestCommand(int code, CommandCustomHeader customHeader) {
RemotingCommand cmd = new RemotingCommand();
cmd.setCode(code); // 请求编码
cmd.customHeader = customHeader; // 请定义请求头
setCmdVersion(cmd);
return cmd;
}
// 创建响应
public static RemotingCommand createResponseCommand(int code, String remark, Class<? extends CommandCustomHeader> classHeader) {
RemotingCommand cmd = new RemotingCommand();
cmd.markResponseType(); // 设置为响应类型(默认为请求类型)
cmd.setCode(code); // 请求编码
cmd.setRemark(remark);
setCmdVersion(cmd);
// 设置响应头
if (classHeader != null) {
CommandCustomHeader objectHeader = classHeader.getDeclaredConstructor().newInstance();
cmd.customHeader = objectHeader;
}
return cmd;
}
} ```
消息请求处理
1、主体流程
在接收到客户端的请求后,将由 processRequestCommand 来处理请求,主要流程如下:
- 从处理器表
processorTable
获取请求编码对应的处理器和线程池,没有则使用默认的处理器和线程池 - 接着将请求和响应的处理封装成一个
Runnable
- 判断是否拒绝请求,是的就返回系统繁忙的编码
SYSTEM_BUSY
,将响应写入 Channel。从这可以看出,将数据响应回客户端是通过ctx.writeAndFlush()
来完成 - 然后将前面封装的 Runnable,以及 ctx、cmd 封装成一个 RequestTask
- 最后将 RequestTask 提交到线程池里去执行,如果线程池满了被拒绝请求,则响应系统繁忙。
```java
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
// 支持不同的请求设置不同的处理器和线程池
final Pair
// 封装 Runnable
Runnable run = new Runnable() {...};
// 是否拒绝请求
if (pair.getObject1().rejectRequest()) {
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
"[REJECTREQUEST]system busy, start flow control for a while");
response.setOpaque(opaque);
ctx.writeAndFlush(response);
return;
}
try {
// 封装一个请求任务
final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
// 提交到线程池异步执行
pair.getObject2().submit(requestTask);
} catch (RejectedExecutionException e) {
// 请求拒绝
if (!cmd.isOnewayRPC()) {
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
"[OVERLOAD]system busy, start flow control for a while");
response.setOpaque(opaque);
ctx.writeAndFlush(response);
}
}
} ```
2、请求处理
请求处理的主要逻辑封装在 Runnable
中,主要逻辑如下:
- 读取 Channel 远程通信地址
- 执行前置 RPC 钩子函数
- 封装一个响应回调
RemotingResponseCallback
,在处理完请求后将调用这个回调。这个回调会先执行后置 RPC 钩子函数,然后对非 Oneway 类型请求,将响应通过 Channel 写回调用者。 - 最后使用处理器来处理请求,如果是异步处理器则异步处理请求;如果是同步处理器,则同步处理请求,然后同步执行回调。
从这里可以看出,请求执行完成后响应客户端是在 RemotingResponseCallback 回调中完成的,对于 Oneway 类型的请求,由于只执行请求不关心响应,因此不会响应调用者。
而在请求前或请求完成后,想要对请求做一些定制化可以注册自定义 RPCHook 钩子函数来完成。
java
Runnable run = new Runnable() {
@Override
public void run() {
try {
// 获取通道远程Broker地址
String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
// 请求之前执行RPC钩子
doBeforeRpcHooks(remoteAddr, cmd);
// 封装一个响应回调
final RemotingResponseCallback callback = new RemotingResponseCallback() {
@Override
public void callback(RemotingCommand response) {
// 请求之后执行PRC钩子
doAfterRpcHooks(remoteAddr, cmd, response);
// 非 Oneway 请求
if (!cmd.isOnewayRPC()) {
if (response != null) {
response.setOpaque(opaque); // 设置请求ID
response.markResponseType(); // 标记为响应类型
response.setSerializeTypeCurrentRPC(cmd.getSerializeTypeCurrentRPC());
// 写回响应
ctx.writeAndFlush(response);
}
}
}
};
if (pair.getObject1() instanceof AsyncNettyRequestProcessor) {
// 异步处理器,并注册回调
AsyncNettyRequestProcessor processor = (AsyncNettyRequestProcessor) pair.getObject1();
processor.asyncProcessRequest(ctx, cmd, callback);
} else {
// 同步处理再执行回调
NettyRequestProcessor processor = pair.getObject1();
RemotingCommand response = processor.processRequest(ctx, cmd);
callback.callback(response);
}
} catch (Throwable e) {
//...
}
}
};
消息响应处理
1、响应处理
在接收服务端的响应后,将由 processResponseCommand 来处理响应,主要流程如下:
- 从响应表 responseTable 中获取对应
ResponseFuture
,这是在执行请求时存起来的,然后将其从 responseTable 表中移除。 - 如果是异步调用,一般会设置一个执行回调,这时就会去执行回调函数
- 如果是同步调用,就会调用 putResponse 通知等待方响应回来了。
```java public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) { // 请求ID final int opaque = cmd.getOpaque(); // 获取响应 Future final ResponseFuture responseFuture = responseTable.get(opaque); if (responseFuture != null) { responseFuture.setResponseCommand(cmd); // 移除 responseTable.remove(opaque);
// 异步调用
if (responseFuture.getInvokeCallback() != null) {
executeInvokeCallback(responseFuture);
}
// 同步调用
else {
responseFuture.putResponse(cmd);
responseFuture.release();
}
} else {
log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
}
} ```
请求调用方式
NettyRemotingAbstract 提供了三个执行请求的方法,可以实现同步调用、异步调用和 Oneway 请求,客户端封装好 RemotingCommand 后,就可以调用这三个方法发起远程调用。
1、同步调用
同步调用的主要流程如下:
- 将当前请求ID、Channel 封装一个
ResponseFuture
,然后放入响应表responseTable
中。 - 将请求数据写入 Channel,并添加一个 Channel 监听器,在请求完成后设置 ResponseFuture 的状态
- 然后调用
responseFuture.waitResponse()
开始等待响应,直到响应回来或者超时,最后从 responseTable 表中移除当前请求
同步执行就是在主线程中同步调用服务端接口,然后同步等待响应结果直到超时。
```java // 同步调用 public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis) { // 请求ID final int opaque = request.getOpaque(); try { // 封装响应 Future final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null); // 暂存到响应表 this.responseTable.put(opaque, responseFuture);
// 把数据写回客户端,并添加一个监听器
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
// 请求完成后的事件监听
@Override
public void operationComplete(ChannelFuture f) throws Exception {
if (f.isSuccess()) {
// 请求成功
responseFuture.setSendRequestOK(true);
return;
} else {
// 请求失败
responseFuture.setSendRequestOK(false);
}
// 请求失败
responseTable.remove(opaque);
responseFuture.setCause(f.cause());
responseFuture.putResponse(null);
}
});
// 同步等待响应直到完成或超时
RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
final SocketAddress addr = channel.remoteAddress();
if (null == responseCommand) {
// 请求超时
if (responseFuture.isSendRequestOK()) {
throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis, responseFuture.getCause());
} else {
// 请求发送异常
throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
}
}
return responseCommand;
} finally {
// 移除 ResponseFuture
this.responseTable.remove(opaque);
}
} ```
2、异步执行
异步执行的流程如下:
- 先通过异步信号量
semaphoreAsync
获取到一个许可证,如果并发太高获取不到信号量,就可能超时 - 拿到信号量许可证后,封装一个信号量释放器
SemaphoreReleaseOnlyOnce
,它主要的作用是后面释放这个许可证 - 接着将 Channel、请求ID、执行回调、信号量释放器等封装到
ResponseFuture
中,然后放到响应表responseTable
中。 - 之后就是数据发送到服务端,然后注册一个监听器,在请求完成后设置请求状态。
- 请求完成后,最后释放资源,其实就是在释放信号量的许可证。
异步执行会通过一个信号量来控制异步执行的并发度(默认64),异步执行有回调,它被封装到 ResponseFuture,响应回来后会进入 processResponseCommand 中处理,在里面就会调用 responseFuture.executeInvokeCallback()
来执行这个回调。
```java public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis, final InvokeCallback invokeCallback) { final int opaque = request.getOpaque(); // 请求ID // 获取信号量(默认64个信号量) boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS); if (acquired) { // 封装信号量释放 final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
// 封装响应 Future,请求回来后再执行回调
final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis - costTime, invokeCallback, once);
// 暂存到响应表
this.responseTable.put(opaque, responseFuture);
try {
// 向服务端写数据
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
if (f.isSuccess()) { // 请求成功
responseFuture.setSendRequestOK(true);
return;
}
// 请求失败
requestFail(opaque);
log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));
}
});
} catch (Exception e) {
responseFuture.release(); // 释放信号量
throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
}
} else {
// ...
throw new RemotingTimeoutException(info);
}
} ```
3、Oneway 执行
Oneway 就是发起一起请求,只管发送,不需要响应,不需要执行回调。流程也比较简单:
- 将请求标记为 Oneway
- 通过Oneway信号量
semaphoreOneway
获取到一个许可证,封装信号量释放器 - 向 Channel 写入数据,发送到服务端
- 最后释放信号量许可证
java
public void invokeOnewayImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis) {
// 标记为 oneway 请求
request.markOnewayRPC();
// 获取 oneway 信号量
boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
if (acquired) {
// 封装信号量释放
final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreOneway);
try {
// 写入数据
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
once.release();
if (!f.isSuccess()) {
log.warn("send a request command to channel <" + channel.remoteAddress() + "> failed.");
}
}
});
} catch (Exception e) {
once.release();
throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
}
} else {
// ....
throw new RemotingTimeoutException(info);
}
}
基于信号量的资源控制
一般如果想要控制并发、资源隔离,可以通过线程池或者信号量来实现,NettyRemotingAbstract 中使用信号量来控制异步请求和Oneway请求的并发。
同步执行 invokeAsyncImpl
是在主线程中执行并同步等待响应结果,因此主线程的线程数就是最大的并发度,无需控制并发。
1、异步执行信号量
异步执行 invokeAsyncImpl
由于是异步等待执行结果,要保存 ResponseFuture 到响应表 responseTable 中。为了避免内存资源占用过大,使用了一个信号量 semaphoreAsync
来控制并发度,信号量许可证数量默认为 64。
java
boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
为了释放这个许可证,RocketMQ 将信号量封装到 SemaphoreReleaseOnlyOnce
中,它会保证只会释放一个许可证给信号量,这个是通过原子类 AtomicBoolean 来控制的。
```java public class SemaphoreReleaseOnlyOnce { private final AtomicBoolean released = new AtomicBoolean(false); private final Semaphore semaphore;
public SemaphoreReleaseOnlyOnce(Semaphore semaphore) {
this.semaphore = semaphore;
}
public void release() {
if (this.semaphore != null) {
if (this.released.compareAndSet(false, true)) {
this.semaphore.release();
}
}
}
} ```
SemaphoreReleaseOnlyOnce 会放入 ResponseFuture 中,在 processResponseCommand 中处理响应时,就会去释放这个许可证。
```java responseFuture.release();
// 释放许可证 public void release() { if (this.once != null) { this.once.release(); } } ```
从上面的流程可以看出,异步请求的信号量并发控制是从发起请求,直到响应回来之后才会释放许可证。
2、Oneway执行信号量
Oneway 执行 invokeOnewayImpl
是发送请求后就不关注响应结果,也不需要保存 ResponseFuture,照理来说无需控制并发度,不过 Oneway 请求也通过一个信号量 semaphoreOneway
来控制并发度,信号量许可证数量默认是 256
,这个并发度比异步执行的高很多。
java
boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
Oneway执行也会将创建一个 SemaphoreReleaseOnlyOnce
,不过它的释放是在发送请求完成后就释放。
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
once.release(); // 释放许可证
}
});
等待-通知机制
同步执行或异步执行会将当前 Channel、请求ID 等封装创建一个 ResponseFuture
,然后将其放入响应表 responseTable
中,在响应回来之后再取出来做后续的处理。
1、ResponseFuture
ResponseFuture 主要有请求ID、Channel、执行回调等属性,其使用 CountDownLatch 来实现等待-通知的效果。
```java public class ResponseFuture { // 请求ID private final int opaque; // 请求的网络通道 private final Channel processChannel; // 超时时间 private final long timeoutMillis; // 回调函数 private final InvokeCallback invokeCallback; // 开始时间 private final long beginTimestamp = System.currentTimeMillis(); // 计数器 private final CountDownLatch countDownLatch = new CountDownLatch(1);
// 支持 Semaphore 仅释放一次的组件
private final SemaphoreReleaseOnlyOnce once;
// 仅执行回调的标识
private final AtomicBoolean executeCallbackOnlyOnce = new AtomicBoolean(false);
// 请求命令
private volatile RemotingCommand responseCommand;
// 请求是否发送成功
private volatile boolean sendRequestOK = true;
// RPC 请求一次
private volatile Throwable cause;
} ```
2、同步等待-通知
同步 invokeSyncImpl 执行时,创建好 ResponseFuture 放入 responseTable 中,然后就调用 responseFuture.waitResponse 开始等待响应结果。
java
// 封装响应 Future
final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
// 暂存到响应表
this.responseTable.put(opaque, responseFuture);
//...
// 同步等待响应直到完成或超时
RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
可以看到它是通过 CountDownLatch 来等待,CountDownLatch 的计数为 1,只要另一个地方调用了 countDown 这边就会收到通知,然后返回 responseCommand。
java
public RemotingCommand waitResponse(final long timeoutMillis) throws InterruptedException {
this.countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
return this.responseCommand;
}
而这个通知的操作就是在 processResponseCommand 中,可以看到响应回来之后,会从 responseTable 中取出 ResponseFuture,并设置 RemotingCommand,对于同步调用,就会调用 ResponseFuture 的 putResponse 和 release 方法。
```java public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) { // 请求ID final int opaque = cmd.getOpaque(); // 获取响应 Future final ResponseFuture responseFuture = responseTable.get(opaque); // 设置 RemotingCommand responseFuture.setResponseCommand(cmd); // 移除 ResponseFuture responseTable.remove(opaque);
// 同步执行没有回调,异步要执行回调
if (responseFuture.getInvokeCallback() != null) {
executeInvokeCallback(responseFuture);
}
// 同步执行,响应回来,放入响应
else {
responseFuture.putResponse(cmd);
responseFuture.release();
}
} ```
可以看到,putResponse 内就是调用 countDownLatch.countDown() 通知等待结束。
java
public void putResponse(final RemotingCommand responseCommand) {
this.responseCommand = responseCommand;
// 响应回来后计数器减 1(减为 0)
this.countDownLatch.countDown();
}
从上面分析就可以知道 RocketMQ 使用 CountDownLatch 来实现同步调用中的等待-通知机制。
3、异步回调
异步 invokeAsyncImpl 执行时,创建好 ResponseFuture 并放入 responseTable 中,注意异步调用时会传入一个 InvokeCallback
执行回调,会一并放入 ResponseFuture 中。
java
// 封装响应 Future,请求回来后再执行回调
final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis - costTime, invokeCallback, once);
// 暂存到响应表
this.responseTable.put(opaque, responseFuture);
响应回来后,在 processResponseCommand 中就会执行这个回调,如果子类 getCallbackExecutor()
能返回线程池,将使用线程池异步执行回调,执行完后再释放资源;如果没有线程池,或者异步执行错误,将在主线程同步执行回调,然后释放信号量。
```java private void executeInvokeCallback(final ResponseFuture responseFuture) { boolean runInThisThread = false; ExecutorService executor = this.getCallbackExecutor(); if (executor != null) { try { // 有回调线程池则异步执行回调 executor.submit(new Runnable() { @Override public void run() { try { responseFuture.executeInvokeCallback(); } finally { responseFuture.release(); } } }); } catch (Exception e) { // 线程池执行报错在主线程执行 runInThisThread = true; } } else { runInThisThread = true; }
// 没有线程池则同步回调
if (runInThisThread) {
try {
responseFuture.executeInvokeCallback();
} finally {
responseFuture.release();
}
}
} ```
网络通信流程
通过前面的分析,几乎已经对 RocketMQ 的通信处理机制有个大概的认识了,下面通过一张图来总结下。