gRPC 客戶端呼叫服務端需要連線池嗎?

語言: CN / TW / HK

theme: scrolls-light

發現的問題

在微服務開發中,gRPC 的應用絕對少不了,一般情況下,內部微服務互動,通常是使用 RPC 進行通訊,如果是外部通訊的話,會提供 https 介面文件

對於 gRPC 的基本使用可以檢視文章 gRPC介紹

對於 gRPC ,我們需要基本知道如下的一些知識點:

  • gRPC 的基本四種模式的應用場景

    • 請求響應模式
    • 客戶端資料流模式
    • 服務端資料流模式
    • 雙向流模式
  • Proto 檔案的定義和使用
  • gRPC 攔截器的應用 , 基本的可以檢視這篇 gRPC 攔截器

    • 實際上有客戶端攔截器 和 服務端攔截器,具體詳細的可以自行學習
  • gRPC 的設計原理細節
  • Go-Kit 的使用

當然今天並不是要聊 gRPC 的應用或者原理,而是想聊我們在開發過程中很容易遇到的問題:

  • 未複用 gRPC 客戶端連線,影響效能

最近審查各個服務程式碼中,發現整個部門使用 gRPC 客戶端請求服務端介面的時候,都是會新建一個連線,然後呼叫服務端介面,使用完畢之後就 close 掉, 例如這樣

這會有什麼問題呢?

正常簡單的使用不會有啥問題,但如果是面臨高併發的情況,效能問題很容易就會出現,例如我們在做效能測試的時候,就會發現,打一會效能測試,客戶端請求服務端的時候就會報錯:

rpc error: code = Unavailable desc = all SubConns are in TransientFailure, latest connection error: connection error: desc = "transport: Error while dialing dial tcp xxx:xxx: connect: connection refused

實際去檢視問題的時候,很明顯,這是 gRPC 的連線數被打滿了,很多連線都還未完全釋放

那這個時候,簡單思考一下,我們是沒有必要對於每一次客戶端請求服務端介面的時候,都新建立一次連線,並且呼叫完畢之後就馬上關閉連線

我們知道,gRPC 的通訊本質上也是 TCP 的連線,那麼一次連線就需要三次握手,和四次揮手,每一次建立連線和釋放連線的時候,都需要走這麼一個過程,如果我們頻繁的建立和釋放連線,這對於資源和效能其實都是一個大大的浪費

我們還知道 gRPC 是一個高效能、開源和擁有統一規定的 RPC框架,面向物件的 http/2 通訊協議,能夠能節省空間和 IO 密集度的開銷 ,但是我們並沒有很好的將他運用起來,gRPC 服務端的連線管理不用我們操心,但是我們對於 gRPC 客戶端的連續非常有必要關心,咱們要想辦法複用客戶端的連線

gRPC 連線池

複用連線,我們可以使用連線池的方式

對於這種複用資源,我們其實也接觸了不少,例如複用執行緒 worker 的執行緒池,go 中的協程池 ..

簡單來說,連線池 ,就是提前建立好一定數量的 tcp 連線控制代碼放在池子中,咱們需要和外部通訊的時候,就去池子中取一個連線來用,用完了之後,咱們就放回去

連線池解決了什麼問題

很明顯,連線池解決了上述咱們頻繁建立連線和釋放連線帶來的資源和效能上的損耗,咱們節省了這部分開銷後,自然就提高了咱們的效能

可是我們再次思考一下,如果這個連線池子就是隻能存放固定的連線,那麼我們業務擴張的時候,豈不是光等待池子裡面有空閒連線就會耗費大量的時間呢?

或者是池子過大,咱們需要的連線數較少,那麼開闢那麼多連線豈不是一種浪費?

那麼我們在設計或者是應用連線池的時候,就需要考慮如下幾個方面了:

  • 連線池是否支援擴縮容
  • 空閒的連線是否支援超時自行關閉,是否支援保活
  • 池子滿的時候,處理的策略是什麼樣的

其實關於連線池的設計和庫網上都很多,我們可以找一個案例來看看如何來使用連線池,以及它是如何來進行上述幾個方面的編碼落地的

如何去使用連線池

先來看看客戶端如何使用連線池

客戶端使用 pool

client/main.go

