我最近在探索 Go 以及 goroutines 如何工作让我感到困惑。

我尝试使用 goroutines 将我之前编写的代码移植到 Go 中,但出现了 fatal error: all goroutines are asleep - deadlock! 错误。

我想要做的是使用 goroutines 处理列表中的项目,然后将处理后的值收集到一个新列表中。但是我在“收集”部分遇到了问题。

代码:

sampleChan := make(chan sample)
var wg sync.WaitGroup

// Read from contents list
for i, line := range contents {
    wg.Add(1)
    // Process each item with a goroutine and send output to sampleChan
    go newSample(line, *replicatePtr, *timePtr, sampleChan, &wg)
}
wg.Wait()

// Read from sampleChan and put into a slice
var sampleList []sample
for s := range sampleChan {
    sampleList = append(sampleList, s)
}
close(sampleChan)

从 goroutines 收集结果的正确方法是什么?

我知道 slice 不是线程安全的,所以我不能让每个 goroutine 只是附加到 slice 。

最佳答案

你的代码几乎是正确的。有几个问题:首先,您在收集结果之前等待所有工作人员完成,其次您的 for 循环在 channel 关闭时终止,但 channel 仅在 for 循环终止后才关闭。

您可以通过在工作人员完成后异步关闭 channel 来修复代码:

for i, line := range contents {
    wg.Add(1)
    // Process each item with a goroutine and send output to sampleChan
    go newSample(line, *replicatePtr, *timePtr, sampleChan, &wg)
}

go func() {
    wg.Wait()
    close(sampleChan)
}()

for s := range sampleChan {
  ..
}

作为样式说明(以及在 https://github.com/golang/go/wiki/CodeReviewComments#synchronous-functions 之后),如果 newSample 是一个简单的同步函数,它不接受 WaitGroup 和 channel ,并且只生成其结果,那就更好了。然后工作代码看起来像:
for i, line := range contents {
    wg.Add(1)
    go func(line string) {
        defer wg.Done()
        sampleChan <- newSample(line, *replicatePtr, *timePtr)
    }(line)
}

这使您的并发原语保持在一起,除了简化 newSample 并使其更易于测试之外,它还允许您查看并发情况,并直观地检查 wg.Done() 是否始终被调用。如果您想重构代码以使用固定数量的工作人员,那么您的更改都将是本地的。

关于go - 使用 goroutines 处理值并将结果收集到一个 slice 中,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/46010836/

10-13 04:24