Golang中的竞态条件

语言: CN / TW / HK

编写多线程程序是一项重要的工作,需要在编写前规划。如果您使用的是单线程语言,例如 JavaScript,了解基础知识就行了。但如果您熟悉 C 或 C++ 等服务端编程语言,他们多线程概念是相似的,用法略有区别。在这篇博文中,我想解释竞态条件是如何发生的,以及如何使用 Golang 实现同步数据访问。

什么是竞态条件?

当多个线程尝试访问和修改相同的数据(内存地址)时,就会出现竞态条件。例如,如果一个线程试图增加一个整数而另一个线程试图读取它,这将导致竞态条件。另一方面,如果变量是只读的,就不会有竞态条件。在 golang 中,当使用 Goroutines 时,线程是隐式创建的。

让我们尝试创建一个竞态条件。最简单的方法是使用多个 goroutines,并且至少一个 goroutines 必须写入共享变量。

以下代码演示了一种创建竞态条件的简单方法。

  • goroutine 读取名为“sharedInt”的变量

  • 另一个 goroutine 通过增加它的值来写入同一个变量。

package main

import "time"

// This is an example of race condition
// 2 goroutines tries to read&write sharedInt and there is no access control.

var sharedInt int = 0
var unusedValue int = 0

func runSimpleReader() {
 for {
  var val int = sharedInt
  if val%10 == 0 {
   unusedValue = unusedValue + 1
  }
 }
}

func runSimpleWriter() {
 for {
  sharedInt = sharedInt + 1
 }
}

func startSimpleReadWrite() {
 go runSimpleReader()
 go runSimpleWriter()
 time.Sleep(10 * time.Second)
}

如果您运行此代码,并不会导致崩溃,但读 goroutine 会访问“sharedInt”的过时副本。如果你使用内置的竞态条件检查器运行代码,go 编译器会提示这个问题。

go run -race .

关于 Golang 竞态条件检查器的一个小说明:如果您的代码偶尔访问共享变量,它可能无法检测到竞态条件。为了检测它,代码应该在高负载下运行,并且必须发生竞态条件。

您可以看到竞态条件检查器的输出。它提示数据访问不同步。

我们如何解决这个问题?如果共享数据是单个变量,我们可以使用 sync/atomic 包中提供的计数器。在下面的示例中,我们可以使用 atomic.LoadInt64()/atomic.AddInt64() 对来访问它,而不是直接访问共享变量。竞态条件检查器将不再提示不同步的数据访问。

package main

import (
 "sync/atomic"
 "time"
)

var sharedIntForAtomic int64 = 0
var unusedValueForAtomic int = 0

func runAtomicReader() {
 for {
  var val int64 = atomic.LoadInt64(&sharedIntForAtomic)
  if val%10 == 0 {
   unusedValueForAtomic = unusedValueForAtomic + 1
  }
 }
}

func runAtomicWriter() {
 for {
  atomic.AddInt64(&sharedIntForAtomic, 1)
 }
}

func startAtomicReadWrite() {
 go runAtomicReader()
 go runAtomicWriter()
 time.Sleep(10 * time.Second)
}

这解决了我们在使用原始变量时的问题,但是在很多情况下我们需要访问多个变量并使用复杂的数据结构。在这些情况下,使用互斥锁来控制访问更容易解决问题。

以下示例演示了对Map的非同步访问。使用复杂的数据结构时,竞态条件可能会导致崩溃。因此,如果我们在没有启用竞态检查的情况下运行此示例,go 运行时将提示并发访问,并且进程将退出。

fatal error: concurrent map read and map write
package main

import "time"

// This is an example of race condition
// 2 goroutines tries to read&write sharedMap and there is no access control.
// This code should raise a panic condition

var sharedMap map[string]int = map[string]int{}

func runSimpleMapReader() {
 for {
  var _ int = sharedMap["key"]
 }
}

func runSimpleMapWriter() {
 for {
  sharedMap["key"] = sharedMap["key"] + 1
 }
}

func startMapReadWrite() {
 sharedMap["key"] = 0

 go runSimpleMapReader()
 go runSimpleMapWriter()
 time.Sleep(10 * time.Second)
}

可以通过控制对临界区的访问来解决此问题。在这个例子中,临界区是我们读写“sharedMap”的地方。在下面的示例中,我们调用 mutex.Lock()mutex.Unlock() 对来控制访问。

互斥锁是如何工作的?

  • 互斥锁是在解锁状态下创建的。

  • 当第一次调用 mutex.Lock() 时,互斥锁状态更改为 Locked。

  • 对 mutex.Lock() 的任何其他调用都将阻塞 goroutine,直到调用 mutex.Unlock()

  • 因此,只有一个线程可以访问临界区。

例如,我们可以使用互斥锁来控制对临界区的访问。我添加了一个上下文来在工作 2 秒后取消 goroutine。

package main

import (
 "context"
 "fmt"
 "sync"
 "time"
)

var sharedMapForMutex map[string]int = map[string]int{}
var mapMutex = sync.Mutex{}
var mutexReadCount int64 = 0

func runMapMutexReader(ctx context.Context, readChan chan int) {
 readCount := 0
 for {
  select {
  case <-ctx.Done():
   fmt.Println("reader exiting...")
   readChan <- readCount
   return
  default:
   mapMutex.Lock()
   var _ int = sharedMapForMutex["key"]
   mapMutex.Unlock()
   readCount += 1
  }
 }
}

