深度剖析 kafka 生產者全流程!

語言: CN / TW / HK

記得點選 " 華仔聊技術 設為星標 :star: 

關注華仔的你會越來越帥

大家好,我是  華仔  

      構造KafkaProducer

    • 生產者元資訊更新器

    • 生產者攔截器

    • 生產者分割槽器

    • Sender執行緒啟動

  • 傳送請求

    • 生產者攔截器

    • 更新元資訊waitOnMetadata

    • KeyValue序列化

    • 計算分割槽號

    • 將訊息快取進RecordAccumulator累加器中

    • Sender傳送訊息

  • 傳送流程總結

    • Kafka Producer 整體架構圖

今天我們來通過原始碼來分析一下,生產者傳送一條訊息的所有流程~~~

生產者客戶端程式碼

public class SzzTestSend {

public static final String bootStrap = "xxxxxx:9090";
public static final String topic = "t_3_1";

public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,bootStrap);
// 序列化協議 下面兩種寫法都可以
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
//過濾器 可配置多個用逗號隔開
properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,"org.apache.kafka.clients.producer.SzzProducerInterceptorsTest");
//構造 KafkaProducer
KafkaProducer producer = new KafkaProducer(properties);
// 傳送訊息, 並設定 回撥(回撥函式也可以不要)
ProducerRecord<String,String> record = new ProducerRecord(topic,"Hello World!");
try {
producer.send(record,new SzzTestCallBack(record.topic(), record.key(), record.value()));
}catch (Exception e){
e.printStackTrace();
}
}

/**
* 傳送成功回撥類
*/

public static class SzzTestCallBack implements Callback{
private static final Logger log = LoggerFactory.getLogger(SzzTestCallBack.class);
private String topic;
private String key;
private String value;

public SzzTestCallBack(String topic, String key, String value) {
this.topic = topic;
this.key = key;
this.value = value;

}
public void onCompletion(RecordMetadata metadata, Exception e) {
if (e != null) {
log.error("Error when sending message to topic {} with key: {}, value: {} with error:",
topic, key,value, e);
}else {
log.info("send message to topic {} with key: {} value:{} success, partiton:{} offset:{}",
topic, key,value,metadata.partition(),metadata.offset());
}
}
}
}

1 構造KafkaProducer

KafkaProducer通過解析 producer.propeties 檔案裡面的屬性來構造自己。例如 :分割槽器、Key和Value序列化器、攔截器、RecordAccumulator訊息累加器 、元資訊更新器、啟動傳送請求的後臺執行緒

        //構造 KafkaProducer
KafkaProducer producer = new KafkaProducer(properties);

生產者元資訊更新器

我們之前有講過. 客戶端都會儲存叢集的元資訊,例如生產者的元資訊是 ProducerMetadata. 消費組的是ConsumerMetadata 。

元資訊都會有自己的自動更新邏輯, 詳細請看Kafka的客戶端發起元資訊更新請求

相關的Producer配置有:

屬性 描述 預設
metadata.max.age.ms 即使我們沒有看到任何分割槽領導層更改以主動發現任何新代理或分割槽,我們也強制重新整理元資料的時間段(以毫秒為單位)。。 300000(5分鐘)
retry.backoff.ms 如果上次更新失敗,發起重試的間隔時間 100

雖然Producer元資訊會自動更新, 但是有可能在生產者傳送訊息的時候,發現某個TopicPartition不存在,這個時候可能就需要立刻發起一個元資訊更新了。

生產者攔截器

生產者攔截器在訊息傳送之前可以做一些準備工作, 比如 按照某個規則過濾某條訊息, 又或者對 訊息體做一些改造, 還可以用來在傳送回撥邏輯之前做一些定製化的需求,例如統計類的工作! 攔截器的執行時機在最前面,在 訊息序列化分割槽計算 之前

相關的Producer配置有:

屬性 描述 預設
interceptor.classes 生產者攔截器配置,填寫全路徑類名,可用逗號隔開配置多個,執行順序就是配置的順序。

生產者分割槽器

用來設定傳送的訊息具體要傳送到哪個分割槽上

相關的Producer配置有:

