漏桶演算法與虛擬佇列

語言: CN / TW / HK

演算法定義

漏桶演算法(Leaky Bucket),描述了一個桶口流入水流,底部開口的桶。是不是很像小時候經常做的一邊注水一邊放水的數學應用題? 桶的上部接水,桶的下部以固定的速率漏水,當桶中的水的量超過桶的容量時,後續的水將無法進入桶中,漏桶隨即進入溢位狀態。在溢位狀態時,後續的水將被拋棄,或者在佇列中等待,直到桶中的水量低於一個閾值時,才可以繼續承接水量。

image-20220720171028755.png

圖-1 漏桶演算法示意圖

應用場景

漏桶演算法有兩類主要的作用:流量監控(Traffic Policing)、流量整形(Traffic Sharping)。

流量整形是非常符合直覺的:將流量(或者網路請求)本身比作水,流量(或者網路請求)本身不規律地流入漏桶,漏桶則以固定的速率放行流量,看起來存在突發性或者抖動的流量,在經過漏桶之後,都被削平了;

網路圖片,來自:http://www.geeksforgeeks.org/leaky-bucket-algorithm/

假設一個場景,在網路中的路由裝置只能接受固定頻寬輸入資料。如圖上所示,一個交換機前十秒的頻寬: $$ 總通過量= 12Mbps * 2 +2Mbps *3 = 30Mb $$ 前兩秒的突發流量`12Mbps`。很可能超過了該交換機的處理能力。通過漏桶演算法的整流能力,可以將這類突發的網路流量,整流成每秒3Mbps的穩定網路流量。 **流量監控**場景中,將流量所代表的頻寬、位元組、離散事件型別等權重比作水,漏桶本身像一個計數器,當流量或事件經過其檢查點時,計數器的數值增加相應的值,同時計數值以固定的速率遞減,流量(事件)經過時,演算法通過判斷漏桶溢位與否,賦予流量合法性。即此場景中,漏桶演算法本身無法影響流量的通過速率,而只執行標記的作用。在標記流量(事件)之後,可以配合後續的處理器/過濾器來區別處理,進而實現流量整形的功能。 ##### 流量計 >A technique used in ATM networks at the switch level that applies a sustained cell flow rate to bursty traffic. Incoming data flows into a buffer (the "bucket"), then "leaks" out at a steady rate, which is designated as constant bit rate (CBR) traffic. In the event the in-flow exceeds the negotiated rate for a certain time, the buffer will overflow. At that point, the switch examines the Cell Loss Priority (CLP) bit in each cell, and low-priority cells are discarded and retransmitted by the originating device. See [leaky bucket counter](http://encyclopedia2.thefreedictionary.com/leaky+bucket+counter). 在[ATM網路](http://support.huawei.com/enterprise/zh/doc/EDOC1100055312/437f0712)(一種快速分組的網路通訊技術)中,[GCRA](http://en.wikipedia.org/wiki/Generic_cell_rate_algorithm)(通用信元速率演算法,存在一種類似漏桶版本的演算法實現)起到類似流量計的作用,當ATM中的網路裝置檢測到網路流量超出**網絡合同**(一種開啟網路通訊時定義的,關於服務質量的引數集合)中規定的**固定速率**時(即通過判斷當前信元是否會造成漏桶溢位確定網路擁塞與否),會根據信元中的CLP的取值(一個在信元頭中宣告的標誌位,CLP=0表示優先順序高,CLP=1表示優先順序低),首先選擇CLP=1的信元丟棄並讓來源設備嘗試重新發送信元,進而緩解擁塞。

簡單介紹GCRA演算法: 有一個總容量為$L$的漏桶,它當前的計數值為$X$,他的洩露速率為$\frac{1單元}{單位時間}$,當漏桶中的$X < L $時(即漏桶還沒有洩露時),每個信元的到達都會為計數值帶來 $l$ 的增量。則GCRA演算法的漏桶版本的實現流程如下所示:

