Golang基础笔记十五之sync

    正在检查是否收录...

Golang基础笔记十五之sync

本文首发于公众号:Hunter后端

原文链接:Golang基础笔记十五之sync

这一篇笔记介绍 Golang 中的 sync 模块。

sync 包主要提供了基础的同步原语,比如互斥锁,读写锁,等待组等,用于解决并发编程中的线程安全问题,以下是本篇笔记目录:

  1. WaitGroup-等待组
  2. sync.Mutex-互斥锁
  3. sync.RWMutex-读写锁
  4. sync.Once-一次性执行
  5. sync.Pool-对象池
  6. sync.Cond-条件变量
  7. sync.Map

1、WaitGroup-等待组

前面在第十篇我们介绍 goroutine 和 channel 的时候,在使用 goroutine 的时候介绍有一段代码如下:

package main import ( "fmt" "time" ) func PrintGoroutineInfo() { fmt.Println("msg from goroutine") } func main() { go PrintGoroutineInfo() time.Sleep(1 * time.Millisecond) fmt.Println("msg from main") } 

在这里,我们开启了一个协程调用 PrintGoroutineInfo() 函数,然后使用 time.Sleep() 来等待它调用结束。

然而在开发中,我们不能确定这个函数多久才能调用完毕,也无法使用准确的 sleep 时间来等待,那么这里就可以使用到 sync 模块的 WaitGroup 函数来等待一个或多个 goroutine 执行完毕。

下面是使用示例:

package main import ( "fmt" "math/rand" "sync" "time" ) func SleepRandSeconds(wg *sync.WaitGroup) { defer wg.Done() sleepSeconds := rand.Intn(3) fmt.Printf("sleep %d seconds\n", sleepSeconds) time.Sleep(time.Duration(sleepSeconds) * time.Second) } func main() { var wg sync.WaitGroup wg.Add(2) go SleepRandSeconds(&wg) go SleepRandSeconds(&wg) wg.Wait() fmt.Println("函数执行完毕") } 

在这里,我们通过 var wg sync.WaitGroup 定义了一个等待组,并通过 wg.Add(2) 表示添加了需要等待的并发数,在并发中我们将 &wg 传入并通过 wg.Done() 减少需要等待的并发数。

wg.Done() 函数内部,使用 wg.Add(-1) 减少需要等待的并发数,在 main 函数中,使用 wg.Wait() 进入阻塞状态,当等待的并发都完成后,此函数就会返回,完成等待并接着往后执行。

2、sync.Mutex-互斥锁

1. 数据竞态与互斥锁

当多个 goroutine 并发访问同一个共享资源,且至少有一个访问是写操作时,就会发生数据竞态,造成的结果就是程序每次运行的结果表现可能会不一致。

比如下面的示例:

var balance int func AddFunc() { balance += 1 } func main() { for range 100 { go AddFunc() } time.Sleep(5 * time.Second) fmt.Println("balance is: ", balance) } 

多次执行上面的代码,最终输出的 balance 的值可能都不一致。

如果一个变量在多个 goroutine 同时访问时,不会出现比如数据不一致或程序崩溃的情况,那么我们就称其是并发安全的。

我们可以使用 go run -race main.go 的方式来检测数据竞态,执行检测后,会输出数据竞态的一些信息,比如发生在代码的多少行,一共发生了多少次数据竞态:

================== WARNING: DATA RACE Read at 0x000003910df8 by goroutine 7: main.AddFunc() /../main.go:13 +0x24 Previous write at 0x000003910df8 by goroutine 6: main.AddFunc() /../main.go:13 +0x3c Goroutine 7 (running) created at: main.main() /../main.go:18 +0x32 Goroutine 6 (finished) created at: main.main() /../main.go:18 +0x32 ================== balance is: 98 Found 3 data race(s) exit status 66 

而要避免这种数据竞态的发生,我们可以限制在同一时间只能有一个 goroutine 访问同一个变量,这种方法称为互斥机制。

我们可以通过缓冲通道和 sync.Mutex 来实现这种互斥锁的操作。

2. 缓冲通道实现互斥锁

我们可以通过容量为 1 的缓冲通道来实现互斥锁的操作,保证同一时间只有一个 goroutine 访问同一个变量,下面是修改后的代码:

var sema = make(chan struct{}, 1) var balance int func AddFunc() { sema <- struct{}{} balance += 1 <-sema } func main() { for range 100 { go AddFunc() } time.Sleep(3 * time.Second) fmt.Println("balance is: ", balance) } 

在上面这段代码里,我们定义了 sema 这个容量为 1 的通道,在每个 AddFunc() 并发中,对变量 balance 执行读写操作前,我们先往通道里写入了一条数据,这样其他并发在执行该函数时,由于也会先往通道里写入数据,而这个时候通道已经满了,所以会处于堵塞状态,这就相当于获取锁。

直到 balance 写操作完成,从通道里读取数据,通道为空,相当于释放锁,这个时候其他并发才可以往通道里写入数据重新拿到锁。

这样我们通过往通道里写入和读取数据保证了同一时间只有一个 goroutine 在对 balance 进行写操作,从而实现互斥锁的操作。

3. sync.Mutex

在 sync 包中,sync.Mutex 直接为我们实现了互斥锁的操作,它的操作如下:

var mutex sync.Mutex // 互斥锁的定义 mutex.Lock() // 获取锁 mutex.Unlock() // 释放锁 

那么使用 sync.Mutex 实现上面的逻辑,代码如下:

var mutex sync.Mutex var balance int func AddFunc() { mutex.Lock() defer mutex.Unlock() balance += 1 } func main() { for range 100 { go AddFunc() } time.Sleep(3 * time.Second) fmt.Println("balance is: ", balance) } 

3、sync.RWMutex-读写锁

在上面介绍的 sync.Mutex 互斥锁中,限制了同一时间只能有一个 goroutine 访问某个变量,包括读和写,但这种情况并非是最理想的,比如在读多写少的场景下。

那么 Golang 里的 sync.RWMutex 为我们提供了读写锁的操作,它会允许多个读操作的并发,而写操作会阻塞所有读和写。

读写锁的基本规则如下:

  1. 当一个 goroutine 获取了读锁后,其他 goroutine 仍然可以获取读锁,但不能获取写锁。
  2. 当一个 goroutine 获取了写锁后,其他 goroutine 无论是读锁还是写锁都不能获取,必须等待该写锁释放。
  3. 当有写操作在等待时,避免写操作长期饥饿,会优先处理写锁请求。

下面是读写锁的用法:

var rwMu sync.RWMutex rwMu.RLock() // 获取读锁 rwMu.RUnlock() // 释放读锁 rwMu.Lock() // 获取写锁 rwMu.Unlock() // 释放写锁 

下面是读写锁在函数中的用法示例:

func ReadBalance() { rwMu.RLock() fmt.Println("get read lock") defer rwMu.RUnlock() fmt.Println("balance: ", balance) } func WriteBalance() { rwMu.Lock() fmt.Println("get write lock") defer rwMu.Unlock() balance += 1 } 

4、sync.Once-一次性执行

sync.Once 可用于确保函数只被执行一次,常用于初始化操作,且可以用于延迟加载。

提供的方法是 Do(f func()),参数内容是一个需要被执行的函数 f,这个方法实现的功能是只有在第一次被调用的时候会执行 f 函数进行初始化。

下面是该方法的使用示例:

import ( "fmt" "sync" ) type Config struct { // 配置信息 } var ( instance *Config once sync.Once ) func LoadConfig() { fmt.Println("初始化配置...") instance = &Config{} // 加载配置的逻辑 } func GetConfig() *Config { once.Do(LoadConfig) return instance } func main() { c1 := GetConfig() c2 := GetConfig() fmt.Println(c1 == c2) // 输出: true(同一个实例) } 

在这里,虽然 GetConfig() 函数执行了两遍,但是其内部的调用的 LoadConfig 函数却只执行了一次,因为 sync.Once 会在内部记录该函数是否已经初始化。

sync.Once 是个结构体,其结构如下:

type Once struct { done atomic.Uint32 m Mutex } 

其中,done 字段用于记录需要执行的函数 f 是否已经被执行,其对应的 Do() 方法内部会先根据 done 字段判断,如果已经被执行过则直接返回,而如果没有则会先执行一次。

m 字段表示的互斥锁则用于在 Do() 方法内部调用的 doSlow() 中使用,用于确保并发情况下目标函数只被执行一次,在 f 函数执行结束后,done 参数会被置为 1,表示该函数已经被执行,这样再次调用 Do() 方法时,判断 done 字段的值为 1 则不会再执行此函数。

5、sync.Pool-对象池

sync.Pool,对象池,我们可以将一些生命周期短且创建成本高的对象存在其中,从而避免频繁的创建和销毁对象,以减少内存分配和垃圾回收压力。

简单地说就是复用对象。

1. 基础用法

下面以复用一个字节缓冲区为例介绍一下对象池的基础用法。

1) 创建对象池

创建对象池的操作如下:

var bufferPool = sync.Pool{ New: func() interface{} { return &bytes.Buffer{} }, } 

可以看到,这里对 sync.PoolNew 字段赋值了一个函数,返回的是一个字节缓冲区。