屬性 描述 預設值
partitioner.class 訊息的分割槽分配策略 org.apache.kafka.clients.producer.internals.DefaultPartitioner

Sender執行緒啟動

Sender是專門負責將訊息傳送到Broker的I/O執行緒。

相關的Producer配置有:

屬性 描述 預設值
max.in.flight.requests.per.connection 客戶端能夠允許的最大未完成請求(在請求中)的請求數量, 如果該值大於1, 並且請求傳送失敗可可能導致訊息重排序的風險(如果重試啟用的話) 5
request.timeout.ms 控制客戶端等待請求響應的最長時間。如果在超時之前沒有收到響應,客戶端將在必要時重新發送請求,或者如果重試次數用盡,則請求失敗 30000(30 秒)
connections.max.idle.ms 在此配置指定的毫秒數後關閉空閒連線。 6540000(9 分鐘)
reconnect.backoff.ms 在嘗試重新連線到給定主機之前等待的基本時間量。這避免了在緊密迴圈中重複連線到主機。此退避適用於客戶端到代理的所有連線嘗試 50
reconnect.backoff.max.ms 重新連線到反覆連線失敗的代理時等待的最長時間(以毫秒為單位)。如果提供,每臺主機的退避將在每次連續連線失敗時呈指數增長,直至達到此最大值。在計算回退增加後,新增 20% 的隨機抖動以避免連線風暴。 1000(1 秒)
retry.backoff.ms 在嘗試重試對給定主題分割槽的失敗請求之前等待的時間量。這避免了在某些故障情況下在緊密迴圈中重複傳送請求。 100
send.buffer.bytes 傳送資料時使用的 TCP 傳送緩衝區 (SO_SNDBUF) 的大小。如果值為 -1,將使用作業系統預設值。 131072(128 千位元組)
receive.buffer.bytes 讀取資料時使用的 TCP 接收緩衝區 (SO_RCVBUF) 的大小。如果值為 -1,將使用作業系統預設值。 32768
acks 生產者要求Leader在決定是否完成請求之前收到的確認數量. 這控制了傳送的記錄的永續性 可配置的引數如下:
1. acks=0 如果為0, 生產者不會等待伺服器的任何確認, 會被立即視為已傳送,這種情況下不能保證伺服器是否真的已經收到了訊息。這個時候 retries 配置不會生效(客戶端都不管服務端的返回了,所以客戶端一般是不知道有故障的)
2. acks=1 Leader會將訊息寫入到它的本地日誌中,但是不會等待所有的Follower完全確認就會返回傳送成功狀態。這種情況下, 當Follower成功同步資料之前Leader掛掉了會造成資料丟失。
3. acks=all Leader將等待所有的ISR中的副本完成同步之後返回成功狀態, 這樣子資料就不會丟失,是最高級別的保證。
1
transactional.id
enable.idempotence 是否啟動冪等。當設定為true時候, 生產者將確保每條訊息被最多寫入一個副本,如果未false,生產者由於Broker失敗等原因重試,可能會寫入到多個副本中。注意:啟動冪等性的要求 max.in.flight.requests.per.connection<=5 retries>0 並且 acks=all .如果設定了不相容的值則會丟擲異常 false
max.request.size 請求的最大大小(以位元組為單位)。此設定將限制生產者在單個請求中傳送的記錄批次的總資料量,以避免傳送大量請求。這實際上也是最大未壓縮記錄批量大小的上限。請注意,伺服器對記錄批量大小有自己的上限(如果啟用壓縮,則在壓縮之後),這可能與此不同。 1048576
retries 生產者重試次數,當 max.in.flight.requests.per.connection=1 的情況發生重試可能會導致順序問題. 2147483647
delivery.timeout.ms 最大交付時間, 呼叫send()方法後不管是成功還是失敗的時間上限。例如重試太多次之後達到次配置時間的時候也會停止重試了。此配置值應該大於等於 request.timeout.mslinger.ms 總和 120000 (2 minutes).  如果這個值你沒有主動設定並且 request.timeout.ms + linger.ms > 120000(預設值) ,那麼它最終的值是 request.timeout.ms + linger.ms

2 傳送請求


