漏桶演算法與虛擬佇列
演算法定義
漏桶演算法(Leaky Bucket),描述了一個桶口流入水流,底部開口的桶。是不是很像小時候經常做的一邊注水一邊放水的數學應用題? 桶的上部接水,桶的下部以固定的速率漏水,當桶中的水的量超過桶的容量時,後續的水將無法進入桶中,漏桶隨即進入溢位狀態。在溢位狀態時,後續的水將被拋棄,或者在佇列中等待,直到桶中的水量低於一個閾值時,才可以繼續承接水量。
圖-1 漏桶演算法示意圖
應用場景
漏桶演算法有兩類主要的作用:流量監控(Traffic Policing)、流量整形(Traffic Sharping)。
流量整形是非常符合直覺的:將流量(或者網路請求)本身比作水,流量(或者網路請求)本身不規律地流入漏桶,漏桶則以固定的速率放行流量,看起來存在突發性或者抖動的流量,在經過漏桶之後,都被削平了;
網路圖片,來自:https://www.geeksforgeeks.org/leaky-bucket-algorithm/
假設一個場景,在網路中的路由裝置只能接受固定頻寬輸入資料。如圖上所示,一個交換機前十秒的頻寬: $$ 總通過量= 12Mbps * 2 +2Mbps *3 = 30Mb $$ 前兩秒的突發流量`12Mbps`。很可能超過了該交換機的處理能力。通過漏桶演算法的整流能力,可以將這類突發的網路流量,整流成每秒3Mbps的穩定網路流量。 **流量監控**場景中,將流量所代表的頻寬、位元組、離散事件型別等權重比作水,漏桶本身像一個計數器,當流量或事件經過其檢查點時,計數器的數值增加相應的值,同時計數值以固定的速率遞減,流量(事件)經過時,演算法通過判斷漏桶溢位與否,賦予流量合法性。即此場景中,漏桶演算法本身無法影響流量的通過速率,而只執行標記的作用。在標記流量(事件)之後,可以配合後續的處理器/過濾器來區別處理,進而實現流量整形的功能。 ##### 流量計 >A technique used in ATM networks at the switch level that applies a sustained cell flow rate to bursty traffic. Incoming data flows into a buffer (the "bucket"), then "leaks" out at a steady rate, which is designated as constant bit rate (CBR) traffic. In the event the in-flow exceeds the negotiated rate for a certain time, the buffer will overflow. At that point, the switch examines the Cell Loss Priority (CLP) bit in each cell, and low-priority cells are discarded and retransmitted by the originating device. See [leaky bucket counter](https://encyclopedia2.thefreedictionary.com/leaky+bucket+counter). 在[ATM網路](https://support.huawei.com/enterprise/zh/doc/EDOC1100055312/437f0712)(一種快速分組的網路通訊技術)中,[GCRA](https://en.wikipedia.org/wiki/Generic_cell_rate_algorithm)(通用信元速率演算法,存在一種類似漏桶版本的演算法實現)起到類似流量計的作用,當ATM中的網路裝置檢測到網路流量超出**網絡合同**(一種開啟網路通訊時定義的,關於服務質量的引數集合)中規定的**固定速率**時(即通過判斷當前信元是否會造成漏桶溢位確定網路擁塞與否),會根據信元中的CLP的取值(一個在信元頭中宣告的標誌位,CLP=0表示優先順序高,CLP=1表示優先順序低),首先選擇CLP=1的信元丟棄並讓來源設備嘗試重新發送信元,進而緩解擁塞。使用佇列實現整流功能,來自wiki百科:Leaky Bucket
漏桶的佇列版本的描述: >[Andrew S. Tanenbaum](https://en.wikipedia.org/wiki/Andrew_S._Tanenbaum):"The leaky bucket consists of a finite queue. When a packet arrives, if there is room on the queue it is appended to the queue; otherwise it is discarded. At every clock tick one packet is transmitted (unless the queue is empty) 漏桶由一個**有限佇列**組成。當一個數據包到達時,如果佇列上有空間,它就會被附加到佇列尾部;否則將被丟棄。除非佇列為空,否則在每個固定時間點傳輸一個數據包。佇列版本的漏桶,被漏桶管理的"水",不再是流量計版本中的"資料包的大小"、"信元的大小"、"事件的權重"等等資訊屬性,而是資料包、信元、事件本身。 ### 虛擬佇列 根據佇列版本的漏桶描述,實現一個漏桶需要指定兩個引數:佇列長度、處理時間間隔。 在Sentinel中,有一個勻速限流效果控制器`RateLimiterController`,它正是基於佇列版本的漏桶演算法來實現:對一個資源的訪問執行緒在佇列中排隊等待,直到合適的時間點到達時,才被允許執行,而超出佇列長度的執行緒則被禁止執行業務程式碼,進而達成限制一個資源訪問QPS的限流效果。 和前面講到的管理流量、信元等等的漏桶例子不同,Sentinel的勻速限流效果控制器(可以視為漏桶)管理的是訪問資源的執行緒本身,它不太可能在一個併發環境下再引入一個同步的佇列:假設,Sentinel真的引入了執行緒間同步的佇列,考慮一下每個訪問資源的執行緒併發地確認佇列長度,自己的放行時間等等操作對業務效能造成的影響? `RateLimiterController`使用虛擬佇列來實現,其並不需要維護真正的佇列資訊。雖然虛擬佇列也存在一定的侷限:並不能實現訪問請求先入先出的順序保證,但在Sentinel的限流場景下,都是對同一個資源的並行訪問,因為執行緒通過順序並不是第一要務,重要的是漏桶的整流能力。1. 讓執行緒們知道自己能否在超時之前等到下一次佇列開放
2. 利用`AtomicLong`的自旋和原子性,讓執行緒們爭奪佇列開放的時間點
3. 讓執行緒們知道自己需要等待多久才能繼續執行 | | 閘機5分鐘放行一次 | `FlowRule.Count`屬性,即QPS,在實現過程中,代表了換成每次等待佇列放行的通過間隔 | 例如設定count的值為200QPS, 那麼虛擬佇列的放行的頻率就是: $\frac{1000}{200QPS}=5ms$ | | 超時時間30分鐘 | `RateLimiterController.maxQueueingTimeMs` | 記錄了執行緒們的最大等待時間,如果超過這個時間的Sentinel會直接以 `FlowException`的形式中斷執行緒對資源的訪問。 ### 整體角度 正所謂“不識廬山真面目,只緣身在此山中”,之前的角度鎖定在一個虛擬佇列中的執行緒角度闡述其行為,現在換一個說明問題的角度,讓我們從整體的角度看看對同一個資源的訪問執行緒們都在什麼狀態吧。 
圖1-1 從整體角度檢視執行緒經過虛擬佇列的併發情況
#### 整流功能 在示意圖中,執行緒2、3、4、5、6都在**第8毫秒**併發進入佇列,但是通過對`RatelimiterController.LaterPassedTime`這個“時間鎖”的搶佔更新,他們獲得了不同的執行時間: - 執行緒3被允許在第17毫秒執行業務邏輯 - 執行緒4被允許在第22毫秒執行業務邏輯 - 執行緒5被允許在第12毫秒執行業務邏輯 - 執行緒6被循序在第27毫秒執行業務邏輯 在系統時間到達執行緒本身被允許執行時間點之前,執行緒呼叫sleep(),等待被系統喚醒。看起來是不是就像是併發的執行緒們進入了一個佇列等待,然後按照固定的頻率被放行,不同的是, 特別注意的是**執行緒2**: 1. 就像平行時空No.2號那個幸運的又有點倒黴的小T一樣,執行緒2雖然參與了排隊,但是被它允許執行性的時間點(第32毫秒),距離當前的時間差又超過了20ms(假設超時時間是20ms)。執行緒2只能返回一個`BlockException`,相當於說明當前虛擬佇列的長度就是,一旦當前並行的執行緒超過4,其餘的執行緒將直接溢位。 1. 執行緒2在返回異常之前,還有一個工作要做,將最近更新時間**回撥**一個時間間隔(5ms)。相當於釋放自己所佔用的時間鎖,讓其他執行緒有可能重新佔用第32ms的這個執行時間點。如下圖所示:\ 第①步:【執行緒2】執行Update Lock Time = 32 - 5 = 27 ms\ 第②步:【執行緒8】執行Update Lock Time = 27 + 5 = 32 ms\ 第③步:【執行緒9】執行Update Lock Time = 32 + 5 = 37 ms 1. 其他的併發執行緒可能永遠也佔用不到32ms的這個執行時間點了,它們可能在第37ms的這個時間點相遇:\ \ 與上一點不同的是,如果第①步和第②步的執行順序正好相反,也就是執行緒8執行的新增5ms操作先於執行緒2的回撥5ms操作:\ 第①步:【執行緒8】執行Update Lock Time = 32 + 5 = 37 ms\ 第②步:【執行緒2】執行Update Lock Time = 37 - 5 = 32 ms\ 第③步:【執行緒9】執行Update Lock Time = 32 + 5 = 37 ms\ 在這個場景下,執行緒8和執行緒9最後都在第37ms這個時間點併發執行,這看起來是個場景似乎是個問題:在第32毫秒,沒有執行緒開始執行自己的業務程式碼,而在第37毫秒,執行緒8和執行緒9一起執行了自己的併發程式碼。整體來說,這個虛擬佇列的實現,還是能保證“**平均QPS**”的承諾的。 **執行緒7**則是平行時空No.0的那個倒黴的小T,執行緒7在第12ms開始,通過計算 32+5 - 12 = 25ms > 20 ms(此時執行緒2還沒來的及將時間回撥5ms到27,不然的話執行緒7不會超時) 就可以知道當前佇列必然已滿,因此將直接觸發Sentinel對自己的限流,並返回阻塞異常。 總結一下,只需要三個引數:超時時間、通過頻率、最後通過時間。就可以實現一個虛擬佇列: $ 通過間隔 = \frac{1000}{QPS} = \frac{1000}{200} = 5ms$ $ 佇列長度 = \frac{超時時間}{通過間隔} = \frac{QPS超時時間}{1000} = \frac{200 20ms}{1000} = 4$ $最後通過時間_n=\begin{cases} 當前時間 & if 佇列為空 \\ 最後通過時間_{n-1} + 通過間隔 & if 執行緒排隊中 \end{cases}$ ### 實現流程  #### 程式碼實現 ```java import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.LongAdder; /** * 虛擬佇列 * 1. 連續建立多個執行緒,執行緒建立之後就執行 * 2. 讓執行緒通過虛擬佇列 * @author zongzi */ public class VirtualQueue { /** * 統計放行的執行緒數 */ static final LongAdder PASS_COUNT = new LongAdder(); /** * 統計阻塞的執行緒數 */ static final LongAdder BLOCK_COUNT = new LongAdder(); /** * 統計總的執行緒數 */ static final LongAdder DONE_COUNT = new LongAdder(); /** * 最大的測試執行緒數 */ private static final int TEST_MAX_THREAD = 50; /** * 等待所有的執行緒執行結束標誌 */ private static final CountDownLatch COUNT_DOWN = new CountDownLatch(1); /** * 允許通過的最大併發執行緒數 */ private static final int QPS = 10; /** * 最大的等待時間 */ private static final Long MAX_QUEUEING_TIME_MS = 1000L; private static final AtomicLong LATEST_PASSED_TIME = new AtomicLong(-1); public static void main(String[] args) throws InterruptedException { int i = 0; // 宣告測試執行緒並啟動 while (i < TEST_MAX_THREAD) { Runnable andStartThread = createAndStartThread(); new Thread(andStartThread, String.format("ThreadName:%d", i)).start(); i++; } // 阻塞等待 COUNT_DOWN.await(); System.out.println("pass:\t" + PASS_COUNT.longValue()); System.out.println("block:\t" + BLOCK_COUNT.longValue()); System.out.println("done:\t" + DONE_COUNT.longValue()); } public static Runnable createAndStartThread() { return () -> { // 進入虛擬佇列, boolean pass = queueWait(); if (pass) { PASS_COUNT.add(1); } else { BLOCK_COUNT.add(1); } DONE_COUNT.add(1); // 讓主執行緒繼續執行 if (DONE_COUNT.longValue() >= TEST_MAX_THREAD) { COUNT_DOWN.countDown(); } }; } /** * 進入虛擬佇列等待 * @return true 執行緒可以放行, false 執行緒被阻塞 */ static boolean queueWait() { long currentTime = System.currentTimeMillis(); // passedInterval 是一個請求通過的時間間隔,根據我們設定的QPS有關,如果QPS等於200,那麼每個請求平均需要的時間是costTime=1000/200=5ms. long passedInterval = Math.round(1.0 * (1000) / QPS ); // 下次開放通過的時間 long expectedPassedTime = passedInterval + LATEST_PASSED_TIME.get(); // 如果期望的時間小於當前時間,說明上一個請求在很早之前就已經被放行了,當前佇列應該是空的 if (expectedPassedTime <= currentTime) { // 這裡可能仍會存在併發,但是隻影響最開始進入佇列的幾個執行緒,是可以接受的 LATEST_PASSED_TIME.set(currentTime); return true; } else { // 計算從當前系統開始,期望的等待時間 long expectWaitTime = passedInterval + LATEST_PASSED_TIME.get() - System.currentTimeMillis(); // 在未開始排隊前,執行緒就知道等待的時間超過了最大等待時間,說明當前虛擬佇列已經滿了,當前執行緒不再進入排隊 if (expectWaitTime > MAX_QUEUEING_TIME_MS) { return false; } // 和其他執行緒一起併發競爭時間鎖的更新,獲得自己的通過時間。 long expectPassedTime = LATEST_PASSED_TIME.addAndGet(passedInterval); try { expectWaitTime = expectPassedTime - System.currentTimeMillis(); // 如果期望的排隊時間超過了超時時間,說明當前執行緒雖然參與了排隊,但是已經在佇列長度之外, if (expectWaitTime > MAX_QUEUEING_TIME_MS) { // 回撥自己佔用的放行時間點,給可能的其他執行緒 LATEST_PASSED_TIME.addAndGet(-passedInterval); return false; } // 執行到這裡,說明執行緒已經正確的派上到了虛擬佇列中,休眠等待 if (expectWaitTime > 0) { Thread.sleep(expectWaitTime); } // 返回true, 代表請求業務被放行,後面可以正常執行業務程式碼了 return true; } catch (InterruptedException e) { } } return false; } } ``` #### 程式碼說明 在程式碼實現中,已經添加了足夠的程式碼註釋,相信理解這個問題,已經不是難事了,現在我們來看看虛擬佇列的兩個主要引數:QPS、MAX_QUEUEING_TIME_MS 兩個引數對虛擬佇列的整流效果的影響吧。 在程式碼中,我們首先建立了500個併發執行緒,然後建立了一個QPS為10,超時時間為1000ms的,由此我們可以得到:,因而50個併發執行緒中應該只有10個執行緒通過。其餘的執行緒都因為佇列已滿而被阻塞(`queueWait()`方法返回`false`)。 如果你執行這個示例程式碼的話,應該會得到類似的結果。 ```shell pass: 11 block: 39 done: 50 ``` “通過的執行緒”是11個情況和`圖1-1`的類似,在佇列能正常工作之前,需要一個額外的執行緒來啟動時間鎖(作用和圖1-1中的執行緒1類似)。
「其他文章」