面試必備的執行緒池知識-執行緒池的原理

語言: CN / TW / HK

@TOC

前言

上一篇我們介紹了執行緒池的使用,這一篇我們接著分析下執行緒池的實現原理。首先從建立執行緒池的核心類ThreadPoolExecutor類說起。

ThreadPoolExecutor類的常量

    //用來存放工作執行緒數量和執行緒池狀態
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;  //32-3=29
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
	//執行狀態,可以執行任務
    private static final int RUNNING    = -1 << COUNT_BITS;
	//不能接受新任務,但是可以執行完正在執行任務
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
	//不能接受新任務,也不能執行已有的任務
    private static final int STOP       =  1 << COUNT_BITS;
	//所有任務都終止,工作執行緒數歸零
    private static final int TIDYING    =  2 << COUNT_BITS;
	//終止狀態執行完成
    private static final int TERMINATED =  3 << COUNT_BITS;
		
	//獲取執行緒池的狀態
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
	//獲取工作執行緒的數量
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }

ctl 變數主要是為了把工作執行緒數量和執行緒池狀態放在一個整型變數儲存而設定的一個原子型別的變數。在ctl中,低位的29位表示工作執行緒的數量,高位用來表示RUNNING,SHUTDOWN,STOP等執行緒池狀態。上面定義的三個方法只是為了計算得到執行緒池的狀態和工作執行緒的數量以及得到ctl。 下面是一段執行緒池的測試程式碼,定義執行緒池,並呼叫execute方法新增任務,並執行任務。

public class ExectorTest {
    public static void main(String[] args) {
	   //給執行緒設定一個自定義名稱
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("測試執行緒-%d").build();
        ThreadPoolExecutor executorService = new ThreadPoolExecutor(
                3,
                6,
                10,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(5),
                threadFactory
//                , new ThreadPoolExecutor.CallerRunsPolicy()
        );
        for (int i=0;i<20;i++) {
            executorService.execute(()->{
			   //模擬耗時的任務
                System.out.println(Thread.currentThread().getName()+" 開始執行任務");
                int j = 10000 * 10000;
                while (j >0) {
                    j--;
                }

                System.out.println(Thread.currentThread().getName()+" 執行結束");

            });
        }
    }
}

利用debug模式得到的除錯棧如下:

在這裡插入圖片描述 提交任務execute方法是整個執行緒池的執行入口,下面我就從它開始分析。

