目录

并发

进程、线程、协程、并发、并行概念

协程goroutine

goroutine是golang的很好的机制

在Go语言编程中不需要自己写进程、线程、协程,需要让某个任务并发执行的时候,只需要把这个任务包装成一个函数,开启一个goroutine去执行这个函数就可以了,在调用函数的时候在前面加上go关键字,就可以为一个函数创建一个goroutine。

注意:在程序启动时,Go程序就会为main()函数创建一个默认的goroutine。当main()函数返回的时候该goroutine就结束了,所有在main()函数中启动的goroutine会一同结束。也就是说主协程结束,其他协程也就结束。

func hello() {
    fmt.Println("Hello Goroutine!")
}

//串行
func main() {
    hello()
    fmt.Println("main goroutine done!")
}

//并行
func main() {
    go hello() // 启动另外一个goroutine去执行hello函数
    fmt.Println("main goroutine done!")
    time.Sleep(time.Second)
}


执行上面的代码你会发现,这一次先打印main goroutine done!,然后紧接着打印Hello Goroutine。

为什么main函数还加了time.Sleep,是因为前面我们说的main函数默认创建一个goroutine,一旦main函数创建的goroutine结束,所有在main函数中启动的goroutine也一同结束,而go hello(),创建协程需要一定时间,在这个时间里main函数就已经结束了,所以go hello()的协程也一并结束了,就不执行了。

Go语言中的操作系统线程和goroutine的关系:

  • 1.一个操作系统线程对应用户态多个goroutine。
  • 2.go程序可以同时使用多个操作系统线程。
  • 3.goroutine和OS线程是多对多的关系,即m:n。一个协程也就是用户线程是绑定到内核态线程(线程)的,多个协程可以同时绑定到同一个线程上,所以协程和OS线程是多对多的关系。

runtime包

  • runtime.Gosched()

    协程让出CPU时间片,就绪状态,等待重新安排

  • runtime.Goexit()

    退出当前协程

  • runtime.GOMAXPROCS

    Go运行时的调度器使用GOMAXPROCS参数来确定需要使用多少个OS线程来同时执行Go代码。默认值是机器上的CPU核心数。

channel通道

Go 语言中的通道(channel)是一种特殊的类型。通道像一个传送带或者队列,总是遵循先入先出(First In First Out)的规则,保证收发数据的顺序。每一个通道都是一个具体类型的导管,也就是声明channel的时候需要为其指定元素类型。

如果说goroutine是Go程序并发的执行体,channel就是它们之间的连接。channel是可以让一个goroutine发送特定值到另一个goroutine的通信机制。

  • channel创建

    创建:声明+初始化

    声明:

    //var 变量 chan 元素类型
    var ch1 chan int   // 声明一个传递整型的通道
    var ch2 chan bool  // 声明一个传递布尔型的通道
    var ch3 chan []int // 声明一个传递int切片的通道
    

    初始化:

    //make(chan 元素类型, [缓冲大小])
    
    ch1 := make(chan int)  //channel的空值是nil
    
  • channel操作

    通道有发送(send)、接收(receive)和关闭(close)三种操作。

    发送和接收都使用<-符号。

    无缓冲通道上的发送操作会阻塞,直到另一个goroutine在该通道上执行接收操作,这时值才能发送成功,两个goroutine将继续执行。相反,如果接收操作先执行,接收方的goroutine将阻塞,直到另一个goroutine在该通道上发送一个值。

    无缓冲的通道:

    注意:无缓冲的通道只有在有人接收值的时候才能发送值。缓存就是能够提供暂存数据的地方,如果没有缓存的通道,必须发送者和接收者同时存在,发送者才能发送数据到通道,接收者实时在通道中接收数据。所以无缓冲的通道会阻塞,知道另一个goroutine在该通道上执行接收操作。

    无缓冲通道,我们也可以称为同步通道。

    func recv(c chan int) {
        ret := <-c  //从ch中接收值,并赋值给变量ret
        fmt.Println("接收成功", ret)
    }
    func main() {
        ch := make(chan int)
        go recv(ch) // 启用goroutine从通道接收值
        ch <- 10 //把10发送到ch中
        fmt.Println("发送成功")
    }
    

    有缓冲的通道:

    func main() {
        ch := make(chan int, 1) // 创建一个容量为1的有缓冲区通道
        ch <- 10
        fmt.Println("发送成功")
    }
    

    可以通过内置的close()函数关闭channel(如果你的管道不往里存值或者取值的时候一定记得关闭管道)

    close(ch)
    

    关闭通道一些特殊问题:

    • 1.对一个关闭的通道再发送值就会导致panic。
    • 2.对一个关闭的通道进行接收会一直获取值直到通道为空。
    • 3.对一个关闭的并且没有值的通道执行接收操作会得到对应类型的零值。
    • 4.关闭一个已经关闭的通道会导致panic。
  • channel遍历

    我们通常使用的是for range的方式判断通道是否被关闭,并从通道里遍历取值。

    // 在主goroutine中从ch2中接收值打印
    for i := range ch2 { // 通道关闭后会退出for range循环
        fmt.Println(i)
    }
    
  • channel生产消费demo

    package main
    
    import "fmt"
    
    func main() {
        ch1 := make(chan int)
        ch2 := make(chan int)
        // 开启goroutine将0~100的数发送到ch1中  【生产】
        go func() {
            for i := 0; i < 100; i++ {
                ch1 <- i
            }
            close(ch1)
        }()
        // 开启goroutine从ch1中接收值,并将该值的平方发送到ch2中 【消费->再加工】
        go func() {
            for {
                i, ok := <-ch1 // 通道关闭后再取值ok=false
                if !ok {
                    break
                }
                ch2 <- i * i
            }
            close(ch2)
        }()
        // 在主goroutine中从ch2中接收值打印  【输出】
        for i := range ch2 { // 通道关闭后会退出for range循环
            fmt.Println(i)
        }
    }
    
    
    
  • 单向通道

    在函数传参及任何赋值操作中将双向通道转换为单向通道是可以的,但反过来是不可以的

    func squarer(out chan<- int, in <-chan int) {
        for i := range in {
            out <- i * i
        }
        close(out)
    }
    

    上面的例子中:

    chan<- int   这是一个只能发送的通道,可以发送但是不能接收;
    <-chan int   这是一个只能接收的通道,可以接收但是不能发送。
    
  • channel小结

