淺談etcd服務註冊與發現

語言: CN / TW / HK

開啟掘金成長之旅!這是我參與「掘金日新計劃 · 12 月更文挑戰」的第2天,點選檢視活動詳情

Hello朋友們,在之前參加雲原生活動的時候曾寫過一篇文章《淺談雲原生技術元件—etcd》,在其中我主要說明了etcd在基於Kubernetes雲原生微服務框架中的定位,主要是用來做服務的遠端配置、KV儲存等等,那麼今天就來簡要的補充講解下etcd的另一個重要的作用——服務註冊和發現,沒錯,正是和Zookeeper、Eureka、Consul等擁有一樣角色的開源微服務元件,且毫不遜色於這些,那麼我們就開始進行講解。

1 基於etcd的服務註冊與發現邏輯架構

1.1 服務註冊中心抽象

在這裡插入圖片描述 (圖片來自網路)

  • Service Registry(服務登錄檔,通常也成為服務註冊中心):內部擁有一個數據結構,用於儲存已釋出服務的配置資訊。註冊中心的作用一句話概括就是存放和排程服務的配置,實現服務和註冊中心,服務和服務之間的相互通訊,可以說是微服務中的”通訊錄“,它記錄了服務和服務地址的對映關係。
  • Service Requestor(服務呼叫者):根據服務註冊中心呼叫已有服務。
  • Service Provider(服務提供者):提供服務到服務註冊中心。

1.2 etcd服務註冊發現簡易版

在這裡插入圖片描述

2 程式碼實現

2.1 總體流程

服務提供者

(1)監聽網路

(2)建立gRPC服務端,並將具體的服務進行註冊

(3)利用服務地址、服務名等註冊etcd服務配置

(4)gRPC監聽服務

服務消費者

(1)註冊etcd解析器

(2)連線etcd服務

(3)獲取gRPC客戶端

(4)呼叫gRPC服務

2.2 程式碼

2.2.1 服務提供方

```go var ( cli *clientv3.Client Schema = "ns" Host = "127.0.0.1" Port = 3000 //埠 ServiceName = "api_log_service" //服務名稱 EtcdAddr = "127.0.0.1:2379" //etcd地址 )

type ApiLogServer struct{}

func (api ApiLogServer) GetApiLogByUid(ctx context.Context, req proto.ApiLogRequest) (*proto.ApiLogResponse, error) { resp := &proto.ApiLogResponse{ Msg: "ok", Data: "Hello", } return resp, nil }

//將服務地址註冊到etcd中 func register(etcdAddr, serviceName, serverAddr string, ttl int64) error { var err error if cli == nil { cli, err = clientv3.New(clientv3.Config{ Endpoints: strings.Split(etcdAddr, ";"), DialTimeout: 50 * time.Second, }) if err != nil { fmt.Printf("connection server err : %s\n", err) return err } } //與etcd建立長連線,並保證連線不斷(心跳檢測) ticker := time.NewTicker(time.Second * time.Duration(ttl)) go func() { key := "/" + Schema + "/" + serviceName + "/" + serverAddr for { resp, err := cli.Get(context.Background(), key) if err != nil { fmt.Printf("get server address err : %s", err) } else if resp.Count == 0 { //尚未註冊 err = keepAlive(serviceName, serverAddr, ttl) if err != nil { fmt.Printf("keepAlive err : %s", err) } } <-ticker.C } }() return nil }

//保持伺服器與etcd的長連線 func keepAlive(serviceName, serverAddr string, ttl int64) error { //建立租約 leaseResp, err := cli.Grant(context.Background(), ttl) if err != nil { fmt.Printf("create grant err : %s\n", err) return err } //將服務地址註冊到etcd中 key := "/" + Schema + "/" + serviceName + "/" + serverAddr _, err = cli.Put(context.Background(), key, serverAddr, clientv3.WithLease(leaseResp.ID)) if err != nil { fmt.Printf("register service err : %s", err) return err } //建立長連線 ch, err := cli.KeepAlive(context.Background(), leaseResp.ID) if err != nil { fmt.Printf("KeepAlive err : %s\n", err) return err } //清空keepAlive返回的channel go func() { for { <-ch } }() return nil }

//取消註冊 func unRegister(serviceName, serverAddr string) { if cli != nil { key := "/" + Schema + "/" + serviceName + "/" + serverAddr cli.Delete(context.Background(), key) } }

func RunApiLog() { //監聽網路 listener, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", Port)) if err != nil { fmt.Println("Listen network err :", err) return } defer listener.Close() //建立grpc srv := grpc.NewServer() defer srv.GracefulStop() //註冊到grpc服務中 proto.RegisterApiLogServiceServer(srv, &ApiLogServer{}) //將服務地址註冊到etcd中 serverAddr := fmt.Sprintf("%s:%d", Host, Port) fmt.Printf("rpc server address: %s\n", serverAddr) register(EtcdAddr, ServiceName, serverAddr, 10) //關閉訊號處理 ch := make(chan os.Signal, 1) signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL, syscall.SIGHUP, syscall.SIGQUIT) go func() { s := <-ch unRegister(ServiceName, serverAddr) if i, ok := s.(syscall.Signal); ok { os.Exit(int(i)) } else { os.Exit(0) } }() //監聽服務 err = srv.Serve(listener) if err != nil { fmt.Println("rpc server err : ", err) return } } ```

