几个概念

并行和并发

  • 并行 多线程程序在多个核的cpu上运行,就是并行 image.png|325

  • 并发 多线程程序在一个核的cpu上运行,就是并发 image.png|325

进程和线程

A. 进程是程序在操作系统中的一次执行过程,系统进行资源分配和调度的一个独立单位。比如你运行一个 Go 程序 go run main.go,操作系统就会创建一个进程。每个进程有自己独立的内存空间、文件描述符、环境变量等,不同进程之间的资源默认是隔离的。 B. 线程是进程的一个执行实体,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位。多个线程共享所属进程的所有资源(内存、文件句柄等) C.一个进程可以创建和撤销多个线程;同一个进程中的多个线程之间可以并发执行。

协程和线程

协程:独立的栈空间,共享堆空间,调度由用户自己控制,本质上有点类似于用户级线程,这些用户级线程的调度也是自己实现的。 线程:一个线程上可以跑多个协程,协程是轻量级的线程。

Goroutine

启动单个goroutine

goroutine的概念类似于线程,但 goroutine是由Go的运行时(runtime)调度和管理的。Go程序会智能地将 goroutine 中的任务合理地分配给每个CPU。Go语言之所以被称为现代化的编程语言,就是因为它在语言层面已经内置了调度和上下文切换的机制。

启动goroutine的方式非常简单,只需要在调用的函数(普通函数和匿名函数)前面加上一个go关键字。

1func hello() {
2    fmt.Println("Hello Goroutine!")
3}
4func main() {
5    go hello()
6    fmt.Println("main goroutine done!")
7    time.sleep(time.Second)
8}

执行上面的代码你会发现,这一次先打印main goroutine done!,然后紧接着打印Hello Goroutine!。

启动多个goroutine

 1var wg sync.WaitGroup
 2
 3func hello(i int) {
 4    defer wg.Done() // goroutine结束就登记-1
 5    fmt.Println("Hello Goroutine!", i)
 6}
 7func main() {
 8
 9    for i := 0; i < 10; i++ {
10        wg.Add(1) // 启动一个goroutine就登记+1
11        go hello(i)
12    }
13    wg.Wait() // 等待所有登记的goroutine都结束
14}

若主协程退出,其他任务被迫终止。没有主协程,新协程也不进行。

runtime包

runtime.Gosched()

主动让渡,保证其他 goroutine 有机会执行

runtime.Goexit()

 1func main() {
 2
 3    go func() {
 4
 5        defer fmt.Println("A.defer")
 6
 7        func() {
 8
 9            defer fmt.Println("B.defer")
10
11            // 结束协程
12
13            runtime.Goexit()
14
15            defer fmt.Println("C.defer")
16
17            fmt.Println("B")
18
19        }()
20
21        fmt.Println("A")
22
23    }()
24
25    for {
26
27    }
28
29}

输出按顺序只有B.defer、A.defer

runtime.GOMAXPROCS

Go运行时的调度器使用GOMAXPROCS参数来确定需要使用多少个OS线程来同时执行Go代码。默认值是机器上的CPU核心数。例如在一个8核心的机器上,调度器会把Go代码同时调度到8个OS线程上(GOMAXPROCS是m:n调度中的n)。

Go1.5版本之前,默认使用的是单核心执行。Go1.5版本之后,默认使用全部的CPU逻辑核心数。

Go语言中的操作系统线程和goroutine的关系:

  • 1.一个操作系统线程对应用户态多个goroutine。
  • 2.go程序可以同时使用多个操作系统线程。
  • 3.goroutine和OS线程是多对多的关系,即m:n。

Channel

如果说goroutine是Go程序并发的执行体,channel就是它们之间的连接。channel是可以让一个goroutine发送特定值到另一个goroutine的通信机制。

channel存在的意义是什么? 答:因为 Go 的主线程不会自动等协程,所以要用 Channel 来通信(其他解决方案包括sync.WaitGrouptime.Sleep等)

channel 遵循 Go 的核心设计原则:不要通过共享内存通信,而通过通信共享内存

channel解决了数据竞争,自带阻塞,保证数据传递的原子性。

channel类型

channel是一种引用类型。声明通道类型的格式如下:

1var 变量 chan 元素类型
2
3var ch1 chan int
4var ch2 chan bool
5var cha3 chan []int

声明的通道后需要使用make函数初始化之后才能使用。

1make(chan 元素类型, [缓冲大小])
2
3ch4 := make(chan int)
4ch5 := make(chan bool)
5ch6 := make(chan []int)

channel操作

  • 发送
1ch := make(chan int)
2ch <- 10//把10发送到ch中
  • 接收
1x := <- ch // 从ch中接收值并赋值给变量x
2<-ch       // 从ch中接收值,忽略结果
  • 关闭
1close(ch) //这是唯一能关闭通道的方式

