我有以下代码,我尝试调用api 10000次,但出现错误:

package main

import (
    "fmt"

    "net/http"
    "runtime"
    "sync"
    "time"
)

func main() {

    nCPU := runtime.NumCPU()
    runtime.GOMAXPROCS(nCPU)

    var wg sync.WaitGroup
    totalRequests := 100000
    wg.Add(totalRequests)

    fmt.Println("Starting Go Routines")

    start := time.Now()
    total := 0

    for i := 0; i < totalRequests; i++ {

        go func(current int) {
            defer wg.Done()

            startFunc := time.Now()
            _, err := http.Get("http://127.0.0.1:8080/event/list")
            // resp, err := http.Get("https://graph.facebook.com/v2.4/me" + "?fields=id%2Cname&access_token=" + "CAACEdEose0cBAEpQvcsvVMQu5oZCyyDjcEPQi9yCdiXimm4F0AYexGHPZAJHgpyrFOJN5X1VMcicNJjlkaCquUqHMZAfRrtxx6K9cRIROrA0OmbqAqCcg8ZA3qJZCHCl68I1n4LtFb5qxPcotlP5ne5PBakK0OUa7sc6FAOWwByOnFtNZBpIe8XDeM4YFa33sDfftVUpZCoBgZDZD")

            if err != nil {
                fmt.Println(err)
            }
            // defer resp.Body.Close()
            elapsedFunc := time.Since(startFunc)
            fmt.Println("The request (", current, ") took", elapsedFunc, "No of requests completed", total)
            total++

        }(i)

    }

    wg.Wait()
    elapsed := time.Since(start)
    fmt.Println("\nThe total time with cores", elapsed)
    fmt.Println("\nTerminating Program")
}

我得到的错误:
Get http://127.0.0.1:8080/event/list: dial tcp 127.0.0.1:8080: socket: too many open files
The request ( 5390 ) took 1.619876633s No of requests completed 2781
Get http://127.0.0.1:8080/event/list: dial tcp 127.0.0.1:8080: socket: too many open files
The request ( 7348 ) took 650.609825ms No of requests completed 1445

最佳答案

正如评论中其他人提到的那样,您的主要问题是您超出了进程的打开文件限制。

您可以轻松地使用 channel 来实现信号量以限制并发性:

totalRequests := 100000
concurrency := 1024
sem := make(chan bool, concurrency)

start := time.Now()
total := int32(0)

for i := 0; i < totalRequests; i++ {
    sem <- true

    go func(current int) {
        startTime := time.Now()

        // Make request here

        elapsedTime := time.Since(startTime)
        atomic.AddInt32(&total, 1)
        fmt.Printf("Request %d took %s. Requests completed: %d\n", current, elapsedTime, atomic.LoadInt32(&total))

        <-sem
    }(i)
}

for i := 0; i < cap(sem); i++ {
    sem <- true
}
elapsedTotal := time.Since(start)
fmt.Printf("\nTotal time elapsed: %s\n", elapsedTotal)

这会将并行请求的数量限制为concurrency中指定的任何数量。

如您所见,由于我们正在从潜在的并行goroutines修改该变量,因此使用total软件包对atomic变量进行了递增,当您不安全地对其进行修改时,它可能会产生错误的总数。

有关原始示例和Go中限制并发的说明,请参见此博客文章:http://jmoiron.net/blog/limiting-concurrency-in-go

编辑:

如下面的JimB所述,另一种常见的方法是在将goroutines喂给他们的同时,将其数量设为goroutines的concurrency。这是一个通用的do函数,可以用于此目的:
func do(total, concurrency int, fn func(int)) {
    workQueue := make(chan int, concurrency)

    var wg sync.WaitGroup
    wg.Add(concurrency)

    for i := 0; i < concurrency; i++ {
        go func() {
            for i := range workQueue {
                fn(i)
            }
            wg.Done()
        }()
    }
    for i := 0; i < total; i++ {
        workQueue <- i
    }
    close(workQueue)
    wg.Wait()
}

我们生成concurrency goroutines,然后开始将值发送到workQueue channel ,直到发送total。通过关闭workQueue channel ,我们有效地终止了goroutines中的范围循环。之后,我们只等所有剩余的goroutine完成运行即可。

对于有问题的用例,可以这样使用:
totalRequests := 1000000
concurrency := 1024

do(totalRequests, concurrency, func(i int) {
    // Make request here

    fmt.Printf("Request %d done.\n", i)
})

10-07 20:33