一文詳解Kafka API

語言: CN / TW / HK
摘要:Kafka的API有Producer API,Consumer API還有自定義Interceptor (自定義攔截器),以及處理的流使用的Streams API和構建連接器的Kafka Connect API。

本文分享自華為雲社區《【Kafka筆記】Kafka API詳細解析 Java版本(Producer API,Consumer API,攔截器等)》,作者: Copy工程師。

簡介

Kafka的API有Producer API,Consumer API還有自定義Interceptor (自定義攔截器),以及處理的流使用的Streams API和構建連接器的Kafka Connect API。

Producer API

Kafka的Producer發送消息採用的是異步發送的方式。在消息發送過程中,涉及兩個線程:main線程和Sender線程,以及一個線程共享變量RecordAccumulator。main線程將消息發送給RecordAccmulator,Sender線程不斷地從RecordAccumulator中拉取消息發送給Kafka broker。

這裏的ACk機制,不是生產者得到ACK返回信息才開始發送,ACK保證的是生產者不丟失數據。例如:

而是隻要有消息數據,就向broker發送。

消息發送流程

生產者使用send方法,經過攔截器之後在經過序列化器,然後在走分區器。然後通過分批次把數據發送到PecordAccumulator,main線程到此過程就結束了,然後在回去執行send。

Sender線程不斷的獲取RecordAccumulator的數據發送到topic。

消息發送流程是異步發送的,並且順序是一定的攔截器-》序列化器-》分區器

異步發送API

需要用到的類:

KafkaProducer: 需要創建一個生產者對象,用來發送數據
ProducerConfig:獲取所需要的一系列配置參數
ProducerRecord:每條數據都要封裝成一個ProducerRecord對象

實例:

public class KafkaProducerDemo {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "XXXXXXXXX:9093");//kafka集羣,broker-list
        props.put("acks", "all");
        props.put("retries", 1);//重試次數
        props.put("batch.size", 16384);//批次大小
        props.put("linger.ms", 1);//等待時間
        props.put("buffer.memory", 33554432);//RecordAccumulator緩衝區大小
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 創建KafkaProducer客户端
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 10 ; i++) {
            producer.send(new ProducerRecord<>("my-topic","ImKey-"+i,"ImValue-"+i));
        }
        // 關閉資源
        producer.close();

    }
}

配置參數説明:

send():方法是異步的,添加消息到緩衝區等待發送,並立即返回。生產者將單個的消息批量在一起發送來提高效率。

ack:是判斷請求是否完整的條件(就會判斷是不是成功發送了,也就是上次説的ACK機制),指定all將會阻塞消息,性能低但是最可靠。

retries:如果請求失敗,生產者會自動重試,我們指定是1次,但是啟動重試就有可能出現重複數據。

batch.size:指定緩存的大小,生產者緩存每個分區未發送的消息。值越大的話將會產生更大的批量,並需要更大的內存(因為每個活躍的分區都有一個緩存區)。

linger.ms:指示生產者發送請求之前等待一段時間,設置等待時間是希望更多地消息填補到未滿的批中。默認緩衝可以立即發送,即便緩衝空間還沒有滿,但是如果想減少請求的數量可以設置linger.ms大於0。需要注意的是在高負載下,相近的時間一般也會組成批,即使等於0。

buffer.memory:控制生產者可用的緩存總量,如果消息發送速度比其傳輸到服務器的快,將會耗盡這個緩存空間。當緩存空間耗盡,其他發送調用將被阻塞,阻塞時間的閾值通過max.block.ms設定,之後將會拋出一個TimeoutException

key.serializer和value.serializer將用户提供的key和value對象ProducerRecord轉換成字節,你可以使用附帶的ByteArraySerializaerStringSerializer處理簡單的string或byte類型。

運行日誌:

[Godway] INFO  2019-11-14 14:46 - org.apache.kafka.clients.producer.ProducerConfig[main] - ProducerConfig values: 
	acks = all
	batch.size = 16384
	bootstrap.servers = [XXXXXX:9093]
	buffer.memory = 33554432
	client.id = 
	compression.type = none
	connections.max.idle.ms = 540000
	enable.idempotence = false
	interceptor.classes = null
	key.serializer = class org.apache.kafka.common.serialization.StringSerializer
	linger.ms = 1
	max.block.ms = 60000
	max.in.flight.requests.per.connection = 5
	max.request.size = 1048576
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
	receive.buffer.bytes = 32768
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retries = 1
	retry.backoff.ms = 100
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	send.buffer.bytes = 131072
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	transaction.timeout.ms = 60000
	transactional.id = null
	value.serializer = class org.apache.kafka.common.serialization.StringSerializer

[Godway] INFO  2019-11-14 14:46 - org.apache.kafka.common.utils.AppInfoParser[main] - Kafka version : 0.11.0.3
[Godway] INFO  2019-11-14 14:46 - org.apache.kafka.common.utils.AppInfoParser[main] - Kafka commitId : 26ddb9e3197be39a
[Godway] WARN  2019-11-14 14:46 - org.apache.kafka.clients.NetworkClient[kafka-producer-network-thread | producer-1] - Error while fetching metadata with correlation id 1 : {my-topic=LEADER_NOT_AVAILABLE}
[Godway] INFO  2019-11-14 14:46 - org.apache.kafka.clients.producer.KafkaProducer[main] - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.

Process finished with exit code 0

有一條警告{my-topic=LEADER_NOT_AVAILABLE} 提示該topic不存在,但是沒有關係kafka會自動給你創建一個topic,不過創建的topic是有一個分區和一個副本:

查看一下該topic的消息:

消息已經在topic裏了

上面的實例是沒有回調函數的,send方法是有回調函數的:

public class KafkaProducerCallbackDemo {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "XXXXX:9093");//kafka集羣,broker-list
        props.put("acks", "all");
        props.put("retries", 1);//重試次數
        props.put("batch.size", 16384);//批次大小
        props.put("linger.ms", 1);//等待時間
        props.put("buffer.memory", 33554432);//RecordAccumulator緩衝區大小
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 創建KafkaProducer客户端
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 10; i < 20 ; i++) {
            producer.send(new ProducerRecord<String, String>("my-topic", "ImKey-" + i, "ImValue-" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e == null){
                        System.out.println("消息發送成功!"+recordMetadata.offset());
                    }else {
                        System.err.println("消息發送失敗!");
                    }
                }
            });
        }
        producer.close();
    }
}

回調函數會在producer收到ack時調用,為異步調用,該方法有兩個參數,分別是RecordMetadata和Exception,如果Exception為null,説明消息發送成功,如果Exception不為null説明消息發送失敗。

注意: 消息發送失敗會自動重試,不需要我們在回調函數中手動重試,使用回調也是無阻塞的。而且callback一般在生產者的IO線程中執行,所以是非常快的,否則將延遲其他的線程消息發送。如果需要執行阻塞或者計算的回調(耗時比較長),建議在callbanck主體中使用自己的Executor來並行處理!

同步發送API

同步發送的意思就是,一條消息發送之後,會阻塞當前的線程,直到返回ack(此ack和異步的ack機制不是一個ack)。

此ack是Future阻塞main線程,當發送完成就返回一個ack去通知main線程已經發送完畢,繼續往下走了

public Future<RecordMetadata> send(ProducerRecord<K,V> record,Callback callback)

send是異步的,並且一旦消息被保存在等待發送的消息緩存中,此方法就立即返回。這樣並行發送多條消息而不阻塞去等待每一條消息的響應。

發送的結果是一個RecordMetadata,它指定了消息發送的分區,分配的offset和消息的時間戳。如果topic使用的是CreateTime,則使用用户提供的時間戳或發送的時間(如果用户沒有指定指定消息的時間戳)如果topic使用的是LogAppendTime,則追加消息時,時間戳是broker的本地時間。

