博文乾貨|在 Kotlin 中使用 Apache Pulsar

語言: CN / TW / HK

關於 Apache Pulsar

Apache Pulsar 是 Apache 軟體基金會頂級專案,是下一代雲原生分散式訊息流平臺,集訊息、儲存、輕量化函式式計算為一體,採用計算與儲存分離架構設計,支援多租戶、持久化儲存、多機房跨區域資料複製,具有強一致性、高吞吐、低延時及高可擴充套件性等流資料儲存特性。
GitHub 地址:http://github.com/apache/pulsar/

本文翻譯自:《Using Apache Pulsar With Kotlin》,作者 Gilles Barbier。
原文連結:https://gillesbarbier.medium.com/using-apache-pulsar-with-kotlin-3b0ab398cf52

譯者簡介

宋博,就職於北京百觀科技有限公司,高階開發工程師,專注於微服務,雲端計算,大資料領域。

Apache Pulsar[1]通常被描述為下一代 Kafka,是開發人員工具集中一顆冉冉升起的新星。Pulsar 是用於 server-to-server 訊息傳遞的多租戶、高效能解決方案,通常用作可擴充套件應用程式的核心。

Pulsar 可以與 Kotlin[2]一起使用,因為它是用 Java 編寫的。不過,它的 API 並沒有考慮 Kotlin 帶來的強大功能,例如資料類[3]協程[4]無反射序列化[5]

在這篇文章中,我將討論如何通過 Kotlin 來使用 Pulsar。

為訊息體使用原生序列化

在 Kotlin 中定義訊息的一種預設方式是使用資料類[6],這些類的主要目的是儲存資料。對於此類資料類,Kotlin 會自動提供 equals()、toString()、copy()等方法 ,從而縮短程式碼長度並降低出現錯誤的風險。

使用 Java 建立一個Pulsar 生產者[7]:

Producer<MyAvro> avroProducer = client
  .newProducer(Schema.AVRO(MyAvro.class))
  .topic(“some-avro-topic”)
  .create();

該 Schema.AVRO(MyAvro.class) 指令將內省 MyAvro Java 類並從中推斷出一個 Schema。這需要校驗新的生產者是否會產生與現有消費者實際相容的訊息。然而 Kotlin 資料類的 Java 實現不能很好地與 Pulsar 使用的預設序列化器配合使用。但幸運的是,從 2.7.0 版本開始,Pulsar 允許您對生產者和消費者使用自定義序列化程式。

首先,您需要安裝官方 Kotlin 序列化外掛[8]。使用它可以建立一個如下的訊息類:

@Serializable
        data class RunTask(
             val taskName: TaskName,
             val taskId: TaskId,
        val taskInput: TaskInput,
        val taskOptions: TaskOptions,
        val taskMeta: TaskMeta
         )

注意 @Serializable 註解。
有了它,你就可以使用 RunTask.serialiser() 讓序列化器在不內省的情況下工作,這將使效率大大提升!

目前,序列化外掛僅支援 JSON(以及一些其他在 beta 內的格式 例如 protobuf)。所以我們還需要 avro4k[9] 庫來擴充套件它並支援 Avro 格式。

使用這些工具,我們可以建立一個像下面這樣的 Producer  任務:

import com.github.avrokotlin.avro4k.Avro
import com.github.avrokotlin.avro4k.io.AvroEncodeFormat
import io.infinitic.common.tasks.executors.messages.RunTask
import kotlinx.serialization.KSerializer
import org.apache.avro.file.SeekableByteArrayInput
import org.apache.avro.generic.GenericDatumReader
import org.apache.avro.generic.GenericRecord
import org.apache.avro.io.DecoderFactory
import org.apache.pulsar.client.api.Consumer
import org.apache.pulsar.client.api.Producer
import org.apache.pulsar.client.api.PulsarClient
import org.apache.pulsar.client.api.Schema
import org.apache.pulsar.client.api.schema.SchemaDefinition
import org.apache.pulsar.client.api.schema.SchemaReader
import org.apache.pulsar.client.api.schema.SchemaWriter
import java.io.ByteArrayOutputStream
import java.io.InputStream

// Convert T instance to Avro schemaless binary format
fun <T : Any> writeBinary(t: T, serializer: KSerializer<T>): ByteArray {
    val out = ByteArrayOutputStream()
    Avro.default.openOutputStream(serializer) {
        encodeFormat = AvroEncodeFormat.Binary
        schema = Avro.default.schema(serializer)
    }.to(out).write(t).close()

    return out.toByteArray()
}

// Convert Avro schemaless byte array to T instance
fun <T> readBinary(bytes: ByteArray, serializer: KSerializer<T>): T {
    val datumReader = GenericDatumReader<GenericRecord>(Avro.default.schema(serializer))
    val decoder = DecoderFactory.get().binaryDecoder(SeekableByteArrayInput(bytes), null)

    return Avro.default.fromRecord(serializer, datumReader.read(null, decoder))
}

