nats 簡介和使用

語言: CN / TW / HK

nats 簡介和使用

nats 有 3 個產品

  • core-nats: 不做持久化的及時資訊傳輸系統
  • nats-streaming: 基於 nats 的持久化訊息佇列(已棄用)
  • nats-jetstream: 基於 nats 的持久化訊息佇列

這裡主要討論 core-nats 和 nats-jetstream

nats

nats 快速開始

  • 啟動 nats

```bash

啟動 nats

docker run --network host -p 4222:4222 nats ```

  • Connect 連線

go nc, err := nats.Connect("nats://localhost:4222") if err != nil { log.Fatal("NATS 連線失敗") } defer nc.Close()

  • Publish 釋出/生產訊息

go // 生產訊息 err := nc.Publish("foo", []byte("Hello World")) if err != nil { log.Fatal("NATS 釋出失敗") } // Flush 釋出緩衝區 err = nc.Flush() if err != nil { log.Fatal("NATS Flush 失敗") }

出於效能考慮, 釋出的訊息先寫入到類似 Buffer 快取的地方, 然後再一次性發送到 nats 伺服器

參考官方文件: https://docs.nats.io/using-nats/developer/sending/caches

  • Subscribe 訂閱/消費訊息

go // 消費訊息 _, err = nc.Subscribe("foo", func(msg *nats.Msg) { fmt.Printf("收到訊息: %s\n", msg.Data) }) if err != nil { log.Fatal("NATS 訂閱失敗") }

nats 提供 釋出訂閱, 請求響應, 和佇列模型 3 種 API. 分別是釋出訂閱模型, 請求響應模型, 和佇列模型, 下面展開介紹

釋出訂閱

釋出訂閱

釋出訂閱模型, 一個釋出者, 多個訂閱者, 多個訂閱者都可以收到同一個訊息

go // 消費訊息 _, err = nc.Subscribe("foo", func(msg *nats.Msg) { fmt.Printf("收到訊息: %s\n", msg.Data) }) if err != nil { log.Fatal("NATS 訂閱失敗") }

佇列模型

佇列模型

佇列模型, 一個釋出者, 多個訂閱者, 訊息在多個訊息中負載均衡分配, 分配給 A 消費者, 這個訊息就不會再分配給其他消費者了

go // 消費訊息 // queue 是佇列組的名稱, 同一組佇列最多隻有一個接收者能成功接收 _, _ = nc.QueueSubscribe("foo", "queue", func(msg *nats.Msg) { fmt.Printf("收到訊息: %s\n", string(msg.Data)) })

請求響應

生產者能收到消費者的回覆

請求響應

```go // 消費訊息 nc.Subscribe("help", func(m *nats.Msg) { fmt.Printf("收到訊息: %s\n", string(m.Data)) nc.Publish(m.Reply, []byte("I can help!")) })

// 生產訊息 go func() { msg, _ := nc.Request("help", []byte("help me"), 100*time.Millisecond) fmt.Printf("收到回覆: %s\n", string(msg.Data)) }()

select {} ```

應用1-保證訊息可靠性

nats 本身不做任何的訊息的持久化, 是 "最多一次" 交付模型

舉個例子, 如果生產的訊息沒有消費者接, 訊息就丟掉了

但是請求響應機制可以通過業務程式碼保證訊息的可靠性, 在業務層面實現常見訊息佇列的 ACK 機制

舉個例子, 生產者傳送訊息, 消費者接受訊息後處理, 成功返回 OK, 失敗返回 error, 生產者如果收到 error 或者超時就可以補發訊息

應用2-解耦 PRC 呼叫

請求響應模型和 RPC 呼叫是一致的, 我們可以用這個實現一個基於事件驅動的 RPC 匯流排

nats-jetstream

架構

jetstream 提供持久化 nats 服務, 客戶端支援實時推送的 push 模式和自定義拉取的 pull 模式, 架構圖如下

nats-jetstream架構

  • subject: 和 nats 一樣, 用來區分不同的訊息
  • stream: 定義了訊息的儲存方式, 保留規則, 丟棄規則 (stream 和 subject 是 1:n 的關係)
  • consumer: 定義了訊息接受方式並記錄接受到的位置, 有 2 種消費方式及時推送 push 和自定義拉取 pull (consumer 和 stream 是 1:n 的關係)

用支援 jetstream 的方式啟動 nats

```bash

啟動 nats jetStream (同時支援 nats API 和 jetStream API)

docker run --network host -p 4222:4222 nats -js ```

架構示例

貼一個來自於官方文件的 push pull 混合使用的架構示例圖

image-20221008162524128

基於 pull 的 worker 消費者和基於 push 的 monitor 消費者同時存在

Stream

JetStream 中 Stream 定義了訊息的儲存方式, 保留規則, 丟棄規則.

一個 Stream 可以對應多個 Subject, 如果一條訊息符合 Stream 的保留規則, 就會被保留下來

注意 JetStream 所有生產和消費的訊息的 Subject 都需要有 Stream 對應, 不然報錯

貼一個 Stream 的核心配置 (v1.15.0)