由於send調用是異步的,它將為分配消息的此消息的RecordMetadata返回一個Future。如果future調用get(),則將阻塞,直到相關請求完成並返回該消息的metadata,或拋出發送異常。

Throws:

InterruptException - 如果線程在阻塞中斷。
SerializationException - 如果key或value不是給定有效配置的serializers。
TimeoutException - 如果獲取元數據或消息分配內存話費的時間超過max.block.ms。
KafkaException - Kafka有關的錯誤(不屬於公共API的異常)。

public class KafkaProducerDemo {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "XXXXX:9093");//kafka集羣,broker-list
        props.put("acks", "all");
        props.put("retries", 1);//重試次數
        props.put("batch.size", 16384);//批次大小
        props.put("linger.ms", 1);//等待時間
        props.put("buffer.memory", 33554432);//RecordAccumulator緩衝區大小
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 創建KafkaProducer客户端
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 20; i < 30 ; i++) {
            RecordMetadata metadata = producer.send(new ProducerRecord<>("my-topic", "ImKey-" + i, "ImValue-" + i)).get();
            System.out.println(metadata.offset());
        }
        producer.close();

    }
}

API生產者自定義分區策略

生產者在向topic發送消息的時候的分區規則:

public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers)
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value)
public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers)
public ProducerRecord(String topic, Integer partition, K key, V value)
public ProducerRecord(String topic, K key, V value)
public ProducerRecord(String topic, V value)

根據send方法的參數的構造方法就可以看出來,

  1. 指定分區就發送到指定分區
  2. 沒有指定分區,有key值,就按照key值的Hash值分配分區
  3. 沒有指定分區,也沒有指定key值,輪詢分區分配(只分配一次,以後都按照第一次的分區順序)

自定義分區器

自定義分區器需要實現org.apache.kafka.clients.producer.Partitioner接口。並且實現三個方法

public class KafkaMyPartitions implements Partitioner {

    @Override
    public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
        return 0;
    }
    @Override
    public void close() {

    }
    @Override
    public void configure(Map<String, ?> map) {

    }
}

自定義分區實例:

KafkaMyPartitions:

public class KafkaMyPartitions implements Partitioner {

    @Override
    public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
        // 這裏寫自己的分區策略
        // 我這裏指定為1
        return 1;
    }
    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> map) {
    }
}

KafkaProducerCallbackDemo:

public class KafkaProducerCallbackDemo {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "XXXXX:9093");//kafka集羣,broker-list
        props.put("acks", "all");
        props.put("retries", 1);//重試次數
        props.put("batch.size", 16384);//批次大小
        props.put("linger.ms", 1);//等待時間
        props.put("buffer.memory", 33554432);//RecordAccumulator緩衝區大小
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 指定自定義分區
        props.put("partitioner.class","com.firehome.newkafka.KafkaMyPartitions");

        // 創建KafkaProducer客户端
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 20; i < 25 ; i++) {
            producer.send(new ProducerRecord<String, String>("th-topic", "ImKey-" + i, "ImValue-" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e == null){
                        System.out.printf("消息發送成功!topic=%s,partition=%s,offset=%d \n",recordMetadata.topic(),recordMetadata.partition(),recordMetadata.offset());
                    }else {
                        System.err.println("消息發送失敗!");
                    }
                }
            });
        }
        producer.close();
    }
}

返回日誌:

消息發送成功!topic=th-topic,partition=1,offset=27 
消息發送成功!topic=th-topic,partition=1,offset=28 
消息發送成功!topic=th-topic,partition=1,offset=29 
消息發送成功!topic=th-topic,partition=1,offset=30 
消息發送成功!topic=th-topic,partition=1,offset=31 

可以看到直接發送到了分區1上了。

多線程發送消息

Producer API是線程安全的,直接就可以使用多線程發送消息,實例:

public class KafkaProducerThread implements Runnable {

    private KafkaProducer<String,String> kafkaProducer;