producer.send(record,new SzzTestCallBack(record.topic(), record.key(), record.value()));

生產者攔截器

傳送訊息的第一步就是執行攔截器

在這裡插入圖片描述

一般情況下我們可能不需要攔截器, 但是我們需要用攔截器的時候按照下面操作執行:

  1. 在配置檔案中配置屬性 interceptor.classes=攔截器1,攔截器2,攔截器3
  2. 實現介面 org.apache.kafka.clients.producer.ProducerInterceptor<K, V>

這個 interceptor.classes 中的屬性可以配置多個攔截器, 用逗號隔開,並且執行順序就是按照配置的順序執行的。

攔截器的執行時機在最前面,在 訊息序列化分割槽計算 之前

ProducerInterceptor

org.apache.kafka.clients.producer.ProducerInterceptor<K, V> 介面方法講解:


public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);

public void onAcknowledgement(RecordMetadata metadata, Exception exception);

public void close();

onSend(ProducerRecord record)方法 :

當客戶端將記錄傳送到 KafkaProducer 時,在鍵和值被序列化之前呼叫。該方法呼叫 ProducerInterceptor.onSend(ProducerRecord) 方法。從第一個攔截器的 onSend() 返回的 ProducerRecord 傳遞給第二個攔截器 onSend(),在攔截器鏈中依此類推。從 最後一個攔截器返回 的記錄就是從這個方法返回的。此方法 不會丟擲異常 。任何攔截器方法丟擲的異常都會被捕獲並忽略。如果鏈中間的攔截器(通常會修改記錄)丟擲異常,則鏈中的下一個攔截器將使用前一個未丟擲異常的攔截器返回的記錄呼叫。

呼叫地方 ①. 攔截器執行時機在 鍵值序列化 之前 ②. 攔截器丟擲異常會被捕獲,並列印日誌,那麼也意味著這個攔截器所做的修改不會生效 ③.攔截器中修改的訊息體會被傳遞到下一個攔截器

onAcknowledgement(RecordMetadata metadata, Exception exception)方法:

當傳送到伺服器的記錄已被確認時,或者當傳送記錄在傳送到伺服器之前失敗時,將呼叫此方法。此方法通常 在使用者設定的Callback之前呼叫 ,此方法不會丟擲異常。任何攔截器方法丟擲的異常都會被捕獲並忽略。這個方法執行在Producer的I/O執行緒中,所以這個方法中的程式碼邏輯需要越簡單越好。否則,來自其他執行緒的訊息傳送可能會延遲。

引數: metadata – 已傳送記錄的元資料(即分割槽和偏移量)。如果發生錯誤,元資料將只包含有效的主題和分割槽。如果 ProducerRecord 中沒有給出 partition 並且在分配 partition 之前發生錯誤,則 partition 將設定為 RecordMetadata.NO_PARTITION。如果客戶端將空記錄傳遞給KafkaProducer.send(ProducerRecord)則元資料可能為空。 exception – 在處理此記錄期間丟擲的異常。如果 沒有發生錯誤,則為空

close()

主要用於在關閉攔截器時自行一些資源清理工作。

configure(Map<String, ?> configs)

ProducerInterceptor 介面中集成了一 Configurable 介面,介面有個方法

    void configure(Map<String, ?> configs);

也就是說在攔截器中,我們可以拿到所有的配置屬性了; 這個方法在這幾個方法中最早執行

生產者攔截器示例

將傳送的訊息加上字尾注意這裡訊息value的型別是 String ,如果是byte則需要處理一下


@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
System.out.println("生產者攔截器 onSend() run ."+record);
return new ProducerRecord<>(
record.topic(), record.partition(), record.key(), record.value().concat("_字尾")); }

@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
System.out.println("生產者攔截器 onAcknowledgement run ."+metadata.toString() +" exception:"+exception);
}
@Override
public void close() {
System.out.println("生產者攔截器 close() run .");
}

@Override
public void configure(Map<String, ?> configs) {
this.configs = configs;
System.out.println("生產者攔截器 configure run ."+configs);
}

更新元資訊waitOnMetadata