2) 从池中获取对象

从对象池中获取该对象的操作使用 Get() 操作:

buf := bufferPool.Get().(*bytes.Buffer) 
3) 将对象放回池中

对该字节缓冲区使用完毕后可以将该对象再放回池中:

bufferPool.Put(buf) 

2. 使用示例

import ( "bytes" "fmt" "sync" ) var bufferPool = sync.Pool{ New: func() interface{} { fmt.Println("create bytes buffer") return &bytes.Buffer{} }, } func LogMessage(msg string) { buf := bufferPool.Get().(*bytes.Buffer) defer bufferPool.Put(buf) buf.Reset() buf.WriteString(msg) fmt.Println(buf.String()) } func main() { LogMessage("hello world") LogMessage("hello world") LogMessage("hello world") } 

在上面的代码中,我们先定义了 bufferPool,然后在 LogMessage() 函数中,先使用 Get() 获取该字节缓冲对象,因为这里返回的数据是接口类型,所以这里将其转为了对应的类型,然后使用 buf.Reset() 重置了之前的记录后写入新的数据,最后使用的 defer 操作将此对象又放回了对象池。

6、sync.Cond-条件变量

sync.Cond 用于等待特定条件发生后再继续执行,可用于生产者-消费者的模式。

创建一个条件变量,参数只有一个,那就是锁,下面代码里用的是互斥锁:

cond = sync.NewCond(&sync.Mutex{}) 

返回的 cond 对外暴露的字段 L 就是我们输入的锁。

下面用一个生产者的代码示例来介绍 cond 的几个相关函数。

在这里定义了 queue 作为队列,其中拥有需要处理的数据,queueMaxSize 字段为限制的最大队列长度。

var ( queue []int queueMaxSize int = 5 cond = sync.NewCond(&sync.Mutex{}) ) func Producer() { for { cond.L.Lock() for len(queue) == queueMaxSize { fmt.Println("produce queue max size, wait") cond.Wait() } queue = append(queue, 1) fmt.Println("produce queue") cond.Signal() // 通知消费者 cond.L.Unlock() time.Sleep(100 * time.Millisecond) } } 

在定义的 Producer() 函数中,有一个死循环,内部先使用 cond.L.Lock() 获取锁,然后判断生产的数据是否有消费者消费,如果队列满了的话,则进入等待。

1. cond.Wait()

在上面的代码中,我们使用 cond.Wait() 进入了等待状态。

Wait() 函数内部,先对前面的锁进行释放操作,然后进入阻塞状态,直到其他 gouroutine 通过 Signal() 函数唤醒后重新获取锁。

2. cond.Signal()

前面往队列里添加数据后,通过 cond.Signal() 函数通知消费者,消费者在另一个函数中就可以被唤醒,然后进行处理,同时这个函数后面将锁释放 cond.L.Unlock()

3. cond.Broadcast()

前面的 Signal() 函数是唤醒一个等待的 goroutine,cond.Broadcast() 函数则可以唤醒所有等待的 goroutine。

下面提供一下生产者-消费者的全部处理代码:

package main import ( "fmt" "sync" "time" ) var ( queue []int queueMaxSize int = 5 cond = sync.NewCond(&sync.Mutex{}) ) func Producer() { for { cond.L.Lock() for len(queue) == queueMaxSize { fmt.Println("produce queue max size, wait") cond.Wait() } queue = append(queue, 1) fmt.Println("produce queue") cond.Signal() // 通知消费者 cond.L.Unlock() time.Sleep(100 * time.Millisecond) } } func Consumer() { for { cond.L.Lock() for len(queue) == 0 { fmt.Println("wait for produce") cond.Wait() // 等待并释放锁 } fmt.Println("consume queue") item := queue[0] queue = queue[1:] cond.Signal() // 通知生产者 cond.L.Unlock() ProcessItem(item) } } func ProcessItem(i int) { fmt.Println("process i: ", i) } func main() { go Producer() go Consumer() time.Sleep(1 * time.Second) } 

7、sync.Map

sync 模块提供了 sync.Map 用来存储键值对,但是和之前介绍的 map 不一样的是,sync.Map 是并发安全的,而且无需初始化,并且在操作方法上与原来的 map 不一样。

1. 并发安全

原生的 map 是非并发安全的,如果多个 goroutine 对其进行同时读写会触发错误,比如下面的操作:

import ( "fmt" "time" ) var originMap = make(map[string]int) func UpdateMapKey() { originMap["a"] += 1 } func GetMapKey() { a := originMap["a"] fmt.Println(a) } func main() { originMap["a"] = 0 for range 100 { go UpdateMapKey() go GetMapKey() } time.Sleep(1 * time.Second) fmt.Println("originMap: ", originMap) } 

但是 sync.Map 是并发安全的,内部会通过互斥锁的操作允许多个 goroutine 安全地读写,下面是使用 sync.Map 对上面逻辑的改写,后面我们会具体介绍其操作方法:

import ( "fmt" "sync" "time" ) var originMap sync.Map func UpdateMapKey() { for { oldValue, loaded := originMap.Load("a") if !loaded { if _, ok := originMap.LoadOrStore("a", 1); ok { return } } else { newValue := oldValue.(int) + 1 if originMap.CompareAndSwap("a", oldValue, newValue) { return } } } } func GetMapKey() { a, _ := originMap.Load("a") fmt.Println(a) } func main() { originMap.Store("a", 0) for range 100 { go UpdateMapKey() go GetMapKey() } time.Sleep(1 * time.Second) a, _ := originMap.Load("a") fmt.Println("originMap: ", a) } 

2. 初始化

原生的 map 进行初始化,有下面两种操作方法:

var originMap = make(map[string]int) var originMap = map[string]int{} 

sync.Map 可以直接声明使用:

var m sync.Map m.Store("a", 0) 

3. 操作方法

这里先介绍 sync.Map 增删改查的基础操作:

1) 增

增加一个 key 的操作如下:

originMap.Store("a", 1) 
2) 删

删除一个 key 的操作如下:

originMap.Delete("a", 1) 
3) 改

修改操作还是可以用 Store() 方法,而且可以修改为不同数据类型:

m.Store("a", "123") 
4) 查

