圖解 Kafka 原始碼實現機制之客戶端快取架構

語言: CN / TW / HK

大家好,我是 華仔, 又跟大家見面了。

今天主要聊聊 「Kafka 客戶端訊息快取架構設計」,深度剖析下訊息是如何進行快取的。

認真讀完這篇文章,我相信你會對 Kafka 客戶端快取架構的原始碼有更加深刻的理解。

一、總體概述

通過場景驅動的方式,當被髮送訊息通過網路請求封裝、NIO多路複用器監聽網路讀寫事件並進行訊息網路收發後,回頭來看看訊息是如何在客戶端快取的?

大家都知道 Kafka 是一款超高吞吐量的訊息系統,主要體現在「非同步傳送」、「批量傳送」、「訊息壓縮」。

跟本篇相關的是「批量傳送」即生產者會將訊息快取起來,等滿足一定條件後,Sender 子執行緒再把訊息批量傳送給 Kafka Broker。

這樣好處就是「儘量減少網路請求次數,提升網路吞吐量」。

為了方便大家理解,所有的原始碼只保留骨幹。

二、訊息如何在客戶端快取的

既然是批量傳送,那麼訊息肯定要進行快取的,那訊息被快取在哪裡呢?又是如何管理的?

通過下面簡化流程圖可以看出,待發送訊息主要被快取在 RecordAccumulator 裡。

我以一個 真實生活場景 類比解說一下會更好理解。

既然說 RecordAccumulator 像一個累積訊息的倉庫,就拿快遞倉庫類比。

上圖是一個快遞倉庫,堆滿了貨物。可以看到分揀員把不同目的地的包裹放入對應目的地的貨箱,每裝滿一箱就放置在對應的區域。

那麼分揀員就是指 RecordAccumulator,而貨箱以及各自所屬的堆放區域,就是 RecordAccumulator 中快取訊息的地方。所有封箱的都會等待 sender 來取貨傳送出去。

如果你看懂了上圖,就大概理解了 RecordAccumulator 的架構設計和執行邏輯。

總結下倉庫裡有什麼:

  1. 分揀員
  2. 貨物
  3. 目的地
  4. 貨箱
  5. 堆放區域

記住這些概念,都會體現在原始碼裡,流程如下圖所示:

從上面圖中可以看出:

  1. 至少有一個業務主執行緒和一個 sender 執行緒同時操作 RecordAccumulator,所以它必須是 執行緒安全 的。
  2. 在它裡面有一個 ConcurrentMap 集合「 Kafka 自定義的 CopyOnWriteMap 」。key:TopicPartiton, value:Deque<ProducerBatch>,即以主題分割槽為單元,把訊息以 ProducerBatch 為單位累積快取,多個 ProducerBatch 儲存在 Deque 佇列中。當 Deque 中最新的 batch 不能容納訊息時,就會建立新的 batch 來繼續快取,並將其加入 Deque。
  3. 通過 ProducerBatch 進行快取資料,為了減少頻繁申請銷燬記憶體造成 Full GC 問題,Kafka 設計了經典的「 快取池 BufferPool 機制 」。

綜上可以得出 RecordAccumulator 類中有三個重要的元件:「訊息批次 ProducerBatch」、「自定義 CopyOnWriteMap」、「快取池 BufferPool 機制」。

由於篇幅原因,RecordAccumulator 類放到下篇來講解。

先來看看 ProducerBatch,它是訊息快取及傳送訊息的最小單位。

通過呼叫關係可以看出,ProducerBatch 依賴 MemoryRecordsBuilder,而 MemoryRecordsBuilder 依賴 MemoryRecords 構建,所以 「MemoryRecords 才是真正用來儲存訊息的地方」。

1、MemoryRecords

import java.nio.ByteBuffer;
public class MemoryRecords extends AbstractRecords {
  public static MemoryRecordsBuilder builder(..){
        
        return builder(...);
  }
    
