gRPC 客户端調用服務端需要連接池嗎?

語言: CN / TW / HK

theme: scrolls-light

發現的問題

在微服務開發中,gRPC 的應用絕對少不了,一般情況下,內部微服務交互,通常是使用 RPC 進行通信,如果是外部通信的話,會提供 https 接口文檔

對於 gRPC 的基本使用可以查看文章 gRPC介紹

對於 gRPC ,我們需要基本知道如下的一些知識點:

  • gRPC 的基本四種模式的應用場景

    • 請求響應模式
    • 客户端數據流模式
    • 服務端數據流模式
    • 雙向流模式
  • Proto 文件的定義和使用
  • gRPC 攔截器的應用 , 基本的可以查看這篇 gRPC 攔截器

    • 實際上有客户端攔截器 和 服務端攔截器,具體詳細的可以自行學習
  • gRPC 的設計原理細節
  • Go-Kit 的使用

當然今天並不是要聊 gRPC 的應用或者原理,而是想聊我們在開發過程中很容易遇到的問題:

  • 未複用 gRPC 客户端連接,影響性能

最近審查各個服務代碼中,發現整個部門使用 gRPC 客户端請求服務端接口的時候,都是會新建一個連接,然後調用服務端接口,使用完畢之後就 close 掉, 例如這樣

這會有什麼問題呢?

正常簡單的使用不會有啥問題,但如果是面臨高併發的情況,性能問題很容易就會出現,例如我們在做性能測試的時候,就會發現,打一會性能測試,客户端請求服務端的時候就會報錯:

rpc error: code = Unavailable desc = all SubConns are in TransientFailure, latest connection error: connection error: desc = "transport: Error while dialing dial tcp xxx:xxx: connect: connection refused

實際去查看問題的時候,很明顯,這是 gRPC 的連接數被打滿了,很多連接都還未完全釋放

那這個時候,簡單思考一下,我們是沒有必要對於每一次客户端請求服務端接口的時候,都新建立一次連接,並且調用完畢之後就馬上關閉連接

我們知道,gRPC 的通信本質上也是 TCP 的連接,那麼一次連接就需要三次握手,和四次揮手,每一次建立連接和釋放連接的時候,都需要走這麼一個過程,如果我們頻繁的建立和釋放連接,這對於資源和性能其實都是一個大大的浪費

我們還知道 gRPC 是一個高性能、開源和擁有統一規定的 RPC框架,面向對象的 http/2 通信協議,能夠能節省空間和 IO 密集度的開銷 ,但是我們並沒有很好的將他運用起來,gRPC 服務端的連接管理不用我們操心,但是我們對於 gRPC 客户端的連續非常有必要關心,咱們要想辦法複用客户端的連接

gRPC 連接池

複用連接,我們可以使用連接池的方式

對於這種複用資源,我們其實也接觸了不少,例如複用線程 worker 的線程池,go 中的協程池 ..

簡單來説,連接池 ,就是提前創建好一定數量的 tcp 連接句柄放在池子中,咱們需要和外部通信的時候,就去池子中取一個連接來用,用完了之後,咱們就放回去

連接池解決了什麼問題

很明顯,連接池解決了上述咱們頻繁創建連接和釋放連接帶來的資源和性能上的損耗,咱們節省了這部分開銷後,自然就提高了咱們的性能

可是我們再次思考一下,如果這個連接池子就是隻能存放固定的連接,那麼我們業務擴張的時候,豈不是光等待池子裏面有空閒連接就會耗費大量的時間呢?

或者是池子過大,咱們需要的連接數較少,那麼開闢那麼多連接豈不是一種浪費?

那麼我們在設計或者是應用連接池的時候,就需要考慮如下幾個方面了:

  • 連接池是否支持擴縮容
  • 空閒的連接是否支持超時自行關閉,是否支持保活
  • 池子滿的時候,處理的策略是什麼樣的

其實關於連接池的設計和庫網上都很多,我們可以找一個案例來看看如何來使用連接池,以及它是如何來進行上述幾個方面的編碼落地的

如何去使用連接池

先來看看客户端如何使用連接池

客户端使用 pool

client/main.go

