最新要闻

广告

手机

iphone11大小尺寸是多少?苹果iPhone11和iPhone13的区别是什么?

iphone11大小尺寸是多少?苹果iPhone11和iPhone13的区别是什么?

警方通报辅警执法直播中被撞飞:犯罪嫌疑人已投案

警方通报辅警执法直播中被撞飞:犯罪嫌疑人已投案

家电

Go 并发编程(一):协程 gorotine、channel、锁

来源:博客园
目录
  • 协程介绍
    • 什么是协程?
    • 协程 VS 进程 VS 线程
    • 协程的优缺点
  • Goroutine
    • Go 并发介绍
    • Goroutine 创建
    • 多协程异常捕获
      • recover 捕获范围
      • 绑定 recover 创建 goroutine
  • Channel
    • 什么是 Channel ?
    • Channel 初始化
    • Channel 操作
    • 无缓冲的 channel
    • 有缓冲的 channel
    • 双向 channel 和单向 channel
    • 实现并发锁
  • Sync
    • sync.WaitGroup
    • sync.Once
      • sync.Lock:锁
      • sync.Mutex:互斥锁
      • sync.RWMutex:读写锁
      • 死锁
      • sync.Map:并发安全 Map
    • sync.Atomic:原子操作

协程介绍

什么是协程?

协程,又称微线程,英文为 Coroutine。


(资料图片仅供参考)

协程可以理解为用户态线程,是比线程更小的执行单元。为啥说它是一个执行单元?因为它自带 CPU 上下文。这样只要在合适的时机,我们可以把一个协程切换到另一个协程。只要这个过程中保存或恢复 CPU 上下文,那么程序还是可以运行的。

通俗的理解:在一个线程中的某个函数,可以在任何地方保存当前函数的一些临时变量等信息,然后切换到另外一个函数中执行,注意不是通过调用函数的方式做到的,并且切换的次数以及什么时候再切换到原来的函数都可以由开发者自己确定。

区别于线程,协程的调度在用户态进行,不需要切换到内核态,所以不由操作系统控制,而由用户自己控制。在一些支持协程高级语言中,往往都实现了自己的协程调度器,比如 Go 语言就有自己的协程调度器。

协程有独立的栈空间,并共享堆空间。

协程 VS 进程 VS 线程

协程 VS 进程

  • 执行流的调度者不同:无论多线程和多进程,其调度更多取决于操作系统;而协程的方式,调度来自用户。也就是说进程的上下文是在内核态保存恢复的,而协程是在用户态保存恢复的,显然用户态的代价更低。
  • 进程会被强占;而协程不会:也就是说协程如果不主动让出 CPU,那么其他的协程就没有执行的机会。
  • 对内存的占用不同:实际上协程可以只需要 4K 的栈就足够了;而进程占用的内存要大得多。
  • 从操作系统的角度讲,多协程的程序是单进程单线程

协程 VS 线程

  • 协程看起来跟线程差不多,其实不然。线程切换从系统层面来看远不止保存和恢复 CPU 上下文这么简单。操作系统为了程序运行的高效性,每个线程都有自己缓存 Cache 等数据,操作系统还会帮你做这些数据的恢复操作,所以线程的切换非常耗性能。但是协程的切换只是单纯的操作 CPU 的上下文,所以一秒钟切换个上百万次,系统都抗得住。
  • 同样的,线程的切换更多的是靠操作系统来控制,而协程的执行由我们自己控制。
  • 协程只是在单一的线程下,不同的协程之间做切换;其实和多线程很像,多线程是在一个进程下,不同的线程之间做切换。
  • 一个线程中可以有任意多个协程,但某一时刻只能有一个协程在运行,多个协程分享该线程分配到的计算机资源。

协程的优缺点

优点

  • 无需线程上下文切换的开销:协程执行效率极高,因为子程序(函数)切换不是线程切换,由程序自身控制,没有切换线程的开销。所以与多线程相比,线程的数量越多,协程性能的优势越明显。但也因此,程序员必须自己承担调度的责任,同时,协程也失去了标准线程使用多 CPU 的能力。
  • 高并发+高扩展性+低成本:一个 CPU 支持上万个协程都不是问题,所以很适合用于高并发处理。

缺点

  • 无法利用多核资源:协程的本质是个单线程,它不能同时将单个 CPU 的多个核用上,协程需要和多进程/线程配合才能运行在多 CPU 上。
  • 进行阻塞(Blocking)操作(如 I/O 时)会阻塞掉整个程序
  • 协程可以很好地处理 I/O 密集型程序的效率问题,但是处理 CPU 密集型不是它的长处,如要充分发挥 CPU 利用率可以结合多进程/线程+协程。