```go // jsm.go

// StreamConfig 用於定義一個流, 大多數引數都有合理的預設值 // 如果 subject 沒寫, 就會分配一個隨機的 subject type StreamConfig struct { // 名稱 Name string
// 描述 Description string // 對應的多個 Subject Subjects []string // 訊息 3 種保留策略 // RetentionPolicy 最大訊息數, 最大儲存空間或者最大存活時間達到限制, 就可以刪除訊息 // InterestPolicy 需要所有 consumer 確認可以刪除訊息 // WorkQueuePolicy 只需要一個 consumer 確認可以刪除訊息 Retention RetentionPolicy // 最大 Consumer 數量 MaxConsumers int // 最大儲存 Mgs 數量 MaxMsgs int64 // 最大儲存佔用 MaxBytes int64 // 訊息 2 種淘汰策略 // DiscardOld 訊息達到限制後, 丟棄最早的訊息 // DiscardNew 訊息達到限制後, 資訊訊息新推送會失敗 Discard DiscardPolicy // 訊息存活時間 MaxAge time.Duration // 每個 subject 最大訊息數量 MaxMsgsPerSubject int64 // 每個訊息最大大小 MaxMsgSize int32 // 支援檔案儲存和記憶體儲存 2 種類型 Storage StorageType // 訊息分片數量 Replicas int // 不需要 ack NoAck bool
// ...
} ```

Consumer

Consumer 定義了訊息接受方式並記錄接受到的位置

舉個例子如果消費者在 Sub 訊息的時候指定了 Consumer, 就會從記錄的位置開始推送訊息, 而不是從頭開始

貼一個 Consumer 的核心配置 (v1.15.0)

```go // jsm.go

type ConsumerConfig struct { // 名稱 Durable string json:"durable_name,omitempty" // 描述 Description string json:"description,omitempty" // 交付 Subject DeliverSubject string json:"deliver_subject,omitempty" // 交付 Group DeliverGroup string json:"deliver_group,omitempty" // 交付策略 // 交付所有 (預設), 交付最後一個, 交付最新, 自定義開始序號, 自定義開始時間 DeliverPolicy DeliverPolicy json:"deliver_policy" // 開始序號 OptStartSeq uint64 json:"opt_start_seq,omitempty" // 開始時間 OptStartTime *time.Time json:"opt_start_time,omitempty" // ack 策略 // 不需要ack (預設), 隱式ack All , 每個都需要顯示ack AckPolicy AckPolicy json:"ack_policy" // ack等待時間 AckWait time.Duration json:"ack_wait,omitempty" MaxDeliver int json:"max_deliver,omitempty" BackOff []time.Duration json:"backoff,omitempty" // 過濾的Subject FilterSubject string json:"filter_subject,omitempty" // 重試策略 // 儘快重試, ReplayOriginalPolicy 相同時間重試 ReplayPolicy ReplayPolicy json:"replay_policy" // 限速 RateLimit uint64 json:"rate_limit_bps,omitempty" // Bits per sec // 取樣頻率 SampleFrequency string json:"sample_freq,omitempty" // 最大等待數量 MaxWaiting int json:"max_waiting,omitempty" //最大Pending ack數量 MaxAckPending int json:"max_ack_pending,omitempty" // flow 控制 FlowControl bool json:"flow_control,omitempty" // 心跳時間 Heartbeat time.Duration json:"idle_heartbeat,omitempty" // ... } ```

程式碼示例

Core Publish-Subcribe

```go package main

import ( "fmt" "os" "time"

"github.com/nats-io/nats.go"

)

func main() { // 環境變數中獲取 NATS 伺服器地址 url := os.Getenv("NATS_URL") if url == "" { url = nats.DefaultURL }

// 連線 NATS 伺服器
nc, _ := nats.Connect(url)


defer nc.Drain()

// 生產訊息 1, 因為沒有消費者, 這個訊息會丟失
nc.Publish("greet.1", []byte("hello"))

// 訂閱訊息, 非同步接受, 這個時候有消費者了
sub, _ := nc.SubscribeSync("greet.*")

// 第一個訊息因為沒有消費者所以會丟失
msg, _ := sub.NextMsg(10 * time.Millisecond)
fmt.Println("subscribed after a publish...")
fmt.Printf("msg is nil? %v\n", msg == nil)

// 生產訊息 2, 3
nc.Publish("greet.2", []byte("hello"))
nc.Publish("greet.3", []byte("hello"))


msg, _ = sub.NextMsg(10 * time.Millisecond)
fmt.Printf("msg data: %q on subject %q\n", string(msg.Data), msg.Subject)


msg, _ = sub.NextMsg(10 * time.Millisecond)
fmt.Printf("msg data: %q on subject %q\n", string(msg.Data), msg.Subject)


nc.Publish("greet.4", []byte("hello"))


msg, _ = sub.NextMsg(10 * time.Millisecond)
fmt.Printf("msg data: %q on subject %q\n", string(msg.Data), msg.Subject)

}

```

output:

bash subscribed after a publish... msg is nil? true msg data: "hello" on subject "greet.2" msg data: "hello" on subject "greet.3" msg data: "hello" on subject "greet.4"

Request-Reply

