☕【Java技術指南】「併發編程專題」Guava RateLimiter針對於限流器的入門到精通(含源碼分析介紹)

語言: CN / TW / HK

theme: cyanosis

Guava包中限流實現分析

RateLimiter

之前的文章中已經介紹了常用的限流算法,而google在Java領域中使用Guava包中的限流工具進行服務限流。

回顧使用案例

Google開源工具包Guava提供了限流工具類RateLimiter,該類基於令牌桶算法實現流量限制,使用十分方便。 java @Test public void testSample() { RateLimiter rateLimiter = RateLimiter.create(500) } 以上示例,創建一個RateLimiter,指定每秒放500個令牌(0.002秒放1個令牌),其輸出見下:

從輸出結果可以看出,RateLimiter具有預消費的能力:

  • 請求 1時並沒有任何等待直接預消費了1個令牌
  • 請求 2時,由於之前預消費了1個令牌,故而等待了2秒,之後又預消費了6個令牌
  • 請求 3時同理,由於之前預消費了6個令牌,故而等待了12秒

屬於線性處理機制。

  • RateLimiter通過限制後面請求的等待時間,來支持一定程度的突發請求(預消費)。

  • 但是某些情況下並不需要這種突發請求處理能力,如某IM廠商提供消息推送接口,但推送接口有嚴格的頻率限制(600次/30秒),在調用該IM廠商推送接口時便不能預消費,否則,則可能出現推送頻率超出限制而失敗。

  • 其中RateLimiter類為限流的核心類,其為public的抽象類,RateLimiter有一個實現類SmoothRateLimiter,根據不同消耗令牌的策略SmoothRateLimiter又有兩個具體實現類SmoothBursty和SmoothWarmingUp。

  • 在實際使用過程中一般直接使用RateLimiter類,其他類對用户是透明的,RateLimiter類的設計使用了類似BUILDER模式的小技巧,並做了一定的調整。

  • 通過RateLimiter類圖可見,RateLimiter類不僅承擔了具體實現類的創建職責,同時也確定了被創建出的實際類可提供的方法。標準創建者模式UML圖如下所示(引用自百度百科)

Guava包中限流工具類

Guava核心限流類介紹

  • RateLimiter類為限流的核心類,其為public的抽象類,RateLimiter有一個實現類SmoothRateLimiter,根據不同消耗令牌的策略SmoothRateLimiter又有兩個具體實現類SmoothBursty和SmoothWarmingUp。

Guava有兩種限流模式

  • 一種為穩定模式(SmoothBursty:令牌生成速度恆定)
  • 一種為漸進模式(SmoothWarmingUp:令牌生成速度緩慢提升直到維持在一個穩定值)

    兩種模式實現思路類似,主要區別在等待時間的計算上,

Guava RateLimiter核心類實現

  • 在實際使用過程中一般直接使用RateLimiter類,其他類對用户是透明的。RateLimiter類的設計使用了類似BUILDER模式的小技巧,並做了一定的調整。
  • 通過RateLimiter類圖可見,RateLimiter類不僅承擔了具體實現類的創建職責,同時也確定了被創建出的實際類可提供的方法。

    RateLimiter類即承擔了builder的職責,也承擔了Product的職責。

SmoothBursty
  • Guava包RateLimiter類的説明文檔,首先使用create函數創建限流器,指定每秒生成2個令牌,在需要調用服務時使用acquire函數或取令牌。
create函數分析
  • create函數具有兩個個重載,根據不同的重載可能創建不同的RateLimiter具體實現子類。

  • 目前可返回的實現子類包括SmoothBursty及SmoothWarmingUp兩種,具體不同下文詳細分析。

  • 在調用create接口時,實際實例化的為SmoothBursty類

java public static RateLimiter create(double permitsPerSecond) { return create(permitsPerSecond, SleepingStopwatch.createFromSystemTimer()); } static RateLimiter create(double permitsPerSecond, SleepingStopwatch stopwatch) { RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */); rateLimiter.setRate(permitsPerSecond); return rateLimiter; }

