圖解 Kafka 原始碼實現機制之客戶端快取架構
大家好,我是 華仔, 又跟大家見面了。
今天主要聊聊 「Kafka 客戶端訊息快取架構設計」,深度剖析下訊息是如何進行快取的。
認真讀完這篇文章,我相信你會對 Kafka 客戶端快取架構的原始碼有更加深刻的理解。
一、總體概述
通過場景驅動的方式,當被髮送訊息通過網路請求封裝、NIO多路複用器監聽網路讀寫事件並進行訊息網路收發後,回頭來看看訊息是如何在客戶端快取的?
大家都知道 Kafka 是一款超高吞吐量的訊息系統,主要體現在「非同步傳送」、「批量傳送」、「訊息壓縮」。
跟本篇相關的是「批量傳送」即生產者會將訊息快取起來,等滿足一定條件後,Sender 子執行緒再把訊息批量傳送給 Kafka Broker。
這樣好處就是「儘量減少網路請求次數,提升網路吞吐量」。
為了方便大家理解,所有的原始碼只保留骨幹。
二、訊息如何在客戶端快取的
既然是批量傳送,那麼訊息肯定要進行快取的,那訊息被快取在哪裡呢?又是如何管理的?
通過下面簡化流程圖可以看出,待發送訊息主要被快取在 RecordAccumulator 裡。
我以一個 真實生活場景 類比解說一下會更好理解。
既然說 RecordAccumulator 像一個累積訊息的倉庫,就拿快遞倉庫類比。
上圖是一個快遞倉庫,堆滿了貨物。可以看到分揀員把不同目的地的包裹放入對應目的地的貨箱,每裝滿一箱就放置在對應的區域。
那麼分揀員就是指 RecordAccumulator,而貨箱以及各自所屬的堆放區域,就是 RecordAccumulator 中快取訊息的地方。所有封箱的都會等待 sender 來取貨傳送出去。
如果你看懂了上圖,就大概理解了 RecordAccumulator 的架構設計和執行邏輯。
總結下倉庫裡有什麼:
- 分揀員
- 貨物
- 目的地
- 貨箱
- 堆放區域
記住這些概念,都會體現在原始碼裡,流程如下圖所示:
從上面圖中可以看出:
- 至少有一個業務主執行緒和一個 sender 執行緒同時操作 RecordAccumulator,所以它必須是 執行緒安全 的。
- 在它裡面有一個 ConcurrentMap 集合「 Kafka 自定義的 CopyOnWriteMap 」。key:TopicPartiton, value:Deque<ProducerBatch>,即以主題分割槽為單元,把訊息以 ProducerBatch 為單位累積快取,多個 ProducerBatch 儲存在 Deque 佇列中。當 Deque 中最新的 batch 不能容納訊息時,就會建立新的 batch 來繼續快取,並將其加入 Deque。
- 通過 ProducerBatch 進行快取資料,為了減少頻繁申請銷燬記憶體造成 Full GC 問題,Kafka 設計了經典的「 快取池 BufferPool 機制 」。
綜上可以得出 RecordAccumulator 類中有三個重要的元件:「訊息批次 ProducerBatch」、「自定義 CopyOnWriteMap」、「快取池 BufferPool 機制」。
由於篇幅原因,RecordAccumulator 類放到下篇來講解。
先來看看 ProducerBatch,它是訊息快取及傳送訊息的最小單位。
通過呼叫關係可以看出,ProducerBatch 依賴 MemoryRecordsBuilder,而 MemoryRecordsBuilder 依賴 MemoryRecords 構建,所以 「MemoryRecords 才是真正用來儲存訊息的地方」。
1、MemoryRecords
import java.nio.ByteBuffer; public class MemoryRecords extends AbstractRecords { public static MemoryRecordsBuilder builder(..){ return builder(...); } public static MemoryRecordsBuilder builder( ByteBuffer buffer, // 訊息版本 byte magic, // 訊息壓縮型別 CompressionType compressionType, // 時間戳 TimestampType timestampType, // 基本位移 long baseOffset, // 日誌追加時間 long logAppendTime, // 生產者id long producerId, // 生產者版本 short producerEpoch, // 批次序列號 int baseSequence, boolean isTransactional, // 是否是控制類的批次 boolean isControlBatch, // 分割槽leader的版本 int partitionLeaderEpoch) { return new MemoryRecordsBuilder(...); } }
該類比較簡單,通過 builder 方法可以看出依賴 ByteBuffer 來儲存訊息。MemoryRecordsBuilder 類的構建是通過 MemoryRecords.builder() 來初始化的。
來看看 MemoryRecordsBuilder 類的實現。
2、MemoryRecordBuilder
public class MemoryRecordsBuilder implements AutoCloseable { private static final DataOutputStream CLOSED_STREAM = new DataOutputStream(new OutputStream() { public void write(int b) { throw new ...; } }); private final TimestampType timestampType; private final CompressionType compressionType; private final ByteBufferOutputStream bufferStream; private final byte magic; private final int initialPosition; private final long baseOffset; private final long logAppendTime; private final boolean isControlBatch; private final int partitionLeaderEpoch; private final int writeLimit; private final int batchHeaderSizeInBytes; private float estimatedCompressionRatio = 1.0F; private DataOutputStream appendStream; private boolean isTransactional; private long producerId; private short producerEpoch; private int baseSequence; private int uncompressedRecordsSizeInBytes = 0; private int numRecords = 0; private float actualCompressionRatio = 1; private long maxTimestamp = RecordBatch.NO_TIMESTAMP; private long offsetOfMaxTimestamp = -1; private Long lastOffset = null; private Long firstTimestamp = null; private MemoryRecords builtRecords;
從該類屬性欄位來看比較多,這裡只講2個關於位元組流的欄位。
- CLOSED_STREAM :當關閉某個 ByteBuffer 也會把它對應的寫操作輸出流設定為 CLOSED_STREAM, 目的就是防止再向該 ByteBuffer 寫資料 ,否則就拋異常。
- bufferStream :首先 MemoryRecordsBuilder 依賴 ByteBuffer 來完成訊息儲存。它會將 ByteBuffer 封裝成 ByteBufferOutputStream 並實現了 Java NIO 的 OutputStream,這樣就可以按照流的方式寫資料了。同時 ByteBufferOutputStream 提供了 自動擴容 ByteBuffer 能力 。
來看看它的初始化構造方法。
public MemoryRecordsBuilder(ByteBuffer buffer,...) { this(new ByteBufferOutputStream(buffer), ...); } public MemoryRecordsBuilder( ByteBufferOutputStream bufferStream, ... int writeLimit) { .... this.initialPosition = bufferStream.position(); this.batchHeaderSizeInBytes = AbstractRecords.recordBatchHeaderSizeInBytes(magic, compressionType); bufferStream.position(initialPosition + batchHeaderSizeInBytes); this.bufferStream = bufferStream; this.appendStream = new DataOutputStream(compressionType.wrapForOutput(this.bufferStream, magic)); } }
從建構函式可以看出,除了基本欄位的賦值之外,會做以下3件事情:
- 根據訊息版本、壓縮型別來 計算批次 Batch 頭的大小長度 。
- 通過 調整 bufferStream 的 position ,使其跳過 Batch 頭部位置,就可以直接寫入訊息了。
- 對 bufferStream 增加壓縮功能 。
看到這裡,挺有意思的,不知讀者是否意識到這裡涉及到 「ByteBuffer」、「bufferStream」 、「appendStream」。
三者的關係是通過「裝飾器模式」實現的,即 bufferStream 對 ByteBuffer 裝飾實現擴容功能,而 appendStream 又對 bufferStream 裝飾實現壓縮功能。
來看看它的核心方法。
(1)appendWithOffset()
public Long append(long timestamp, ByteBuffer key, ByteBuffer value, Header[] headers) { return appendWithOffset(nextSequentialOffset(), timestamp, key, value, headers); } private long nextSequentialOffset() { return lastOffset == null ? baseOffset : lastOffset + 1; } private Long appendWithOffset( long offset, boolean isControlRecord, long timestamp, ByteBuffer key, ByteBuffer value, Header[] headers) { try { if (isControlRecord != isControlBatch) throw new ...; if (lastOffset != null && offset <= lastOffset) throw new ...; if (timestamp < 0 && timestamp != RecordBatch.NO_TIMESTAMP) throw new ...; if (magic < RecordBatch.MAGIC_VALUE_V2 && headers != null && headers.length > 0) throw new ...; if (firstTimestamp == null) firstTimestamp = timestamp; if (magic > RecordBatch.MAGIC_VALUE_V1) { appendDefaultRecord(offset, timestamp, key, value, headers); return null; } else { return appendLegacyRecord(offset, timestamp, key, value, magic); } } catch (IOException e) { } }
該方法主要用來根據偏移量追加寫訊息,會根據訊息版本來寫對應訊息,但需要明確的是 ProducerBatch 對標 V2 版本。
來看看 V2 版本訊息寫入邏輯。
private void appendDefaultRecord( long offset, long timestamp, ByteBuffer key, ByteBuffer value, Header[] headers) throws IOException { ensureOpenForRecordAppend(); int offsetDelta = (int) (offset - baseOffset); long timestampDelta = timestamp - firstTimestamp; int sizeInBytes = DefaultRecord.writeTo(appendStream, offsetDelta, timestampDelta, key, value, headers); recordWritten(offset, timestamp, sizeInBytes); } private void ensureOpenForRecordAppend() { if (appendStream == CLOSED_STREAM) throw new ...; } private void recordWritten(long offset, long timestamp, int size) { .... numRecords += 1; uncompressedRecordsSizeInBytes += size; lastOffset = offset; if (magic > RecordBatch.MAGIC_VALUE_V0 && timestamp > maxTimestamp) { maxTimestamp = timestamp; offsetOfMaxTimestamp = offset; } }
該方法主要用來寫入 V2 版本訊息的,主要做以下5件事情:
- 檢查是否可寫 :判斷 appendStream 狀態是否為 CLOSED_STREAM,如果不是就可寫,否則拋異常。
- 計算本次要寫入多少偏移量。
- 計算本次寫入和第一次寫的時間差。
- 按照 V2 版本格式 寫入 appendStream 流 中,並返回壓縮前的訊息大小。
- 成功後 更新 RecordBatch 的元資訊 。
(2)hasRoomFor()
public boolean hasRoomFor(long timestamp, ByteBuffer key, ByteBuffer value, Header[] headers) { if (isFull()) return false; if (numRecords == 0) return true; final int recordSize; if (magic < RecordBatch.MAGIC_VALUE_V2) { recordSize = Records.LOG_OVERHEAD + LegacyRecord.recordSize(magic, key, value); } else { int nextOffsetDelta = lastOffset == null ? 0 : (int) (lastOffset - baseOffset + 1); ... recordSize = DefaultRecord.sizeInBytes(nextOffsetDelta, timestampDelta, key, value, headers); } return this.writeLimit >= estimatedBytesWritten() + recordSize; } public boolean isFull() { return appendStream == CLOSED_STREAM || (this.numRecords > 0 && this.writeLimit <= estimatedBytesWritten()); }
該方法主要用來估計當前 MemoryRecordsBuilder 是否還有空間來容納要寫入的 Record,會在下面 ProducerBatch.tryAppend() 裡面呼叫。
最後來看看小節開始提到的自動擴容功能。
(3)expandBuffer()
public class ByteBufferOutputStream extends OutputStream { private static final float REALLOCATION_FACTOR = 1.1f; private final int initialCapacity; private final int initialPosition; public void ensureRemaining(int remainingBytesRequired) { if (remainingBytesRequired > buffer.remaining()) expandBuffer(remainingBytesRequired); } private void expandBuffer(int remainingRequired) { int expandSize = Math.max((int) (buffer.limit() * REALLOCATION_FACTOR), buffer.position() + remainingRequired); ByteBuffer temp = ByteBuffer.allocate(expandSize); int limit = limit(); buffer.flip(); temp.put(buffer); buffer.limit(limit); buffer.position(initialPosition); buffer = temp; } }
該方法主要用來判斷是否需要擴容 ByteBuffer 的,即當寫入位元組數大於 buffer 當前剩餘位元組數就開啟擴容,擴容需要做以下3件事情:
- 評估需要多少空間 : 在「 擴容空間 」、「 真正需要多少位元組 」之間取最大值,此處通過「 擴容因子 」來計算主要是因為 擴容是需要消耗系統資源的 ,如果每次都按實際資料大小來進行分配空間,會浪費不必要的系統資源。
- 申請新的空間 :根據擴容多少申請新的 ByteBuffer,然後將原來的 ByteBuffer 資料拷貝進去,對應原始碼步驟:「 3 - 7 」。
- 最後將引用指向新申請的 ByteBuffer。
接下來看看 ProducerBatch 的實現。
3、ProducerBatch
public final class ProducerBatch { private enum FinalState { ABORTED, FAILED, SUCCEEDED } final long createdMs; final TopicPartition topicPartition; final ProduceRequestResult produceFuture; private final List<Thunk> thunks = new ArrayList<>(); private final MemoryRecordsBuilder recordsBuilder; private final AtomicInteger attempts = new AtomicInteger(0); private final boolean isSplitBatch; private final AtomicReference<FinalState> finalState = new AtomicReference<>(null); int recordCount; int maxRecordSize; private long lastAttemptMs; private long lastAppendTime; private long drainedMs; private boolean retry; } public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, long createdMs, boolean isSplitBatch) { ... this.produceFuture = new ProduceRequestResult(topicPartition); ... }
一個 ProducerBatch 會存放一條或多條訊息,通常把它稱為「批次訊息」。
先來看看幾個重要欄位:
- topicPartition :批次對應的主題分割槽,當前 ProducerBatch 中快取的 Record 都會發送給該 TopicPartition。
- produceFuture :請求結果的 Future,通過 ProduceRequestResult 類實現。
- thunks :Thunk 物件集合,用來儲存訊息的 callback 和每個 Record 關聯的 Feture 響應資料。
- recordsBuilder :封裝 MemoryRecords 物件,用來儲存訊息的 ByteBuffer。
- attemps :batch 的失敗重試次數,通過 AtomicInteger 提供原子操作 來進行 Integer 的使用, 適合高併發情況下的使用 。
- isSplitBatch :是否是被分裂的批次,因單個訊息過大導致一個 ProducerBatch 存不下,被分裂成多個 ProducerBatch 來儲存的情況。
- drainedMs :Sender 子執行緒拉取批次的時間。
- retry :如果 ProducerBatch 中的資料傳送失敗,則會重新嘗試傳送。
在建構函式中,有個重要的依賴元件就是 「ProduceRequestResult」,而它是「非同步獲取訊息生產結果的類」,簡單剖析下。
(1)ProduceRequestResult 類
public class ProduceRequestResult { private final CountDownLatch latch = new CountDownLatch(1); private final TopicPartition topicPartition; private volatile Long baseOffset = null; public ProduceRequestResult(TopicPartition topicPartition) { this.topicPartition = topicPartition; } public void done() { if (baseOffset == null) throw new ...; this.latch.countDown(); } public void await() throws InterruptedException { latch.await(); } public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return latch.await(timeout, unit); } }
該類通過 CountDownLatch(1) 間接地實現了 Future 功能,並讓其他所有執行緒都在這個鎖上等待,此時只需要呼叫一次 countDown() 方法就可以讓其他所有等待的執行緒同時恢復執行。
當 Producer 傳送訊息時會間接呼叫「ProduceRequestResult.await」,此時執行緒就會等待服務端的響應。當服務端響應時呼叫「ProduceRequestResult.done」,該方法呼叫了「CountDownLatch.countDown」喚醒了阻塞在「CountDownLatch.await」上的主執行緒。這些執行緒後續可以通過 ProduceRequestResult 的 error 欄位來判斷本次請求成功還是失敗。
接下來看看 ProducerBatch 類的重要方法。
(2) tryAppend()
public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, long now) { if (!recordsBuilder.hasRoomFor(timestamp, key, value, headers)) { return null; } else { Long checksum = this.recordsBuilder.append(timestamp, key, value, headers); this.maxRecordSize = Math.max(this.maxRecordSize, AbstractRecords.estimateSizeInBytesUpperBound(magic(),recordsBuilder.compressionType(), key, value, headers)); ... FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,timestamp, checksum,key == null ? -1 : key.length,value == null ? -1 : value.length, Time.SYSTEM); thunks.add(new Thunk(callback, future)); this.recordCount++; return future; } }
該方法主要用來嘗試追加寫訊息的,主要做以下6件事情:
- 通過 MemoryRecordsBuilder 的 hasRoomFor() 檢查當前 ProducerBatch 是否還有足夠的空間 來儲存此次寫入的 Record。
- 呼叫 MemoryRecordsBuilder.append() 方法 將 Record 追加到 ByteBuffer 中 。
- 建立 FutureRecordMetadata 物件 ,底層繼承了 Future 介面,對應此次 Record 的傳送。
- 將 Future 和訊息的 callback 回撥封裝成 Thunk 物件, 放入 thunks 集合中 。
- 更新 Record 記錄數。
- 返回 FutureRecordMetadata。
可以看出該方法只是讓 Producer 主執行緒完成了訊息的快取,並沒有實現真正的網路傳送。
接下來簡單看看 FutureRecordMetadata,它實現了 JDK 中 concurrent 的 Future 介面。除了維護 ProduceRequestResult 物件外還維護了 relativeOffset 等欄位,其中 relativeOffset 用來記錄對應 Record 在 ProducerBatch 中的偏移量。
該類有2個值得注意的方法,get() 和 value()。
public RecordMetadata get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { ... boolean occurred = this.result.await(timeout, unit); ... return valueOrError(); } RecordMetadata valueOrError() throws ExecutionException { ... return value(); }
該方法主要依賴 ProduceRequestResult 的 CountDown 來實現阻塞等待,最後呼叫 value() 返回 RecordMetadata 物件。
RecordMetadata value() { ... return new RecordMetadata( result.topicPartition(), ...); } private long timestamp() { return result.hasLogAppendTime() ? result.logAppendTime() : createTimestamp; }
該方法主要通過各種引數封裝成 RecordMetadata 物件返回。
瞭解了 ProducerBatch 是如何寫入資料的,我們再來看看 done() 方法。當 Producer 收到 Broker 端「正常」|「超時」|「異常」|「關閉生產者」等響應都會呼叫 ProducerBatch 的 done()方法。
(3)done()
public boolean done(long baseOffset, long logAppendTime, RuntimeException exception) { final FinalState tryFinalState = (exception == null) ? FinalState.SUCCEEDED : FinalState.FAILED; .... if (this.finalState.compareAndSet(null, tryFinalState)) { completeFutureAndFireCallbacks(baseOffset, logAppendTime, exception); return true; } .... return false; }
該方法主要用來 是否可以執行回撥操作 ,即當收到該批次響應後,判斷批次 Batch 最終狀態是否可以執行回撥操作。
(4)completeFutureAndFireCallbacks()
private void completeFutureAndFireCallbacks(long baseOffset, long logAppendTime, RuntimeException exception) { produceFuture.set(baseOffset, logAppendTime, exception); for (Thunk thunk : thunks) { try { if (exception == null) { RecordMetadata metadata = thunk.future.value(); if (thunk.callback != null) thunk.callback.onCompletion(metadata, null); } else { if (thunk.callback != null) thunk.callback.onCompletion(null, exception); } } .... } produceFuture.done(); }
該方法主要用來呼叫回撥方法和完成 future,主要做以下3件事情:
- 更新 ProduceRequestResult 中的相關欄位,包括基本位移、訊息追加的時間、異常。
- 遍歷 thunks 集合,觸發每個 Record 的 Callback 回撥。
- 呼叫底層 CountDownLatch.countDown()方法,阻塞在其上的主執行緒。
至此我們已經講解了 ProducerBatch 「如何快取訊息」、「如何處理響應」、「如何處理回撥」三個最重要方法。
通過一張圖來描述下快取訊息的儲存結構:
接下來看看 Kafka 生產端最經典的 「緩衝池架構」。
三、客戶端快取池架構設計
為什麼客戶端需要快取池這個經典架構設計呢?
主要原因就是頻繁的建立和釋放 ProducerBatch 會導致 Full GC 問題,所以 Kafka 針對這個問題實現了一個非常優秀的機制,就是「快取池 BufferPool 機制」。即每個 Batch 底層都對應一塊記憶體空間,這個記憶體空間就是專門用來存放訊息,用完歸還就行。
接下來看看快取池的原始碼設計。
1、BufferPool
x
1
public class BufferPool {
2
private final long totalMemory;
3
4
private final int poolableSize;
5
6
private final ReentrantLock lock;
7
8
private final Deque<ByteBuffer> free;
9
10
private final Deque<Condition> waiters;
11
12
private long nonPooledAvailableMemory;
13
14
public BufferPool(long memory, int poolableSize, Metrics metrics, Time time, String metricGrpName) {
15
...
16
17
this.totalMemory = memory;
18
19
this.nonPooledAvailableMemory = memory;
20
}
21
}
x
1
public class BufferPool {
2
private final long totalMemory;
3
4
private final int poolableSize;
5
6
private final ReentrantLock lock;
7
8
private final Deque<ByteBuffer> free;
9
10
private final Deque<Condition> waiters;
11
12
private long nonPooledAvailableMemory;
13
14
public BufferPool(long memory, int poolableSize, Metrics metrics, Time time, String metricGrpName) {
15
...
16
17
this.totalMemory = memory;
18
19
this.nonPooledAvailableMemory = memory;
20
}
21
}
先來看看上面幾個重要欄位:
- totalMemory :整個 BufferPool 記憶體大小「 buffer.memory 」,預設是32M。
- poolableSize :池化快取池一塊記憶體塊的大小「 batch.size 」,預設是16k。
- lock :當有多執行緒併發分配和回收 ByteBuffer 時,為了保證執行緒的安全,使用鎖來控制併發。
- free :池化的 free 佇列,其中快取了指定大小的 ByteBuffer 物件。
- waiters :阻塞執行緒對應的 Condition 佇列,當有申請不到足夠記憶體的執行緒時,為了等待其他執行緒釋放記憶體而阻塞等待,對應的 Condition 物件會進入該佇列。
- nonPooledAvailableMemory :非池化可用記憶體。
可以看出它只會針對固定大小「poolableSize 16k」的 ByteBuffer 進行管理,ArrayDeque 的初始化大小是16,此時 BufferPool 的狀態如下圖:
接下來看看 BufferPool 的重要方法。
(1)allocate()
public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException { if (size > this.totalMemory) throw new IllegalArgumentException("Attempt to allocate " + size + " bytes, but there is a hard limit of "+ this.totalMemory + " on memory allocations."); ByteBuffer buffer = null; this.lock.lock(); if (this.closed) { this.lock.unlock(); throw new KafkaException("Producer closed while allocating memory"); } .... try { if (size == poolableSize && !this.free.isEmpty()) return this.free.pollFirst(); int freeListSize = freeSize() * this.poolableSize; if (this.nonPooledAvailableMemory + freeListSize >= size) { freeUp(size); this.nonPooledAvailableMemory -= size; } else { int accumulated = 0; Condition moreMemory = this.lock.newCondition(); try { long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs); this.waiters.addLast(moreMemory); while (accumulated < size) { .... try { waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS); } finally { .... } .... if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) { buffer = this.free.pollFirst(); accumulated = size; } else { freeUp(size - accumulated); int got = (int) Math.min(size - accumulated, this.nonPooledAvailableMemory); this.nonPooledAvailableMemory -= got; accumulated += got; } } accumulated = 0; } finally { this.nonPooledAvailableMemory += accumulated; this.waiters.remove(moreMemory); } } } finally { try { if (!(this.nonPooledAvailableMemory == 0 && this.free.isEmpty()) && !this.waiters.isEmpty()) this.waiters.peekFirst().signal(); } finally { lock.unlock(); } } if (buffer == null) return safeAllocateByteBuffer(size); else return buffer; } private ByteBuffer safeAllocateByteBuffer(int size) { boolean error = true; try { ByteBuffer buffer = allocateByteBuffer(size); error = false; return buffer; } finally { if (error) { this.lock.lock(); try { this.nonPooledAvailableMemory += size; if (!this.waiters.isEmpty()) this.waiters.peekFirst().signal(); } finally { this.lock.unlock(); } } } } protected ByteBuffer allocateByteBuffer(int size) { return ByteBuffer.allocate(size); } private void freeUp(int size) { while (!this.free.isEmpty() && this.nonPooledAvailableMemory < size) this.nonPooledAvailableMemory += this.free.pollLast().capacity(); }
該方法主要用來嘗試分配 ByteBuffer,這裡分4種情況說明下:
情況1:申請16k且free快取池有可用記憶體
此時會直接從 free 快取池中獲取隊首的 ByteBuffer 分配使用,用完後直接將 ByteBuffer 放到 free 快取池的隊尾中,並呼叫 clear() 清空資料,以便下次重複使用。
情況2:申請16k且free快取池無可用記憶體
此時 free 快取池無可用記憶體,只能從非池化可用記憶體中獲取16k記憶體來分配,用完後直接將 ByteBuffer 放到 free 快取池的隊尾中,並呼叫 clear() 清空資料,以便下次重複使用。
情況3:申請非16k且free快取池無可用記憶體
此時 free 快取池無可用記憶體,且申請的是非16k,只能從非池化可用記憶體(空間夠分配)中獲取一部分記憶體來分配,用完後直接將申請到的記憶體空間釋放到非池化可用記憶體中,後續會被 GC 掉。
情況4:申請非16k且free快取池有可用記憶體,但非池化可用記憶體不夠
此時 free 快取池有可用記憶體,但申請的是非16k,先嚐試從 free 快取池中將 ByteBuffer 釋放到非池化可用記憶體中,直到滿足申請記憶體大小(size),然後從非池化可用記憶體獲取對應記憶體大小來分配,用完後直接將申請到的記憶體空間釋放到到非池化可用記憶體中,後續會被 GC 掉。
(2)deallocate()
public void deallocate(ByteBuffer buffer, int size) { lock.lock(); try { if (size == this.poolableSize && size == buffer.capacity()) { buffer.clear(); this.free.add(buffer); } else { this.nonPooledAvailableMemory += size; } Condition moreMem = this.waiters.peekFirst(); if (moreMem != null) moreMem.signal(); } finally { lock.unlock(); } }
該方法主要用來嘗試釋放 ByteBuffer 空間,主要做以下幾件事情:
- 先加鎖,保證執行緒安全。
- 如果待釋放的 size 大小為16k,則直接放入 free 佇列中。
- 否則由 JVM GC 來回收 ByteBuffer 並增加 nonPooledAvailableMemory。
- 當有 ByteBuffer 回收了,喚醒 waiters 中的第一個阻塞執行緒。
最後來看看 kafka 自定義的支援「 讀寫分離場景 」CopyOnWriteMap 的實現。
2、CopyOnWriteMap
通過 RecordAccumulator 類的屬性欄位中可以看到,CopyOnWriteMap 中 key 為主題分割槽,value 為向這個分割槽傳送的 Deque<ProducerBatch> 佇列集合。
我們知道生產訊息時,要傳送的分割槽是很少變動的,所以寫操作會很少。大部分情況都是先獲取分割槽對應的佇列,然後將 ProducerBatch 放入隊尾,所以讀操作是很頻繁的,這就是個典型的「讀多寫少」的場景。
所謂 「CopyOnWrite」 就是當寫的時候會拷貝一份來進行寫操作,寫完了再替換原來的集合。
來看看它的原始碼實現。
public class CopyOnWriteMap<K, V> implements ConcurrentMap<K, V> { private volatile Map<K, V> map; public CopyOnWriteMap() { this.map = Collections.emptyMap(); }
該類只有一個重要的欄位 Map,是通過「volatile」來修飾的,目的就是在多執行緒的場景下,當 Map 發生變化的時候其他的執行緒都是可見的。
接下來看幾個重要方法,都比較簡單,但是實現非常經典。
(1)get()
public V get(Object k) { return map.get(k); }
該方法主要用來讀取集合中的佇列,可以看到讀操作並沒有加鎖,多執行緒併發讀取的場景並不會阻塞,可以實現高併發讀取。如果佇列已經存在了就直接返回即可。
(2)putIfAbsent()
public synchronized V putIfAbsent(K k, V v) { if (!containsKey(k)) return put(k, v); else return get(k); } public boolean containsKey(Object k) { return map.containsKey(k); }
該方法主要用來獲取或者設定佇列,會被多個執行緒併發執行,通過「synchronized」來修飾可以保證執行緒安全的,除非佇列不存在才會去設定。
(3)put()
public synchronized V put(K k, V v) { Map<K, V> copy = new HashMap<K, V>(this.map); V prev = copy.put(k, v); this.map = Collections.unmodifiableMap(copy); return prev; }
該方法主要用來設定佇列的, put 時也是通過「synchronized」來修飾的,可以保證同一時間只有一個執行緒會來更新這個值。
那為什麼說寫操作不會阻塞讀操作呢?
- 首先重新建立一個 HashMap 集合副本。
- 通過「 volatile 」寫的方式賦值給對應集合裡。
- 把新的集合設定成「 不可修改的 map 」,並賦值給欄位 map。
這就實現了讀寫分離。對於 Producer 最最核心,會出現多執行緒併發訪問的就是快取池。因此這塊的高併發設計相當重要。
四、總結
這裡,我們一起來總結一下這篇文章的重點。
1、帶你先整體的梳理了 Kafka 客戶端訊息批量傳送的好處。
2、通過一個真實生活場景類比來帶你理解 RecordAccumulator 內部構造,並且深度剖析了訊息是如何在客戶端快取的,以及內部各元件實現原理。
3、帶你深度剖析了 Kafka 客戶端非常重要的 BufferPool 、CopyOnWriteMap 的實現原理。
- 面試回答 CopyOnWrite 的三重境界,1%的人能答到最後
- 2022年全球混合雲發展趨勢報告
- Kafka為何要設計緩衝池機制?初看一臉懵逼,看懂直接跪下
- 工業領域的四個邊緣計算用例
- 圖解 Kafka 原始碼實現機制之客戶端快取架構
- TapTap 利用亞馬遜雲科技打造開發者服務並啟動出海計劃 全面賦能遊戲開發者
- 為什麼邊緣計算和人工智慧策略必須互補
- 用於快速數字化轉型的七大可觀察性工具
- 雲無關硬體如何成為物聯網的未來
- 三種雲原生儲存方案優缺點及應用場景分析
- 為什麼構建一個外部資料產品這麼難?
- 選擇SaaS提供商時需要問的關鍵問題
- 談談你對IaaS、PaaS、SaaS的理解
- 騰訊雲資料庫自研核心全新升級 新架構比原先效能提升20%
- 如何正確進行雲遷移
- 分散式雲的擴充套件自動化需要考慮什麼
- SUSE成立 RFO SIG,建設面向 openEuler 的容器基礎設施平臺
- 2022年網路安全威脅態勢研究:攻擊面增長將成常態,七成企業尚未做好應對準備!
- 答網友問:Await 一個 Promise 物件到底發生了什麼
- 邊緣計算和雲端計算之間的區別