Dapr 入門教程之訊息佇列

語言: CN / TW / HK

前面我們瞭解了 Dapr 對釋出訂閱的支援,本節我們將來介紹了 Dapr 中對訊息佇列的支援。訊息佇列,分為兩種繫結,一種是輸出繫結,一種是輸入繫結。出和入是看資料的流向,輸出繫結就是作為生產者的服務把訊息通過 Dapr 傳給訊息佇列,輸入繫結就是作為消費者的服務通過 Dapr 從訊息佇列裡得到訊息。

這裡的訊息佇列和釋出訂閱裡的訊息匯流排有什麼區別呢?一個訊息進入訊息匯流排的話,所有訂閱者都能得到這個訊息,而一個訊息進入訊息佇列的話,由消費者來取,一次只有一個人能得到。此外,訊息匯流排是不要求處理順序的,兩個訊息進入訊息匯流排,誰先被拿到順序是不一定的,而訊息佇列可以保證是先入先出的。

本節我們將建立兩個微服務,一個具有輸入繫結,另一個具有輸出繫結,前面我們都使用的 Redis 這種中介軟體,這裡我們將繫結到 Kafka。

  • Node.js 微服務使用輸入繫結
  • Python 微服務利用輸出繫結

繫結連線到 Kafka,允許我們將訊息推送到 Kafka 例項(從 Python 微服務)中,並從該例項(從 Node.js 微服務)接收訊息,而不必知道例項的位置。相反,同樣只需要直接使用 Dapr API 通過 sidecars 連線即可。

本地執行

首先我們在本地來執行示例應用,對應的架構圖如下所示:

Bindings 本地模式

同樣使用 quickstarts 這個程式碼倉庫:

git clone [-b <dapr_version_tag>] https://github.com/dapr/quickstarts.git

由於我們這裡是使用 Kafka 來做訊息佇列的中介軟體,所以我們首先需要在本地環境執行 Kafka,我們可以直接使用 https://github.com/wurstmeister/kafka-docker 這個專案以 Docker 方式執行。

定位到 quickstarts 的 tutorials/bindings 目錄,下面有一個 docker-compose-single-kafka.yml 檔案:

$ cd tutorials/bindings
$ cat docker-compose-single-kafka.yml
version: '2'
services:
  zookeeper:
    image: ghcr.io/dapr/3rdparty/zookeeper:latest
    ports:
      - "2181:2181"
  kafka:
    image: ghcr.io/dapr/3rdparty/kafka:latest
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
      KAFKA_CREATE_TOPICS: "sample:1:1"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181

我們可以直接而使用 docker-compose 來啟動一個單例項的 Kafka:

$ docker-compose -f ./docker-compose-single-kafka.yml up -d

隔一段時間映象拉取完成後以容器方式啟動 Kafka:

$ docker-compose -f ./docker-compose-single-kafka.yml ps
NAME                   COMMAND                  SERVICE             STATUS              PORTS
bindings-kafka-1       "start-kafka.sh"         kafka               running             0.0.0.0:9092->9092/tcp
bindings-zookeeper-1   "/bin/sh -c '/usr/sb…"   zookeeper           running             0.0.0.0:2181->2181/tcp

在本地運行了 Kafka 後,接著我們可以執行輸入繫結的 Node.js 微服務:

$ cd nodeapp

同樣先安裝服務依賴:

$ npm install  # 或者執行 yarn 命令

然後我們就可以使用 dapr run 命令來啟動該微服務了,啟動方式我們應該比較熟悉了,如下所示:

$ dapr run --app-id bindings-nodeapp --app-port 3000 node app.js --components-path ../components