在解析SmoothBursty原理前,重點解釋下SmoothBursty中幾個屬性的含義

java /** * The currently stored permits. * 當前存儲令牌數 */ double storedPermits; /** * The maximum number of stored permits. * 最大存儲令牌數 */ double maxPermits; /** * The interval between two unit requests, at our stable rate. E.g., a stable rate of 5 permits * per second has a stable interval of 200ms. * 添加令牌時間間隔 */ double stableIntervalMicros; /** * The time when the next request (no matter its size) will be granted. After granting a request, * this is pushed further in the future. Large requests push this further than small requests. * 下一次請求可以獲取令牌的起始時間 * 由於RateLimiter允許預消費,上次請求預消費令牌後 * 下次請求需要等待相應的時間到nextFreeTicketMicros時刻才可以獲取令牌 */ private long nextFreeTicketMicros = 0L; // could be either in the past or future

tryAcquire函數實現機制
  • 就非常容易理解RateLimiter暴露出來的接口

```java @CanIgnoreReturnValue public double acquire() { return acquire(1); }

@CanIgnoreReturnValue public double acquire(int permits) { long microsToWait = reserve(permits); stopwatch.sleepMicrosUninterruptibly(microsToWait); return 1.0 * microsToWait / SECONDS.toMicros(1L); }

final long reserve(int permits) { checkPermits(permits); synchronized (mutex()) { return reserveAndGetWaitLength(permits, stopwatch.readMicros()); } } ``` - acquire函數主要用於獲取permits個令牌,並計算需要等待多長時間,進而掛起等待,並將該值返回

```java public boolean tryAcquire(int permits) { return tryAcquire(permits, 0, MICROSECONDS); }

public boolean tryAcquire() { return tryAcquire(1, 0, MICROSECONDS); }

public boolean tryAcquire(int permits, long timeout, TimeUnit unit) { long timeoutMicros = max(unit.toMicros(timeout), 0); checkPermits(permits); long microsToWait; synchronized (mutex()) { long nowMicros = stopwatch.readMicros(); if (!canAcquire(nowMicros, timeoutMicros)) { return false; } else { microsToWait = reserveAndGetWaitLength(permits, nowMicros); } } stopwatch.sleepMicrosUninterruptibly(microsToWait); return true; }

private boolean canAcquire(long nowMicros, long timeoutMicros) { return queryEarliestAvailable(nowMicros) - timeoutMicros <= nowMicros; }

@Override final long queryEarliestAvailable(long nowMicros) { return nextFreeTicketMicros; } ```

  • acquire函數主要用於獲取permits個令牌,並計算需要等待多長時間,進而掛起等待,並將該值返回
  • tryAcquire函數可以嘗試在timeout時間內獲取令牌,如果可以則掛起等待相應時間並返回true,否則立即返回false
  • canAcquire用於判斷timeout時間內是否可以獲取令牌
resync函數

該函數會在每次獲取令牌之前調用,其實現思路為,若當前時間晚於nextFreeTicketMicros,則計算該段時間內可以生成多少令牌,將生成的令牌加入令牌桶中並更新數據。這樣一來,只需要在獲取令牌時計算一次即可。

