如何在EasyCVR中實現NSQ延時推流技術?

語言: CN / TW / HK

EasyCVR 是TSINGSEE青犀影片開發的高穩定、高接入性的影片平臺,可接入的協議豐富,且可通過國標協議級聯。EasyCVR 各模組之間進行訊息通訊時,需要一款訊息中介軟體進行訊息的傳輸和傳送。在調研各種 MQ 中介軟體後,確定採用 NSQ 來進行編譯。

EasyCVR 使用 NSQ 時,希望延時 60s,消費端才能夠收到對應的訊息,因此我們本文主要是調研是否有該功能的過程,我們主要使用 DeferredPublish 方法實現,方法程式碼如下:

package main

import (
   "fmt"
   "github.com/nsqio/go-nsq"
   "log"
   "time"
   "zhangqiadams.com/gotools/model/consts"
)

func main() {
   config := nsq.NewConfig()
   // 1. 向 nsqd 的 tcp 埠傳送訊息,因此進行對應的配置
   producer, err := nsq.NewProducer("127.0.0.1:4154", config)
   if err != nil {
      log.Fatal(err)
   }

   messageBody := []byte("hello world delay")
   topicName := "topic2"

   // 2. 同步推流到 nspd, 同步推流代表等待 nspd 的響應,如果傳送失敗返回錯誤。
   // PublishAsync 代表是非同步推送訊息給 nspd,傳送完訊息後立刻返回
   err = producer.DeferredPublish(topicName, 60*time.Second, messageBody)
   fmt.Println("傳送訊息時間為", time.Now().Format(consts.TimeFormat))
   if err != nil {
      log.Fatal(err)
   }

   /*sigChan := make(chan os.Signal, 1)
   signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
   <-sigChan*/

   // 3. 停止生產者,一般在停止服務,停止程序的時候需要呼叫
   producer.Stop()
}

在 14:06:45 開始傳送了一個訊息。

消費者在 60s 後收到訊息,14:07:46 收到對應的訊息。

經過程式碼確認,延時訊息的傳送是在 nsqd 中進行實現的,延時推流功能已經實現。