execute方法

    public void execute(Runnable command) {
	//如果任務為空,則丟擲NPE異常
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
		 //獲取執行緒池狀態
        int c = ctl.get();
		//1.如果工作執行緒的數量小於核心執行緒數
        if (workerCountOf(c) < corePoolSize) {
			//呼叫addWorker增加一個新執行緒,並執行一個任務
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
		//如果執行緒池的狀態是執行狀態,並且任務加入到了工作佇列成功
        if (isRunning(c) && workQueue.offer(command)) {
			//雙重檢查,再次檢查執行緒池的狀態。
            int recheck = ctl.get();
			//如果執行緒池的狀態不是執行狀態並且移除任務成功則呼叫拒絕策略
            if (! isRunning(recheck) && remove(command))
            	//呼叫RejectedExecutionHandler.rejectedExecution()方法。根據不同的拒絕策略去處理
			     reject(command);
			//如果工作執行緒的數量為0,說明工作佇列中可能有任務沒有執行緒執行,此時則新建一個執行緒來執行任務,由於執行的是佇列中已經堆積的任務,所以沒有傳入具體的任務。
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
		//如果前面的新增work,放入佇列都失敗,則會繼續新增worker,此時執行緒池中的工作執行緒數達到corePoolSize,阻塞佇列任務已滿,只能基於maximumPoolSize來繼續增加work,如果還是失敗
        else if (!addWorker(command, false))
		   //如果還是失敗,則呼叫RejectedExecutionHandler.rejectedExecution()方法。根據不同的拒絕策略去處理
            reject(command);
    }

從上程式碼中,我們可以總結出execute方法主要有如下三個流程

  1. 如果執行緒池中當前工作執行緒數小於核心執行緒數(corePoolSize),則建立一個新執行緒來執行傳入的任務(執行這一步驟需要獲取全域性鎖)
  2. 如果工作執行緒數大於等於核心執行緒數,並且執行緒池是執行狀態,則將傳入的任務加入到工作佇列(BlockingQueue)中。
  3. 如果無法將任務加入BlockingQueue(佇列已滿),則建立新的執行緒來處理任務(執行這一步驟需要獲取全域性鎖)
  4. 如果建立新執行緒將使得當前執行的執行緒超出maximumPoolSize,任務將被拒絕,並呼叫RejectedExecutionHandler.rejectedExecution()方法。根據不同的拒絕策略去處理 執行的流程圖如下: 在這裡插入圖片描述execute()方法可以看到新增執行緒並且執行任務核心邏輯在addWorker方法中。

addWorker的方法

首先第一段程式碼,這段程式碼有兩個死迴圈,外層的死迴圈主要是檢查執行緒池的狀態,更新執行緒池的狀態。內層的死迴圈,是檢查工作執行緒的數量,並且通過CAS的方式在ctl中更新工作執行緒的數量。

 for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            //檢查執行緒池的狀態是否是執行狀態,並且佇列不為空
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
				//獲取工作執行緒數
                int wc = workerCountOf(c);
				//工作執行緒數
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
					//通過CAS的方式來在ctl中增加工作執行緒的數量
                if (compareAndIncrementWorkerCount(c))
                    break retry;
				//再次獲取狀態
                c = ctl.get();  // Re-read ctl
				//如果狀態更新失敗,則迴圈更新
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

分析完了前置的一些檢查工作的程式碼,接下來,來看下主流程的程式碼:

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
		   //1. 新建一個工作執行緒,Work後面會說
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
				//加鎖
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
					//再次獲取執行緒池的狀態,在新建執行緒或者釋放鎖時,都會重新檢查。
                    int rs = runStateOf(ctl.get());
					//如果執行緒池的狀態不是關閉狀態,則進入下面的分支
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
						//檢查新建的執行緒是否是可執行狀態
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
						//將工作執行緒新增到HashSet型別的集合中
                        workers.add(w);
                        int s = workers.size();
						//如果工作執行緒的集合數大於largestPoolSize
                        if (s > largestPoolSize)
                            largestPoolSize = s;
						//新建工作執行緒成功之後,將操作標誌workerAdded設為true,表示新增工作執行緒成功,後續流程用
                        workerAdded = true;
                    }
                } finally {
					//釋放鎖
                    mainLock.unlock();
                }
				//如果新建工作執行緒成功,則呼叫start() 方法啟動執行緒
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
			//如果workerStarted為false,表示新建工作執行緒失敗
            if (! workerStarted)
				//移除已經建立的工作執行緒
                addWorkerFailed(w);
        }
        return workerStarted;

如上,該主流程的程式碼邏輯也是比較清晰的,首先是新建一個工作執行緒,然後就是在同步程式碼塊中檢查執行緒池的狀態,如果不是SHUTDOWN狀態,則將新增的執行緒放在HashSet型別執行緒的集合中,放入成功之後,將建立work的標識workerAdded改成true,然後釋放鎖。接著就是呼叫start()方法使得執行緒可以執行任務。接下來就來看看Worker的結構

Work 類

  private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
  Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;   //傳入的任務
            this.thread = getThreadFactory().newThread(this);  //建立一個新執行緒
        }
	   public void run() {
            runWorker(this);
        }

	}

Worker類是ThreadPoolExecutor類的一個私有內部不變類,其實現了Runnable介面,內部的run()方法裡面呼叫的runWorker()方法。所以,任務的最終執行時通過runWorker()方法的。 在介紹runWorker()之前,我們先看看建立執行緒的邏輯。

ThreadFactoryBuilder類

按照前面呼叫棧我們接著分析下ThreadFactoryBuilder。ThreadFactoryBuilder類用於生成ThreadFactory並且設定一些引數,比如執行緒名,執行緒的等級,是否是後臺執行緒等資訊。這裡設定資訊用到了建造者模式。程式碼如下:

 public ThreadFactory build() {
    return build(this);
  }

  private static ThreadFactory build(ThreadFactoryBuilder builder) {
    final String nameFormat = builder.nameFormat;
    final Boolean daemon = builder.daemon;
    final Integer priority = builder.priority;
    final UncaughtExceptionHandler uncaughtExceptionHandler = builder.uncaughtExceptionHandler;
	//沒有指定ThreadFactory實現類的話預設就是Executors.defaultThreadFactory()
    final ThreadFactory backingThreadFactory =
        (builder.backingThreadFactory != null)
            ? builder.backingThreadFactory
            : Executors.defaultThreadFactory();
    final AtomicLong count = (nameFormat != null) ? new AtomicLong(0) : null;
   //匿名內部類
   return new ThreadFactory() {
      @Override
      public Thread newThread(Runnable runnable) {
	    //呼叫 backingThreadFactory.newThread得到生成的工作執行緒
        Thread thread = backingThreadFactory.newThread(runnable);
		//重置執行緒名
        if (nameFormat != null) {
          thread.setName(format(nameFormat, count.getAndIncrement()));
        }
		//重置是否是後臺執行緒
        if (daemon != null) {
          thread.setDaemon(daemon);
        }
		//重置執行緒的等級
        if (priority != null) {
          thread.setPriority(priority);
        }
        if (uncaughtExceptionHandler != null) {
          thread.setUncaughtExceptionHandler(uncaughtExceptionHandler);
        }
        return thread;
      }
    };
  }