    public KafkaProducerThread(){

    }
    public KafkaProducerThread(KafkaProducer kafkaProducer){
        this.kafkaProducer = kafkaProducer;
    }
    @Override
    public void run() {
        for (int i = 0; i < 20 ; i++) {
            String key = "ImKey-" + i+"-"+Thread.currentThread().getName();
            String value = "ImValue-" + i+"-"+Thread.currentThread().getName();
            kafkaProducer.send(new ProducerRecord<>("th-topic", key, value));
            System.out.printf("Thread-name = %s, key = %s, value = %s",Thread.currentThread().getName(),key,value);
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "XXXXXXXX:9093");//kafka集羣,broker-list
        props.put("acks", "all");
        props.put("retries", 1);//重試次數
        props.put("batch.size", 16384);//批次大小
        props.put("linger.ms", 1);//等待時間
        props.put("buffer.memory", 33554432);//RecordAccumulator緩衝區大小
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // 創建KafkaProducer客户端
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        KafkaProducerThread producerThread1 = new KafkaProducerThread(producer);
        //KafkaProducerThread producerThread2 = new KafkaProducerThread(producer);
        Thread one = new Thread(producerThread1, "one");
        Thread two = new Thread(producerThread1, "two");
        System.out.println("線程開始");
        one.start();
        two.start();
    }
}

這裏只是一個簡單的實例。

Consumer API

kafka客户端通過TCP長連接從集羣中消費消息,並透明地處理kafka集羣中出現故障服務器,透明地調節適應集羣中變化的數據分區。也和服務器交互,平衡均衡消費者。

偏移量和消費者的位置

kafka為分區中的每條消息保存一個偏移量(offset),這個偏移量是該分區中一條消息的唯一標示符。也表示消費者在分區的位置。例如,一個位置是5的消費者(説明已經消費了0到4的消息),下一個接收消息的偏移量為5的消息。實際上有兩個與消費者相關的“位置”概念:

消費者的位置給出了下一條記錄的偏移量。它比消費者在該分區中看到的最大偏移量要大一個。 它在每次消費者在調用poll(long)中接收消息時自動增長。

“已提交”的位置是已安全保存的最後偏移量,如果進程失敗或重新啟動時,消費者將恢復到這個偏移量。消費者可以選擇定期自動提交偏移量,也可以選擇通過調用commit API來手動的控制(如:commitSync 和 commitAsync)。

這個區別是消費者來控制一條消息什麼時候才被認為是已被消費的,控制權在消費者。

消費者組和主題訂閲

Kafka的消費者組概念,通過進程池瓜分消息並處理消息。這些進程可以在同一台機器運行,也可分佈到多台機器上,以增加可擴展性和容錯性,相同group.id的消費者將視為同一個消費者組。

分組中的每個消費者都通過subscribe API動態的訂閲一個topic列表。kafka將已訂閲topic的消息發送到每個消費者組中。並通過平衡分區在消費者分組中所有成員之間來達到平均。因此每個分區恰好地分配1個消費者(一個消費者組中)。所有如果一個topic有4個分區,並且一個消費者分組有隻有2個消費者。那麼每個消費者將消費2個分區。

消費者組的成員是動態維護的:如果一個消費者故障。分配給它的分區將重新分配給同一個分組中其他的消費者。同樣的,如果一個新的消費者加入到分組,將從現有消費者中移一個給它。這被稱為重新平衡分組。當新分區添加到訂閲的topic時,或者當創建與訂閲的正則表達式匹配的新topic時,也將重新平衡。將通過定時刷新自動發現新的分區,並將其分配給分組的成員。

從概念上講,你可以將消費者分組看作是由多個進程組成的單一邏輯訂閲者。作為一個多訂閲系統,Kafka支持對於給定topic任何數量的消費者組,而不重複。

這是在消息系統中常見的功能的略微概括。所有進程都將是單個消費者分組的一部分(類似傳統消息傳遞系統中的隊列的語義),因此消息傳遞就像隊列一樣,在組中平衡。與傳統的消息系統不同的是,雖然,你可以有多個這樣的組。但每個進程都有自己的消費者組(類似於傳統消息系統中pub-sub的語義),因此每個進程都會訂閲到該主題的所有消息。