``` package main

import ( "context" "flag" "fmt" "log" "time"

    "mypoolclient/pool"
    "mypoolclient/pb"

)

var addr = flag.String("addr", "127.0.0.1:8888", "the address to connect to")

func main() { flag.Parse()

    p, err := pool.New(*addr, pool.DefaultOptions)
    if err != nil {
            log.Fatalf("failed to new pool: %v", err)
    }
    defer p.Close()

    conn, err := p.Get()
    if err != nil {
            log.Fatalf("failed to get conn: %v", err)
    }
    defer conn.Close()

    client := pb.NewTestsvrClient(conn.Value())
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    res, err := client.Say(ctx, &pb.TestReq{Message: []byte("hi")})
    if err != nil {
            log.Fatalf("unexpected error from Say: %v", err)
    }
    fmt.Println("rpc response:", res)

} ```

此處的客户端,我們很明顯可以看出來,以前咱們使用客户端去調用服務端接口的時候,總會不自覺的 Dial 一下建立連接

咱們使用連接池的話,就可以直接從池子裏面拿一個連接出來直接使用即可

服務端

server/client.go

``` package main

import ( "context" "flag" "fmt" "log" "net"

    "google.golang.org/grpc"

    "mypoolserver/pb"

)

var port = flag.Int("port", 8888, "port number")

// server implements EchoServer. type server struct{}

func (s server) Say(context.Context, pb.TestReq) (*pb.TestRsp, error) { fmt.Println("call Say ... ") return &pb.TestRsp{Message: []byte("hello world")}, nil }

func main() { flag.Parse()

    listen, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%v", *port))
    if err != nil {
            log.Fatalf("failed to listen: %v", err)
    }

    s := grpc.NewServer()
    pb.RegisterTestsvrServer(s, &server{})
    fmt.Println("start server ...")

    if err := s.Serve(listen); err != nil {
            log.Fatalf("failed to serve: %v", err)
    }


    fmt.Println("over server ...")

} ```

連接池的具體實現方式

連接池的具體實現方式,參考了 github https://github.com/shimingyah/pool

具體的實現,都放在上述目錄的 pool 下面了 , 也可以訪問地址 : https://github.com/qingconglaixueit/mypoolapp

pool 包中包含了 3 個文件,作用如下:

.

├── conn.go

-- 關於 grpc 連接的結構定義和方法實現

├── options.go

-- 攔截器的常量定義,以及 Dial 建立連接的簡單封裝, 這個文件可要可不要,看自己的需求

└── pool.go

-- 具體 pool 的接口定義和實現

直接來看 pool.go 中的接口定義

``` type Pool interface { Get() (Conn, error)

Close() error

Status() string } ```

  • Get()

獲取一個新的連接 , 當關閉連接的時候,會將該連接放入到池子中

  • Close()

關閉連接池,自然連接池子中的連接也不再可用

關於 pool 結構的定義 ,conn 結構的定義建議,將上述 github 地址上的源碼下載下來進行閲讀,下面主要是分享關於

  • 連接池子的創建,擴縮容,釋放
  • 具體 TCP 連接的創建和釋放

創建連接池

``` func New(address string, option Options) (Pool, error) { if address == "" { return nil, errors.New("invalid address settings") } if option.Dial == nil { return nil, errors.New("invalid dial settings") } if option.MaxIdle <= 0 || option.MaxActive <= 0 || option.MaxIdle > option.MaxActive { return nil, errors.New("invalid maximum settings") } if option.MaxConcurrentStreams <= 0 { return nil, errors.New("invalid maximun settings") }

p := &pool{ index: 0, current: int32(option.MaxIdle), ref: 0, opt: option, conns: make([]*conn, option.MaxActive), address: address, closed: 0, }

for i := 0; i < p.opt.MaxIdle; i++ { c, err := p.opt.Dial(address) if err != nil { p.Close() return nil, fmt.Errorf("dial is not able to fill the pool: %s", err) } p.conns[i] = p.wrapConn(c, false) } log.Printf("new pool success: %v\n", p.Status())

return p, nil } ```

關於 pool 的接口,可以看成是這樣的

對於創建連接池,除了校驗基本的參數以外,我們知道池子其實是一個 TCP 連接的切片,長度為 option.MaxActive 即最大的活躍連接數

p.conns[i] = p.wrapConn(c, false) 表示咱們初始化一個連接,並放到連接池中,且初始化的 once 參數置為 false,表示該連接默認保存在池子中,不被銷燬

換句話説,當我們需要真實銷燬連接池中的連接的時候,就將該鏈接的 once 參數置為 false 即可,實際上也無需我們使用這去做這一步

實際上 關於每一個連接的建立也是在 New 裏面完成的,只要有 1 個連接未建立成功,那麼咱們的連接池就算是建立失敗,咱們會調用 p.Close() 將之前建立好的連接全部釋放掉