1. 假設上一個信元的到達時間為$LCT$, 而漏桶的洩露速率為$\frac{1單元}{單位時間}$,所以兩個信元之間的時間差既是此期間漏桶中洩露的水量,當前信元到達漏桶檢查點時,那麼漏桶中的剩餘容量:$X^1 = X - (t_a(k) - LCT) $ 2. 如果 $X^1 < 0$,說明兩個信元到達的時間差過大,漏桶中的所有水量都已漏光,當前漏桶為空,因此設定$X = 0$。此時跳轉到步驟4,更新漏桶狀態。 3. 如果 $X^1 > 0 $ 且 $X1 > L $,說明當前信元的到達,超出了漏桶的容量,則標記當前信元不符合要求;在ATM網路中,代表了當前的信元超過了PCR(峰值信元速率)的限制。 4. 否則,則說明當前漏桶上可以接收此信元帶來的增量,執行 $ X = X + l $來更新漏桶計數值,通過執行$ LCT = t_a(k)$來重置上次到達時間。 總結來說,如果信元以大於洩露速率的速率(可以理解為峰值速率)經過漏桶的檢查點,那麼漏桶鐘的計數值將很快超過漏桶容量而造成溢位,當溢位出現時,經過的信元會被標記為“不符合要求” 進而被後續的流量管理裝置拋棄。反之,如果信元經過檢查點的速率並不那麼快,那麼漏桶總是不滿的(或是空的,這取決與信元相對於漏桶的洩露速率)。 ##### 整流工具或緩衝區 上面已經說明使用計數器方式實現的漏桶演算法,可以通過標記流量(或事件)的“合規性”配合後續的服務元件達到整流的目的。事實上,也存在一種使用佇列這一資料結構實現的漏桶演算法。

image.png

使用佇列實現整流功能,來自wiki百科:Leaky Bucket

