用NATS JetStream在Java中進行非同步的Pub/Sub資訊傳遞

語言: CN / TW / HK

簡介

當我們構建大規模的分散式應用叢集時,我們利用所有的努力將單體分解成小的容器化工作負載,這些容器之間相互通訊並分享資訊以執行各種操作。

我們沒有花太多時間去設計一個 訊息傳遞系統.

訊息傳遞通常被視為任何_大規模分散式系統_的_中樞神經系統_。通常情況下,單體內部的記憶體通訊被轉化為線上通訊。

如果我們對_叢集_內的所有通訊進行佈線,就會形成類似網狀的模組,每個服務以同步的方式呼叫另一個服務,由於在請求-響應的生命週期中有大量的等待時間,這並不理想。

這一點 混亂的網狀結構可以通過引入一個 非同步訊息傳遞叢集而不是同步的。

在兩個微服務之間沒有_點對點的通訊_,我們可以把它們的訊息委託給一種 樞紐和輻條拓撲結構.因此,訊息傳遞是連線整個系統的_膠水_。

Need for Messaging System

在本指南中,我們將使用 NATS JetStream來執行非同步訊息傳遞,通過 釋出/訂閱模式.

那麼,我們如何為我們的應用程式選擇一個訊息代理或訊息傳遞架構呢?

選擇一個訊息傳遞系統可能會讓人感覺很吃力,已經有大量的選擇,而且每天都有新的選擇出現,每個選擇都有不同的優勢。

選擇一個分散式的訊息傳遞系統

最值得注意的是,我們已經有了廣泛流行和相當頻繁使用的Apache Kafka,它通常被稱為_分散式日誌儲存_。

在Kafka中釋出到主題的訊息會持續一段時間,而.NET的概念允許訊息在多個例項中均勻分佈。 消費者群體允許訊息在同一服務的多個例項中均勻分佈。它的功能非常強大,但伴隨著強大的功能而來的是巨大的責任和維護。Kafka明顯難以維護,對於任何希望掌握該技術的團隊來說,都有一個陡峭的學習曲線。

另一個獨特的選擇是RabbitMQ。RabbitMQ使用高階訊息佇列協議進行訊息傳遞。它也明顯是輕量級的。

RabbitMQ 沒有使用獨特的消費者組的概念,而是採取了更簡單的方法,即讓客戶端消費_佇列_。如果一個客戶端不承認一個訊息,它將回到佇列中,由另一個客戶端處理。

所有這些產品都有一些甜頭,並在它們的用例中大放異彩。

那麼,如果有人想真正接受擁有一個簡單而又超高效能的系統的想法,而不需要維護它的額外開銷呢?如果有人想做傳統的pub/sub,但也想做request/reply,甚至想做scatter-gather,同時又想保持簡單和輕便呢?

這就是 NATS資訊傳遞系統可能是最適合你的解決方案的地方。

介紹一下NATS

NATS是一個經過生產驗證的、雲原生的訊息傳遞系統,是為那些想花更多時間實現業務邏輯,而少花時間擔心_如何做訊息傳遞的_開發者或運營商而設計的。

它是一個令人難以置信的快速、開源的訊息傳遞系統,建立在一個簡單而強大的核心之上。伺服器使用基於文字的協議,所以雖然有一些特定語言的客戶端庫,但你完全可以通過_telnet_進入NATS伺服器來發送和接收資訊。

NATS被設計成永遠線上,連線,並準備接受命令。如果你足夠老,知道什麼是_撥號音_,那麼值得一提的是,NATS團隊喜歡用這個比喻來設計。

