ScheduledExecutorService原理解析

語言: CN / TW / HK

 

將上次的時間參數重新設置一下,然後塞回工作任務隊列,由於默認實現了compareTo方法,所以會放在隊列後面,等到下次執行又會把這個任務取出來,執行完重複塞回隊列,這樣就實現了週期性執行。

 

 

ScheduledExecutorService是基於DelayQueue,所以要先對DelayQueue做講解
 
DelayQueue 內部實現
參考鏈接:https://www.zybuluo.com/mikumikulch/note/712598
延遲阻塞隊列 = 阻塞隊列 + 延遲獲取任務功能
舉個例子,DelayQueue可通過重寫compareTo來處理排序
private static DelayQueue<DelayTask> delayQueue = new DelayQueue<>();
static class DelayTask implements Delayed {
// 延遲時間
private final long delay;
// 到期時間
private final long expire;
// 數據
private final String msg;
// 創建時間
private final long now;
/**
* 初始化 DelayTask 對象
*
* @param delay 延遲時間 單位:微妙
* @param msg 業務信息
*/
DelayTask(long delay, String msg) {
this.delay = delay; // 延遲時間
this.msg = msg; // 業務信息
this.now = Instant.now().toEpochMilli();
this.expire = now + delay; // 到期時間 = 當前時間+延遲時間
}
/**
* 獲取延遲時間
*
* @param unit 單位對象
* @return
*/
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(expire - Instant.now().toEpochMilli(), TimeUnit.MILLISECONDS);
}
/**
* 比較器
* 比較規則:延遲時間越長的對象越靠後
*/
@Override
public int compareTo(Delayed o) {
if (o == this) // compare zero ONLY if same object
return 0;
return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
}
}
其實ScheduledExecutorService也是重寫了上面的compareTo方法的,接下來看下他的原理
DelayQueue原理講解
* 需要重寫 getDelay 與 compareTo 方法
* 其阻塞隊列是優先級隊列PriorityQueue,可通過重寫compareTo來變更排序
* 使用take()、add()方法時,調用的是ReentrantLock來加鎖,並且take()中有一個for死循環,然後裏面用Condition進行阻塞和放行(add()方法就一個available.signal();將等待隊列添加到同步隊列,這個就不講解):
for (;;) {
// 嘗試從優先級隊列中獲取隊列頭部元素
E first = q.peek();
if (first == null)
// 無元素,當前線程節點加入等待隊列,並阻塞當前線程
available.await(); // 等待available.signal()
else {
// 通過延遲任務的 getDelay 方法獲取延遲時間
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
// 延遲時間到期,獲取並刪除頭部元素。
return q.poll();
first = null; // don't retain ref while waiting
if (leader != null)
available.await(); // 隊列沒有數據,阻塞住
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 線程節點進入等待隊列 x 納秒。
available.awaitNanos(delay); // 隊列有數據,根據他要延遲的秒數 好用awaitNanos來延遲
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
newScheduledThreadPool實現原理
參考鏈接:https://www.zybuluo.com/mikumikulch/note/713546
用法總結如下:
ScheduledExecutorService service = Executors.newScheduledThreadPool(4);
// 第一種用法   表示線程啟動後1秒開始執行任務,然後5秒後再次執行。假設這時候有一次執行了10秒,那執行完就會馬上執行下一個任務
service.scheduleAtFixedRate(new Runnable(){......}, 1, 5, TimeUnit.SECONDS);
 
// 第二種用法   表示啟動後1秒執行,並當前任務執行完後隔5秒在執行下一個任務
service.scheduleWithFixedDelay(new Runnable(){......}, 1, 5, TimeUnit.SECONDS);
 
// 第三種用法    表示啟動後1秒執行,後續不會再執行(僅一次)
service.scheduleWithFixedDelay(new Runnable(){......}, 1, TimeUnit.SECONDS);
 
原理:將線程任務結合構造參數,封裝為一個 DelayedFutureTask,其本質就是實現了DelayQueue,並覆蓋了其getDelay 與 compareTo 方法,並且如果用的是週期性執行的方法,其本質就是:
將上次的時間參數重新設置一下,然後塞回工作任務隊列,由於默認實現了compareTo方法,所以會放在隊列後面,等到下次執行又會把這個任務取出來,執行完重複塞回隊列,這樣就實現了週期性執行。
public void run() {
boolean periodic = isPeriodic();
if (!canRunInCurrentRunState(periodic))
cancel(false);
// 若無間隔時間,則直接調用 futureTask 的 run 方法。
else if (!periodic)
ScheduledFutureTask.super.run();
// 調用 runAndReset 方法執行任務。
else if (ScheduledFutureTask.super.runAndReset()) {
// 設置下次執行時間
setNextRunTime();
// 將任務重新放回工作任務隊列。
reExecutePeriodic(outerTask);
}
}
 
————————————————
版權聲明:本文為CSDN博主「Lidisam」的原創文章,遵循CC 4.0 BY-SA版權協議,轉載請附上原文出處鏈接及本聲明。
原文鏈接:https://blog.csdn.net/qq_28666081/article/details/108811927