在傳送訊息之前,要先獲取一下將要傳送的TopicPartition的元資訊。這個獲取元資訊的請求也是通過喚醒 Sender 執行緒進行傳送的。

1 . ProducerMetadata 元資訊 Map<String, Long> topics 中儲存 Topic 的有效期時間, metadata.max.idle.ms 控制,預設 300000 2. ProducerMetadata 元資訊 Set<String> newTopics 中儲存所有 Topic 3. 獲取 Topic的 元資料叢集 以及我們等待的時間(以毫秒為單位), 這個獲取元資料不是這裡獲取的,這裡只是判斷當前是否已經獲取到了元資料,如果沒有獲取到,則一直等待,最大等待時間由 max.block.ms 控制,預設60000(1分鐘),關於獲取元資料在最上面已經分析過了, 是Sender執行緒獲取並更新的。如果等待時間超過了 max.block.ms ,很有可能網路異常,那麼會丟擲超時異常。4. 當你傳送訊息的時候指定了 分割槽號 , 但是這個分割槽號是不存在的, 這個時候就會一直髮起 Metadata 請求(流程看最上面), 直到超時( max.block.ms )之後 丟擲異常


org.apache.kafka.common.errors.TimeoutException: Topic t_3_1 not present in metadata after 60000 ms.

相關的Producer配置有:

屬性 描述 預設
max.block.ms 生產者傳送訊息過程中,獲取元資訊的最大超時時間 60000(1分鐘)
metadata.max.idle.ms Topic的最大空閒時間. 如果一個主題在這麼多毫秒內沒有被訪問過,它就會從快取中刪除。並且下一次對其的訪問將強制執行元資料獲取請求。 300000(5分鐘)

KafkaProducer producer = new KafkaProducer(properties); 在構建 KafkaProducer 物件的時候, 有構建 producer I/O thread , 並且啟動了, Runnablesender

最終呼叫 NetworkClient.poll(long timeout, long now) 裡面 maybeUpdate() 方法 這個方法會獲取 前Node中負載最少的節點發起網路請求, 如果所有Node都是滿負載則請求不會被髮起。

如何判斷哪個節點負載最少?

通過每個節點的 InFlightRequests(空中請求數量) 裡面的最小數量判斷,這個表示當前正在發起的請求,但是還沒有收到回覆的請求數量; 儲存形式是一個HashMap, key 是Node的Id, value 是所有當前還在請求中的節點; 當請求完成,請求就會在這個佇列裡面移除; 如果這個佇列一直是滿的,說明當前負載很高或者網路連線有問題。如果所有Node都是滿負載則請求不會被髮起,除非等到佇列數量減少。

    private final Map<String, Deque<NetworkClient.InFlightRequest>> requests = new HashMap<>();

每個Node最大負載數 ?

每個客戶端在發起請求還沒有收到回覆的時候都會被快取到 InFlightRequests(空中請求數量) 裡面,但是這個數量是有限制的,這個可以通過配置 max.in.flight.requests.per.connection 進行設定, 預設是: 5; 也就是每個客戶端對每個Node最多也就同時發起 5 個未完成的請求; 如果超時這個數量就會等待有請求完成並釋放額度了才可以發起新的請求;

相關的Producer配置有:

屬性 描述 預設
max.in.flight.requests.per.connection 每個客戶端對每個Node發起請求的最大併發數 5

KeyValue序列化

將key和Value先序列化。

自定義序列化器,需要實現 org.apache.kafka.common.serialization.Serializer 介面。我們簡單看下 StringSerializer 序列化器

public class StringSerializer implements Serializer<String> {
private String encoding = "UTF8";

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding";
Object encodingValue = configs.get(propertyName);
if (encodingValue == null)
encodingValue = configs.get("serializer.encoding");
if (encodingValue instanceof String)
encoding = (String) encodingValue;
}

@Override
public byte[] serialize(String topic, String data) {
try {
if (data == null)
return null;
else
return data.getBytes(encoding);
} catch (UnsupportedEncodingException e) {
throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + encoding);
}
}
}

configure(Map<String, ?> configs, boolean isKey)這個方法是在構造 KafkaProduce 例項的時候呼叫的。 isKey 表示是 key還是value來進行序列化 這裡 serialize(String topic, String data) 方法直接將字串轉換成byte[]型別。

