背景
之前在工作中遇到一個問題,我定義了一個執行緒池來執行任務,程式執行結束後任務沒有全部執行完。
業務場景是這樣的:由於統計業務需要,訂單資訊需要從主庫中經過統計業務程式碼寫入統計庫。由於程式碼質量及歷史原因,目前的重新統計介面是單執行緒的,粗略算了算一共有100萬條訂單資訊,每100條的處理大約是10秒,所以理論上處理完全部資訊需要28個小時,這還不算因為 mysql 中 limit 分頁導致的後期查詢時間以及可能出現的記憶體溢位導致中止統計的情況。
基於上述的原因,以及最重要的一點:統計業務是根據訂單所屬的中心進行的,各個中心同時統計不會導致髒資料。所以,我計劃使用執行緒池,為每一箇中心分配一條執行緒去執行統計業務。
業務實現
// 執行緒工廠,用於為執行緒池中的每條執行緒命名
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("stats-pool-%d").build();
// 建立執行緒池,使用有界阻塞佇列防止記憶體溢位
ExecutorService statsThreadPool = new ThreadPoolExecutor(5, 10,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(100), namedThreadFactory);
// 遍歷所有中心,為每一個centerId提交一條任務到執行緒池
statsThreadPool.submit(new StatsJob(centerId));
複製程式碼
在建立完執行緒池後,為每一個 centerId 提交一條任務到執行緒池。在我的預想中,由於執行緒池的核心執行緒數為5,最多5箇中心同時進行統計業務,將大大縮短100萬條資料的總統計時間,於是萬分興奮的我開始執行重新統計業務了。
問題
在跑了很久之後,當我檢視統計進度時,我發現了一個十分詭異的問題(如下圖)。藍框標出的這條執行緒是 WAIT 狀態,表明這條執行緒是空閒狀態,但是從日誌中我看到這條執行緒並沒有完成它的任務,因為這個中心的資料有10萬條,但是日誌顯示它只跑到了一半,之後就再無關於此中心的日誌了。
這是什麼原因?
除錯及原因
可以想到的是,這條執行緒因為某些原因被阻塞了,並且沒有繼續進行下去,但是日誌又沒有任何異常資訊...
可能有經驗的工程師已經知道了原因...
由於個人水平的執行緒,暫時沒有找到原因的我只能放棄使用執行緒池,乖乖用單執行緒跑...
幸運的是,單執行緒跑的任務竟然拋錯了(為什麼要說幸運?),於是馬上想到,之前那條 WAIT 狀態的執行緒可能是因為同樣的拋錯所以被中斷了,導致任務沒有繼續進行下去。
為什麼說幸運?因為如果單執行緒的任務沒有拋錯的話,我可能很久都想不到是這個原因。
深入探究執行緒池的異常處理
工作上的問題到這裡就找到原因了,之後的解決過程也十分簡單,這裡就不提了。
但是疑問又來了,為什麼使用執行緒池的時候,執行緒因異常被中斷卻沒有丟擲任何資訊呢?還有平時如果是在 main 函式裡面的異常也會被丟擲來,而不是像執行緒池這樣被吞掉。
如果子執行緒丟擲了異常,執行緒池會如何進行處理呢?
我提交任務到執行緒池的方式是:
threadPoolExecutor.submit(Runnbale task);
,後面瞭解到使用 execute() 方式提交任務會把異常日誌給打出來,這裡研究一下為什麼使用 submit 提交任務,在任務中的異常會被“吞掉”。
對於 submit() 形式提交的任務,我們直接看原始碼:
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
// 被包裝成 RunnableFuture 物件,然後準備新增到工作佇列
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
複製程式碼
它會被執行緒池包裝成 RunnableFuture 物件,而最終它其實是一個 FutureTask 物件,在被新增到執行緒池的工作佇列,然後呼叫 start() 方法後, FutureTask 物件的 run() 方法開始執行,即本任務開始執行。
public void run() {
if (state != NEW || !UNSAFE.compareAndSwapObject(this,runnerOffset,null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
// 捕獲子任務中的異常
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
runner = null;
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
複製程式碼
在 FutureTask 物件的 run() 方法中,該任務丟擲的異常被捕獲,然後在setException(ex); 方法中,丟擲的異常會被放到 outcome 物件中,這個物件就是 submit() 方法會返回的 FutureTask 物件執行 get() 方法得到的結果。但是線上程池中,並沒有獲取執行子執行緒的結果,所以異常也就沒有被丟擲來,即被“吞掉”了。
這就是執行緒池的 submit() 方法提交任務沒有異常丟擲的原因。
執行緒池自定義異常處理方法
在定義 ThreadFactory 的時候呼叫setUncaughtExceptionHandler
方法,自定義異常處理方法。
例如:
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
.setNameFormat("judge-pool-%d")
.setUncaughtExceptionHandler((thread, throwable)-> logger.error("ThreadPool {} got exception", thread,throwable))
.build();
複製程式碼
這樣,對於執行緒池中每條執行緒丟擲的異常都會打下 error 日誌,就不會看不到了。
後續
在修復了單個執行緒任務的異常之後,我繼續使用執行緒池進行重新統計業務,終於跑完了,也終於完成了這個任務。
小結:使用執行緒池時需要注意,子執行緒的異常,如果沒有被捕獲就會丟失,可能會導致後期根據日誌除錯時無法找到原因。