NATS-Server(JetStream)和NATS Streaming Server對比

語言: CN / TW / HK

在我吐槽了無數次之後, NATS JetStream 終於結束了beta階段正式進入RC階段。終於官方也在最近剛剛正式回覆了我正式版本在處理幾個問題之後就會正式釋出。那麼在這個比較重要的 NATS-Server 特性發布之際聊一下NATS產品本身區別和新特性的使用,還有更多的潛在的區別。

概念區分:NATS-Server / NATS Streaming Server / NATS JetStream

NATS-Server

NATS-Server (或者叫 nats )是一個開源的、雲原生的、高效能的訊息傳遞系統,是NATS的最基礎的產品。它的核心是一個釋出/訂閱(Pub/Sub)系統,客戶端可以在不同叢集中的服務間 nats 進行通訊,而不需要關注具體的訊息在哪個服務上。換而言之,客戶端可以在任意一個叢集的服務端上釋出訊息,同時在任意叢集客戶端上嘗試讀取訊息。在官方與其他同類訊息佇列產品 功能對比 中,我們也可以管窺一下產品的功能列表。 nats 支援多流多服務進行 pub/sub ,負載均衡,保障訊息最多/最少一次送達,多租戶和使用者認證等功能。雖然看上去優點很多,但是 nats 不是一個應用很廣的訊息佇列的重要原因是,它缺少了一些對訊息佇列而言很最重要的產品特性,比如持久化支援,比如訊息確保一次送達。這意味著當你的訊息傳送出去之後,你的訊息是在處理過程中可能丟失的,甚至是可能送達不到的。

NATS Streaming Server

NATS Streaming Server (或者叫 stan )是用於嘗試解決上面提到的 nats 的已存在問題的。 stan 添加了持久化功能和訊息送達策略支援。 stan 中自帶了 nats 服務端,但是在使用過程中, natsstan 不能進行混用。在官方文件中,是這麼描述 stannats 之間的關係的:

NATS客戶端和NATS Streaming Server客戶端之間不能相互交換資料。也就是說,如果一個NATS Streaming Server客戶端在foo上釋出訊息,在同一主題上訂閱的NATS客戶端將不會收到訊息。NATS Streaming Server訊息是由protobuf組成的NATS訊息。NATS Streaming Server要向生產者傳送ACK,並接收消費者的ACK。如果與NATS客戶端自由交換訊息,就會引起問題。

stan 的具體架構如下圖:

但是 stan 雖然提供了持久化和訊息傳遞策略支援,但是在架構設計上卻出現了問題,導致在最開始設計時遺留了很多問題,比如當你確定 stan 叢集是固定的不能無限制水平擴容( #999 ),比如不支援多租戶功能( #1122 ),比如客戶端無法主動拉取訊息只能被推送等等

NATS JetStream

NATS JetStream (或者叫 JetStream )是NATS基於 Raft 演算法實現的最新的架構設計嘗試解決上述問題的新方案。在區別於原有的 stan 功能上,提供了新的持久化功能和訊息送達策略,同時支援水平擴容。同時,新的 JetStream 也為大訊息做了一些優化,不再將這特性功能作為 nats 的客戶端存在而是嵌入 NATS Server 中作為其中的一個功能存在。也就是說,如果在對這幾項技術進行選擇時, JetStream 應該是最應該被選擇的方案。更多詳細情況具體可以檢視官方的 指導文件

NATS JetStream使用

理論介紹過了,接下來說說實際使用的事情。現在 JetStream 還是RC階段,

編譯和啟動客戶端

下載 nats-server 原始碼 ,解壓之後執行:

cd nats-server-master
go build -o nats-server -ldflags="-s -w -buildid=" .
./nats-server -js

這樣就可以啟動一個支援 JetStream 功能的服務端了。

[54738] 2021/03/02 18:27:02.605197 [INF] Starting nats-server
[54738] 2021/03/02 18:27:02.605236 [INF]   Version:  2.2.0-RC.2
[54738] 2021/03/02 18:27:02.605238 [INF]   Git:      [not set]
[54738] 2021/03/02 18:27:02.605239 [INF]   Name:     NAFWRGQTR2CHMBIKNPE6R3ZTW2BWV2FWPAZREMHI24IYVM6FVHMVIYLQ
[54738] 2021/03/02 18:27:02.605240 [INF]   ID:       NAFWRGQTR2CHMBIKNPE6R3ZTW2BWV2FWPAZREMHI24IYVM6FVHMVIYLQ
[54738] 2021/03/02 18:27:02.605658 [INF] Starting JetStream
[54738] 2021/03/02 18:27:02.606062 [WRN]     _ ___ _____ ___ _____ ___ ___   _   __  __
[54738] 2021/03/02 18:27:02.606076 [WRN]  _ | | __|_   _/ __|_   _| _ \ __| /_\ |  \/  |
[54738] 2021/03/02 18:27:02.606077 [WRN] | || | _|  | | \__ \ | | |   / _| / _ \| |\/| |
[54738] 2021/03/02 18:27:02.606078 [WRN]  \__/|___| |_| |___/ |_| |_|_\___/_/ \_\_|  |_|
[54738] 2021/03/02 18:27:02.606079 [WRN]
[54738] 2021/03/02 18:27:02.606080 [WRN]                _         _
[54738] 2021/03/02 18:27:02.606081 [WRN]               | |__  ___| |_ __ _
[54738] 2021/03/02 18:27:02.606082 [WRN]               | '_ \/ -_)  _/ _` |
[54738] 2021/03/02 18:27:02.606083 [WRN]               |_.__/\___|\__\__,_|
[54738] 2021/03/02 18:27:02.606084 [WRN]
[54738] 2021/03/02 18:27:02.606084 [WRN]          JetStream is a Beta feature
[54738] 2021/03/02 18:27:02.606085 [WRN]     https://github.com/nats-io/jetstream
[54738] 2021/03/02 18:27:02.606092 [INF]
[54738] 2021/03/02 18:27:02.606093 [INF] ----------- JETSTREAM -----------
[54738] 2021/03/02 18:27:02.606095 [INF]   Max Memory:      12.00 GB
[54738] 2021/03/02 18:27:02.606096 [INF]   Max Storage:     35.79 GB
[54738] 2021/03/02 18:27:02.606098 [INF]   Store Directory: "/var/folders/5s/8rczg1gs4wb59y9s22nc3f_r0000gn/T/nats/jetstream"
[54738] 2021/03/02 18:27:02.606099 [INF] ---------------------------------
[54738] 2021/03/02 18:27:02.606399 [INF] Listening for client connections on 0.0.0.0:4222
[54738] 2021/03/02 18:27:02.606512 [INF] Server is ready

編寫JetStream DEMO

接下來我們看一下如何使用 JetStream 進行訊息釋出/訂閱功能:

// 連線到nats的伺服器
	conn, err := nats.Connect("nats://127.0.0.1:4222")
	if err != nil {
		log.Panic(err)
	}
	defer conn.Close()

	// 初始化JetStream功能
	js, err := conn.JetStream()
	if err != nil {
		log.Panic(err)
	}

	// 判斷Stream是否存在,如果不存在,那麼需要建立這個Stream,否則會導致pub/sub失敗
	stream, err := js.StreamInfo(streamName)
	if err != nil {
		log.Println(err) // 如果不存在,這裡會有報錯
	}
	if stream == nil {
		log.Printf("creating stream %q and subject %q", streamName, subject)
		_, err = js.AddStream(&nats.StreamConfig{
			Name:     streamName,
			Subjects: []string{subject},
			MaxAge:   3 * 24 * time.Hour,
		})
		if err != nil {
			log.Panicln(err)
		}
	}

	// 訂閱訊息
	sub, err := js.Subscribe(subject, cbHandle, nats.AckAll(), nats.DeliverNew())
	if err != nil {
		log.Panic(err)
		return
	}
	defer sub.Unsubscribe()

	// 傳送訊息
	js.Publish(subject, []byte("Hello World! "+time.Now().Format(time.RFC3339)))

	time.Sleep(5 * time.Second)
	log.Println("Exiting...")

在這個例子中,有個值得注意的功能需要額外強調一下,在 Subscribe 訊息時,我們在這裡特別聲明瞭 nats.DeliverNew() 這個選項。如果不宣告,則預設為 nats.DeliverAll() ;除了這兩個引數,還有一個 nats.DeliverLast() 引數,這分別對應了3種開始訂閱時的方式:預設方式 nats.DeliverAll() 是會讀取有效生命週期內的所有訊息,甚至包含已被處理的訊息; nats.DeliverLast() 是會包含訊息佇列中的最後一條訊息,即使被處理過的訊息; nats.DeliverNew() 則只處理訂閱之後的新訊息。