channel nil 非空 空的 满了 没满
接收 阻塞 接收值 阻塞 接收值 接收值
发送 阻塞 发送值 发送值 阻塞 发送值
关闭 panic 关闭成功,读完数据后返回零值 关闭成功,返回零值 关闭成功,读完数据后返回零值 同上

多协程及同步实现sync.WaitGroup

为了解决多协程,且保证所有协程完成,主协程才退出,我们需要有个协程同步机制。

同步实现一直都是通过同步锁实现协同同步,golang中sync包提供了sync.WaitGroup实现goroutine的同步,判断所有协程(异步)任务是否都已经完成(wg.Wait())

sync.WaitGroup有三个方法:

func (wg * WaitGroup) Add(delta int){}
func (wg *WaitGroup) Done(){}
func (wg *WaitGroup) Wait(){}
var wg sync.WaitGroup

func hello(i int) {
    defer wg.Done() // goroutine结束就登记-1
    fmt.Println("Hello Goroutine!", i)
}
func main() {

    for i := 0; i < 10; i++ {
        wg.Add(1) // 启动一个goroutine就登记+1
        go hello(i)
    }
    wg.Wait() // 等待所有登记的goroutine都结束
}

协程并发时,goroutine的调度是随时无序的,所以最后打印时不是按数字顺序的,和多核下线程并发类似

协程串行实现sync.Once

在编程的很多场景下我们需要确保某些操作在高并发的场景下只执行一次,例如只加载一次配置文件、只关闭一次通道等、只处理一次写操作等

sync.Once 有个do方法

func (o *Once) Do(f func()) {}

sync.Once其实内部包含一个互斥锁和一个布尔值,互斥锁保证布尔值和数据的安全,而布尔值用来记录初始化是否完成。这样设计就能保证初始化操作的时候是并发安全的并且初始化操作也不会被执行多次。

var icons map[string]image.Image

var loadIconsOnce sync.Once

func loadIcons() {
    icons = map[string]image.Image{
        "left":  loadIcon("left.png"),
        "up":    loadIcon("up.png"),
        "right": loadIcon("right.png"),
        "down":  loadIcon("down.png"),
    }
}

// Icon 是并发安全的
func Icon(name string) image.Image {
    loadIconsOnce.Do(loadIcons)
    return icons[name]
}

sync.Map

Go语言中内置的map不是并发安全的。

var m = sync.Map{}

func main() {
    wg := sync.WaitGroup{}
    for i := 0; i < 20; i++ {
        wg.Add(1)
        go func(n int) {
            key := strconv.Itoa(n)
            m.Store(key, n)
            value, _ := m.Load(key)
            fmt.Printf("k=:%v,v:=%v\n", key, value)
            wg.Done()
        }(i)
    }
    wg.Wait()
}

goroutine池

goroutine池

定时器

package main

import (
	"fmt"
	"time"
)

func main() {


	timer1 := time.NewTimer(2 * time.Second)
	t1 := time.Now()
	fmt.Printf("t1:%v\n", t1)
	t2 := <-timer1.C  //执行定时器
	fmt.Printf("t2:%v\n", t2)
}

select