NATS的一些突出特點包括。

  • _超高_的效能
  • 低配置
    • 客戶端只需要一個URL和憑證
    • 伺服器自動發現自己
  • 能夠在不影響執行服務的情況下擴充套件架構
  • 自愈且始終可用
  • 支援多種交付模式。
    • 最多一次(NATS核心)
    • 至少一次(NATS流或JetStream)
  • 將訊息儲存到永續性儲存,並按時間或順序重放
  • 支援萬用字元
  • 在REST加密的資料
  • 淨化特定的訊息(GDPR
  • 橫向可擴充套件性
  • 完整的TLS支援。CA證書,雙向支援
  • 支援標準的使用者/密碼認證/JWT的使用
  • 許可權限制
  • 具有資料隔離的安全多租戶
  • 賬戶之間共享資料
  • 擁有30多個用不同語言編寫的客戶端庫

資訊傳遞模式

NATS支援4種主要的通訊模式。它們是

  • 基於主題
  • 釋出-訂閱
  • 請求-回覆/分散-收集
  • 佇列組

每一個都是不同的模式,都有其使用案例,有一些重疊。允許所有這四種模式給了NATS極大的靈活性和功能,以應對多個應用程式之間的各種不同情況,或一個大型單體。

基於主題的訊息傳遞

A 主題在NATS中是一個簡單的字串,代表對資料的興趣。它被分_層標記_以支援_萬用字元訂閱_。

  • foo.* 匹配 foo.bar_和 foo.baz_
  • foo.*.bar匹配 foo.a.bar_和 foo.b.bar_
  • _foo.>_匹配上述任何一個
  • _>_匹配NATS中的所有內容

這種訊息傳遞模式允許釋出者使用一個_Subject_來共享資料,而消費者可以通過使用萬用字元來監聽這些Subject來接收這些訊息。

從某種意義上說,這種模式是基於觀察者設計模式的,它通常有一個_主題_和_觀察者_。

例如,如果有人向_'audit.us.east'_傳送訊息,那麼所有監聽該確切主題或萬用字元主題的訂閱者都會收到這個訊息。

Subject based Messaging

釋出-訂閱訊息

這是傳統的訊息傳遞模式之一,其中 釋出者_將一個訊息釋出到一個 訂閱者_列表,其中每個訂閱者都單獨訂閱了它。

Publish Subscribe Messaging

這類似於通訊,這種模式被_廣泛_用於各種系統中。從通知/警報系統到VoD平臺,如YouTube。

這就是我們在本指南中要使用的模式。

請求-回覆訊息/分散-收集模式

當我們進行REST API呼叫時,我們發出一個HTTP請求並收到一個響應,我們使用的是傳統的同步請求-響應模式。請求-_迴應模式通常很困難,或者有時需要複雜的解決方案或妥協。這種模式在使用NATS實現時相當簡單,因為它只需要你在釋出訊息時提供一個"回覆到 "_主題。

這種模式也可以被稱為 _分散-聚集_模式,釋出者向未知數量的訂閱者同時釋出一個主題的訊息。然後所有監聽這個主題的聽眾都會活躍起來並開始處理。然後,釋出者將等待積累來自部分或全部訂閱者的所有回覆。

Scatter Gather Pattern

佇列組

有時在一個分散式叢集中,你必須對多個應用程式或同一應用程式的多個例項進行_負載平衡_。這種模式將是一個完美的解決方案,可以在訂閱了同一主題的多個訂閱者之間實現訊息的_負載平衡_。

這個解決方案最好的部分是,與其他訊息系統不同,它不需要在NATS伺服器進行任何配置。佇列組是由應用程式和他們的佇列訂閱者定義的,並在他們之間進行管理。

為了建立一個佇列訂閱,所有的訂閱者都註冊一個佇列名稱。當註冊的主題上的訊息被髮布時,組中的一個成員被隨機選擇來接收訊息。儘管佇列組有多個訂閱者,但每個訊息只被一個人消費。

Queue Groups

所有這些模式在NATS伺服器上需要零配置。

它完全由應用程式或客戶端庫驅動。因此,讓我們研究一下jnatsJava客戶端庫,看看我們如何定義其中的一些模式並執行非同步訊息傳遞。

基本的NATS伺服器、NATS流和NATS JetStream

第一個 NATS雲原生_資訊傳遞生態系統是以 NATS伺服器基於 '最多一次'(At-most once_交付模式--訊息最多交付一次。它曾經以難以置信的速度將釋出的訊息轉發給消費者,為該行業設定了新的效能界限。對於一些應用來說,基本的NATS提供的效能超過了丟失訊息的潛在損失。

但是,在'最多一次'的交付模式下,如果任何一個訂戶出現故障,傳送到的訊息將永遠不會到達,因此,沒有保證資料的交付。

這類似於大多數流媒體服務所使用的超高速UDP協議,資料的速度比資料的完整性更重要。你寧願在視訊中損失幾個畫素或解析度較低,也不願意長時間等待聽到某人的聲音。

但這並不是你想在金融交易中發生的事情。在這裡和那裡損失一點可能會改變某人的賬單或收件人的地址。

作為對這個問題的迴應 NATS流媒體引入了NATS,它用一些效能來換取訊息的永續性。沒有犧牲太多的效能,NATS流是一個輕量級和高效能的平臺,在引擎蓋下使用基本的NATS。它的構建方式是 _'至少一次_交付模型,具有為釋出者和訂閱者傳送ACK 訊息的能力。

這類似於TCP,它保證資料的完整性,如果沒有收到ACK ,就會重新發送包,表示客戶端可能沒有收到包。

當訊息被髮布後,它們會被持久化一段時間(可定製),這樣如果消費者沒有收到它,它就可以被重新播放給消費者。雖然這個元件的效能非常好,而且是輕量級的,但在能力和成熟度方面,它和Kafka等分散式流媒體系統一樣強大。

開發人員提出了一些要求,如分散式安全、分散式管理、多租戶、超級叢集的全球擴充套件以及資料的安全共享,這些要求在NATS 2.0時代催生了下一代NATS流,被稱為 NATS JetStream.

對於具有分散式叢集的現代流媒體系統,建議使用最新的 NATS JetStream提供。JetStream_的建立是為了解決當今流媒體技術所發現的問題--複雜性、脆弱性和缺乏擴充套件性。我們將在本文中進一步討論_JetStream

用NATS JetStream實現Java中的非同步Pub/Sub訊息傳遞

專案設定

執行或安裝一個_NATS JetStream_伺服器是非常容易的。無論你想在Windows、Mac或Linux機器上託管這個叢集,Docker引擎都能使設定變得非常簡單。

我們將使用一個Docker容器來託管JetStream伺服器。為了執行Docker映象,我們可以簡單地執行。

$ docker run -ti -p 4222:4222 --name jetstream synadia/jsm:latest server

一旦你執行該程式,你會看到一些類似的內容。

JetStream Logs

NATS有一個龐大的不同語言的客戶端庫列表,有一個由1000多個貢獻者組成的活躍社群。它在2018年加入了 CNCF(雲原生計算基金會)。作為一個孵化專案在2018年加入。

我們將使用NATS的Java客戶端,即jnats。為了連線到NATS JetStream,我們只需要在pom.xml 中定義一個依賴項。

<dependency> <groupId>io.nats</groupId> <artifactId>jnats</artifactId> <version>${version}</version> </dependency>

就這樣了!我們已經準備好了。現在讓我們來看看我們的一些使用情況。如果你遇到困難,你可以在GitHub上找到完整的原始碼。

釋出者/訂閱者流

讓我們嘗試通過建立一個新的Stream 和一個主題來定義一個傳統的_釋出者/訂閱_者模型。Stream,在NATS JetStream中代表兩個端點之間的任何資料流,是API的核心構建塊。

我們將建立一個單一的類,首先發布一些訊息,然後訂閱讀取這些訊息併發送確認。

public class PubSubAsync { // Proceeding code goes here }

讓我們繼續定義一些全域性靜態設定,如流名稱、主題、預設訊息和伺服器。

private static final String defaultStream = "pubsubasync-stream"; private static final String defaultSubject = "pubsubasync-subject"; private static final String defaultMessage = "Hello User"; private static final int defaultMessageCount = 2; private static final String defaultServer = "nats://localhost:4222";

我們將在以後逐步設定流的時候使用這些設定,以避免在其中硬編碼變數。

讓我們首先設定一個Connection 到NATS JetStream伺服器,例項化一個JetStreamManagement 例項,用來新增Stream 例項,以及一個StreamConnfiguration 例項--通過生成器設計模式建立,以便在定義設定時有靈活性。

與NATS伺服器的連線可能會失敗,所以你要把*所有的程式程式碼包在一個try-catch 塊中。我們將使用一個 try-with-resources塊,因為這是一個可關閉的連線。

try (Connection nc = Nats.connect(defaultServer)) { // Creating streams, managers, sending messages, subscribing, etc. } catch (Exception e) { e.printStackTrace(); }

try 塊中,我們將首先建立一個JetStreamManagement 例項,以及一個StreamConfigurationJetStream 上下文。

JetStream 類是該框架的中心API。JetStream 通過將訊息推送到訂閱者正在收聽的_主題_,間接地將訊息_釋出_給_訂閱_者。它還將使用者_訂閱_到主題上。

_主題_是在構建StreamConfiguration 時定義的,而JetStreamManagement 例項讓我們將具有該配置的Streams 新增到我們的管道中。我們將在後面的章節中更詳細地介紹JetStreamManagement 。讓我們建立一個單一的流來發布訊息給一個主題,並建立JetStream 上下文來管理髮布和訂閱傳送給該主題的訊息。

``` JetStreamManagement jsm = nc.jetStreamManagement(); // Create a stream, here will use an in-memory storage type, and one subject StreamConfiguration sc = StreamConfiguration.builder() .name(defaultStream) .storageType(StorageType.Memory) .subjects(defaultSubject) .build();

// Add a stream via the JetStreamManagement instance and capture its info in a StreamInfo object StreamInfo streamInfo = jsm.addStream(sc); JsonUtils.printFormatted(streamInfo);

// Create a JetStream context. This hangs off the original connection // allowing us to produce data to publish into streams and consume data from // JetStream consumers. JetStream js = nc.jetStream();
```

現在,我們可以繼續建立一個Future列表來儲存我們的訊息結果,因為我們正在處理非同步訊息,不知道它們_何時_會返回。當通過JetStream 例項的publishAsync() 方法釋出訊息時,會返回一個PublishAck ,表示未來客戶端對接收的確認。

如果你想閱讀更多關於Future 介面的資訊,請閱讀我們的《Java中的未來介面指南》

此外,對於每個訊息,我們將建立一個Message 例項,它接受一個_主題_和_資料_。我們要向誰傳送訊息以及訊息是什麼。使用NatsMessage.builder() 方法,我們可以很容易地建立一個我們想傳送的訊息,並省略某些我們沒有任何用途的引數。

一旦建立了一個Message ,我們就可以通過JetStream'的publishAsync() 方法非同步釋出它。

``` // Create a future for asynchronous message processing List> futures = new ArrayList<>(); int stop = defaultMessageCount + 1; for (int x = 1; x < stop; x++) { String data = defaultMessage + "-" + x;

// Create a typical NATS message
Message msg = NatsMessage.builder()
        .subject(defaultSubject)
        .data(data, StandardCharsets.UTF_8)
        .build();
System.out.printf("Publishing message %s on subject %s.\n", data, defaultSubject);

// Publish a message and add the result to our `CompletableFuture` list
futures.add(js.publishAsync(msg));

} ```

一旦我們傳送了訊息,我們很可能想知道它們發生了什麼,是否有任何問題被提出。通過迭代我們的futures 列表,我們可以檢查CompletableFuture 例項是否_完成了_,如果完成了就列印它們的內容,如果沒有完成就重新排隊,以後再檢查。

// Get Acknowledgement for the messages while (futures.size() > 0) { CompletableFuture<PublishAck> f = futures.remove(0); if (f.isDone()) { try { PublishAck pa = f.get(); System.out.printf("Publish Succeeded on subject %s, stream %s, seqno %d.\n", defaultSubject, pa.getStream(), pa.getSeqno()); } catch (ExecutionException ee) { System.out.println("Publish Failed " + ee); } } else { // Re-queue it and try again futures.add(f); } }

對於一個_釋出者_來說,我們需要一個訂閱_者_來發布(合理的),以免資訊在沒有什麼意義的情況下懸空。一個_訂閱者_被建立為一個JetStreamSubscription 例項,由JetStream 上下文的subscribe() 方法返回。

``` // Subscribe to the messages that have been published to the subject JetStreamSubscription sub = js.subscribe(defaultSubject); List messages = new ArrayList<>(); // Retrieve the next message and kick off an iteration of all the messages Message msg = sub.nextMessage(Duration.ofSeconds(1)); boolean first = true; while (msg != null) { if (first) { first = false; System.out.print("Read/Ack ->"); } messages.add(msg); if (msg.isJetStream()) { msg.ack(); System.out.print(" " + new String(msg.getData()) + "\n");
} else if (msg.isStatusMessage()) { System.out.print(" !" + msg.getStatus().getCode() + "!"); } JsonUtils.printFormatted(msg.metaData()); msg = sub.nextMessage(Duration.ofSeconds(1)); }

// Make sure the message goes through before we close nc.flush(Duration.ZERO); nc.close(); ```

將所有這些聯絡起來,當我們執行程式碼時--我們應該看到這樣的資訊。

PubSubAsync

我們已經成功地建立了一個數據的Stream ,它將訊息傳遞給一個_主題_,而我們的訂閱者正在觀察它們的非同步到達。但有時,我們的主題名稱在我們想要訂閱它們之前是不知道的。例如,你可能會_生成_主題名稱,並想在新主題建立時訂閱它們。或者,有一整個具有共同字首的主題列表,你想訂閱。

在這兩種情況下--而不是複雜的迴圈和生成-訂閱邏輯--你可以使用_萬用字元_來針對不止一個主題。

萬用字元釋出者/訂閱者流

NATS支援_層次化_的標記,以支援萬用字元訂閱。作為本指南開始時的複習。

A 主題在NATS中是一個簡單的字串,代表對資料的興趣。它被_分層標記_以支援_萬用字元訂閱_。

  • foo.* 匹配 foo.bar_和 foo.baz_
  • foo.*.bar匹配 foo.a.bar_和 foo.b.bar_
  • _foo.>_匹配上述任何一個
  • _>_匹配NATS中的所有內容

這些萬用字元可以在釋出者或訂閱者中配置,也可以在兩者中配置。我們稍後會看一下這個典型的例子。我們現在要使用的方法背後的邏輯與我們之前看到的大致相同。

``` public class PubWildcardSubWildcard {

private static final String defaultStream = "pubsubwildcardasync-stream";
private static final String defaultSubjectWildcard = "audit.us.*";
private static final String defaultSubjectSpecific = "audit.us.east";
private static final String defaultMessage = "Audit User";
private static final int defaultMessageCount = 2;
private static final String defaultServer = "nats://localhost:4222";

public static void main( String[] args ) {
    System.out.printf("\nPublishing to %s. Server is %s\n\n", defaultSubjectWildcard, defaultServer);

      try (Connection nc = Nats.connect(defaultServer)) {      
      JetStreamManagement jsm = nc.jetStreamManagement();

     StreamConfiguration sc = StreamConfiguration.builder()
             .name(defaultStream)
             .storageType(StorageType.Memory)
             .subjects(defaultSubjectWildcard)
             .build();

     StreamInfo streamInfo = jsm.addStream(sc);
     JsonUtils.printFormatted(streamInfo);

     JetStream js = nc.jetStream();

     List<CompletableFuture<PublishAck>> futures = new ArrayList<>();
     int stop = defaultMessageCount + 1;
     for (int x = 1; x < stop; x++) {
         String data = defaultMessage + "-" + x;

         Message msg = NatsMessage.builder()
                 .subject(defaultSubjectSpecific)
                 .data(data, StandardCharsets.UTF_8)
                 .build();
         System.out.printf("Publishing message %s on subject %s.\n", data, defaultSubjectSpecific);

         futures.add(js.publishAsync(msg));
     }

     while (futures.size() > 0) {
         CompletableFuture<PublishAck> f = futures.remove(0);
         if (f.isDone()) {
             try {
                 PublishAck pa = f.get();
                 System.out.printf("Publish Succeeded on subject %s, stream %s, seqno %d.\n",
                        defaultSubjectSpecific, pa.getStream(), pa.getSeqno());
             }
             catch (ExecutionException ee) {
                 System.out.println("Publish Failed " + ee);
             }
         }
         else {
             futures.add(f);
         }
    }

     JetStreamSubscription sub = js.subscribe(defaultSubjectWildcard);
     List<Message> messages = new ArrayList<>();
     Message msg = sub.nextMessage(Duration.ofSeconds(1));
     boolean first = true;
     while (msg != null) {
         if (first) {
             first = false;
             System.out.print("Read/Ack ->");
         }
         messages.add(msg);
         if (msg.isJetStream()) {
             msg.ack();
             System.out.print(" " + new String(msg.getData()) + "\n");            
         }
         else if (msg.isStatusMessage()) {
                 System.out.print(" !" + msg.getStatus().getCode() + "!");
         }
         JsonUtils.printFormatted(msg.metaData());
         msg = sub.nextMessage(Duration.ofSeconds(1));
     }
      nc.flush(Duration.ZERO)
      nc.close();
 }
 catch (Exception e) {
     e.printStackTrace();
 }

} } ```

當我們執行這段程式碼時,我們會看到。

PubSubWildcardLog

作為Pub/Sub模式的替代品,使用msg.getReplyTo() ,我們可以開始構建一個_Request-Reply_模式的實現,通過構建佇列組和通道來訂閱和取消訂閱--我們可以構建一個_佇列組_模式的實現。

這是可能的,因為我們根本沒有為NATS做任何特定模式的配置--你想使用的特定模式只取決於你如何使用這個庫

JetStream管理

在某一點上,你可能想觀察或管理你的流。為了做到這一點,我們將研究NATS JetStream中的流的生命週期。

  • 建立或新增一個帶有主題的流
  • 通過新增一個主題來更新一個流
  • 獲取流的資訊
  • 清除一個流中的資訊
  • 刪除一個流

為了演示這些,讓我們建立一個有幾個靜態欄位和只有一個main() 方法的類。在這個類中,我們將測試其中的一些操作,但根據你的架構和這些操作的觸發器,你會想相應地附加上接下來的程式碼段。

``` public class NatsJsManageStreams {

private static final String STREAM1 = "manage-stream1";
private static final String STREAM2 = "manage-stream2";
private static final String SUBJECT1 = "manage-subject1";
private static final String SUBJECT2 = "manage-subject2";
private static final String SUBJECT3 = "manage-subject3";
private static final String SUBJECT4 = "manage-subject4";
private static final String defaultServer = "nats://localhost:4222";

public static void main(String[] args) {
    try (Connection nc = Nats.connect(defaultServer)) {
        JetStreamManagement jsm = nc.jetStreamManagement();
        // Management code
        // ...

      // Make sure the message goes through before we close
        nc.flush(Duration.ZERO);
        nc.close();
    } catch (Exception exp) {
        exp.printStackTrace();
    }
}

} ```

在剩下的示例中,我們將使用同一個JetStreamManagement 例項,因為我們在一個單一的類中使用它們。不過,請記住,在真實世界的場景中,你永遠不會/很少會建立一個多流設定。相反,你通常會將主題新增到現有的流中以重新利用資源。

注意:在整個例子中,我們將使用一個自定義的_實用程式類來_處理流的建立或更新,非同步釋出而無需等待,或讀取有無確認的訊息 -NatsJsUtils 。這個實用類可以在GitHub上找到。

建立或新增一個帶有主題的流

我們第一次建立一個Stream ,我們只是設定了它的名字、主題和儲存策略。還有其他各種設定,我們可以通過構建器的方法進行調整。

// 1. Create (add) a stream with a subject System.out.println("\n----------\n1. Configure And Add Stream 1"); StreamConfiguration streamConfig = StreamConfiguration.builder() .name(STREAM1) .subjects(SUBJECT1) // .retentionPolicy() // .maxConsumers(...) // .maxBytes(...) // .maxAge(...) // .maxMsgSize(...) .storageType(StorageType.Memory) // .replicas(...) // .noAck(...) // .template(...) // .discardPolicy(...) .build(); StreamInfo streamInfo = jsm.addStream(streamConfig); NatsJsUtils.printStreamInfo(streamInfo);

RetentionPolicy 設定訊息被刪除的時間--當對它們沒有興趣時(沒有消費者會消費它),當它們被消費時,等等。你可以限制消費者的數量,訊息可以有多長的位元組,它可以被持久化多長時間,是否需要一個ACK 響應--等等。

在最簡單的形式下--你提供一個名稱、主題和儲存型別,然後build() 。我們可以在一個Stream ,作為JetStreamManagement 例項的addStream() 方法的返回型別,通過NatsJsUtils 類漂亮地打印出資訊。

用一個主題更新一個流

你可以通過JetStreamManagement 例項的updateStream() 方法來更新現有的流。我們將重新使用streamConfig 參考變數,並根據從現有的StreamInfo 例項中提取的配置,為我們想要更新的流建立一個新的配置build()

``` // 2. Update stream, in this case, adding a new subject // - StreamConfiguration is immutable once created // - but the builder can help with that. System.out.println("----------\n2. Update Stream 1"); streamConfig = StreamConfiguration.builder(streamInfo.getConfiguration()) .addSubjects(SUBJECT2).build(); streamInfo = jsm.updateStream(streamConfig); NatsJsUtils.printStreamInfo(streamInfo);

// 3. Create (add) another stream with 2 subjects System.out.println("----------\n3. Configure And Add Stream 2"); streamConfig = StreamConfiguration.builder() .name(STREAM2) .storageType(StorageType.Memory) .subjects(SUBJECT3, SUBJECT4) .build(); streamInfo = jsm.addStream(streamConfig); NatsJsUtils.printStreamInfo(streamInfo); ```

這樣做的結果是。

獲取流的資訊

``` // 4. Get information on streams // 4.0 publish some message for more interesting stream state information // - SUBJECT1 is associated with STREAM1 // 4.1 getStreamInfo on a specific stream // 4.2 get a list of all streams // 4.3 get a list of StreamInfo's for all streams System.out.println("----------\n4.1 getStreamInfo"); NatsJsUtils.publish(nc, SUBJECT1, 5); streamInfo = jsm.getStreamInfo(STREAM1); NatsJsUtils.printStreamInfo(streamInfo);

System.out.println("----------\n4.2 getStreamNames"); List streamNames = jsm.getStreamNames(); NatsJsUtils.printObject(streamNames);

System.out.println("----------\n4.2 getStreamNames"); List streamInfos = jsm.getStreams(); NatsJsUtils.printStreamInfoList(streamInfos); ```

清除一個流

你可以很容易地清除一個流中的所有資訊,把它完全清空。

// 5. Purge a stream of it's messages System.out.println("----------\n5. Purge stream"); PurgeResponse purgeResponse = jsm.purgeStream(STREAM1); NatsJsUtils.printObject(purgeResponse);

刪除一個數據流

或者,如果你肯定已經用完了一個資訊流--你可以輕鬆地刪除它。

// 6. Delete a stream System.out.println("----------\n6. Delete stream"); jsm.deleteStream(STREAM2); System.out.println("----------\n");

處理安全問題

NATS JetStream支援用TLS對連線進行加密。TLS可以用來加密/解密客戶/伺服器連線之間的流量,並檢查伺服器的身份。當在TLS模式下啟用時,NATS將要求所有客戶端用TLS連線。

你可以通過載入所有的Keystores和Truststores來定義一個SSLContext ,然後在連線到NATS時將SSLContext作為一個選項來過載。讓我們定義一個SSLUtils 類,我們可以用它來載入一個鑰匙庫,建立鑰匙管理器和一個SSL上下文。

``` class SSLUtils { public static String KEYSTORE_PATH = "keystore.jks"; public static String TRUSTSTORE_PATH = "truststore.jks"; public static String STORE_PASSWORD = "password"; public static String KEY_PASSWORD = "password"; public static String ALGORITHM = "SunX509";

public static KeyStore loadKeystore(String path) throws Exception {
    KeyStore store = KeyStore.getInstance("JKS");
    BufferedInputStream in = new BufferedInputStream(new FileInputStream(path));

    try {
        store.load(in, STORE_PASSWORD.toCharArray());
    } finally {
        if (in != null) {
            in.close();
        }
    }

    return store;
}

public static KeyManager[] createTestKeyManagers() throws Exception {
    KeyStore store = loadKeystore(KEYSTORE_PATH);
    KeyManagerFactory factory = KeyManagerFactory.getInstance(ALGORITHM);
    factory.init(store, KEY_PASSWORD.toCharArray());
    return factory.getKeyManagers();
}

public static TrustManager[] createTestTrustManagers() throws Exception {
    KeyStore store = loadKeystore(TRUSTSTORE_PATH);
    TrustManagerFactory factory = TrustManagerFactory.getInstance(ALGORITHM);
    factory.init(store);
    return factory.getTrustManagers();
}

public static SSLContext createSSLContext() throws Exception {
    SSLContext ctx = SSLContext.getInstance(Options.DEFAULT_SSL_PROTOCOL);
    ctx.init(createTestKeyManagers(), createTestTrustManagers(), new SecureRandom());
    return ctx;
}

} ```

然後,我們的實用類都準備好了--我們可以在建立NATS連線時向sslContext() builder方法提供由它建立的SSLContext

``` public class NatsConnectTLS { public static void main(String[] args) { try { SSLContext ctx = SSLUtils.createSSLContext(); Options options = new Options.Builder() .server("nats://localhost:4222") .sslContext(ctx) // Set the SSL context .build(); Connection nc = Nats.connect(options);

        // Do something with the connection

        nc.close();
    } catch (Exception e) {
        e.printStackTrace();
    }
}

} ```

我們還可以定義一個認證機制來限制對NATS系統的訪問。客戶端沒有對訪問控制的控制權,但客戶端提供了與系統進行身份驗證所需的配置,繫結到一個賬戶,並要求TLS。

在設定Options 時,可以通過userInfo() 方法設定一個簡單的配置,用一個_使用者名稱_和_密碼_來連線。

Options options = new Options.Builder(). .server("nats://localhost:4222") .userInfo("myname","password") // Set a user and plain text password .build(); Connection nc = Nats.connect(options);

然後,在建立一個連線時,我們可以通過在URL中提供使用者名稱和密碼來連線到NATS伺服器。

Connection nc = Nats.connect("nats://myname:password@localhost:4222");

同樣,我們也可以通過認證令牌,如JWTs,或祕密作為以下配置的一部分。

Options options = new Options.Builder() .server("nats://localhost:4222") .token("mytoken") // Set a token .build(); Connection nc = Nats.connect(options);

我們現在可以像下面這樣連線到NATS Url。

Connection nc = Nats.connect("nats://mytoken@localhost:4222"); // Token in URL

結論

當你考慮使用分散式流媒體系統作為構建分散式微服務叢集、基於物聯網的系統、下一代邊緣系統的神經系統時,你可以考慮使用NATS JetStream,與其他流行的、強大的框架(如Apache Kafka)相比,這是一個輕量級的選擇。在一個數據驅動的世界裡,處理大量的事件和訊息流正變得越來越普遍。NATS JetStream提供了分散式安全、多租戶和橫向擴充套件的能力。

一如既往,你可以在GitHub上找到完整的原始碼。