这一直是我存在的祸根。
type ec2Params struct {
sess *session.Session
region string
}
type cloudwatchParams struct {
cl cloudwatch.CloudWatch
id string
metric string
region string
}
type request struct {
ec2Params
cloudwatchParams
}
// Control concurrency and sync
var maxRoutines = 128
var sem chan bool
var req chan request
func main() {
sem := make(chan bool, maxRoutines)
for i := 0; i < maxRoutines; i++ {
sem <- true
}
req := make(chan request)
go func() { // This is my the producer
for _, arn := range arns {
arnCreds := startSession(arn)
for _, region := range regions {
sess, err := session.NewSession(
&aws.Config{****})
if err != nil {
failOnError(err, "Can't assume role")
}
req <- request{ec2Params: ec2Params{ **** }}
}
}
}()
for f := range(req) {
<- sem
if (ec2Params{}) != f.ec2Params {
go getEC2Metrics(****)
} else {
// I should be excercising this line of code too,
// but I'm not :(
go getMetricFromCloudwatch(****)
}
sem <- true
}
}
getEC2Metrics
和getCloudwatchMetrics
是要执行的goroutinefunc getMetricFromCloudwatch(cl cloudwatch.CloudWatch, id, metric, region string) {
// Magic
}
func getEC2Metrics(sess *session.Session, region string) {
ec := ec2.New(sess)
var ids []string
l, err := ec.DescribeInstances(&ec2.DescribeInstancesInput{})
if err != nil {
fmt.Println(err.Error())
} else {
for _, rsv := range l.Reservations {
for _, inst := range rsv.Instances {
ids = append(ids, *inst.InstanceId)
}
}
metrics := cfg.AWSMetric.Metric
if len(ids) >= 0 {
cl := cloudwatch.New(sess)
for _, id := range ids{
for _, metric := range metrics {
// For what I can tell, execution get stuck here
req <- request{ cloudwatchParams: ***** }}
}
}
}
}
}
main
和getEC2Metrics
中的匿名生产者都应异步地将数据发布到req
中,但是到目前为止,似乎从未处理过发布到通道中的任何getEC2Metrics
。看起来有些东西阻止我从goroutine内部发布,但是我什么也没找到。我很想知道如何进行此操作并产生预期的行为(这是一个实际上有效的信号量)。
实现的基础可以在这里找到:https://burke.libbey.me/conserving-file-descriptors-in-go/
最佳答案
太疯狂了,JimB的评论使车轮旋转,现在我解决了!
// Control concurrency and sync
var maxRoutines = 128
var sem chan bool
var req chan request // Not reachable inside getEC2Metrics
func getEC2Metrics(sess *session.Session, region string, req chan <- request ) {
....
....
for _, id := range ids{
for _, metric := range metrics {
req <- request{ **** }} // When using the global req,
// this would block
}
}
....
....
}
func main() {
sem := make(chan bool, maxRoutines)
for i := 0; i < maxRoutines; i++ {
sem <- true
}
req := make(chan request)
go func() {
// Producing tasks
}()
for f := range(req) {
<- sem // checking out tickets outside the goroutine does block
//outside of the goroutine
go func() {
defer func() { sem <- true }()
if (ec2Params{}) != f.ec2Params {
getEC2Metrics(****, req) // somehow sending the channel makes
// possible to publish to it
} else {
getMetricFromCloudwatch(****)
}
}()
}
}
有两个问题:
getEC2Metrics
不能正确处理全局通道req,因此在尝试发布到显然是作用域的通道时,它将使goroutines卡住alll,但事实并非如此(我真的不知道为什么呢)。 老实说,我对第二个项目很幸运,到目前为止,我还没有找到关于这个怪癖的任何文档,但是最后,我很高兴它能正常工作。