``` // 關閉連接池 func (p *pool) Close() error { atomic.StoreInt32(&p.closed, 1) atomic.StoreUint32(&p.index, 0) atomic.StoreInt32(&p.current, 0) atomic.StoreInt32(&p.ref, 0) p.deleteFrom(0) log.Printf("close pool success: %v\n", p.Status()) return nil }

// 清除從 指定位置開始到 MaxActive 之間的連接 func (p *pool) deleteFrom(begin int) { for i := begin; i < p.opt.MaxActive; i++ { p.reset(i) } }

// 清除具體的連接 func (p *pool) reset(index int) { conn := p.conns[index] if conn == nil { return } conn.reset() p.conns[index] = nil } ```

這裏我們可以看到,當需要從池子中清除具體的連接的時候,最終從連接池子中取出對應位置上的連接 ,conn := p.conns[index], conn.reset() ,實際上是給當前這個連接進行參數賦值

``` func (c *conn) reset() error { cc := c.cc c.cc = nil c.once = false if cc != nil { return cc.Close() } return nil }

func (c *conn) Close() error { c.pool.decrRef() if c.once { return c.reset() } return nil } ```

最終調用 Close() 將指定的連接清除掉,這些動作都是連接池自動給我們做了,無需我們使用者去擔心

我們使用連接池通過 pool.Get() 拿到具體的連接句柄 conn 之後,我們使用 conn.Close() 關閉連接,實際上也是會走到上述的 Close() 實現的位置,但是我們並未指定當然也沒有權限顯示的指定將 once 置位為 false ,因此對於調用者來説,是關閉了連接,對於連接池來説,實際上是將連接歸還到連接池中

關於連接池子的縮容和擴容是在 pool.Get() 中實現的

``` func (p pool) Get() (Conn, error) { // the first selected from the created connections nextRef := p.incrRef() p.RLock() current := atomic.LoadInt32(&p.current) p.RUnlock() if current == 0 { return nil, ErrClosed } if nextRef <= currentint32(p.opt.MaxConcurrentStreams) { next := atomic.AddUint32(&p.index, 1) % uint32(current) return p.conns[next], nil }

// the number connection of pool is reach to max active if current == int32(p.opt.MaxActive) { // the second if reuse is true, select from pool's connections if p.opt.Reuse { next := atomic.AddUint32(&p.index, 1) % uint32(current) return p.conns[next], nil } // the third create one-time connection c, err := p.opt.Dial(p.address) return p.wrapConn(c, true), err }

// the fourth create new connections given back to pool p.Lock() current = atomic.LoadInt32(&p.current) if current < int32(p.opt.MaxActive) && nextRef > current*int32(p.opt.MaxConcurrentStreams) { // 2 times the incremental or the remain incremental increment := current if current+increment > int32(p.opt.MaxActive) { increment = int32(p.opt.MaxActive) - current } var i int32 var err error for i = 0; i < increment; i++ { c, er := p.opt.Dial(p.address) if er != nil { err = er break } p.reset(int(current + i)) p.conns[current+i] = p.wrapConn(c, false) } current += i log.Printf("grow pool: %d ---> %d, increment: %d, maxActive: %d\n", p.current, current, increment, p.opt.MaxActive) atomic.StoreInt32(&p.current, current) if err != nil { p.Unlock() return nil, err } } p.Unlock() next := atomic.AddUint32(&p.index, 1) % uint32(current) return p.conns[next], nil } ```

從 Get 的實現中,我們可以知道 Get 的邏輯如下

  • 先增加連接的引用計數,如果在設定 current*int32(p.opt.MaxConcurrentStreams) 範圍內,那麼直接取連接進行使用即可
  • 若當前的連接數達到了最大活躍的連接數,那麼就看我們新建池子的時候傳遞的 option 中的 reuse 參數是否是 true,若是複用,則隨機取出連接池中的任意連接提供使用,如果不復用,則新建一個連接
  • 其餘的情況,就需要我們進行 2 倍或者 1 倍的數量對連接池進行擴容了

實際上,上述的庫中,並沒有提供咱們縮容的算法,如果真的有這方面的需求的話

也可以在 Get 的實現上進行縮容,具體的縮容策略可以根據實際情況來定,例如當引用計數 nextRef 只有當前活躍連接數的 20% 的時候(這只是一個例子),就可以考慮縮容了

感謝閲讀,歡迎交流,點個贊,關注一波 再走吧

我正在參與掘金技術社區創作者簽約計劃招募活動,點擊鏈接報名投稿