我正在尝试并行处理Go中的递归问题,但我不确定做到这一点的最佳方法是什么。
我有一个递归函数,其工作原理如下:

func recFunc(input string) (result []string) {
    for subInput := range getSubInputs(input) {
        subOutput := recFunc(subInput)
        result = result.append(result, subOutput...)
    }
    result = result.append(result, getOutput(input)...)
}

func main() {
    output := recFunc("some_input")
    ...
}
因此,该函数将自己称为N时间(其中N在某种程度上为0),生成其自己的输出并返回列表中的所有内容。
现在,我要使此函数并行运行。但是我不确定最干净的方法是什么。我的想法:
  • 有一个“结果” channel ,所有函数调用都将其结果发送到该 channel 。
  • 在主函数中收集结果。
  • 有一个等待组,它确定何时收集所有结果。

  • 问题:我需要等待等待组并并行收集所有结果。我可以为此启动一个单独的go函数,但是如何退出该单独的go函数呢?
    func recFunc(input string) (result []string, outputChannel chan []string, waitGroup &sync.WaitGroup) {
        defer waitGroup.Done()
        waitGroup.Add(len(getSubInputs(input))
        for subInput := range getSubInputs(input) {
            go recFunc(subInput)
        }
        outputChannel <-getOutput(input)
    }
    
    func main() {
        outputChannel := make(chan []string)
        waitGroup := sync.WaitGroup{}
    
        waitGroup.Add(1)
        go recFunc("some_input", outputChannel, &waitGroup)
    
        result := []string{}
        go func() {
           nextResult := <- outputChannel
           result = append(result, nextResult ...)
        }
        waitGroup.Wait()
    }
    
    也许有更好的方法可以做到这一点?还是如何确保收集结果的匿名go函数在完成后就消失了?

    最佳答案

    tl; dr;

  • 递归算法应该对昂贵的资源(网络连接,goroutine,堆栈空间等)具有有限的限制。
    应该支持
  • 取消-确保不再需要结果时可以快速清理昂贵的操作
  • 分支遍历应支持错误报告;这样就可以使错误在堆栈中冒泡,并返回部分结果,而不会导致整个递归遍历失败。

  • 对于异步结果(无论是否使用递归),建议使用 channel 。另外,对于具有许多goroutine的长时间运行的作业,请提供一种取消方法(context.Context)以帮助进行清理。
    由于递归会导致资源的指数消耗,因此设置限制很重要(请参阅bounded parallelism)。
    下面是一个我经常使用异步任务的设计模式:
  • 始终支持采用context.Context取消
  • 任务
  • 所需的 worker 人数
  • 返回结果的chanchan error(只会返回一个错误或nil)
  • var (
        workers = 10
        ctx     = context.TODO() // use request context here - otherwise context.Background()
        input   = "abc"
    )
    
    resultC, errC := recJob(ctx, workers, input) // returns results & `error` channels
    
    // asynchronous results - so read that channel first in the event of partial results ...
    for r := range resultC {
        fmt.Println(r)
    }
    
    // ... then check for any errors
    if err := <-errC; err != nil {
        log.Fatal(err)
    }
    
    递归:
    由于递归在水平方向上快速扩展,因此需要一种一致的方法来将有限的 worker 填满工作,同时还需要确保何时释放 worker ,以便他们迅速从其他(过度劳累的) worker 那里接手工作。
    而不是创建管理者层,而是使用 worker 的协作对等系统:
  • 每个 worker 共享一个输入 channel
  • 递归输入(subIinputs)之前先检查
  • ,是否检查其他 worker 是否闲置
  • 如果是这样,则委派给该工作人员
  • 如果不是,则当前工作进程继续递归该分支


  • 使用此算法,有限数量的 worker 很快就变得工作饱和。任何较早完成分支机构的工作人员-都会很快从另一名工作人员中委派一个分支。最终,所有工作人员将用完所有分支,此时所有工作人员将被闲置(阻塞),并且递归任务可以完成。
    为此,需要进行一些仔细的协调。允许工作人员写入输入 channel 可帮助通过委派进行此同级协调。 “递归深度” WaitGroup用于跟踪何时所有分支都耗尽了所有工作程序。
    (包括上下文支持和错误链接-我更新了getSubInputs函数以采用ctx并返回可选的error):
    func recFunc(ctx context.Context, input string, in chan string, out chan<- string, rwg *sync.WaitGroup) error {
    
        defer rwg.Done() // decrement recursion count when a depth of recursion has completed
    
        subInputs, err := getSubInputs(ctx, input)
        if err != nil {
            return err
        }
    
        for subInput := range subInputs {
            rwg.Add(1) // about to recurse (or delegate recursion)
    
            select {
            case in <- subInput:
                // delegated - to another goroutine
    
            case <-ctx.Done():
                // context canceled...
    
                // but first we need to undo the earlier `rwg.Add(1)`
                // as this work item was never delegated or handled by this worker
                rwg.Done()
                return ctx.Err()
    
            default:
                // noone available to delegate - so this worker will need to recurse this item themselves
                err = recFunc(ctx, subInput, in, out, rwg)
                if err != nil {
                    return err
                }
            }
    
            select {
            case <-ctx.Done():
                // always check context when doing anything potentially blocking (in this case writing to `out`)
                // context canceled
                return ctx.Err()
    
            case out <- subInput:
            }
        }
    
        return nil
    }
    
    连接件:recJob创建:
  • 输入和输出 channel -由所有工作人员共享
  • “递归” WaitGroup检测何时所有 worker 都空闲

    然后可以安全地关闭
  • “输出” channel

  • 所有 worker 的
  • 错误 channel
  • 通过将初始输入写入输入 channel
  • 来启动递归工作负载
    func recJob(ctx context.Context, workers int, input string) (resultsC <-chan string, errC <-chan error) {
    
        // RW channels
        out := make(chan string)
        eC := make(chan error, 1)
    
        // R-only channels returned to caller
        resultsC, errC = out, eC
    
        // create workers + waitgroup logic
        go func() {
    
            var err error // error that will be returned to call via error channel
    
            defer func() {
                close(out)
                eC <- err
                close(eC)
            }()
    
            var wg sync.WaitGroup
            wg.Add(1)
            in := make(chan string) // input channel: shared by all workers (to read from and also to write to when they need to delegate)
    
            workerErrC := createWorkers(ctx, workers, in, out, &wg)
    
            // get the ball rolling, pass input job to one of the workers
            // Note: must be done *after* workers are created - otherwise deadlock
            in <- input
    
            errCount := 0
    
            // wait for all worker error codes to return
            for err2 := range workerErrC {
                if err2 != nil {
                    log.Println("worker error:", err2)
                    errCount++
                }
            }
    
            // all workers have completed
            if errCount > 0 {
                err = fmt.Errorf("PARTIAL RESULT: %d of %d workers encountered errors", errCount, workers)
                return
            }
    
            log.Printf("All %d workers have FINISHED\n", workers)
        }()
    
        return
    }
    
    最后,创建 worker :
    func createWorkers(ctx context.Context, workers int, in chan string, out chan<- string, rwg *sync.WaitGroup) (errC <-chan error) {
    
        eC := make(chan error) // RW-version
        errC = eC              // RO-version (returned to caller)
    
        // track the completeness of the workers - so we know when to wrap up
        var wg sync.WaitGroup
        wg.Add(workers)
    
        for i := 0; i < workers; i++ {
            i := i
            go func() {
                defer wg.Done()
    
                var err error
    
                // ensure the current worker's return code gets returned
                // via the common workers' error-channel
                defer func() {
                    if err != nil {
                        log.Printf("worker #%3d ERRORED: %s\n", i+1, err)
                    } else {
                        log.Printf("worker #%3d FINISHED.\n", i+1)
                    }
                    eC <- err
                }()
    
                log.Printf("worker #%3d STARTED successfully\n", i+1)
    
                // worker scans for input
                for input := range in {
    
                    err = recFunc(ctx, input, in, out, rwg)
                    if err != nil {
                        log.Printf("worker #%3d recurseManagers ERROR: %s\n", i+1, err)
                        return
                    }
                }
    
            }()
        }
    
        go func() {
            rwg.Wait() // wait for all recursion to finish
            close(in)  // safe to close input channel as all workers are blocked (i.e. no new inputs)
            wg.Wait()  // now wait for all workers to return
            close(eC)  // finally, signal to caller we're truly done by closing workers' error-channel
        }()
    
        return
    }
    

    关于go - 如何并行化递归函数,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/65548405/

    10-09 18:07
    查看更多