  public static MemoryRecordsBuilder builder(
    ByteBuffer buffer,
    // 訊息版本
    byte magic,
    // 訊息壓縮型別
    CompressionType compressionType,
    // 時間戳
    TimestampType timestampType,
    // 基本位移
    long baseOffset,
    // 日誌追加時間
    long logAppendTime,
    // 生產者id
    long producerId,
    // 生產者版本
    short producerEpoch,
    // 批次序列號
    int baseSequence,
    boolean isTransactional,
    // 是否是控制類的批次
    boolean isControlBatch,
    // 分割槽leader的版本
    int partitionLeaderEpoch) {
        
        return new MemoryRecordsBuilder(...);
  }
}

該類比較簡單,通過 builder 方法可以看出依賴 ByteBuffer 來儲存訊息。MemoryRecordsBuilder 類的構建是通過 MemoryRecords.builder() 來初始化的。

來看看 MemoryRecordsBuilder 類的實現。

2、MemoryRecordBuilder

public class MemoryRecordsBuilder implements AutoCloseable {   
    private static final DataOutputStream CLOSED_STREAM = new DataOutputStream(new OutputStream() {
        
        public void write(int b) {
            throw new ...;
        }
    });    
    private final TimestampType timestampType;
    private final CompressionType compressionType;   
    private final ByteBufferOutputStream bufferStream;   
    private final byte magic;   
    private final int initialPosition;   
    private final long baseOffset;   
    private final long logAppendTime;   
    private final boolean isControlBatch;  
    private final int partitionLeaderEpoch;  
    private final int writeLimit;
    private final int batchHeaderSizeInBytes;
    private float estimatedCompressionRatio = 1.0F;
    private DataOutputStream appendStream;
    private boolean isTransactional;
    private long producerId;
    private short producerEpoch;
    private int baseSequence;
    private int uncompressedRecordsSizeInBytes = 0; 
    private int numRecords = 0;
    private float actualCompressionRatio = 1;
    private long maxTimestamp = RecordBatch.NO_TIMESTAMP;
    private long offsetOfMaxTimestamp = -1;
    private Long lastOffset = null;
    private Long firstTimestamp = null;
    private MemoryRecords builtRecords;

從該類屬性欄位來看比較多,這裡只講2個關於位元組流的欄位。

  1. CLOSED_STREAM :當關閉某個 ByteBuffer 也會把它對應的寫操作輸出流設定為 CLOSED_STREAM, 目的就是防止再向該 ByteBuffer 寫資料 ,否則就拋異常。
  2. bufferStream :首先 MemoryRecordsBuilder 依賴 ByteBuffer 來完成訊息儲存。它會將 ByteBuffer 封裝成 ByteBufferOutputStream 並實現了 Java NIO 的 OutputStream,這樣就可以按照流的方式寫資料了。同時 ByteBufferOutputStream 提供了 自動擴容 ByteBuffer 能力

來看看它的初始化構造方法。

public MemoryRecordsBuilder(ByteBuffer buffer,...) {  
    this(new ByteBufferOutputStream(buffer), ...);
}
public MemoryRecordsBuilder(
    ByteBufferOutputStream bufferStream,
    ...
    int writeLimit) {
        ....    
        this.initialPosition = bufferStream.position();      
        this.batchHeaderSizeInBytes = AbstractRecords.recordBatchHeaderSizeInBytes(magic, compressionType);    
        bufferStream.position(initialPosition + batchHeaderSizeInBytes);
        this.bufferStream = bufferStream;     
        this.appendStream = new DataOutputStream(compressionType.wrapForOutput(this.bufferStream, magic));
    }
}

從建構函式可以看出,除了基本欄位的賦值之外,會做以下3件事情:

  1. 根據訊息版本、壓縮型別來 計算批次 Batch 頭的大小長度
  2. 通過 調整 bufferStream 的 position ,使其跳過 Batch 頭部位置,就可以直接寫入訊息了。
  3. 對 bufferStream 增加壓縮功能

看到這裡,挺有意思的,不知讀者是否意識到這裡涉及到 「ByteBuffer」、「bufferStream」 、「appendStream」。

三者的關係是通過「裝飾器模式」實現的,即 bufferStream 對 ByteBuffer 裝飾實現擴容功能,而 appendStream 又對 bufferStream 裝飾實現壓縮功能。