func runMapMutexWriter(ctx context.Context) {
 for {
  select {
  case <-ctx.Done():
   fmt.Println("writer exiting...")
   return
  default:
   mapMutex.Lock()
   sharedMapForMutex["key"] = sharedMapForMutex["key"] + 1
   mapMutex.Unlock()
   time.Sleep(100 * time.Millisecond)
  }
 }
}

func startMapMutexReadWrite() {
 testContext, cancel := context.WithCancel(context.Background())

 readCh := make(chan int)
 sharedMapForMutex["key"] = 0

 numberOfReaders := 15
 for i := 0; i < numberOfReaders; i++ {
  go runMapMutexReader(testContext, readCh)
 }
 go runMapMutexWriter(testContext)

 time.Sleep(2 * time.Second)

 cancel()

 totalReadCount := 0
 for i := 0; i < numberOfReaders; i++ {
  totalReadCount += <-readCh
 }

 time.Sleep(1 * time.Second)

 var counter int = sharedMapForMutex["key"]
 fmt.Printf("[MUTEX] Write Counter value: %v\n", counter)
 fmt.Printf("[MUTEX] Read Counter value: %v\n", totalReadCount)
}

如果我们运行示例代码,go 运行时将不再提示并发读取和写入问题,因为一次只有一个 goroutine 可以访问临界区。在示例中,我使用了 15 个读取器 goroutine 和一个写入器 goroutine。每 100 毫秒更新一次“sharedMap”。在这种情况下,最好使用 RWMutex(读/写互斥锁)。它类似于互斥锁,但它还有另一种锁定机制,可以让多个读者在安全的情况下访问临界区。当写入很少并且读取更常见时,这可能会表现得更好。

RWMutex 是如何工作的?

  • 简单来说,如果没有写入者,多个读可以同时访问临界区。如果写入者试图访问临界区,则所有读取都会被阻止。当写入很少并且读取很常见时,这会更有效。

  • rwMutex.Lock() 和 rwMutex.Unlock() 的工作方式类似于互斥锁-解锁机制。

  • 如果互斥锁处于解锁状态,rwMutex.RLock() 不会阻止任何读取器。这允许多个读者同时访问临界区。

  • 当 rwMutex.Lock() 被调用时;调用者被阻塞,直到所有读者都调用 rwMutex.RUnlock()。此时,任何对 RLock() 的调用都开始阻塞,直到调用 rwMutex.Unlock()。这可以防止发生任何饥饿。

  • 当 rwMutex.Unlock() 被调用时;RLock() 的所有调用者都被解除阻塞并且可以访问临界区。

package main

import (
 "context"
 "fmt"
 "sync"
 "time"
)

var sharedMapForRWMutex map[string]int = map[string]int{}
var mapRWMutex = sync.RWMutex{}
var rwMutexReadCount int64 = 0

func runMapRWMutexReader(ctx context.Context, readChan chan int) {
 readCount := 0
 for {
  select {
  case <-ctx.Done():
   fmt.Println("reader exiting...")
   readChan <- readCount
   return
  default:
   mapRWMutex.RLock()
   var _ int = sharedMapForRWMutex["key"]
   mapRWMutex.RUnlock()
   readCount += 1
  }
 }
}

func runMapRWMutexWriter(ctx context.Context) {
 for {
  select {
  case <-ctx.Done():
   fmt.Println("writer exiting...")
   return
  default:
   mapRWMutex.Lock()
   sharedMapForRWMutex["key"] = sharedMapForRWMutex["key"] + 1
   mapRWMutex.Unlock()
   time.Sleep(100 * time.Millisecond)
  }
 }
}

func startMapRWMutexReadWrite() {
 testContext, cancel := context.WithCancel(context.Background())

 readCh := make(chan int)
 sharedMapForRWMutex["key"] = 0

 numberOfReaders := 15
 for i := 0; i < numberOfReaders; i++ {
  go runMapRWMutexReader(testContext, readCh)
 }
 go runMapRWMutexWriter(testContext)

 time.Sleep(2 * time.Second)

 cancel()

 totalReadCount := 0
 for i := 0; i < numberOfReaders; i++ {
  totalReadCount += <-readCh
 }

 time.Sleep(1 * time.Second)

 var counter int = sharedMapForRWMutex["key"]
 fmt.Printf("[RW MUTEX] Write Counter value: %v\n", counter)
 fmt.Printf("[RW MUTEX] Read Counter value: %v\n", totalReadCount)
}

Mutex 与 RWMutex 性能对比

我运行了五次示例并比较了平均值。结果,RWMutex 执行的读取操作增加了 14.35%。但请注意,这个例子是在特定场景下进行的,因为有 15 个读取器 goroutine 和一个写入器 goroutine。

总结

在这篇博文中,我试图回顾导致竞争条件的非同步数据访问的基础知识,来进一步讨论如何避免竞态条件的发生问题。根据我的个人经验,我更愿意让每个 goroutine 上下文中使用自己的局部变量,并通过使用通道传播数据。设计通过通道或队列进行通信的单线程组件更容易。如果这种方法并不适用于你的场景,这时互斥锁可以派上用场。

推荐

K8s Pod优雅关闭,没你想象的那么简单!

分享下云原生技术之外的另类话题

原创不易,随手关注或者”在看“,诚挚感谢!