目录

并发

进程、线程、协程、并发、并行概念

协程goroutine

goroutine是golang的很好的机制

在Go语言编程中你不需要去自己写进程、线程、协程,需要让某个任务并发执行的时候,只需要把这个任务包装成一个函数,开启一个goroutine去执行这个函数就可以了,在调用函数的时候在前面加上go关键字,就可以为一个函数创建一个goroutine。

注意:在程序启动时,Go程序就会为main()函数创建一个默认的goroutine。当main()函数返回的时候该goroutine就结束了,所有在main()函数中启动的goroutine会一同结束。

func hello() {
    fmt.Println("Hello Goroutine!")
}
func main() {
    hello()
    fmt.Println("main goroutine done!")
}

func main() {
    go hello() // 启动另外一个goroutine去执行hello函数
    fmt.Println("main goroutine done!")
    time.Sleep(time.Second)
}


执行上面的代码你会发现,这一次先打印main goroutine done!,然后紧接着打印Hello Goroutine。

为什么main函数还加了time.Sleep,是因为前面我们说的main函数默认创建一个goroutine,一旦main函数创建的goroutine结束,所有在main函数中启动的goroutine也一同结束,而go hello(),创建协程需要一定时间,在这个时间里main函数就已经结束了,所以go hello()的协程也一并结束了,就不执行了。

Go语言中的操作系统线程和goroutine的关系:

  • 1.一个操作系统线程对应用户态多个goroutine。
  • 2.go程序可以同时使用多个操作系统线程。
  • 3.goroutine和OS线程是多对多的关系,即m:n。

多协程及同步实现sync.WaitGroup

同步实现一直都是通过同步锁实现协同同步,golang中sync包提供了sync.WaitGroup实现goroutine的同步,判断所有协程(异步)任务是否都已经完成(wg.Wait())

sync.WaitGroup有三个方法:

func (wg * WaitGroup) Add(delta int){}
func (wg *WaitGroup) Done(){}
func (wg *WaitGroup) Wait(){}
var wg sync.WaitGroup

func hello(i int) {
    defer wg.Done() // goroutine结束就登记-1
    fmt.Println("Hello Goroutine!", i)
}
func main() {

    for i := 0; i < 10; i++ {
        wg.Add(1) // 启动一个goroutine就登记+1
        go hello(i)
    }
    wg.Wait() // 等待所有登记的goroutine都结束
}

协程并发时,goroutine的调度是随时无序的,所以最后打印时不是按数字顺序的,和多核下线程并发类似

协程串行实现sync.Once

在编程的很多场景下我们需要确保某些操作在高并发的场景下只执行一次,例如只加载一次配置文件、只关闭一次通道等、只处理一次写操作等

sync.Once 有个do方法

func (o *Once) Do(f func()) {}

sync.Once其实内部包含一个互斥锁和一个布尔值,互斥锁保证布尔值和数据的安全,而布尔值用来记录初始化是否完成。这样设计就能保证初始化操作的时候是并发安全的并且初始化操作也不会被执行多次。

var icons map[string]image.Image

var loadIconsOnce sync.Once

func loadIcons() {
    icons = map[string]image.Image{
        "left":  loadIcon("left.png"),
        "up":    loadIcon("up.png"),
        "right": loadIcon("right.png"),
        "down":  loadIcon("down.png"),
    }
}

// Icon 是并发安全的
func Icon(name string) image.Image {
    loadIconsOnce.Do(loadIcons)
    return icons[name]
}

sync.Map

Go语言中内置的map不是并发安全的。

var m = sync.Map{}

func main() {
    wg := sync.WaitGroup{}
    for i := 0; i < 20; i++ {
        wg.Add(1)
        go func(n int) {
            key := strconv.Itoa(n)
            m.Store(key, n)
            value, _ := m.Load(key)
            fmt.Printf("k=:%v,v:=%v\n", key, value)
            wg.Done()
        }(i)
    }
    wg.Wait()
}

runtime包

  • runtime.Gosched()

    协程让出CPU时间片,就绪状态,等待重新安排

  • runtime.Goexit()

    退出当前协程

  • runtime.GOMAXPROCS

    Go运行时的调度器使用GOMAXPROCS参数来确定需要使用多少个OS线程来同时执行Go代码。默认值是机器上的CPU核心数。

channel通道

  • channel创建
# make(chan 元素类型, [缓冲大小])

ch := make(chan int)
  • channel操作

通道有发送(send)、接收(receive)和关闭(close)三种操作。

发送和接收都使用<-符号。

无缓冲通道上的发送操作会阻塞,直到另一个goroutine在该通道上执行接收操作,这时值才能发送成功,两个goroutine将继续执行。相反,如果接收操作先执行,接收方的goroutine将阻塞,直到另一个goroutine在该通道上发送一个值。

无缓冲的通道:

func recv(c chan int) {
    ret := <-c
    fmt.Println("接收成功", ret)
}
func main() {
    ch := make(chan int)
    go recv(ch) // 启用goroutine从通道接收值
    ch <- 10
    fmt.Println("发送成功")
}

有缓冲的通道:

func main() {
    ch := make(chan int, 1) // 创建一个容量为1的有缓冲区通道
    ch <- 10
    fmt.Println("发送成功")
}