來看看它的核心方法。

(1)appendWithOffset()

public Long append(long timestamp, ByteBuffer key, ByteBuffer value, Header[] headers) {
   return appendWithOffset(nextSequentialOffset(), timestamp, key, value, headers);
}
private long nextSequentialOffset() { 
  return lastOffset == null ? baseOffset : lastOffset + 1;
}
private Long appendWithOffset(
  long offset,
  boolean isControlRecord, 
  long timestamp, 
  ByteBuffer key,
  ByteBuffer value, 
  Header[] headers) {
    try {    
        if (isControlRecord != isControlBatch)
            throw new ...;     
        if (lastOffset != null && offset <= lastOffset)
            throw new ...;   
        if (timestamp < 0 && timestamp != RecordBatch.NO_TIMESTAMP)
           throw new ...;    
        if (magic < RecordBatch.MAGIC_VALUE_V2 && headers != null && headers.length > 0)
           throw new ...;    
        if (firstTimestamp == null)
           firstTimestamp = timestamp;   
        if (magic > RecordBatch.MAGIC_VALUE_V1)         {
            appendDefaultRecord(offset, timestamp, key, value, headers);
            return null;
        } else {       
            return appendLegacyRecord(offset, timestamp, key, value, magic);
        }
    } catch (IOException e) {
        
    }
}

該方法主要用來根據偏移量追加寫訊息,會根據訊息版本來寫對應訊息,但需要明確的是 ProducerBatch 對標 V2 版本。

來看看 V2 版本訊息寫入邏輯。

private void appendDefaultRecord(
  long offset, 
  long timestamp, 
  ByteBuffer key, 
  ByteBuffer value,
  Header[] headers) throws IOException {   
    ensureOpenForRecordAppend(); 
    int offsetDelta = (int) (offset - baseOffset);
    long timestampDelta = timestamp - firstTimestamp;
    int sizeInBytes = DefaultRecord.writeTo(appendStream, offsetDelta, timestampDelta, key, value, headers);
    recordWritten(offset, timestamp, sizeInBytes);
}
private void ensureOpenForRecordAppend() {
    if (appendStream == CLOSED_STREAM)
        throw new ...;
}
private void recordWritten(long offset, long timestamp, int size) {
  .... 
  numRecords += 1; 
  uncompressedRecordsSizeInBytes += size; 
  lastOffset = offset;
  if (magic > RecordBatch.MAGIC_VALUE_V0 && timestamp > maxTimestamp) {    
      maxTimestamp = timestamp;   
      offsetOfMaxTimestamp = offset;
  }
}

該方法主要用來寫入 V2 版本訊息的,主要做以下5件事情:

  1. 檢查是否可寫 :判斷 appendStream 狀態是否為 CLOSED_STREAM,如果不是就可寫,否則拋異常。
  2. 計算本次要寫入多少偏移量。
  3. 計算本次寫入和第一次寫的時間差。
  4. 按照 V2 版本格式 寫入 appendStream 流 中,並返回壓縮前的訊息大小。
  5. 成功後 更新 RecordBatch 的元資訊

(2)hasRoomFor()

public boolean hasRoomFor(long timestamp, ByteBuffer key, ByteBuffer value, Header[] headers) {   
    if (isFull())
        return false;  
    if (numRecords == 0)
        return true;
    final int recordSize;
    if (magic < RecordBatch.MAGIC_VALUE_V2) {     
        recordSize = Records.LOG_OVERHEAD + LegacyRecord.recordSize(magic, key, value);
    } else {   
        int nextOffsetDelta = lastOffset == null ? 0 : (int) (lastOffset - baseOffset + 1);
        ...
        recordSize = DefaultRecord.sizeInBytes(nextOffsetDelta, timestampDelta, key, value, headers);
    } 
    return this.writeLimit >= estimatedBytesWritten() + recordSize;
}
public boolean isFull() {
      return appendStream == CLOSED_STREAM || 
      (this.numRecords > 0 && this.writeLimit <= estimatedBytesWritten());
}

