目录

什么是并发

高并发场景下,进程、线程(协程)会发生资源竞争,导致数据脏读脏写、死锁等并发安全问题。

进程-线程-协程-并行-并发 - 9ong

并发同步之sync.WaitGroup

golang经常会有多协程并行的场景,而我们又需要这些协程全部结束后,继续完成main协程的后续行为,这个时候,我们需要一个谁来统计所有协程的运行结果,并通知main协程,不能仅仅依靠timeout、sleep的方式硬编写main协程等待其他协程时间,无法完美预估其他协程最晚完成时间。

所以就有了sync.WaitGroup,sync.WaitGroup内部维护着一个计数器,计数器的值可以增加和减少。比如启动了5个并发任务时,就将计数器值增加5,每个任务完成时通过调用Done()方法将计数器减1,当计数器值为0时,表示所有并发任务已经完成,通过调用Wait()来等待并发任务执行完,达到main协程知道所有协程任务完成的效果。

package main

import (
    "fmt"
    "sync"
)

var wg sync.WaitGroup

func goodjob(num int) {
    defer wg.Done()
    fmt.Println(num, "good job.")
}
func main() {
    for i := 0; i < 5; i++ {
        wg.Add(1)
        go goodjob(i)
    }
    fmt.Println("main goroutine do something.")
    wg.Wait()
    fmt.Println("main goroutine end.")
}

使用sync.WaitGroup后的输出:

main goroutine do something.
1 good job.
0 good job.
2 good job.
3 good job.
4 good job.
main goroutine end.

如果不使用WaitGroup,将不会看到goodjob协程的输出打印,只有main协程的输出,因为main协程并不会去等待其他协程,而是先执行并退出main协程,一旦退出main协程,其他协程也会被销毁(在销毁前还来不及执行,因为需要一些时间给GMP调度)

并发加载之sync.once

可能不同协程会加载相同的静态资源,在高并发场景下,这些资源其实只需要加载一次就可以,比如配置、关闭一次通道、连接远端资源等,sync包提供了Once解决方案,一次只执行一次的解决方案。

package main

import (
    "errors"
    "fmt"
    "sync"
)

type subConfig struct {
    url  string
    src  string
    desc string
}

var configs map[string]subConfig
var loadOnce sync.Once

func loadConfigs() {

    configs = map[string]subConfig{
        "icons":   loadIcons(),
        "ads":     loadAds(),
        "banners": loadBanners(),
    }
}

func getConfig(key string) (res subConfig, err error) {
    //这里如果我们判断nil的方式来规避并发的话,是不安全的
    // if configs == nil {
    // 	loadConfigs()
    // }
    //所以我们换成了sync.Once的Do方法来控制并发安全及避免通过锁的机制来控制资源并发访问带来的性能问题
    loadOnce.Do(loadConfigs)
    res, ok := configs[key]
    if !ok {
        errors.New("key invalid.")
    }
    return
}

func loadIcons() subConfig {
    //可能从文件、缓存、数据库等资源获取
    icons := subConfig{
        src:  "http://www.9ong.com/xxx.png",
        url:  "http://www.9ong.com/",
        desc: "9ong icon desc",
    }

    return icons
}

func loadBanners() subConfig {
    //可能从文件、缓存、数据库等资源获取
    banners := subConfig{
        src:  "http://www.9ong.com/xxx.png",
        url:  "http://www.9ong.com/",
        desc: "9ong banner desc",
    }

    return banners
}

func loadAds() subConfig {
    //可能从文件、缓存、数据库等资源获取
    ads := subConfig{
        src:  "http://www.9ong.com/xxx.png",
        url:  "http://www.9ong.com/",
        desc: "9ong ads desc",
    }

    return ads
}

func main() {
    //并发调用getConfig
    ads, err := getConfig("ads")
    if err != nil {
        fmt.Println(err)
    } else {
        fmt.Println(ads)
    }
}

在getConfig函数中,我们通过判断configs这个全局变量是否为nil,决定是否重新加载configs数据,在这种情况下就会出现即使判断了configs不是nil,也不意味着configs变量初始化完成了。考虑到这种情况,我们能想到的办法就是添加互斥锁,保证初始化加载configs全局资源变量的时候不会被其他的协程读写,但是这样做又会引发性能问题。

sync.Once内部包含一个互斥锁和一个布尔值,互斥锁保证布尔值和数据的安全,而布尔值用来记录初始化是否完成。这样设计就能保证初始化操作的时候是并发安全的并且初始化操作也不会被执行多次。

所以getConfig函数改造后,通过loadOnce.Do(loadConfigs)来实现互斥加载。

解决了并发加载问题,还不会因为多次加载浪费时间和空间及引入锁等机制带来的性能问题。

并发安全与锁之sync.mutex

package main

import (
    "fmt"
    "sync"
)

var love int32 = 10000 //公共资源love
var wg sync.WaitGroup

func getLove() {
    defer wg.Done()
    for i := 0; i < 1000; i++ {
        love = love - 1
    }
    fmt.Println("love left:", love)
}
func main() {
    for children := 0; children < 10; children++ {
        wg.Add(1)
        go getLove()
    }
    wg.Wait()
    fmt.Println("love is:", love)
}

