深入剖析Netty之NioEventLoop尋根究底

語言: CN / TW / HK

Github地址:http://github.com/lhj502819/netty/tree/v502819-main,示例程式碼在example模組中

系列文章

在上篇文章中我們對NioEventLoop的父類進行了詳細分析,今天我們就來拆解拆解這位老大哥吧,NioEventLoop較為複雜,需要耐心的思考和閱讀,首先我們來回顧下前邊講過的相關內容,只有將這些串聯起來後才便於我們理解,還是先看下這張類圖,左側為NioEventLoop相關類:
在這裡插入圖片描述
NioEventLoop集成了很多類的功能,其實NioEventLoop在我看來主要還是對整個執行流程的把控,細節上對任務的執行和功能實現都交給了父類去執行,比如執行任務就是在SingleThreadEventExecutor中實現的,NioEventLoop主要是用來對時機的把控,何時執行任務,以及什麼情況下需要對Selector進行重建,包括對事件處理時間的控制等,說了這麼多大家可能一臉懵,別急,等看到下邊的分析你就懂了。

主要功能概覽

在這裡插入圖片描述
NioEventLoop中維護了一個執行緒,執行緒啟動時會呼叫NioEventLoop的run方法,執行I/O任務和非I/O任務。

  • I/O任務: 指的是accept、connect、read、write等
  • 非I/O任務: 新增到taskQueue中的任務,如register0,bind0等任務

每一個功能點的實現都比較複雜,接下來我們逐一擊破,Go Go Go~~~~

從何而來

首先在講解NioEventLoop之前,我們先要知道這哥們兒是咋來的對不對,通過檢視構造方法的呼叫處可以看到只有一個引用處,就是NioEventLoopGroup#newChild,哎,這NioEventLoopGroup不是我們啟動Netty服務時用的嗎,NioEventLoopGroup我們已經在前邊的文章講解過了,主要就是用來對Executor(NioEventLoop)進行封裝,不瞭解的大家可以先去了解下,EventLoopGroup到底是個啥?,重點關注下MultithreadEventExecutorGroupNioEventLoopGroup。​

這裡我把程式碼也貼過來,很簡單,就不過多描述了:

```java protected EventLoop newChild(Executor executor, Object... args) throws Exception { SelectorProvider selectorProvider = (SelectorProvider) args[0]; SelectStrategyFactory selectStrategyFactory = (SelectStrategyFactory) args[1]; RejectedExecutionHandler rejectedExecutionHandler = (RejectedExecutionHandler) args[2]; EventLoopTaskQueueFactory taskQueueFactory = null; EventLoopTaskQueueFactory tailTaskQueueFactory = null;

int argsLength = args.length;
if (argsLength > 3) {
    taskQueueFactory = (EventLoopTaskQueueFactory) args[3];
}
if (argsLength > 4) {
    tailTaskQueueFactory = (EventLoopTaskQueueFactory) args[4];
}
return new NioEventLoop(this, executor, selectorProvider,
        selectStrategyFactory.newSelectStrategy(),
        rejectedExecutionHandler, taskQueueFactory, tailTaskQueueFactory);

} ```

初始化解析

成員變數

```java * 是否禁用SelectionKey的優化,預設為 false */ private static final boolean DISABLE_KEY_SET_OPTIMIZATION = SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false);

/ * 少於N值不開啟空輪詢重建新的Selector物件的功能 */ private static final int MIN_PREMATURE_SELECTOR_RETURNS = 3; / * NIO Selector空輪詢N次後,重建新的Selector物件,預設值為512,如果設定小於3則表示不開啟重建 Selector,在{@link #unexpectedSelectorWakeup(int)}處理意外的喚醒時使用 / private static final int SELECTOR_AUTO_REBUILD_THRESHOLD; / * 與{@link SelectStrategy}配合使用 / private final IntSupplier selectNowSupplier = new IntSupplier() { @Override public int get() throws Exception { return selectNow(); } };

/ * The NIO {@link Selector},經過Netty包裝優化過的 */ private Selector selector; private Selector unwrappedSelector; / * 註冊的SelectionKey集合,Netty自己實現的,對SelectionKey進行了包裝優化 */ private SelectedSelectionKeySet selectedKeys;

/* * 用於建立Selector / private final SelectorProvider provider;

private static final long AWAKE = -1L; private static final long NONE = Long.MAX_VALUE;

// nextWakeupNanos is: // AWAKE when EL is awake // NONE when EL is waiting with no wakeup scheduled // other value T when EL is waiting with wakeup scheduled at time T //下次喚醒的時間,預設為-1,表示已經喚醒,主要是用來執行定時任務時使用的 private final AtomicLong nextWakeupNanos = new AtomicLong(AWAKE); /* * Select策略,執行任務時使用 / private final SelectStrategy selectStrategy;

/* * 處理Channel的就緒事件,佔處理任務的總時間比例 * 在NioEventLoop中,有三種類型的任務 * 1:Channel的就緒IO事件 * 2:普通任務 * 3:定時任務 * ioRatio表示處理Channel的就緒IO事件佔處理總時間的比例 / private volatile int ioRatio = 50;

/ * 取消SelectionKey的數量 */ private int cancelledKeys; / * 是否需要再次select Selector物件 */ private boolean needsToSelectAgain; ```