```go package main

import ( "fmt" "os" "time"

"github.com/nats-io/nats.go"

)

func main() {

url := os.Getenv("NATS_URL")
if url == "" {
    url = nats.DefaultURL
}


nc, _ := nats.Connect(url)
defer nc.Drain()


sub, _ := nc.Subscribe("greet.*", func(msg *nats.Msg) {

    name := msg.Subject[6:]
    msg.Respond([]byte("hello, " + name))
})


rep, _ := nc.Request("greet.joe", nil, time.Second)
fmt.Println(string(rep.Data))


rep, _ = nc.Request("greet.sue", nil, time.Second)
fmt.Println(string(rep.Data))


rep, _ = nc.Request("greet.bob", nil, time.Second)
fmt.Println(string(rep.Data))


sub.Unsubscribe()


_, err := nc.Request("greet.joe", nil, time.Second)
fmt.Println(err)

} ```

output

bash hello, joe hello, sue hello, bob nats: no responders available for request

Limits-based Stream

```go package main

import ( "encoding/json" "fmt" "log" "os" "time"

"github.com/nats-io/nats.go"

)

func main() {

url := os.Getenv("NATS_URL")
if url == "" {
    url = nats.DefaultURL
}


nc, _ := nats.Connect(url)


defer nc.Drain()


js, _ := nc.JetStream()


cfg := nats.StreamConfig{
    Name:     "EVENTS",
    Subjects: []string{"events.>"},
}


cfg.Storage = nats.FileStorage


js.AddStream(&cfg)
fmt.Println("created the stream")


js.Publish("events.page_loaded", nil)
js.Publish("events.mouse_clicked", nil)
js.Publish("events.mouse_clicked", nil)
js.Publish("events.page_loaded", nil)
js.Publish("events.mouse_clicked", nil)
js.Publish("events.input_focused", nil)
fmt.Println("published 6 messages")


js.PublishAsync("events.input_changed", nil)
js.PublishAsync("events.input_blurred", nil)
js.PublishAsync("events.key_pressed", nil)
js.PublishAsync("events.input_focused", nil)
js.PublishAsync("events.input_changed", nil)
js.PublishAsync("events.input_blurred", nil)


select {
case <-js.PublishAsyncComplete():
    fmt.Println("published 6 messages")
case <-time.After(time.Second):
    log.Fatal("publish took too long")
}


printStreamState(js, cfg.Name)


// 限制訊息數量
cfg.MaxMsgs = 10
js.UpdateStream(&cfg)
fmt.Println("set max messages to 10")


printStreamState(js, cfg.Name)

// 限制訊息大小
cfg.MaxBytes = 300
js.UpdateStream(&cfg)
fmt.Println("set max bytes to 300")


printStreamState(js, cfg.Name)

// 限制訊息最大存活時間
cfg.MaxAge = time.Second
js.UpdateStream(&cfg)
fmt.Println("set max age to one second")


printStreamState(js, cfg.Name)


fmt.Println("sleeping one second...")
time.Sleep(time.Second)


printStreamState(js, cfg.Name)

}

func printStreamState(js nats.JetStreamContext, name string) { info, _ := js.StreamInfo(name) b, _ := json.MarshalIndent(info.State, "", " ") fmt.Println("inspecting stream info") fmt.Println(string(b)) } ```

output

bash created the stream published 6 messages published 6 messages inspecting stream info { "messages": 12, "bytes": 594, "first_seq": 1, "first_ts": "2022-07-22T13:04:47.814798969Z", "last_seq": 12, "last_ts": "2022-07-22T13:04:47.817297637Z", "consumer_count": 0 } set max messages to 10 inspecting stream info { "messages": 10, "bytes": 496, "first_seq": 3, "first_ts": "2022-07-22T13:04:47.815772395Z", "last_seq": 12, "last_ts": "2022-07-22T13:04:47.817297637Z", "consumer_count": 0 } set max bytes to 300 inspecting stream info { "messages": 6, "bytes": 298, "first_seq": 7, "first_ts": "2022-07-22T13:04:47.817220635Z", "last_seq": 12, "last_ts": "2022-07-22T13:04:47.817297637Z", "consumer_count": 0 } set max age to one second inspecting stream info { "messages": 6, "bytes": 298, "first_seq": 7, "first_ts": "2022-07-22T13:04:47.817220635Z", "last_seq": 12, "last_ts": "2022-07-22T13:04:47.817297637Z", "consumer_count": 0 } sleeping one second... inspecting stream info { "messages": 0, "bytes": 0, "first_seq": 13, "first_ts": "1970-01-01T00:00:00Z", "last_seq": 12, "last_ts": "2022-07-22T13:04:47.817297637Z", "consumer_count": 0 }

更多示例參考: https://natsbyexample.com/

reference

官方文件: https://docs.nats.io/

官方GitHub: https://github.com/nats-io/nats.go

程式碼示例: https://natsbyexample.com/

https://marco79423.net/articles/%E6%B7%BA%E8%AB%87-natsstan-%E5%92%8C-jetstream-%E5%85%A9%E4%B8%89%E4%BA%8B

本文由mdnice多平臺釋出