由于启动了10个协程,这些协程存在竞争love资源的可能(多试几次或把初始值和循环数调大点),可能会出现以下的输出,其实也就是电商里说的超卖问题,按照逻辑love最后应该为0,但实际情况是love还剩下3045:

I:\src\go\src\helo>go run test.go
love left: 7763
love left: 8763
love left: 9000
love left: 6763
love left: 6452
love left: 5452
love left: 5220
love left: 4220
love left: 4045
love left: 3045
love is: 3045

超卖的问题在于并发不安全,并发时对临界资源的读写不安全,我们需要考虑在读写临界资源时,将并发/并行转换成串行,通常采用加锁的机制。

互斥锁(完全互斥)是一种常用的控制共享资源访问的方法,它能够保证同时只有一个协程可以访问共享资源。golang中使用sync包的Mutex类型来实现互斥锁:

package main

import (
    "fmt"
    "sync"
)

var love int32 = 10000 //公共资源love
var wg sync.WaitGroup
var mu sync.Mutex

func getLove() {
    defer wg.Done()
    mu.Lock()
    for i := 0; i < 1000; i++ {
        love = love - 1
    }
    mu.Unlock()
    fmt.Println("love left:", love)
}
func main() {
    for children := 0; children < 10; children++ {
        wg.Add(1)
        go getLove()
    }
    wg.Wait()
    fmt.Println("love is:", love)
}

前面我们说互斥锁是完全互斥的,但是有很多实际的场景下是读多写少的,当并发去读取一个资源不涉及资源修改的时候是没有必要加锁的,如果使用完全互斥锁的话,会导致读多的协程会堵塞等待锁的释放,很影响效率,这种场景下使用读写锁是更好的一种选择。读写锁在Go语言中使用sync包中的RWMutex类型。

读写锁分为读锁和写锁,当一个协程获取读锁之后,其他的协程如果是获取读锁会继续获得锁,如果是获取写锁就会等待;当一个协程获取写锁之后,其他的协程无论是获取读锁还是写锁都会等待。

也就是说,读写锁互斥,读读不会互斥,协程的读锁,不应其他协程的读锁获取,但影响写锁的获取;而写锁影响所有协程对该资源锁的获取。

编码如何处理呢?

我们只要对共享资源的读加读锁Rlock(),对共享资源的写加写锁lock()


var rwlock sync.RWMutex

func read(){
    rwlock.RLock()
    //read love action
    rwlock.RUnlock()
}

func write(){
    rwlock.Lock()
    //writen love actioin
    rwlock.Unlock()
}

并发安全之sync.map

golang内置map不是并发安全的,很好理解,map只是一种资源的数据结构而已,所以作为共享资源,其本身当然也是并发不安全的,对于map中key、value的操作也需要做并发安全处理,比如加锁。

虽然加锁可以解决,但golang也提供了一套并发安全的map操作类:sync.Map,其内置了Store、Load、LoadOrStore、Delete、Range等操作方法,这些方法都是并发安全的,可以理解为原子操作。

package main

import (
    "fmt"
    "strconv"
    "sync"
)

var sm = sync.Map{}
var wg = sync.WaitGroup{}

func setMap(i int) {
    key := strconv.Itoa(i)
    sm.Store(key, i)
    value, _ := sm.Load(key)
    fmt.Printf("k=:%v,v:=%v\n", key, value)
    wg.Done()

}

func main() {
    for i := 0; i < 20; i++ {
        wg.Add(1)
        go setMap(i)
    }
    wg.Wait()
}

并发安全之atomic

针对基本数据类型我们还可以使用原子操作来保证并发安全,因为原子操作是Go语言提供的方法它在用户态就可以完成,因此性能比加锁操作更好。Go语言中原子操作由内置的标准库sync/atomic提供。

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

var love int32 = 10000 //公共资源love
var wg sync.WaitGroup
var mu sync.Mutex

func getLove() {
    defer wg.Done()
    for i := 0; i < 1000; i++ {
        love = love - 1
    }
    fmt.Println("love left:", love)
}
func mutexGetLove() {
    defer wg.Done()
    mu.Lock()
    for i := 0; i < 1000; i++ {
        love = love - 1
    }
    mu.Unlock()
    fmt.Println("love left:", love)
}
func atomicGetLove() {
    defer wg.Done()
    for i := 0; i < 1000; i++ {
        atomic.AddInt32(&love, -1)
    }
    fmt.Println("love left:", love)
}
func main() {
    for children := 0; children < 10; children++ {
        wg.Add(1)
        // go getLove()	//普通,并发不安全
        // go mutexGetLove() //互斥锁,并发安全,但性能开销大
        go atomicGetLove() //原子操作,并发安全,性能优于互斥锁机制
    }
    wg.Wait()
    fmt.Println("love is:", love)
}

atomic包提供了底层的原子级内存操作,虽然其对于同步算法的实现很有效果,但除了某些特殊的底层应用,我们建议使用channel或者sync.包实现同步会更好些。


TsingChan@9ong.com