微服務從程式碼到k8s部署應有盡有系列(八、各種佇列)
我們用一個系列來講解從需求到上線、從程式碼到k8s部署、從日誌到監控等各個方面的微服務完整實踐。
整個專案使用了go-zero開發的微服務,基本包含了go-zero以及相關go-zero作者開發的一些中介軟體,所用到的技術棧基本是go-zero專案組的自研元件,基本是go-zero全家桶了。
實戰專案地址:http://github.com/Mikaelemmmm/go-zero-looklook
1、概述
訊息佇列有很多種,有rabbitmq、rocketmq、kafka等常用的,其中go-queue(http://github.com/zeromicro/go-queue)是go-zero官方開發的訊息佇列元件,其中分為2類,一種是kq、一種是dq,kq是基於kafka的訊息佇列,dq是基於beanstalkd的延遲佇列,但是go-queue不支援定時任務。具體想更多瞭解go-queue的我之前也寫過一篇教程可以去看一下這裡不細說了。
本專案採用的是go-queue做訊息佇列,asynq做延遲佇列、定時佇列
為什麼使用asynq的幾個原因
- 直接基於redis,一般專案都有redis,而asynq本身就是基於redis所以可以少維護一箇中間件
- 支援訊息佇列、延遲佇列、定時任務排程 , 因為希望專案支援定時任務而asynq直接就支援
- 有webui介面,每個任務都可以暫停、歸檔、通過ui介面檢視成功失敗、監控
為什麼asynq支援訊息佇列還在使用go-queue?
- kafka的吞吐是業績出名的,如果前期量不大可以直接用asynq
- 沒啥目的,就是想給你們演示一下go-queue
在我們使用go-zero的時候,goctl給我們帶了很大的便利,但是目前go-zero只有生成api、rpc,很多同學在群裡問定時任務、延遲佇列、訊息佇列如何生成,目錄結構該怎樣做,其實go-zero是為我們設計好了的,就是serviceGroup,使用serviceGroup管理你的服務。
2、如何使用
在前面訂單、訊息等場景我們其實已經演示過了,這裡再額外單獨補充一次
我們還是拿order-mq來舉例子,顯然使用goctl生成api、rpc不是我們想要的,那我們就自己使用serviceGroup改造,目錄結構還是延續api的基本差不多,只是將handler改成了listen , 將logic換成了mqs。
2.1 在main中程式碼如下
var configFile = flag.String("f", "etc/order.yaml", "Specify the config file")
func main() {
flag.Parse()
var c config.Config
conf.MustLoad(*configFile, &c)
// log, prometheus, trace, metricsUrl
if err := c.SetUp(); err != nil {
panic(err)
}
serviceGroup := service.NewServiceGroup()
defer serviceGroup.Stop()
for _, mq := range listen.Mqs(c) {
serviceGroup.Add(mq)
}
serviceGroup.Start()
}
-
首先我們要定義配置以及解析配置。
-
其次為什麼我們要在這裡加SetUp而api、rpc不需要呢?因為api、rpc都是在MustNewServer中已經框架寫的,但是我們用serviceGroup管理沒有,可以手動點進去SetUp看看,這個方法中包含了log、prometheus、trace、metricsUrl的定義,一個方法可以省很多事情,這樣我們直接修改配置檔案就可以實現日誌、監控、鏈路追蹤了。
-
接下來就是go-zero的serivceGroup管理服務了,serviceGroup是用來管理一組service的,那service其實就是一個介面,程式碼如下
Service (程式碼在go-zero/core/service/servicegroup.go)
// Service is the interface that groups Start and Stop methods. Service interface { Starter // Start Stopper // Stop }
所以,只要你的服務實現了這兩個介面,就可以加入到serviceGroup統一管理
那可以看到我們把所有的mq都實現這個介面,然後統一放到都 list.Mqs中,在啟動服務即可
2.2 mq分類管理
go-zero-looklook/app/order/cmd/mq/internal/listen目錄下程式碼
該目錄下程式碼是統一管理不同型別mq,因為我們要管理kq、asynq可能後續還有rabbitmq、rocketmq等等,所以在這裡做了分類方便維護
統一管理在go-zero-looklook/app/order/cmd/mq/internal/listen/listen.go,然後在main中呼叫listen.Mqs可以獲取所有mq一起start
// 返回所有消費者
func Mqs(c config.Config) []service.Service {
svcContext := svc.NewServiceContext(c)
ctx := context.Background()
var services []service.Service
// kq :訊息佇列.
services = append(services, KqMqs(c, ctx, svcContext)...)
// asynq:延遲佇列、定時任務
services = append(services, AsynqMqs(c, ctx, svcContext)...)
// other mq ....
return services
}
go-zero-looklook/app/order/cmd/mq/internal/listen/asynqMqs.go就是定義的asynq
// asynq
// 定時任務、延遲任務
func AsynqMqs(c config.Config, ctx context.Context, svcContext *svc.ServiceContext) []service.Service {
return []service.Service{
// 監聽延遲佇列
deferMq.NewAsynqTask(ctx, svcContext),
// 監聽定時任務
}
}
go-zero-looklook/app/order/cmd/mq/internal/listen/asynqMqs.go就是定義的kq (go-queue的kafka)
// kq
// 訊息佇列
func KqMqs(c config.Config, ctx context.Context, svcContext *svc.ServiceContext) []service.Service {
return []service.Service{
// 監聽消費流水狀態變更
kq.MustNewQueue(c.PaymentUpdateStatusConf, kqMq.NewPaymentUpdateStatusMq(ctx, svcContext)),
// .....
}
}
2.3 實際業務
編寫實際業務,我們就在go-zero-looklook/app/order/cmd/mq/internal/listen/mqs下,這裡為了方便維護,也是做了分類
- deferMq : 延遲佇列
- kq:訊息佇列
2.3.1 延遲佇列
// 監聽關閉訂單
type AsynqTask struct {
ctx context.Context
svcCtx *svc.ServiceContext
}
func NewAsynqTask(ctx context.Context, svcCtx *svc.ServiceContext) *AsynqTask {
return &AsynqTask{
ctx: ctx,
svcCtx: svcCtx,
}
}
func (l *AsynqTask) Start() {
fmt.Println("AsynqTask start ")
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: l.svcCtx.Config.Redis.Host, Password: l.svcCtx.Config.Redis.Pass},
asynq.Config{
Concurrency: 10,
Queues: map[string]int{
"critical": 6,
"default": 3,
"low": 1,
},
},
)
mux := asynq.NewServeMux()
// 關閉民宿訂單任務
mux.HandleFunc(asynqmq.TypeHomestayOrderCloseDelivery, l.closeHomestayOrderStateMqHandler)
if err := srv.Run(mux); err != nil {
log.Fatalf("could not run server: %v", err)
}
}
func (l *AsynqTask) Stop() {
fmt.Println("AsynqTask stop")
}
因為 asynq 要先啟動,然後定義路由任務,所以我們在asynqTask.go中做了統一的路由管理,之後我們每個業務都單獨的在deferMq的資料夾下面定義一個檔案(如“延遲關閉訂單:closeHomestayOrderState.go”),這樣每個業務一個檔案,跟go-zero的api、rpc的logic一樣,維護很方便
closeHomestayOrderState.go 關閉訂單邏輯
package deferMq
import (
"context"
"encoding/json"
"looklook/app/order/cmd/rpc/order"
"looklook/app/order/model"
"looklook/common/asynqmq"
"looklook/common/xerr"
"github.com/hibiken/asynq"
"github.com/pkg/errors"
)
func (l *AsynqTask) closeHomestayOrderStateMqHandler(ctx context.Context, t *asynq.Task) error {
var p asynqmq.HomestayOrderCloseTaskPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return errors.Wrapf(xerr.NewErrMsg("解析asynq task payload err"), "closeHomestayOrderStateMqHandler payload err:%v, payLoad:%+v", err, t.Payload())
}
resp, err := l.svcCtx.OrderRpc.HomestayOrderDetail(ctx, &order.HomestayOrderDetailReq{
Sn: p.Sn,
})
if err != nil || resp.HomestayOrder == nil {
return errors.Wrapf(xerr.NewErrMsg("獲取訂單失敗"), "closeHomestayOrderStateMqHandler 獲取訂單失敗 or 訂單不存在 err:%v, sn:%s ,HomestayOrder : %+v", err, p.Sn, resp.HomestayOrder)
}
if resp.HomestayOrder.TradeState == model.HomestayOrderTradeStateWaitPay {
_, err := l.svcCtx.OrderRpc.UpdateHomestayOrderTradeState(ctx, &order.UpdateHomestayOrderTradeStateReq{
Sn: p.Sn,
TradeState: model.HomestayOrderTradeStateCancel,
})
if err != nil {
return errors.Wrapf(xerr.NewErrMsg("關閉訂單失敗"), "closeHomestayOrderStateMqHandler 關閉訂單失敗 err:%v, sn:%s ", err, p.Sn)
}
}
return nil
}
2.3.2 kq訊息佇列
看go-zero-looklook/app/order/cmd/mq/internal/mqs/kq資料夾下,因為kq跟asynq不太一樣,它本身就是使用go-zero的Service管理的,已經實現了starter、stopper介面了,所以我們在/Users/seven/Developer/goenv/go-zero-looklook/app/order/cmd/mq/internal/listen/kqMqs.go中直接定義好一個go-queue業務扔給serviceGroup,去交給main啟動就好了 , 我們的業務程式碼只需要實現go-queue的Consumer直接寫我們自己業務即可。
1)/Users/seven/Developer/goenv/go-zero-looklook/app/order/cmd/mq/internal/listen/kqMqs.go
func KqMqs(c config.Config, ctx context.Context, svcContext *svc.ServiceContext) []service.Service {
return []service.Service{
// 監聽消費流水狀態變更
kq.MustNewQueue(c.PaymentUpdateStatusConf, kqMq.NewPaymentUpdateStatusMq(ctx, svcContext)),
// .....
}
}
可以看到kq.MustNewQueue本身返回就是 queue.MessageQueue , queue.MessageQueue又實現了Start、Stop
2)業務中
/Users/seven/Developer/goenv/go-zero-looklook/app/order/cmd/mq/internal/mqs/kq/paymentUpdateStatus.go
func (l *PaymentUpdateStatusMq) Consume(_, val string) error {
fmt.Printf(" PaymentUpdateStatusMq Consume val : %s \n", val)
// 解析資料
var message kqueue.ThirdPaymentUpdatePayStatusNotifyMessage
if err := json.Unmarshal([]byte(val), &message); err != nil {
logx.WithContext(l.ctx).Error("PaymentUpdateStatusMq->Consume Unmarshal err : %v , val : %s", err, val)
return err
}
// 執行業務..
if err := l.execService(message); err != nil {
logx.WithContext(l.ctx).Error("PaymentUpdateStatusMq->execService err : %v , val : %s , message:%+v", err, val, message)
return err
}
return nil
}
我們在paymentUpdateStatus.go中只需要實現介面Consume 就可以接受來自kq傳過來的kafka的訊息了,我們只管在我們Consumer中處理我們業務即可
3、定時任務
關於定時任務,目前go-zero-looklook沒有使用,這裡我也說明一下
- 如果你想簡單一點直接使用cron(裸機、k8s都有),
- 如果稍微複雜一點可以使用http://github.com/robfig/cron包,在程式碼中定義時間
- 使用 xxl-job、gocron 分散式定時任務系統接入
- asynq 的 shedule
這裡因為專案用的asynq,我就演示一下asynq的shedule吧
分為client與server , client用來定義排程時間,server是到了時間接受client的訊息觸發來執行我們寫的業務的,實際業務我們應該寫在server,client用來定義業務排程時間的
asynqtest/docker-compose.yml
version: '3'
services:
#asynqmon asynq延遲佇列、定時佇列的webui
asynqmon:
image: hibiken/asynqmon:latest
container_name: asynqmon_asynq
ports:
- 8980:8080
command:
- '--redis-addr=redis:6379'
- '--redis-password=G62m50oigInC30sf'
restart: always
networks:
- asynqtest_net
depends_on:
- redis
#redis容器
redis:
image: redis:6.2.5
container_name: redis_asynq
ports:
- 63779:6379
environment:
# 時區上海
TZ: Asia/Shanghai
volumes:
# 資料檔案
- ./data/redis/data:/data:rw
command: "redis-server --requirepass G62m50oigInC30sf --appendonly yes"
privileged: true
restart: always
networks:
- asynqtest_net
networks:
asynqtest_net:
driver: bridge
ipam:
config:
- subnet: 172.22.0.0/16
asynqtest/shedule/client/client.go
package main
import (
"asynqtest/tpl"
"encoding/json"
"log"
"github.com/hibiken/asynq"
)
const redisAddr = "127.0.0.1:63779"
const redisPwd = "G62m50oigInC30sf"
func main() {
// 週期性任務
scheduler := asynq.NewScheduler(
asynq.RedisClientOpt{
Addr: redisAddr,
Password: redisPwd,
}, nil)
payload, err := json.Marshal(tpl.EmailPayload{Email: "[email protected]", Content: "發郵件呀"})
if err != nil {
log.Fatal(err)
}
task := asynq.NewTask(tpl.EMAIL_TPL, payload)
// 每隔1分鐘同步一次
entryID, err := scheduler.Register("*/1 * * * *", task)
if err != nil {
log.Fatal(err)
}
log.Printf("registered an entry: %q\n", entryID)
if err := scheduler.Run(); err != nil {
log.Fatal(err)
}
}
asynqtest/shedule/server/server.go
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"asynqtest/tpl"
"github.com/hibiken/asynq"
)
func main() {
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: "127.0.0.1:63779", Password: "G62m50oigInC30sf"},
asynq.Config{
Concurrency: 10,
Queues: map[string]int{
"critical": 6,
"default": 3,
"low": 1,
},
},
)
mux := asynq.NewServeMux()
// 關閉民宿訂單任務
mux.HandleFunc(tpl.EMAIL_TPL, emailMqHandler)
if err := srv.Run(mux); err != nil {
log.Fatalf("could not run server: %v", err)
}
}
func emailMqHandler(ctx context.Context, t *asynq.Task) error {
var p tpl.EmailPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return fmt.Errorf("emailMqHandler err:%+v", err)
}
fmt.Printf("p : %+v \n", p)
return nil
}
asynqtest/tpl/tpl.go
package tpl
const EMAIL_TPL = "schedule:email"
type EmailPayload struct {
Email string
Content string
}
啟動 server.go
、client.go
瀏覽器輸入http://127.0.0.1:8980/schedulers這裡 可以看到所有client定義的任務
瀏覽器輸入http://127.0.0.1:8990/這裡可以看到我們的server消費請
控制檯消費情況
說一下asynq的shedule在整合到專案中的思路,可以單獨啟動一個服務作為排程client定義系統的定時任務排程管理,將server定義在每個業務自己的mq的asynq一起即可。
4、結尾
在這一節中,我們學會使用了訊息佇列、延遲佇列 ,kafka可以通過管理工具去檢視,至於asynq檢視webui在go-zero-looklook/docker-compose-env.yml中我們已經啟動好了asynqmon,直接使用http://127.0.0.1:8980 即可檢視
專案地址
http://github.com/zeromicro/go-zero
http://gitee.com/kevwan/go-zero
歡迎使用 go-zero
並 star 支援我們!
微信交流群
關注『微服務實踐』公眾號並點選 交流群 獲取社群群二維碼。
- go-zero微服務實戰系列(六、快取一致性保證)
- 詳解連線池引數設定(邊調邊看)
- go-zero微服務實戰系列(五、快取程式碼怎麼寫)
- go-zero微服務實戰系列(四、CRUD熱熱身)
- go-zero微服務實戰系列(三、API定義和表結構設計)
- go-zero 微服務實戰系列(二、服務拆分)
- go-zero 微服務實戰系列(一、開篇)
- 微服務效率工具 goctl 深度解析(上)
- 型別安全的 Go HTTP 請求
- 用 Go 快速開發一個 RESTful API 服務
- Go 專案配置檔案的定義和讀取
- 簡單易懂的 Go 泛型使用和實現原理介紹
- Go單體服務開發最佳實踐
- 通過 SingleFlight 模式學習 Go 併發程式設計
- 程序內優雅管理多個服務
- 構建 Go 應用 docker 映象的十八種姿勢
- 史上最強程式碼自測方法,沒有之一!
- 微服務從程式碼到k8s部署應有盡有大結局(k8s部署)
- 微服務從程式碼到k8s部署應有盡有系列(十四、部署環境搭建)
- 微服務從程式碼到k8s部署應有盡有系列(十三、服務監控)