Golang 基礎:原生併發 goroutine channel 和 select 常見使用場景

語言: CN / TW / HK

本文為極客時間 Go 語言第一課 相關章節學習筆記及思考。

@[toc]

Go 語言之父 Rob Pike 的經典名言:“不要通過共享記憶體來通訊,應該通過通訊來共享記憶體(Don’t communicate by sharing memory, share memory by communicating)

C/C++ 執行緒的複雜性:

執行緒退出時要考慮新建立的執行緒是否要與主執行緒分離(detach) 還是需要主執行緒等待子執行緒終止(join)並獲取其終止狀態? 又或者是否需要在新執行緒中設定取消點(cancel point)來保證被主執行緒取消(cancel)的時候能順利退出

goroutine 是由 Go 執行時(runtime)負責排程的、輕量的使用者級執行緒。

優勢: 1. 佔用記憶體小,goroutine 初始棧只有 2k,比 Linux 執行緒小多了 2. 使用者態排程,不需要核心介入,代價更小 3. 一退出就會被回收 4. 提供 channel 通訊

無論是 Go 自身執行時程式碼還是使用者層 Go 程式碼,都無一例外地執行在 goroutine 中。

goroutine

呼叫函式、方法(具名、匿名、閉包都可以)時,前面加上 go 關鍵字,就會建立一個 goroutine。

goroutine 排程原理

Goroutine 排程器的任務:將 Goroutine 按照一定演算法放到不同的作業系統執行緒中去執行。

演進: - G-M 模型(廢棄):將 G(Goroutine) 排程到 M(Machine) 上執行 - G-P-M 模型(使用中):增加中間層 P(Processor),提供佇列管理多個 G,然後在合適的時候繫結 M。先後使用協作式、搶佔式排程。 - NUMA 排程模型(尚未實現)

【G-P-M排程圖】

圖片來自:https://time.geekbang.org/column/article/476643

  • G:儲存 Goroutine 的執行資訊,包括:棧、狀態
  • P:邏輯處理器,有一個待排程的 G 佇列
  • M:真正的計算資源,Go 程式碼執行的真實載體(使用者態執行緒),要執行 G 需要繫結 P,繫結後會從 P 的本地佇列和全域性佇列獲取 G 然後執行

```

//src/runtime/runtime2.go type g struct { stack stack // offset known to runtime/cgo sched gobuf goid int64 gopc uintptr // pc of go statement that created this goroutine startpc uintptr // pc of goroutine function ... ... }

type p struct { lock mutex

id          int32
status      uint32 // one of pidle/prunning/...

mcache      *mcache
racectx     uintptr

// Queue of runnable goroutines. Accessed without lock.
runqhead uint32
runqtail uint32
runq     [256]guintptr

runnext guintptr

// Available G's (status == Gdead)
gfree    *g
gfreecnt int32

... ...

}

type m struct { g0 g // goroutine with scheduling stack mstartfn func() curg g // current running goroutine ... ... } ```

從 Go 1.2 以後,Go 排程器使用 G-P-M 模型,排程目標:公平地將 G 排程到 P 上執行。

排程策略: 1. 常規執行,G 執行超出時間片後搶佔排程 2. G 阻塞在 channel 或者 I/O 上時,會被放置到等待佇列,M 會嘗試執行 P 的下一個可執行 G;當 G 可執行時,會被喚醒並修改狀態,然後放到某個 P 的佇列中,等待被繫結 M、執行 3. G 阻塞在 syscall 上時,執行 G 的 M 也會受影響,會解綁 P、進入掛起狀態;syscall 返回後,G 會嘗試獲取可用的 P,沒獲取到的話,修改狀態,等待被執行

如果一個 G 任務執行 10ms,sysmon 就會認為它的執行時間太久而發出搶佔式排程的請求。 一旦 G 的搶佔標誌位被設為 true,那麼等到這個 G 下一次呼叫函式或方法時,執行時就可以將 G 搶佔並移出執行狀態,放入佇列中,等待下一次被排程。

``` // $GOROOT/src/runtime/proc.go

// forcePreemptNS is the time slice given to a G before it is // preempted. const forcePreemptNS = 10 * 1000 * 1000 // 10ms ```

channel

和執行緒一樣,一個應用內部啟動的所有 goroutine 共享程序空間的資源,如果多個 goroutine 訪問同一塊記憶體資料,將會存在競爭。

Go 提供了 channel 作為 goroutine 之間的通訊方式,goroutine 可以從 channel 讀取資料,處理後再把資料寫到 channel 中。

channel 是和 切片、map 類似的複合型別,使用時需要指定具體的型別:

c := make(chan int) //c 是一個 int 型別的 channel

和函式一樣,channel 也是“第一公民” 身份,可以做變數、引數、返回值等。

```

func spawn(f func() error) <-chan error { c := make(chan error)

go func() {
    c <- f()
}()

return c

}

func main() { c := spawn(func() error { time.Sleep(2 * time.Second) return errors.New("timeout") }) fmt.Println(<-c) } ```

main goroutine 與子 goroutine 之間建立了一個元素型別為 error 的 channel,子 goroutine 退出時,會將它執行的函式的錯誤返回值寫入這個 channel,main goroutine 可以通過讀取 channel 的值來獲取子 goroutine 的退出狀態。

channel 的不同型別

通過 make 可以建立 2 種類型的 channel: 1. 無緩衝:讀寫是同步進行,沒有對接人的話會一直阻塞著 2. 有緩衝:有資料時讀不會阻塞;未滿時寫資料不會阻塞

下面是無 buffer channel 的測試例子: ``` func testNoBufferChannel() { var c chan int = make(chan int) //無緩衝,同步進行,沒有對接人,就會阻塞住 //var c chan int = make(chan int, 5) //有緩衝,容量為 5

//大多數時候,讀寫要在不同 goroutine,尤其是無緩衝 channel
go func() {
    fmt.Println("goroutine run")
    b := <-c //讀取 channel
    fmt.Println("read from channel: ", b)
}()

fmt.Println("main goroutine before write")
c <- 1  //沒有 buffer,寫入 channel 時會阻塞,直到有讀取
fmt.Println("main goroutine finish")

} ```

執行結果:

main goroutine before write goroutine run read from channel: 1 main goroutine finish

和預期一致,主 goroutine 在寫入無 buffer 的 channel 時會阻塞,直到 子 goroutine 讀取。

下面是有 buffer channel 的測試例子:

``` func testBufferChannel() { c := make(chan int, 1) //有緩衝,容量為 5

//大多數時候,讀寫要在不同 goroutine,尤其是無緩衝 channel
go func() {
    fmt.Println("child_goroutine run")
    b := <-c //讀取 channel,有資料時不會阻塞
    fmt.Println("child_goroutine read from channel: ", b)
}()

fmt.Println("main goroutine before write first")
c <- 1  //有 buffer,寫入 channel 時不會阻塞,除非滿了
fmt.Println("main_goroutine first write finish, len:", len(c))

fmt.Println("main_goroutine write second:")
c <-2
fmt.Println("main_goroutine finish, len:", len(c))

time.Sleep( 3 * time.Second)    //不加這個子 goroutine 沒執行就退出了

} ```

執行結果:

main goroutine before write first main_goroutine first write finish, len: 1 main_goroutine write second: child_goroutine run child_goroutine read from channel: 1 main_goroutine finish, len: 1

可以看到 1. 第一次寫完立刻就返回;第二次寫時,因為這個 goroutine 已經滿了,所以阻塞在寫上 2. 子 goroutine 讀取了一次,主 goroutine 才從寫上返回

作為引數的單向型別

  1. 只發送, chan<-
  2. 只接收, <-chan

``` func testSingleDirectionChannel() {

f := func(a chan<- int, b <- chan int) {    //a 是隻能寫入,b 是隻能讀取
    x := <- a   //編譯報錯:Invalid operation: <- a (receive from send-only type chan<- int)
    b <- 2      //編譯報錯:nvalid operation: b <- 2 (send to receive-only type <-chan int)
}

} ```

通常只發送 channel 型別和只接收 channel 型別,會被用作函式的引數型別或返回值,用於限制對 channel 內的操作,或者是明確可對 channel 進行的操作的型別

普通channel,可以傳入函式作為只發送或只接收型別

關閉 channel

close(channel) 後,不同語句的結果:

``` func testCloseChannel() { a := make(chan int) close(a) //先關閉,然後看下幾種讀取關閉 channel 的結果 b := <- a fmt.Println("關閉後直接讀取:", b) //0 c, ok := <-a fmt.Println("關閉後通過逗號 ok 讀取:", c, ok) //0 false

for v := range a{   //關閉的話直接跳過
    fmt.Println("關閉後通過 for-range 讀取", v)
}

} ```

通過“comma, ok” 我們可以知道 channel 是否被關閉。

一般由傳送端負責關閉 channel,原因: 1. 向一個關閉的 channel 中傳送資料,會 panic (⚠️注意了!!!) 2. 傳送端沒有辦法判斷 channel 是否已經關閉。

len(channel)

