NATS-Server(JetStream)和NATS Streaming Server對比

語言: CN / TW / HK

在我吐槽了無數次之後, NATS JetStream 終於結束了beta階段正式進入RC階段。終於官方也在最近剛剛正式回覆了我正式版本在處理幾個問題之後就會正式發佈。那麼在這個比較重要的 NATS-Server 特性發布之際聊一下NATS產品本身區別和新特性的使用,還有更多的潛在的區別。

概念區分:NATS-Server / NATS Streaming Server / NATS JetStream

NATS-Server

NATS-Server (或者叫 nats )是一個開源的、雲原生的、高性能的消息傳遞系統,是NATS的最基礎的產品。它的核心是一個發佈/訂閲(Pub/Sub)系統,客户端可以在不同集羣中的服務間 nats 進行通訊,而不需要關注具體的消息在哪個服務上。換而言之,客户端可以在任意一個集羣的服務端上發佈消息,同時在任意集羣客户端上嘗試讀取消息。在官方與其他同類消息隊列產品 功能對比 中,我們也可以管窺一下產品的功能列表。 nats 支持多流多服務進行 pub/sub ,負載均衡,保障消息最多/最少一次送達,多租户和用户認證等功能。雖然看上去優點很多,但是 nats 不是一個應用很廣的消息隊列的重要原因是,它缺少了一些對消息隊列而言很最重要的產品特性,比如持久化支持,比如消息確保一次送達。這意味着當你的消息發送出去之後,你的消息是在處理過程中可能丟失的,甚至是可能送達不到的。

NATS Streaming Server

NATS Streaming Server (或者叫 stan )是用於嘗試解決上面提到的 nats 的已存在問題的。 stan 添加了持久化功能和消息送達策略支持。 stan 中自帶了 nats 服務端,但是在使用過程中, natsstan 不能進行混用。在官方文檔中,是這麼描述 stannats 之間的關係的:

NATS客户端和NATS Streaming Server客户端之間不能相互交換數據。也就是説,如果一個NATS Streaming Server客户端在foo上發佈消息,在同一主題上訂閲的NATS客户端將不會收到消息。NATS Streaming Server消息是由protobuf組成的NATS消息。NATS Streaming Server要向生產者發送ACK,並接收消費者的ACK。如果與NATS客户端自由交換消息,就會引起問題。

stan 的具體架構如下圖:

但是 stan 雖然提供了持久化和消息傳遞策略支持,但是在架構設計上卻出現了問題,導致在最開始設計時遺留了很多問題,比如當你確定 stan 集羣是固定的不能無限制水平擴容( #999 ),比如不支持多租户功能( #1122 ),比如客户端無法主動拉取消息只能被推送等等

NATS JetStream

NATS JetStream (或者叫 JetStream )是NATS基於 Raft 算法實現的最新的架構設計嘗試解決上述問題的新方案。在區別於原有的 stan 功能上,提供了新的持久化功能和消息送達策略,同時支持水平擴容。同時,新的 JetStream 也為大消息做了一些優化,不再將這特性功能作為 nats 的客户端存在而是嵌入 NATS Server 中作為其中的一個功能存在。也就是説,如果在對這幾項技術進行選擇時, JetStream 應該是最應該被選擇的方案。更多詳細情況具體可以查看官方的 指導文檔

NATS JetStream使用

理論介紹過了,接下來説説實際使用的事情。現在 JetStream 還是RC階段,

編譯和啟動客户端

下載 nats-server 源碼 ,解壓之後執行:

cd nats-server-master
go build -o nats-server -ldflags="-s -w -buildid=" .
./nats-server -js

這樣就可以啟動一個支持 JetStream 功能的服務端了。

[54738] 2021/03/02 18:27:02.605197 [INF] Starting nats-server
[54738] 2021/03/02 18:27:02.605236 [INF]   Version:  2.2.0-RC.2
[54738] 2021/03/02 18:27:02.605238 [INF]   Git:      [not set]
[54738] 2021/03/02 18:27:02.605239 [INF]   Name:     NAFWRGQTR2CHMBIKNPE6R3ZTW2BWV2FWPAZREMHI24IYVM6FVHMVIYLQ
[54738] 2021/03/02 18:27:02.605240 [INF]   ID:       NAFWRGQTR2CHMBIKNPE6R3ZTW2BWV2FWPAZREMHI24IYVM6FVHMVIYLQ
[54738] 2021/03/02 18:27:02.605658 [INF] Starting JetStream
[54738] 2021/03/02 18:27:02.606062 [WRN]     _ ___ _____ ___ _____ ___ ___   _   __  __
[54738] 2021/03/02 18:27:02.606076 [WRN]  _ | | __|_   _/ __|_   _| _ \ __| /_\ |  \/  |
[54738] 2021/03/02 18:27:02.606077 [WRN] | || | _|  | | \__ \ | | |   / _| / _ \| |\/| |
[54738] 2021/03/02 18:27:02.606078 [WRN]  \__/|___| |_| |___/ |_| |_|_\___/_/ \_\_|  |_|
[54738] 2021/03/02 18:27:02.606079 [WRN]
[54738] 2021/03/02 18:27:02.606080 [WRN]                _         _
[54738] 2021/03/02 18:27:02.606081 [WRN]               | |__  ___| |_ __ _
[54738] 2021/03/02 18:27:02.606082 [WRN]               | '_ \/ -_)  _/ _` |
[54738] 2021/03/02 18:27:02.606083 [WRN]               |_.__/\___|\__\__,_|
[54738] 2021/03/02 18:27:02.606084 [WRN]
[54738] 2021/03/02 18:27:02.606084 [WRN]          JetStream is a Beta feature
[54738] 2021/03/02 18:27:02.606085 [WRN]     https://github.com/nats-io/jetstream
[54738] 2021/03/02 18:27:02.606092 [INF]
[54738] 2021/03/02 18:27:02.606093 [INF] ----------- JETSTREAM -----------
[54738] 2021/03/02 18:27:02.606095 [INF]   Max Memory:      12.00 GB
[54738] 2021/03/02 18:27:02.606096 [INF]   Max Storage:     35.79 GB
[54738] 2021/03/02 18:27:02.606098 [INF]   Store Directory: "/var/folders/5s/8rczg1gs4wb59y9s22nc3f_r0000gn/T/nats/jetstream"
[54738] 2021/03/02 18:27:02.606099 [INF] ---------------------------------
[54738] 2021/03/02 18:27:02.606399 [INF] Listening for client connections on 0.0.0.0:4222
[54738] 2021/03/02 18:27:02.606512 [INF] Server is ready

編寫JetStream DEMO

接下來我們看一下如何使用 JetStream 進行消息發佈/訂閲功能:

// 連接到nats的服務器
	conn, err := nats.Connect("nats://127.0.0.1:4222")
	if err != nil {
		log.Panic(err)
	}
	defer conn.Close()

	// 初始化JetStream功能
	js, err := conn.JetStream()
	if err != nil {
		log.Panic(err)
	}

	// 判斷Stream是否存在,如果不存在,那麼需要創建這個Stream,否則會導致pub/sub失敗
	stream, err := js.StreamInfo(streamName)
	if err != nil {
		log.Println(err) // 如果不存在,這裏會有報錯
	}
	if stream == nil {
		log.Printf("creating stream %q and subject %q", streamName, subject)
		_, err = js.AddStream(&nats.StreamConfig{
			Name:     streamName,
			Subjects: []string{subject},
			MaxAge:   3 * 24 * time.Hour,
		})
		if err != nil {
			log.Panicln(err)
		}
	}

	// 訂閲消息
	sub, err := js.Subscribe(subject, cbHandle, nats.AckAll(), nats.DeliverNew())
	if err != nil {
		log.Panic(err)
		return
	}
	defer sub.Unsubscribe()

	// 發送消息
	js.Publish(subject, []byte("Hello World! "+time.Now().Format(time.RFC3339)))

	time.Sleep(5 * time.Second)
	log.Println("Exiting...")

在這個例子中,有個值得注意的功能需要額外強調一下,在 Subscribe 消息時,我們在這裏特別聲明瞭 nats.DeliverNew() 這個選項。如果不聲明,則默認為 nats.DeliverAll() ;除了這兩個參數,還有一個 nats.DeliverLast() 參數,這分別對應了3種開始訂閲時的方式:默認方式 nats.DeliverAll() 是會讀取有效生命週期內的所有消息,甚至包含已被處理的消息; nats.DeliverLast() 是會包含消息隊列中的最後一條消息,即使被處理過的消息; nats.DeliverNew() 則只處理訂閲之後的新消息。