該方法主要用來估計當前 MemoryRecordsBuilder 是否還有空間來容納要寫入的 Record,會在下面 ProducerBatch.tryAppend() 裡面呼叫。

最後來看看小節開始提到的自動擴容功能。

(3)expandBuffer()

public class ByteBufferOutputStream extends OutputStream {
   private static final float REALLOCATION_FACTOR = 1.1f;
   private final int initialCapacity;
   private final int initialPosition;
   public void ensureRemaining(int remainingBytesRequired) {
     if (remainingBytesRequired > buffer.remaining())
     expandBuffer(remainingBytesRequired);
  }
  private void expandBuffer(int remainingRequired) {
    int expandSize = Math.max((int) (buffer.limit() * REALLOCATION_FACTOR), buffer.position() + remainingRequired);
    ByteBuffer temp = ByteBuffer.allocate(expandSize);
    int limit = limit(); 
    buffer.flip();
    temp.put(buffer);
    buffer.limit(limit);
    buffer.position(initialPosition);
    buffer = temp;
  }
}

該方法主要用來判斷是否需要擴容 ByteBuffer 的,即當寫入位元組數大於 buffer 當前剩餘位元組數就開啟擴容,擴容需要做以下3件事情:

  1. 評估需要多少空間 : 在「 擴容空間 」、「 真正需要多少位元組 」之間取最大值,此處通過「 擴容因子 」來計算主要是因為 擴容是需要消耗系統資源的 ,如果每次都按實際資料大小來進行分配空間,會浪費不必要的系統資源。
  2. 申請新的空間 :根據擴容多少申請新的 ByteBuffer,然後將原來的 ByteBuffer 資料拷貝進去,對應原始碼步驟:「 3 - 7 」。
  3. 最後將引用指向新申請的 ByteBuffer。

接下來看看 ProducerBatch 的實現。

3、ProducerBatch

public final class ProducerBatch {  
    private enum FinalState { ABORTED, FAILED, SUCCEEDED } 
    final long createdMs;
    final TopicPartition topicPartition;
    final ProduceRequestResult produceFuture;
    private final List<Thunk> thunks = new ArrayList<>();
    private final MemoryRecordsBuilder recordsBuilder;
    private final AtomicInteger attempts = new AtomicInteger(0);
    private final boolean isSplitBatch;    
    private final AtomicReference<FinalState> finalState = new AtomicReference<>(null);    
    int recordCount;    
    int maxRecordSize;    
    private long lastAttemptMs;   
    private long lastAppendTime;   
    private long drainedMs; 
    private boolean retry;
}
public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, long createdMs, boolean isSplitBatch) {
    ...    
    this.produceFuture = new ProduceRequestResult(topicPartition);
    ...
}

一個 ProducerBatch 會存放一條或多條訊息,通常把它稱為「批次訊息」。

先來看看幾個重要欄位:

  1. topicPartition :批次對應的主題分割槽,當前 ProducerBatch 中快取的 Record 都會發送給該 TopicPartition。
  2. produceFuture :請求結果的 Future,通過 ProduceRequestResult 類實現。
  3. thunks :Thunk 物件集合,用來儲存訊息的 callback 和每個 Record 關聯的 Feture 響應資料。
  4. recordsBuilder :封裝 MemoryRecords 物件,用來儲存訊息的 ByteBuffer。
  5. attemps :batch 的失敗重試次數,通過  AtomicInteger 提供原子操作 來進行 Integer 的使用, 適合高併發情況下的使用
  6. isSplitBatch :是否是被分裂的批次,因單個訊息過大導致一個 ProducerBatch 存不下,被分裂成多個 ProducerBatch 來儲存的情況。
  7. drainedMs :Sender 子執行緒拉取批次的時間。
  8. retry :如果 ProducerBatch 中的資料傳送失敗,則會重新嘗試傳送。

在建構函式中,有個重要的依賴元件就是 「ProduceRequestResult」,而它是「非同步獲取訊息生產結果的類」,簡單剖析下。

(1)ProduceRequestResult 類