Kafka客戶端提供了很多種序列化器供我們選擇,如果這些序列化器你都不滿意,你也可以選擇其他一些開源的序列化工具,或者自己進行實現。

計算分割槽號

將序列化後的key、 value 呼叫合適的分割槽器選擇將要傳送的分割槽號。

分割槽三種策略

將訊息快取進RecordAccumulator累加器中

圖解Kafka Producer中的訊息快取模型

Sender傳送訊息

Sender執行緒在構造KafkaProducer的時候就已經啟動了,它的職責就是從

以下忽略部分程式碼省略


void runOnce() {
long currentTimeMs = time.milliseconds();
long pollTimeout = sendProducerData(currentTimeMs);
client.poll(pollTimeout, currentTimeMs);
}

private long sendProducerData(long now) {

// 獲取哪些資料準備好了傳送
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

}

尋找準備好傳送的訊息Batch,獲取對應Leader所在的ReadyNode

我們都知道生產者生產的訊息是暫時快取在訊息累加器RecordAccumulator中的, Sender負責從RecordAccumulator裡面獲取準備好的資料進行傳送

那麼 ,哪些屬於準備好的資料呢?

我們先回顧一下 RecordAccumulator的結構。

在這裡插入圖片描述

每個TopicPartition的訊息都會被暫存在ProducerBatch Deque 阻塞佇列中的其中一個ProducerBatch中,每個ProducerBatch都存放著一條或者多條訊息。

具體請看 圖解Kafka Producer 訊息快取模型

滿足傳送的條件的Batch

遍歷每個TopicPartition裡面的Deque, 獲取佇列中的第一個 ProducerBatch 如果該TopicPartition不存在Leader,則忽略該Batch ,如果有則進入判斷流程

因為訊息是要發Leader所在的Broker傳送的, 所以必須要有Leader。

在滿足條件

不屬於重試或者屬於重試並且等待的時候大於 retry.backoff.ms 的前提下,滿足下面條件的均可傳送

(該條件就是要排除那些是屬於重試,但是還沒有到達重試間隔時間的情況。)

該ProducerBatch還沒有被髮送過. 該Batch能否傳送判斷條件如下

  1. 如果該Batch滿了或者Batch所在的Deque數量>1(數量大於1說明第一個Batch肯定就滿了) 則滿足傳送條件

  2. 如果訊息累加器中記憶體用完了,有執行緒阻塞等待寫入訊息累加器 則也滿足傳送條件

  3. RecordAccumulator訊息累加器被關閉,滿足條件;(一般KafkaProducer被正常關閉的時候會先將累加器標記為已經關閉,方便讓累加器裡面的訊息都發出去)

  4. 是否被強制將訊息傳送出去。訊息累加器RecordAccumulator提供強制 flush() 方法供呼叫,用於 該時刻的訊息 都滿足傳送的條件,一般在訊息事務的地方有呼叫。這裡要注意的是,是呼叫 flush() 這一時刻的所有未傳送的Batch都需滿足傳送條件,後面新增的Batch不屬於這一範疇

  5. 該Batch的建立時間> linger.ms 的時間

獲取可傳送請求的服務端ReadyNodes

上面是講哪些Batch屬於可傳送的邏輯判斷,但是實際上,真正傳送的時候並不是以每個Batch維度來判斷髮送的,而是以Node維度來發送的,上面我們知道了哪些Batch能夠傳送,然後我們就可以推斷出Batch對應的TopicPartition所屬的Broker。有了這些可傳送的Broker,然後再來遍歷Broker上的每個TopicPartition中的First Batch

文字不好理解,我們看看下圖
在這裡插入圖片描述

上圖是生產者的 RecordAccumulator 訊息累加器, 訊息累加成上圖所示。

每個TopicPartition佇列都有很多Batch, 我們知道了TopicPartition 是不是就能夠確定它所在的Broker?