構造方法

java /** * @param parent 所屬的EventLoopGroup * @param executor 預設為 {@link ThreadPerTaskExecutor} ,每個任務啟動一個執行緒執行,在{@link MultithreadEventExecutorGroup}初始化時設定 * @param selectorProvider SelectorProvider.provider(),{@link Selector#open()}中就是使用SelectorProvider.provider().openSelect來建立Selector * {@link Selector},在{@link NioEventLoop}中設定 * @param strategy Select的策略,下邊會進行講解, 預設為{@link DefaultSelectStrategyFactory} * @param rejectedExecutionHandler 拒絕策略,預設為丟擲{@link RejectedExecutionException} * @param taskQueueFactory 生成普通任務佇列的Factory,預設為空 * @param tailTaskQueueFactory 生成尾任務佇列的Factory ,預設為空 */ NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler, EventLoopTaskQueueFactory taskQueueFactory, EventLoopTaskQueueFactory tailTaskQueueFactory) { super(parent, executor, false, newTaskQueue(taskQueueFactory), newTaskQueue(tailTaskQueueFactory), rejectedExecutionHandler); this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider"); this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy"); //建立Selector final SelectorTuple selectorTuple = openSelector(); this.selector = selectorTuple.selector; //獲取未包裝的Selector this.unwrappedSelector = selectorTuple.unwrappedSelector; }

優化點:Selector優化

在構造方法中,我們看到建立Selector返回的是SelectorTuple,此類是一個包裝類,對Selector進行了包裝,JDK NIO的SelectionKeySet是用HashSet儲存的,HashSet底層使用的HashMap,put的時間複雜度為O(logn),Netty使用陣列對儲存方式進行了改變,陣列add操作的時間複雜度降為O(1),可以看到Netty的優化是非常細節的,並且通過自定義的SelectedSelectionKeySetSelector對其進行了組合封裝,具體是如何優化的看下原始碼便知,主要是使用的反射技術進行替換。