2.2.2 服務消費方

```go var ( cli *clientv3.Client Schema = "ns" ServiceName = "api_log_service" //服務名稱 EtcdAddr = "127.0.0.1:2379" //etcd地址 )

type EtcdResolver struct { etcdAddr string clientConn resolver.ClientConn }

func NewEtcdResolver(etcdAddr string) resolver.Builder { return &EtcdResolver{etcdAddr: etcdAddr} }

func (r *EtcdResolver) Scheme() string { return Schema }

//ResolveNow watch有變化呼叫 func (r *EtcdResolver) ResolveNow(rn resolver.ResolveNowOptions) { fmt.Println(rn) }

//Close 解析器關閉時呼叫 func (r *EtcdResolver) Close() { fmt.Println("Close") }

//Build 構建解析器 grpc.Dial()時呼叫 func (r *EtcdResolver) Build(target resolver.Target, clientConn resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) { var err error //構建etcd client if cli == nil { cli, err = clientv3.New(clientv3.Config{ Endpoints: strings.Split(r.etcdAddr, ";"), DialTimeout: 15 * time.Second, }) if err != nil { fmt.Printf("connect etcd err : %s\n", err) return nil, err } } r.clientConn = clientConn go r.watch("/" + target.Scheme + "/" + target.Endpoint + "/") return r, nil }

//watch機制:監聽etcd中某個key字首的服務地址列表的變化 func (r *EtcdResolver) watch(keyPrefix string) { //初始化服務地址列表 var addrList []resolver.Address resp, err := cli.Get(context.Background(), keyPrefix, clientv3.WithPrefix()) if err != nil { fmt.Println("get service list err : ", err) } else { for i := range resp.Kvs { addrList = append(addrList, resolver.Address{Addr: strings.TrimPrefix(string(resp.Kvs[i].Key), keyPrefix)}) } } r.clientConn.NewAddress(addrList) //監聽服務地址列表的變化 rch := cli.Watch(context.Background(), keyPrefix, clientv3.WithPrefix()) for n := range rch { for _, ev := range n.Events { addr := strings.TrimPrefix(string(ev.Kv.Key), keyPrefix) switch ev.Type { case mvccpb.PUT: if !exists(addrList, addr) { addrList = append(addrList, resolver.Address{Addr: addr}) r.clientConn.NewAddress(addrList) } case mvccpb.DELETE: if s, ok := remove(addrList, addr); ok { addrList = s r.clientConn.NewAddress(addrList) } } } } }

func exists(l []resolver.Address, addr string) bool { for i := range l { if l[i].Addr == addr { return true } } return false }

func remove(s []resolver.Address, addr string) ([]resolver.Address, bool) { for i := range s { if s[i].Addr == addr { s[i] = s[len(s)-1] return s[:len(s)-1], true } } return nil, false }

func RunClient() { //註冊etcd解析器 r := NewEtcdResolver(EtcdAddr) resolver.Register(r) //連線伺服器,同步呼叫r.Build() conn, err := grpc.Dial(r.Scheme()+"://author/"+ServiceName, grpc.WithBalancerName("round_robin"), grpc.WithInsecure()) if err != nil { fmt.Printf("connect err : %s", err) } defer conn.Close() //獲得gRPC客戶端 c := proto.NewApiLogServiceClient(conn) //呼叫服務 resp, err := c.GetApiLogByUid( context.Background(), &proto.ApiLogRequest{UId: 0}, ) if err != nil { fmt.Printf("call service err : %s", err) return } fmt.Printf("resp : %s , data : %s", resp.Msg, resp.Data) } ```