例如上圖中

  1. Topic1Partition-1、 Topic1Partition-2 、Topic2Partition-0他們三個的Leader都存在於 Broker-0 中 雖然 Topic2Partition-0 佇列中不滿足傳送邏輯, 但是跟他同一個Broker中有其他的佇列滿足條件了,所以它最終也是滿足傳送條件的。

  2. Topic2Partition-1Leader在Broker-1中,但是它不滿足傳送條件,這個Broker中也沒有其他的滿足條件了,所以客戶端不會向Broker-1這個Node發起請求。

  3. Topic1Partition-3Leader在Broker-2中,它滿足傳送條件,那麼Broker-2就滿足傳送條件

那麼最終得到的 ReadyNodes 就是Broker-0、Broker-2

強制更新沒有Leader的Topic元資訊

上面我們在獲取 哪些Batch準備好傳送的時候,也會找到哪些TopicPartition沒有Leader。

那麼這個時候就需要強制的去更新一下這些TopicPartition的元資訊了,否則就傳送不了。

過濾一些還未準備好連線的ReadyNodes

上面我們已經獲取了 ReadyNodes

那麼在真正的向對應的 ReadyNodes 發起請求之前, 我們還是需要判斷一下 我們的生產者客戶端是否準備好了跟 ReadyNodes 發起請求.

那麼客戶端的準備條件有哪些呢?

生產者客戶端在最開始的時候都沒有跟任何Node建立連線的, 當我們嘗試傳送之前會去檢驗一下連線是否建立成功(就是當前這一步), 如果沒有的話,則會去嘗試建立連線。並且當前這次是會把這個Node過濾掉的,因為還沒有建立成功連結,等到下一次迴圈的時候,可能已經建立成功了。

當然客戶端是否準備好,不僅僅是判斷 連線是否建立成功。

還需要判斷 當前未完成的請求佇列數量是否 < max.in.flight.requests.per.connection

遍歷ReadNodes上的所有TopicPartition阻塞佇列中的FirstBatch進行打包

到現在為止,我們已經得到了可以傳送請求的 ReadyNodes 了。那麼接下來就是分別解析這些ReadyNode 他們能夠傳送的Batch打包傳送了。

這一步最重要的作用是將 ProducerBatch 跟Node對映,也就是知道當前批次想哪個Broker傳送哪些Batch


public Map<Integer, List<ProducerBatch>> drain(Cluster cluster, Set<Node> nodes, int maxSize, long now) {
if (nodes.isEmpty())
return Collections.emptyMap();
// 遍歷ReadyNodes 每個Node下的佇列都獲取一遍
Map<Integer, List<ProducerBatch>> batches = new HashMap<>();
for (Node node : nodes) {
List<ProducerBatch> ready = drainBatchesForOneNode(cluster, node, maxSize, now);
batches.put(node.id(), ready);
}
return batches;
}

那麼應該選擇哪些Batch來發送呢?

遍歷每個ReadyNode節點下面的每個TopicPartition 佇列的首個Batch

在這裡插入圖片描述
  1. 如果FirstBatch 屬於重試, 並且還沒有達到重試間隔時間 retry.backoff.ms , 則該TopicPartition佇列會忽略     例如上圖 Topic3Partition-1)
  2. 如果FirstBatch 為空, 則該TopicPartition佇列會忽略;如左邊 Topic3Partition-0

  3. 如果該批次中的總Batch大小 > max.request.size 了. 則會終止此次遍歷,並記錄當前遍歷到的位置, 等下次再次傳送的時候從上一次結束的位置進行遍歷 (但是這裡kafka用了一個全域性變數記錄當前遍歷到的索引,不是每個Broker一個變數, 是一個小Bug)

  4. 一次Request最多隻會完整的遍歷一遍, 就算遍歷完一遍所有TopicPartition之後還沒有寫滿 max.request.size . 那麼也不會再重新遍歷。

構造Produce請求併發起接著處理Response

上面我們已經得到了


Map<Integer, List<ProducerBatch>> batches

也就是Node.id 和對應要發往該Node的Request請求攜帶的ProducerBatch列表。

傳送成功之後,會返回Response,根據Response情況處理不同的邏輯

