我有以下代码,我尝试调用api 10000次,但出现错误:
package main
import (
"fmt"
"net/http"
"runtime"
"sync"
"time"
)
func main() {
nCPU := runtime.NumCPU()
runtime.GOMAXPROCS(nCPU)
var wg sync.WaitGroup
totalRequests := 100000
wg.Add(totalRequests)
fmt.Println("Starting Go Routines")
start := time.Now()
total := 0
for i := 0; i < totalRequests; i++ {
go func(current int) {
defer wg.Done()
startFunc := time.Now()
_, err := http.Get("http://127.0.0.1:8080/event/list")
// resp, err := http.Get("https://graph.facebook.com/v2.4/me" + "?fields=id%2Cname&access_token=" + "CAACEdEose0cBAEpQvcsvVMQu5oZCyyDjcEPQi9yCdiXimm4F0AYexGHPZAJHgpyrFOJN5X1VMcicNJjlkaCquUqHMZAfRrtxx6K9cRIROrA0OmbqAqCcg8ZA3qJZCHCl68I1n4LtFb5qxPcotlP5ne5PBakK0OUa7sc6FAOWwByOnFtNZBpIe8XDeM4YFa33sDfftVUpZCoBgZDZD")
if err != nil {
fmt.Println(err)
}
// defer resp.Body.Close()
elapsedFunc := time.Since(startFunc)
fmt.Println("The request (", current, ") took", elapsedFunc, "No of requests completed", total)
total++
}(i)
}
wg.Wait()
elapsed := time.Since(start)
fmt.Println("\nThe total time with cores", elapsed)
fmt.Println("\nTerminating Program")
}
我得到的错误:
Get http://127.0.0.1:8080/event/list: dial tcp 127.0.0.1:8080: socket: too many open files
The request ( 5390 ) took 1.619876633s No of requests completed 2781
Get http://127.0.0.1:8080/event/list: dial tcp 127.0.0.1:8080: socket: too many open files
The request ( 7348 ) took 650.609825ms No of requests completed 1445
最佳答案
正如评论中其他人提到的那样,您的主要问题是您超出了进程的打开文件限制。
您可以轻松地使用 channel 来实现信号量以限制并发性:
totalRequests := 100000
concurrency := 1024
sem := make(chan bool, concurrency)
start := time.Now()
total := int32(0)
for i := 0; i < totalRequests; i++ {
sem <- true
go func(current int) {
startTime := time.Now()
// Make request here
elapsedTime := time.Since(startTime)
atomic.AddInt32(&total, 1)
fmt.Printf("Request %d took %s. Requests completed: %d\n", current, elapsedTime, atomic.LoadInt32(&total))
<-sem
}(i)
}
for i := 0; i < cap(sem); i++ {
sem <- true
}
elapsedTotal := time.Since(start)
fmt.Printf("\nTotal time elapsed: %s\n", elapsedTotal)
这会将并行请求的数量限制为
concurrency
中指定的任何数量。如您所见,由于我们正在从潜在的并行goroutines修改该变量,因此使用
total
软件包对atomic
变量进行了递增,当您不安全地对其进行修改时,它可能会产生错误的总数。有关原始示例和Go中限制并发的说明,请参见此博客文章:http://jmoiron.net/blog/limiting-concurrency-in-go
编辑:
如下面的JimB所述,另一种常见的方法是在将goroutines喂给他们的同时,将其数量设为goroutines的
concurrency
。这是一个通用的do
函数,可以用于此目的:func do(total, concurrency int, fn func(int)) {
workQueue := make(chan int, concurrency)
var wg sync.WaitGroup
wg.Add(concurrency)
for i := 0; i < concurrency; i++ {
go func() {
for i := range workQueue {
fn(i)
}
wg.Done()
}()
}
for i := 0; i < total; i++ {
workQueue <- i
}
close(workQueue)
wg.Wait()
}
我们生成
concurrency
goroutines,然后开始将值发送到workQueue
channel ,直到发送total
。通过关闭workQueue
channel ,我们有效地终止了goroutines中的范围循环。之后,我们只等所有剩余的goroutine完成运行即可。对于有问题的用例,可以这样使用:
totalRequests := 1000000
concurrency := 1024
do(totalRequests, concurrency, func(i int) {
// Make request here
fmt.Printf("Request %d done.\n", i)
})