```java private SelectorTuple openSelector() { final Selector unwrappedSelector; try { unwrappedSelector = provider.openSelector(); } catch (IOException e) { throw new ChannelException("failed to open a new selector", e); } /* * 判斷是否關閉SelectionKeySet優化,預設是false,不關閉 / if (DISABLE_KEY_SET_OPTIMIZATION) { return new SelectorTuple(unwrappedSelector); }

/**
 * 建立class sun.nio.ch.SelectorImpl位元組碼
 */
Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
    @Override
    public Object run() {
        try {
            return Class.forName(
                    "sun.nio.ch.SelectorImpl",
                    false,
                    PlatformDependent.getSystemClassLoader());
        } catch (Throwable cause) {
            return cause;
        }
    }
});

if (!(maybeSelectorImplClass instanceof Class) ||
    // ensure the current selector implementation is what we can instrument.
    !((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
    if (maybeSelectorImplClass instanceof Throwable) {
        Throwable t = (Throwable) maybeSelectorImplClass;
        logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
    }
    //如果獲取SelectorImpl位元組碼失敗,則返回一個SelectorTuple未包裝的原生selector
    return new SelectorTuple(unwrappedSelector);
}

final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
/**
 * Netty對SelectionKeySet的優化,SelectedSelectionKeySet基於陣列實現
 */
final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();

Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
    @Override
    public Object run() {
        try {
            /**
             * 通過反射替換Selector的成員變數selectedKeys(Set)為Netty優化後的陣列實現{@link SelectedSelectionKeySet}
             * 使用陣列實現,add操作的時間複雜度降為O(1),而Set底層使用的HashMap,put的時間複雜度為O(logn),這也是Netty的一個優化點
             */
            Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
            Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
            //對Java9的適配
            if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {
                // Let us try to use sun.misc.Unsafe to replace the SelectionKeySet.
                // This allows us to also do this in Java9+ without any extra flags.
                long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
                long publicSelectedKeysFieldOffset =
                        PlatformDependent.objectFieldOffset(publicSelectedKeysField);

                if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {
                    PlatformDependent.putObject(
                            unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
                    PlatformDependent.putObject(
                            unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
                    return null;
                }
                // We could not retrieve the offset, lets try reflection as last-resort.
            }


            //設定兩個欄位可訪問
            Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
            if (cause != null) {
                return cause;
            }
            cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
            if (cause != null) {
                return cause;
            }

            /**
             * 將Selector的SelectionKeySet替換為Netty自己的
             */
            selectedKeysField.set(unwrappedSelector, selectedKeySet);
            publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
            return null;
        } catch (NoSuchFieldException e) {
            return e;
        } catch (IllegalAccessException e) {
            return e;
        }
    }
});

if (maybeException instanceof Exception) {
    selectedKeys = null;
    Exception e = (Exception) maybeException;
    logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e);
    return new SelectorTuple(unwrappedSelector);
}
selectedKeys = selectedKeySet;
logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
return new SelectorTuple(unwrappedSelector,
                         new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));

} ```

SelectedSelectionKeySetSelector

```java / * 經過優化的Selector,對原生Selector進行了組合 */ final class SelectedSelectionKeySetSelector extends Selector { / * 自定義的SelectionKeySet實現,我們需要知道此處的selectionKeys與{@link #delegate#selectionKeys}是同一個 / private final SelectedSelectionKeySet selectionKeys; / * 組合的原生的Selector{@link NioEventLoop#unwrappedSelector} / private final Selector delegate;

SelectedSelectionKeySetSelector(Selector delegate, SelectedSelectionKeySet selectionKeys) {
    this.delegate = delegate;
    this.selectionKeys = selectionKeys;
}

..........省略部分程式碼...........

/**
 * 在{@link NioEventLoop#run()}會呼叫
 */
@Override
public int select() throws IOException {
    selectionKeys.reset();
    return delegate.select();
}

@Override
public Selector wakeup() {
    return delegate.wakeup();
}

@Override
public void close() throws IOException {
    delegate.close();
}

} ```

SelectedSelectionKeySet

使用陣列實現,add操作的時間複雜度降為O(1),而Set底層使用的HashMap,put的時間複雜度為O(logn)

```java final class SelectedSelectionKeySet extends AbstractSet {

SelectionKey[] keys;
int size;

SelectedSelectionKeySet() {
    keys = new SelectionKey[1024];
}

@Override
public boolean add(SelectionKey o) {
    if (o == null) {
        return false;
    }

    keys[size++] = o;
    if (size == keys.length) {
        increaseCapacity();
    }

    return true;
}
@Override
public Iterator<SelectionKey> iterator() {
    return new Iterator<SelectionKey>() {
        private int idx;

        @Override
        public boolean hasNext() {
            return idx < size;
        }

        @Override
        public SelectionKey next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            return keys[idx++];
        }

        @Override
        public void remove() {
            throw new UnsupportedOperationException();
        }
    };
}

private void increaseCapacity() {
    SelectionKey[] newKeys = new SelectionKey[keys.length << 1];
    System.arraycopy(keys, 0, newKeys, 0, size);
    keys = newKeys;
}

} ```

run方法,執行任務

run方法為NioEventLoop中最重要的方法,那NioEventLoop中執行的任務是哪些任務呢?通過IDEA可以看到呼叫處為SingleThreadEventExecutor#doStartThread[SingleThreadEventExecutor]上篇文章中我們詳細將結果,這裡不再過多闡述,通過Debug可以看到doStartThread會在#execute中呼叫,#execute方法又是新增非同步任務的入口,那#run方法大概率就是執行這些新增的任務嘍,到底是不是呢,我們跟蹤下原始碼便知。

```java protected void run() { //Select計數 int selectCnt = 0; for (;;) { try { int strategy; try { //計算Select的策略 <1> strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks()); switch (strategy) { case SelectStrategy.CONTINUE: continue;

            case SelectStrategy.BUSY_WAIT:
                // fall-through to SELECT since the busy-wait is not supported with NIO

            case SelectStrategy.SELECT:
                //當沒有普通任務時,返回定時任務最近一次要執行的時間,如果有沒有定時任務則返回-1
                long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
                if (curDeadlineNanos == -1L) {
                    //如果沒有定時任務,則將最近執行時間設定為Integer的最大值
                    curDeadlineNanos = NONE; // nothing on the calendar
                }
                //設定下一次的喚醒時間
                nextWakeupNanos.set(curDeadlineNanos);
                try {
                    if (!hasTasks()) {
                        //select看是否有新增的感興趣的事件
                        strategy = select(curDeadlineNanos);
                    }
                } finally {
                    // This update is just to help block unnecessary selector wakeups
                    // so use of lazySet is ok (no race condition)
                    //延遲設定執行緒的喚醒時間阻塞不必要的Select喚醒
                    nextWakeupNanos.lazySet(AWAKE);
                }
                // fall through
            default:
            }
        } catch (IOException e) {
            // If we receive an IOException here its because the Selector is messed up. Let's rebuild
            // the selector and retry. http://github.com/netty/netty/issues/8566
            //重建Selector
            rebuildSelector0();
            //重置計數
            selectCnt = 0;
            handleLoopException(e);
            continue;
        }

        selectCnt++;
        cancelledKeys = 0;
        needsToSelectAgain = false;
        final int ioRatio = this.ioRatio;
        boolean ranTasks;
        if (ioRatio == 100) {
            try {
                if (strategy > 0) {
                    //如果有新增的感興趣的事件,則處理
                    processSelectedKeys();
                }
            } finally {
                // Ensure we always run tasks.
                //所有的時間都用來處理IO事件,包括普通任務和定時任務,不限制時間
                ranTasks = runAllTasks();
            }
        } else if (strategy > 0) {
            //記錄當前時間
            final long ioStartTime = System.nanoTime();
            try {
                //處理Channel的就緒事件
                processSelectedKeys();
            } finally {
                // Ensure we always run tasks.
                //計算用來處理IO事件的時間,包括普通任務和定時任務,限制時間
                //以處理Channel的就緒事件所需時間為基準計算執行所有任務需要的時間
                final long ioTime = System.nanoTime() - ioStartTime;
                ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
            }
        } else {
            ranTasks = runAllTasks(0); // This will run the minimum number of tasks
        }
        //如果有任務執行過了或者有任務待執行,則重置select計數
        if (ranTasks || strategy > 0) {
            if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
                logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                        selectCnt - 1, selector);
            }
            //有新增的事件,或者任務執行過,則將空輪詢次數置0
            selectCnt = 0;
        } else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
            //針對意外喚醒,重置計數
            selectCnt = 0;
        }
    } catch (CancelledKeyException e) {
        // Harmless exception - log anyway
        if (logger.isDebugEnabled()) {
            logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                    selector, e);
        }
    } catch (Error e) {
        throw e;
    } catch (Throwable t) {
        handleLoopException(t);
    } finally {
        // Always handle shutdown even if the loop processing threw an exception.
        try {
            if (isShuttingDown()) {
                //如果EventLoop狀態是正在關閉、已關閉、已終止,則執行關閉邏輯,關閉Channel和Selector的繫結,關閉Channel
                closeAll();
                //確認是否可以關閉了
                if (confirmShutdown()) {
                    //退出NioEventLoop執行緒迴圈
                    return;
                }
            }
        } catch (Error e) {
            throw e;
        } catch (Throwable t) {
            handleLoopException(t);
        }
    }
}

} ```

整個run方法的執行流程如下:判斷是否有任務需要執行(taskQueue + 定時任務)或者有感興趣的事件

  1. 有新增的感興趣的事件則先處理事件
  2. 有任務需要執行則先執行任務
  3. 判斷是否要shutDown

以上三個步驟會迴圈執行。​

這裡插入一個點,在run方法中,我們看到NioEventLoop呼叫JDK NIO底層的select方法檢視是否有感興趣的事件,在服務端剛剛啟動時,感興趣的事件肯定是客戶端的連線(ACCEPT)時間,那這個感興趣的事件是如何設定的呢?大家是否還記得在服務端建立原始碼分析中,在Channel註冊後最終會呼叫AbstractNioChannel#doBeginRead到方法,此處就會將感興趣的事件設定為OP_ACCEPT。​

SelectStrategy是Select的策略介面,其中:

```java / * Indicates a blocking select should follow. * 表示使用阻塞Select的策略 */ int SELECT = -1; / * Indicates the IO loop should be retried, no blocking select to follow directly. * 表示需要進行重試的策略,目前沒有使用 / int CONTINUE = -2; / * Indicates the IO loop to poll for new events without blocking. * 目前沒有使用 * / int BUSY_WAIT = -3;

/* * The {@link SelectStrategy} can be used to steer the outcome of a potential select * call. * * @param selectSupplier The supplier with the result of a select result. * @param hasTasks true if tasks are waiting to be processed. * @return {@link #SELECT} if the next step should be blocking select {@link #CONTINUE} if * the next step should be to not select but rather jump back to the IO loop and try * again. Any value >= 0 is treated as an indicator that work needs to be done. / int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception; ```

預設使用的是DefaultSelectStrategy,大家看註釋就好,

```java final class DefaultSelectStrategy implements SelectStrategy { static final SelectStrategy INSTANCE = new DefaultSelectStrategy();

private DefaultSelectStrategy() { }

@Override
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
    //如果有任務,則返回Channel新增的感興趣的IO事件數量
    //如果沒有任務,則返回阻塞Select的策略
    return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}

}

```

重點內容

我們繼續看run方法,主要的邏輯我都已經打好了對應的註釋,大家看註釋就好,我們重點說下這幾個內容:

  • IO任務處理時間比例控制:ioRatio
  • 處理新增感興趣事件
  • 執行任務佇列任務
  • 重建Selector解決JDK空輪詢的bug:rebuildSelector0
  • IO任務處理事件比例:ioRatio
  • shutDown優雅關閉處理

IO任務處理時間任務比例控制

```java if (ioRatio == 100) { try { if (strategy > 0) { //如果有新增的感興趣的事件,則處理 processSelectedKeys(); } } finally { // Ensure we always run tasks. //所有的時間都用來處理IO事件,包括普通任務和定時任務,不限制時間 ranTasks = runAllTasks(); } } else if (strategy > 0) {//如果有新增的感興趣的事件 //記錄當前時間 final long ioStartTime = System.nanoTime(); try { //處理Channel的就緒事件 processSelectedKeys(); } finally { // Ensure we always run tasks. //計算用來處理IO事件的時間,包括普通任務和定時任務,限制時間 //以處理Channel的就緒事件所花時間為基準計算執行所有任務需要的時間 final long ioTime = System.nanoTime() - ioStartTime; ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } } else { //如果沒有新增的感興趣的事件,則執行所有的任務 ranTasks = runAllTasks(0); // This will run the minimum number of tasks }

```

  • ioRatio等於100時,則不會對執行任務限制時間,如果有新增的感興趣的時間,則全力處理感興趣的事件,如果有待執行的任務,則全力執行任務
  • ioRatio小於100時
  • 當有新增的感興趣的事件,則先處理感興趣的事件,處理完事件後,通過處理事件所花的時間計算執行所有的任務最大的時間
  • 當沒有新增的感興趣的事件,則執行所有的任務,這裡參為0,表示執行最少的任務

處理新增感興趣事件

```java private void processSelectedKeys() { //判斷是否使用的優化過的SelectionKey if (selectedKeys != null) { processSelectedKeysOptimized(); } else { processSelectedKeysPlain(selector.selectedKeys()); } }

```

processSelectedKeysOptimized

處理優化過的SelectionKeySet

```java ate void processSelectedKeysOptimized() { for (int i = 0; i < selectedKeys.size; ++i) { final SelectionKey k = selectedKeys.keys[i]; // null out entry in the array to allow to have it GC'ed once the Channel close // See http://github.com/netty/netty/issues/2363 selectedKeys.keys[i] = null;

    final Object a = k.attachment();
    //這裡取出來了附加資訊,並且判斷附加資訊是否為AbstractNioChannel,為什麼會有這種可能呢?
    // 我們在解服務端建立的原始碼分析文章中分析註冊的流程時,AbstractNioChannel#doRegister在將Channel註冊到Selector上時,將自己作為附加資訊傳了進去
    //  selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
    //因此這裡是true,至於NioTask用的不多我們就不進行分析了
    if (a instanceof AbstractNioChannel) {
        processSelectedKey(k, (AbstractNioChannel) a);
    } else {
        @SuppressWarnings("unchecked")
        NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
        processSelectedKey(k, task);
    }
    <2>
    if (needsToSelectAgain) {
        // null out entries in the array to allow to have it GC'ed once the Channel close
        // See http://github.com/netty/netty/issues/2363
        selectedKeys.reset(i + 1);

        selectAgain();
        i = -1;
    }
}

}

```

<2>處判斷是否需要再次Select,預設為false,通過IDEA工具檢視到needsToSelectAgain在#cancel方法中會設定為true

```java void cancel(SelectionKey key) { key.cancel(); cancelledKeys ++; if (cancelledKeys >= CLEANUP_INTERVAL) { cancelledKeys = 0; needsToSelectAgain = true; } }

```

#cancel方法會在AbstractNioChannel#doDeregister中被呼叫,doDeregister和doRegister相反,那就是將Channel和EventLoop解綁,這裡每解綁一個cancelledKeys就會自增1,當cancelledKeys大於等於設定的閾值256時則將needsToSelectAgain設定為true,當needsToSelectAgain=true時會執行清理操作,將SelectionKeySet清空,然後再呼叫selectAgain重新select一遍,將剩餘的SelectionKey再填到SelectionKeySet中,這裡主要是為了解決當Channel斷開後,而在服務端的SelectionKey佔用還在導致的記憶體不能回收問題。

```java if (needsToSelectAgain) { // null out entries in the array to allow to have it GC'ed once the Channel close // See http://github.com/netty/netty/issues/2363 selectedKeys.reset(i + 1);

selectAgain();
i = -1;

}

```

processSelectedKey

針對不同的事件做不同的處理,事件就是JDK NIO的那些事件,SelectionKey#OP_ACCEPT、SelectionKey#OP_CONNECT、SelectionKey#OP_READ、_SelectionKey#_OP_WRITE

```java private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { //這裡的服務端的Unsafe類是class io.netty.channel.nio.AbstractNioMessageChannel$NioMessageUnsafe,我們也在服務端建立原始碼分析中講解過 final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); if (!k.isValid()) { final EventLoop eventLoop; try { eventLoop = ch.eventLoop(); } catch (Throwable ignored) { // If the channel implementation throws an exception because there is no event loop, we ignore this // because we are only trying to determine if ch is registered to this event loop and thus has authority // to close ch. return; } // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is // still healthy and should not be closed. // See http://github.com/netty/netty/issues/5125 if (eventLoop == this) { // close the channel if the key is not valid anymore unsafe.close(unsafe.voidPromise()); } return; }

try {
    //當前事件的操作型別,這裡客戶端第一次建立連線時為OP_ACCEPT
    int readyOps = k.readyOps();
    // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
    // the NIO JDK channel implementation may throw a NotYetConnectedException.
    if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
        //OP_CONNECT 連線成功事件
        // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
        // See http://github.com/netty/netty/issues/924
        int ops = k.interestOps();
        ops &= ~SelectionKey.OP_CONNECT;
        k.interestOps(ops);
        //觸發完成連線操作,這裡會觸發連線成功事件,Handler將會接收到事件通知進行處理
        unsafe.finishConnect();
    }

    // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
    if ((readyOps & SelectionKey.OP_WRITE) != 0) {
        // OP_WRITE 事件就緒
        // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
        //向Channel寫入資料,,這裡會觸發連線成功事件,Handler將會接收到事件通知進行處理
        ch.unsafe().forceFlush();
    }

    // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
    // to a spin loop
    if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
        //SelectionKey.OP_READ 或者 SelectionKey.OP_ACCEPT事件,readyOps=0是用來處理JDK Selector的空輪bug
        //這裡會觸發連線成功事件,Handler將會接收到事件通知進行處理
        unsafe.read();
    }
} catch (CancelledKeyException ignored) {
    unsafe.close(unsafe.voidPromise());
}

}

```

processSelectedKeysPlain

處理未使用優化過的SelectionKeySet,最終也會呼叫#processSelectedKey,不再過多分析

執行任務佇列任務runAllTasks

runAllTasks的內容我在上篇文章中已經講解過,大家跳過去看下哈

重建Selector解決JDK 的空輪詢bug

unexpectedSelectorWakeup

