Kafka訊息的壓縮機制

語言: CN / TW / HK

最近在做 AWS cost saving 的事情,對於 Kafka 訊息叢集,計劃通過壓縮訊息來減少訊息儲存所佔空間,從而達到減少 cost 的目的。本文將結合原始碼從 Kafka 支援的訊息壓縮型別、何時需要壓縮、如何開啟壓縮、何處進行壓縮以及壓縮原理來總結 Kafka 整個壓縮機制。文中所涉及原始碼部分都來自於 Kafka 當前最新的 3.3.0-SNAPSHOT 版本。

Kafka支援的訊息壓縮型別

什麼是 Kafka 的訊息壓縮

在談訊息壓縮型別之前,我們先看下 Kafka 中關於訊息壓縮的定義是什麼。

Kafka 官網 有這樣一段解釋:

此為 Kafka 中端到端的塊壓縮功能。如果啟用,資料將由 producer 壓縮,以壓縮格式寫入伺服器,並由 consumer 解壓縮。壓縮將提高 consumer 的吞吐量,但需付出一定的解壓成本。這在跨資料中心映象資料時尤其有用。

也就是說,Kafka 的訊息壓縮是指將訊息本身採用特定的壓縮演算法進行壓縮並存儲,待消費時再解壓縮。

我們知道壓縮就是用時間換空間,其基本理念是基於重複,將重複的片段編碼為字典,字典的 key 為重複片段,value 為更短的程式碼,比如序列號,然後將原始內容中的片段用程式碼表示,達到縮短內容的效果,壓縮後的內容則由字典和程式碼序列兩部分組成。解壓時根據字典和程式碼序列可無損地還原為原始內容。注:有失真壓縮不在此次討論範圍。

通常來講,重複越多,壓縮效果越好。JSON是 Kafka 訊息中常用的序列化格式,單條訊息內可能並沒有多少重複片段,但如果是批量訊息,則會有大量重複的欄位名,批量中訊息越多,則重複越多,這也是為什麼 Kafka 更偏向塊壓縮,而不是單條訊息壓縮。

訊息壓縮型別

目前 Kafka 共支援四種主要的壓縮型別:Gzip、Snappy、Lz4 和 Zstd。關於這幾種壓縮的特性,

壓縮型別 Compression ratio CPU 使用率 Compression speed Network bandwidth usage
Gzip Highest Highest Slowest Lowest
Snappy Medium Moderate Moderate Medium
Lz4 Low Lowest Fastest Highest
Zstd Medium Moderate Moderate Medium

從上表可知,Snappy 在 CPU 使用率、壓縮比、壓縮速度和網路頻寬使用率之間實現良好的平衡,我們最終也是採用的該型別進行壓縮試點。這裡值得一提的是,Zstd 是 Facebook 於 2016 年開源的新壓縮演算法,壓縮率和壓縮效能都不錯,具有與 Snappy(Google 傑作)相似的特性,直到 Kafka 的 2.1.0 版本才引入支援。

針對這幾種壓縮本身的效能,Zstd GitHub 官方 公佈了壓測對比結果如下,

Compressor name Ratio Compression Decompress.
zstd 1.5.1 -1 2.887 530 MB/s 1700 MB/s
zlib 1.2.11 -1 2.743 95 MB/s 400 MB/s
brotli 1.0.9 -0 2.702 395 MB/s 450 MB/s
zstd 1.5.1 –fast=1 2.437 600 MB/s 2150 MB/s
zstd 1.5.1 –fast=3 2.239 670 MB/s 2250 MB/s
quicklz 1.5.0 -1 2.238 540 MB/s 760 MB/s
zstd 1.5.1 –fast=4 2.148 710 MB/s 2300 MB/s
lzo1x 2.10 -1 2.106 660 MB/s 845 MB/s
lz4 1.9.3 2.101 740 MB/s 4500 MB/s
lzf 3.6 -1 2.077 410 MB/s 830 MB/s
snappy 1.1.9 2.073 550 MB/s 1750 MB/s

可以看到 Zstd 可以通過壓縮速度為代價獲得更高的壓縮比,二者之間的權衡可通過 --fast 引數靈活配置。

何時需要壓縮

壓縮是需要額外的 CPU 代價的,並且會帶來一定的訊息分發延遲,因而在壓縮前要慎重考慮是否有必要。筆者認為需考慮以下幾方面:

  • 壓縮帶來的磁碟空間和頻寬節省遠大於額外的 CPU 代價,這樣的壓縮是值得的。
  • 資料量足夠大且具重複性。訊息壓縮是批量的,低頻的資料流可能都無法填滿一個批量,會影響壓縮比。資料重複性越高,往往壓縮效果越好,例如 JSON、XML 等結構化資料;但若資料不具重複性,例如文字都是唯一的 md5 或 UUID 之類,違背了壓縮的重複性前提,壓縮效果可能不會理想。
  • 系統對訊息分發的延遲沒有嚴苛要求,可容忍輕微延遲增大。

如何開啟壓縮

Kafka 通過配置屬性 compression.type 控制是否壓縮。該屬性在 producer 端和 broker 端各自都有一份,也就是說,我們可以選擇在 producer 或 broker 端開啟壓縮,對應的應用場景各有不同。

在 Broker 端開啟壓縮

compression.type 屬性

Broker 端的 compression.type 屬性預設值為 producer ,即直接繼承 producer 端所發來訊息的壓縮方式,無論訊息採用何種壓縮或者不壓縮,broker 都原樣儲存,這一點可以從如下程式碼片段看出:

class UnifiedLog(...) extends Logging with KafkaMetricsGroup {
  ...
  private def analyzeAndValidateRecords(records: MemoryRecords,
                                        origin: AppendOrigin,
                                        ignoreRecordSize: Boolean,
                                        leaderEpoch: Int): LogAppendInfo = {
    records.batches.forEach { batch =>
      ...
      val messageCodec = CompressionCodec.getCompressionCodec(batch.compressionType.id)
      if (messageCodec != NoCompressionCodec)
        sourceCodec = messageCodec
    }
    // Apply broker-side compression if any
    val targetCodec = BrokerCompressionCodec.getTargetCompressionCodec(config.compressionType, sourceCodec);
  }
}

object BrokerCompressionCodec {

  val brokerCompressionCodecs = List(UncompressedCodec, ZStdCompressionCodec, LZ4CompressionCodec, SnappyCompressionCodec, GZIPCompressionCodec, ProducerCompressionCodec)
  val brokerCompressionOptions: List[String] = brokerCompressionCodecs.map(codec => codec.name)

  def isValid(compressionType: String): Boolean = brokerCompressionOptions.contains(compressionType.toLowerCase(Locale.ROOT))

  def getCompressionCodec(compressionType: String): CompressionCodec = {
    compressionType.toLowerCase(Locale.ROOT) match {
      case UncompressedCodec.name => NoCompressionCodec
      case _ => CompressionCodec.getCompressionCodec(compressionType)
    }
  }

  def getTargetCompressionCodec(compressionType: String, producerCompression: CompressionCodec): CompressionCodec = {
    if (ProducerCompressionCodec.name.equals(compressionType))
      producerCompression
    else
      getCompressionCodec(compressionType)
  }
}

sourceCodecrecordBatch 上的編碼,即表示從 producer 端發來的這批訊息的編碼。 targetCodec 為 broker 端用於儲存訊息最終的目標編碼,從函式 getTargetCompressionCodec 可以看出是結合 broker 端的 compressionType 和 producer 端的 producerCompression 綜合判斷的:當 compressionTypeproducer 時直接採用 producer 端的 producerCompression ,否則就採用 broker 端自身的編碼設定 compressionType 。從 brokerCompressionCodecs 的取值可看出, compression.type 的可選值為 [uncompressed, zstd, lz4, snappy, gzip, producer] 。其中 uncompressednone 是等價的, producer 不用多說,其餘四個則是標準的壓縮型別。

broker 和 topic 兩個級別

在 broker 端的壓縮配置分為兩個級別:全域性的 broker 級別 和 區域性的topic 級別。顧名思義,如果配置的是 broker 級別,則對於該 Kafka 叢集中所有的 topic 都是生效的。但如果 topic 級別配置了自己的壓縮型別,則會覆蓋 broker 全域性的配置,以 topic 自己配置的為準。

broker 級別

要配置 broker 級別的壓縮型別,可通過 configs 命令修改 compression.type 配置項取值。此處是否需要重啟 broker 取決於 Kafak 的版本,在 1.1.0 之前,任何配置項的改動都需要重啟 broker 才生效,而從 1.1.0 版本開始,Kafka 引入了動態 broker 引數,將配置項分為三類: read-onlyper-brokercluster-wide ,第一類跟原來一樣需重啟才生效,而後面兩類都是動態生效的,只是影響範圍不同,關於 Kafka 動態引數,以後單開博文介紹。從 官網 可以看到, compression.type 是屬於 cluster-wide 的,如果是 1.1.0 及之後的版本,則無需重啟 broker。

topic 級別

topic 的配置分為兩部分,一部分是 topic 特有的,如 partitions 等,另一部分則是預設採用 broker 配置,但也可以覆蓋。如果要定義 topic 級別的壓縮,可以在 topic 建立時通過 –config 選項覆蓋配置項 compression.type 的取值,命令如下:

sh bin/kafka-topics.sh --create --topic my-topic --replication-factor 1 --partitions 1 --config compression.type=snappy

當然也可以通過 configs 命令修改 topic 的 compression.type 取值,命令如下:

bin/kafka-configs.sh --entity-type topics --entity-name my-topic --alter --add-config compression.type=snappy

在 Producer 端壓縮

compression.type 屬性

跟 broker 端一樣,producer 端的壓縮配置屬性依然是 compression.type ,只不過預設值和可選值有所不同。預設值為 none ,表示不壓縮,可選值為列舉類 CompressionType 中的 name 列表。

開啟壓縮的方式

直接在程式碼層面更改 producer 的 config,示例如下。但需要注意的是,改完 config 之後,需要重啟 producer 端的應用程式,壓縮才會生效。

@Configuration
@EnableKafka
public class KafkaProducerConfig {
    @Bean
    public KafkaTemplate<byte[], byte[]> kafkaTemplate() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerServer);
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
        config.put(ProducerConfig.ACKS_CONFIG, "all");
        config.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
        config.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
        config.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
        config.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "3000");
        config.put(ProducerConfig.LINGER_MS_CONFIG, "1");
        ...
        // 開啟 Snappy 壓縮
        config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, CompressionType.SNAPPY.name);

        return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(config));
    }
}

壓縮和解壓的位置

何處會壓縮

可能產生壓縮的地方有兩處:producer 端和 broker 端。

producer 端

producer 端發生壓縮的唯一條件就是在 producer 端為屬性 compression.type 配置了除 none 之外有效的壓縮型別。此時,producer 在向所負責的所有 topics 發訊息之前,都會將訊息壓縮處理。

broker 端

對於 broker 端,產生壓縮的情況就複雜得多,這不僅取決於 broker 端自身的壓縮編碼 targetCodec 是否是需要壓縮的型別,還取決於 targetCodec 跟 producer 端的 sourceCodec 是否相同,除此之外,還跟訊息格式的 magic 版本有關,關於 magic 版本此處不做展開,之後會開專門的博文討論這個。

直接看程式碼,broker 端的訊息讀寫是由 UnifiedLog 負責的,訊息持久化的核心入口是 append 方法,程式碼如下:

class UnifiedLog(...) extends Logging with KafkaMetricsGroup {
  ...
  private def append(records: MemoryRecords,
                     origin: AppendOrigin,
                     interBrokerProtocolVersion: ApiVersion,
                     validateAndAssignOffsets: Boolean,
                     leaderEpoch: Int,
                     requestLocal: Option[RequestLocal],
                     ignoreRecordSize: Boolean): LogAppendInfo = {
    ...
    val appendInfo = analyzeAndValidateRecords(records, origin, ignoreRecordSize, leaderEpoch)

    // return if we have no valid messages or if this is a duplicate of the last appended entry
    if (appendInfo.shallowCount == 0) appendInfo
    else {

      // trim any invalid bytes or partial messages before appending it to the on-disk log
      var validRecords = trimInvalidBytes(records, appendInfo)

      // they are valid, insert them in the log
      lock synchronized {
        maybeHandleIOException(s"Error while appending records to $topicPartition in dir ${dir.getParent}") {
          localLog.checkIfMemoryMappedBufferClosed()
          if (validateAndAssignOffsets) {
            // assign offsets to the message set
            val offset = new LongRef(localLog.logEndOffset)
            appendInfo.firstOffset = Some(LogOffsetMetadata(offset.value))
            val now = time.milliseconds
            val validateAndOffsetAssignResult = try {
              LogValidator.validateMessagesAndAssignOffsets(validRecords,
                topicPartition,
                offset,
                time,
                now,
                appendInfo.sourceCodec,
                appendInfo.targetCodec,
                config.compact,
                config.recordVersion.value,
                config.messageTimestampType,
                config.messageTimestampDifferenceMaxMs,
                leaderEpoch,
                origin,
                interBrokerProtocolVersion,
                brokerTopicStats,
                requestLocal.getOrElse(throw new IllegalArgumentException(
                  "requestLocal should be defined if assignOffsets is true")))
            } catch {
              case e: IOException =>
                throw new KafkaException(s"Error validating messages while appending to log $name", e)
            }
            ...
          } else {
            // we are taking the offsets we are given
            ...
          }
          ...
          maybeDuplicate match {
            case Some(duplicate) =>
              ...
              localLog.append(appendInfo.lastOffset, appendInfo.maxTimestamp, appendInfo.offsetOfMaxTimestamp, validRecords)
              updateHighWatermarkWithLogEndOffset()
              ...
              trace(s"Appended message set with last offset: ${appendInfo.lastOffset}, " +
                s"first offset: ${appendInfo.firstOffset}, " +
                s"next offset: ${localLog.logEndOffset}, " +
                s"and messages: $validRecords")

              if (localLog.unflushedMessages >= config.flushInterval) flush(false)
          }
          appendInfo
        }
      }
    }
  }
}

可以看到,先是採用 analyzeAndValidateRecordsrecordBatch 的維度對批量訊息整體做校驗,比如 CRC、size等,不會細化到單條訊息,所以這裡不會涉及解壓。這一步通過之後,會採用 LogValidator.validateMessagesAndAssignOffsetsrecordBatch 以及單條訊息做進一步驗證併為 recordBatch 分配 offset該過程可能涉及解壓 。完成這一步之後,呼叫 localLog.append 方法將訊息追加到本地日誌,這一步才是真正的落盤。我們繼續關注可能發生解壓的 LogValidator 部分,程式碼如下:

private[log] object LogValidator extends Logging {
  private[log] def validateMessagesAndAssignOffsets(records: MemoryRecords,
                                                    topicPartition: TopicPartition,
                                                    offsetCounter: LongRef,
                                                    time: Time,
                                                    now: Long,
                                                    sourceCodec: CompressionCodec,
                                                    targetCodec: CompressionCodec,
                                                    compactedTopic: Boolean,
                                                    magic: Byte,
                                                    timestampType: TimestampType,
                                                    timestampDiffMaxMs: Long,
                                                    partitionLeaderEpoch: Int,
                                                    origin: AppendOrigin,
                                                    interBrokerProtocolVersion: ApiVersion,
                                                    brokerTopicStats: BrokerTopicStats,
                                                    requestLocal: RequestLocal): ValidationAndOffsetAssignResult = {
    if (sourceCodec == NoCompressionCodec && targetCodec == NoCompressionCodec) {
      // check the magic value
      if (!records.hasMatchingMagic(magic))
        convertAndAssignOffsetsNonCompressed(records, topicPartition, offsetCounter, compactedTopic, time, now, timestampType,
          timestampDiffMaxMs, magic, partitionLeaderEpoch, origin, brokerTopicStats)
      else
        // Do in-place validation, offset assignment and maybe set timestamp
        assignOffsetsNonCompressed(records, topicPartition, offsetCounter, now, compactedTopic, timestampType, timestampDiffMaxMs,
          partitionLeaderEpoch, origin, magic, brokerTopicStats)
    } else {
      validateMessagesAndAssignOffsetsCompressed(records, topicPartition, offsetCounter, time, now, sourceCodec,
        targetCodec, compactedTopic, magic, timestampType, timestampDiffMaxMs, partitionLeaderEpoch, origin,
        interBrokerProtocolVersion, brokerTopicStats, requestLocal)
    }
  }
  ...
}

從上可知,當 broker 端配置的壓縮編碼 targetCodec 與所收到的批量訊息的壓縮編碼 sourceCodec 都為 none 即不壓縮時,會再檢查訊息的格式版本,如果與 broker 端配置的訊息版本不同,則需要先將原批量訊息轉換為目標版本 magic 對應格式的新批量訊息,然後再在新批量訊息中分配 offset ;否則直接在原批量訊息中就地分配 offset ,此過程均不涉及解壓縮。這裡稍微解釋下分配 offset 的邏輯,我們知道在 Kafka 中 offsetpartition 下每條訊息的唯一標識,consumer 端也是根據 offset 來追蹤消費進度,而 offset 的生成和寫入則是在 broker 端,就是此處提到的 offset 分配。理論上說,broker 需要為每條訊息都分配一個 offset 的,但在實踐中,因為用的是 recordBatch ,內部訊息是順序排列的且總記錄數是知道的,而 recordBatch 本身會記錄 baseOffset ,故通常只需設定 lastOffset 即可。唯一的例外是,當因訊息格式轉換或解壓縮而需要建立新的 recordBatch 時,會呼叫 memoryRecordsBuilderappendWithOffset 方法為每一條訊息記錄分配 offset

targetCodecsourceCodec 至少有一個不為 none 即需要壓縮時,情況就複雜一些,具體邏輯都在 validateMessagesAndAssignOffsetsCompressed 方法中,

private[log] object LogValidator extends Logging {
  ...
  def validateMessagesAndAssignOffsetsCompressed(...): ValidationAndOffsetAssignResult = {
    ...
    // No in place assignment situation 1
    var inPlaceAssignment = sourceCodec == targetCodec

    var maxTimestamp = RecordBatch.NO_TIMESTAMP
    val expectedInnerOffset = new LongRef(0)
    val validatedRecords = new mutable.ArrayBuffer[Record]

    var uncompressedSizeInBytes = 0

    // Assume there's only one batch with compressed memory records; otherwise, return InvalidRecordException
    // One exception though is that with format smaller than v2, if sourceCodec is noCompression, then each batch is actually
    // a single record so we'd need to special handle it by creating a single wrapper batch that includes all the records
    val firstBatch = getFirstBatchAndMaybeValidateNoMoreBatches(records, sourceCodec)

    // No in place assignment situation 2 and 3: we only need to check for the first batch because:
    //  1. For most cases (compressed records, v2, for example), there's only one batch anyways.
    //  2. For cases that there may be multiple batches, all batches' magic should be the same.
    if (firstBatch.magic != toMagic || toMagic == RecordBatch.MAGIC_VALUE_V0)
      inPlaceAssignment = false

    // Do not compress control records unless they are written compressed
    if (sourceCodec == NoCompressionCodec && firstBatch.isControlBatch)
      inPlaceAssignment = true

    records.batches.forEach { batch =>
      validateBatch(topicPartition, firstBatch, batch, origin, toMagic, brokerTopicStats)
      uncompressedSizeInBytes += AbstractRecords.recordBatchHeaderSizeInBytes(toMagic, batch.compressionType())

      // if we are on version 2 and beyond, and we know we are going for in place assignment,
      // then we can optimize the iterator to skip key / value / headers since they would not be used at all
      val recordsIterator = if (inPlaceAssignment && firstBatch.magic >= RecordBatch.MAGIC_VALUE_V2)
        batch.skipKeyValueIterator(requestLocal.bufferSupplier)
      else
        batch.streamingIterator(requestLocal.bufferSupplier)

      try {
        val recordErrors = new ArrayBuffer[ApiRecordError](0)
        // this is a hot path and we want to avoid any unnecessary allocations.
        var batchIndex = 0
        recordsIterator.forEachRemaining { record =>
          val expectedOffset = expectedInnerOffset.getAndIncrement()
          val recordError = validateRecordCompression(batchIndex, record).orElse {
            validateRecord(batch, topicPartition, record, batchIndex, now,
              timestampType, timestampDiffMaxMs, compactedTopic, brokerTopicStats).orElse {
              if (batch.magic > RecordBatch.MAGIC_VALUE_V0 && toMagic > RecordBatch.MAGIC_VALUE_V0) {
                if (record.timestamp > maxTimestamp)
                  maxTimestamp = record.timestamp

                // Some older clients do not implement the V1 internal offsets correctly.
                // Historically the broker handled this by rewriting the batches rather
                // than rejecting the request. We must continue this handling here to avoid
                // breaking these clients.
                if (record.offset != expectedOffset)
                  inPlaceAssignment = false
              }
              None
            }
          }

          recordError match {
            case Some(e) => recordErrors += e
            case None =>
              uncompressedSizeInBytes += record.sizeInBytes()
              validatedRecords += record
          }
         batchIndex += 1
        }
        processRecordErrors(recordErrors)
      } finally {
        recordsIterator.close()
      }
    }

    if (!inPlaceAssignment) {
      val (producerId, producerEpoch, sequence, isTransactional) = {
        // note that we only reassign offsets for requests coming straight from a producer. For records with magic V2,
        // there should be exactly one RecordBatch per request, so the following is all we need to do. For Records
        // with older magic versions, there will never be a producer id, etc.
        val first = records.batches.asScala.head
        (first.producerId, first.producerEpoch, first.baseSequence, first.isTransactional)
      }
      buildRecordsAndAssignOffsets(toMagic, offsetCounter, time, timestampType, CompressionType.forId(targetCodec.codec),
        now, validatedRecords, producerId, producerEpoch, sequence, isTransactional, partitionLeaderEpoch,
        uncompressedSizeInBytes)
    } else {
      // we can update the batch only and write the compressed payload as is;
      // again we assume only one record batch within the compressed set
      val batch = records.batches.iterator.next()
      val lastOffset = offsetCounter.addAndGet(validatedRecords.size) - 1

      batch.setLastOffset(lastOffset)

      if (timestampType == TimestampType.LOG_APPEND_TIME)
        maxTimestamp = now

      if (toMagic >= RecordBatch.MAGIC_VALUE_V1)
        batch.setMaxTimestamp(timestampType, maxTimestamp)

      if (toMagic >= RecordBatch.MAGIC_VALUE_V2)
        batch.setPartitionLeaderEpoch(partitionLeaderEpoch)

      val recordConversionStats = new RecordConversionStats(uncompressedSizeInBytes, 0, 0)
      ValidationAndOffsetAssignResult(validatedRecords = records,
        maxTimestamp = maxTimestamp,
        shallowOffsetOfMaxTimestamp = lastOffset,
        messageSizeMaybeChanged = false,
        recordConversionStats = recordConversionStats)
    }
  }
  ...
}

可以看到, inPlaceAssignment 是用於標識是否可以原地修改 recordBatch 來分配 offset ,有三種情況不能原地修改:

  • sourceCodec 和 targetCodec 不同,這個比較好理解,編碼不同,構建目標 payload 時原 recordBatch 自然不能複用。
  • 目標訊息格式版本 magic 與 broker 接收到的 recordBatchmagic 不同,此時需要訊息格式轉換,需要構建新的 recordBatch ,這個跟第一種情況是一樣的,無法複用原 recordBatch
  • 目標訊息格式版本為 V0 ,因為老版本 V0 格式的訊息,需要為每條訊息重新分配絕對 offset ,無法複用原 recordBatch

此時, inPlaceAssignment 為 false,直接走 buildRecordsAndAssignOffsets 邏輯來構建新的 recordBatch ,此時是否壓縮取決於 targetCodec ,如果不為 none ,則此處會按照 targetCodec 編碼進行壓縮。

除了上述三種情況之外,都是可以原地修改,此時可以直接複用原 recordBatch 來構建目標訊息的 payload,此時不存在壓縮處理。

何處會解壓

可能發生解壓的地方依然是兩處:consumer 端和 broker 端。

consumer 端

consumer 端發生解壓的唯一條件就是從 broker 端拉取到的訊息是帶壓縮的。此時,consumer 會根據 recordBatchcompressionType 來對訊息進行解壓,具體細節後面原始碼分析部分會講。

broker 端

broker 端是否發生解壓取決於 producer 發過來的批量訊息 recordBatch 是否是壓縮的:如果 producer 開啟了壓縮,則會發生解壓,否則不會。原因簡單說下,在 broker 端持久化訊息前,會對訊息做各種驗證,此時必然會迭代 recordBatch ,而在迭代的過程中,會直接採用 recordBatch 上的 compressionType 對訊息位元組流進行處理,是否解壓取決於 compressionType 是否是壓縮型別。關於這點,可以在 LogValidatorvalidateMessagesAndAssignOffsets 方法實現中可以看到,在 convertAndAssignOffsetsNonCompressedassignOffsetsNonCompressedvalidateMessagesAndAssignOffsetsCompressed 三個不同的分支中,都會看到 records.batches.forEach {...} 的身影,而在後面的原始碼分析中會發現,在 recordBatch 的迭代器邏輯中,直接採用的 compressionType 的解壓邏輯對訊息位元組流讀取的。也就是說,如果 recordBatch 是壓縮的 ,只要對其進行了迭代訪問,則會自動觸發解壓邏輯。

壓縮和解壓原理

壓縮和解壓涉及到幾個關鍵的類: CompressionTypeMemoryRecordsBuilderDefaultRecordBatchAbstractLegacyRecordBatch 。其中 CompressionType 是壓縮相關的列舉,集壓縮定義和實現為一體; MemoryRecordsBuilder 是負責將新的訊息資料寫入記憶體 buffer,即呼叫 CompressionType 中的壓縮邏輯 wrapForOutput 來寫入訊息;而 DefaultRecordBatchAbstractLegacyRecordBatch 則是負責讀取訊息資料,即呼叫 CompressionType 的解壓邏輯 wrapForInput 將訊息還原為無壓縮資料。只不過二者區別是,前者是用於處理新版本格式的訊息(即 magic >= 2 ),而後者則是處理老版本格式的訊息(即 magic 為 0 或 1 )。

CompressionType

在說 CompressionType 之前,我們先看下 CompressionCodec 這個 Scala 指令碼。

CompressionCodec

部分原始碼如下,

...
case object GZIPCompressionCodec extends CompressionCodec with BrokerCompressionCodec {
  val codec = 1
  val name = "gzip"
}

case object SnappyCompressionCodec extends CompressionCodec with BrokerCompressionCodec {
  val codec = 2
  val name = "snappy"
}

case object LZ4CompressionCodec extends CompressionCodec with BrokerCompressionCodec {
  val codec = 3
  val name = "lz4"
}

case object ZStdCompressionCodec extends CompressionCodec with BrokerCompressionCodec {
  val codec = 4
  val name = "zstd"
}

case object NoCompressionCodec extends CompressionCodec with BrokerCompressionCodec {
  val codec = 0
  val name = "none"
}

case object UncompressedCodec extends BrokerCompressionCodec {
  val name = "uncompressed"
}

case object ProducerCompressionCodec extends BrokerCompressionCodec {
  val name = "producer"
}

該指令碼定義了 GZIPCompressionCodec 等共 7 個 case object,可類比於 Java 中列舉,這些 case object 中的 name 集合則剛好覆蓋了前文所提到的屬性 compression.type 的所有可選值,包括 producer 端和 broker 端的。而與 name 繫結在一起的 codec 則是最終真正寫入訊息體的壓縮編碼, name 只是為了可讀性友好。從上可知,壓縮編碼 codec 的有效取值只有 0~4 ,分別對應 nonegzipsnappylz4zstd ,而這五種取值恰好是 CompressionType 中定義的五種列舉常量。

由此可知, CompressionCodec 是面向配置屬性 compression.type 的可選值的,並將數值化的壓縮編碼 codec 對映為可讀性強的 name ;而 CompressionType 則是定義了與壓縮編碼對應的列舉常量,二者通過 name 關聯。

CompressionType 原始碼

CompressionType 定義了與壓縮編碼對應的五種壓縮型別列舉,並且通過用於壓縮的 wrapForOutput 和用於解壓的 wrapForInput 這兩個抽象方法將每種壓縮型別與對應的壓縮實現繫結在一起,既避免了常規的 if-else 判斷,也將壓縮的定義與實現完全收斂到 CompressionType ,符合單一職責原則。其實類似這種優雅的設計在 JDK 中也能經常看到其身影,比如 TimeUnit 。直接看原始碼,

public enum CompressionType {
    ...
    GZIP(1, "gzip", 1.0f) {
        @Override
        public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) {
            try {
                return new BufferedOutputStream(new GZIPOutputStream(buffer, 8 * 1024), 16 * 1024);
            } catch (Exception e) {
                throw new KafkaException(e);
            }
        }

        @Override
        public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
            try {
                // Set output buffer (uncompressed) to 16 KB (none by default) and input buffer (compressed) to
                // 8 KB (0.5 KB by default) to ensure reasonable performance in cases where the caller reads a small
                // number of bytes (potentially a single byte)
                return new BufferedInputStream(new GZIPInputStream(new ByteBufferInputStream(buffer), 8 * 1024),
                        16 * 1024);
            } catch (Exception e) {
                throw new KafkaException(e);
            }
        }
    },
    ...
    ZSTD(4, "zstd", 1.0f) {
        @Override
        public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) {
            return ZstdFactory.wrapForOutput(buffer);
        }

        @Override
        public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
            return ZstdFactory.wrapForInput(buffer, messageVersion, decompressionBufferSupplier);
        }
    };
    ...
    // Wrap bufferStream with an OutputStream that will compress data with this CompressionType.
    public abstract OutputStream wrapForOutput(ByteBufferOutputStream bufferStream, byte messageVersion);
    // Wrap buffer with an InputStream that will decompress data with this CompressionType.
    public abstract InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier);

    ...
}

每種壓縮型別對於 wrapForOutputwrapForInput 兩方法的具體實現已經很清楚地闡述了壓縮和解壓的方式,感興趣的朋友可以從該入口 step in 一探究竟。這裡就不細述。當然這只是處理壓縮最小的基本單元,為了搞清楚 Kafka 在何處使用它,還得繼續看其他幾個核心類。

在此之前,就上述原始碼,拋開本次主題,我還想談幾個值得學習借鑑的細節,

  1. SnappyZstd 都是用的 XXXFactory 靜態方法來構建 Stream 物件,而其他的比如 Lz4 則都是直接通過 new 建立的物件。之所以這麼做,我們進一步 step in 就會發現,對於 SnappyZstd ,Kafka 都是直接依賴的第三方庫,而其他的則是 JDK 或 Kafka 自己的實現。為了減少第三方庫的副作用, 通過此方式將第三方庫的類的惰性載入做到極致,這也體現出作者對 Java 類載入時機的充分理解,很精緻的處理
  2. GzipwrapForInput 實現中,在 KAFKA-6430 這個 Improvement 提交中,input buffer 從 0.5 KB 調大到 8 KB,其目的就是能夠在一次 Gzip 壓縮中處理更多的位元組,以獲得更高的效能。至少,從 commit 的描述上看,throughput 能翻倍。
  3. 抽象方法 wrapForInput 中暴露的最後一個 BufferSupplier型別的引數 decompressionBufferSupplier ,正如方法的引數說明所言,對於比較小的批量訊息,如果在 wrapForInput 內部新建 buffer,那麼每次方法呼叫都會新分配buffer,這可能比壓縮處理本身更耗時,所以該引數給了一個選擇的機會,在外面分配記憶體,然後方法內迴圈利用。 在日常的編碼中,對於迴圈中所需的空間,我也會經常會思考是每次新建好還是在外面分配,然後內部迴圈利用更好,case by case .

MemoryRecordsBuilder

public class MemoryRecordsBuilder implements AutoCloseable {
    ...
    // Used to append records, may compress data on the fly
    private DataOutputStream appendStream;
    ...

    public MemoryRecordsBuilder(ByteBufferOutputStream bufferStream,
                                byte magic,
                                CompressionType compressionType,
                                TimestampType timestampType,
                                long baseOffset,
                                long logAppendTime,
                                long producerId,
                                short producerEpoch,
                                int baseSequence,
                                boolean isTransactional,
                                boolean isControlBatch,
                                int partitionLeaderEpoch,
                                int writeLimit,
                                long deleteHorizonMs) {
        if (magic > RecordBatch.MAGIC_VALUE_V0 && timestampType == TimestampType.NO_TIMESTAMP_TYPE)
            throw new IllegalArgumentException("TimestampType must be set for magic > 0");
        if (magic < RecordBatch.MAGIC_VALUE_V2) {
            if (isTransactional)
                throw new IllegalArgumentException("Transactional records are not supported for magic " + magic);
            if (isControlBatch)
                throw new IllegalArgumentException("Control records are not supported for magic " + magic);
            if (compressionType == CompressionType.ZSTD)
                throw new IllegalArgumentException("ZStandard compression is not supported for magic " + magic);
            if (deleteHorizonMs != RecordBatch.NO_TIMESTAMP)
                throw new IllegalArgumentException("Delete horizon timestamp is not supported for magic " + magic);
        }
        ...
        this.appendStream = new DataOutputStream(compressionType.wrapForOutput(this.bufferStream, magic));
        ...
    }
  
    public void close() {
        ...
        if (numRecords == 0L) {
            buffer().position(initialPosition);
            builtRecords = MemoryRecords.EMPTY;
        } else {
            if (magic > RecordBatch.MAGIC_VALUE_V1)
                this.actualCompressionRatio = (float) writeDefaultBatchHeader() / this.uncompressedRecordsSizeInBytes;
            else if (compressionType != CompressionType.NONE)
                this.actualCompressionRatio = (float) writeLegacyCompressedWrapperHeader() / this.uncompressedRecordsSizeInBytes;

            ByteBuffer buffer = buffer().duplicate();
            buffer.flip();
            buffer.position(initialPosition);
            builtRecords = MemoryRecords.readableRecords(buffer.slice());
        }
    }
    ...
    private int writeDefaultBatchHeader() {
        ...
        DefaultRecordBatch.writeHeader(buffer, baseOffset, offsetDelta, size, magic, compressionType, timestampType,
                baseTimestamp, maxTimestamp, producerId, producerEpoch, baseSequence, isTransactional, isControlBatch,
                hasDeleteHorizonMs(), partitionLeaderEpoch, numRecords);

        buffer.position(pos);
        return writtenCompressed;
    }
    private int writeLegacyCompressedWrapperHeader() {
        ...
        int wrapperSize = pos - initialPosition - Records.LOG_OVERHEAD;
        int writtenCompressed = wrapperSize - LegacyRecord.recordOverhead(magic);
        AbstractLegacyRecordBatch.writeHeader(buffer, lastOffset, wrapperSize);

        long timestamp = timestampType == TimestampType.LOG_APPEND_TIME ? logAppendTime : maxTimestamp;
        LegacyRecord.writeCompressedRecordHeader(buffer, magic, wrapperSize, timestamp, compressionType, timestampType);

        buffer.position(pos);
        return writtenCompressed;
    }
}

可以看到, appendStream 是用於追加訊息到記憶體 buffer 的,直接採用的 compressionType 的壓縮邏輯來構建寫入流的,如果此處 compressionType 屬於非 none 的有效壓縮型別,則會產生壓縮。此外,從上面 magic 的判斷邏輯可知,訊息的時間戳型別是從大版本 V1 開始支援的;而事務訊息、控制訊息、Zstd 壓縮和 deleteHorizonMs 都是從 V2 才開始支援的。這裡的 V1V2 對應訊息格式的版本,其中 V1 是從 0.10.0 版本開始引入的,在此之前都是 V0 版本,而 V2 則是從 0.11.0 版本開始引入,直到現在的最新版依然是 V2

close() 方法可以看出, MemoryRecordsBuilder 在構建 memoryRecords 時,會根據訊息格式的版本高低,寫入不同的 Header。對於新版訊息,在 writeDefaultBatchHeader 方法中直接呼叫 DefaultRecordBatch.writeHeader(...) 寫入新版訊息特定的 Header;而對於老版訊息,則是在 writeLegacyCompressedWrapperHeader 方法中呼叫 AbstractLegacyRecordBatch.writeHeaderLegacyRecord.writeCompressedRecordHeader 寫入老版訊息的 Header。雖然 Header 的格式各不相同,但我們在兩種 Header 中都可以看到 compressionType 的身影,以此可見,Kafka 是允許多種版本的訊息共存,壓縮與非壓縮訊息的共存,因為這些資訊是儲存在 recordBatch 上的,是批量訊息級別。

DefaultRecordBatch

public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRecordBatch {
    ...
    @Override
    public Iterator<Record> iterator() {
        if (count() == 0)
            return Collections.emptyIterator();

        if (!isCompressed())
            return uncompressedIterator();

        // for a normal iterator, we cannot ensure that the underlying compression stream is closed,
        // so we decompress the full record set here. Use cases which call for a lower memory footprint
        // can use `streamingIterator` at the cost of additional complexity
        try (CloseableIterator<Record> iterator = compressedIterator(BufferSupplier.NO_CACHING, false)) {
            List<Record> records = new ArrayList<>(count());
            while (iterator.hasNext())
                records.add(iterator.next());
            return records.iterator();
        }
    }
    ...
}

RecordBatch 是表示批量訊息的介面,對於老版格式的訊息(版本 0 和 1),如果沒有壓縮,只會包含單條訊息,否則可以包含多條;而新版格式訊息(版本 2及以上)無論是否壓縮,都是通常包含多條訊息。且該介面中有一個 compressionType() 方法來標識該 batch 的壓縮型別,它會作為讀訊息時解壓的判斷依據。而上面的 DefaultRecordBatch 則是該介面的針對新版本格式訊息的預設實現,它也實現了 Iterable<Record> 介面,因而 iterator() 是訪問批量訊息的核心邏輯,當 compressionType() 返回 none 時,表示不壓縮,直接返回非壓縮迭代器,此處跳過,當有壓縮時,走的是壓縮迭代器,具體實現如下,

public DataInputStream recordInputStream(BufferSupplier bufferSupplier) {
    final ByteBuffer buffer = this.buffer.duplicate();
    buffer.position(RECORDS_OFFSET);
    return new DataInputStream(compressionType().wrapForInput(buffer, magic(), bufferSupplier));
}

private CloseableIterator<Record> compressedIterator(BufferSupplier bufferSupplier, boolean skipKeyValue) {
    final DataInputStream inputStream = recordInputStream(bufferSupplier);

    if (skipKeyValue) {
        // this buffer is used to skip length delimited fields like key, value, headers
        byte[] skipArray = new byte[MAX_SKIP_BUFFER_SIZE];
      
        return new StreamRecordIterator(inputStream) {
            ...
        }
    } else {
        ...  
    }
}

我們可以看到, compressedIterator() 在構造 Stream 迭代器之前,呼叫了 recordInputStream(...) ,該方法中通過 compressionType 的解壓邏輯對原資料進行了解壓。

AbstractLegacyRecordBatch

public abstract class AbstractLegacyRecordBatch extends AbstractRecordBatch implements Record {
    ...
    CloseableIterator<Record> iterator(BufferSupplier bufferSupplier) {
        if (isCompressed())
            return new DeepRecordsIterator(this, false, Integer.MAX_VALUE, bufferSupplier);

        return new CloseableIterator<Record>() {
            private boolean hasNext = true;

            @Override
            public void close() {}

            @Override
            public boolean hasNext() {
                return hasNext;
            }

            @Override
            public Record next() {
                if (!hasNext)
                    throw new NoSuchElementException();
                hasNext = false;
                return AbstractLegacyRecordBatch.this;
            }

            @Override
            public void remove() {
                throw new UnsupportedOperationException();
            }
        };
    }
    ...
    private static class DeepRecordsIterator extends AbstractIterator<Record> implements CloseableIterator<Record> {
        private DeepRecordsIterator(AbstractLegacyRecordBatch wrapperEntry,
                                    boolean ensureMatchingMagic,
                                    int maxMessageSize,
                                    BufferSupplier bufferSupplier) {
            LegacyRecord wrapperRecord = wrapperEntry.outerRecord();
            this.wrapperMagic = wrapperRecord.magic();
            if (wrapperMagic != RecordBatch.MAGIC_VALUE_V0 && wrapperMagic != RecordBatch.MAGIC_VALUE_V1)
                throw new InvalidRecordException("Invalid wrapper magic found in legacy deep record iterator " + wrapperMagic);

            CompressionType compressionType = wrapperRecord.compressionType();
            if (compressionType == CompressionType.ZSTD)
                throw new InvalidRecordException("Invalid wrapper compressionType found in legacy deep record iterator " + wrapperMagic);
            ByteBuffer wrapperValue = wrapperRecord.value();
            if (wrapperValue == null)
                throw new InvalidRecordException("Found invalid compressed record set with null value (magic = " +
                        wrapperMagic + ")");

            InputStream stream = compressionType.wrapForInput(wrapperValue, wrapperRecord.magic(), bufferSupplier);
            ...
        }
    }
}

AbstractLegacyRecordBatch 跟前面的 DefaultRecordBatch 大同小異,同樣也是 iterator() 入口,當開啟了壓縮時,返回壓縮迭代器 DeepRecordsIterator ,只是名字不同而已,迭代器內部依然是直接通過 compressionType 的解壓邏輯對資料流進行解壓。