新來個阿里 P7,僅花 2 小時,擼出一個多執行緒永動任務,看完直接跪了,真牛逼!
大家好,我是樓仔!
今天教大家擼一個 Java 的多執行緒永動任務,這個示例的原型是公司自研的多執行緒非同步任務專案,我把裡面涉及到多執行緒的程式碼抽離出來,然後進行一定的改造。
裡面涉及的知識點非常多,特別適合有一定工作經驗的同學學習,或者可以直接拿到專案中使用。
文章結構非常簡單:
1. 功能說明
做這個多執行緒非同步任務,主要是因為我們有很多永動的非同步任務,什麼是永動呢?就是任務跑起來後,需要一直跑下去。
比如訊息 Push 任務,因為一直有訊息過來,所以需要一直去消費 DB 中的未推送訊息,就需要整一個 Push 的永動非同步任務。
我們的需求其實不難,簡單總結一下:
- 能同時執行多個永動的非同步任務;
- 每個非同步任務,支援開多個執行緒去消費這個任務的資料;
- 支援永動非同步任務的優雅關閉,即關閉後,需要把所有的資料消費完畢後,再關閉。
完成上面的需求,需要注意幾個點:
- 每個永動任務,可以開一個執行緒去執行;
- 每個子任務,因為需要支援併發,需要用執行緒池控制;
- 永動任務的關閉,需要通知子任務的併發執行緒,並支援永動任務和併發子任務的優雅關閉。
2. 多執行緒任務示例
2.1 執行緒池
對於子任務,需要支援併發,如果每個併發都開一個執行緒,用完就關閉,對資源消耗太大,所以引入執行緒池:
```
public class TaskProcessUtil {
// 每個任務,都有自己單獨的執行緒池
private static Map
// 初始化一個執行緒池
private static ExecutorService init(String poolName, int poolSize) {
return new ThreadPoolExecutor(poolSize, poolSize,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue
// 獲取執行緒池 public static ExecutorService getOrInitExecutors(String poolName,int poolSize) { ExecutorService executorService = executors.get(poolName); if (null == executorService) { synchronized (TaskProcessUtil.class) { executorService = executors.get(poolName); if (null == executorService) { executorService = init(poolName, poolSize); executors.put(poolName, executorService); } } } return executorService; }
// 回收執行緒資源 public static void releaseExecutors(String poolName) { ExecutorService executorService = executors.remove(poolName); if (executorService != null) { executorService.shutdown(); } } } ```
這是一個執行緒池的工具類,這裡初始化執行緒池和回收執行緒資源很簡單,我們主要討論獲取執行緒池。
獲取執行緒池可能會存在併發情況,所以需要加一個 synchronized 鎖,然後鎖住後,需要對 executorService 進行二次判空校驗。
2.2 單個任務
為了更好講解單個任務的實現方式,我們的任務主要就是把 Cat 的資料打印出來,Cat 定義如下:
@Data
@Service
public class Cat {
private String catName;
public Cat setCatName(String name) {
this.catName = name;
return this;
}
}
單個任務主要包括以下功能:
- 獲取永動任務資料:這裡一般都是掃描 DB,我直接就簡單用 queryData() 代替。
- 多執行緒執行任務:需要把資料拆分成 4 份,然後分別由多執行緒併發執行,這裡可以通過執行緒池支援;
- 永動任務優雅停機:當外面通知任務需要停機,需要執行完剩餘任務資料,並回收執行緒資源,退出任務;
- 永動執行:如果未收到停機命令,任務需要一直執行下去。
直接看程式碼:
``` public class ChildTask {
private final int POOL_SIZE = 3; // 執行緒池大小 private final int SPLIT_SIZE = 4; // 資料拆分大小 private String taskName;
// 接收jvm關閉訊號,實現優雅停機 protected volatile boolean terminal = false;
public ChildTask(String taskName) { this.taskName = taskName; }
// 程式執行入口
public void doExecute() {
int i = 0;
while(true) {
System.out.println(taskName + ":Cycle-" + i + "-Begin");
// 獲取資料
List
// 優雅停機 public void terminal() { // 關機 terminal = true; System.out.println(taskName + " shut down"); }
// 處理資料
private void doProcessData(List
// 處理單個任務資料
private void taskExecute(List> splitDatas = Lists.partition(sourceDatas, SPLIT_SIZE);
final CountDownLatch latch = new CountDownLatch(splitDatas.size());
// 併發處理拆分的資料,共用一個執行緒池
for (final List
try { latch.await(); } catch (Exception e) { System.out.println(e.getStackTrace()); } }
// 獲取永動任務資料
private List
簡單解釋一下:
- queryData:用於獲取資料,實際應用中其實是需要把 queryData 定為抽象方法,然後由各個任務實現自己的方法。
- doProcessData:資料處理邏輯,實際應用中其實是需要把 doProcessData 定為抽象方法,然後由各個任務實現自己的方法。
- taskExecute:將資料拆分成 4 份,獲取該任務的執行緒池,並交給執行緒池併發執行,然後通過 latch.await() 阻塞。當這 4 份資料都執行成功後,阻塞結束,該方法才返回。
- terminal:僅用於接受停機命令,這裡該變數定義為 volatile,所以多執行緒記憶體可見;
- doExecute:程式執行入口,封裝了每個任務執行的流程,當 terminal=true 時,先執行完任務資料,然後回收執行緒池,最後退出。
2.3 任務入口
直接上程式碼:
public class LoopTask {
private List<ChildTask> childTasks;
public void initLoopTask() {
childTasks = new ArrayList();
childTasks.add(new ChildTask("childTask1"));
childTasks.add(new ChildTask("childTask2"));
for (final ChildTask childTask : childTasks) {
new Thread(new Runnable() {
@Override
public void run() {
childTask.doExecute();
}
}).start();
}
}
public void shutdownLoopTask() {
if (!CollectionUtils.isEmpty(childTasks)) {
for (ChildTask childTask : childTasks) {
childTask.terminal();
}
}
}
public static void main(String args[]) throws Exception{
LoopTask loopTask = new LoopTask();
loopTask.initLoopTask();
Thread.sleep(5000L);
loopTask.shutdownLoopTask();
}
}
每個任務都開一個單獨的 Thread,這裡我初始化了 2 個永動任務,分別為 childTask1 和 childTask2,然後分別執行,後面 Sleep 了 5 秒後,再關閉任務,我們可以看看是否可以按照我們的預期優雅退出。
2.4 結果分析
執行結果如下:
childTask1:Cycle-0-Begin
childTask2:Cycle-0-Begin
childTask1:Cat(catName=羅小黑0),ThreadName:Pool-childTask1
childTask1:Cat(catName=羅小黑4),ThreadName:Pool-childTask1
childTask2:Cat(catName=羅小黑4),ThreadName:Pool-childTask2
childTask2:Cat(catName=羅小黑0),ThreadName:Pool-childTask2
childTask1:Cat(catName=羅小黑1),ThreadName:Pool-childTask1
childTask2:Cat(catName=羅小黑1),ThreadName:Pool-childTask2
childTask2:Cat(catName=羅小黑2),ThreadName:Pool-childTask2
childTask1:Cat(catName=羅小黑2),ThreadName:Pool-childTask1
childTask2:Cat(catName=羅小黑3),ThreadName:Pool-childTask2
childTask1:Cat(catName=羅小黑3),ThreadName:Pool-childTask1
childTask2:Cycle-0-End
childTask2:Cycle-1-Begin
childTask1:Cycle-0-End
childTask1:Cycle-1-Begin
childTask2:Cat(catName=羅小黑0),ThreadName:Pool-childTask2
childTask2:Cat(catName=羅小黑4),ThreadName:Pool-childTask2
childTask1:Cat(catName=羅小黑4),ThreadName:Pool-childTask1
childTask1:Cat(catName=羅小黑0),ThreadName:Pool-childTask1
childTask1 shut down
childTask2 shut down
childTask2:Cat(catName=羅小黑1),ThreadName:Pool-childTask2
childTask1:Cat(catName=羅小黑1),ThreadName:Pool-childTask1
childTask1:Cat(catName=羅小黑2),ThreadName:Pool-childTask1
childTask2:Cat(catName=羅小黑2),ThreadName:Pool-childTask2
childTask1:Cat(catName=羅小黑3),ThreadName:Pool-childTask1
childTask2:Cat(catName=羅小黑3),ThreadName:Pool-childTask2
childTask1:Cycle-1-End
childTask2:Cycle-1-End
輸出資料:
- “Pool-childTask” 是執行緒池名稱;
- “childTask” 是任務名稱;
- “Cat(catName=羅小黑)” 是執行的結果;
- “childTask shut down” 是關閉標記;
- “childTask:Cycle-X-Begin” 和“childTask:Cycle-X-End” 是每一輪迴圈的開始和結束標記。
我們分析一下執行結果:
- childTask1 和 childTask2 分別執行,在第一輪迴圈中都正常輸出了 5 條羅小黑資料;
- 第二輪執行過程中,我啟動了關閉指令,這次第二輪執行沒有直接停止,而是先執行完任務中的資料,再執行退出,所以完全符合我們的優雅退出結論。
2.5 原始碼地址
GitHub 地址:
https://github.com/lml200701158/java-study/tree/master/src/main/java/com/java/parallel/pool/ofc
3. 寫在最後
對於這個經典的執行緒池使用示例,原專案是我好友一灰寫的,技術水平對標阿里 P7,實現得也非常優雅,涉及的知識點非常多,非常值得大家學習。
如果對這個示例有任何疑問,可以加我微信,隨時溝通交流哈。
硬核推薦:
- 從美團挖來的架構師居然這麼設計 DB+ 快取,真的長見識了!
- 去位元組面試,直接讓人出門左拐:Bean 生命週期都不知道!
- 原始碼深度解析,Spring 如何解決迴圈依賴?
- 總監又來了,人狠話不多,這篇 gRPC,小弟佩服!
- 如何才能達到阿里 P7 水平 ?
- 新來個技術總監,把 RabbitMQ 講的那叫一個透徹,佩服!
- 工作很忙,家裡事多,一直沒時間學習,看看大佬的時間管理,屌炸天!
- 76 張圖,剖析 Spring AOP 原始碼,小白居然也能看懂,大神,請收下我的膝蓋!
- 新來個阿里 P7,僅花 2 小時,擼出一個多執行緒永動任務,看完直接跪了,真牛逼!
- 新來個技術總監,把限流實現的那叫一個優雅,佩服!
- 百度一面:談談 @Transactional 的原理和坑
- 新來個技術總監,把 RabbitMQ 講的那叫一個透徹,佩服!
- 阿里 P7,如何才能達到該水平 ?
- Redis 高可用原理
- 肝了一個月的 DDD,一文帶你掌握!
- 如何寫好文件,5 分鐘帶你掌握!
- 淘寶 10 年,高併發分散式架構演進之路
- Raft 協議原理詳解,10 分鐘帶你掌握!
- 只會用傳統開發模式?10分鐘教你玩轉敏捷!
- 過完年變胖沒?2個月減肥6斤,告訴你如何健康減肥