并发编程
概述
基本概念
- 并行 :在同一时刻 有多条指令在多个编译器上
- 并发 :在同一时刻 只能有一条指令执行 但是多个进程指令被快速的轮换执行 使得在宏观上有多个进程被同时执行的效果
如果我们把它具象化成现实中的概念
- 并行就是同一时刻 两个队列使用两台咖啡机
- 并发就是同一时刻 两个队列使用一台咖啡机
go语言的并发优势
第一 Go语言在语言层面上天然支持并发 (不像某个语言 23版本才勉强上线)
第二 并发编程的内存管理都是十分复杂的 而Go语言支持GC即 垃圾回收机制
Go语言为了支持并发编程而内置的上层API是基于 CSP (顺序通信进程) 模型 这就意味着显示锁都是可以避免的 而Go语言通过相册安全的通道发送和接受数据以实现同步 这大大简化了并发程序的编写
一般情况下 一个普通的桌面计算机系统跑十几二十个线程就会有点负载了 但是同样的这台计算机却能轻松的让成百上千甚至过万个 goroutine进行资源竞争
goroutine
goroutine是什么
goroutine是Go并发设计的核心 说到底 其实它是协程 但是它比线程更小 十几个goroutine在底层的体现可能是几个线程
Go语言内部帮你实现了帮你实现了这些goroutine之间的内存共享 执行它只需要极少的栈内存 大概(4~5kb) 正因为如此 可以同时运行成千上万个goroutine任务
goroutine比thread更高效 更简单 更轻便
创建goroutine
只需要在函数调用之前添加go关键字 就可以创建并发执行单元 开发人员无需了解任何细节 调度器会自动将其安排到合适的系统线程上执行
在并发编程里 我们通常想将一个过程切分成几块 并且然后让每个goroutine负责它的一部分 当一个程序运行时 它的主函数即在一个单独的goroutine中执行 我们把它叫做 main goroutine
而新的goroutine使用go语句来创建
代码演示如下
func testnewgor() {
for i := 0; i < 5; i++ {
fmt.Println("new goroutine say :", i)
time.Sleep(time.Second)
}
}
func main() {
go testnewgor()
for i := 0; i < 5; i++ {
fmt.Println("main goroutine say :", i)
time.Sleep(time.Second)
}
}
运行这段代码之后我们会发现主协程 新协程会同时打印语句
如果主goroutine退出
如果说主goroutine推出了 并不会有类似linux中孤儿进程的概念 其他的goroutine也会立即退出
runtime包
Gosched
runtime.gosched() 用于让出CPU时间片 让出当前协程的执行权限 调度器会安排其他等待的任务执行 并在下次的某个时刻从该位置开始恢复执行
这就像接力赛一样 A跑了一段时间遇到代码runtime.gosched() 之后将接力棒交给B 之后B跑了一段时间遇到代码runtime.gosched()之后将接力棒交给A
下面是示例代码
func main() {
go func() {
for i := 0; i < 5; i++ {
runtime.Gosched()
fmt.Println("world")
}
}()
// main gorotinue
for i := 0; i < 5; i++ {
fmt.Println("hello")
runtime.Gosched()
}
// 最后结果为 hello world hello world ... ...
}
Goexit
调用Goexit函数将会立即终止当前goroutine执行 调度器会确保所有的defer调用被执行
下面是示例代码演示
go func() {
defer fmt.Println("this is A")
runtime.Goexit()
defer fmt.Println("this is B")
fmt.Println("this is C")
}() // 只会打印 this is A 因为后面的延时调用语句还没来得及执行协程就退出了
// 不让主协程退出 观察其他携程的掩饰效果
for {
}
GOMAXPROCS
GOMAXPROCS在Go语言中是一个环境变量 它表示可以Go语言可以并发的最大核心数
如果是 runtime.GOMAXPROCS(size int)
函数 我们有两种用法
- 第一种是将参数设置0 此时会返回我们当前的最大核心数
- 第二种是将参数设置为其他正整数 此时核心会变为我们设置的值
channel
它和map类似 channel也是一个对于make创建的底层数据结构的引用
当我们复制了一个channel用于函数传参时 我们只是拷贝了一个channel引用 因此调用者和被调用者将使用同一个channel对象 和其他的引用类型一样 channel的零值也是nil
定义一个channel时 我们也需要定义发送到chanel值的类型 channel可以使用内置的make()函数来实现
make(chan Type) //等价于make(chan Type, 0)
make(chan Type, capacity)
当capacity等于0的时候 是无缓冲阻塞式读写的
当capacity大于0的时候 是有缓冲非阻塞的 直到写入的数据大于capacity才会阻塞住
channel通过操作符<-来接收和发送数据 发送和接收数据语法如下
channel <- value // 发送value到channel
<- channel // 接受并且丢弃所有数据
x := <- channel // 从channel接受数据 并且赋值给x
x , ok := <- channel // 功能同上 不过增加了一个bool类型的数据来检查通道是否关闭或者是否为空
在默认情况下 channel接受和发送数据都是阻塞的 除非另一端已经准备好了 这就让goroutine的同步变得简单 不需要显示的lock了
c := make(chan int)
go func() {
fmt.Println("子协程正在运行")
defer fmt.Println("子协程已结束")
c <- 666
}()
fmt.Println("主协程正在运行")
x := <-c
time.Sleep(time.Second)
fmt.Println("子协程发送的值为", x)
无缓冲的channel
无缓冲的通道(unbuffered channel)是指在接收前没有能力保存任何值的通道
这种类型的通道要求发送 goroutine 和接收 goroutine 同时准备好,才能完成发送和接收操作。如果两个goroutine没有同时准备好,通道会导致先执行发送或接收操作的 goroutine 阻塞等待。
这种对通道进行发送和接收的交互行为本身就是同步的。其中任意一个操作都无法离开另一个操作单独存在。
我们上面的代码就是一个无缓冲channel 这里为了方便大家理解再发一遍
c := make(chan int)
go func() {
fmt.Println("子协程正在运行")
defer fmt.Println("子协程已结束")
c <- 666
}()
fmt.Println("主协程正在运行")
x := <-c
time.Sleep(time.Second)
fmt.Println("子协程发送的值为", x)
有缓冲的channel
有缓冲的channel创建方式如下
make(chan Type, capacity)
此时它阻塞的方式也发生了变化
- 如果缓冲区满了并且还在写数据此时会写入阻塞
- 如果缓冲区空了并且还在读数据此时会读取阻塞
range和close
我们可以通过close来关闭一个channel
close (chan)
- channel 不像文件一样需要经常去关闭 只有当你确实没有任何发送数据了 或者要结束range循环才关闭
- 关闭之后无法再发送任何的数据 发数据会引发panic异常
- 关闭后可以接受数据
- 接受数据会阻塞住
此外我们还可以通过range迭代来获取数据 一旦管道关闭 range循环就会结束
单向channel
默认情况下,通道是双向的,也就是,既可以往里面发送数据也可以同里面接收数据
但是,我们经常见一个通道作为参数进行传递而值希望对方是单向使用的,要么只让它发送数据,要么只让它接收数据,这时候我们可以指定通道的方向
单向channel变量的声明非常简单,如下:
var ch1 chan int // ch1是一个双向的管道
var ch2 chan<- float64 // ch2只能往里写入float64数据
var ch3 <-chan int // ch3只能用于接受int类型的数据
- chan<- 表示数据进入管道,要把数据写进管道,对于调用者就是输出。
- <-chan 表示数据从管道出来,对于调用者就是得到管道的数据,当然就是输入
我们可以将channel隐式的转化为单向队列只收或者只发 不能将单向的channel转化为普通channel
转换的语法如下
c := make(chan int, 3)
var send chan<- int = c // send-only
var recv <-chan int = c // receive-only
下面是完整的使用代码
func recv(out <-chan int) {
for x := range out {
fmt.Println(x)
}
}
func send(in chan<- int) {
for i := 0; i < 5; i++ {
in <- i * 100
}
close(in)
}
func main() {
c := make(chan int, 3)
go send(c)
recv(c)
time.Sleep(3 * time.Second)
}
定时器
Timer
timer是一个定时器 代表未来的一个单一事件 你可以告诉timer这个时间要等待的时间 它会提供一个channel 在将来的那个时间 channel提供了一个时间值
下面是示例代码
func main() {
// 创建定时器 两秒后定时器就会像自己的c字节发送一个time.TIME类似的元素值
timer1 := time.NewTimer(2 * time.Second)
t1 := time.Now() // 当前时间
fmt.Printf("t1 : %v\n", t1)
t2 := <-timer1.C
fmt.Println("t2:", t2)
}
我们在创建定时器之后的两秒钟会收到一个时间 之后我们可以将该时间和现在的时间对比一下 我们发现正好相差了两秒
Ticker
Ticker是一个定时触发的计时器 它会以一个间隔往channel中发送一个事件 而channel的接收者可以以固定的时间间隔从channel中读取事件
下面是示例代码
func main() {
// 创建一个定时器 每隔一秒像channel中发送一个事件
ticker := time.NewTicker(time.Second * 1)
i := 0
go func() {
for i = 0; i < 5; i++ {
<-ticker.C
println("goroutine say : ", i)
}
// 最后关闭ticker
ticker.Stop()
}()
for {
}
}
Select
Go语言提供了一个关键字select 通过select可以监听channel上的数据流动
select的用法和switch十分相似 由select选择一个新的模块 之后每个选择条件由case语句来描述
此外select语句对比switch语句来说有诸多的限制 其中最大的一条限制就是每一条语句里面必须有一个IO操作 大致结构如下
select {
case <-chan1: // 如果chan1成功读取到数据 则执行该操作
// ....
case chan2 <- 1: // 如果chan2成被写入数据 则执行官该操作
default:
}
在一个select语句中 Go语言会按照顺序评估每个发送和接受的语句 如果说有任意条语句可以执行 那么就从这些可执行的语句中任选一条来使用
如果说所有的通道都被阻塞了 那么此时有两种情况
- 如果给出了default语句 那么就会执行default语句 并且程序会从select语句后恢复
- 如果没有default语句 那么default语句将会被阻塞 直到一个case可用
func fib(c, q chan int) {
x, y := 1, 1
for {
select {
case c <- x: // 如果c输出了数据
x, y = y, x+y
case <-q: // 如果q被写入了数据
fmt.Println("quit")
return
}
}
}
func main() {
c := make(chan int)
quit := make(chan int)
go func() {
for i := 0; i < 6; i++ {
fmt.Println(<-c)
}
quit <- 0
}()
fib(c, quit)
}
值得注意的是select中 case c <- x:
的含义 它的意思是 c可以写入数据的时候执行 那么c什么时候可以写入数据呢? 当然是有人要接受数据的时候
所以说我们的 fmt.Println(<-c)
语句有两个作用
- 接受数据并打印
- 让c可以写入数据
运行结果如下
超时
有时候我们会遇到goroutine阻塞的情况 那么我们如何避免整个程序陷入阻塞呢 我们可以通过设置超时来实现
语法如下
func main() {
c := make(chan int)
q := make(chan int)
o := make(chan bool)
go func() {
select {
case c <- 0: // 当c可以写入数据的时候
println("可写入")
case <-q: // 当q可以输出数据的时候
println("可输出")
case <-time.After(5 * time.Second):
println("超时")
o <- false
println("我运行完毕了")
break
}
}()
<-o
}
这段代码的最终结果就是打印一个超时之后结束进程