深入剖析Netty之NioEventLoop尋根究底
Github地址:http://github.com/lhj502819/netty/tree/v502819-main,示例程式碼在example模組中
系列文章
- 你知道都有哪些I/O模型嗎?
- Java NIO三大角色Channel、Buffer、Selector
- Doug lea《Scalable IO in Java》翻譯
- Reactor模型你知道都有哪些嗎?
- Netty服務端建立原始碼流程解析
- EventLoopGroup到底是個啥?
- 深入剖析Netty之EventLoop刨根問底
- 未完待續..
在上篇文章中我們對NioEventLoop的父類進行了詳細分析,今天我們就來拆解拆解這位老大哥吧,NioEventLoop較為複雜,需要耐心的思考和閱讀,首先我們來回顧下前邊講過的相關內容,只有將這些串聯起來後才便於我們理解,還是先看下這張類圖,左側為NioEventLoop相關類:
NioEventLoop集成了很多類的功能,其實NioEventLoop在我看來主要還是對整個執行流程的把控,細節上對任務的執行和功能實現都交給了父類去執行,比如執行任務就是在SingleThreadEventExecutor
中實現的,NioEventLoop主要是用來對時機的把控,何時執行任務,以及什麼情況下需要對Selector進行重建,包括對事件處理時間的控制等,說了這麼多大家可能一臉懵,別急,等看到下邊的分析你就懂了。
主要功能概覽
NioEventLoop中維護了一個執行緒,執行緒啟動時會呼叫NioEventLoop的run方法,執行I/O任務和非I/O任務。
- I/O任務: 指的是accept、connect、read、write等
- 非I/O任務: 新增到taskQueue中的任務,如register0,bind0等任務
每一個功能點的實現都比較複雜,接下來我們逐一擊破,Go Go Go~~~~
從何而來
首先在講解NioEventLoop之前,我們先要知道這哥們兒是咋來的對不對,通過檢視構造方法的呼叫處可以看到只有一個引用處,就是NioEventLoopGroup#newChild
,哎,這NioEventLoopGroup
不是我們啟動Netty服務時用的嗎,NioEventLoopGroup
我們已經在前邊的文章講解過了,主要就是用來對Executor(NioEventLoop)
進行封裝,不瞭解的大家可以先去了解下,EventLoopGroup到底是個啥?,重點關注下MultithreadEventExecutorGroup
和NioEventLoopGroup
。
這裡我把程式碼也貼過來,很簡單,就不過多描述了:
```java protected EventLoop newChild(Executor executor, Object... args) throws Exception { SelectorProvider selectorProvider = (SelectorProvider) args[0]; SelectStrategyFactory selectStrategyFactory = (SelectStrategyFactory) args[1]; RejectedExecutionHandler rejectedExecutionHandler = (RejectedExecutionHandler) args[2]; EventLoopTaskQueueFactory taskQueueFactory = null; EventLoopTaskQueueFactory tailTaskQueueFactory = null;
int argsLength = args.length;
if (argsLength > 3) {
taskQueueFactory = (EventLoopTaskQueueFactory) args[3];
}
if (argsLength > 4) {
tailTaskQueueFactory = (EventLoopTaskQueueFactory) args[4];
}
return new NioEventLoop(this, executor, selectorProvider,
selectStrategyFactory.newSelectStrategy(),
rejectedExecutionHandler, taskQueueFactory, tailTaskQueueFactory);
} ```
初始化解析
成員變數
```java * 是否禁用SelectionKey的優化,預設為 false */ private static final boolean DISABLE_KEY_SET_OPTIMIZATION = SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false);
/ * 少於N值不開啟空輪詢重建新的Selector物件的功能 */ private static final int MIN_PREMATURE_SELECTOR_RETURNS = 3; / * NIO Selector空輪詢N次後,重建新的Selector物件,預設值為512,如果設定小於3則表示不開啟重建 Selector,在{@link #unexpectedSelectorWakeup(int)}處理意外的喚醒時使用 / private static final int SELECTOR_AUTO_REBUILD_THRESHOLD; / * 與{@link SelectStrategy}配合使用 / private final IntSupplier selectNowSupplier = new IntSupplier() { @Override public int get() throws Exception { return selectNow(); } };
/ * The NIO {@link Selector},經過Netty包裝優化過的 */ private Selector selector; private Selector unwrappedSelector; / * 註冊的SelectionKey集合,Netty自己實現的,對SelectionKey進行了包裝優化 */ private SelectedSelectionKeySet selectedKeys;
/* * 用於建立Selector / private final SelectorProvider provider;
private static final long AWAKE = -1L; private static final long NONE = Long.MAX_VALUE;
// nextWakeupNanos is: // AWAKE when EL is awake // NONE when EL is waiting with no wakeup scheduled // other value T when EL is waiting with wakeup scheduled at time T //下次喚醒的時間,預設為-1,表示已經喚醒,主要是用來執行定時任務時使用的 private final AtomicLong nextWakeupNanos = new AtomicLong(AWAKE); /* * Select策略,執行任務時使用 / private final SelectStrategy selectStrategy;
/* * 處理Channel的就緒事件,佔處理任務的總時間比例 * 在NioEventLoop中,有三種類型的任務 * 1:Channel的就緒IO事件 * 2:普通任務 * 3:定時任務 * ioRatio表示處理Channel的就緒IO事件佔處理總時間的比例 / private volatile int ioRatio = 50;
/ * 取消SelectionKey的數量 */ private int cancelledKeys; / * 是否需要再次select Selector物件 */ private boolean needsToSelectAgain; ```
構造方法
java
/**
* @param parent 所屬的EventLoopGroup
* @param executor 預設為 {@link ThreadPerTaskExecutor} ,每個任務啟動一個執行緒執行,在{@link MultithreadEventExecutorGroup}初始化時設定
* @param selectorProvider SelectorProvider.provider(),{@link Selector#open()}中就是使用SelectorProvider.provider().openSelect來建立Selector
* {@link Selector},在{@link NioEventLoop}中設定
* @param strategy Select的策略,下邊會進行講解, 預設為{@link DefaultSelectStrategyFactory}
* @param rejectedExecutionHandler 拒絕策略,預設為丟擲{@link RejectedExecutionException}
* @param taskQueueFactory 生成普通任務佇列的Factory,預設為空
* @param tailTaskQueueFactory 生成尾任務佇列的Factory ,預設為空
*/
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
EventLoopTaskQueueFactory taskQueueFactory, EventLoopTaskQueueFactory tailTaskQueueFactory) {
super(parent, executor, false, newTaskQueue(taskQueueFactory), newTaskQueue(tailTaskQueueFactory),
rejectedExecutionHandler);
this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
//建立Selector
final SelectorTuple selectorTuple = openSelector();
this.selector = selectorTuple.selector;
//獲取未包裝的Selector
this.unwrappedSelector = selectorTuple.unwrappedSelector;
}
優化點:Selector優化
在構造方法中,我們看到建立Selector返回的是SelectorTuple
,此類是一個包裝類,對Selector進行了包裝,JDK NIO的SelectionKeySet是用HashSet儲存的,HashSet底層使用的HashMap,put的時間複雜度為O(logn),Netty使用陣列對儲存方式進行了改變,陣列add操作的時間複雜度降為O(1),可以看到Netty的優化是非常細節的,並且通過自定義的SelectedSelectionKeySetSelector
對其進行了組合封裝,具體是如何優化的看下原始碼便知,主要是使用的反射技術進行替換。
```java private SelectorTuple openSelector() { final Selector unwrappedSelector; try { unwrappedSelector = provider.openSelector(); } catch (IOException e) { throw new ChannelException("failed to open a new selector", e); } /* * 判斷是否關閉SelectionKeySet優化,預設是false,不關閉 / if (DISABLE_KEY_SET_OPTIMIZATION) { return new SelectorTuple(unwrappedSelector); }
/**
* 建立class sun.nio.ch.SelectorImpl位元組碼
*/
Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
return Class.forName(
"sun.nio.ch.SelectorImpl",
false,
PlatformDependent.getSystemClassLoader());
} catch (Throwable cause) {
return cause;
}
}
});
if (!(maybeSelectorImplClass instanceof Class) ||
// ensure the current selector implementation is what we can instrument.
!((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
if (maybeSelectorImplClass instanceof Throwable) {
Throwable t = (Throwable) maybeSelectorImplClass;
logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
}
//如果獲取SelectorImpl位元組碼失敗,則返回一個SelectorTuple未包裝的原生selector
return new SelectorTuple(unwrappedSelector);
}
final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
/**
* Netty對SelectionKeySet的優化,SelectedSelectionKeySet基於陣列實現
*/
final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
/**
* 通過反射替換Selector的成員變數selectedKeys(Set)為Netty優化後的陣列實現{@link SelectedSelectionKeySet}
* 使用陣列實現,add操作的時間複雜度降為O(1),而Set底層使用的HashMap,put的時間複雜度為O(logn),這也是Netty的一個優化點
*/
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
//對Java9的適配
if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {
// Let us try to use sun.misc.Unsafe to replace the SelectionKeySet.
// This allows us to also do this in Java9+ without any extra flags.
long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
long publicSelectedKeysFieldOffset =
PlatformDependent.objectFieldOffset(publicSelectedKeysField);
if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {
PlatformDependent.putObject(
unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
PlatformDependent.putObject(
unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
return null;
}
// We could not retrieve the offset, lets try reflection as last-resort.
}
//設定兩個欄位可訪問
Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
if (cause != null) {
return cause;
}
cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
if (cause != null) {
return cause;
}
/**
* 將Selector的SelectionKeySet替換為Netty自己的
*/
selectedKeysField.set(unwrappedSelector, selectedKeySet);
publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
return null;
} catch (NoSuchFieldException e) {
return e;
} catch (IllegalAccessException e) {
return e;
}
}
});
if (maybeException instanceof Exception) {
selectedKeys = null;
Exception e = (Exception) maybeException;
logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e);
return new SelectorTuple(unwrappedSelector);
}
selectedKeys = selectedKeySet;
logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
return new SelectorTuple(unwrappedSelector,
new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
} ```
SelectedSelectionKeySetSelector
```java / * 經過優化的Selector,對原生Selector進行了組合 */ final class SelectedSelectionKeySetSelector extends Selector { / * 自定義的SelectionKeySet實現,我們需要知道此處的selectionKeys與{@link #delegate#selectionKeys}是同一個 / private final SelectedSelectionKeySet selectionKeys; / * 組合的原生的Selector{@link NioEventLoop#unwrappedSelector} / private final Selector delegate;
SelectedSelectionKeySetSelector(Selector delegate, SelectedSelectionKeySet selectionKeys) {
this.delegate = delegate;
this.selectionKeys = selectionKeys;
}
..........省略部分程式碼...........
/**
* 在{@link NioEventLoop#run()}會呼叫
*/
@Override
public int select() throws IOException {
selectionKeys.reset();
return delegate.select();
}
@Override
public Selector wakeup() {
return delegate.wakeup();
}
@Override
public void close() throws IOException {
delegate.close();
}
} ```
SelectedSelectionKeySet
使用陣列實現,add操作的時間複雜度降為O(1),而Set底層使用的HashMap,put的時間複雜度為O(logn)
```java
final class SelectedSelectionKeySet extends AbstractSet
SelectionKey[] keys;
int size;
SelectedSelectionKeySet() {
keys = new SelectionKey[1024];
}
@Override
public boolean add(SelectionKey o) {
if (o == null) {
return false;
}
keys[size++] = o;
if (size == keys.length) {
increaseCapacity();
}
return true;
}
@Override
public Iterator<SelectionKey> iterator() {
return new Iterator<SelectionKey>() {
private int idx;
@Override
public boolean hasNext() {
return idx < size;
}
@Override
public SelectionKey next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
return keys[idx++];
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}
private void increaseCapacity() {
SelectionKey[] newKeys = new SelectionKey[keys.length << 1];
System.arraycopy(keys, 0, newKeys, 0, size);
keys = newKeys;
}
} ```
run方法,執行任務
run方法為NioEventLoop中最重要的方法,那NioEventLoop中執行的任務是哪些任務呢?通過IDEA可以看到呼叫處為SingleThreadEventExecutor#doStartThread
,[SingleThreadEventExecutor]
在上篇文章中我們詳細將結果,這裡不再過多闡述,通過Debug可以看到doStartThread會在#execute
中呼叫,#execute
方法又是新增非同步任務的入口,那#run
方法大概率就是執行這些新增的任務嘍,到底是不是呢,我們跟蹤下原始碼便知。
```java protected void run() { //Select計數 int selectCnt = 0; for (;;) { try { int strategy; try { //計算Select的策略 <1> strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks()); switch (strategy) { case SelectStrategy.CONTINUE: continue;
case SelectStrategy.BUSY_WAIT:
// fall-through to SELECT since the busy-wait is not supported with NIO
case SelectStrategy.SELECT:
//當沒有普通任務時,返回定時任務最近一次要執行的時間,如果有沒有定時任務則返回-1
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) {
//如果沒有定時任務,則將最近執行時間設定為Integer的最大值
curDeadlineNanos = NONE; // nothing on the calendar
}
//設定下一次的喚醒時間
nextWakeupNanos.set(curDeadlineNanos);
try {
if (!hasTasks()) {
//select看是否有新增的感興趣的事件
strategy = select(curDeadlineNanos);
}
} finally {
// This update is just to help block unnecessary selector wakeups
// so use of lazySet is ok (no race condition)
//延遲設定執行緒的喚醒時間阻塞不必要的Select喚醒
nextWakeupNanos.lazySet(AWAKE);
}
// fall through
default:
}
} catch (IOException e) {
// If we receive an IOException here its because the Selector is messed up. Let's rebuild
// the selector and retry. http://github.com/netty/netty/issues/8566
//重建Selector
rebuildSelector0();
//重置計數
selectCnt = 0;
handleLoopException(e);
continue;
}
selectCnt++;
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
boolean ranTasks;
if (ioRatio == 100) {
try {
if (strategy > 0) {
//如果有新增的感興趣的事件,則處理
processSelectedKeys();
}
} finally {
// Ensure we always run tasks.
//所有的時間都用來處理IO事件,包括普通任務和定時任務,不限制時間
ranTasks = runAllTasks();
}
} else if (strategy > 0) {
//記錄當前時間
final long ioStartTime = System.nanoTime();
try {
//處理Channel的就緒事件
processSelectedKeys();
} finally {
// Ensure we always run tasks.
//計算用來處理IO事件的時間,包括普通任務和定時任務,限制時間
//以處理Channel的就緒事件所需時間為基準計算執行所有任務需要的時間
final long ioTime = System.nanoTime() - ioStartTime;
ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
} else {
ranTasks = runAllTasks(0); // This will run the minimum number of tasks
}
//如果有任務執行過了或者有任務待執行,則重置select計數
if (ranTasks || strategy > 0) {
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
//有新增的事件,或者任務執行過,則將空輪詢次數置0
selectCnt = 0;
} else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
//針對意外喚醒,重置計數
selectCnt = 0;
}
} catch (CancelledKeyException e) {
// Harmless exception - log anyway
if (logger.isDebugEnabled()) {
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
selector, e);
}
} catch (Error e) {
throw e;
} catch (Throwable t) {
handleLoopException(t);
} finally {
// Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) {
//如果EventLoop狀態是正在關閉、已關閉、已終止,則執行關閉邏輯,關閉Channel和Selector的繫結,關閉Channel
closeAll();
//確認是否可以關閉了
if (confirmShutdown()) {
//退出NioEventLoop執行緒迴圈
return;
}
}
} catch (Error e) {
throw e;
} catch (Throwable t) {
handleLoopException(t);
}
}
}
} ```
整個run
方法的執行流程如下:判斷是否有任務需要執行(taskQueue + 定時任務)或者有感興趣的事件
- 有新增的感興趣的事件則先處理事件
- 有任務需要執行則先執行任務
- 判斷是否要shutDown
以上三個步驟會迴圈執行。
這裡插入一個點,在run方法中,我們看到NioEventLoop呼叫JDK NIO底層的select方法檢視是否有感興趣的事件,在服務端剛剛啟動時,感興趣的事件肯定是客戶端的連線(ACCEPT)時間,那這個感興趣的事件是如何設定的呢?大家是否還記得在服務端建立原始碼分析中,在Channel註冊後最終會呼叫AbstractNioChannel#doBeginRead
到方法,此處就會將感興趣的事件設定為OP_ACCEPT。
SelectStrategy是Select的策略介面,其中:
```java / * Indicates a blocking select should follow. * 表示使用阻塞Select的策略 */ int SELECT = -1; / * Indicates the IO loop should be retried, no blocking select to follow directly. * 表示需要進行重試的策略,目前沒有使用 / int CONTINUE = -2; / * Indicates the IO loop to poll for new events without blocking. * 目前沒有使用 * / int BUSY_WAIT = -3;
/* * The {@link SelectStrategy} can be used to steer the outcome of a potential select * call. * * @param selectSupplier The supplier with the result of a select result. * @param hasTasks true if tasks are waiting to be processed. * @return {@link #SELECT} if the next step should be blocking select {@link #CONTINUE} if * the next step should be to not select but rather jump back to the IO loop and try * again. Any value >= 0 is treated as an indicator that work needs to be done. / int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception; ```
預設使用的是DefaultSelectStrategy
,大家看註釋就好,
```java final class DefaultSelectStrategy implements SelectStrategy { static final SelectStrategy INSTANCE = new DefaultSelectStrategy();
private DefaultSelectStrategy() { }
@Override
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
//如果有任務,則返回Channel新增的感興趣的IO事件數量
//如果沒有任務,則返回阻塞Select的策略
return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}
}
```
重點內容
我們繼續看run
方法,主要的邏輯我都已經打好了對應的註釋,大家看註釋就好,我們重點說下這幾個內容:
- IO任務處理時間比例控制:ioRatio
- 處理新增感興趣事件
- 執行任務佇列任務
- 重建Selector解決JDK空輪詢的bug:
rebuildSelector0
- IO任務處理事件比例:ioRatio
- shutDown優雅關閉處理
IO任務處理時間任務比例控制
```java if (ioRatio == 100) { try { if (strategy > 0) { //如果有新增的感興趣的事件,則處理 processSelectedKeys(); } } finally { // Ensure we always run tasks. //所有的時間都用來處理IO事件,包括普通任務和定時任務,不限制時間 ranTasks = runAllTasks(); } } else if (strategy > 0) {//如果有新增的感興趣的事件 //記錄當前時間 final long ioStartTime = System.nanoTime(); try { //處理Channel的就緒事件 processSelectedKeys(); } finally { // Ensure we always run tasks. //計算用來處理IO事件的時間,包括普通任務和定時任務,限制時間 //以處理Channel的就緒事件所花時間為基準計算執行所有任務需要的時間 final long ioTime = System.nanoTime() - ioStartTime; ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } } else { //如果沒有新增的感興趣的事件,則執行所有的任務 ranTasks = runAllTasks(0); // This will run the minimum number of tasks }
```
- ioRatio等於100時,則不會對執行任務限制時間,如果有新增的感興趣的時間,則全力處理感興趣的事件,如果有待執行的任務,則全力執行任務
- ioRatio小於100時
- 當有新增的感興趣的事件,則先處理感興趣的事件,處理完事件後,通過處理事件所花的時間計算執行所有的任務最大的時間
- 當沒有新增的感興趣的事件,則執行所有的任務,這裡參為0,表示執行最少的任務
處理新增感興趣事件
```java private void processSelectedKeys() { //判斷是否使用的優化過的SelectionKey if (selectedKeys != null) { processSelectedKeysOptimized(); } else { processSelectedKeysPlain(selector.selectedKeys()); } }
```
processSelectedKeysOptimized
處理優化過的SelectionKeySet
```java ate void processSelectedKeysOptimized() { for (int i = 0; i < selectedKeys.size; ++i) { final SelectionKey k = selectedKeys.keys[i]; // null out entry in the array to allow to have it GC'ed once the Channel close // See http://github.com/netty/netty/issues/2363 selectedKeys.keys[i] = null;
final Object a = k.attachment();
//這裡取出來了附加資訊,並且判斷附加資訊是否為AbstractNioChannel,為什麼會有這種可能呢?
// 我們在解服務端建立的原始碼分析文章中分析註冊的流程時,AbstractNioChannel#doRegister在將Channel註冊到Selector上時,將自己作為附加資訊傳了進去
// selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
//因此這裡是true,至於NioTask用的不多我們就不進行分析了
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
<2>
if (needsToSelectAgain) {
// null out entries in the array to allow to have it GC'ed once the Channel close
// See http://github.com/netty/netty/issues/2363
selectedKeys.reset(i + 1);
selectAgain();
i = -1;
}
}
}
```
<2>處判斷是否需要再次Select,預設為false,通過IDEA工具檢視到needsToSelectAgain在#cancel方法中會設定為true
```java void cancel(SelectionKey key) { key.cancel(); cancelledKeys ++; if (cancelledKeys >= CLEANUP_INTERVAL) { cancelledKeys = 0; needsToSelectAgain = true; } }
```
#cancel
方法會在AbstractNioChannel#doDeregister
中被呼叫,doDeregister和doRegister相反,那就是將Channel和EventLoop解綁,這裡每解綁一個cancelledKeys
就會自增1,當cancelledKeys
大於等於設定的閾值256時則將needsToSelectAgain
設定為true,當needsToSelectAgain=true
時會執行清理操作,將SelectionKeySet清空,然後再呼叫selectAgain
重新select一遍,將剩餘的SelectionKey再填到SelectionKeySet中,這裡主要是為了解決當Channel斷開後,而在服務端的SelectionKey佔用還在導致的記憶體不能回收問題。
```java if (needsToSelectAgain) { // null out entries in the array to allow to have it GC'ed once the Channel close // See http://github.com/netty/netty/issues/2363 selectedKeys.reset(i + 1);
selectAgain();
i = -1;
}
```
processSelectedKey
針對不同的事件做不同的處理,事件就是JDK NIO的那些事件,SelectionKey#OP_ACCEPT、SelectionKey#OP_CONNECT、SelectionKey#OP_READ、_SelectionKey#_OP_WRITE
```java private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { //這裡的服務端的Unsafe類是class io.netty.channel.nio.AbstractNioMessageChannel$NioMessageUnsafe,我們也在服務端建立原始碼分析中講解過 final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); if (!k.isValid()) { final EventLoop eventLoop; try { eventLoop = ch.eventLoop(); } catch (Throwable ignored) { // If the channel implementation throws an exception because there is no event loop, we ignore this // because we are only trying to determine if ch is registered to this event loop and thus has authority // to close ch. return; } // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is // still healthy and should not be closed. // See http://github.com/netty/netty/issues/5125 if (eventLoop == this) { // close the channel if the key is not valid anymore unsafe.close(unsafe.voidPromise()); } return; }
try {
//當前事件的操作型別,這裡客戶端第一次建立連線時為OP_ACCEPT
int readyOps = k.readyOps();
// We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
// the NIO JDK channel implementation may throw a NotYetConnectedException.
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
//OP_CONNECT 連線成功事件
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See http://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
//觸發完成連線操作,這裡會觸發連線成功事件,Handler將會接收到事件通知進行處理
unsafe.finishConnect();
}
// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// OP_WRITE 事件就緒
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
//向Channel寫入資料,,這裡會觸發連線成功事件,Handler將會接收到事件通知進行處理
ch.unsafe().forceFlush();
}
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
//SelectionKey.OP_READ 或者 SelectionKey.OP_ACCEPT事件,readyOps=0是用來處理JDK Selector的空輪bug
//這裡會觸發連線成功事件,Handler將會接收到事件通知進行處理
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
```
processSelectedKeysPlain
處理未使用優化過的SelectionKeySet,最終也會呼叫#processSelectedKey
,不再過多分析
執行任務佇列任務runAllTasks
runAllTasks
的內容我在上篇文章中已經講解過,大家跳過去看下哈
重建Selector解決JDK 的空輪詢bug
unexpectedSelectorWakeup
主要工作是判斷是否達到了重建Selector的標準
```java /* * 針對意外的喚醒,JDK的空輪訓BUG,沒有事件發生也會立即返回,此方法主要是為了解決這個BUG * 如果已經達到了重建Selector的閾值,則會進行重建Selector,返回true,將select計數重置 / private boolean unexpectedSelectorWakeup(int selectCnt) { if (Thread.interrupted()) { //如果執行緒被打斷 // Thread was interrupted so reset selected keys and break so we not run into a busy loop. // As this is most likely a bug in the handler of the user or it's client library we will // also log it. // // See http://github.com/netty/netty/issues/2426 if (logger.isDebugEnabled()) { logger.debug("Selector.select() returned prematurely because " + "Thread.currentThread().interrupt() was called. Use " + "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop."); } return true; } //判斷是否達到了重建Selector的閾值 if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) { // The selector returned prematurely many times in a row. // Rebuild the selector to work around the problem. // Selector連續多次提前返回 logger.warn("Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.", selectCnt, selector); rebuildSelector(); return true; } return false; }
```
rebuildSelector
```java /* * Replaces the current {@link Selector} of this event loop with newly created {@link Selector}s to work * around the infamous epoll 100% CPU bug. * 重建Selector物件來解決JDK epoll的100% CPU的bug,其實Netty並沒有解決JDK NIO這個問題,只是進行了規避 / public void rebuildSelector() { if (!inEventLoop()) { //執行一個事件 execute(new Runnable() { @Override public void run() { rebuildSelector0(); } }); return; } rebuildSelector0(); }
```
rebuildSelector0
核心思想就是建立一個新的Selector,將原來註冊的Channel全部都註冊到新的Selector上
```java /* * 建立一個新的Selector,將之前註冊到老的selector上的Channel重新轉移到新的Selector上,並將老的Selector關閉 / private void rebuildSelector0() { final Selector oldSelector = selector; final SelectorTuple newSelectorTuple;
if (oldSelector == null) {
return;
}
try {
//建立新的Selector
newSelectorTuple = openSelector();
} catch (Exception e) {
logger.warn("Failed to create a new Selector.", e);
return;
}
// Register all channels to the new Selector.
int nChannels = 0;
//遍歷所有的SelectionKey,將Channel重新註冊到新的Selector上
for (SelectionKey key: oldSelector.keys()) {
Object a = key.attachment();
try {
if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {
continue;
}
int interestOps = key.interestOps();
//取消老Key
key.cancel();
//將Channel註冊到新的Selector
SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);
if (a instanceof AbstractNioChannel) {
// Update SelectionKey
// 修改Channel的SelectionKey為新的SelectionKy
((AbstractNioChannel) a).selectionKey = newKey;
}
nChannels ++;
} catch (Exception e) {
logger.warn("Failed to re-register a Channel to the new Selector.", e);
if (a instanceof AbstractNioChannel) {
AbstractNioChannel ch = (AbstractNioChannel) a;
ch.unsafe().close(ch.unsafe().voidPromise());
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
invokeChannelUnregistered(task, key, e);
}
}
}
//修改NioEventLoop的Selector為新的Selector
selector = newSelectorTuple.selector;
unwrappedSelector = newSelectorTuple.unwrappedSelector;
}
```
shutdown優雅關閉處理
NioEventLoop在啟動後會不斷的判斷是否達到了關閉條件state >= _ST_SHUTTING_DOWN_
,達到條件則先關閉所有的Channel連線,隨後會判斷是不是滿足條件(#confirmShutdown
,在上篇文章中我們已經對其講解過,主要是配合優雅關閉的安靜期和最大等待時間進行計算)退出。
```java if (isShuttingDown()) { //如果EventLoop狀態是正在關閉、已關閉、已終止,則執行關閉邏輯,關閉Channel和Selector的繫結,關閉Channel closeAll(); //確認是否可以關閉了 if (confirmShutdown()) { //退出NioEventLoop執行緒迴圈 return; } }
```
總結
今天我們對NioEventLoop
做了詳細的介紹,從功能概覽到初始化,再到任務執行,篇幅較長,如果大家耐心的讀完了肯定會有所收穫的。
我是壹氿,感謝各位小夥伴點贊、收藏和評論,文章持續更新,我們下期再見!