public class ProduceRequestResult {   
    private final CountDownLatch latch = new CountDownLatch(1);
    private final TopicPartition topicPartition;   
    private volatile Long baseOffset = null;    
    public ProduceRequestResult(TopicPartition topicPartition) {
        this.topicPartition = topicPartition;
    }  
    public void done() {
        if (baseOffset == null)
            throw new ...;
        this.latch.countDown();
    }  
    public void await() throws InterruptedException {
        latch.await();
    } 
    public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        return latch.await(timeout, unit);
    }
}

該類通過 CountDownLatch(1) 間接地實現了 Future 功能,並讓其他所有執行緒都在這個鎖上等待,此時只需要呼叫一次 countDown() 方法就可以讓其他所有等待的執行緒同時恢復執行。

當 Producer 傳送訊息時會間接呼叫「ProduceRequestResult.await」,此時執行緒就會等待服務端的響應。當服務端響應時呼叫「ProduceRequestResult.done」,該方法呼叫了「CountDownLatch.countDown」喚醒了阻塞在「CountDownLatch.await」上的主執行緒。這些執行緒後續可以通過 ProduceRequestResult 的 error 欄位來判斷本次請求成功還是失敗。

接下來看看 ProducerBatch 類的重要方法。

(2) tryAppend()

public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, long now) {    
    if (!recordsBuilder.hasRoomFor(timestamp, key, value, headers)) {
        return null;
    } else {       
        Long checksum = this.recordsBuilder.append(timestamp, key, value, headers);     
        this.maxRecordSize = Math.max(this.maxRecordSize, AbstractRecords.estimateSizeInBytesUpperBound(magic(),recordsBuilder.compressionType(), key, value, headers));
        ...   
        FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,timestamp, checksum,key == null ? -1 : key.length,value == null ? -1 : value.length, Time.SYSTEM);  
        thunks.add(new Thunk(callback, future));       
        this.recordCount++;     
        return future;
    }
}

該方法主要用來嘗試追加寫訊息的,主要做以下6件事情:

  1. 通過 MemoryRecordsBuilder 的 hasRoomFor() 檢查當前 ProducerBatch 是否還有足夠的空間 來儲存此次寫入的 Record。
  2. 呼叫 MemoryRecordsBuilder.append() 方法 將 Record 追加到 ByteBuffer 中
  3. 建立 FutureRecordMetadata 物件 ,底層繼承了 Future 介面,對應此次 Record 的傳送。
  4. 將 Future 和訊息的 callback 回撥封裝成 Thunk 物件, 放入 thunks 集合中
  5. 更新 Record 記錄數。
  6. 返回 FutureRecordMetadata。

可以看出該方法只是讓 Producer 主執行緒完成了訊息的快取,並沒有實現真正的網路傳送。

接下來簡單看看 FutureRecordMetadata,它實現了 JDK 中 concurrent 的 Future 介面。除了維護 ProduceRequestResult 物件外還維護了 relativeOffset 等欄位,其中 relativeOffset 用來記錄對應 Record 在 ProducerBatch 中的偏移量。

該類有2個值得注意的方法,get() 和 value()。

public RecordMetadata get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
    ... 
    boolean occurred = this.result.await(timeout, unit);
    ...
    return valueOrError();
}
RecordMetadata valueOrError() throws ExecutionException {
    ...
    return value();
}

該方法主要依賴 ProduceRequestResult 的 CountDown 來實現阻塞等待,最後呼叫 value() 返回 RecordMetadata 物件。

RecordMetadata value() {
    ...  
    return new RecordMetadata(
      result.topicPartition(), 
      ...);
}
private long timestamp() {
    return result.hasLogAppendTime() ? result.logAppendTime() : createTimestamp;
}

該方法主要通過各種引數封裝成 RecordMetadata 物件返回。

瞭解了 ProducerBatch 是如何寫入資料的,我們再來看看 done() 方法。當 Producer 收到 Broker 端「正常」|「超時」|「異常」|「關閉生產者」等響應都會呼叫 ProducerBatch 的 done()方法。

(3)done()