可以通过内置的close()函数关闭channel(如果你的管道不往里存值或者取值的时候一定记得关闭管道)

  • channel遍历

我们通常使用的是for range的方式判断通道是否被关闭,并从通道里遍历取值。

// 在主goroutine中从ch2中接收值打印
for i := range ch2 { // 通道关闭后会退出for range循环
    fmt.Println(i)
}
  • channel小结
channel nil 非空 空的 满了 没满
接收 阻塞 接收值 阻塞 接收值 接收值
发送 阻塞 发送值 发送值 阻塞 发送值
关闭 panic 关闭成功,读完数据后返回零值 关闭成功,返回零值 关闭成功,读完数据后返回零值 同上

goroutine池

goroutine池

select

前面流程控制里已经介绍过一次select,select用于同时从多个通道接收数据,select负责监听case里的所有通道,直到其中一个通道处于ready就绪状态

  select {
    case <-chan1:
       // 如果chan1成功读到数据,则进行该case处理语句
    case chan2 <- 1:
       // 如果成功向chan2写入数据,则进行该case处理语句
    default:
       // 如果上面都没有成功,则进入default处理流程
    }
package main

import (
   "fmt"
   "time"
)

func test1(ch chan string) {
   time.Sleep(time.Second * 5)
   ch <- "test1"
}
func test2(ch chan string) {
   time.Sleep(time.Second * 2)
   ch <- "test2"
}

func main() {
   // 2个管道
   output1 := make(chan string)
   output2 := make(chan string)
   // 跑2个子协程,写数据
   go test1(output1)
   go test2(output2)
   // 用select监控
   select {
   case s1 := <-output1:
      fmt.Println("s1=", s1)
   case s2 := <-output2:
      fmt.Println("s2=", s2)
   }
}

互斥锁sync.Mutex

在高并发场景,我们经常会遇到资源竞争问题,通常通过加锁及事务来解决并发导致数据读写错乱问题。

比如redis提供锁,sql提供事务

在golang中,多协程更容易出现资源竞争问题,golang在sync包中提供了锁机制

  • 互斥锁

互斥锁是一种常用的控制共享资源访问的方法,它能够保证同时只有一个goroutine可以访问共享资源。

var x int64
var wg sync.WaitGroup
var lock sync.Mutex

func add() {
    for i := 0; i < 5000; i++ {
        lock.Lock() // 加锁
        x = x + 1
        lock.Unlock() // 解锁
    }
    wg.Done()
}
func main() {
    wg.Add(2)
    go add()
    go add()
    wg.Wait()
    fmt.Println(x)
}
  • 读写互斥锁

互斥锁是完全互斥的,但是有很多实际的场景下是读多写少的,当我们并发的去读取一个资源不涉及资源修改的时候是没有必要加锁的,这种场景下使用读写锁是更好的一种选择。读写锁在golang中使用sync包中的 RWMutex 类型。

读写锁分为两种:读锁写锁。当一个goroutine获取读锁之后,其他的goroutine可以随时获取读锁,如果是获取写锁就要等待第一个goroutine的读锁释放;当一个goroutine获取写锁之后,其他的goroutine都要等待,不论是获取读锁还是写锁。

var (
    x      int64
    wg     sync.WaitGroup
    lock   sync.Mutex
    rwlock sync.RWMutex
)

func write() {
    // lock.Lock()   // 加互斥锁
    rwlock.Lock() // 加写锁
    x = x + 1
    time.Sleep(10 * time.Millisecond) // 假设读操作耗时10毫秒
    rwlock.Unlock()                   // 解写锁
    // lock.Unlock()                     // 解互斥锁
    wg.Done()
}

func read() {
    // lock.Lock()                  // 加互斥锁
    rwlock.RLock()               // 加读锁
    time.Sleep(time.Millisecond) // 假设读操作耗时1毫秒
    rwlock.RUnlock()             // 解读锁
    // lock.Unlock()                // 解互斥锁
    wg.Done()
}

func main() {
    start := time.Now()
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go write()
    }

    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go read()
    }

    wg.Wait()
    end := time.Now()
    fmt.Println(end.Sub(start))
}

原子操作atomic

互斥锁也是通过原子性解决并发安全问题,golang对于基础类型还提供了简单原子操作方式,比起互斥锁,性能更好。

golang原子操作由内置的标准库sync/atomic提供,目前支持更多的是计数级别的原子操作

var x int64
var l sync.Mutex
var wg sync.WaitGroup

// 普通版加函数
func add() {
    // x = x + 1
    x++ // 等价于上面的操作
    wg.Done()
}

// 互斥锁版加函数
func mutexAdd() {
    l.Lock()
    x++
    l.Unlock()
    wg.Done()
}

// 原子操作版加函数
func atomicAdd() {
    atomic.AddInt64(&x, 1)
    wg.Done()
}

func main() {
    start := time.Now()
    for i := 0; i < 10000; i++ {
        wg.Add(1)
        // go add()       // 普通版add函数 不是并发安全的
        // go mutexAdd()  // 加锁版add函数 是并发安全的,但是加锁性能开销大
        go atomicAdd() // 原子操作版add函数 是并发安全,性能优于加锁版
    }
    wg.Wait()
    end := time.Now()
    fmt.Println(x)
    fmt.Println(end.Sub(start))
}

转载请标注来源:@tsingchan