Response處理邏輯 每個Batch都會對應著一個 PartitionResponse , 不同的PartitionResponse對應的不同處理邏輯。

  1. 如果Response返回RecordTooLargeException異常,並且Batch裡面的訊息數量>1.這種情況, 就會嘗試的去拆分Batch, 如何拆分呢?是以 batch.size 大小來拆分成多個Batch。並且重新放入到訊息累加器中。
  2. 如果返回是其他異常則先判斷一下是否能夠重試,如果能夠重試,則重新入隊到訊息累加器中。重新入隊的Batch會記錄重試次數和時間等等資訊。是否能夠重試判斷邏輯:batch沒有超過 delivery.timeout.ms && 重啟次數< retiries
  3. 如果是DuplicateSequenceException異常的話,那麼並不會做其他的處理,而是當做正常完成。

  4. 其他異常或者沒有異常則會走正常流程, 並且呼叫 InterceptorCallback ,如果有Exception也會返回。這個InterceptorCallback裡面包含在攔截器 interceptorsuserCallback (使用者自己的回撥)。呼叫順序如下圖: 這個usercallback呢就是我們自己設定的,例如:

    producer.send(record,new SzzTestCallBack(record.topic(), record.key(), record.value()));

    注意: 這裡的回撥並不是指的一個Batch一個回撥,這裡是一個Batch裡面有多少條訊息,就有多少個回撥。每個ProducerBatch裡面都有一個物件專門儲存所有訊息的回撥資訊 thunks . 在處理ProducerBatch返回資訊的時候會遍歷這個trunks, 來執行每個訊息的回撥。

假如你想確定某個訊息是否傳送成功, 那麼你可以自己定義一個攔截器。並重寫介面 onAcknowledgement(RecordMetadata metadata, Exception exception) 在這裡面來判斷你的訊息是否傳送成功。

counter(counterh2) 傳送流程總結

Kafka Producer 整體架構圖

在這裡插入圖片描述

整個生產者客戶端是由 主執行緒Sender執行緒 協調執行的, 主執行緒建立訊息, 然後通過 攔截器元資訊更新序列化分割槽器快取訊息 等等流程。

然後Sender執行緒在初始化的時候就已經運行了,並且是一個while迴圈。

Sender執行緒裡面主要工作是:

  1. 尋找ReadyNodes:去訊息累加器裡面獲取有哪些Node是能夠傳送Request的。只要該Node有一個TopicPartition佇列中有符合傳送條件的Batch。那麼這個Node就應該是ReadyNode。具體的篩選邏輯請看上文有具體分析。

  2. 構建Request: 過濾之後, 拿到了所有的ReadyNodes。接下來就是遍歷該Node下所有的TopicPartition佇列裡面的 FirstBatch , 組裝到Request請求裡面。發往一個Node的請求Request,可以包含多個 ProducerBatch ,能夠一次傳送多少個Batch是由配置 max.request.size 決定的,一個Node對應一個Request。注意: 這個時候對映關係已經是 Map<Node,Reqeust>
  3. 將Request放入inFightRequest中: 上面是組裝好了Request, 組裝好了之後要先把這個Request放到 inFightRequest 物件中, 它儲存著每個Node當前已經發送的Request但是還沒有收到Response的請求。每個Node最多能夠存放多少個未完成的Request,是由 max.in.flight.requests.per.connection 控制的。需要注意的是, 如果佇列已經滿了, Request是放入不了這個物件裡面的,並且會丟擲異常:
    "Attempt to send a request to node " + nodeId + " which is not ready."
    它決定著生產者針對某個Node的併發度。
  4. Request通過Selector發起通訊.

  5. 返回Response:服務端處理完成, 返回Response資訊。

  6. 從inFightRequest中移除完成Request

  7. 釋放記憶體回訊息累加器:請求結束,清理 訊息累加器 ,將傳送成功的ProducerBatch佔用的記憶體大小加回到 訊息累加器 中。注意:這裡純粹的是數字的加減,不涉及記憶體的處理, 因為傳送成功之前的Batch佔用了 訊息累加器 的剩餘可用記憶體。傳送成之後要加回來。否則訊息累加器滿了會導致阻塞。

各位小夥伴有什麼看法,歡迎評論區留言探討~