2.2.3 公共元件

```protobuf syntax = "proto3";
package proto;

option go_package = "../api_log";

service ApiLogService { rpc GetApiLogByUid(ApiLogRequest) returns (ApiLogResponse){} }

message ApiLogRequest{ int32 u_id = 1; }

message ApiLogResponse{ int64 code = 1; string msg = 2; int64 count = 3; string data = 4; } ```

注意要在編譯後進行使用哈

2.3 注意事項

在我編寫程式碼進行實現的過程中遇到過種種問題,但是最讓人記憶深刻的就是etcd與gRPC版本不相容的問題,用了很長時間才搞定,在這裡記錄下吧:

原因是etcd3.x版本不支援grpc1.27版本以上,但是grpc1.27以下編譯成的中間程式碼又不支援新版本的proto buffer,這就陷入了一個兩難的處境,最後通過Stack Overflow才查到:

https://stackoverflow.com/questions/64815927/undefined-grpc-clientconninterface-when-compiling-grpc

解決,在go.mod中加入這幾行程式碼:

go replace ( github.com/coreos/etcd => github.com/ozonru/etcd v3.3.20-grpc1.27-origmodule+incompatible google.golang.org/grpc => google.golang.org/grpc v1.27.0 )

3 細節剖析

3.1 服務生產端keepAlive

keepAlive是一個老生常談的問題了,下到TCP/IP、HTTP連線,上到Redis叢集、MySQL叢集,都會有該機制,那麼etcd的keepAlive是怎麼搞的呢?

下面我們來看下

etcd使用LeaseKeepAlive API呼叫建立的雙向流來重新整理租約。當客戶端希望重新整理租約時,它通過流傳送一個leasekeepaliverrequest:

protobuf message LeaseKeepAliveRequest { int64 ID = 1; }

  • ID :keepAlive有效的租約ID。

LeaseKeepAliveResponse作為keepAlive的響應:

protobuf message LeaseKeepAliveResponse { ResponseHeader header = 1; int64 ID = 2; int64 TTL = 3; }

  • ID :用新的TTL重新整理的租約。
  • TTL :新的生存時間,以秒為單位,租約剩餘的時間。

3.2 服務消費端watch機制

Watch API提供了一個基於事件的介面,用於非同步監視服務key的更改。etcd3 watch通過持續觀察給定的修訂(當前的或歷史的)來等待鍵的更改,並將鍵更新流回客戶端。

對每個鍵的每次更改都用“Event”訊息表示。Event訊息提供了更新的資料和更新的型別:

protobuf message Event { enum EventType { PUT = 0; DELETE = 1; } EventType type = 1; KeyValue kv = 2; KeyValue prev_kv = 3; }

  • type:PUT型別表示新資料的更新,DELETE表示key的刪除。
  • kv:與事件相關的鍵值PUT事件包含kv。
  • prev_kv:事件發生前修改版本的金鑰的鍵值對。為了節省頻寬,它只在watch顯式啟用的情況下填寫。

watch流:

watch是長時間執行的請求,並使用gRPC流來流化事件資料。watch流是雙向的;客戶端寫入流來建立監視,讀取流來接收監視事件。通過使用每個watch識別符號來標記事件,單個watch流可以將多個不同的手錶組合在一起。這種多路複用有助於減少核心etcd叢集上的記憶體佔用和連線開銷。

4 總結

微服務是當今網際網路領域的廣泛概念,也是一種架構演進的結果,微服務的存在讓架構設計更加的解耦合,讓人員的分工更加明確,當然他的落地實現也並不止步與某一兩種方式,在雲原生領域的Kubernetes+etcd,網際網路領域常用的Spring Cloud全家桶以及Dubbo等都是微服務的具體實現,而etcd也僅僅是微服務中服務註冊中心元件角色的一個代表而已。

參考:

https://etcd.io/docs/v3.5/dev-guide/grpc_naming/

https://www.jianshu.com/p/217d0e3a8d0f

https://www.cnblogs.com/wujuntian/p/12838041.html

https://stackoverflow.com/questions/64815927/undefined-grpc-clientconninterface-when-compiling-grpc

https://blog.csdn.net/fly910905/article/details/103545120