public boolean done(long baseOffset, long logAppendTime, RuntimeException exception) {  
    final FinalState tryFinalState = (exception == null) ? FinalState.SUCCEEDED : FinalState.FAILED;
    .... 
    if (this.finalState.compareAndSet(null, tryFinalState)) {   
        completeFutureAndFireCallbacks(baseOffset, logAppendTime, exception);
        return true;
    }
    ....
    return false;
}

該方法主要用來 是否可以執行回撥操作 ,即當收到該批次響應後,判斷批次 Batch 最終狀態是否可以執行回撥操作。

(4)completeFutureAndFireCallbacks()

private void completeFutureAndFireCallbacks(long baseOffset, long logAppendTime, RuntimeException exception) { 
  produceFuture.set(baseOffset, logAppendTime, exception);  
  for (Thunk thunk : thunks) {
      try {
          if (exception == null) {           
           RecordMetadata metadata = thunk.future.value();
           if (thunk.callback != null)             
             thunk.callback.onCompletion(metadata, null);
          } else {
              if (thunk.callback != null)                 
                  thunk.callback.onCompletion(null, exception);
          }
      } 
      ....
  }
  produceFuture.done();
}

該方法主要用來呼叫回撥方法和完成 future,主要做以下3件事情:

  1. 更新 ProduceRequestResult 中的相關欄位,包括基本位移、訊息追加的時間、異常。
  2. 遍歷 thunks 集合,觸發每個 Record 的 Callback 回撥。
  3. 呼叫底層 CountDownLatch.countDown()方法,阻塞在其上的主執行緒。

至此我們已經講解了 ProducerBatch 「如何快取訊息」、「如何處理響應」、「如何處理回撥」三個最重要方法。

通過一張圖來描述下快取訊息的儲存結構:

接下來看看 Kafka 生產端最經典的 「緩衝池架構」。

三、客戶端快取池架構設計

為什麼客戶端需要快取池這個經典架構設計呢?

主要原因就是頻繁的建立和釋放 ProducerBatch 會導致 Full GC 問題,所以 Kafka 針對這個問題實現了一個非常優秀的機制,就是「快取池 BufferPool 機制」。即每個 Batch 底層都對應一塊記憶體空間,這個記憶體空間就是專門用來存放訊息,用完歸還就行。

接下來看看快取池的原始碼設計。

1、BufferPool​

x

1

public class BufferPool {

2

  private final long totalMemory;

3

4

  private final int poolableSize;

5

6

  private final ReentrantLock lock;

7

8

  private final Deque<ByteBuffer> free;

9

10

  private final Deque<Condition> waiters;

11

12

  private long nonPooledAvailableMemory;

13

14

  public BufferPool(long memory, int poolableSize, Metrics metrics, Time time, String metricGrpName) {

15

    ...

16

17

    this.totalMemory = memory;

18

19

    this.nonPooledAvailableMemory = memory;

20

  }

21

}

先來看看上面幾個重要欄位:

  1. totalMemory :整個 BufferPool 記憶體大小「 buffer.memory 」,預設是32M。
  2. poolableSize :池化快取池一塊記憶體塊的大小「 batch.size 」,預設是16k。
  3. lock :當有多執行緒併發分配和回收 ByteBuffer 時,為了保證執行緒的安全,使用鎖來控制併發。
  4. free :池化的 free 佇列,其中快取了指定大小的 ByteBuffer 物件。
  5. waiters :阻塞執行緒對應的 Condition 佇列,當有申請不到足夠記憶體的執行緒時,為了等待其他執行緒釋放記憶體而阻塞等待,對應的 Condition 物件會進入該佇列。
  6. nonPooledAvailableMemory :非池化可用記憶體。

可以看出它只會針對固定大小「poolableSize 16k」的 ByteBuffer 進行管理,ArrayDeque 的初始化大小是16,此時 BufferPool 的狀態如下圖:

接下來看看 BufferPool 的重要方法。

(1)allocate()