此外,當分組重新分配自動發生時,可以通過ConsumerRebalanceListener通知消費者,這允許他們完成必要的應用程序級邏輯,例如狀態清除,手動偏移提交等

它也允許消費者通過使用assign(Collection)手動分配指定分區,如果使用手動指定分配分區,那麼動態分區分配和協調消費者組將失效。

發現消費者故障

訂閲一組topic,當調用poll(long)時,消費者將自動加入到消費者組中。只要持續調用poll,消費者將一直保持可用,並繼續從分配的分區中接收數據。此外,消費者向服務器定時發送心跳。如果消費者崩潰或無法再session.timeout.ms配置的時間內發送心跳,則消費者就被視為死亡,並且其分區將被重新分配。

還有一種可能,消費者可能遇到活鎖的情況,它持續的發送心跳,但是沒有處理。為了預防消費者在這總情況下一直擁有分區,我們使用max.poll.interval.ms活躍監測機制。在此基礎上,如果你調用的poll的頻率大於最大間隔,則客户端將主動地離開組,以便其他消費者接管該分區。發生這種情況時,你會看到offset提交失敗( 調用commitSync()引發的CommitFailedException )。這是一種安全機制,保障只有活動成員能夠提交offset。所以要留在組中,你必須持續調用poll。

消費者提供兩種配置設置來控制poll循環:

  1. max.poll.interval.ms: 增大poll的間隔,可以為消費者提供更多的時間去處理返回的消息(調用poll(long)返回的消息,通常返回的消息都是一批),缺點是此值越大將會延遲組重新平衡。
  2. max.poll.records:此設置限制每次調用poll返回的消息數,這樣可以更容易的預測每次poll間隔要處理的最大值。通過調整此值,可以減少poll間隔,減少重新平衡分組的

對於消息處理時間不可預測地情況,這些選項是不夠的。 處理這種情況的推薦方法是將消息處理移到另一個線程中,讓消費者繼續調用poll。 但是必須注意確保已提交的offset不超過實際位置。另外,你必須禁用自動提交,並只有在線程完成處理後才為記錄手動提交偏移量。 還要注意, 你需要pause暫停分區,不會從poll接收到新消息,讓線程處理完之前返回的消息(如果你的處理能力比拉取消息的慢,那創建新線程將導致機器內存溢出)。

實例:

自動提交偏移量

public static void main(String[] args) {

        Properties props = new Properties();
        props.put("bootstrap.servers","xxxxxxxxxx:9093");
        props.put("group.id","test-6");//消費者組,只要group.id相同,就屬於同一個消費者組
        props.put("enable.auto.commit","true");//自動提交offset
        props.put("auto.commit.interval.ms","1000"); // 自動提交時間間隔
        props.put("max.poll.records","5"); // 拉取的數據條數
        props.put("session.timeout.ms","10000"); // 維持session的時間。超過這個時間沒有心跳 就會剔出消費者組
        props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "earliest"); 
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // 可以寫多個topic
        consumer.subscribe(Arrays.asList("my-topic"));
        while (true){
            ConsumerRecords<String, String> records = consumer.poll(5000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
            System.out.println("處理了一批數據!");
        }
    }

配置説明:

bootstrap.servers: 集羣是通過配置bootstrap.servers指定一個或多個broker。不用指定全部的broker,它將自動發現集羣中的其餘的borker(最好指定多個,萬一有服務器故障)

enable.auto.commit: 自動提交偏移量,如果設置了自動提交偏移量,下面這個設置就必須要用到了。

auto.commit.interval.ms:自動提交時間間隔,和自動提交偏移量配合使用

max.poll.records:控制從 broker拉取的消息條數

poll(long time): 當消費者獲取不到消息時,就會使用這個參數,為了減輕無效的循環請求消息,消費者會每隔long time的時間請求一次消息,單位是毫秒。

