Golang 基礎:底層併發原語 Mutex RWMutex Cond WaitGroup Once等使用和基本實現

語言: CN / TW / HK

@[toc]

上一篇 《原生併發 goroutine channel 和 select 常見使用場景》 介紹了基於 CSP 模型的併發方式。

除了 CSP,Go 通過 sync 包以及 atomic 包還提供了一些更底層的同步 API,一般用於效能要求比較高的場景。

sync.Mutex 實現的同步機制的效能要比 channel 實現的高出三倍多。

在這裡插入圖片描述

在 sync/mutex.go 中,有這麼一段註釋: // Package sync provides basic synchronization primitives such as mutual // exclusion locks. Other than the Once and WaitGroup types, most are intended // for use by low-level library routines. Higher-level synchronization is // better done via channels and communication. // // Values containing the types defined in this package should not be copied.

註釋的大概意思是,sync 包提供的是底層併發原語,一般給底層庫用的,如果是上層業務同步,最好還是使用 channel。

還有一點,在日常編碼中,不要使用拷貝的併發包物件。首先拷貝的物件,和原本不是一個內容,狀態不一致;其次如果被拷貝的鎖物件已經是鎖定狀態,可能會導致副本也是鎖定狀態,超出預期。

如果需要多處使用,可以使用全域性變數或者指標傳遞(& 建立,* 使用)。

只有擁有資料物件所有權(從 channel 接收到該資料)的 Goroutine 才可以對該資料物件進行狀態變更。

互斥鎖 Mutex

Mutex 和 RMMutex 的作用和 Java 裡的類似,主要來看下 API 和基本實現。

使用:

mu := sync.Mutex{} mu.Lock() //加互斥鎖 mu.Unlock()

實現:

``` // A Mutex is a mutual exclusion lock. // The zero value for a Mutex is an unlocked mutex. // // A Mutex must not be copied after first use. type Mutex struct { state int32 sema uint32 }

```

實現很簡單,一個是狀態,另一個是訊號量。

拷貝使用 Mutex 的問題

來通過程式碼看一下拷貝使用 sync.Mutex 的問題。

``` var num int = 1 func testCopyMutex() { mu := sync.Mutex{} waitGroup := sync.WaitGroup{}

waitGroup.Add(1)

go func(copyMu sync.Mutex) {
    copyMu.Lock()
    num = 100
    fmt.Println("update num from sub-goroutine: ", num)
    time.Sleep(5 * time.Second)
    fmt.Println("read num from sub-goroutine: ", num)
    copyMu.Unlock()
    waitGroup.Done()
}(mu)

time.Sleep(time.Second)

mu.Lock()
num = 1
fmt.Println("read num from main: ", num)
mu.Unlock()

waitGroup.Wait()

} ```

上面的程式碼中,我們子 goroutine 裡先加鎖,然後修改了 num,在等待 5s 後,輸出了 num 的值,然後才釋放鎖;在這期間,主 goroutine 裡會嘗試獲取鎖,然後修改 num。

我們期望的是傳遞的同一個 Mutex,那子 goroutine 裡在釋放鎖之前,num 都是它修改後的值,但允許的結果卻讓人意外:

update num from sub-goroutine: 100 read num from main: 1 read num from sub-goroutine: 1

可以看到,在真正執行時,子 goroutine 加鎖的時間內,主 goroutine 居然也可以訪問到 num。

問題的原因就在於傳遞的是 Mutex 的值。

在這裡插入圖片描述

如果改成傳遞指標,結果就會符合預期:

go func(copyMu *sync.Mutex) { //引數型別加 * copyMu.Lock() num = 100 fmt.Println("update num from sub-goroutine: ", num) time.Sleep(5 * time.Second) fmt.Println("read num from sub-goroutine: ", num) copyMu.Unlock() waitGroup.Done() }(&mu) //傳遞指標

為什麼傳遞值會有問題呢?

原因在於,Mutex 複製後,狀態分離。子 goroutine 對副本加鎖,主 goroutine 感知不到,因為它們使用的不是同一份資料了!

``` // Lock locks m. // If the lock is already in use, the calling goroutine // blocks until the mutex is available. func (m *Mutex) Lock() { // Fast path: grab unlocked mutex. if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) { if race.Enabled { race.Acquire(unsafe.Pointer(m)) } return } // Slow path (outlined so that the fast path can be inlined) m.lockSlow() }

```

讀寫鎖 RWMutex

使用:

``` rmu := sync.RWMutex{} rmu.RLock() //讀鎖 rmu.RUnlock() rmu.Lock() //寫鎖 rmu.Unlock()

rmu.RLocker().Lock()        //通過 RLock 實現
rmu.RLocker().Unlock()

```

實現:

// A RWMutex is a reader/writer mutual exclusion lock. // The lock can be held by an arbitrary number of readers or a single writer. // The zero value for a RWMutex is an unlocked mutex. // // A RWMutex must not be copied after first use. // // If a goroutine holds a RWMutex for reading and another goroutine might // call Lock, no goroutine should expect to be able to acquire a read lock // until the initial read lock is released. In particular, this prohibits // recursive read locking. This is to ensure that the lock eventually becomes // available; a blocked Lock call excludes new readers from acquiring the // lock. type RWMutex struct { w Mutex // held if there are pending writers writerSem uint32 // semaphore for writers to wait for completing readers readerSem uint32 // semaphore for readers to wait for completing writers readerCount int32 // number of pending readers readerWait int32 // number of departing readers }

可以看到,RWMutex 的成員有一個互斥鎖(用於在寫入時獲取),讀寫者的訊號量,讀者數量等。

RWMutext 讀不阻塞讀,但會阻塞寫。

``` // Lock locks rw for writing. // If the lock is already locked for reading or writing, // Lock blocks until the lock is available. func (rw *RWMutex) Lock() { if race.Enabled { _ = rw.w.state race.Disable() } // First, resolve competition with other writers. rw.w.Lock() // Announce to readers there is a pending writer. r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders // Wait for active readers. if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 { runtime_SemacquireMutex(&rw.writerSem, false, 0) } if race.Enabled { race.Enable() race.Acquire(unsafe.Pointer(&rw.readerSem)) race.Acquire(unsafe.Pointer(&rw.writerSem)) } }

```

在呼叫寫鎖時,就是獲取其中的互斥鎖。

再多廢話一句:讀寫鎖適合併發量級比較大,且讀的次數大於寫的情況。

互斥鎖和讀寫鎖的注意點:

  • 減少鎖的範圍
  • 千千萬萬記得 unlock,可以早點寫 defer unlock,避免忘記

條件變數 Cond

sync.Cond,在需要“等待某個條件成立”的場景下使用,使用的很少。

支援多個 goroutine 等待某個條件,等條件允許後,廣播喚醒這些 goroutine 執行。

比起輪訓互斥的狀態,條件變數消耗資源更小,實現也更簡單。

使用:

``` cond := sync.NewCond(&sync.Mutex{}) ////引數為 sync.Locker 介面型別

go func() {
    //    cond.L.Lock()
    //    for !condition() {
           cond.Wait()      //等待,一般放在迴圈裡,查詢一次,不滿足就阻塞(釋放鎖),等被喚醒後,再檢查下條件
    //    }
    //    ... make use of condition ...
    //    cond.L.Unlock()
}()

cond.L.Lock()   //獲取構造傳入的鎖
cond.Broadcast()    //通知所有等待的 goroutine,從 Wait 返回,重新獲取鎖
cond.Signal()   //通知一個
cond.L.Unlock()

```

sync.Cond.Wait 一般結合 for 迴圈使用,反覆檢查條件是否滿足。

實現: ``` // Cond implements a condition variable, a rendezvous point // for goroutines waiting for or announcing the occurrence // of an event. // // Each Cond has an associated Locker L (often a Mutex or RWMutex), // which must be held when changing the condition and // when calling the Wait method. // // A Cond must not be copied after first use. type Cond struct { noCopy noCopy

// L is held while observing or changing the condition
L Locker

notify  notifyList
checker copyChecker

}

// NewCond returns a new Cond with Locker l. func NewCond(l Locker) *Cond { return &Cond{L: l} }

```

可以看到,條件主要由一個鎖和一個等待喚醒的佇列組成。

func (c *Cond) Wait() { c.checker.check() t := runtime_notifyListAdd(&c.notify) c.L.Unlock() runtime_notifyListWait(&c.notify, t) c.L.Lock() }

在等待一個條件時,會先加入等待佇列,然後釋放這個條件的鎖。

func (c *Cond) Broadcast() { c.checker.check() runtime_notifyListNotifyAll(&c.notify) }

廣播時,會通知所有等待的 goroutine 恢復執行 Wait 裡的邏輯,重新申請獲取鎖。

等待組 WaitGroup

在需要等待多個 goroutine 完成任務後繼續執行的場景,可以使用 sync.WaitGroup,和 Java 的 CountDownLaunch 類似。

使用: ``` waitGroup := sync.WaitGroup{} waitGroup.Add(1) //需要等待數為 1 go func() { waitGroup.Done() //減去需要等待數 }()

waitGroup.Wait()    //等待數為 0 才繼續執行,迴圈檢查

```

實現:

``` type WaitGroup struct { noCopy noCopy

// 64-bit value: high 32 bits are counter, low 32 bits are waiter count.
// 64-bit atomic operations require 64-bit alignment, but 32-bit
// compilers do not ensure it. So we allocate 12 bytes and then use
// the aligned 8 bytes in them as state, and the other 4 as storage
// for the sema.
state1 [3]uint32

} ```

可以看到,WaitGroup 核心就是一個計數的 state,高位 32 位為數量,低位 32 位為等待的數量。

// Wait blocks until the WaitGroup counter is zero. func (wg *WaitGroup) Wait() { statep, semap := wg.state() if race.Enabled { _ = *statep // trigger nil deref early race.Disable() } for { state := atomic.LoadUint64(statep) v := int32(state >> 32) w := uint32(state) if v == 0 { // Counter is 0, no need to wait. if race.Enabled { race.Enable() race.Acquire(unsafe.Pointer(wg)) } return } // Increment waiters count. //... } }

當呼叫 waitGroup.Wait() 時,會去迴圈檢查 state,只有當高 32 位為 0(即當前執行的任務為 0)時才會返回,否則就會增加低為然後繼續迴圈。

可想而知,呼叫 Done 就是高位減 1,就暫不贅述了。

僅執行一次 Once

見字如面,sync.Once 用於保證傳入的函式只執行一次。

在有些高併發的場景下,可能會有這種需求:多個 goroutine 同時執行任務 A,哪個先跑完就去執行任務 B,跑得慢的不需要執行。

使用:

once := sync.Once{} once.Do(func() { fmt.Println("do the work that only need exec once") })

實現也很簡單:

type Once struct { // done indicates whether the action has been performed. // It is first in the struct because it is used in the hot path. // The hot path is inlined at every call site. // Placing done first allows more compact instructions on some architectures (amd64/386), // and fewer instructions (to calculate offset) on other architectures. done uint32 m Mutex }

Once 的實現就是一個狀態值和一個互斥鎖。

``` func (o *Once) Do(f func()) { // Note: Here is an incorrect implementation of Do: // // if atomic.CompareAndSwapUint32(&o.done, 0, 1) { // f() // } // // Do guarantees that when it returns, f has finished. // This implementation would not implement that guarantee: // given two simultaneous calls, the winner of the cas would // call f, and the second would return immediately, without // waiting for the first's call to f to complete. // This is why the slow path falls back to a mutex, and why // the atomic.StoreUint32 must be delayed until after f returns.

if atomic.LoadUint32(&o.done) == 0 {
    // Outlined slow-path to allow inlining of the fast-path.
    o.doSlow(f)
}

} func (o *Once) doSlow(f func()) { o.m.Lock() defer o.m.Unlock() if o.done == 0 { defer atomic.StoreUint32(&o.done, 1) f() } } ```

當首次執行時,會通過原子操作修改其中的 done 狀態(這個過程需要獲取互斥鎖)。後面再執行 Do,發現狀態不對,就不會執行了。

原子操作

在前面看併發包的一些實現時,發現多多少少都是使用 atomic 進行實現,比如 WaitGroup#Wait:

state := atomic.LoadUint64(statep) v := int32(state >> 32) w := uint32(state)

atomic 原子操作,只能同步一個整型變數或自定義型別變數,更適合一些對效能十分敏感、併發量較大且讀多寫少的場合。

原子操作由底層硬體直接提供支援,是一種硬體實現的指令級的“事務” atomic 原子操作的特性:隨著併發量提升,使用 atomic 實現的共享變數的併發讀寫效能表現更為穩定,尤其是原子讀操作,和 sync 包中的讀寫鎖原語比起來,atomic 表現出了更好的伸縮性和高效能

無論整型變數和自定義型別變數,atomic的操作實質上針對的都是字長長度的指標。在64位cpu上就是8個位元組。因為CPU通過資料匯流排,一次從記憶體中最多隻能獲取一個字長的資訊。所以atomic的限制也是一個字長。

其他

雖然都在 sync 包中,但 sync.WaitGroup,Map,Pool 層級更高一些,是基於 Mutex、RWMutex 和 Cond 這三個基本原語之上實現的機制。

Go 團隊認為遞迴鎖或可重入鎖是一個不好的語法,所以不支援。