这个问题一直困扰着我。

type (
    // ec2Params holds the arguments passed to getEC2Metrics.
    ec2Params struct {
        sess   *session.Session
        region string
    }

    // cloudwatchParams holds the arguments passed to getMetricFromCloudwatch.
    cloudwatchParams struct {
        cl                 cloudwatch.CloudWatch
        id, metric, region string
    }

    // request is one unit of work sent over the req channel. It embeds both
    // parameter sets; main treats a request whose ec2Params differs from the
    // zero value as an EC2 request and anything else as a CloudWatch request.
    request struct {
        ec2Params
        cloudwatchParams
    }
)

// Package-level state intended to control concurrency and synchronization.
var (
    // maxRoutines is the capacity used for the semaphore channel.
    maxRoutines = 128
    // sem is meant to be the counting semaphore (one token per allowed goroutine).
    sem chan bool
    // req is meant to carry work items; getEC2Metrics sends on this variable.
    req chan request
)

// main fills the semaphore with maxRoutines tokens, starts a producer
// goroutine that sends one EC2 request per (arn, region) pair, and starts a
// goroutine for every request received on req.
//
// NOTE(review): the ":=" on sem and req below declares new local variables
// that shadow the package-level ones, so the package-level req is never
// initialized and stays nil. getEC2Metrics sends on the package-level req,
// and a send on a nil channel blocks forever.
// NOTE(review): the token taken with "<- sem" is put back right after the
// "go" statement rather than when the started goroutine finishes, so sem
// does not bound how many goroutines run at once.
func main() {
    sem := make(chan bool, maxRoutines)
    for i := 0; i < maxRoutines; i++ {
        sem <- true
    }
    req := make(chan request)
    go func() { // This is the producer
        for _, arn := range arns {
            arnCreds := startSession(arn)
            for _, region := range regions {
                sess, err := session.NewSession(
                    &aws.Config{****})
                if err != nil {
                    failOnError(err, "Can't assume role")
                }
                req <- request{ec2Params: ec2Params{ **** }}
            }
        }
    }()
    for f := range(req) {
        <- sem
        // A request whose ec2Params is not the zero value is an EC2 request.
        if (ec2Params{}) != f.ec2Params {
            go getEC2Metrics(****)
        } else {
            // I should be exercising this line of code too,
            // but I'm not :(
            go getMetricFromCloudwatch(****)
        }
        sem <- true
    }
}
getEC2Metrics 和 getMetricFromCloudwatch 是要执行的 goroutine:
// getMetricFromCloudwatch is the worker main starts for a request that does
// not carry ec2Params. Its body is elided in this listing.
func getMetricFromCloudwatch(cl cloudwatch.CloudWatch, id, metric, region string) {
    // Magic
}

// getEC2Metrics lists the EC2 instances visible through sess and sends one
// CloudWatch request per (instance ID, configured metric) pair on the
// package-level req channel.
//
// If DescribeInstances fails, the error is printed and nothing is sent.
// If no instances are found, nothing is sent.
//
// NOTE(review): only the result of a single DescribeInstances call is read;
// if the API paginates its output, later pages are not fetched — confirm.
func getEC2Metrics(sess *session.Session, region string) {
    ec := ec2.New(sess)
    l, err := ec.DescribeInstances(&ec2.DescribeInstancesInput{})
    if err != nil {
        fmt.Println(err.Error())
        return
    }

    var ids []string
    for _, rsv := range l.Reservations {
        for _, inst := range rsv.Instances {
            ids = append(ids, *inst.InstanceId)
        }
    }
    // The previous guard was len(ids) >= 0, which is always true; skip the
    // CloudWatch client entirely when there is nothing to query.
    if len(ids) == 0 {
        return
    }

    metrics := cfg.AWSMetric.Metric
    cl := cloudwatch.New(sess)
    for _, id := range ids {
        for _, metric := range metrics {
            // Execution is reported to get stuck on this send: req here is
            // the package-level channel, which main never initializes (it
            // declares its own local req with ":="), and a send on a nil
            // channel blocks forever.
            req <- request{ cloudwatchParams: ***** }
        }
    }
}
main 中的匿名生产者和 getEC2Metrics 都应异步地将数据发布到 req 中,但到目前为止,getEC2Metrics 发布到通道中的任何内容似乎都从未被处理过。
看起来有什么东西阻止了我从 goroutine 内部发布数据,但我没有找到原因。我很想知道应该如何实现,才能得到预期的行为(即一个真正起作用的信号量)。

实现的基础可以在这里找到:https://burke.libbey.me/conserving-file-descriptors-in-go/

最佳答案

太不可思议了,JimB 的评论启发了我的思路,现在我把问题解决了!

// Control concurrency and sync
var maxRoutines = 128
var sem chan bool
// NOTE(review): req is reachable from getEC2Metrics, but nothing in the code
// shown ever assigns it (main declares its own local req with ":="), so it is
// a nil channel and any send on it blocks forever.
var req chan request

// getEC2Metrics (excerpt; elided parts are marked "....") now receives the
// channel to publish on as a send-only parameter. Inside this function the
// parameter req shadows the package-level req.
func getEC2Metrics(sess *session.Session, region string, req chan <- request ) {
    ....
    ....
            for _, id := range ids{
                for _, metric := range metrics {
                    req <- request{ **** }} // Sends on the channel passed in by the caller.
                                            // The package-level req is never initialized
                                            // (nil), so sending on it blocked forever.
                }
            }
    ....
    ....
}

// main creates the semaphore and the request channel, starts the producer,
// and runs every request received on req in its own goroutine, holding one
// semaphore token for as long as that goroutine is working.
//
// NOTE(review): nothing in the code shown closes req, so the range loop
// never ends on its own — confirm how the program is expected to exit.
// NOTE(review): workers send on the unbuffered req while holding a token;
// if all maxRoutines workers block on that send while this loop waits for a
// token, nothing can make progress — confirm the workload cannot reach that.
func main() {
    // ":=" declares locals that shadow the package-level sem and req, which
    // therefore stay nil. A send on a nil channel blocks forever, which is
    // why workers must publish on the req handed to them below and not on
    // the package-level variable.
    sem := make(chan bool, maxRoutines)
    for i := 0; i < maxRoutines; i++ {
        sem <- true
    }
    req := make(chan request)
    go func() {
        // Producing tasks
    }()
    for f := range req {
        // Take a token before spawning, so this loop blocks once all
        // maxRoutines tokens are checked out.
        <-sem
        // f is passed as an argument so each goroutine works on its own
        // copy; before Go 1.22 a closure shares the single loop variable
        // with every other iteration.
        go func(f request) {
            // Return the token only when the work is finished.
            defer func() { sem <- true }()
            if (ec2Params{}) != f.ec2Params {
                getEC2Metrics(****, req) // pass the initialized local channel
            } else {
                getMetricFromCloudwatch(****)
            }
        }(f)
    }
}

有两个问题:
  • 信号量没有起到限制作用(我认为这是因为我在 goroutine 内签出和签入 token,因此可能存在竞争情况;从问题中的代码看,token 是在主循环里紧跟 go 语句之后就被归还的,而不是等 goroutine 执行完毕才归还)。
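  • 出于某种原因,getEC2Metrics 不能正确使用全局通道 req:该通道看起来明明在作用域内,但向它发布数据时所有 goroutine 都会卡住(我当时真的不知道为什么)。实际原因是:main 中的 `req := make(chan request)` 用 `:=` 声明了一个新的局部变量,遮蔽了全局的 req;全局 req 从未被初始化,仍然是 nil,而向 nil 通道发送数据会永远阻塞。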

  • 老实说,我对第二个项目很幸运,到目前为止,我还没有找到关于这个怪癖的任何文档,但是最后,我很高兴它能正常工作。

    08-04 16:08