dapr實戰(三)

語言: CN / TW / HK

政採雲技術團隊.png

水手.png

一、概要

​ 上篇介紹服務之間呼叫 ( Service-to-service invocation )、狀態管理 ( State management ) 功能。

本文介紹剩下的幾個功能: * 釋出訂閱( Publish and subscribe ) * 資源繫結( Resource bindings ) * Actors * 可觀測性( Observability ) * 祕鑰管理( Secrets management) * 配置管理( Configuration )

所有 Demo 均以 Java 的形式展示。以上功能都基於 Dapr實戰(二)的基礎上增加額外的配置,基礎部分不再贅述。

注意:Dapr 是作為 Sidecar存在的,所以設定好元件之後,所以必須啟動 http-demo 專案。如果元件未啟用的話,http-demo 內不要進行配置例如:dapr.io/config: "zipkin-config"

二、釋出訂閱( Publish and Subscribe )

回顧一下之前的路徑圖:

image-20211229165213277

2.1 釋出( Publish )

建立 pub 的配置( pub.yaml)

yaml apiVersion: dapr.io/v1alpha1 kind: Component metadata: name: redis-pubsub namespace: default spec: type: pubsub.redis version: v1 metadata: - name: redisHost value: xx.xx.xx.xx:6379 - name: redisPassword value:

執行元件初始化

shell kubectl apply -f pub.yaml

啟動專案 http-demo.yaml

shell kubectl create -f http-demo.yaml

測試功能:呼叫Dapr介面

shell curl -X POST http://xx.xx.xx.xx:3500/v1.0/publish/redis-pubsub/deathStarStatus -H "Content-Type: application/json" -d '{ "status": "completed-test" }'

檢視redis內容,可以看到對應的訊息內容

shell xread streams deathStarStatus 0

2.2 訂閱( Subscribe )

建立 sub 的配置( sub.yaml )

yaml apiVersion: dapr.io/v1alpha1 kind: Subscription metadata: name: redis-sub spec: topic: deathStarStatus route: /dapr-sample-topic pubsubname: redis-pubsub

controller 程式碼片段

java @PostMapping("/dapr-sample-topic") public Response<String> sampleTopic() { System.out.println("sample-topic"); return Response.ok("sample-topic"); }

執行元件初始化

shell kubectl apply -f sub.yaml

啟動專案 http-demo-dapr.yaml

shell kubectl create -f http-demo-dapr.yaml

測試功能:呼叫 pub 的介面進行釋出,檢視http-demo-dapr的日誌,日誌可以看到sample-topic消費到了對應的日誌

shell kubectl logs -f daprnodeapp-xx-xx -c dapr-http-demo

三、資源繫結( Resource bindings )

資源繫結我們以kafka為例,先建立kafka中介軟體

shell helm repo add bitnami https://charts.bitnami.com/bitnami helm repo update kubectl create ns kafka helm install dapr-kafka bitnami/kafka --wait --namespace kafka -f ./kafka-non-persistence.yaml

kafka 的 yaml 內容( kafka-non-persistence.yaml )

```yaml

------------------------------------------------------------

Copyright (c) Microsoft Corporation.

Licensed under the MIT License.

------------------------------------------------------------

replicas: 1

Disable persistent storage

persistence: enabled: false zookeeper: persistence: enabled: false affinity: nodeAffinity: requiredDuringSchedulingIgnoredDuringExecution: nodeSelectorTerms: - matchExpressions: - key: kubernetes.io/os operator: In values: - linux - key: kubernetes.io/arch operator: In values: - amd64

autoCreateTopicsEnable: true

affinity: nodeAffinity: requiredDuringSchedulingIgnoredDuringExecution: nodeSelectorTerms: - matchExpressions: - key: kubernetes.io/os operator: In values: - linux - key: kubernetes.io/arch operator: In values: - amd64 ```

建立 kafka 的元件

yaml apiVersion: dapr.io/v1alpha1 kind: Component metadata: name: sample-topic spec: type: bindings.kafka version: v1 metadata: # Kafka broker connection setting - name: brokers value: "xx.xx.xx.xx:9092,xx.xx.xx.xx:9093" # consumer configuration: topic and consumer group - name: topics value: sample - name: consumerGroup value: group1 # publisher configuration: topic - name: publishTopic value: sample - name: authRequired value: "false"

初始化專案( http-demo-dapr.yaml )用於驗證輸出繫結

shell kubectl create -f http-demo-dapr.yaml

測試功能:輸出繫結(輸出繫結允許您呼叫外部資源( kafka ))

解釋:向 sample-topic 元件輸出請求topic sample建立訊息的事件)

shell curl -X POST -H 'Content-Type: application/json' http://xx.xx.xx.xx:3500/v1.0/bindings/sample-topic -d '{ "data": { "message": "Hi!" }, "operation": "create" }'

初始化專案( http-demo.yaml )用於驗證輸入繫結

shell kubectl create -f http-demo.yaml

測試功能:輸入繫結( kafka 外部資源的事件時觸發您的應用程式)

解釋:只要介面 url 是sample-topic就能接收到kafka發生的變化,通過日誌檢視到 sample-topic ,接收到了 kafka 的外部資源事件

shell kubectl logs -f nodeapp-xx-xx -c http-demo

四、Actors

Actors目前功能包含:計時器( timers )和 提醒( reminders ),主要差別是 timers 是無狀態的,reminders是有狀態。無狀態即重啟就資訊就丟失,有狀態是重啟還是能繼續運轉。

建立狀態儲存提供給 actor ( statestore.yaml )

yaml apiVersion: dapr.io/v1alpha1 kind: Component metadata: name: statestore namespace: default spec: type: state.redis version: v1 metadata: - name: redisHost value: xx.xx.xx.xx:6379 - name: redisPassword value: "" - name: actorStateStore value: "true"

初始化 http-demo 、 http-demo-dapr 專案

shell kubectl create -f http-demo.yaml kubectl create -f http-demo-dapr.yaml

建立 actor sample 相關程式碼

```java @GetMapping("/dapr-sample-actor/{actorId}") public Response sampleActor(@PathVariable("actorId") String actorId) throws InterruptedException { System.out.println("sample-actor"); int messageNumber = doActor(actorId); return Response.ok(messageNumber); }

@GetMapping("/dapr-sample-actor/clock/{actorId}") public Response sampleActorClock(@PathVariable("actorId") String actorId) { System.out.println("sample-actor"); int messageNumber = doActorClock(actorId); return Response.ok(messageNumber); }

private int doActorClock(String actorId){ ActorClient client = new ActorClient(); //建立actor ActorProxyBuilder builder = new ActorProxyBuilder(DemoActor.class, client); DemoActor actor = builder.build(new ActorId(actorId)); actor.registerActorTimer(); return sayMessageToActor(actorId, actor); }

private int doActor(String actorId) { ActorClient client = new ActorClient(); //建立actor ActorProxyBuilder builder = new ActorProxyBuilder(DemoActor.class, client); DemoActor actor = builder.build(new ActorId(actorId)); //設定定時機制 10 觸發一次 結束 actor.registerReminder(); //執行定時任務 call return sayMessageToActor(actorId, actor); }

/* * Makes multiple method calls into actor until interrupted. * * @param actorId Actor's identifier. * @param actor Actor to be invoked. / private int sayMessageToActor(String actorId, DemoActor actor) { // Invoke actor method to increment counter by 1, then build message. int messageNumber = actor.incrementAndGet(1).block(); String message = String.format("Actor %s said message #%d", actorId, messageNumber); //invoker and save actor.say(message); return messageNumber; } ```

建立 actor 功能類

```java @ActorType(name = "DemoActor") public interface DemoActor {

void registerReminder();

@ActorMethod(name = "echo_message")
String say(String something);

void clock(String message);

void registerActorTimer();

@ActorMethod(returns = Integer.class)
Mono<Integer> incrementAndGet(int delta);

} ```

建立 actor 實現類( DemoActorImpl )

```java package com.test.dapr.sample.interfaces.http;

import io.dapr.actors.ActorId; import io.dapr.actors.runtime.AbstractActor; import io.dapr.actors.runtime.ActorRuntimeContext; import io.dapr.actors.runtime.Remindable; import io.dapr.utils.TypeRef; import reactor.core.publisher.Mono;

import java.text.DateFormat; import java.text.SimpleDateFormat; import java.time.Duration; import java.util.Calendar; import java.util.TimeZone;

/* * @author 水手 * @since 2021/12/30 7:25 下午 / public class DemoActorImpl extends AbstractActor implements DemoActor, Remindable {

/**
 * Format to output date and time.
 */
private static final DateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

/**
 * This is the constructor of an actor implementation, while also registering a timer.
 *
 * @param runtimeContext The runtime context object which contains objects such as the state provider.
 * @param id             The id of this actor.
 */
public DemoActorImpl(ActorRuntimeContext runtimeContext, ActorId id) {
    super(runtimeContext, id);
}

@Override
public void registerActorTimer() {
    super.registerActorTimer(
            "my-actorTimer",
            "clock",
            "ping!",
            Duration.ofSeconds(5),
            Duration.ofSeconds(2)).block();
}

/**
 * Registers a reminder.
 */
@Override
public void registerReminder() {
    super.registerReminder(
            "my-reminder",
            (int) (Integer.MAX_VALUE * Math.random()),
            Duration.ofSeconds(10),
            Duration.ZERO).block();
}

/**
 * Prints a message and appends the timestamp.
 *
 * @param something Something to be said.
 * @return What was said appended with timestamp.
 */
@Override
public String say(String something) {
    Calendar utcNow = Calendar.getInstance(TimeZone.getTimeZone("GMT"));
    String utcNowAsString = DATE_FORMAT.format(utcNow.getTime());

    // Handles the request by printing message.
    System.out.println("Server say method for actor " + super.getId() + ": "
            + (something == null ? "" : something + " @ " + utcNowAsString));
    //儲存redis 對應的值
    super.getActorStateManager().set("lastMessage", something).block();

    // Now respond with current timestamp.
    return utcNowAsString;
}

/**
 * Increments a persistent counter, saves and returns its updated value.
 * Example of method implemented with Reactor's Mono class.
 * This method could be rewritten with blocking calls in Mono, using block() method:
 *
 * <p>public int incrementAndGet(int delta) {
 * int counter = 0;
 * if (super.getActorStateManager().contains("counter").block()) {
 * counter = super.getActorStateManager().get("counter", int.class).block();
 * }
 * counter = counter + 1;
 * super.getActorStateManager().set("counter", counter).block();
 * return counter;
 * }</p>
 *
 * @param delta Amount to be added to counter.
 * @return Mono response for the incremented value.
 */
@Override
public Mono<Integer> incrementAndGet(int delta) {
    return super.getActorStateManager().contains("counter")
            .flatMap(exists -> exists ? super.getActorStateManager().get("counter", int.class) : Mono.just(0))
            .map(c -> c + delta)
            .flatMap(c -> super.getActorStateManager().set("counter", c).thenReturn(c));
}

/**
 * Method invoked by timer.
 *
 * @param message Message to be printed.
 */
@Override
public void clock(String message) {
    Calendar utcNow = Calendar.getInstance(TimeZone.getTimeZone("GMT"));
    String utcNowAsString = DATE_FORMAT.format(utcNow.getTime());

    // Handles the request by printing message.
    System.out.println("clock for actor "
            + super.getId() + ": "
            + (message == null ? "" : message + " @ " + utcNowAsString));
}

/**
 * Method used to determine reminder's state type.
 *
 * @return Class for reminder's state.
 */
@Override
public TypeRef<Integer> getStateType() {
    return TypeRef.INT;
}

/**
 * Method used be invoked for a reminder.
 *
 * @param reminderName The name of reminder provided during registration.
 * @param state        The user state provided during registration.
 * @param dueTime      The invocation due time provided during registration.
 * @param period       The invocation period provided during registration.
 * @return Mono result.
 */
@Override
public Mono<Void> receiveReminder(String reminderName, Integer state, Duration dueTime, Duration period) {
    return Mono.fromRunnable(() -> {
        Calendar utcNow = Calendar.getInstance(TimeZone.getTimeZone("GMT"));
        String utcNowAsString = DATE_FORMAT.format(utcNow.getTime());

        String message = String.format("Server reminded actor %s of: %s for %d @ %s",
                this.getId(), reminderName, state, utcNowAsString);

        // Handles the request by printing message.
        System.out.println(message);
    });
}

} ```

測試功能:建立 timers、reminders 任務

建立 timers 的 clock 任務

Java建立方式

shell curl http://xx.xx.xx.xx:3500/v1.0/invoke/daprnodeapp/method/dapr-sample-actor/clock/88

刪除 timers 的 clock 任務

shell curl -X DELETE http://xx.xx.xx.xx:3500/v1.0/actors/DemoActor/88/timers/my-actorTimer

建立 reminders 任務

Java 建立方式

shell curl -X GET http://xx.xx.xx.xx:3500/v1.0/invoke/daprnodeapp/method/dapr-sample-actor/90

Api 建立或更新方式(第一次9秒執行,以後每3秒執行一次)

shell curl -X PUT -H "Content-Type: application/json" http://xx.xx.xx.xx:3500/v1.0/actors/DemoActor/90/reminders/echo_message -d '{"dueTime":"0h0m9s0ms","period":"0h0m3s0ms"}'

刪除 reminders 任務

shell curl -X DELETE http://xx.xx.xx.xx:3500/v1.0/actors/DemoActor/90/reminders/echo_message

五、可觀測性( Observability )

可觀測性主要包括三部分: 鏈路跟蹤( tracing )、日誌( logging )、指標( metrics )

  1. tracing 主要使用 zipkin 協議,方式可以使用openzipkin
  2. logging主要使用 EFK、ELK
  3. metrics主要使用 grafana + prometheus

建立 openzipkin.yaml 配置

```yaml apiVersion: apps/v1 kind: Deployment metadata: name: zipkin labels: app: zipkin spec: replicas: 1 selector: matchLabels: app: zipkin template: metadata: labels: app: zipkin spec: containers: - name: zipkin image: openzipkin/zipkin ports: - containerPort: 9411


kind: Service apiVersion: v1 metadata: name: zipkin labels: app: zipkin spec: selector: app: zipkin ports: - protocol: TCP port: 9411 targetPort: 9411 type: ClusterIP ```

開啟網頁 http://xx.xx.xx.xx:9411/zipkin 正常訪問

初始化 openzipkin

shell kubectl create -f zipkin.yaml

建立 zipkin 元件

yaml apiVersion: dapr.io/v1alpha1 kind: Configuration metadata: name: zipkin-config spec: tracing: samplingRate: "1" zipkin: endpointAddress: "http://xx.xx.xx.xx:9411/api/v2/spans"

專案的 yaml 檔案,在相應的位置增加配置

dapr.io/log-as-json: "true" dapr.io/config: "zipkin-config"

增加 http-demo-dapr controller

```java @GetMapping("/dapr-helloWorld") public Response getHelloWorld(@RequestHeader Map headers) { System.out.println("getHelloWorld"); HttpExtension httpExtension = new HttpExtension(DaprHttp.HttpMethods.GET, null, headers);

InvokeMethodRequest request = new InvokeMethodRequest("nodeapp", "helloWorld") .setBody(null) .setHttpExtension(httpExtension); Response response = daprClient.invokeMethod(request, TypeRef.get(Response.class)).block(); // .subscriberContext(getReactorContext()).block(); System.out.println("finish getHelloWorld"); if (response != null) { return Response.ok(response.getResult().toString()); } return null; } ```

初始化 http-demo 、 http-demo-dapr 專案

shell kubectl create -f http-demo.yaml kubectl create -f http-demo-dapr.yaml

測試功能:openzipkin 鏈路跟蹤是否正常,注意( http 透傳的 key 約定為 traceparent,grpc 透傳的 key 約定為 grpc-trace-bin )

shell curl -H "traceparent: 00-0af7651916cd43dd8438eb211c19319c-b7ad5b7169203331-02" -H "tracestate: congo=t39rcWkgMzE" http://xx.xx.xx.xx:3500/v1.0/invoke/daprnodeapp/method/dapr-helloWorld

驗證結果,開啟openzipkin地址:http://xx.xx.xx.xx:9411/zipkin ,顯示有條日誌,SHOW明細,顯示呼叫鏈路是從應用daprnodeapp -> nodeapp

可觀測性就介紹到這裡,至於 EFK 以及 grafana + prometheus 有興趣的同學可以自己動手試試。

六、祕鑰管理( Secrets management )

首先建立一個 K8S 的祕鑰管理

shell kubectl create secret generic dapr-secret --from-literal=my-secret="I'm Batman"

基於 K8S 的祕鑰管理,初始化一個 Dapr 元件( daprSecretStore.yaml )

yaml apiVersion: dapr.io/v1alpha1 kind: Component metadata: name: mycustomsecretstore namespace: default spec: type: secretstores.kubernetes version: v1 metadata: - name: "dapr-secret"

執行初始化元件命令:

shell kubectl create -f daprSecretStore.yaml

測試功能:通過 Dapr 獲取 K8S 設定的祕鑰,獲取結果 {"my-secret":"I'm Batman"}

shell curl http://xx.xx.xx.xx:3500/v1.0/secrets/kubernetes/dapr-secret

七、配置管理( Configuration )

回顧一下之前的路徑圖:

image-20211229172623715

官方介紹:此 API 當前處於Alpha狀態,僅在 gRPC 上可用。後續開放後支援語法為/v1.0/configuration

根據官方提供的文件和圖確定:

元件的配置( configuration.yaml )為:

yaml apiVersion: dapr.io/v1alpha1 kind: Component metadata: name: configstore namespace: default spec: type: configuration.redis version: v1 metadata: - name: redisHost value: xx.xx.xx.xx:6379 - name: redisPassword value: ''

執行初始化元件命令:

shell kubectl create -f configuration.yaml

獲取方式為:

shell curl http://10.246.3.147:3500/v1.0/configuration/configstore/{redisKey}

八、小結

​ 至此,Dapr 相關的 Java Demo 全部講完了,如果大家完成了實戰相關的 Demo,恭喜已經 Dapr 入門了。如果 Demo 實戰中,碰到問題,歡迎在評論區留言溝通。

推薦閱讀

Dapr 實戰(一)

Dapr 實戰(二)

政採雲Flutter低成本螢幕適配方案探索

招賢納士

政採雲技術團隊(Zero),一個富有激情、創造力和執行力的團隊,Base 在風景如畫的杭州。團隊現有300多名研發小夥伴,既有來自阿里、華為、網易的“老”兵,也有來自浙大、中科大、杭電等校的新人。團隊在日常業務開發之外,還分別在雲原生、區塊鏈、人工智慧、低程式碼平臺、中介軟體、大資料、物料體系、工程平臺、效能體驗、視覺化等領域進行技術探索和實踐,推動並落地了一系列的內部技術產品,持續探索技術的新邊界。此外,團隊還紛紛投身社群建設,目前已經是 google flutter、scikit-learn、Apache Dubbo、Apache Rocketmq、Apache Pulsar、CNCF Dapr、Apache DolphinScheduler、alibaba Seata 等眾多優秀開源社群的貢獻者。如果你想改變一直被事折騰,希望開始折騰事;如果你想改變一直被告誡需要多些想法,卻無從破局;如果你想改變你有能力去做成那個結果,卻不需要你;如果你想改變你想做成的事需要一個團隊去支撐,但沒你帶人的位置;如果你想改變本來悟性不錯,但總是有那一層窗戶紙的模糊……如果你相信相信的力量,相信平凡人能成就非凡事,相信能遇到更好的自己。如果你希望參與到隨著業務騰飛的過程,親手推動一個有著深入的業務理解、完善的技術體系、技術創造價值、影響力外溢的技術團隊的成長過程,我覺得我們該聊聊。任何時間,等著你寫點什麼,發給 [email protected]

微信公眾號

文章同步釋出,政採雲技術團隊公眾號,歡迎關注

政採雲技術團隊.png