关于关闭通道需要注意的事情是,只有在通知接收方goroutine所有的数据都发送完毕的时候才需要关闭通道。通道是可以被垃圾回收机制回收的,它和关闭文件是不一样的,在结束操作之后关闭文件是必须要做的,但关闭通道不是必须的。 未关闭的通道有以下特点: 1. 从空的无缓冲通道读取会永久阻塞(死锁),直到有数据可读或通道被关闭。 关闭后的通道有以下特点: 1.对一个关闭的通道再发送值就会导致panic。 2.关闭通道后,仍可以继续接收通道中剩余的数据 3.对一个关闭的并且没有值的通道执行接收操作会得到对应类型的零值。 4.关闭一个已经关闭的通道会导致panic。

无缓冲与有缓冲通道

  • 无缓存通道/阻塞通道 image.png|450
1func main() {
2    ch := make(chan int)
3    ch <- 10
4    fmt.Println("发送成功")
5}

上述代码会通过编译,执行会报错deadlock,为什么会这样呢? 因为我们使用ch := make(chan int)创建的是无缓冲的通道,无缓冲的通道必须有接收才能发送。下述是解决方案:

 1func recv(c chan int) {
 2    ret := <-c
 3    fmt.Println("接收成功", ret)
 4}
 5func main() {
 6    ch := make(chan int)
 7    go recv(ch) // 启用goroutine从通道接收值
 8    ch <- 10
 9    fmt.Println("发送成功")
10}

无缓冲通道上的发送操作会阻塞,直到另一个goroutine在该通道上执行接收操作,这时值才能发送成功,两个goroutine将继续执行。相反,如果接收操作先执行,接收方的goroutine将阻塞,直到另一个goroutine在该通道上发送一个值。 使用无缓冲通道进行通信将导致发送和接收的goroutine同步化。因此,无缓冲通道也被称为同步通道。

  • 有缓冲通道 这就是操作系统里的生产消费者模型在go中的实现 image.png|450 我们可以在使用make函数初始化通道的时候为其指定通道的容量,例如:
1func main() {
2    ch := make(chan int, 1) // 创建一个容量为1的有缓冲区通道
3    ch <- 10
4    fmt.Println("发送成功")
5}

判断通道是否关闭

记住一句 规则: 只有当你需要通知对方「我不再发送数据了」才需要 close () 单纯用完通道,不需要 close!

两种方式判断 方式 1:“comma ok” 模式(显式判断) 方式 2:for range遍历通道(隐式判断) 我们通常使用的是for i := range ch的方式来判断

 1func main() {
 2    ch1 := make(chan int)
 3    ch2 := make(chan int)
 4    // 开启goroutine将0~100的数发送到ch1中
 5    go func() {
 6        for i := 0; i < 100; i++ {
 7            ch1 <- i
 8        }
 9        close(ch1)
10    }()
11    // 开启goroutine从ch1中接收值,并将该值的平方发送到ch2中
12    go func() {
13        for {
14            i, ok := <-ch1 // 通道关闭后再取值ok=false
15            if !ok {
16                break
17            }
18            ch2 <- i * i
19        }
20        close(ch2)
21    }()
22    // 在主goroutine中从ch2中接收值打印
23    for i := range ch2 { // 通道关闭后会退出for range循环
24        fmt.Println(i)
25    }
26}

单向通道

1.chan<- int是一个只能发送的通道,可以发送但是不能接收; 2.<-chan int是一个只能接收的通道,可以接收但是不能发送。

异常自查表

channelnil非空空的满了没满
接收阻塞接收值阻塞接收值接收值
发送阻塞发送值发送值阻塞发送值
关闭panic关闭成功,读完数据后返回零值关闭成功,返回零值关闭成功,读完数据后返回零值关闭成功,读完数据后返回零值
关闭已经关闭的通道也会panic

Goroutine池示例

生产消费者模型

  1package main
  2
  3
  4import (
  5
  6    "fmt"
  7
  8    "math/rand"
  9
 10)
 11
 12  
 13
 14type Job struct {
 15
 16    Id      int
 17
 18    RandNum int
 19
 20}
 21
 22  
 23
 24type Result struct {
 25
 26    job *Job
 27
 28    sum int
 29
 30}
 31
 32  
 33
 34func main() {
 35
 36  
 37
 38    jobChan := make(chan *Job, 128)
 39
 40    resultChan := make(chan *Result, 128)
 41
 42    //中间处理者
 43
 44    createPool(64, jobChan, resultChan)
 45
 46    //消费者
 47
 48    go func(resultChan chan *Result) {
 49
 50        for result := range resultChan {
 51
 52            fmt.Printf("job id:%v randnum:%v result:%d\n", result.job.Id,
 53
 54                result.job.RandNum, result.sum)
 55
 56        }
 57
 58    }(resultChan)
 59
 60    var id int
 61
 62    //生产者
 63
 64    for {
 65
 66        id++
 67
 68        r_num := rand.Int()
 69
 70        job := &Job{
 71
 72            Id:      id,
 73
 74            RandNum: r_num,
 75
 76        }
 77
 78        jobChan <- job
 79
 80    }
 81
 82}
 83
 84  
 85
 86func createPool(num int, jobChan chan *Job, resultChan chan *Result) {
 87
 88    for i := 0; i < num; i++ {
 89
 90        go func(jobChan chan *Job, resultChan chan *Result) {
 91
 92            for job := range jobChan {
 93
 94                r_num := job.RandNum
 95
 96                var sum int
 97
 98                for r_num != 0 {
 99
100                    tmp := r_num % 10
101
102                    sum += tmp
103
104                    r_num /= 10
105
106                }
107
108                r := &Result{
109
110                    job: job,
111
112                    sum: sum,
113
114                }
115
116                resultChan <- r
117
118            }
119
120        }(jobChan, resultChan)
121
122    }
123
124}

image.png

Sync

sync.WaitGroup

waitGroup本质就是个带阻塞功能的计数器。

在代码中生硬的使用time.Sleep肯定是不合适的,Go语言中可以使用sync.WaitGroup来实现并发任务的同步。 sync.WaitGroup有以下几个方法:

方法名功能
(wg * WaitGroup) Add(delta int)计数器+delta
(wg *WaitGroup) Done()计数器-1
(wg *WaitGroup) Wait()阻塞直到计数器变为0
这个计数计的是任务数量

errgroup.Group

是sync.WaitGroup的增强版 errgroup = sync.WaitGroup + 错误处理 + 上下文取消

waitGroup.Go() 就是加一,函数结束就是减一 waitGroup.Wait() 等所有协程结束 → 计数器 = 0

sync.Cond

sync.Cond 是 Go 语言sync包提供的条件变量,用于解决 “多个 goroutine 等待某个条件满足后再执行” 的同步问题。它基于互斥锁(或读写锁)工作,相比 “轮询 + 睡眠” 的方式,能更高效地让 goroutine 等待 / 唤醒,避免 CPU 空耗。

方法作用关键注意事项
Wait()1. 释放持有的锁

2. 阻塞当前 goroutine,等待信号

3. 被唤醒后重新获取锁
必须在锁(Cond.L)的保护下调用,否则会 panic
Signal()唤醒一个等待的 goroutine(随机 / 按队列,取决于实现)无需持有锁,但通常在锁保护下调用以保证条件一致性
Broadcast()唤醒所有等待的 goroutine适合条件满足后需要所有等待者执行的场景

Context

介绍

context 是 channel 的 “高级封装” ctx.Done() 返回的就是一个 <-chan struct{}

1Done() <-chan struct{}

context的作用就是在不同的goroutine之间同步请求特定的数据、取消信号以及处理请求的截止日期。

context包主要提供了两种方式创建context:

  • context.Backgroud()
  • context.TODO() 上面的两种方式是创建根context,不具备任何功能,具体实践还是要依靠context包提供的With系列函数来进行派生:
1func WithCancel(parent Context) (ctx Context, cancel CancelFunc)
2func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc)
3func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)
4func WithValue(parent Context, key, val interface{}) Context
写法作用
<-ctx.Done()纯阻塞等待取消信号,直到收到信号才继续执行(适合 “只等退出,不做其他事”)
select { case <-ctx.Done(): ...; default: ... }轮询检查取消信号,没信号时执行 default 业务逻辑(适合 “边干活边等退出”)

withCancel取消控制

日常业务开发中我们往往为了完成一个复杂的需求会开多个gouroutine去做一些事情,这就导致我们会在一次请求中开了多个goroutine确无法控制他们,这时我们就可以使用withCancel来衍生一个context传递到不同的goroutine中,当我想让这些goroutine停止运行,就可以调用cancel来进行取消。

来看一个例子:

 1func main()  {
 2    ctx,cancel := context.WithCancel(context.Background())
 3    go Speak(ctx)
 4    time.Sleep(10*time.Second)
 5    cancel()
 6    time.Sleep(1*time.Second)
 7}
 8
 9func Speak(ctx context.Context)  {
10    for range time.Tick(time.Second){
11        select {
12        case <- ctx.Done():
13            fmt.Println("我要闭嘴了")
14            return
15        default:
16            fmt.Println("balabalabalabala")
17        }
18    }
19}

运行结果:

1balabalabalabala
2....省略
3balabalabalabala
4我要闭嘴了

我们使用withCancel创建一个基于Background的ctx,然后启动一个讲话程序,每隔1s说一话,main函数在10s后执行cancel,那么speak检测到取消信号就会退出。

并发安全和锁

互斥锁

互斥锁是一种常用的控制共享资源访问的方法,它能够保证同时只有一个goroutine可以访问共享资源。Go语言中使用sync包的Mutex类型来实现互斥锁。 使用互斥锁来修复上面代码的问题:

 1var x int64
 2var wg sync.WaitGroup
 3var lock sync.Mutex
 4
 5func add() {
 6    for i := 0; i < 5000; i++ {
 7        lock.Lock() // 加锁
 8        x = x + 1
 9        lock.Unlock() // 解锁
10    }
11    wg.Done()
12}
13func main() {
14    wg.Add(2)
15    go add()
16    go add()
17    wg.Wait()
18    fmt.Println(x)
19}

使用互斥锁能够保证同一时间有且只有一个goroutine进入临界区,其他的goroutine则在等待锁;当互斥锁释放后,等待的goroutine才可以获取锁进入临界区,多个goroutine同时等待一个锁时,唤醒的策略是随机的。