public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {  
  if (size > this.totalMemory)
      throw new IllegalArgumentException("Attempt to allocate " + size + " bytes, but there is a hard limit of "+ this.totalMemory + " on memory allocations.");
  ByteBuffer buffer = null;
  this.lock.lock(); 
  if (this.closed) {
      this.lock.unlock();
      throw new KafkaException("Producer closed while allocating memory");
  }
  ....
  try {    
      if (size == poolableSize && !this.free.isEmpty())    
      return this.free.pollFirst();       
      int freeListSize = freeSize() * this.poolableSize;
      
      if (this.nonPooledAvailableMemory + freeListSize >= size) {     
          freeUp(size);     
          this.nonPooledAvailableMemory -= size;
      } else {    
          int accumulated = 0;   
          Condition moreMemory = this.lock.newCondition();
          try {       
              long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);      
              this.waiters.addLast(moreMemory);      
              while (accumulated < size) {
                  ....
                  try {        
                   waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);
                  } finally {
                      ....
                  }
                  ....         
                  if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {                 
                    buffer = this.free.pollFirst();           
                    accumulated = size;
                  } else {              
                      freeUp(size - accumulated);
                      int got = (int) Math.min(size - accumulated, this.nonPooledAvailableMemory);       
                      this.nonPooledAvailableMemory -= got;           
                      accumulated += got;
                  }
              }
              accumulated = 0;
          } finally {      
              this.nonPooledAvailableMemory += accumulated;     
              this.waiters.remove(moreMemory);
          }
      }
  } finally {
      try {
          if (!(this.nonPooledAvailableMemory == 0 && this.free.isEmpty()) && !this.waiters.isEmpty())
           
           this.waiters.peekFirst().signal();
      } finally {
          
          lock.unlock();
      }
  }
  if (buffer == null)
      return safeAllocateByteBuffer(size);
  else
      return buffer;
}
private ByteBuffer safeAllocateByteBuffer(int size) {
  boolean error = true;
  try {  
      ByteBuffer buffer = allocateByteBuffer(size);
      error = false;
      return buffer;
  } finally {
    if (error) {
        this.lock.lock();
        try { 
            this.nonPooledAvailableMemory += size;
            if (!this.waiters.isEmpty())
                
                this.waiters.peekFirst().signal();
        } finally {
            
            this.lock.unlock();
        }
    }
  }
}
protected ByteBuffer allocateByteBuffer(int size) {   
    return ByteBuffer.allocate(size);
}
private void freeUp(int size) {
    while (!this.free.isEmpty() && this.nonPooledAvailableMemory < size)
        this.nonPooledAvailableMemory += this.free.pollLast().capacity();
}

該方法主要用來嘗試分配 ByteBuffer,這裡分4種情況說明下:

情況1:申請16k且free快取池有可用記憶體

此時會直接從 free 快取池中獲取隊首的 ByteBuffer 分配使用,用完後直接將 ByteBuffer 放到 free 快取池的隊尾中,並呼叫 clear() 清空資料,以便下次重複使用。

情況2:申請16k且free快取池無可用記憶體

此時 free 快取池無可用記憶體,只能從非池化可用記憶體中獲取16k記憶體來分配,用完後直接將 ByteBuffer 放到 free 快取池的隊尾中,並呼叫 clear() 清空資料,以便下次重複使用。

情況3:申請非16k且free快取池無可用記憶體

此時 free 快取池無可用記憶體,且申請的是非16k,只能從非池化可用記憶體(空間夠分配)中獲取一部分記憶體來分配,用完後直接將申請到的記憶體空間釋放到非池化可用記憶體中,後續會被 GC 掉。

情況4:申請非16k且free快取池有可用記憶體,但非池化可用記憶體不夠

此時 free 快取池有可用記憶體,但申請的是非16k,先嚐試從 free 快取池中將 ByteBuffer 釋放到非池化可用記憶體中,直到滿足申請記憶體大小(size),然後從非池化可用記憶體獲取對應記憶體大小來分配,用完後直接將申請到的記憶體空間釋放到到非池化可用記憶體中,後續會被 GC 掉。

(2)deallocate()