``` package main

import ( "context" "flag" "fmt" "log" "time"

    "mypoolclient/pool"
    "mypoolclient/pb"

)

var addr = flag.String("addr", "127.0.0.1:8888", "the address to connect to")

func main() { flag.Parse()

    p, err := pool.New(*addr, pool.DefaultOptions)
    if err != nil {
            log.Fatalf("failed to new pool: %v", err)
    }
    defer p.Close()

    conn, err := p.Get()
    if err != nil {
            log.Fatalf("failed to get conn: %v", err)
    }
    defer conn.Close()

    client := pb.NewTestsvrClient(conn.Value())
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    res, err := client.Say(ctx, &pb.TestReq{Message: []byte("hi")})
    if err != nil {
            log.Fatalf("unexpected error from Say: %v", err)
    }
    fmt.Println("rpc response:", res)

} ```

此處的客戶端,我們很明顯可以看出來,以前咱們使用客戶端去呼叫服務端介面的時候,總會不自覺的 Dial 一下建立連線

咱們使用連線池的話,就可以直接從池子裡面拿一個連接出來直接使用即可

服務端

server/client.go

``` package main

import ( "context" "flag" "fmt" "log" "net"

    "google.golang.org/grpc"

    "mypoolserver/pb"

)

var port = flag.Int("port", 8888, "port number")

// server implements EchoServer. type server struct{}

func (s server) Say(context.Context, pb.TestReq) (*pb.TestRsp, error) { fmt.Println("call Say ... ") return &pb.TestRsp{Message: []byte("hello world")}, nil }

func main() { flag.Parse()

    listen, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%v", *port))
    if err != nil {
            log.Fatalf("failed to listen: %v", err)
    }

    s := grpc.NewServer()
    pb.RegisterTestsvrServer(s, &server{})
    fmt.Println("start server ...")

    if err := s.Serve(listen); err != nil {
            log.Fatalf("failed to serve: %v", err)
    }


    fmt.Println("over server ...")

} ```

連線池的具體實現方式

連線池的具體實現方式,參考了 github http://github.com/shimingyah/pool

具體的實現,都放在上述目錄的 pool 下面了 , 也可以訪問地址 : http://github.com/qingconglaixueit/mypoolapp

pool 包中包含了 3 個檔案,作用如下:

.

├── conn.go

-- 關於 grpc 連線的結構定義和方法實現

├── options.go

-- 攔截器的常量定義,以及 Dial 建立連線的簡單封裝, 這個檔案可要可不要,看自己的需求

└── pool.go

-- 具體 pool 的介面定義和實現

直接來看 pool.go 中的介面定義

``` type Pool interface { Get() (Conn, error)

Close() error

Status() string } ```

  • Get()

獲取一個新的連線 , 當關閉連線的時候,會將該連線放入到池子中

  • Close()

關閉連線池,自然連線池子中的連線也不再可用

關於 pool 結構的定義 ,conn 結構的定義建議,將上述 github 地址上的原始碼下載下來進行閱讀,下面主要是分享關於

  • 連線池子的建立,擴縮容,釋放
  • 具體 TCP 連線的建立和釋放

建立連線池

``` func New(address string, option Options) (Pool, error) { if address == "" { return nil, errors.New("invalid address settings") } if option.Dial == nil { return nil, errors.New("invalid dial settings") } if option.MaxIdle <= 0 || option.MaxActive <= 0 || option.MaxIdle > option.MaxActive { return nil, errors.New("invalid maximum settings") } if option.MaxConcurrentStreams <= 0 { return nil, errors.New("invalid maximun settings") }

p := &pool{ index: 0, current: int32(option.MaxIdle), ref: 0, opt: option, conns: make([]*conn, option.MaxActive), address: address, closed: 0, }

for i := 0; i < p.opt.MaxIdle; i++ { c, err := p.opt.Dial(address) if err != nil { p.Close() return nil, fmt.Errorf("dial is not able to fill the pool: %s", err) } p.conns[i] = p.wrapConn(c, false) } log.Printf("new pool success: %v\n", p.Status())

return p, nil } ```

關於 pool 的介面,可以看成是這樣的

對於建立連線池,除了校驗基本的引數以外,我們知道池子其實是一個 TCP 連線的切片,長度為 option.MaxActive 即最大的活躍連線數

p.conns[i] = p.wrapConn(c, false) 表示咱們初始化一個連線,並放到連線池中,且初始化的 once 引數置為 false,表示該連線預設儲存在池子中,不被銷燬

換句話說,當我們需要真實銷燬連線池中的連線的時候,就將該連結的 once 引數置為 false 即可,實際上也無需我們使用這去做這一步

實際上 關於每一個連線的建立也是在 New 裡面完成的,只要有 1 個連線未建立成功,那麼咱們的連線池就算是建立失敗,咱們會呼叫 p.Close() 將之前建立好的連線全部釋放掉

