讓我們使用 Go 實現基本的服務發現

語言: CN / TW / HK

注微信 公眾號 《雲原生CTO》更多雲原生乾貨等你來探索

專注 於  雲原生技術   分享

提供優質 雲原生開發   影片技術培訓

面試技巧 ,及 技術疑難問題 解答

雲原生技術分享不僅僅侷限於 Go Rust Python Istio containerd CoreDNS Envoy etcd Fluentd Harbor Helm Jaeger Kubernetes Open Policy Agent Prometheus Rook TiKV TUF Vitess Arg o Buildpacks CloudEvents CNI Contour Cortex CRI-O Falco Flux gRPC KubeEdge Linkerd NATS Notary OpenTracing Operator  Framework SPIFFE SPIRE  和  Thanos

讓我們使用 Go 實現基本的服務發現

我們已經知道,要請求一個服務例項(伺服器),我們必須知道它的網路位置( IP 地址和埠)。

隨著當今世界微服務的突破,越來越多的使用者、請求和需求使得這項工作變得非常困難。在基於雲的微服務時代,我們的服務會因為自動伸縮、故障、升級等各種不同的情況而不斷變化。由於這些變化,他們不斷獲得新的 IP

這就是服務發現進入微服務場景的地方。我們需要一些系統來隨時關注所有服務,並隨時跟蹤哪個服務部署在哪個 IP/埠組合上,以便微服務的客戶端可以進行相應的無縫路由。[1]

服務發現在概念上非常簡單:它的關鍵元件是服務登錄檔,它是應用程式服務例項的網路位置的資料庫。[2]該機制在服務例項啟動和停止時更新服務登錄檔。

實現服務發現主要有兩種方式:

  • 服務及其客戶端直接與服務註冊中心互動。

  • 部署基礎設施( k8s 等)處理服務發現。

我們將使用 3rd 方註冊模式來實現我們的服務發現。多虧了這種模式,一個名為註冊器的第三方(在我們的例子中非常基本的 go 函式 docker ps -a 以特定的時間間隔執行)而不是向服務註冊中心註冊自己的服務。

讓我們更詳細地回顧一下我們的應用程式。

反向代理

為了實現反向代理,我使用了 httputil 包。我實現這個提供負載平衡的主要目的。

為了以迴圈方式實現客戶端請求路由,我進行了基本的數學運算,計算了獲取請求的數量,並對服務登錄檔列表的長度進行了模組化操作,以便我輕鬆找到後端並代理請求。

package main

import (
 "fmt"
 "net/http"
 "sync/atomic"
)

type Application struct {
 RequestCount uint64
 SRegistry    *ServiceRegistry
}

func (a *Application) Handle(w http.ResponseWriter, r *http.Request) {
 atomic.AddUint64(&a.RequestCount, 1)

 if a.SRegistry.Len() == 0 {
  w.Write([]byte(`No backend entry in the service registry`))
  return
 }

 backendIndex := int(atomic.LoadUint64(&a.RequestCount) % uint64(a.SRegistry.Len()))
 fmt.Printf("Request routing to instance %d\n", backendIndex)

 a.SRegistry.GetByIndex(backendIndex).
  proxy.
  ServeHTTP(w, r)
}

Registrar

我使用 time.Tick 來實現特定時間間隔(預設 3 秒)之間的輪詢。在每個滴答聲中,程式碼都 docker ps -a 使用官方 docker go SDK 執行。(我使用 -a 是因為我需要知道哪些容器處於停機狀態,因此我從我們的服務登錄檔列表中刪除了不健康的容器 IP 和埠)。如果添加了一個新容器並處於執行狀態,請檢查它是否已存在於服務登錄檔中,如果不存在,則將其地址新增到服務登錄檔中。

package main

import (
 "context"
 "fmt"
 "github.com/docker/docker/api/types"
 "github.com/docker/docker/client"
 "time"
)

type Registrar struct {
 Interval  time.Duration
 DockerCLI *client.Client
 SRegistry *ServiceRegistry
}

const (
 HelloServiceImageName = "hello"
 ContainerRunningState = "running"
)

func (r *Registrar) Observe() {
 for range time.Tick(r.Interval) {
  cList, _ := r.DockerCLI.ContainerList(context.Background(), types.ContainerListOptions{
   All: true,
  })

  if len(cList) == 0 {
   r.SRegistry.RemoveAll()
   continue
  }

  for _, c := range cList {
   if c.Image != HelloServiceImageName {
    continue
   }

   _, exist := r.SRegistry.GetByContainerID(c.ID)

   if c.State == ContainerRunningState {
    if !exist {
     addr := fmt.Sprintf("http://localhost:%d", c.Ports[0].PublicPort)
     r.SRegistry.Add(c.ID, addr)
    }
   } else {
    if exist {
     r.SRegistry.RemoveByContainerID(c.ID)
    }
   }
  }
 }
}

服務註冊

這是一個非常基本的結構切片,具有併發訪問的安全性,這要歸功於 sync.RWMutex 並且如上所述,它保留了所有健康的後端地址列表。該列表由註冊商每 3 秒更新一次。

package main

import (
 "fmt"
 "net/http/httputil"
 "net/url"
 "sync"
)

type backend struct {
 proxy       *httputil.ReverseProxy
 containerID string
}

type ServiceRegistry struct {
 mu       sync.RWMutex
 backends []backend
}

func (s *ServiceRegistry) Init() {
 s.mu = sync.RWMutex{}
 s.backends = []backend{}
}

func (s *ServiceRegistry) Add(containerID, addr string) {
 s.mu.Lock()
 defer s.mu.Unlock()

 URL, _ := url.Parse(addr)

 s.backends = append(s.backends, backend{
  proxy:       httputil.NewSingleHostReverseProxy(URL),
  containerID: containerID,
 })
}

func (s *ServiceRegistry) GetByContainerID(containerID string) (backend, bool) {
 s.mu.RLock()
 defer s.mu.RUnlock()

 for _, b := range s.backends {
  if b.containerID == containerID {
   return b, true
  }
 }

 return backend{}, false
}

func (s *ServiceRegistry) GetByIndex(index int) backend {
 s.mu.RLock()
 defer s.mu.RUnlock()

 return s.backends[index]
}

func (s *ServiceRegistry) RemoveByContainerID(containerID string) {
 s.mu.Lock()
 defer s.mu.Unlock()

 var backends []backend
 for _, b := range s.backends {
  if b.containerID == containerID {
   continue
  }
  backends = append(backends, b)
 }

 s.backends = backends
}

func (s *ServiceRegistry) RemoveAll() {
 s.mu.Lock()
 defer s.mu.Unlock()

 s.backends = []backend{}
}

func (s *ServiceRegistry) Len() int {
 s.mu.RLock()
 defer s.mu.RUnlock()

 return len(s.backends)
}

func (s *ServiceRegistry) List() {
 s.mu.RLock()
 defer s.mu.RUnlock()

 for i := range s.backends {
  fmt.Println(s.backends[i].containerID)
 }
}

原始碼

https://github.com/Abdulsametileri/simple-service-discovery