上面的命令和前面有點不一樣的地方是多了一個 --components-path 用來指定元件路徑,這是因為現在我們要使用 Kafka 這種中介軟體來作為我們的訊息佇列元件,那麼我們就需要告訴 Dapr,在 ./components 目錄下面就包含一個對應的 kafka_bindings.yaml 檔案,內容如下所示:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: sample-topic
spec:
  type: bindings.kafka
  version: v1
  metadata:
    # Kafka broker connection setting
    - name: brokers
      value: localhost:9092
    # consumer configuration: topic and consumer group
    - name: topics
      value: sample
    - name: consumerGroup
      value: group1
    # publisher configuration: topic
    - name: publishTopic
      value: sample
    - name: authRequired
      value: "false"

前面在本地模式下面我們沒有主動宣告元件,是因為我們使用的就是預設的 Redis,而 Kafka 並不是內建就有的,所以需要我們主動宣告,注意上面元件的型別為 type: bindings.kafka,metadata 下面是訪問 Kafka 相關的元資料。正常情況下上面的啟動命令會輸出如下所示的日誌資訊:

:information_source:  Starting Dapr with id bindings-nodeapp. HTTP Port: 54215. gRPC Port: 54216
INFO[0000] starting Dapr Runtime -- version 1.8.4 -- commit 18575823c74318c811d6cd6f57ffac76d5debe93  app_id=bindings-nodeapp instance=MBP2022.local scope=dapr.runtime type=log ver=1.8.4
# ......
INFO[0000] dapr initialized. Status: Running. Init Elapsed 347.136ms  app_id=bindings-nodeapp instance=MBP2022.local scope=dapr.runtime type=log ver=1.8.4
:information_source:  Updating metadata for app command: node app.js
:white_check_mark:  You're up and running! Both Dapr and your app logs will appear here.

INFO[0001] placement tables updated, version: 0          app_id=bindings-nodeapp instance=MBP2022.local scope=dapr.runtime.actor.internal.placement type=log ver=1.8.4

接下來,需要執行輸出繫結的 Python 微服務,定位到 pythonapp​ 目錄,安裝 requests 依賴:

$ cd pythonapp
$ pip3 install requests

然後同樣用 dapr run 命令來啟動該微服務,也要注意帶上後面的 --components-path 引數:

$ dapr run --app-id bindings-pythonapp python3 app.py --components-path ../components
:information_source:  Starting Dapr with id bindings-pythonapp. HTTP Port: 54554. gRPC Port: 54555
:information_source:  Checking if Dapr sidecar is listening on HTTP port 54554
INFO[0000] starting Dapr Runtime -- version 1.8.4 -- commit 18575823c74318c811d6cd6f57ffac76d5debe93  app_id=bindings-pythonapp instance=MBP2022.local scope=dapr.runtime type=log ver=1.8.4
# ......
:information_source:  Checking if Dapr sidecar is listening on GRPC port 54555
:information_source:  Dapr sidecar is up and running.
:information_source:  Updating metadata for app command: python3 app.py
:white_check_mark:  You're up and running! Both Dapr and your app logs will appear here.

啟動完成後,觀察 Python 服務的日誌,可以看到不斷輸出如下所示成功輸出繫結到 Kafka 的日誌:

== APP == {'data': {'orderId': 1}, 'operation': 'create'}
== APP == <Response [204]>
== APP == {'data': {'orderId': 2}, 'operation': 'create'}
== APP == <Response [204]>
== APP == {'data': {'orderId': 3}, 'operation': 'create'}
== APP == <Response [204]>
# ......

同樣這個時候 Node.js 微服務中也不斷有新的日誌資料產生:

== APP == <Response [204]>
== APP == {'data': {'orderId': 1}, 'operation': 'create'}
== APP == <Response [204]>
== APP == {'data': {'orderId': 2}, 'operation': 'create'}
== APP == <Response [204]>
== APP == {'data': {'orderId': 3}, 'operation': 'create'}
== APP == <Response [204]>
# ......

這是因為 Python 微服務每隔 1s 就會向我們繫結的訊息佇列傳送一條訊息,而 Node.js 微服務作為消費者當然會接收到對應的訊息資料。