``` // 關閉連線池 func (p *pool) Close() error { atomic.StoreInt32(&p.closed, 1) atomic.StoreUint32(&p.index, 0) atomic.StoreInt32(&p.current, 0) atomic.StoreInt32(&p.ref, 0) p.deleteFrom(0) log.Printf("close pool success: %v\n", p.Status()) return nil }

// 清除從 指定位置開始到 MaxActive 之間的連線 func (p *pool) deleteFrom(begin int) { for i := begin; i < p.opt.MaxActive; i++ { p.reset(i) } }

// 清除具體的連線 func (p *pool) reset(index int) { conn := p.conns[index] if conn == nil { return } conn.reset() p.conns[index] = nil } ```

這裡我們可以看到,當需要從池子中清除具體的連線的時候,最終從連線池子中取出對應位置上的連線 ,conn := p.conns[index], conn.reset() ,實際上是給當前這個連線進行引數賦值

``` func (c *conn) reset() error { cc := c.cc c.cc = nil c.once = false if cc != nil { return cc.Close() } return nil }

func (c *conn) Close() error { c.pool.decrRef() if c.once { return c.reset() } return nil } ```

最終呼叫 Close() 將指定的連線清除掉,這些動作都是連線池自動給我們做了,無需我們使用者去擔心

我們使用連線池通過 pool.Get() 拿到具體的連線控制代碼 conn 之後,我們使用 conn.Close() 關閉連線,實際上也是會走到上述的 Close() 實現的位置,但是我們並未指定當然也沒有許可權顯示的指定將 once 置位為 false ,因此對於呼叫者來說,是關閉了連線,對於連線池來說,實際上是將連線歸還到連線池中

關於連線池子的縮容和擴容是在 pool.Get() 中實現的

``` func (p pool) Get() (Conn, error) { // the first selected from the created connections nextRef := p.incrRef() p.RLock() current := atomic.LoadInt32(&p.current) p.RUnlock() if current == 0 { return nil, ErrClosed } if nextRef <= currentint32(p.opt.MaxConcurrentStreams) { next := atomic.AddUint32(&p.index, 1) % uint32(current) return p.conns[next], nil }

// the number connection of pool is reach to max active if current == int32(p.opt.MaxActive) { // the second if reuse is true, select from pool's connections if p.opt.Reuse { next := atomic.AddUint32(&p.index, 1) % uint32(current) return p.conns[next], nil } // the third create one-time connection c, err := p.opt.Dial(p.address) return p.wrapConn(c, true), err }

// the fourth create new connections given back to pool p.Lock() current = atomic.LoadInt32(&p.current) if current < int32(p.opt.MaxActive) && nextRef > current*int32(p.opt.MaxConcurrentStreams) { // 2 times the incremental or the remain incremental increment := current if current+increment > int32(p.opt.MaxActive) { increment = int32(p.opt.MaxActive) - current } var i int32 var err error for i = 0; i < increment; i++ { c, er := p.opt.Dial(p.address) if er != nil { err = er break } p.reset(int(current + i)) p.conns[current+i] = p.wrapConn(c, false) } current += i log.Printf("grow pool: %d ---> %d, increment: %d, maxActive: %d\n", p.current, current, increment, p.opt.MaxActive) atomic.StoreInt32(&p.current, current) if err != nil { p.Unlock() return nil, err } } p.Unlock() next := atomic.AddUint32(&p.index, 1) % uint32(current) return p.conns[next], nil } ```

從 Get 的實現中,我們可以知道 Get 的邏輯如下

  • 先增加連線的引用計數,如果在設定 current*int32(p.opt.MaxConcurrentStreams) 範圍內,那麼直接取連線進行使用即可
  • 若當前的連線數達到了最大活躍的連線數,那麼就看我們新建池子的時候傳遞的 option 中的 reuse 引數是否是 true,若是複用,則隨機取出連線池中的任意連線提供使用,如果不復用,則新建一個連線
  • 其餘的情況,就需要我們進行 2 倍或者 1 倍的數量對連線池進行擴容了

實際上,上述的庫中,並沒有提供咱們縮容的演算法,如果真的有這方面的需求的話

也可以在 Get 的實現上進行縮容,具體的縮容策略可以根據實際情況來定,例如當引用計數 nextRef 只有當前活躍連線數的 20% 的時候(這只是一個例子),就可以考慮縮容了

感謝閱讀,歡迎交流,點個贊,關注一波 再走吧

我正在參與掘金技術社群創作者簽約計劃招募活動,點選連結報名投稿