session.timeout.ms: broker通過心跳機器自動檢測消費者組中失敗的進程,消費者會自動ping集羣,告訴進羣它還活着。只要消費者能夠做到這一點,它就被認為是活着的,並保留分配給它分區的權利,如果它停止心跳的時間超過session.timeout.ms,那麼就會認為是故障的,它的分區將被分配到別的進程。

auto.offset.reset:這個屬性很重要,一會詳細講解

這裏説明一下auto.commit.interval.ms以及何時提交消費者偏移量,經過測試:

  • 設置props.put("auto.commit.interval.ms","60000");

自動提交時間為一分鐘,也就是你在這一分鐘內拉取任何數量的消息都不會被提交消費的當前偏移量,如果你此時關閉消費者(一分鐘內),下次消費還是從和第一次的消費數據一樣,即使你在一分鐘內消費完所有的消息,只要你在一分鐘內關閉程序,導致提交不了offset,就可以一直重複消費數據。

  • 設置props.put("auto.commit.interval.ms","3000");

但是在消費過程中設置sleep。

public static void main(String[] args) {

        Properties props = new Properties();
        props.put("bootstrap.servers","xxxxxxxxxxxx:9093");
        props.put("group.id","test-6");//消費者組,只要group.id相同,就屬於同一個消費者組
        props.put("enable.auto.commit","true");//自動提交offset
        props.put("auto.commit.interval.ms","100000"); // 自動提交時間間隔
        props.put("max.poll.records","5"); // 拉取的數據條數
        props.put("session.timeout.ms","10000"); // 維持session的時間。超過這個時間沒有心跳 就會剔出消費者組
        props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "earliest"); //
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // 可以寫多個topic
        consumer.subscribe(Arrays.asList("my-topic"));

        while (true){
            ConsumerRecords<String, String> records = consumer.poll(5000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
            try {
                Thread.sleep(5000L);
                System.out.println("等待了5秒了!!!!!!!!!!!!開始等待15秒了");
                Thread.sleep(5000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("處理了一批數據!");
        }
    }

這裏如果你消費了第一批數據,在執行第二次poll的時候,關閉程序也不會提交偏移量,只有在執行第二次poll的時候才會把上一次的最後一個offset提交上去。

auto.offset.reset講解:

auto.offset.reset的值有三種:earliest,latest,none,代表者不同的意思

earliest:
	當各分區下有已經提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費,最常用的值
latest:
	當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產生的該分區下的數據
none:
	topic各分區都存在已提交的offset時,從offset後開始消費,只要有一個分區不存在已提交的offset,則拋出異常

!!注意:當使用了latest,並且分區沒有已提交的offset時,消費新產生的該分區下的數據,其實是把offset的值直接設置到最後一個消息的位置。例如,有個30條數據的demo的topic,各分區無提交offset,使用了latest,再看offset就會發現已經在30的位置了,所以才只能消費新產生的數據!!!!

手動提交偏移量

不需要定時提交偏移量,可以自己控制offset,當消息已經被我們消費過後,再去手動提交他們的偏移量。這個很適合我們的一些處理邏輯。

手動提交offset的方法有兩種:分別是commitSync(同步提交) 和commitAsync(異步提交)。兩者的相同點,都會將本次poll的一批數據最高的偏移量提交;不同點是commitSync會失敗重試,一直到提交成功(如果有不可恢復的原因導致,也會提交失敗),才去拉取新數據。而commitAsync則沒有重試機制(提交了就去拉取新數據,不管這次的提交有沒有成功),故有可能提交失敗。

實例:

 public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers","XXXXXC:9093");
        props.put("group.id","test-11");//消費者組,只要group.id相同,就屬於同一個消費者組
        props.put("enable.auto.commit","false");//自動提交offset
        props.put("auto.commit.interval.ms","1000"); // 自動提交時間間隔
        props.put("max.poll.records","20"); // 拉取的數據條數
        props.put("session.timeout.ms","10000"); // 維持session的時間。超過這個時間沒有心跳 就會剔出消費者組
        props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "earliest");

        KafkaConsumer<String,String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("my-topic"));
        int i= 0;
        while (true){
            ConsumerRecords<String, String> records = consumer.poll(5000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                i++;
            }
            if (i == 20){
                System.out.println("i_num:"+i);
                // 同步提交
                consumer.commitSync();
                // 異步提交
                // consumer.commitAsync();
            }else {
                System.out.println("不足二十個,不提交"+i);
            }
            i=0;
        }
    }