// custom Pulsar SchemaReader
class RunTaskSchemaReader: SchemaReader<RunTask> {
    override fun read(bytes: ByteArray, offset: Int, length: Int) =
        read(bytes.inputStream(offset, length))

    override fun read(inputStream: InputStream) =
        readBinary(inputStream.readBytes(), RunTask.serializer())
}

// custom Pulsar SchemaWriter
class RunTaskSchemaWriter : SchemaWriter<RunTask> {
    override fun write(message: RunTask) = writeBinary(message, RunTask.serializer())
}

// custom Pulsar SchemaDefinition<RunTask>
fun runTaskSchemaDefinition(): SchemaDefinition<RunTask> =
    SchemaDefinition.builder<RunTask>()
        .withJsonDef(Avro.default.schema(RunTask.serializer()).toString())
        .withSchemaReader(RunTaskSchemaReader())
        .withSchemaWriter(RunTaskSchemaWriter())
        .withSupportSchemaVersioning(true)
        .build()

// Create an instance of Producer<RunTask>
fun runTaskProducer(client: PulsarClient): Producer<RunTask> = client
    .newProducer(Schema.AVRO(runTaskSchemaDefinition()))
    .topic("some-avro-topic")
    .create();

// Create an instance of Consumer<RunTask>
fun runTaskConsumer(client: PulsarClient): Consumer<RunTask> = client
    .newConsumer(Schema.AVRO(runTaskSchemaDefinition()))
    .topic("some-avro-topic")
    .subscribe();

密封類訊息和每個 Topic 一個封裝

Pulsar 每個 Topic 只允許一種型別的訊息。在某些特殊情況下,這並不能滿足全部需求。但這個問題可以通過使用封裝模式來變通。

首先,使用密封類從一個 Topic 建立所有型別訊息:

@Serializable
sealed class TaskEngineMessage() {
    abstract val taskId: TaskId
}

@Serializable
data class DispatchTask(
    override val taskId: TaskId,
    val taskName: TaskName,
    val methodName: MethodName,
    val methodParameterTypes: MethodParameterTypes?,
    val methodInput: MethodInput,
    val workflowId: WorkflowId?,
    val methodRunId: MethodRunId?,
    val taskMeta: TaskMeta,
    val taskOptions: TaskOptions = TaskOptions()
) : TaskEngineMessage()

@Serializable
data class CancelTask(
    override val taskId: TaskId,
    val taskOutput: MethodOutput
) : TaskEngineMessage()

@Serializable
data class TaskCanceled(
    override val taskId: TaskId,
    val taskOutput: MethodOutput,
    val taskMeta: TaskMeta
) : TaskEngineMessage()

@Serializable
data class TaskCompleted(
    override val taskId: TaskId,
    val taskName: TaskName,
    val taskOutput: MethodOutput,
    val taskMeta: TaskMeta
) : TaskEngineMessage()

然後,再為這些訊息建立一個封裝:

Note @Serializable
data class TaskEngineEnvelope(
    val taskId: TaskId,
    val type: TaskEngineMessageType,
    val dispatchTask: DispatchTask? = null,
    val cancelTask: CancelTask? = null,
    val taskCanceled: TaskCanceled? = null,
    val taskCompleted: TaskCompleted? = null,
) {
    init {
        val noNull = listOfNotNull(
            dispatchTask,
            cancelTask,
            taskCanceled,
            taskCompleted
        )

        require(noNull.size == 1)
        require(noNull.first() == message())
        require(noNull.first().taskId == taskId)
    }

    companion object {
        fun from(msg: TaskEngineMessage) = when (msg) {
            is DispatchTask -> TaskEngineEnvelope(
                msg.taskId,
                TaskEngineMessageType.DISPATCH_TASK,
                dispatchTask = msg
            )
            is CancelTask -> TaskEngineEnvelope(
                msg.taskId,
                TaskEngineMessageType.CANCEL_TASK,
                cancelTask = msg
            )
            is TaskCanceled -> TaskEngineEnvelope(
                msg.taskId,
                TaskEngineMessageType.TASK_CANCELED,
                taskCanceled = msg
            )
            is TaskCompleted -> TaskEngineEnvelope(
                msg.taskId,
                TaskEngineMessageType.TASK_COMPLETED,
                taskCompleted = msg
            )
        }
    }

    fun message(): TaskEngineMessage = when (type) {
        TaskEngineMessageType.DISPATCH_TASK -> dispatchTask!!
        TaskEngineMessageType.CANCEL_TASK -> cancelTask!!
        TaskEngineMessageType.TASK_CANCELED -> taskCanceled!!
        TaskEngineMessageType.TASK_COMPLETED -> taskCompleted!!
    }
}

enum class TaskEngineMessageType {
    CANCEL_TASK,
    DISPATCH_TASK,
    TASK_CANCELED,
    TASK_COMPLETED
}

請注意 Kotlin 如何優雅地檢查init! 可以藉助 TaskEngineEnvelope.from(msg) 很容易建立一個封裝,並通過envelope.message()返回原始訊息。

