Golang基础笔记十五之sync
本文首发于公众号:Hunter后端
原文链接:Golang基础笔记十五之sync
这一篇笔记介绍 Golang 中的 sync 模块。
sync 包主要提供了基础的同步原语,比如互斥锁,读写锁,等待组等,用于解决并发编程中的线程安全问题,以下是本篇笔记目录:
- WaitGroup-等待组
- sync.Mutex-互斥锁
- sync.RWMutex-读写锁
- sync.Once-一次性执行
- sync.Pool-对象池
- sync.Cond-条件变量
- sync.Map
1、WaitGroup-等待组
前面在第十篇我们介绍 goroutine 和 channel 的时候,在使用 goroutine 的时候介绍有一段代码如下:
package main import ( "fmt" "time" ) func PrintGoroutineInfo() { fmt.Println("msg from goroutine") } func main() { go PrintGoroutineInfo() time.Sleep(1 * time.Millisecond) fmt.Println("msg from main") }
在这里,我们开启了一个协程调用 PrintGoroutineInfo()
函数,然后使用 time.Sleep()
来等待它调用结束。
然而在开发中,我们不能确定这个函数多久才能调用完毕,也无法使用准确的 sleep 时间来等待,那么这里就可以使用到 sync
模块的 WaitGroup
函数来等待一个或多个 goroutine
执行完毕。
下面是使用示例:
package main import ( "fmt" "math/rand" "sync" "time" ) func SleepRandSeconds(wg *sync.WaitGroup) { defer wg.Done() sleepSeconds := rand.Intn(3) fmt.Printf("sleep %d seconds\n", sleepSeconds) time.Sleep(time.Duration(sleepSeconds) * time.Second) } func main() { var wg sync.WaitGroup wg.Add(2) go SleepRandSeconds(&wg) go SleepRandSeconds(&wg) wg.Wait() fmt.Println("函数执行完毕") }
在这里,我们通过 var wg sync.WaitGroup
定义了一个等待组,并通过 wg.Add(2)
表示添加了需要等待的并发数,在并发中我们将 &wg
传入并通过 wg.Done()
减少需要等待的并发数。
在 wg.Done()
函数内部,使用 wg.Add(-1)
减少需要等待的并发数,在 main 函数中,使用 wg.Wait()
进入阻塞状态,当等待的并发都完成后,此函数就会返回,完成等待并接着往后执行。
2、sync.Mutex-互斥锁
1. 数据竞态与互斥锁
当多个 goroutine 并发访问同一个共享资源,且至少有一个访问是写操作时,就会发生数据竞态
,造成的结果就是程序每次运行的结果表现可能会不一致。
比如下面的示例:
var balance int func AddFunc() { balance += 1 } func main() { for range 100 { go AddFunc() } time.Sleep(5 * time.Second) fmt.Println("balance is: ", balance) }
多次执行上面的代码,最终输出的 balance
的值可能都不一致。
如果一个变量在多个 goroutine 同时访问时,不会出现比如数据不一致或程序崩溃的情况,那么我们就称其是并发安全
的。
我们可以使用 go run -race main.go
的方式来检测数据竞态,执行检测后,会输出数据竞态的一些信息,比如发生在代码的多少行,一共发生了多少次数据竞态:
================== WARNING: DATA RACE Read at 0x000003910df8 by goroutine 7: main.AddFunc() /../main.go:13 +0x24 Previous write at 0x000003910df8 by goroutine 6: main.AddFunc() /../main.go:13 +0x3c Goroutine 7 (running) created at: main.main() /../main.go:18 +0x32 Goroutine 6 (finished) created at: main.main() /../main.go:18 +0x32 ================== balance is: 98 Found 3 data race(s) exit status 66
而要避免这种数据竞态的发生,我们可以限制在同一时间只能有一个 goroutine 访问同一个变量,这种方法称为互斥机制。
我们可以通过缓冲通道和 sync.Mutex
来实现这种互斥锁的操作。
2. 缓冲通道实现互斥锁
我们可以通过容量为 1 的缓冲通道来实现互斥锁的操作,保证同一时间只有一个 goroutine 访问同一个变量,下面是修改后的代码:
var sema = make(chan struct{}, 1) var balance int func AddFunc() { sema <- struct{}{} balance += 1 <-sema } func main() { for range 100 { go AddFunc() } time.Sleep(3 * time.Second) fmt.Println("balance is: ", balance) }
在上面这段代码里,我们定义了 sema
这个容量为 1 的通道,在每个 AddFunc()
并发中,对变量 balance
执行读写操作前,我们先往通道里写入了一条数据,这样其他并发在执行该函数时,由于也会先往通道里写入数据,而这个时候通道已经满了,所以会处于堵塞状态,这就相当于获取锁。
直到 balance
写操作完成,从通道里读取数据,通道为空,相当于释放锁,这个时候其他并发才可以往通道里写入数据重新拿到锁。
这样我们通过往通道里写入和读取数据保证了同一时间只有一个 goroutine 在对 balance 进行写操作,从而实现互斥锁的操作。
3. sync.Mutex
在 sync 包中,sync.Mutex
直接为我们实现了互斥锁的操作,它的操作如下:
var mutex sync.Mutex // 互斥锁的定义 mutex.Lock() // 获取锁 mutex.Unlock() // 释放锁
那么使用 sync.Mutex
实现上面的逻辑,代码如下:
var mutex sync.Mutex var balance int func AddFunc() { mutex.Lock() defer mutex.Unlock() balance += 1 } func main() { for range 100 { go AddFunc() } time.Sleep(3 * time.Second) fmt.Println("balance is: ", balance) }
3、sync.RWMutex-读写锁
在上面介绍的 sync.Mutex
互斥锁中,限制了同一时间只能有一个 goroutine 访问某个变量,包括读和写,但这种情况并非是最理想的,比如在读多写少的场景下。
那么 Golang 里的 sync.RWMutex
为我们提供了读写锁的操作,它会允许多个读操作的并发,而写操作会阻塞所有读和写。
读写锁的基本规则如下:
- 当一个 goroutine 获取了读锁后,其他 goroutine 仍然可以获取读锁,但不能获取写锁。
- 当一个 goroutine 获取了写锁后,其他 goroutine 无论是读锁还是写锁都不能获取,必须等待该写锁释放。
- 当有写操作在等待时,避免写操作长期饥饿,会优先处理写锁请求。
下面是读写锁的用法:
var rwMu sync.RWMutex rwMu.RLock() // 获取读锁 rwMu.RUnlock() // 释放读锁 rwMu.Lock() // 获取写锁 rwMu.Unlock() // 释放写锁
下面是读写锁在函数中的用法示例:
func ReadBalance() { rwMu.RLock() fmt.Println("get read lock") defer rwMu.RUnlock() fmt.Println("balance: ", balance) } func WriteBalance() { rwMu.Lock() fmt.Println("get write lock") defer rwMu.Unlock() balance += 1 }
4、sync.Once-一次性执行
sync.Once
可用于确保函数只被执行一次,常用于初始化操作,且可以用于延迟加载。
提供的方法是 Do(f func()),参数内容是一个需要被执行的函数 f,这个方法实现的功能是只有在第一次被调用的时候会执行 f 函数进行初始化。
下面是该方法的使用示例:
import ( "fmt" "sync" ) type Config struct { // 配置信息 } var ( instance *Config once sync.Once ) func LoadConfig() { fmt.Println("初始化配置...") instance = &Config{} // 加载配置的逻辑 } func GetConfig() *Config { once.Do(LoadConfig) return instance } func main() { c1 := GetConfig() c2 := GetConfig() fmt.Println(c1 == c2) // 输出: true(同一个实例) }
在这里,虽然 GetConfig() 函数执行了两遍,但是其内部的调用的 LoadConfig
函数却只执行了一次,因为 sync.Once
会在内部记录该函数是否已经初始化。
sync.Once
是个结构体,其结构如下:
type Once struct { done atomic.Uint32 m Mutex }
其中,done
字段用于记录需要执行的函数 f 是否已经被执行,其对应的 Do()
方法内部会先根据 done 字段判断,如果已经被执行过则直接返回,而如果没有则会先执行一次。
而 m
字段表示的互斥锁则用于在 Do()
方法内部调用的 doSlow()
中使用,用于确保并发情况下目标函数只被执行一次,在 f 函数执行结束后,done
参数会被置为 1,表示该函数已经被执行,这样再次调用 Do()
方法时,判断 done
字段的值为 1 则不会再执行此函数。
5、sync.Pool-对象池
sync.Pool,对象池,我们可以将一些生命周期短且创建成本高的对象存在其中,从而避免频繁的创建和销毁对象,以减少内存分配和垃圾回收压力。
简单地说就是复用对象。
1. 基础用法
下面以复用一个字节缓冲区为例介绍一下对象池的基础用法。
1) 创建对象池
创建对象池的操作如下:
var bufferPool = sync.Pool{ New: func() interface{} { return &bytes.Buffer{} }, }
可以看到,这里对 sync.Pool
的 New
字段赋值了一个函数,返回的是一个字节缓冲区。
2) 从池中获取对象
从对象池中获取该对象的操作使用 Get() 操作:
buf := bufferPool.Get().(*bytes.Buffer)
3) 将对象放回池中
对该字节缓冲区使用完毕后可以将该对象再放回池中:
bufferPool.Put(buf)
2. 使用示例
import ( "bytes" "fmt" "sync" ) var bufferPool = sync.Pool{ New: func() interface{} { fmt.Println("create bytes buffer") return &bytes.Buffer{} }, } func LogMessage(msg string) { buf := bufferPool.Get().(*bytes.Buffer) defer bufferPool.Put(buf) buf.Reset() buf.WriteString(msg) fmt.Println(buf.String()) } func main() { LogMessage("hello world") LogMessage("hello world") LogMessage("hello world") }
在上面的代码中,我们先定义了 bufferPool
,然后在 LogMessage()
函数中,先使用 Get() 获取该字节缓冲对象,因为这里返回的数据是接口类型,所以这里将其转为了对应的类型,然后使用 buf.Reset()
重置了之前的记录后写入新的数据,最后使用的 defer
操作将此对象又放回了对象池。
6、sync.Cond-条件变量
sync.Cond
用于等待特定条件发生后再继续执行,可用于生产者-消费者的模式。
创建一个条件变量,参数只有一个,那就是锁,下面代码里用的是互斥锁:
cond = sync.NewCond(&sync.Mutex{})
返回的 cond
对外暴露的字段 L
就是我们输入的锁。
下面用一个生产者的代码示例来介绍 cond 的几个相关函数。
在这里定义了 queue 作为队列,其中拥有需要处理的数据,queueMaxSize 字段为限制的最大队列长度。
var ( queue []int queueMaxSize int = 5 cond = sync.NewCond(&sync.Mutex{}) ) func Producer() { for { cond.L.Lock() for len(queue) == queueMaxSize { fmt.Println("produce queue max size, wait") cond.Wait() } queue = append(queue, 1) fmt.Println("produce queue") cond.Signal() // 通知消费者 cond.L.Unlock() time.Sleep(100 * time.Millisecond) } }
在定义的 Producer()
函数中,有一个死循环,内部先使用 cond.L.Lock()
获取锁,然后判断生产的数据是否有消费者消费,如果队列满了的话,则进入等待。
1. cond.Wait()
在上面的代码中,我们使用 cond.Wait()
进入了等待状态。
在 Wait()
函数内部,先对前面的锁进行释放操作,然后进入阻塞状态,直到其他 gouroutine 通过 Signal()
函数唤醒后重新获取锁。
2. cond.Signal()
前面往队列里添加数据后,通过 cond.Signal()
函数通知消费者,消费者在另一个函数中就可以被唤醒,然后进行处理,同时这个函数后面将锁释放 cond.L.Unlock()
。
3. cond.Broadcast()
前面的 Signal() 函数是唤醒一个等待的 goroutine,cond.Broadcast() 函数则可以唤醒所有等待的 goroutine。
下面提供一下生产者-消费者的全部处理代码:
package main import ( "fmt" "sync" "time" ) var ( queue []int queueMaxSize int = 5 cond = sync.NewCond(&sync.Mutex{}) ) func Producer() { for { cond.L.Lock() for len(queue) == queueMaxSize { fmt.Println("produce queue max size, wait") cond.Wait() } queue = append(queue, 1) fmt.Println("produce queue") cond.Signal() // 通知消费者 cond.L.Unlock() time.Sleep(100 * time.Millisecond) } } func Consumer() { for { cond.L.Lock() for len(queue) == 0 { fmt.Println("wait for produce") cond.Wait() // 等待并释放锁 } fmt.Println("consume queue") item := queue[0] queue = queue[1:] cond.Signal() // 通知生产者 cond.L.Unlock() ProcessItem(item) } } func ProcessItem(i int) { fmt.Println("process i: ", i) } func main() { go Producer() go Consumer() time.Sleep(1 * time.Second) }
7、sync.Map
sync 模块提供了 sync.Map 用来存储键值对,但是和之前介绍的 map 不一样的是,sync.Map 是并发安全的,而且无需初始化,并且在操作方法上与原来的 map 不一样。
1. 并发安全
原生的 map 是非并发安全的,如果多个 goroutine 对其进行同时读写会触发错误,比如下面的操作:
import ( "fmt" "time" ) var originMap = make(map[string]int) func UpdateMapKey() { originMap["a"] += 1 } func GetMapKey() { a := originMap["a"] fmt.Println(a) } func main() { originMap["a"] = 0 for range 100 { go UpdateMapKey() go GetMapKey() } time.Sleep(1 * time.Second) fmt.Println("originMap: ", originMap) }
但是 sync.Map 是并发安全的,内部会通过互斥锁的操作允许多个 goroutine 安全地读写,下面是使用 sync.Map
对上面逻辑的改写,后面我们会具体介绍其操作方法:
import ( "fmt" "sync" "time" ) var originMap sync.Map func UpdateMapKey() { for { oldValue, loaded := originMap.Load("a") if !loaded { if _, ok := originMap.LoadOrStore("a", 1); ok { return } } else { newValue := oldValue.(int) + 1 if originMap.CompareAndSwap("a", oldValue, newValue) { return } } } } func GetMapKey() { a, _ := originMap.Load("a") fmt.Println(a) } func main() { originMap.Store("a", 0) for range 100 { go UpdateMapKey() go GetMapKey() } time.Sleep(1 * time.Second) a, _ := originMap.Load("a") fmt.Println("originMap: ", a) }
2. 初始化
原生的 map 进行初始化,有下面两种操作方法:
var originMap = make(map[string]int) var originMap = map[string]int{}
sync.Map 可以直接声明使用:
var m sync.Map m.Store("a", 0)
3. 操作方法
这里先介绍 sync.Map 增删改查的基础操作:
1) 增
增加一个 key 的操作如下:
originMap.Store("a", 1)
2) 删
删除一个 key 的操作如下:
originMap.Delete("a", 1)
3) 改
修改操作还是可以用 Store() 方法,而且可以修改为不同数据类型:
m.Store("a", "123")
4) 查
查询操作可以使用 Load() 方法,返回对应的 value 值以及是否存在:
m.Store("a", 1) v, ok := m.Load("a") if ok { fmt.Printf("exist value:%v\n", v.(int)) } else { fmt.Printf("key not exists") }
5) 遍历
遍历操作如下:
m.Store("a", 1) m.Range(func(key, value any) bool { fmt.Println(key, value) return true })
4. 原子性条件操作
上面的这些方法可以实现基础的增删改查操作,但是如果我们有一个需求,比如前面的获取一个 key 的 value,然后在原值的基础上 +1 再存入,大概逻辑如下:
v, ok := m.Load(key) v = v.(int) v +=1 m.Store(key, v)
但是在这个操作中,如果有其他 goroutine 已经修改了 v 的值,那么我们这里的操作就相当于污染了源数据,而为了避免这个可能,我们可以使用一些原子性条件操作,以实现并发操作。
1) CompareAndSwap()
CompareAndSwap() 是一个更新操作,传入 3 个参数,key,oldValue 和 newValue,仅当 key 的结果为 oldValue 的时候,将结果更新为 newValue,使用示例如下:
key := "a" m.Store(key, 1) swapped := m.CompareAndSwap(key, 1, 2) fmt.Printf("当 value 为 1 的时候,将 value 从 1 修改为 2, 是否更新结果 %v\n", swapped) swapped = m.CompareAndSwap(key, 1, 3) fmt.Printf("当 value 为 1 的时候,将 value 从 1 修改为 3, 是否更新结果 %v\n", swapped)
所以在上面我们要对结果进行 +1 的代码操作为:
newValue := oldValue.(int) + 1 if originMap.CompareAndSwap("a", oldValue, newValue) { return }
2) CompareAndDelete()
CompareAndDelete 是一个原子性的删除操作,接受两个参数,key 和 oldValue,仅当 key 的值为 oldValue 时删除该 key,返回结果为是否删除:
key := "a" m.Store(key, 1) deleted := m.CompareAndDelete(key, 1) if deleted { fmt.Printf("当 key 的 value 为 %v 时,删除\n", 1) } else { fmt.Printf(" key 的 value 不为 %v 时,不执行删除\n", 1) }
3) LoadAndDelete()
LoadAndDelete 表示是否加载某个 key 的值并删除该 key,无论该 key 是否存在,参数为 key,返回值为 value 和是否存在该 key:
key := "a" m.Store(key, 1) value, loaded := m.LoadAndDelete(key) fmt.Printf("是否存在 %v, value 为 %v\n", loaded, value) value, loaded = m.LoadAndDelete(key) fmt.Printf("是否存在 %v, value 为 %v\n", loaded, value)
4) LoadOrStore()
LoadOrStore 方法为不存在 key 则存入,存在的话则返回该值:
key := "a" value, loaded := m.LoadOrStore(key, 1) fmt.Printf("是否存在: %v, 值: %v\n", loaded, value) value, loaded = m.LoadOrStore(key, 1) fmt.Printf("是否存在: %v, 值: %v\n", loaded, value)
评论