前面流程控制里已经介绍过一次select,select用于同时从多个通道接收数据,select负责监听case里的所有通道,直到其中一个通道处于ready就绪状态

  select {
    case <-chan1:
       // 如果chan1成功读到数据,则进行该case处理语句
    case chan2 <- 1:
       // 如果成功向chan2写入数据,则进行该case处理语句
    default:
       // 如果上面都没有成功,则进入default处理流程
    }
package main

import (
   "fmt"
   "time"
)

func test1(ch chan string) {
   time.Sleep(time.Second * 5)
   ch <- "test1"
}
func test2(ch chan string) {
   time.Sleep(time.Second * 2)
   ch <- "test2"
}

func main() {
   // 2个管道
   output1 := make(chan string)
   output2 := make(chan string)
   // 跑2个子协程,写数据
   go test1(output1)
   go test2(output2)
   // 用select监控
   select {
   case s1 := <-output1:
      fmt.Println("s1=", s1)
   case s2 := <-output2:
      fmt.Println("s2=", s2)
   }
}

并发安全与锁机制

在高并发场景,我们经常会遇到资源竞争问题,通常通过加锁及事务来解决并发导致数据读写错乱问题。

比如redis提供锁,sql提供事务

在golang中,多协程更容易出现资源竞争问题,golang在sync包中提供了锁机制,有两种锁:互斥锁和读写锁

  • 互斥锁

    互斥锁是一种常用的控制共享资源访问的方法,它能够保证同时只有一个goroutine可以访问共享资源。

    var x int64
    var wg sync.WaitGroup
    var lock sync.Mutex
    
    func add() {
        for i := 0; i < 5000; i++ {
            lock.Lock() // 加锁
            x = x + 1
            lock.Unlock() // 解锁
        }
        wg.Done()
    }
    func main() {
        wg.Add(2)
        go add()
        go add()
        wg.Wait()
        fmt.Println(x)
    }
    
  • 读写互斥锁

    互斥锁是完全互斥的,但是有很多实际的场景下是读多写少的,当我们并发的去读取一个资源不涉及资源修改的时候是没有必要加锁的,这种场景下使用读写锁是更好的一种选择。读写锁在golang中使用sync包中的 RWMutex 类型。

    读写锁分为两种:读锁写锁。当一个goroutine获取读锁之后,其他的goroutine可以随时获取读锁,如果是获取写锁就要等待第一个goroutine的读锁释放;当一个goroutine获取写锁之后,其他的goroutine都要等待,不论是获取读锁还是写锁。

    var (
        x      int64
        wg     sync.WaitGroup
        lock   sync.Mutex
        rwlock sync.RWMutex
    )
    
    func write() {
        // lock.Lock()   // 加互斥锁
        rwlock.Lock() // 加写锁
        x = x + 1
        time.Sleep(10 * time.Millisecond) // 假设读操作耗时10毫秒
        rwlock.Unlock()                   // 解写锁
        // lock.Unlock()                     // 解互斥锁
        wg.Done()
    }
    
    func read() {
        // lock.Lock()                  // 加互斥锁
        rwlock.RLock()               // 加读锁
        time.Sleep(time.Millisecond) // 假设读操作耗时1毫秒
        rwlock.RUnlock()             // 解读锁
        // lock.Unlock()                // 解互斥锁
        wg.Done()
    }
    
    func main() {
        start := time.Now()
        for i := 0; i < 10; i++ {
            wg.Add(1)
            go write()
        }
    
        for i := 0; i < 1000; i++ {
            wg.Add(1)
            go read()
        }
    
        wg.Wait()
        end := time.Now()
        fmt.Println(end.Sub(start))
    }
    

原子操作atomic

互斥锁也是通过原子性解决并发安全问题,golang对于基础类型还提供了简单原子操作方式,比起互斥锁,性能更好。

golang原子操作由内置的标准库sync/atomic提供,目前支持更多的是计数级别的原子操作

var x int64
var l sync.Mutex
var wg sync.WaitGroup

// 普通版加函数
func add() {
    // x = x + 1
    x++ // 等价于上面的操作
    wg.Done()
}

// 互斥锁版加函数
func mutexAdd() {
    l.Lock()
    x++
    l.Unlock()
    wg.Done()
}

// 原子操作版加函数
func atomicAdd() {
    atomic.AddInt64(&x, 1)
    wg.Done()
}

func main() {
    start := time.Now()
    for i := 0; i < 10000; i++ {
        wg.Add(1)
        // go add()       // 普通版add函数 不是并发安全的
        // go mutexAdd()  // 加锁版add函数 是并发安全的,但是加锁性能开销大
        go atomicAdd() // 原子操作版add函数 是并发安全,性能优于加锁版
    }
    wg.Wait()
    end := time.Now()
    fmt.Println(x)
    fmt.Println(end.Sub(start))
}