漏桶的佇列版本的描述: >[Andrew S. Tanenbaum](http://en.wikipedia.org/wiki/Andrew_S._Tanenbaum):"The leaky bucket consists of a finite queue. When a packet arrives, if there is room on the queue it is appended to the queue; otherwise it is discarded. At every clock tick one packet is transmitted (unless the queue is empty) 漏桶由一個**有限佇列**組成。當一個數據包到達時,如果佇列上有空間,它就會被附加到佇列尾部;否則將被丟棄。除非佇列為空,否則在每個固定時間點傳輸一個數據包。佇列版本的漏桶,被漏桶管理的"水",不再是流量計版本中的"資料包的大小"、"信元的大小"、"事件的權重"等等資訊屬性,而是資料包、信元、事件本身。 ### 虛擬佇列 根據佇列版本的漏桶描述,實現一個漏桶需要指定兩個引數:佇列長度、處理時間間隔。 在Sentinel中,有一個勻速限流效果控制器`RateLimiterController`,它正是基於佇列版本的漏桶演算法來實現:對一個資源的訪問執行緒在佇列中排隊等待,直到合適的時間點到達時,才被允許執行,而超出佇列長度的執行緒則被禁止執行業務程式碼,進而達成限制一個資源訪問QPS的限流效果。 和前面講到的管理流量、信元等等的漏桶例子不同,Sentinel的勻速限流效果控制器(可以視為漏桶)管理的是訪問資源的執行緒本身,它不太可能在一個併發環境下再引入一個同步的佇列:假設,Sentinel真的引入了執行緒間同步的佇列,考慮一下每個訪問資源的執行緒併發地確認佇列長度,自己的放行時間等等操作對業務效能造成的影響? `RateLimiterController`使用虛擬佇列來實現,其並不需要維護真正的佇列資訊。雖然虛擬佇列也存在一定的侷限:並不能實現訪問請求先入先出的順序保證,但在Sentinel的限流場景下,都是對同一個資源的並行訪問,因為執行緒通過順序並不是第一要務,重要的是漏桶的整流能力。

image.png

#### 執行緒角度 理解虛擬佇列的過程並不簡單,可以從執行緒本身和整體角度兩個視角來入手說明。首先用將執行緒比作"架空世界"中一個到動物園遊玩的遊客“小T”,從它的角度來看看會發生什麼。 ##### 場景一:空空蕩蕩的熊貓館排隊大廳 上午十點整,"小T"進入空空蕩蕩的熊貓場館排隊大廳,辛勤的小T發現此時排隊大廳裡一個人都沒有。另外大廳的盡頭有一個閘機讓遊客們依次通過,而後小T就看到閘機上方巨大的時鐘,小T知道這個時鐘不是記錄時間的時鐘,而是記錄著當前最後一次閘機開放的時間:`9:51`。在正常情況下,閘機最快只會每隔**5分鐘**開啟一次。這代表著已經9分鐘沒有人通過閘機了,所以當小T走到閘機口時,檢票員並沒有阻攔,就讓小T通過了閘機口。在通過閘機的時候,時鐘“叮”的一聲被設定成了"10:00"。 ##### 場景二:熱鬧的熊貓館排隊大廳 第二天上午十點整,“小T"又來到了排隊廳,這次他發現大廳裡很熱鬧:很多和他一樣的執行緒們站在大廳中,拿著手機在點選螢幕。小T知道,這是他們在線上排隊,原來是今天等候訪問熊貓館的人太多了,而閘機口最快只能5分鐘通過一人,大家只能通過搶購的形式確定自己的通過時間。 平時時空No.0: 不幸的小T 現在時鐘顯示`10:27`分,糟糕了,小T只能立即放棄排隊,因為時鐘顯示`10:27`,而此時時間是:`10:00`,意味著小T即使立即加入搶購大軍,在運氣非常好的的情況下,最快也要到`10:32`才能通過閘機,他仍需要等待32分鐘,這超過了場館的規定:為了防止擁擠,任何人都不要排隊超過30分鐘。 平行時空No.1: 幸運的小T 平行時空的小T的運氣則非常好,時鐘顯示時間是:`10:17`,說明最近一個搶到通過權的幸運兒排在`10:17`。小T還有兩次機會,如果搶到下次(`10:22`),和下下次(`10:27`)兩次排隊資格,那麼小T就能在不超時的情況下順利參觀熊貓館。於是小T打開了排隊APP開始排隊,等了一會兒之後,系統返回了一個時間: `10:27`。‘Yeah!' 小T歡呼一聲,躺在了自己的座位上,呼叫自己的sleep()方法休息個27分鐘等候進入場館。 平時時空No.2: 幸運的又有那麼點倒黴的小T 在No.2號平時時空裡,時鐘顯示`10:17`分,於是小T開始排隊,等了一會兒之後,系統返回了一個時間: `10:37`。‘焯!’,小T怒摔手機,超時了!!!小T點選了確定,離開場館的排隊大廳,代表小T放棄了本次訪問場館的資格,過一會兒閘機口上方的時鐘,默默回調了**五分鐘**,等待下一個“有緣人搶到通過資格。 上面的場景再現,講了一個小T訪問熊貓館的故事。其實這個故事對應著Sentinel中允許限流控制器中的各個元素,整理到一個表格中: 故事舉例 | Sentinel世界 | 說明 | | --------- | ------------------------------------------------------------------------------------ | -------------------------------------------------------------------------------------------------------------------------------------------------------------------- | | 小T | 執行緒 | | | 熊貓館 | 執行緒訪問的資源 | Sentinel世界好比是一個動物園,而動物園裡的場館,則是Sentinel中的資源 | | 場館前的排隊大廳 | `RateLimiterController`類(勻速限流效果控制器) | `FlowSolt`將對資源訪問執行緒的校驗工作委託給`FlowRuleChecker`, `FlowRuleChecker`完成叢集限流和本地限流方式的區分 如果是本地限流,則將具體的限流工作委託給`TrafficShapingController`(`RateLimiterController`的父介面)實現不同的限流效果。 | | 閘機口上方的時鐘 | `RateLimiterController.latestPassedTime`屬性,型別是`AtomicLong`,記錄勻速限流效果控制器最後一次允許執行緒通過的時間戳 | 對於正在通過效果控制器的執行緒們而言,他有以下作用:
1. 讓執行緒們知道自己能否在超時之前等到下一次佇列開放
2. 利用`AtomicLong`的自旋和原子性,讓執行緒們爭奪佇列開放的時間點
3. 讓執行緒們知道自己需要等待多久才能繼續執行 | | 閘機5分鐘放行一次 | `FlowRule.Count`屬性,即QPS,在實現過程中,代表了換成每次等待佇列放行的通過間隔 | 例如設定count的值為200QPS, 那麼虛擬佇列的放行的頻率就是: $\frac{1000}{200QPS}=5ms$ | | 超時時間30分鐘 | `RateLimiterController.maxQueueingTimeMs` | 記錄了執行緒們的最大等待時間,如果超過這個時間的Sentinel會直接以 `FlowException`的形式中斷執行緒對資源的訪問。 ### 整體角度 正所謂“不識廬山真面目,只緣身在此山中”,之前的角度鎖定在一個虛擬佇列中的執行緒角度闡述其行為,現在換一個說明問題的角度,讓我們從整體的角度看看對同一個資源的訪問執行緒們都在什麼狀態吧。 ![](http://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/bd5acd16baa744e294ffe5252cb51fae~tplv-k3u1fbpfcp-zoom-1.image)

圖1-1 從整體角度檢視執行緒經過虛擬佇列的併發情況

#### 整流功能 在示意圖中,執行緒2、3、4、5、6都在**第8毫秒**併發進入佇列,但是通過對`RatelimiterController.LaterPassedTime`這個“時間鎖”的搶佔更新,他們獲得了不同的執行時間: - 執行緒3被允許在第17毫秒執行業務邏輯 - 執行緒4被允許在第22毫秒執行業務邏輯 - 執行緒5被允許在第12毫秒執行業務邏輯 - 執行緒6被循序在第27毫秒執行業務邏輯 在系統時間到達執行緒本身被允許執行時間點之前,執行緒呼叫sleep(),等待被系統喚醒。看起來是不是就像是併發的執行緒們進入了一個佇列等待,然後按照固定的頻率被放行,不同的是, 特別注意的是**執行緒2**: 1. 就像平行時空No.2號那個幸運的又有點倒黴的小T一樣,執行緒2雖然參與了排隊,但是被它允許執行性的時間點(第32毫秒),距離當前的時間差又超過了20ms(假設超時時間是20ms)。執行緒2只能返回一個`BlockException`,相當於說明當前虛擬佇列的長度就是![](http://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/5948d9901a914983adfb6c8c81982fe5~tplv-k3u1fbpfcp-zoom-1.image),一旦當前並行的執行緒超過4,其餘的執行緒將直接溢位。 1. 執行緒2在返回異常之前,還有一個工作要做,將最近更新時間**回撥**一個時間間隔(5ms)。相當於釋放自己所佔用的時間鎖,讓其他執行緒有可能重新佔用第32ms的這個執行時間點。如下圖所示:![](http://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/c791bc4ee7a34cd68b232f85465e5692~tplv-k3u1fbpfcp-zoom-1.image)\ 第①步:【執行緒2】執行Update Lock Time = 32 - 5 = 27 ms\ 第②步:【執行緒8】執行Update Lock Time = 27 + 5 = 32 ms\ 第③步:【執行緒9】執行Update Lock Time = 32 + 5 = 37 ms 1. 其他的併發執行緒可能永遠也佔用不到32ms的這個執行時間點了,它們可能在第37ms的這個時間點相遇:\ ![](http://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/0353a88620bd4477a649aedb95d201e8~tplv-k3u1fbpfcp-zoom-1.image)\ 與上一點不同的是,如果第①步和第②步的執行順序正好相反,也就是執行緒8執行的新增5ms操作先於執行緒2的回撥5ms操作:\ 第①步:【執行緒8】執行Update Lock Time = 32 + 5 = 37 ms\ 第②步:【執行緒2】執行Update Lock Time = 37 - 5 = 32 ms\ 第③步:【執行緒9】執行Update Lock Time = 32 + 5 = 37 ms\ 在這個場景下,執行緒8和執行緒9最後都在第37ms這個時間點併發執行,這看起來是個場景似乎是個問題:在第32毫秒,沒有執行緒開始執行自己的業務程式碼,而在第37毫秒,執行緒8和執行緒9一起執行了自己的併發程式碼。整體來說,這個虛擬佇列的實現,還是能保證“**平均QPS**”的承諾的。 **執行緒7**則是平行時空No.0的那個倒黴的小T,執行緒7在第12ms開始,通過計算 32+5 - 12 = 25ms > 20 ms(此時執行緒2還沒來的及將時間回撥5ms到27,不然的話執行緒7不會超時) 就可以知道當前佇列必然已滿,因此將直接觸發Sentinel對自己的限流,並返回阻塞異常。 總結一下,只需要三個引數:超時時間、通過頻率、最後通過時間。就可以實現一個虛擬佇列: $ 通過間隔 = \frac{1000}{QPS} = \frac{1000}{200} = 5ms$ $ 佇列長度 = \frac{超時時間}{通過間隔} = \frac{QPS超時時間}{1000} = \frac{200 20ms}{1000} = 4$ $最後通過時間_n=\begin{cases} 當前時間 & if 佇列為空 \\ 最後通過時間_{n-1} + 通過間隔 & if 執行緒排隊中 \end{cases}$ ### 實現流程 ![](http://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/08278485b5f743b8883852f45b3d429f~tplv-k3u1fbpfcp-zoom-1.image) #### 程式碼實現 ```java import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.LongAdder; /** * 虛擬佇列 * 1. 連續建立多個執行緒,執行緒建立之後就執行 * 2. 讓執行緒通過虛擬佇列 * @author zongzi */ public class VirtualQueue { /** * 統計放行的執行緒數 */ static final LongAdder PASS_COUNT = new LongAdder(); /** * 統計阻塞的執行緒數 */ static final LongAdder BLOCK_COUNT = new LongAdder(); /** * 統計總的執行緒數 */ static final LongAdder DONE_COUNT = new LongAdder(); /** * 最大的測試執行緒數 */ private static final int TEST_MAX_THREAD = 50; /** * 等待所有的執行緒執行結束標誌 */ private static final CountDownLatch COUNT_DOWN = new CountDownLatch(1); /** * 允許通過的最大併發執行緒數 */ private static final int QPS = 10; /** * 最大的等待時間 */ private static final Long MAX_QUEUEING_TIME_MS = 1000L; private static final AtomicLong LATEST_PASSED_TIME = new AtomicLong(-1); public static void main(String[] args) throws InterruptedException { int i = 0; // 宣告測試執行緒並啟動 while (i < TEST_MAX_THREAD) { Runnable andStartThread = createAndStartThread(); new Thread(andStartThread, String.format("ThreadName:%d", i)).start(); i++; } // 阻塞等待 COUNT_DOWN.await(); System.out.println("pass:\t" + PASS_COUNT.longValue()); System.out.println("block:\t" + BLOCK_COUNT.longValue()); System.out.println("done:\t" + DONE_COUNT.longValue()); } public static Runnable createAndStartThread() { return () -> { // 進入虛擬佇列, boolean pass = queueWait(); if (pass) { PASS_COUNT.add(1); } else { BLOCK_COUNT.add(1); } DONE_COUNT.add(1); // 讓主執行緒繼續執行 if (DONE_COUNT.longValue() >= TEST_MAX_THREAD) { COUNT_DOWN.countDown(); } }; } /** * 進入虛擬佇列等待 * @return true 執行緒可以放行, false 執行緒被阻塞 */ static boolean queueWait() { long currentTime = System.currentTimeMillis(); // passedInterval 是一個請求通過的時間間隔,根據我們設定的QPS有關,如果QPS等於200,那麼每個請求平均需要的時間是costTime=1000/200=5ms. long passedInterval = Math.round(1.0 * (1000) / QPS ); // 下次開放通過的時間 long expectedPassedTime = passedInterval + LATEST_PASSED_TIME.get(); // 如果期望的時間小於當前時間,說明上一個請求在很早之前就已經被放行了,當前佇列應該是空的 if (expectedPassedTime <= currentTime) { // 這裡可能仍會存在併發,但是隻影響最開始進入佇列的幾個執行緒,是可以接受的 LATEST_PASSED_TIME.set(currentTime); return true; } else { // 計算從當前系統開始,期望的等待時間 long expectWaitTime = passedInterval + LATEST_PASSED_TIME.get() - System.currentTimeMillis(); // 在未開始排隊前,執行緒就知道等待的時間超過了最大等待時間,說明當前虛擬佇列已經滿了,當前執行緒不再進入排隊 if (expectWaitTime > MAX_QUEUEING_TIME_MS) { return false; } // 和其他執行緒一起併發競爭時間鎖的更新,獲得自己的通過時間。 long expectPassedTime = LATEST_PASSED_TIME.addAndGet(passedInterval); try { expectWaitTime = expectPassedTime - System.currentTimeMillis(); // 如果期望的排隊時間超過了超時時間,說明當前執行緒雖然參與了排隊,但是已經在佇列長度之外, if (expectWaitTime > MAX_QUEUEING_TIME_MS) { // 回撥自己佔用的放行時間點,給可能的其他執行緒 LATEST_PASSED_TIME.addAndGet(-passedInterval); return false; } // 執行到這裡,說明執行緒已經正確的派上到了虛擬佇列中,休眠等待 if (expectWaitTime > 0) { Thread.sleep(expectWaitTime); } // 返回true, 代表請求業務被放行,後面可以正常執行業務程式碼了 return true; } catch (InterruptedException e) { } } return false; } } ``` #### 程式碼說明 在程式碼實現中,已經添加了足夠的程式碼註釋,相信理解這個問題,已經不是難事了,現在我們來看看虛擬佇列的兩個主要引數:QPS、MAX_QUEUEING_TIME_MS 兩個引數對虛擬佇列的整流效果的影響吧。 在程式碼中,我們首先建立了500個併發執行緒,然後建立了一個QPS為10,超時時間為1000ms的,由此我們可以得到:![](http://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/628add1e53cf47228c4754c2e872daf9~tplv-k3u1fbpfcp-zoom-1.image),因而50個併發執行緒中應該只有10個執行緒通過。其餘的執行緒都因為佇列已滿而被阻塞(`queueWait()`方法返回`false`)。 如果你執行這個示例程式碼的話,應該會得到類似的結果。 ```shell pass: 11 block: 39 done: 50 ``` “通過的執行緒”是11個情況和`圖1-1`的類似,在佇列能正常工作之前,需要一個額外的執行緒來啟動時間鎖(作用和圖1-1中的執行緒1類似)。