查询操作可以使用 Load() 方法,返回对应的 value 值以及是否存在:

m.Store("a", 1) v, ok := m.Load("a") if ok { fmt.Printf("exist value:%v\n", v.(int)) } else { fmt.Printf("key not exists") } 
5) 遍历

遍历操作如下:

m.Store("a", 1) m.Range(func(key, value any) bool { fmt.Println(key, value) return true }) 

4. 原子性条件操作

上面的这些方法可以实现基础的增删改查操作,但是如果我们有一个需求,比如前面的获取一个 key 的 value,然后在原值的基础上 +1 再存入,大概逻辑如下:

v, ok := m.Load(key) v = v.(int) v +=1 m.Store(key, v) 

但是在这个操作中,如果有其他 goroutine 已经修改了 v 的值,那么我们这里的操作就相当于污染了源数据,而为了避免这个可能,我们可以使用一些原子性条件操作,以实现并发操作。

1) CompareAndSwap()

CompareAndSwap() 是一个更新操作,传入 3 个参数,key,oldValue 和 newValue,仅当 key 的结果为 oldValue 的时候,将结果更新为 newValue,使用示例如下:

key := "a" m.Store(key, 1) swapped := m.CompareAndSwap(key, 1, 2) fmt.Printf("当 value 为 1 的时候,将 value 从 1 修改为 2, 是否更新结果 %v\n", swapped) swapped = m.CompareAndSwap(key, 1, 3) fmt.Printf("当 value 为 1 的时候,将 value 从 1 修改为 3, 是否更新结果 %v\n", swapped) 

所以在上面我们要对结果进行 +1 的代码操作为:

newValue := oldValue.(int) + 1 if originMap.CompareAndSwap("a", oldValue, newValue) { return } 
2) CompareAndDelete()

CompareAndDelete 是一个原子性的删除操作,接受两个参数,key 和 oldValue,仅当 key 的值为 oldValue 时删除该 key,返回结果为是否删除:

key := "a" m.Store(key, 1) deleted := m.CompareAndDelete(key, 1) if deleted { fmt.Printf("当 key 的 value 为 %v 时,删除\n", 1) } else { fmt.Printf(" key 的 value 不为 %v 时,不执行删除\n", 1) } 
3) LoadAndDelete()

LoadAndDelete 表示是否加载某个 key 的值并删除该 key,无论该 key 是否存在,参数为 key,返回值为 value 和是否存在该 key:

key := "a" m.Store(key, 1) value, loaded := m.LoadAndDelete(key) fmt.Printf("是否存在 %v, value 为 %v\n", loaded, value) value, loaded = m.LoadAndDelete(key) fmt.Printf("是否存在 %v, value 为 %v\n", loaded, value) 
4) LoadOrStore()

LoadOrStore 方法为不存在 key 则存入,存在的话则返回该值:

key := "a" value, loaded := m.LoadOrStore(key, 1) fmt.Printf("是否存在: %v, 值: %v\n", loaded, value) value, loaded = m.LoadOrStore(key, 1) fmt.Printf("是否存在: %v, 值: %v\n", loaded, value) 

评论

昵称
邮箱
主页