當 ch 為無緩衝 channel 時,len(ch) 總是返回 0;當 ch 為帶緩衝 channel 時,len(ch) 返回當前 channel ch 中尚未被讀取的元素個數。

如果只是想知道 channel 中是否有資料、不想阻塞,可以使用 len(channel) 先做檢查:

【len(channel) 的圖】

nil channel

預設讀取一個關閉的 channel,會返回零值。但是讀取一個 nil channel,操作將阻塞。

所以在有些場景下,可能需要手動修改 channel 為 nil,以實現阻塞的效果,比如在 select 語句中。

無緩衝 channel 的常見用途 🔥

Go 語言倡導:

Do not communicate by sharing memory; instead, share memory by communicating. 不要通過共享記憶體來通訊,而是通過通訊來共享記憶體

多 goroutine 通訊:訊號

基於無 buffer channel,可以實現一對一和一對多的訊號傳遞。

1.一對一 ``` type signal struct{}

//接收一個函式,在子 routine 裡執行,然後返回一個 channel,用於主 routine 等待 func spawn(f func()) <-chan signal { c := make(chan signal) go func() { fmt.Println("exec f in child_routine"); f(); fmt.Println("f exec finished, write to channel") c<- signal{} }() return c }

//測試使用無 buffer channel 實現訊號 func testUseNonBufferChannelImplSignal() { //模擬主 routine 等待子 routine

worker := func() {
    fmt.Println("do some work")
    time.Sleep(3 * time.Second)
}

fmt.Println("start a worker...")
c := spawn(worker)

fmt.Println("spawn finished, read channel...")
<-c //讀取,阻塞等待

fmt.Println("worker finished")

} ```

上面的程式碼中,主 routine 建立了一個函式,然後在子 routine 中執行,主 routine 阻塞在一個 channel 上,等待子 routine 完成後繼續執行。

執行結果:

start a worker... spawn finished, read channel... exec f in child_routine do some work f exec finished, write to channel worker finished

可以看到,這樣的確實現了類似“訊號”的機制:在一個 routine 中通知另一個 routine。 如果 channel 的型別複雜些,就可以傳遞任意資料了!

struct{} 大小是0,不佔記憶體

2.一對多

關閉一個無 buffer channel 會讓所有阻塞在這個 channel 上的 read 操作返回,基於此我們可以實現 1 對 n 的“廣播”機制。

``` var waitGroup sync.WaitGroup

func spawnGroup(f func(ind int), count int, groupSignal chan struct{}) <-chan signal { c := make(chan signal) //用於讓主 routine 阻塞的 channel waitGroup.Add(count) //等待總數

//建立 n 個 goroutine
for i := 0; i < count; i++ {
    go func(index int) {
        <- groupSignal  //讀取阻塞,等待通知執行

        //fmt.Println("exec f in child_routine, index: ", i);
        //⚠️注意上面註釋的程式碼,這裡不能直接訪問 for 迴圈的 i,因為這個是複用的,會導致訪問的值不是目標值

        fmt.Println("exec f in child_routine, index: ", index);
        f(index);
        fmt.Println(index , " exec finished, write to channel")

        waitGroup.Done()
    }(i + 1)
}

//建立通知主 routine 結束的 routine,不能阻塞當前函式
go func() {
    //需要同步等待所有子 routine 執行完
    waitGroup.Wait()
    c <- signal{}   //寫入資料
}()
return c

}

func testUseNonBufferChannelImplGroupSignal() { worker := func(i int) { fmt.Println("do some work, index ", i) time.Sleep(3 * time.Second) }

groupSignal := make(chan struct{})
c := spawnGroup(worker, 5, groupSignal)

fmt.Println("main routine: close channel")
close(groupSignal)  //通知剛建立的所有 routine


fmt.Println("main routine: read channel...")
<- c    //阻塞在這裡

fmt.Println("main routine: all worker finished")

}

```

上面的程式碼做了這些事: 1. 建立 channelA,傳遞給多個 goroutine 2. 子 routine 中讀取等待這個 channelA 3. 主 routine 關閉 channel,然後阻塞在 channelB 上,此時所有子 routine 開始執行 4. 所有子 routine 執行完後,通過 channelB 喚醒主 routine

執行結果:

main routine: close channel main routine: read channel exec f in child_routine, index: 2 do some work, index 2 exec f in child_routine, index: 1 do some work, index 1 exec f in child_routine, index: 3 do some work, index 3 exec f in child_routine, index: 4 do some work, index 4 exec f in child_routine, index: 5 do some work, index 5 4 exec finished, write to channel 5 exec finished, write to channel 3 exec finished, write to channel 1 exec finished, write to channel 2 exec finished, write to channel main routine: all worker finished