在 Kubernetes 中執行

上面在本地環境下可以正常執行 Dapr bindings 服務,接下來我們再次將該示例部署到 Kubernetes 叢集中來進行觀察。

同樣首先需要提供一個可用的 Kafka 例項,這裡我們仍然使用 Helm Chart 方式來進行安裝:

$ helm repo add bitnami https://charts.bitnami.com/bitnami
$ helm repo update

然後使用如下所示的命令來安裝 Kafka:

$ helm upgrade --install dapr-kafka bitnami/kafka --wait --namespace kafka -f ./kafka-non-persistence.yaml --create-namespace

這裡我們指定了一個無需持久化資料(僅供測試)的 values 檔案 kafka-non-persistence.yaml,內容如下所示:

replicas: 1
# Disable persistent storage
persistence:
  enabled: false
zookeeper:
  persistence:
    enabled: false
  affinity:
    nodeAffinity:
      requiredDuringSchedulingIgnoredDuringExecution:
        nodeSelectorTerms:
          - matchExpressions:
              - key: kubernetes.io/os
                operator: In
                values:
                  - linux
              - key: kubernetes.io/arch
                operator: In
                values:
                  - amd64

autoCreateTopicsEnable: true
affinity:
  nodeAffinity:
    requiredDuringSchedulingIgnoredDuringExecution:
      nodeSelectorTerms:
        - matchExpressions:
            - key: kubernetes.io/os
              operator: In
              values:
                - linux
            - key: kubernetes.io/arch
              operator: In
              values:
                - amd64

安裝完成後可以檢視 Pod 的狀態來保證 Kafka 啟動成功:

$ kubectl -n kafka get pods -w
NAME                     READY   STATUS    RESTARTS   AGE
dapr-kafka-0             1/1     Running   0          2m7s
dapr-kafka-zookeeper-0   1/1     Running   0          2m57s

接下來我們首先需要在 Kubernetes 叢集中配置使用 Kafka 作為 Binding 訊息中介軟體的 Component 元件:

# kafka_bindings.yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: sample-topic
spec:
  type: bindings.kafka
  version: v1
  metadata:
    # Kafka broker connection setting
    - name: brokers
      value: dapr-kafka.kafka:9092
    # consumer configuration: topic and consumer group
    - name: topics
      value: sample
    - name: consumerGroup
      value: group1
    # publisher configuration: topic
    - name: publishTopic
      value: sample
    - name: authRequired
      value: "false"

注意該物件上面指定的元件型別為 bindings.kafka,metadata 下面的元資訊包括 Kafka brokers地址、生產者和消費者的配置等等,直接應用上面的資源清單即可:

$ kubectl apply -f kafka_bindings.yaml
$ kubectl get components sample-topic
NAME           AGE
sample-topic   13s

建立完成後在 Dapr Dashboard 中也可以看到對應的元件資訊:

dapr dashboard components。

接著部署兩個 Node.js 和 Python 微服務即可:

$ kubectl apply -f deploy/node.yaml
service/bindings-nodeapp created
deployment.apps/bindings-nodeapp created
$ kubectl apply -f deploy/python.yaml
deployment.apps/bindings-pythonapp created
$ kubectl get pods
NAME                                  READY   STATUS    RESTARTS         AGE
bindings-nodeapp-8bcdd744d-pj2j7      2/2     Running   0                3m44s
bindings-pythonapp-7b7fcc579b-kqx6p   2/2     Running   0                3m39s

部署完成後可以同樣分別觀察 Node.js 和 Python 微服務的日誌:

$ kubectl logs --selector app=bindingspythonapp -c python --tail=-1
{'data': {'orderId': 1}, 'operation': 'create'}
HTTPConnectionPool(host='localhost', port=3500): Max retries exceeded with url: /v1.0/bindings/sample-topic (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f9e75181390>: Failed to establish a new connection: [Errno 111] Connection refused'))
{'data': {'orderId': 2}, 'operation': 'create'}
<Response [204]>
{'data': {'orderId': 3}, 'operation': 'create'}
<Response [204]>
# ......
$ kubectl logs --selector app=bindingsnodeapp -c node --tail=-1
Node App listening on port 3000!
Hello from Kafka!
{ orderId: 2 }
Hello from Kafka!
{ orderId: 3 }
# ......

可以看到兩個微服務的日誌也服務我們的預期的。

如何工作

前面我們在本地或 Kubernetes 中都運行了示例應用,而且沒有更改任何程式碼,應用結果都符合預期,接下來我們看看這是如何工作的。

在檢視應用程式程式碼之前,我們先看看 Kafka 繫結元件的資源清單檔案,它們為 Kafka 連線指定 brokers,為消費者指定 topics 和 consumerGroup,為生產者指定了 publishTopic。

我們建立了名為 sample-topic 的元件,然後我們通過該元件配置的 Kafka 中的 sample 主題來設定輸入和輸出繫結。

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: sample-topic
spec:
  type: bindings.kafka
  version: v1
  metadata:
    # Kafka broker connection setting
    - name: brokers
      value: [kafka broker address]
    # consumer configuration: topic and consumer group
    - name: topics
      value: sample
    - name: consumerGroup
      value: group1
    # publisher configuration: topic
    - name: publishTopic
      value: sample
    - name: authRequired
      value: "false"

現在我們先導航到 nodeapp​ 目錄下面開啟 app.js​ 檔案,這是 Node.js 輸入繫結示例應用的程式碼。這裡使用 Express​ 暴露了一個 API 端點,需要注意的是 API 名稱必須與在 Kafka 繫結元件中宣告的元件名稱相同,然後 Dapr 執行時將使用來自 sample 主題的事件,然後將 POST 請求與事件負載一起傳送給 Node 應用程式。

const express = require("express");
const bodyParser = require("body-parser");
const port = process.env.APP_PORT ?? "3000";
require("isomorphic-fetch");
const app = express();
app.use(bodyParser.json());
// 這裡的 api 端點需要與宣告的元件名稱相同
app.post("/sample-topic", (req, res) => {
  console.log("Hello from Kafka!");
  console.log(req.body);
  res.status(200).send();
});
app.listen(port, () => console.log(`Node App listening on port ${port}!`));

所以當 Kafka 中收到訊息後就會列印類似如下所示的日誌:

Hello from Kafka!
{ orderId: 3 }

然後我們導航到 pythonapp 目錄下面開啟 app.py 檔案,這是輸出繫結示例(生產者)應用程式的程式碼,該服務會每秒傳送一次 POST 請求到 Dapr 的 http 端點的 http://localhost:3500/v1.0/bindings/<output_bindings_name>,並帶有事件的 payload 資料。這個應用程式使用 bindings 元件名 sample-topic 作為 <output_bindings_name>,然後 Dapr 執行時將事件傳送到上面的 Kafka 繫結元件中指定的 sample 主題上去。

import time
import requests
import os
dapr_port = os.getenv("DAPR_HTTP_PORT", 3500)

dapr_url = "http://localhost:{}/v1.0/bindings/sample-topic".format(dapr_port)
n = 0
while True:
    n += 1
    payload = { "data": {"orderId": n}, "operation": "create" }
    print(payload, flush=True)
    try:
        response = requests.post(dapr_url, json=payload)
        print(response, flush=True)

    except Exception as e:
        print(e, flush=True)

    time.sleep(1)

上面程式碼中最重要的依然是 Dapr API 地址 dapr_url 的拼接 "http://localhost:{}/v1.0/bindings/sample-topic".format(dapr_port),注意我們依然是面向 localhost 程式設計,而 v1.0/bindings/<output_bindings_name> 端點則是 Dapr API 為我們封裝的輸出訊息繫結的統一介面,非常簡單方便。