為什麼這裡添加了一個顯式 taskId 值,而非使用一個全域性欄位message:TaskEngineMessage,並且針對每種訊息型別使用一個欄位呢?是因為通過這種方式,我就可以藉助 taskId 或 type,亦或者兩者相結合的方式使用PulsarSQL[10] 來獲取這個 Topic 的資訊。

通過協程來構建 Worker

在普通 Java 中使用 Thread 很複雜且容易出錯。好在 Koltin 提供了 coroutines[11]——一種更簡單的非同步處理抽象——和 channels[12]——一種在協程之間傳輸資料的便捷方式。

我可以通過以下方式建立一個 Worker:

單個("task-engine-message-puller")專用於從 Pulsar 拉取訊息的協程N 個協程 ( "task-engine-$i") 並行處理訊息單個("task-engine-message-acknoldeger")處理後確認 Pulsar 訊息的協程

有很多個類似於這樣的程序後我已經添加了一個 logChannel 用來採集日誌。請注意,為了能夠在與接收它的協程不同的協程中確認 Pulsar 訊息,我需要將TaskEngineMessage封裝到包含Pulsar messageIdMessageToProcess<TaskEngineMessage>中:

typealias TaskEngineMessageToProcess = MessageToProcess<TaskEngineMessage>

fun CoroutineScope.startPulsarTaskEngineWorker(
    taskEngineConsumer: Consumer<TaskEngineEnvelope>,
    taskEngine: TaskEngine,
    logChannel: SendChannel<TaskEngineMessageToProcess>?,
    enginesNumber: Int
) = launch(Dispatchers.IO) {

    val taskInputChannel = Channel<TaskEngineMessageToProcess>()
    val taskResultsChannel = Channel<TaskEngineMessageToProcess>()

    // coroutine dedicated to pulsar message pulling
    launch(CoroutineName("task-engine-message-puller")) {
        while (isActive) {
            val message: Message<TaskEngineEnvelope> = taskEngineConsumer.receiveAsync().await()

            try {
                val envelope = readBinary(message.data, TaskEngineEnvelope.serializer())
                taskInputChannel.send(MessageToProcess(envelope.message(), message.messageId))
            } catch (e: Exception) {
                taskEngineConsumer.negativeAcknowledge(message.messageId)
                throw e
            }
        }
    }

    // coroutines dedicated to Task Engine
    repeat(enginesNumber) {
        launch(CoroutineName("task-engine-$it")) {
            for (messageToProcess in taskInputChannel) {
                try {
                    messageToProcess.output = taskEngine.handle(messageToProcess.message)
                } catch (e: Exception) {
                    messageToProcess.exception = e
                }
                taskResultsChannel.send(messageToProcess)
            }
        }
    }

    // coroutine dedicated to pulsar message acknowledging
    launch(CoroutineName("task-engine-message-acknowledger")) {
        for (messageToProcess in taskResultsChannel) {
            if (messageToProcess.exception == null) {
                taskEngineConsumer.acknowledgeAsync(messageToProcess.messageId).await()
            } else {
                taskEngineConsumer.negativeAcknowledge(messageToProcess.messageId)
            }
            logChannel?.send(messageToProcess)
        }
    }
}

data class MessageToProcess<T> (
    val message: T,
    val messageId: MessageId,
    var exception: Exception? = null,
    var output: Any? = null
)

總結

在本文中,我們介紹瞭如何在 Kotlin 中實現的 Pulsar 使用方法:

程式碼訊息(包括接收多種型別訊息的 Pulsar Topic 的封裝);建立 Pulsar 的生產者/消費者;構建一個能夠並行處理許多訊息的簡單 Worker。

引用連結

[1] Apache Pulsar: https://pulsar.apache.org/zh-CN/
[2] Kotlin: https://kotlinlang.org/
[3] 資料類: https://kotlinlang.org/docs/reference/data-classes.html
[4] 協程: https://kotlinlang.org/docs/reference/coroutines/coroutines-guide.html
[5] 無反射序列化: https://kotlinlang.org/docs/reference/serialization.html#serialization
[6] 資料類: https://kotlinlang.org/docs/reference/data-classes.html
[7] Pulsar 生產者: https://pulsar.apache.org/docs/en/client-libraries-java/#schema-example
[8] Kotlin 序列化外掛: https://github.com/Kotlin/kotlinx.serialization
[9] avro4k: https://github.com/avro-kotlin/avro4k
[10] PulsarSQL: https://pulsar.apache.org/docs/en/sql-overview/
[11] coroutines: https://kotlinlang.org/docs/reference/coroutines/coroutines-guide.html
[12] channels: https://kotlinlang.org/docs/reference/coroutines/channels.html


▼ 關注「Apache Pulsar」,獲取更多技術乾貨 ▼

👇🏻 加入 Apache Pulsar 中文交流群 👇🏻

本文分享自微信公眾號 - ApachePulsar(ApachePulsar)。
如有侵權,請聯絡 [email protected] 刪除。
本文參與“OSC源創計劃”,歡迎正在閱讀的你也加入,一起分享。

「其他文章」