一句話總結: 用 2 個 channel 實現了 【主 routine 通知所有子 routine 開始】 和【子 routine 通知主 routine 任務結束】。

多 goroutine 同步:通過阻塞,替代鎖

``` type NewCounter struct { c chan int i int }

func CreateNewCounter() *NewCounter { counter := &NewCounter{ c: make(chan int), i: 0, }

go func() {
    for {
        counter.i ++
        counter.c <- counter.i      //每次加一,阻塞在這裡
    }
}()

return counter

}

func (c *NewCounter)Increase() int { return <- c.c //讀取到的值,是上一次加一 }

//多協程併發增加計數,通過 channel 寫入阻塞,讀取時加一 func testCounterWithChannel() { fmt.Println("\ntestCounterWithChannel ->>>")

group := sync.WaitGroup{}
counter := CreateNewCounter()

for i:=0 ; i<10 ; i++ {
    group.Add(1)

    go func(i int) {
        count := counter.Increase()
        fmt.Printf("Goroutine-%d, count %d \n", i, count)
    }(i)
}

group.Wait()

} ```

上面的程式碼中,我們建立了一個單獨的協程,在其中迴圈增加計數,但每次加一後,就會嘗試寫入 channel(無 buffer 的),在沒有人讀取時,會阻塞在這個方法上。

然後在 10 個協程裡併發讀取 channel,從而實現每次讀取遞增。

帶緩衝 channel 的常見用途 🔥

訊息佇列

channel 的特性符合對訊息佇列的要求: 1. 跨 goroutine 訪問安全 2. FIFO 3. 可設定容量 4. 非同步收發

Go 支援 channel 的初衷是將它作為 Goroutine 間的通訊手段,它並不是專門用於訊息佇列場景的。 如果你的專案需要專業訊息佇列的功能特性,比如支援優先順序、支援權重、支援離線持久化等,那麼 channel 就不合適了,可以使用第三方的專業的訊息佇列實現。

計數訊號量

由於帶 buffer channel 的特性(容量滿時寫入會阻塞),可以用它的容量表示同時最大併發數量。

下面是一個例子: ``` var active = make(chan struct{}, 3) //"訊號量",最多 3 個 var jobs = make(chan int, 10)

//使用帶快取的 channel,容量就是訊號量的大小 func testSemaphoreWithBufferChannel() {

//先寫入資料,用作表示任務
go func() {
    for i:= 0; i < 9; i++ {
        jobs <- i + 1
    }
    close(jobs)
}()

var wg sync.WaitGroup

for j := range jobs {
    wg.Add(1)

    //執行任務
    go func(i int) {
        //通知開始執行,當容量用完時,阻塞
        active <- struct{}{}

        //fmt.Println("exec job ", i)
        log.Printf("exec job: %d, length of active: %d \n", i, len(active))
        time.Sleep(2 * time.Second)

        //執行完,通知結束
        <- active
        wg.Done()

    }(j)
}

wg.Wait()

} ```

上面的程式碼中,我們用 channel jobs 表示要執行的任務(這裡為 8 個),然後用 channel active 表示訊號量(最多三個)。

然後在 8 個 goroutine 裡執行任務,每個任務耗時 2s。在每次執行任務前,先寫入 channel 表示獲取訊號量;執行完後讀取,表示釋放訊號量。

由於訊號量最多三個,所以同一時刻最多能有 3 個任務得以執行。

執行結果如下,符合預期:

2022/04/20 19:14:26 exec job: 1, length of active: 1 2022/04/20 19:14:26 exec job: 9, length of active: 2 2022/04/20 19:14:26 exec job: 5, length of active: 3 2022/04/20 19:14:28 exec job: 6, length of active: 3 2022/04/20 19:14:28 exec job: 7, length of active: 3 2022/04/20 19:14:28 exec job: 8, length of active: 3 2022/04/20 19:14:30 exec job: 3, length of active: 3 2022/04/20 19:14:30 exec job: 2, length of active: 3 2022/04/20 19:14:30 exec job: 4, length of active: 3

select

當需要在一個 goroutine 同時讀/寫多個 channel 時,可以使用 select:

類似 Linux 的 I/O 多路複用思路,我們可以叫它:goroutine 多路複用。