這個類的邏輯比較簡單,主要是兩步

  1. 獲取ThreadFactory的具體實現類
  2. 呼叫ThreadFactory的newThread方法,並重置執行緒名資訊。 接下來我們看看Executors類的內部靜態類DefaultThreadFactory類。

DefaultThreadFactory類

 static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }

        public Thread newThread(Runnable r) {
		    //直接new一個工作執行緒
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            //是否是後臺執行緒
			if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }

上面DefaultThreadFactory的程式碼比較簡單,就是new一個工作執行緒並設定工作執行緒的預設名。說完了建立工作執行緒的邏輯,接下來,我們來看看執行任務的runWorker方法的邏輯。

runWorker

  final void runWorker(Worker w) {
  //獲取當前執行緒
        Thread wt = Thread.currentThread();
		//獲取當前的任務
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
			//一般情況下,task都不會為空(特殊情況上面註釋就是前面execute方法說的)或者可以從工作佇列中取到任務,會直接進入迴圈體中。
            while (task != null || (task = getTask()) != null) {
			   //加鎖
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
				//如果執行緒池正在停止,確保執行緒是被中斷,
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
			 //該方法是個空的實現,如果有需要使用者可以自己繼承該類進行實現
                    beforeExecute(wt, task);
                    Throwable thrown = null;
						//呼叫任務的run方法,真正的任務執行邏輯
                        task.run();
                 	  .....省略部分程式碼
					  finally {
					  //該方法是個空的實現,如果有需要使用者可以自己繼承該類進行實現
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
		        //當指定任務執行完成,阻塞佇列中也取不到可執行任務時,會進入這裡,做一些善後工作,比如在corePoolSize跟maximumPoolSize之間的woker會進行回收
            processWorkerExit(w, completedAbruptly);
        }
    }

work執行緒的執行流程就是首先執行初始化分配給的任務,執行完成之後會嘗試從阻塞中獲取可執行的任務,如果指定時間內仍然沒有任務可以執行,則進入銷燬邏輯,這裡只會回收corePoolSize與maxmumPoolSize之間的那部分worker。

getTask方法

這裡getTask方法的實現更我們構造引數設定存活時間有關,我們都知道構造引數設定的時間代表了執行緒池中的執行緒,即worker執行緒的存活時間,如果到期則回收worker執行緒,這個邏輯的實現就在getTask中。來不及執行的任務,執行緒池會放入一個阻塞佇列(工作佇列),getTask方法就是去工作佇列中取任務,使用者設定的存活時間,就是從這個阻塞佇列中取任務等待的最大時間,如果getTask返回null,意思就是worker等待了指定時間仍然沒有取到任務,此時就會跳過迴圈體,進入worker執行緒銷燬邏輯。

 private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            //執行緒池如果是SHUTDOWN或者STOP狀態,則將work移除。
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // 對於allowCoreThreadTimeOut為true(設定了核心執行緒的存活時間),或者是在corePoolSize與maxmumPoolSize之間的那部分worker
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
				//如果timed為true,則需要在keepAliveTime時間內取任務,否則沒有存活時間的限制
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

總結

本文對執行緒池新增任務,執行任務的原始碼做了重點解析,內部用到了很多設計模式,比如建立執行緒用到了工廠模式,設定執行緒的屬性用到了建造者模式。同時還用到了鎖等知識。瞭解其實現原理對我們更好的使用執行緒池大有好處。

參考

Java執行緒池總結 面試題|關於Java執行緒池一篇文章就夠了

全網同名【碼農飛哥】。不積跬步,無以至千里,享受分享的快樂 我是碼農飛哥,再次感謝您讀完本文