這些都是全部提交偏移量,如果我們想更細緻的控制偏移量提交,可以自定義提交偏移量:

public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers","XXXXXXXXXX:9093");
        props.put("group.id","test-18");//消費者組,只要group.id相同,就屬於同一個消費者組
        props.put("enable.auto.commit","false");//自動提交offset
        props.put("auto.commit.interval.ms","1000000"); // 自動提交時間間隔
        props.put("max.poll.records","5"); // 拉取的數據條數
        props.put("session.timeout.ms","10000"); // 維持session的時間。超過這個時間沒有心跳 就會剔出消費者組
        props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "earliest");

        KafkaConsumer<String,String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("my-topic"));
        while (true){
            ConsumerRecords<String, String> records = consumer.poll(5000);
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                for (ConsumerRecord<String, String> record : partitionRecords) {
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                }
                long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                consumer.commitAsync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)), new OffsetCommitCallback() {
                    @Override
                    public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
                        for (Map.Entry<TopicPartition,OffsetAndMetadata> entry : map.entrySet()){
                            System.out.println("提交的分區:"+entry.getKey().partition()+",提交的偏移量:"+entry.getValue().offset());
                        }
                    }
                });
            }
        }
    }

訂閲指定的分區

通過消費者Kafka會通過分區分配分給消費者一個分區,但是我們也可以指定分區消費消息,要使用指定分區,只需要調用assign(Collection)消費指定的分區即可:

public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers","XXXXXXXXX:9093");
        props.put("group.id","test-19");//消費者組,只要group.id相同,就屬於同一個消費者組
        props.put("enable.auto.commit","false");//自動提交offset
        props.put("auto.commit.interval.ms","1000000"); // 自動提交時間間隔
        props.put("max.poll.records","5"); // 拉取的數據條數
        props.put("session.timeout.ms","10000"); // 維持session的時間。超過這個時間沒有心跳 就會剔出消費者組
        props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "earliest");

        KafkaConsumer<String,String> consumer = new KafkaConsumer<>(props);
        // 你可以指定多個不同topic的分區或者相同topic的分區 我這裏只指定一個分區
        TopicPartition topicPartition = new TopicPartition("my-topic", 0);
        // 調用指定分區用assign,消費topic使用subscribe
        consumer.assign(Arrays.asList(topicPartition));
        while (true){
            ConsumerRecords<String, String> records = consumer.poll(5000);
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                for (ConsumerRecord<String, String> record : partitionRecords) {
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                }
                long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                consumer.commitAsync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)), new OffsetCommitCallback() {
                    @Override
                    public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
                        for (Map.Entry<TopicPartition,OffsetAndMetadata> entry : map.entrySet()){
                            System.out.println("提交的分區:"+entry.getKey().partition()+",提交的偏移量:"+entry.getValue().offset());
                        }
                    }
                });
            }
        }
    }

一旦手動分配分區,你可以在循環中調用poll。消費者分區任然需要提交offset,只是現在分區的設置只能通過調用assign 修改,因為手動分配不會進行分組協調,因此消費者故障或者消費者的數量變動都不會引起分區重新平衡。每一個消費者是獨立工作的(即使和其他的消費者共享GroupId)。為了避免offset提交衝突,通常你需要確認每一個consumer實例的groupId都是唯一的。

注意:

手動分配分區(assgin)和動態分區分配的訂閲topic模式(subcribe)不能混合使用。

 

點擊關注,第一時間瞭解華為雲新鮮技術~