public void deallocate(ByteBuffer buffer, int size) {
    lock.lock();
    try {
        if (size == this.poolableSize && size == buffer.capacity()) {   
            buffer.clear();            
            this.free.add(buffer);
        } else {            
            this.nonPooledAvailableMemory += size;
        }       
        Condition moreMem = this.waiters.peekFirst();
        if (moreMem != null)
            moreMem.signal();
    } finally {        
        lock.unlock();
    }
}

該方法主要用來嘗試釋放 ByteBuffer 空間,主要做以下幾件事情:

  1. 先加鎖,保證執行緒安全。
  2. 如果待釋放的 size 大小為16k,則直接放入 free 佇列中。
  3. 否則由 JVM GC 來回收 ByteBuffer 並增加 nonPooledAvailableMemory。
  4. 當有 ByteBuffer 回收了,喚醒 waiters 中的第一個阻塞執行緒。

最後來看看 kafka 自定義的支援「 讀寫分離場景 」CopyOnWriteMap 的實現。

2、CopyOnWriteMap

通過 RecordAccumulator 類的屬性欄位中可以看到,CopyOnWriteMap 中 key 為主題分割槽,value 為向這個分割槽傳送的 Deque<ProducerBatch> 佇列集合。

我們知道生產訊息時,要傳送的分割槽是很少變動的,所以寫操作會很少。大部分情況都是先獲取分割槽對應的佇列,然後將 ProducerBatch 放入隊尾,所以讀操作是很頻繁的,這就是個典型的「讀多寫少」的場景。

所謂 「CopyOnWrite」 就是當寫的時候會拷貝一份來進行寫操作,寫完了再替換原來的集合。

來看看它的原始碼實現。

public class CopyOnWriteMap<K, V> implements ConcurrentMap<K, V> {   
    private volatile Map<K, V> map;
    public CopyOnWriteMap() {
        this.map = Collections.emptyMap();
    }

該類只有一個重要的欄位 Map,是通過「volatile」來修飾的,目的就是在多執行緒的場景下,當 Map 發生變化的時候其他的執行緒都是可見的。

接下來看幾個重要方法,都比較簡單,但是實現非常經典。

(1)get()

public V get(Object k) {
    return map.get(k);
}

該方法主要用來讀取集合中的佇列,可以看到讀操作並沒有加鎖,多執行緒併發讀取的場景並不會阻塞,可以實現高併發讀取。如果佇列已經存在了就直接返回即可。

(2)putIfAbsent()

public synchronized V putIfAbsent(K k, V v) {
    if (!containsKey(k))
        return put(k, v);
    else
        return get(k);
}
public boolean containsKey(Object k) {
    return map.containsKey(k);
}

該方法主要用來獲取或者設定佇列,會被多個執行緒併發執行,通過「synchronized」來修飾可以保證執行緒安全的,除非佇列不存在才會去設定。

(3)put()

public synchronized V put(K k, V v) {
    Map<K, V> copy = new HashMap<K, V>(this.map);
    V prev = copy.put(k, v);
    this.map = Collections.unmodifiableMap(copy);
    return prev;
}

該方法主要用來設定佇列的, put 時也是通過「synchronized」來修飾的,可以保證同一時間只有一個執行緒會來更新這個值。

那為什麼說寫操作不會阻塞讀操作呢?

  1. 首先重新建立一個 HashMap 集合副本。
  2. 通過「 volatile 」寫的方式賦值給對應集合裡。
  3. 把新的集合設定成「 不可修改的 map 」,並賦值給欄位 map。

這就實現了讀寫分離。對於 Producer 最最核心,會出現多執行緒併發訪問的就是快取池。因此這塊的高併發設計相當重要。

四、總結

這裡,我們一起來總結一下這篇文章的重點。

1、帶你先整體的梳理了 Kafka 客戶端訊息批量傳送的好處。

2、通過一個真實生活場景類比來帶你理解 RecordAccumulator 內部構造,並且深度剖析了訊息是如何在客戶端快取的,以及內部各元件實現原理。

3、帶你深度剖析了 Kafka 客戶端非常重要的 BufferPool 、CopyOnWriteMap 的實現原理。