java /** * Updates {@code storedPermits} and {@code nextFreeTicketMicros} based on the current time. */ void resync(long nowMicros) { // if nextFreeTicket is in the past, resync to now if (nowMicros > nextFreeTicketMicros) { double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros(); storedPermits = min(maxPermits, storedPermits + newPermits); nextFreeTicketMicros = nowMicros; } }

acquire函數分析

acquire函數也具有兩個重載類,但分析過程僅僅需要關係具有整形參數的函數重載即可,無參數的函數僅僅是acquire(1)的簡便寫法。

在acquire(int permits)函數中主要完成三件事:

  • 預分配授權數量,此函數返回需要等待的時間,可能為0;
  • 根據等待時間進行休眠;
  • 以秒為單位,返回獲取授權消耗的時間。

完成以上工作的過程中,RateLimiter類確定了獲取授權的過程骨架並且實現了一些通用的方法,這些通用方法中會調用為實現的抽象方法,開發人員根據不同的算法需求可實現特定子類對抽象方法進行覆蓋。

其調用流程如下圖: 其中橙色塊中reserveEarliestAvailable方法即為需要子類進行實現的,下文以該函數為核心,分析RateLimiter類的子類是如何實現該方法的。

```java final long reserveEarliestAvailable(int requiredPermits, long nowMicros) { resync(nowMicros); long returnValue = nextFreeTicketMicros; // 返回的是上次計算的nextFreeTicketMicros double storedPermitsToSpend = min(requiredPermits, this.storedPermits); // 可以消費的令牌數 double freshPermits = requiredPermits - storedPermitsToSpend; // 還需要的令牌數 long waitMicros = storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend) + (long) (freshPermits * stableIntervalMicros); // 根據freshPermits計算需要等待的時間

this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros); // 本次計算的nextFreeTicketMicros不返回 this.storedPermits -= storedPermitsToSpend; return returnValue; } ```

  • 該函數用於獲取requiredPermits個令牌,並返回需要等待到的時間點
  • 其中,storedPermitsToSpend為桶中可以消費的令牌數,freshPermits為還需要的(需要補充的)令牌數,根據該值計算需要等待的時間,追加並更新到nextFreeTicketMicros

  • 需要注意的是,該函數的返回是更新前的(上次請求計算的)nextFreeTicketMicros,而不是本次更新的nextFreeTicketMicros,通俗來講,本次請求需要為上次請求的預消費行為埋單,這也是RateLimiter可以預消費(處理突發)的原理所在。若需要禁止預消費,則修改此處返回更新後的nextFreeTicketMicros值。

SmoothBursty的構造函數

```java SmoothBursty(SleepingStopwatch stopwatch, double maxBurstSeconds) { super(stopwatch); this.maxBurstSeconds = maxBurstSeconds; // 最大存儲maxBurstSeconds秒生成的令牌 }

@Override void doSetRate(double permitsPerSecond, double stableIntervalMicros) { double oldMaxPermits = this.maxPermits; maxPermits = maxBurstSeconds * permitsPerSecond; // 計算最大存儲令牌數 if (oldMaxPermits == Double.POSITIVE_INFINITY) { // if we don't special-case this, we would get storedPermits == NaN, below storedPermits = maxPermits; } else { storedPermits = (oldMaxPermits == 0.0) ? 0.0 // initial state : storedPermits * maxPermits / oldMaxPermits; } } ``` - 桶中可存放的最大令牌數由maxBurstSeconds計算而來,其含義為最大存儲maxBurstSeconds秒生成的令牌。 - 該參數的作用在於,可以更為靈活地控制流量。如,某些接口限制為300次/20秒,某些接口限制為50次/45秒等。

抽象函數分析

在以上文代碼分析中出現了兩個抽象函數coolDownIntervalMicros及storedPermitsToWaitTime,現分析這兩個抽象函數。

coolDownIntervalMicros函數

主要含義為生成一個令牌需要消耗的時間,該函數主要應用於計算當前時間可產生的令牌數。根據上文的UML圖SmoothRateLimiter類有兩個子類SmoothBursty及SmoothWarmingUp。

SmoothBursty類中對於coolDownIntervalMicros函數的實現如下: java @Override double coolDownIntervalMicros() { return stableIntervalMicros; }

可見實現非常簡單,僅僅只是返回stableIntervalMicros屬性,即產生兩個令牌需要的時間間隔。

SmoothWarmingUp類中對於coolDownIntervalMicros函數的實現如下: java @Override double coolDownIntervalMicros() { return warmupPeriodMicros / maxPermits; } - 其中maxPermits屬性上文已經出現過,表示當前令牌桶的最大容量。 - warmupPeriodMicros屬性屬於SmoothWarmingUp類的特有屬性,表示令牌桶中令牌從0到maxPermits需要經過的時間,故warmupPeriodMicros / maxPermits表示在令牌數量達到maxPermits之前的令牌產生時間間隔。

storedPermitsToWaitTime函數

主要表示消耗存儲在令牌桶中的令牌需要的時間。

SmoothBursty類中對於storedPermitsToWaitTime函數的實現如下:

java @Override long storedPermitsToWaitTime(double storedPermits, double permitsToTake) { return 0L; }

直接返回0,表示消耗令牌不需要時間。

SmoothBursty類中對於storedPermitsToWaitTime函數的實現如下:

java @Override long storedPermitsToWaitTime(double storedPermits, double permitsToTake) { double availablePermitsAboveThreshold = storedPermits - thresholdPermits; long micros = 0; // measuring the integral on the right part of the function (the climbing line) if (availablePermitsAboveThreshold > 0.0) { double permitsAboveThresholdToTake = min(availablePermitsAboveThreshold, permitsToTake); // TODO(cpovirk): Figure out a good name for this variable. double length = permitsToTime(availablePermitsAboveThreshold) + permitsToTime(availablePermitsAboveThreshold - permitsAboveThresholdToTake); micros = (long) (permitsAboveThresholdToTake * length / 2.0); permitsToTake -= permitsAboveThresholdToTake; } // measuring the integral on the left part of the function (the horizontal line) micros += (long) (stableIntervalMicros * permitsToTake); return micros; }

  • 實現較為複雜,其核心思想在於計算消耗當前存儲令牌時需要根據預熱設置區別對待。其中涉及到新變量thresholdPermits,該變量為令牌閾值,噹噹前存儲的令牌數大於該值時,消耗(storedPermits-thresholdPermits)範圍的令牌需要有預熱的過程(即消耗每個令牌的間隔時間慢慢減小),而消耗0~thresholdPermits個數的以存儲令牌,每個令牌消耗時間為固定值,即stableIntervalMicros。

  • 而thresholdPermits取值需要考慮預熱時間及令牌產生速度兩個屬性,即thresholdPermits = 0.5 * warmupPeriodMicros / stableIntervalMicros;。可見閾值為預熱時間中能夠產生的令牌數的一半,並且根據註釋計算消耗閾值以上的令牌的時間可以轉換為計算預熱圖的梯形面積(實際為積分),本處不詳細展開。

  • 使用此種設計可以保證在上次請求間隔時間較長時,令牌桶中存儲了較多的令牌,當消耗這些令牌時,最開始的令牌消耗時間較長,後續時間慢慢縮短直到達到stableIntervalMicros的狀態,產生預熱的效果。

實現總結

  • 根據令牌桶算法,桶中的令牌是持續生成存放的,有請求時需要先從桶中拿到令牌才能開始執行,誰來持續生成令牌存放呢?

    • 一種解法是,開啟一個定時任務,由定時任務持續生成令牌。這樣的問題在於會極大的消耗系統資源,如,某接口需要分別對每個用户做訪問頻率限制,假設系統中存在6W用户,則至多需要開啟6W個定時任務來維持每個桶中的令牌數,這樣的開銷是巨大的。

    • 在實現限流器的過程中,基於令牌桶的思想,並且增加了帶有預熱器的令牌桶限流器實現。被限流的線程使用其自帶的SleepingStopwatch工具類,最終使用的是Thread.sleep(ms, ns);方法,而線程使用sleep休眠時其持有的鎖並不會釋放,在多線程編程時此處需要注意。

    • 最後,限流器觸發算法採用的是預定令牌的方式,即當前請求需要的令牌數不會對當前請求的等待時間造成影響,而是會影響下一次請求的等待時間。

「其他文章」