Goroutine

Go 并发介绍

Go 在语言级别支持协程,叫 goroutine,且 Go 语言中的并发只会用到 goroutine,并不需要我们去考虑用多进程或者是多线程。

Go 语言标准库提供的所有系统调用操作(包括所有同步 I/O 操作),都会让出 CPU 给其他 goroutine。这让轻量级线程的切换管理不依赖于系统的线程和进程,也不依赖于 CPU 的核心数量。

有人把 Go 比作 21 世纪的 C 语言,第一是因为 Go 语言设计简单;第二,21 世纪最重要的就是并行程序设计,而 Go 从语言层面就支持并行;同时,并发程序的内存管理有时候是非常复杂的,而 Go 语言提供了自动垃圾回收机制。

线程本身是有一定大小的,一般 OS 线程栈大小为 2MB,且线程在创建和上下文切换的时候是需要消耗资源的,会带来性能损耗,所以在我们用到多线程技术的时候,往往会通过池化技术,即创建线程池来管理一定数量的线程。

在 Go 语言中,一个 goroutine 栈在其生命周期开始时占用空间很小(一般 2KB),并且栈大小可以按需增大和缩小,goroutine 的栈大小限制可以达到 1GB,但是一般不会用到这么大。所以在 Go 语言中一次创建成千上万,甚至十万左右的 goroutine 在理论上也是可以的。

在 Go 语言中,当某个任务需要并发执行的时候,只需要把这个任务包装成一个函数,开启一个 goroutine 去执行这个函数就可以了。并不需要我们来维护一个类似于线程池的东西,也不需要我们去关心协程是怎么切换和调度的,因为这些都已经有 Go 语言内置的调度器帮我们做了,并且效率还非常高。

Goroutine 创建

只需在函数调⽤语句前添加 go 关键字,就可创建并发执⾏单元。开发⼈员无需了解任何执⾏细节,调度器会自动将其安排到合适的系统线程上执行。

在并发编程中,我们通常想将一个过程切分成几块,然后让每个 goroutine 各自负责一块工作,当一个程序启动时,main() 函数会在一个单独的 goroutine 中运行,我们叫它 main goroutine,而新的子 goroutine 会用 go 语句来创建。

当 main() 函数返回时该 main goroutine 就结束了,而当主协程退出的时候,其余协程不管是否运行完,都会跟着结束。

import (    "fmt"    "time")func myGroutine(name string) {    for i := 0; i < 5; i++ {        fmt.Printf("myGroutine %s\n", name)        time.Sleep(10 * time.Millisecond)    }}func main() {    go myGroutine("1")    go myGroutine("2")    time.Sleep(2 * time.Second)}

运行结果:

myGroutine 2myGroutine 1myGroutine 1myGroutine 2myGroutine 2myGroutine 1myGroutine 1myGroutine 2

多协程异常捕获

recover 捕获范围

用 recover 捕获异常时,只能捕获当前 goroutine 的 panic,不能捕获其他 goroutine 发生的 panic 。

示例:

func main() {   defer func() {      if e := recover(); e != nil {         fmt.Printf("main recover:%v\n", e)      }   }()   go func() {      defer func() {         if e := recover(); e != nil {            fmt.Printf("sub recover:%v\n", e)         }      }()      panic("sub func panic!!!")  // 只会被go func()的defer recover捕获      fmt.Println("111")   }()   panic("main func panic!!!")   fmt.Println("222")   // 只会被main的defer recover捕获   time.Sleep(2 * time.Second)}

运行结果:

main recover:main func panic!!!sub recover:sub func panic!!!

可以看出,主函数 goroutine 中的 recover 只能捕获主 goroutine 中发生的 panic,子 goroutine 只能捕获子 goroutine 发生的 panic 。

所以当我们程序中有多个 goroutine 处理任务时,如果 goroutine 有可能发生 panic ,则需要在 goroutine 中也捕获异常。

绑定 recover 创建 goroutine

在开发项目的时候,我们可能会创建多个 goroutine 来提高程序的效率,但是在每个创建的 goroutine 中为了捕获异常需要频繁的写 defer recover 函数来捕获异常,既繁琐又显得代码不简洁。

因此我们可以将协程的逻辑封装成函数,绑定 recover,以此来创建 goroutine 。

示例:

package mainimport (   "fmt"   "sync")// 传入的是不定参数:返回值为error类型的函数func withGoroutine(opts ...func() error) (err error) {    var wg sync.WaitGroup    for _, opt := range opts {       wg.Add(1)       // 开启goroutine       go func(handler func() error) {          defer func() {             // 协程内部捕获panic             if e := recover(); e != nil {                fmt.Printf("recover:%v\n", e)             }             wg.Done()          }()          e := handler()  // 真正调用传入的函数          // 取第一个报错的handler调用的错误并返回          // err == nil表示之前还没有handler报错          // 配合 e != nil表示处于第一个报错的handler中          if err == nil && e != nil {             err = e          }       }(opt)  // 将goroutine的函数逻辑通过封装成的函数变量传入    }     wg.Wait()  // 等待所有的协程执行完    return}func main() {    handler1 := func() error {       panic("handler1 fail ")       return nil    }     handler2 := func() error {       panic("handler2 fail")       return nil    }    // 并发执行handler1和handler2两个任务,返回第一个报错的任务错误    err := withGoroutine(handler1, handler2)     if err != nil {       fmt.Printf("err is:%v", err)    }}

运行结果:

recover:handler2 failrecover:handler1 fail

通过 err := withGoroutine(handler1, handler2) 并发执行,使得这两个 handler 中都有 panic时都能成功捕获。

Channel

什么是 Channel ?

Channel 官方定义:Channels are a typed conduit through which you can send and receive values with the channel operator

Channel 是一种数据类型,是一个可以收发数据的管道,主要用来解决 goroutine 的同步问题以及协程之间数据共享(数据传递)的问题。

goroutine 运行在相同的地址空间,因此访问共享内存必须做好同步。Goroutine 奉行通过通信来共享内存,而不是共享内存来通信

引⽤类型 channel 可用于多个 goroutine 通讯,其内部实现了同步,确保并发安全

Channel 初始化

方式一:先声明,再初始化

var channel_name chan channel_typechannel_name = make(chan channel_name)

示例:

// 声明一个无缓冲的channel,即其缓冲容量大小为0var a chan intfmt.Println(a) // // 通道是一个引用类型,初始值为nil// 对于值为nil的通道,不论具体是什么类型,它们所属的接收和发送操作都会永久处于阻塞状态。// 所以必须手动make初始化a = make(chan int)fmt.Println(a)       // 0x11086100

方式二:一步到位

// channel_name := make(chan channel_name, capacity)a := make(chan int) // 等价于 make(chan int, 0)b := make(chan int, 5)
  • 当参数 capacity = 0 时,channel 是无缓冲阻塞读写的,可以理解为同步模式,即写入一个,如果没有消费者在消费,写入就会阻塞。
  • 当参数 capacity > 0 时,channel 是有缓冲、是非阻塞的,可以理解为异步模式。写入消息之后,即使还没被消费,只要队列没满,就可继续写入;如果队列满了,写入就会阻塞。

Channel 操作

channel通过操作符 <- 来接收和发送数据,其发送和接收数据的语法如下:

channel <- value      // 发送value到channelx := <-channel        // 从channel中接收数据,并赋值给x<-channel             // 从channel中接收数据,并将其丢弃x, ok := <-channel    // 功能同上,同时检查通道是否已关闭或者是否为空close(channel)        // 关闭管道channel

需要注意 close(channel) 这个操作,表示管道用完了,需要对其进行关闭,避免程序一直在等待以及资源的浪费。

但 channel 不像文件一样需要经常去关闭,只有当你确实没有任何发送数据了,或者你想显式的结束 range 循环之类的,才去关闭 channel 。

  • 关闭一个未初始化的 channel 会产生 panic 。
  • channel只能被关闭一次,对同一个channel重复关闭会产生 panic 。
  • 向一个已关闭的 channel 发送消息会产生 panic 。
  • 从一个已关闭的 channel 读取消息不会发生 panic,会一直读取到零值。
  • channel 可以读端和写端都可有多个 goroutine 操作,在一端关闭 channel 的时候,该 channel 读端的所有 goroutine 都会收到 channel 已关闭的消息。

示例:收发数据

func main() {    channel := make(chan int)    go func() {        defer fmt.Println("子协程结束")        fmt.Println("子协程运行中")        channel <- 666 // 将666发送至管道    }()    num := <-channel // 从管道中取数据并赋值给num    fmt.Println("从管道中接收num=", num)    fmt.Println("main协程结束")}

运行结果:

子协程运行中子协程结束从管道中接收num= 666main协程结束 

示例:遍历管道

更多的时候,我们是不明确读取次数的,只是在 Channel 的一端读取数据,有数据就读,直到另一端关闭这个 channel,这时就可以用 for range 这种优雅的方式来读取 channel 中的数据(相比 x, ok := <-channel 的 ok 来判断更优化)

func main() {    channel := make(chan int, 5)    channel <- 1    channel <- 2    close(channel)    go func() {        for i := range channel {            fmt.Println("i from channel is: ", i)        }    }()    time.Sleep(time.Second * 2)}

运行结果:

i from channel is:  1i from channel is:  2
  • 主 goroutine 往 channel 里写了两个数据,然后关闭。子 channel 也只能读取到两个数据。
  • 在主 goroutine 关闭了 channel 之后,子 goroutine 里的 for range 循环才会结束。

无缓冲的 channel

无缓冲的通道(unbuffered channel)是指在接收前没有能力保存任何值的通道。

这种类型的通道要求发送 goroutine 和接收 goroutine 需要同时准备好,才能完成发送和接收操作。否则,通道会导致先执行发送或接收操作的 goroutine 阻塞等待。

这种对通道进行发送和接收的交互行为本身就是同步的。其中任意一个操作都无法离开另一个操作单独存在。

下图的 6 个步骤,展示了两个 goroutine 如何利用无缓冲的通道来共享一个值:

  • 在第 1 步,两个 goroutine 都到达通道,但哪个都没有开始执行发送或者接收。
  • 在第 2 步,左侧的 goroutine 将它的手伸进了通道,这模拟了向通道发送数据的行为。这时,这个 goroutine 会在通道中被锁住,直到交换完成。
  • 在第 3 步,右侧的 goroutine 将它的手放入通道,这模拟了从通道里接收数据。这个 goroutine 一样也会在通道中被锁住,直到交换完成。
  • 在第 4 步和第 5 步,进行交换。
  • 在第 6 步,两个 goroutine 都将它们的手从通道里拿出来,这模拟了被锁住的 goroutine 得到释放。两个 goroutine 现在都可以去做别的事情了。

示例:

func main() {    c := make(chan int, 0) //创建无缓冲的通道 c    //内置函数 len 返回未被读取的缓冲元素数量,cap 返回缓冲区大小    fmt.Printf("len(c)=%d, cap(c)=%d\n", len(c), cap(c))    go func() {        defer fmt.Println("子协程结束")        for i := 0; i < 3; i++ {            c <- i            fmt.Printf("子协程正在运行[%d]: len(c)=%d, cap(c)=%d\n", i, len(c), cap(c))        }    }()    for i := 0; i < 3; i++ {        num := <-c // 从c中接收数据,并赋值给num        fmt.Println("num = ", num)        time.Sleep(2 * time.Second)    }    fmt.Println("main协程结束")}

运行结果:

len(c)=0, cap(c)=0子协程正在运行[0]: len(c)=0, cap(c)=0num =  0子协程正在运行[1]: len(c)=0, cap(c)=0num =  1子协程正在运行[2]: len(c)=0, cap(c)=0子协程结束num =  2  main协程结束

有缓冲的 channel

有缓冲的通道(buffered channel)是一种在被接收前能存储一个或者多个数据值的通道。

这种类型的通道并不强制要求 goroutine 之间必须同时完成发送和接收,通道会阻塞发送和接收动作的条件也不同:

  • 只有当通道中没有要接收的值时,接收动作才会阻塞。

  • 只有当通道中没有空间容纳要发送的值时,发送动作才会阻塞。

这导致有缓冲的通道和无缓冲的通道之间的一个很大的不同:

  • 无缓冲的通道保证进行发送和接收的 goroutine 会在同一时间进行数据交换。
  • 有缓冲的通道没有这种保证,如果给定了一个缓冲区容量,通道就是异步的。
  • 第 1 步,右侧的 goroutine 正在从通道接收一个值。
  • 第 2 步,右侧的这个 goroutine 独立完成了接收值的动作,而左侧的 goroutine 正在发送一个新值到通道里。
  • 第 3 步,左侧的 goroutine 还在向通道发送新值,而右侧的 goroutine 正在从通道接收另外一个值。这个步骤里的两个操作既不是同步的,也不会互相阻塞。
  • 第 4 步,所有的发送和接收都完成,而通道里还有几个值,也有一些空间可以存更多的值。
func main() {    c := make(chan int, 3) //创建有缓冲的通道    //内置函数 len 返回未被读取的缓冲元素数量,cap 返回缓冲区大小    fmt.Printf("len(c)=%d, cap(c)=%d\n", len(c), cap(c))    go func() {        defer fmt.Println("子协程结束")        for i := 0; i < 3; i++ {            c <- i            fmt.Printf("子协程正在运行[%d]: len(c)=%d, cap(c)=%d\n", i, len(c), cap(c))        }    }()    for i := 0; i < 3; i++ {        num := <-c // 从c中接收数据,并赋值给num        fmt.Println("num = ", num)        time.Sleep(2 * time.Second)    }    fmt.Println("main协程结束")}

运行结果:

len(c)=0, cap(c)=3子协程正在运行[0]: len(c)=0, cap(c)=3子协程正在运行[1]: len(c)=1, cap(c)=3子协程正在运行[2]: len(c)=2, cap(c)=3子协程结束num =  0num =  1num =  2main协程结束

双向 channel 和单向 channel

channel 根据其功能又可以分为双向 channel 和单向 channel:

  • 双向 channel 即可发送数据又可接收数据。
  • 单向 channel 要么只能发送数据,要么只能接收数据。

定义与初始化:

// 双向channelchannel := make(chan int, 3)// send是单向channel,只用于写数据var send chan<- int = csend <- 1// recv是单向channel,只用于读数据var recv <-chan int = c<-recv
  • 可以理解为其实只有一个双向管道,只是人为给两端定义了别名,一端只用来发送,一端只用来接收。
  • 可以将 channel 隐式转换为单向队列,只收或只发;但不能将单向 channel 转换为普通 channel 。

示例:

// chan<-  只写func counter(out chan<- int) {    defer close(out)    for i := 0; i < 5; i++ {        out <- i // 如果对方不读 会阻塞    }}// <-chan  只读func printer(in <-chan int) {    for num := range in {        fmt.Println(num)    }}func main() {    c := make(chan int) // 无缓冲双向    go counter(c) // 生产者    printer(c)    // 消费者    fmt.Println("done")}

运行结果:

01234done

实现并发锁

上面说了当缓冲队列满了以后,继续往 channel 里面写数据,就会阻塞,那么利用这个特性,我们可以实现一个 goroutine 之间的锁。

package mainimport (   "fmt"   "time")func add(ch chan bool, num *int) {   ch <- true   *num = *num + 1   <-ch}func main() {   // 创建一个size为1的channel   ch := make(chan bool, 1)   var num int   for i := 0; i < 100; i++ {      go add(ch, &num)  // 引用传递   }   time.Sleep(2)   fmt.Println("num 的值:", num)}

运行结果:

num 的值: 100
  • ch <- true 和 <- ch 就相当于一个锁,将 *num = *num + 1 这个操作锁住了。

  • 因为 ch 管道的容量是 1,在每个 add 函数里都会往 channel 放置一个 true,直到执行完 +1 操作之后才将 channel 里的 true 取出。

  • 由于 channel 的 size 是 1,所以当一个 goroutine 在执行 add 函数的时候,其他 goroutine 执行 add 函数,执行到 ch <- true 的时候就会阻塞,*num = *num + 1 不会成功,直到前一个 +1 操作完成,<-ch,读出了管道的元素,这样就实现了并发安全。

Sync

在前面讲 channel 的时候,我们说到在 Go 语言并发编程中,倡导使用通信共享内存,不要使用共享内存通信,即 goroutine 之间尽量通过 channel 来协作。

而在其他的传统语言中,都是通过共享内存加上锁机制来保证并发安全的,同样 go 语言也提供了对共享内存并发安全机制的支持,这些功能都存在于 sync 包下。

sync.WaitGroup

在前面很多 goroutine 的示例中,我们都是通过 time.Sleep() 方法让主 goroutine 等待一段时间,以便子 gortoutine 能够执行完打印结果,显然这不是一个很好的办法,因为我们不知道所有的子 gortoutine 要多久才能执行完

解决方案一

  • 在每个 goroutine 中,向管道里发送一条数据,这样我们在程序最后,通过 for 循环将管道里的数据全部取出,直到数据全部取出完毕才能继续后面的逻辑,这样就可以实现等待各个 goroutine 执行完。
  • 但是,这样使用 channel 显得并不优雅。其次,我们得知道具体循环的次数,来创建管道的大小,假设次数非常的多,则需要申请同样数量大小的管道出来,对内存也是不小的开销。

解决方案二:使用 sync.WaitGroup

Go 语言中可以使用 sync.WaitGroup 来实现并发任务的同步以及协程任务等待。

sync.WaitGroup 是一个对象,里面维护者一个计数器,并且通过三个方法来配合使用:

  • (wg *WaitGroup) Add(delta int):计数器加 delta
  • (wg *WaitGroup) Done():计数器减 1
  • (wg *WaitGroup) Wait():会阻塞代码的运行,直至计数器减为0

示例:

import (    "fmt"    "sync")var wg sync.WaitGroupfunc myGoroutine(i int) {    defer wg.Done()    fmt.Println("myGoroutine: ", i)}func main() {    wg.Add(10)    for i := 0; i < 10; i++ {        go myGoroutine(i)    }    wg.Wait()    fmt.Println("end!!!")}

运行结果:

myGoroutine:  2myGoroutine:  5myGoroutine:  1myGoroutine:  0myGoroutine:  9myGoroutine:  7myGoroutine:  6myGoroutine:  8myGoroutine:  3myGoroutine:  4end!!!         

程序首先把 wg 的计数设置为 10,每个 for 循环运行完毕都把计数器减 1,main 函数中执行到 wg.Wait() 会一直阻塞,直到 wg 的计数器为零。最后打印了 10 个 myGoroutine!,待所有子 goroutine 任务结束后主 goroutine 才退出。

注意:sync.WaitGroup 对象的计数器不能为负数,否则会 panic。在使用的过程中,我们需要保证 add() 的参数值,以及执行完 Done() 之后计数器大于等于零。

sync.Once

sync.Once 最大的作用就是延迟初始化,对于一个使用 sync.Once 的变量,我们并不会在程序启动的时候初始化,而是在第一次用的它的时候才会初始化,并且只初始化这一次,初始化之后驻留在内存里,这就非常适合配置文件加载场景,设想一下,如果是在程序刚开始就加载配置,若迟迟未被使用,则既浪费了内存,又延长了程序加载时间,而 sync.Onece 就刚好解决了这个问题。

sync.Once 可以在代码的任意位置初始化和调用,并且线程安全。

示例:

// 声明配置结构体Configtype Config struct{}var instance Configvar once sync.Once     // 声明一个sync.Once变量// 获取配置结构体func InitConfig() *Config {   once.Do(func(){      instance = &Config{}   })   return instance}

只有在第一次调用 InitConfig() 获取Config 指针时,才会执行 once.Do(func(){instance = &Config{}}) 语句,执行完之后 instance 就驻留在内存中,后面再次执行 InitConfig() 的时候,就直接返回内存中的 instance (也就是单例模式)。

与init()的区别:有时候我们会使用 init() 方法进行初始化,init() 方法是在其所在的 package 首次加载时执行的;而 sync.Onece 可以在代码的任意位置初始化和调用,是在第一次用的它的时候才会初始化。

sync.Lock:锁

说到并发编程,就不得不谈一个老生常谈的问题,那就是资源竞争,因为一旦开启了多个 goroutine 去处理问题,那么这些 goroutine 就有可能在同一时间操作同一个系统资源,比如同一个变量、文件等等,如果不加控制的话,那么就会存在最后只有一个操作对资源生效,显然不是我们想要的结果。

示例:并发不安全

var num int = 1func add() {    num++}func main() {    go add()    go add()    go add()    time.Sleep(time.Second * 5)    fmt.Println("num:", num)}

运行结果:

2955

在 Go 语言中,有两种方式来控制并发安全,锁和原子操作。

sync.Mutex:互斥锁

互斥锁是一种最常用的控制并发安全的法式,它在同一时间只允许一个 goroutine 对共享资源进行访问。

互斥锁的声明方式如下:

var lock sync.Mutex

互斥锁有两个方法:

func (m *Mutex) Lock()     // 加锁func (m *Mutex) Unlock()   // 解锁

注意:

  1. 一个互斥锁只能同时被一个 goroutine 锁定,其它 goroutine 将阻塞,直到互斥锁被解锁才能加锁成功。

  2. 对一个未锁定的互斥锁进行解锁将会产生运行时错误。

  3. 对资源操作完成后,一定要解锁,否则会出现流程执行异常,死锁等问题。通常借助 defer。锁定后,立即使用 defer 语句保证互斥锁及时解锁。

示例:对上面并发不安全的例子稍做修改,加上互斥锁

package mainimport (    "fmt"    "sync")var num int = 1func add(wg *sync.WaitGroup, mu *sync.Mutex) {    mu.Lock() // 加锁    defer func() {        wg.Done()   // 计数器-1        mu.Unlock() // 解锁    }()    for i := 0; i < 1000; i++ {        num++    }}func main() {    var wg sync.WaitGroup    var mu sync.Mutex    wg.Add(3) // 开启3个goroutine,计数器加3    go add(&wg, &mu)    go add(&wg, &mu)    go add(&wg, &mu)    wg.Wait() // 等待所有协程执行完毕    fmt.Println("num:", num)}

运行结果:

3001

sync.RWMutex:读写锁

互斥锁的本质是当一个 goroutine 访问的时候,其他 goroutine 都不能访问。这样在资源同步,避免竞争的同时也降低了程序的并发性能,因为程序由原来的并行执行变成了串行执行。

其实,当我们对一个不会变化的数据只做“读”操作的话,是不存在资源竞争的问题的。因为数据是不变的,不管怎么读取,多少 goroutine 同时读取,都是可以的。

所以问题不是出在“读”上,而是出在“写”上,也就是修改数据。

由于修改的数据要同步,这样其他 goroutine 才可以感知到。所以真正的互斥应该是读取和修改、修改和修改之间,读和读是没有互斥操作的必要的

因此,衍生出另外一种锁,叫做读写锁:

  • 写写互斥
  • 读写互斥
  • 读读不互斥

从互斥锁和读写锁的源码可以看出,它们是同源的。读写锁的内部用互斥锁来实现写锁定操作之间的互斥。可以把读写锁看作是互斥锁的一种扩展。

读写锁的使用方法如下:

func (rw *RWMutex) Lock()     // 对写锁加锁func (rw *RWMutex) Unlock()   // 对写锁解锁func (rw *RWMutex) RLock()    // 对读锁加锁func (rw *RWMutex) RUnlock()  // 对读锁解锁

示例:

package mainimport (    "fmt"    "sync"    "time")var count int           // 全局变量countvar rwlock sync.RWMutex // 全局读写锁rwlockfunc read(i int) {    rwlock.RLock()    fmt.Printf("读 goroutine %d 读数据开始\n", i)    fmt.Printf("读 goroutine %d 读数据结束,读到: %d\n", i, count)    defer rwlock.RUnlock()}func write(i int) {    rwlock.Lock()    fmt.Printf("写 goroutine %d 写数据开始\n", i)    count++    fmt.Printf("写 goroutine %d 写数据结束,新值为: %d\n", i, count)    defer rwlock.Unlock()}func main() {    for i := 0; i < 3; i++ {        go read(i)    }    for i := 0; i < 3; i++ {        go write(i)    }    time.Sleep(time.Second * 5)    fmt.Println("final count:", count)}

运行结果:

读 goroutine 1 读数据开始读 goroutine 2 读数据开始读 goroutine 2 读数据结束,读到: 0读 goroutine 1 读数据结束,读到: 0写 goroutine 2 写数据开始写 goroutine 2 写数据结束,新值为: 1读 goroutine 0 读数据开始读 goroutine 0 读数据结束,读到: 1写 goroutine 1 写数据开始写 goroutine 1 写数据结束,新值为: 2写 goroutine 0 写数据开始写 goroutine 0 写数据结束,新值为: 3final count: 3

结果分析:

  • 首先,读 goroutine 1 和 2 均获得了读锁并同时进行了读操作,可以看出读操作并不互斥。
  • 而后面读写操作并交替进行,没有看到同时操作的情况,可以看出读写、写写操作互斥。

死锁

死锁是一种状态:当两个或以上的 goroutine 在执行过程中,因争夺共享资源处在互相等待的状态,如果没有外部干涉将会一直维持这种阻塞状态,我们称这时的系统发生了死锁。

死锁场景:

  1. Lock/Unlock 不成对

    • 这类情况最常见的场景就是对锁进行拷贝使用:即如果将带有锁结构的变量赋值给其他变量,锁的状态会复制。
    • 所以在使用锁的时候,我们应当尽量避免锁拷贝,并且保证 Lock() 和 Unlock() 成对出现。没有成对出现容易会出现死锁的情况,或者是 Unlock 一个未加锁的 Mutex 而导致 panic。
  2. 循环等待

    • 另一个容易造成死锁的场景就是循环等待,如 A 等 B,B 等 C,C 等 A 。
    • 比如两个 goroutine,一个 goroutine 先锁 mu1,再锁 mu2,另一个 goroutine 先锁 mu2,再锁 mu1;在它们分别进行第二次加锁操作的时候,彼此等待对方释放锁,这样就造成了循环等待,一直阻塞,形成死锁。

sync.Map:并发安全 Map

Go 语言内置的 Map 并不是线程安全的,在多个 goroutine 同时操作 map 时,会有并发问题。

示例:map 不能同时被多个 goroutine 读写

package mainimport (   "fmt"   "strconv"   "sync")var m = make(map[string]int)func getVal(key string) int {   return m[key]}func setVal(key string, value int) {   m[key] = value}func main() {   wg := sync.WaitGroup{}   wg.Add(10)   for i := 0; i < 10; i++ {      go func(num int) {         defer wg.Done()         key := strconv.Itoa(num)         setVal(key, num)         fmt.Printf("key=:%v,val:=%v\n", key, getVal(key))      }(i)   }   wg.Wait()}

运行结果:

fatal error: concurrent map writes

程序报错了,说明 map 不能同时被多个 goroutine 读写。

解决方案一:对 map 加锁

package mainimport (   "fmt"   "strconv"   "sync")var m = make(map[string]int)var mu sync.Mutexfunc getVal(key string) int {   return m[key]}func setVal(key string, value int) {   m[key] = value}func main() {   wg := sync.WaitGroup{}   wg.Add(10)   for i := 0; i < 10; i++ {      go func(num int) {         defer func() {            wg.Done()            mu.Unlock()         }()         key := strconv.Itoa(num)         mu.Lock()         setVal(key, num)         fmt.Printf("key=:%v,val:=%v\n", key, getVal(key))      }(i)   }   wg.Wait()}

运行结果:

key=:9,val:=9key=:4,val:=4key=:0,val:=0key=:1,val:=1key=:2,val:=2key=:3,val:=3key=:6,val:=6key=:7,val:=7key=:5,val:=5key=:8,val:=8

解决方案二:使用 sync 包提供的 map

sync 包中提供的一个开箱即用的并发安全版 map–sync.Map(在 Go 1.9 引入)。

sync.Map 不用初始化就可以使用,同时内置了如 Store、Load、LoadOrStore、Delete、Range 等操作方法。

示例:

package mainimport (   "fmt"   "sync")func main() {   var m sync.Map   // 1. 写入   m.Store("name", "zhangsan")   m.Store("age", 18)   // 2. 读取   age, _ := m.Load("age")   fmt.Println(age.(int))   // 3. 遍历:入参是一个函数   m.Range(       func(key, value interface{}) bool {           fmt.Printf("key is:%v, val is:%v\n", key, value)           return true       }   )   // 4. 删除   m.Delete("age")   age, ok := m.Load("age")   fmt.Println(age, ok)   // 5. 读取或写入   m.LoadOrStore("name", "zhangsan")   name, _ := m.Load("name")   fmt.Println(name)}

运行结果:

18key is:name, val is:zhangsankey is:age, val is:18        false                 zhangsan  
  1. 通过 store 方法写入两个键值对
  2. 读取 key 为 age 的值,读出来 age 为 18
  3. 通过 range 方法遍历 map 的 key 和 value
  4. 删除 key 为 age 的键值对,删除完之后,再次读取 age,age 为空,ok 为 false 表示 map 里没有这个 key
  5. LoadOrStore 尝试读取 key 为 name 的值,读取不到就写入键值对 name-zhangsan ,能读取到就返回原来 map 里的 name 对应的值

注意:

  • sync.Map 没有提供获取 map 数量的方法,需要我们在对 sync.Map 进行遍历时自行计算。
  • sync.Map 为了保证并发安全有一些性能损失,因此在非并发情况下,使用 map 相比使用 sync.Map 会有更好的性能。

sync.Atomic:原子操作

除了前面介绍的锁 mutex 以外,还有一种解决并发安全的策略,就是原子操作(sync.Atomic)。

所谓原子操作是指这一系列的操作在 CPU 上的执行是一个不可分割的整体,显然要么全部执行,要么全部不执行,不会受到其他操作的影响,也就不会存在并发问题。

atomic 和 metux 的区别:

  • 使用方式:通常 metux 用于保护一段执行逻辑;而 atomic 主要是对变量进行操作。
  • 底层实现:metux 由操作系统调度器实现;而 atomic 操作由底层硬件指令支持,保证在 CPU 上执行不中断。所以 atomic 的性能也会随 CPU 个数增加而线性提升。

atomic 提供的方法:

func AddT(addr *T, delta T)(new T)func StoreT(addr *T, val T)func LoadT(addr *T) (val T)func SwapT(addr *T, new T) (old T)func CompareAndSwapT(addr *T, old, new T) (swapped bool)// T的类型是int32、int64、uint32、uint64和uintptr中的任意一种

示例:AddT

package mainimport (   "fmt"   "sync"   "sync/atomic")func main() {   var sum int32 =  0   var wg sync.WaitGroup   // 100个goroutine,每个goroutine都对sum+1,最后结果为100   for i := 0; i < 100; i++ {      wg.Add(1)      go func() {         defer wg.Done()         atomic.AddInt32(&sum, 1)      }()   }   wg.Wait()   fmt.Printf("sum is %d\n",sum)}

关键词: