聊聊Go的goroutine和Channel
- 一、使用channel等待任务结束
- sync.WaitGroup的用法
- 抽象代码
- 二、使用select进行调度
- 计时器的使用
- 三、总结
相关文章推荐:《聊聊Go的并发编程 (一)》
一、使用channel等待任务结束
使用案例还是在第一篇的第二节中写的代码,不过这里只需要一段即可。
package mainimport ( "fmt" "time")func createWorker(id int) chan<- int { c := make(chan int) go worker(id, c) return c}func worker(id int, c chan int) { for n := range c { fmt.Printf("Worker %d receive %c\n", id, n) }}func channelDemo() { var channels [10]chan<- int for i := 0; i < 10; i++ { channels[i] = createWorker(i) } for i := 0; i < 10; i++ { channels[i] <- 'a' + i } for i := 0; i < 10; i++ { channels[i] <- 'A' + i } time.Sleep(time.Millisecond)}func main() { channelDemo()}
这里咔咔将原始源码放在这里,如果你想跟着文章的节奏走,可以放到你的编辑器中进行操作。
那这段代码的问题是在哪里呢?
可以看到在channelDemo函数最后使用了一个sleep,这玩意在程序中可不能乱用。
说到这里给大家讲一个小故事,咔咔之前在网上看到一段就是加了sleep的代码。
然后一个新手程序员不明白为什么要加这个sleep,然后问题项目经理,项目经理说老板发现程序慢之后会找咱们优化,每一次优化把这个sleep的时间缩短即可。让老板感觉到我们在做事情。
新手就是新手对不懂得代码都会进行标注,然后就写了一句注释“项目经理要求这里运行缓慢,老板让优化时,代码得到明显的速度提升”。
这句话很不巧的是被老板给看见了,老板不认识代码,但文字还是认识的哈!于是,项目经理下马。
所以说对于sleep大多数都是一个测试状态,坚决不会出现在线上的,所以呢?就要解决代码中的这个sleep。
那么大家在回忆一下,在这里为什么要加sleep呢?
发送到channel的数据都是在另一个goroutine中进行并发打印的,并发打印就会出现问题,因为根本不会知道什么时候才打印完毕。
所以说这个sleep就会为了应对这个不知道什么时候打印完的问题,给个1毫秒让进行打印。
这种做法是非常不好的,接下来看看使用一种新的方式来解决这个问题。
以下代码是修改完的代码。
package mainimport ( "fmt")type worker struct { in chan int done chan bool}func createWorker(id int) worker { w := worker{ in: make(chan int), done: make(chan bool), } go doWorker(id, w.in, w.done) return w}func doWorker(id int, c chan int, done chan bool) { for n := range c { fmt.Printf("Worker %d receive %c\n", id, n) done <- true }}func channelDemo() { var workers [10]worker for i := 0; i < 10; i++ { workers[i] = createWorker(i) } for i := 0; i < 10; i++ { workers[i].in <- 'a' + i <-workers[i].done } for i := 0; i < 10; i++ { workers[i].in <- 'A' + i <-workers[i].done }}func main() { channelDemo()}
将这些代码复制到你的本地,然后再来看一下都做了什么改动。
- 首先为了参数传递方便,建立了一个结构体worker
- 并且把之前的worker方法改为了doWorker
- 这个时候createWorker方法返回值就不能是之前的channel了,而是创建的结构体worker
- 然后在createWorker方法里边把channel全部创建好。并且使用结构体给doWorker传递参数。
- 最终返回的就是结构体。
- 最后一步就是给channelDemo方法里边发送数据的俩个循环里边接收一下workers[i]的值即可。
看一下打印结果
是不是有点懵,这怎么成有序的了,如果是并行的那还有必要开那10个worker,直接按照顺序打印就好了。
现在就来解决这个问题,我不希望发一个任务然后等它结束。
最好的就是把他们全部发出去,等待它们全部结束再退出来。
代码实现如下
package mainimport ( "fmt")type worker struct { in chan int done chan bool}func createWorker(id int) worker { w := worker{ in: make(chan int), done: make(chan bool), } go doWorker(id, w.in, w.done) return w}func doWorker(id int, c chan int, done chan bool) { for n := range c { fmt.Printf("Worker %d receive %c\n", id, n) done <- true }}func channelDemo() { var workers [10]worker for i := 0; i < 10; i++ { workers[i] = createWorker(i) } for i, worker := range workers { worker.in <- 'a' + i } for i, worker := range workers { worker.in <- 'A' + i } for _, worker := range workers { <-worker.done <-worker.done }}func main() { channelDemo()}
在这里再进行打印看一下结果,你会发现代码是有问题的。
为什么将小写的字母打印出来,而打印大写字母时发生了报错呢?
这个就要追溯到代码中了,因为我们代码本身就写的有问题。
还是回归到本文长谈的一个问题,那就是对于所有的channel有发送数据就必须有接收数据,如果没有接收数据就会报错。
那么在代码中你能看出是那块只进行了发送数据,而没有接收数据吗?
这个问题就是当给channel把小写字母发送了后,就会到进入到doWorker方法,然后给done发送了一个true,但是接收done的方法是在后面,也就是说第二个发送大写字母时,就会发送循环的等待。
解决这个问题也很简单,我们只需要并发的发送done即可。
看到打印结果也是正确的。
本文给的这个案例在一般项目中是不会出现的,所以说不用纠结于此。
给的案例就是为了让大家更熟悉channel的机制而已。
对于这个解决方法还有一个方案解决,请看代码。
将代码还原到之前,然后在每一个发送字母的下面循环接收done即可。
对于这种多任务等待方式在go中有一个库是可以来做这个事情,接下来看一下。
sync.WaitGroup的用法
对于sync.WaitGroup的用法咔咔就不一一介绍了,简单的看一下源码的实现即可。
package mainimport ( "fmt" "sync")type worker struct { in chan int wg *sync.WaitGroup}func createWorker(id int, wg *sync.WaitGroup) worker { w := worker{ in: make(chan int), wg: wg, } go doWorker(id, w.in, wg) return w}func doWorker(id int, c chan int, wg *sync.WaitGroup) { for n := range c { fmt.Printf("Worker %d receive %c\n", id, n) wg.Done() }}func channelDemo() { var wg sync.WaitGroup var workers [10]worker for i := 0; i < 10; i++ { workers[i] = createWorker(i, &wg) } // 添加20个任务 wg.Add(20) for i, worker := range workers { worker.in <- 'a' + i } for i, worker := range workers { worker.in <- 'A' + i } wg.Wait()}func main() { channelDemo()}
这份源码也是非常简单的,具体修改得东西咔咔简单介绍一下。
- 首先取消了
channelDemo
这个方法中关于done的channel。 - 使用了
sync.WaitGroup
,并且给createWorker方法传递sync.WaitGroup - createWorker方法使用了 worker的结构体。
- 所以要先修改worker结构体,将之前的done改为wg *sync.WaitGroup即可
- 这样就可以直接用结构体的数据。
- 接着在doWorker方法中把最后一个参数done改为wg *sync.WaitGroup
- 将方法中的done改为wg.Done()
- 最后一步就是回到函数channelDemo中把任务数添加进去,然后在代码最后添加一个等待即可。
关于这块的内容先知道这么用即可,咔咔后期会慢慢的补充并且深入。
抽象代码
这块的代码看起来不是那么的完美的,接下来抽象一下。
这块代码有没有发现有点蹩脚,接下来我们使用函数式编程进行简单的处理。
package mainimport ( "fmt" "sync")type worker struct { in chan int done func()}func createWorker(id int, wg *sync.WaitGroup) worker { w := worker{ in: make(chan int), done: func() { wg.Done() }, } go doWorker(id, w) return w}func doWorker(id int, w worker) { for n := range w.in { fmt.Printf("Worker %d receive %c\n", id, n) w.done() }}func channelDemo() { var wg sync.WaitGroup var workers [10]worker for i := 0; i < 10; i++ { workers[i] = createWorker(i, &wg) } // 添加20个任务 wg.Add(20) for i, worker := range workers { worker.in <- 'a' + i } for i, worker := range workers { worker.in <- 'A' + i } wg.Wait()}func main() { channelDemo()}
这块代码看不明白就先放着,写的时间长了,你就会明白其中的含义了,学习东西不要钻牛角尖。
二、使用select进行调度
开头先给一个问题,假设现在有俩个channel,谁来的快先收谁应该怎么做?
package mainimport ( "fmt" "math/rand" "time")func generator() chan int { out := make(chan int) go func() { i := 0 for { // 随机睡眠1500毫秒以内 time.Sleep( time.Duration(rand.Intn(1500)) * time.Millisecond) // 往out这个channel发送i值 out <- i i++ } }() return out}func main() { // 这里需要明白如果代码为var c1, c2 chan int 则c1和c2都为nil // 在 select里面也是可以使用的,只不过是堵塞状态! var c1, c2 = generator(), generator() for { /** select 方式进行调度 使用场景:比如有多个通道,但我打算是哪一个通道先给我数据,我就先执行谁 这个select 可以是并行执行 channel管道 */ select { case n := <-c1: fmt.Printf("receive from c1 %d\n", n) case n := <-c2: fmt.Printf("receive from c2 %d\n", n) } }}
以上就是代码实现,代码注释也写的非常的清晰明了,就不过多的做解释了。
主要用法还是对channel的使用,在带上了一个新的概念select,可以在多个通道,那个通道先发送数据,就先执行谁,并且这个select也是可以并行执行channel管道。
在上文写的createWorker
和worker
俩个方法还记得吧!接下来就不在select里边直接打印了。
就使用之前写的俩个方法融合在一起,咔咔已将将源码写好了,接下来看一下实现。
package mainimport ( "fmt" "math/rand" "time")func worker(id int, c chan int) { for n := range c { fmt.Printf("Worker %d receive %d\n", id, n) }}func createWorker(id int) chan<- int { c := make(chan int) go worker(id, c) return c}func generator() chan int { out := make(chan int) go func() { i := 0 for { // 随机睡眠1500毫秒以内 time.Sleep( time.Duration(rand.Intn(1500)) * time.Millisecond) // 往out这个channel发送i值 out <- i i++ } }() return out}func main() { // 这里需要明白如果代码为var c1, c2 chan int 则c1和c2都为nil // 在 select里面也是可以使用的,只不过是堵塞状态! var c1, c2 = generator(), generator() // 直接调用createWorker方法,返回的就是一个channel w := createWorker(0) for { /** select 方式进行调度 使用场景:比如有多个通道,但我打算是哪一个通道先给我数据,我就先执行谁 这个select 可以是并行执行 channel管道 */ select { case n := <-c1: w <- n case n := <-c2: w <- n } }}
运行代码
看到运行结果得知也是没有问题的。
这段代码虽然运行没有任何问题,但是这样有什么缺点呢?
可以看下这段代码n := <-c1:
这里先收了一个值,然后在下边代码w <- n
又会阻塞住,这个是不好的。
那么希望是怎么执行的呢?
这种模式是在select中既可以收数据,也可以发数据,目前这个程序是编译不过的,请看修改后的源码。
package mainimport ( "fmt" "math/rand" "time")func worker(id int, c chan int) { for n := range c { fmt.Printf("Worker %d receive %d\n", id, n) }}func createWorker(id int) chan<- int { c := make(chan int) go worker(id, c) return c}func generator() chan int { out := make(chan int) go func() { i := 0 for { // 随机睡眠1500毫秒以内 time.Sleep( time.Duration(rand.Intn(1500)) * time.Millisecond) // 往out这个channel发送i值 out <- i i++ } }() return out}func main() { // 这里需要明白如果代码为var c1, c2 chan int 则c1和c2都为nil // 在 select里面也是可以使用的,只不过是堵塞状态! var c1, c2 = generator(), generator() // 直接调用createWorker方法,返回的就是一个channel var worker = createWorker(0) // 这个n如果放在for循环里边,就会一直打印0,因为从c1和c2收数据需要时间,所以会把0直接传给worker n := 0 // 使用这个标识告诉有没有值 hasValue := false for { // 利用nil channel的特性 var activeWorker chan<- int if hasValue { activeWorker = worker } /** select 方式进行调度 使用场景:比如有多个通道,但我打算是哪一个通道先给我数据,我就先执行谁 这个select 可以是并行执行 channel管道 */ select { case n = <-c1: // 收到值的话就标记为true hasValue = true case n = <-c2: // 收到值的话就标记为true hasValue = true case activeWorker <- n: hasValue = false } }}
这个模式还是有缺点的,因为n收c1和c2的速度跟消耗的速度是不一样的。
假设c1的生成速度特别快,一下子生成了1,2,3。那么最后输出的数据有可能就只有3,而1和2就无法输出了。
这个场景也是非常好模拟的,只需要在打印的位置加上一点延迟时间即可。
此时你会看到运行结果为0、7、12、20…中间很多的数字都没来得急打印。
因此我们就需要把收到的n存下来进行排队输出。
package mainimport ( "fmt" "math/rand" "time")func worker(id int, c chan int) { for n := range c { // 手动让消耗速度变慢 time.Sleep(5 * time.Second) fmt.Printf("Worker %d receive %d\n", id, n) }}func createWorker(id int) chan<- int { c := make(chan int) go worker(id, c) return c}func generator() chan int { out := make(chan int) go func() { i := 0 for { // 随机睡眠1500毫秒以内 time.Sleep( time.Duration(rand.Intn(1500)) * time.Millisecond) // 往out这个channel发送i值 out <- i i++ } }() return out}func main() { // 这里需要明白如果代码为var c1, c2 chan int 则c1和c2都为nil // 在 select里面也是可以使用的,只不过是堵塞状态! var c1, c2 = generator(), generator() // 直接调用createWorker方法,返回的就是一个channel var worker = createWorker(0) // 用来收n的值 var values []int for { // 利用nil channel的特性 var activeWorker chan<- int var activeValue int // 判断当values中有值时 if len(values) > 0 { activeWorker = worker // 取出索引为0的值 activeValue = values[0] } /** select 方式进行调度 使用场景:比如有多个通道,但我打算是哪一个通道先给我数据,我就先执行谁 这个select 可以是并行执行 channel管道 */ select { case n := <-c1: // 将收到的数据存到values中 values = append(values, n) case n := <-c2: // 将收到的数据存到values中 values = append(values, n) case activeWorker <- activeValue: // 送出去后就需要把values中的第一个值拿掉 values = values[1:] } }}
以上就是实现代码
此时在来看运行结果。
运行结果没有漏掉数据,并且也是无序的,这样就非常好了。
计时器的使用
上面的这个程序是退出不了的,我们想让它10s后就直接退出怎么做呢?
那就需要使用计时器来进行操作了。
package mainimport ( "fmt" "math/rand" "time")func worker(id int, c chan int) { for n := range c { // 手动让消耗速度变慢 time.Sleep(time.Second) fmt.Printf("Worker %d receive %d\n", id, n) }}func createWorker(id int) chan<- int { c := make(chan int) go worker(id, c) return c}func generator() chan int { out := make(chan int) go func() { i := 0 for { // 随机睡眠1500毫秒以内 time.Sleep( time.Duration(rand.Intn(1500)) * time.Millisecond) // 往out这个channel发送i值 out <- i i++ } }() return out}func main() { // 这里需要明白如果代码为var c1, c2 chan int 则c1和c2都为nil // 在 select里面也是可以使用的,只不过是堵塞状态! var c1, c2 = generator(), generator() // 直接调用createWorker方法,返回的就是一个channel var worker = createWorker(0) // 用来收n的值 var values []int // 返回的是一个channel tm := time.After(10 * time.Second) for { // 利用nil channel的特性 var activeWorker chan<- int var activeValue int // 判断当values中有值时 if len(values) > 0 { activeWorker = worker // 取出索引为0的值 activeValue = values[0] } /** select 方式进行调度 使用场景:比如有多个通道,但我打算是哪一个通道先给我数据,我就先执行谁 这个select 可以是并行执行 channel管道 */ select { case n := <-c1: // 将收到的数据存到values中 values = append(values, n) case n := <-c2: // 将收到的数据存到values中 values = append(values, n) case activeWorker <- activeValue: // 送出去后就需要把values中的第一个值拿掉 values = values[1:] case <-tm: fmt.Println("Bye") return } }}
这里就是源码的实现,可以看到直接在select中是可以收到tm的值的,也就说如果到了10s,就会执行打印bye的操作。
那么现在还有另外一个需求,就是如果在800毫秒的时间内还没有收到数据,可以做其它事情。
使用举一反三的思想,你可以思考一下这件事情应该怎么做。
其实也就很简单了,只需要在case中在设置一个定时器即可。
既然说到了这里就在给大家补充一个用法tick := time.Tick(time.Second)
同样也是在case中使用。
这样就可以每秒来显示一下values队列有多少数据。
这块的内容就结束了,最终给大家发一下源码,感兴趣的可以在自己的编辑器上试试看。
package mainimport ( "fmt" "math/rand" "time")func worker(id int, c chan int) { for n := range c { // 手动让消耗速度变慢 time.Sleep(time.Second) fmt.Printf("Worker %d receive %d\n", id, n) }}func createWorker(id int) chan<- int { c := make(chan int) go worker(id, c) return c}func generator() chan int { out := make(chan int) go func() { i := 0 for { // 随机睡眠1500毫秒以内 time.Sleep( time.Duration(rand.Intn(1500)) * time.Millisecond) // 往out这个channel发送i值 out <- i i++ } }() return out}func main() { // 这里需要明白如果代码为var c1, c2 chan int 则c1和c2都为nil // 在 select里面也是可以使用的,只不过是堵塞状态! var c1, c2 = generator(), generator() // 直接调用createWorker方法,返回的就是一个channel var worker = createWorker(0) // 用来收n的值 var values []int // 返回的是一个channel tm := time.After(10 * time.Second) tick := time.Tick(time.Second) for { // 利用nil channel的特性 var activeWorker chan<- int var activeValue int // 判断当values中有值时 if len(values) > 0 { activeWorker = worker // 取出索引为0的值 activeValue = values[0] } /** select 方式进行调度 使用场景:比如有多个通道,但我打算是哪一个通道先给我数据,我就先执行谁 这个select 可以是并行执行 channel管道 */ select { case n := <-c1: // 将收到的数据存到values中 values = append(values, n) case n := <-c2: // 将收到的数据存到values中 values = append(values, n) case activeWorker <- activeValue: // 送出去后就需要把values中的第一个值拿掉 values = values[1:] case <-time.After(800 * time.Millisecond): // 如果在800毫秒没有收到数据则提示超时 fmt.Println("timeout") case <-tick: // 每秒获取一下values中队列的长度 fmt.Println("queue len = ", len(values)) case <-tm: fmt.Println("Bye") return } }}
三、总结
本文主要就是对于goroutine和channel的大量练习。
文中的案例,有可能会一时半会理解不了,是没有关系的,不用钻牛角尖的。
等你在go的海洋里遨游的时间长了,有些东西就自然而然的明白了。
下一期的文章就是给大家实战一个go的并发爬虫项目。
以上就是聊聊Go的并发编程 (二)的详细内容,更多请关注Work网其它相关文章!