主要工作是判斷是否達到了重建Selector的標準

```java /* * 針對意外的喚醒,JDK的空輪訓BUG,沒有事件發生也會立即返回,此方法主要是為了解決這個BUG * 如果已經達到了重建Selector的閾值,則會進行重建Selector,返回true,將select計數重置 / private boolean unexpectedSelectorWakeup(int selectCnt) { if (Thread.interrupted()) { //如果執行緒被打斷 // Thread was interrupted so reset selected keys and break so we not run into a busy loop. // As this is most likely a bug in the handler of the user or it's client library we will // also log it. // // See http://github.com/netty/netty/issues/2426 if (logger.isDebugEnabled()) { logger.debug("Selector.select() returned prematurely because " + "Thread.currentThread().interrupt() was called. Use " + "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop."); } return true; } //判斷是否達到了重建Selector的閾值 if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) { // The selector returned prematurely many times in a row. // Rebuild the selector to work around the problem. // Selector連續多次提前返回 logger.warn("Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.", selectCnt, selector); rebuildSelector(); return true; } return false; }

```

rebuildSelector

```java /* * Replaces the current {@link Selector} of this event loop with newly created {@link Selector}s to work * around the infamous epoll 100% CPU bug. * 重建Selector物件來解決JDK epoll的100% CPU的bug,其實Netty並沒有解決JDK NIO這個問題,只是進行了規避 / public void rebuildSelector() { if (!inEventLoop()) { //執行一個事件 execute(new Runnable() { @Override public void run() { rebuildSelector0(); } }); return; } rebuildSelector0(); }

```

rebuildSelector0

核心思想就是建立一個新的Selector,將原來註冊的Channel全部都註冊到新的Selector上

```java /* * 建立一個新的Selector,將之前註冊到老的selector上的Channel重新轉移到新的Selector上,並將老的Selector關閉 / private void rebuildSelector0() { final Selector oldSelector = selector; final SelectorTuple newSelectorTuple;

if (oldSelector == null) {
    return;
}

try {
    //建立新的Selector
    newSelectorTuple = openSelector();
} catch (Exception e) {
    logger.warn("Failed to create a new Selector.", e);
    return;
}

// Register all channels to the new Selector.
int nChannels = 0;
//遍歷所有的SelectionKey,將Channel重新註冊到新的Selector上
for (SelectionKey key: oldSelector.keys()) {
    Object a = key.attachment();
    try {
        if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {
            continue;
        }

        int interestOps = key.interestOps();
        //取消老Key
        key.cancel();
        //將Channel註冊到新的Selector
        SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);
        if (a instanceof AbstractNioChannel) {
            // Update SelectionKey
            // 修改Channel的SelectionKey為新的SelectionKy
            ((AbstractNioChannel) a).selectionKey = newKey;
        }
        nChannels ++;
    } catch (Exception e) {
        logger.warn("Failed to re-register a Channel to the new Selector.", e);
        if (a instanceof AbstractNioChannel) {
            AbstractNioChannel ch = (AbstractNioChannel) a;
            ch.unsafe().close(ch.unsafe().voidPromise());
        } else {
            @SuppressWarnings("unchecked")
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            invokeChannelUnregistered(task, key, e);
        }
    }
}
//修改NioEventLoop的Selector為新的Selector
selector = newSelectorTuple.selector;
unwrappedSelector = newSelectorTuple.unwrappedSelector;

}

```

shutdown優雅關閉處理

NioEventLoop在啟動後會不斷的判斷是否達到了關閉條件state >= _ST_SHUTTING_DOWN_,達到條件則先關閉所有的Channel連線,隨後會判斷是不是滿足條件(#confirmShutdown在上篇文章中我們已經對其講解過,主要是配合優雅關閉的安靜期和最大等待時間進行計算)退出。

```java if (isShuttingDown()) { //如果EventLoop狀態是正在關閉、已關閉、已終止,則執行關閉邏輯,關閉Channel和Selector的繫結,關閉Channel closeAll(); //確認是否可以關閉了 if (confirmShutdown()) { //退出NioEventLoop執行緒迴圈 return; } }

```

總結

今天我們對NioEventLoop做了詳細的介紹,從功能概覽到初始化,再到任務執行,篇幅較長,如果大家耐心的讀完了肯定會有所收穫的。

我是壹氿,感謝各位小夥伴點贊、收藏和評論,文章持續更新,我們下期再見!