```

func testSelect() { channelA := make(chan int) channelB := make(chan int)

go func() {
    var readA bool
    var readB bool

    for {
        select {
        case x := <- channelA:
            fmt.Println("child_routine: read from channelA:", x)
            readA = true
        case y := <- channelB:
            fmt.Println("child_routine:  read from channelB:", y)
            readB = true
        //default:
        //  //其他 case 阻塞,就執行 default
        //  fmt.Println("default")
        }

        if readA && readB {

            fmt.Println("child_goroutine finish")
            return;
        } else {
            fmt.Println("child_goroutine still loop, ", readA, readB)
        }
    }
}()

fmt.Println("main goroutine")

time.Sleep(2 * time.Second)

fmt.Println("main goroutine, write to channelA")
channelA <- 111
fmt.Println("main goroutine, write to channelA finish")

time.Sleep(1 * time.Second)

fmt.Println("main goroutine, write to channelB")
channelB <- 111
fmt.Println("main goroutine, write to channelB finish")

time.Sleep( 5 * time.Second)
fmt.Println("main goroutinefinish")

} ```

輸出:

main goroutine main goroutine, write to channelA main goroutine, write to channelA finish child_routine: read from channelA: 111 child_goroutine still loop, true false main goroutine, write to channelB main goroutine, write to channelB finish child_routine: read from channelB: 111 child_goroutine finish main goroutinefinish

可以看到: 1. 使用 select 在一個 goroutine 裡讀取了 2 個 channel 2. 這 2 個 case 裡的 channel 都不可讀時,select 阻塞,只會執行 default,不會執行 select 程式碼塊以外的 3. 主 goroutine 寫入資料後,select 的其中一個 case 返回,然後繼續執行 select 後面的邏輯 4. 下一輪迴圈後 2 個 case 都不可讀,繼續阻塞 5. 然後主 goroutine 寫入後,另外一個 case 也返回,迴圈結束

channel 與 select 結合的常見用途 🔥

利用 default 分支避免阻塞

select 的 default 分支語義:當所有 case 語句裡讀/寫 channel 阻塞時,會執行 default!

無論 channel 是否有 buffer。

有些時候,我們可能不希望阻塞在寫入 channel 上,那可以利用 select default 的特性,這樣封裝一個函式,當寫入阻塞時,返回一個 false,讓外界可以處理阻塞的情況: func tryWriteChannel(c chan<- int, value int) bool { select { case c <- value return true default: //其他沒就緒時,會執行 return false } }

這樣使用:

//active <- 1 //之前直接寫 channel,如果滿了,就會阻塞 writed := tryWriteChannel(active, 1) //改成這樣,可以在阻塞時,處理相關邏輯 if !writed { log.Println("failed to write channel") return }

實現超時

假如我們想在一個 channel 的讀/寫操作上加一個超時邏輯,可以通過這樣實現: 在 select 程式碼塊中,加一個 case,這個 case 會在超時後執行,這樣會結束其他 case。

比如這樣:

func tryGetSemaphore(c chan<- struct{}) bool { select { case c <- struct {}{}: return true case <- time.After(1 * time.Second): //在寫 channel 的基礎上,額外加一個情況,超時情況 log.Println("timeout!!!") //1s 後返回,可以在這裡做超時處理 return true } }

及時呼叫 timer 的 Stop 方法回收 Timer 資源。

心跳機制

迴圈執行一個額外的 case,這個 case 會定時返回。

```

func worker() { heartbeat := time.NewTicker(30 * time.Second) defer heartbeat.Stop() for { select { case <-c: // ... do some stuff case <- heartbeat.C: //... do heartbeat stuff } } } ```

time.NewTicker 會建立一個定時執行的心跳,可以把這個 ticker channel 讀取的操作放到一個 case 裡,這樣 select 程式碼塊就會定時執行一次。

ticker 也要及時 Stop。

總結

本文介紹了 Golang 中通過 goroutine channel 和 select 實現併發操作的一些典型場景。

可以看到,通過 goroutine 實現併發是如此的簡單;通過 channel 無 buffer 和有 buffer,實現一些 goroutine 同步機制也比較方便;結合 select,實現 goroutine 的統一管理。

在學習一門語言時,既要結合已有的語言知識,也要吸收新語言的設計思想。

需要記住的是,Go 提倡通過 CSP 模型(communicating sequential processes 通訊順序程序)進行通訊,而不是傳統語言的共享記憶體方式。

CSP:兩個獨立的併發實體通過共享 channel(管道)進行通訊的併發模型。

我們在遇到多 goroutine 通訊、同步的情況,可以儘量多使本文的內容進行處理。

不過對於某些情況,也可以使用 go 提供的 sync 包下的內容,進行區域性同步。下篇文章我們就來看看這些內容。

對於區域性情況,比如涉及效能敏感的區域或需要保護的結構體資料時,我們可以使用更為高效的低階同步原語(如 mutex),保證 goroutine 對資料的同步訪問。