等待任务模式是池模式的基本模式。 gobyexample code看起来不对,因为此代码使用的是缓冲通道。
在下面的代码中:
package main
import (
"fmt"
"runtime"
)
// pooling: You are a manager and you hire a team of employees. None of the new
// employees know what they are expected to do and wait for you to provide work.
// When work is provided to the group, any given employee can take it and you
// don't care who it is. The amount of time you wait for any given employee to
// take your work is unknown because you need a guarantee that the work your
// sending is received by an employee.
func pooling() {
jobCh := make(chan int) // signalling data on channel with guarantee - unbuffered
resultCh := make(chan int) // signalling data on channel with guarantee - unbuffered
workers := runtime.NumCPU() // 4 workers
for worker := 0; worker < workers; worker++ {
go func(emp int) {
var p int
for p = range jobCh {
fmt.Printf("employee %d : recv'd signal : %d\n", emp, p) // do the work
}
fmt.Printf("employee %d : recv'd shutdown signal\n", emp) // worker is signaled with closed state channel
resultCh <- p * 2
}(worker)
}
const jobs = 6
for jobNum := 1; jobNum <= jobs; jobNum++ {
jobCh <- jobNum
fmt.Println("manager : sent signal :", jobNum)
}
close(jobCh)
fmt.Println("manager : sent shutdown signal")
for a := 1; a <= jobs; a++ { //cannot range on 'resultCh'
fmt.Println("Result received: ", <-resultCh)
}
fmt.Println("-------------------------------------------------------------")
}
func main() {
pooling()
}
经理(
pooling()
)没有收到来自4个工人(雇员)的全部六个结果,如下所示,$ uname -a
Linux user 4.15.0-99-generic #100-Ubuntu SMP Wed Apr 22 20:32:56 UTC 2020 x86_64 x86_64 x86_64 GNU/Linux
$
$ go version
go version go1.14.1 linux/amd64
$
$ go install github.com/myhub/cs61a
$
$
$ bin/cs61a
manager : sent signal : 1
manager : sent signal : 2
manager : sent signal : 3
employee 3 : recv'd signal : 3
employee 3 : recv'd signal : 4
manager : sent signal : 4
manager : sent signal : 5
employee 3 : recv'd signal : 5
employee 3 : recv'd signal : 6
manager : sent signal : 6
manager : sent shutdown signal
employee 3 : recv'd shutdown signal
employee 2 : recv'd signal : 2
Result received: 12
employee 0 : recv'd signal : 1
employee 0 : recv'd shutdown signal
employee 2 : recv'd shutdown signal
Result received: 2
Result received: 4
employee 1 : recv'd shutdown signal
Result received: 0
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan receive]:
main.pooling()
/home/../src/github.com/myhub/cs61a/Main.go:40 +0x25f
main.main()
/home/../src/github.com/myhub/cs61a/Main.go:45 +0x20
$
$
$ bin/cs61a
manager : sent signal : 1
employee 0 : recv'd signal : 1
manager : sent signal : 2
manager : sent signal : 3
manager : sent signal : 4
employee 3 : recv'd signal : 2
manager : sent signal : 5
manager : sent signal : 6
employee 2 : recv'd signal : 4
employee 2 : recv'd shutdown signal
employee 0 : recv'd signal : 5
manager : sent shutdown signal
Result received: 8
employee 0 : recv'd shutdown signal
Result received: 10
employee 1 : recv'd signal : 3
employee 1 : recv'd shutdown signal
Result received: 6
employee 3 : recv'd signal : 6
employee 3 : recv'd shutdown signal
Result received: 12
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan receive]:
main.pooling()
/home/user/../github.com/myhub/cs61a/Main.go:40 +0x25f
main.main()
/home/user/../github.com/myhub/cs61a/Main.go:45 +0x20
编辑:
根据@Mark注释,将
resultCh <- p * 2
移入循环会产生以下死锁,这是有道理的,因为所有goroutine均被阻止。 (resultCh
)的缓冲通道有助于解决此问题吗?但是缓冲的通道不能保证发送信号。$ go install github.com/myhub/cs61a
$ bin/cs61a
manager : sent signal : 1
manager : sent signal : 2
manager : sent signal : 3
manager : sent signal : 4
employee 1 : recv'd signal : 2
employee 2 : recv'd signal : 3
employee 0 : recv'd signal : 1
employee 3 : recv'd signal : 4
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan send]:
main.pooling()
/home/user/../myhub/cs61a/Main.go:33 +0xfb
main.main()
/home/user/../myhub/cs61a/Main.go:46 +0x20
goroutine 6 [chan send]:
main.pooling.func1(0xc00001e0c0, 0xc00001e120, 0x0)
/home/user/../myhub/cs61a/Main.go:24 +0x136
created by main.pooling
/home/user/../myhub/cs61a/Main.go:20 +0xb7
goroutine 7 [chan send]:
main.pooling.func1(0xc00001e0c0, 0xc00001e120, 0x1)
/home/user/../myhub/cs61a/Main.go:24 +0x136
created by main.pooling
/home/user/../myhub/cs61a/Main.go:20 +0xb7
goroutine 8 [chan send]:
main.pooling.func1(0xc00001e0c0, 0xc00001e120, 0x2)
/home/user/../myhub/cs61a/Main.go:24 +0x136
created by main.pooling
/home/user/../myhub/cs61a/Main.go:20 +0xb7
goroutine 9 [chan send]:
main.pooling.func1(0xc00001e0c0, 0xc00001e120, 0x3)
/home/user/../myhub/cs61a/Main.go:24 +0x136
created by main.pooling
/home/user/../myhub/cs61a/Main.go:20 +0xb7
$
$
$
pooling()
无法接收所有工作人员的结果? Result received: 0
),在resultCh
上发送的数据始终假定为非零,为什么resultCh
接收零值?看来resultCh
已关闭。 注意:
resultCh
的正确工作不属于工作程序池模式的一部分。工人池模式仅确保使用jobCh
将工作成功提交给员工 最佳答案
为什么pooling()无法接收所有工作人员的结果?
goroutine(for p = range jobCh
)中的循环将处理所有请求。但是,发送到resultCh
的代码不在循环内,因此在每个go例程中只能执行一次(在循环完成后)。
这是根据@Marks注释;您对范围的回答是正确但无关紧要的。 for循环将遍历通道中的项目;当通道关闭时,循环结束,p
将包含上次迭代处理的值(如果有的话),并将该值发送到resultCh
。
这意味着将为每个go例程向resultCh
发送一个值(根据您的代码中的注释,在您的情况下为四个值)。如果您想为resultCh
上显示的每个值发布一个值到jobCh
,则需要将send发送到循环(playground)中:
var p int
for p = range jobCh {
fmt.Printf("employee %d : recv'd signal : %d\n", emp, p) // do the work
resultCh <- p * 2
}
fmt.Printf("employee %d : recv'd shutdown signal\n", emp)
Manager仅接收6个结果中的4个结果。接收到的结果之一为零(接收到的结果为0),在resultCh上发送的数据始终假定为非零,为什么resultCh接收到零值?看起来resultCh已关闭。
您无法预测每个go例程将处理多少个作业(日志显示两次运行之间的差异)。从您的日志中,我们可以知道哪个例程处理了哪些作业:
Employee 0: 1
Employee 1:
Employee 2: 2
Employee 3: 3, 4, 5, 6
您会注意到,员工1没有处理任何作业。这意味着员工循环
for p = range jobCh
终止而没有向p
分配任何内容,因此resultCh <- p * 2
将0(int的默认值)发送给resultCh
(根据@Shudipta Sharma的评论)。