我有一个大型日志文件,想要并行地对其进行分析。

我有以下代码:

package main

import (
    "bufio"
    "fmt"
    "os"
    "time"
)

// main opens the log file, starts a pool of parser goroutines fed by a single
// reader goroutine, and then idles forever: nothing in this program signals
// when the work is finished, so main has no event to wait on.
func main() {
    const (
        logPath     = "log.txt"
        workerCount = 10
    )

    // Open the log; print a message and exit with status 1 if that fails.
    file, err := os.Open(logPath)
    if err != nil {
        fmt.Println("Could not open file with the database.")
        os.Exit(1)
    }
    defer file.Close()

    // Unbuffered channel carrying one log line per message.
    tasks := make(chan string)

    // Workers: each one receives lines from the channel and parses them.
    for w := 0; w < workerCount; w++ {
        go parseStrings(tasks)
    }

    // Producer: scans the file line by line and sends every line to the workers.
    go getStrings(bufio.NewScanner(file), tasks)

    // Open problem: main should block here until every goroutine is done.
    // As a stopgap it simply sleeps in one-second steps, indefinitely.
    for {
        time.Sleep(time.Second)
    }
}

// getStrings reads scanner token by token and sends each token's text on
// tasks. It returns as soon as Scan reports false; the channel is left open,
// so receivers get no signal that the input is exhausted.
func getStrings(scanner *bufio.Scanner, tasks chan<- string) {
    for {
        if !scanner.Scan() {
            return
        }
        tasks <- scanner.Text()
    }
}

// parseStrings is a worker loop: it receives a line from tasks, runs it
// through parseLine, and prints the resulting event to standard output.
// The loop has no exit condition, so the function never returns.
func parseStrings(tasks <-chan string) {
    for {
        line := <-tasks
        fmt.Println(parseLine(line))
    }
}

// parseLine turns one raw log line into an event string. It is currently a
// placeholder that hands the line back unchanged.
func parseLine(line string) string {
    event := line
    return event
}

实际上,我该如何等待所有线程执行完毕?
有人建议我创建一个单独的线程来汇总结果,但具体该怎么做呢?

最佳答案

使用管道模式和“扇出/扇入”模式:

package main

import (
    "bufio"
    "fmt"
    "strings"
    "sync"
)

// main demonstrates a three-stage pipeline over an in-memory "log": one
// producer emits lines, ten workers parse them (fan-out), and a merge step
// funnels the workers' outputs back into one stream (fan-in) that is printed.
func main() {
    // In-memory stand-in for the log file: seven newline-terminated lines.
    const logText = "here is first line\n" +
        "here is second line\n" +
        "here is line 3\n" +
        "here is line 4\n" +
        "here is line 5\n" +
        "here is line 6\n" +
        "here is line 7\n"

    // Stage 1: a single producer puts every line onto one channel.
    lines := getStrings(bufio.NewScanner(strings.NewReader(logText)))

    // Stage 2 (fan-out): ten goroutines all receive from the same channel
    // until it is closed, so the lines are distributed among them.
    workers := fanOut(lines, 10)

    // Stage 3 (fan-in): the per-worker channels are multiplexed onto a single
    // channel; the range loop ends once that channel is closed.
    results := merge(workers)
    for parsed := range results {
        fmt.Println(parsed)
    }
}

// getStrings starts a goroutine that sends the text of every token produced
// by scanner on a new unbuffered channel, then closes that channel once Scan
// reports false. The receive-only end of the channel is returned immediately.
func getStrings(scanner *bufio.Scanner) <-chan string {
    lines := make(chan string)
    produce := func() {
        for {
            if !scanner.Scan() {
                break
            }
            lines <- scanner.Text()
        }
        // Closing tells the receivers that no more lines will arrive.
        close(lines)
    }
    go produce()
    return lines
}

// fanOut starts n parseStrings workers that all receive from in and returns
// their output channels in start order. For n <= 0 no worker is started and
// the returned slice is nil.
func fanOut(in <-chan string, n int) []<-chan string {
    var outputs []<-chan string
    for remaining := n; remaining > 0; remaining-- {
        worker := parseStrings(in)
        outputs = append(outputs, worker)
    }
    return outputs
}

// parseStrings starts a goroutine that receives lines from in, passes each
// one through parseLine, and sends the result on a new unbuffered channel.
// When in is closed and drained, the output channel is closed as well. The
// receive-only end of the output channel is returned immediately.
func parseStrings(in <-chan string) <-chan string {
    parsed := make(chan string)
    go func() {
        for {
            line, open := <-in
            if !open {
                break
            }
            parsed <- parseLine(line)
        }
        close(parsed)
    }()
    return parsed
}

// parseLine turns one raw log line into its parsed form. It is currently a
// placeholder that hands the line back unchanged.
func parseLine(line string) string {
    parsed := line
    return parsed
}

// merge multiplexes every channel in cs onto a single unbuffered output
// channel. One forwarding goroutine per input copies values across until its
// input is closed; a final goroutine waits for all forwarders to finish and
// then closes the output. Values from different inputs arrive in no
// particular relative order.
func merge(cs []<-chan string) <-chan string {
    merged := make(chan string)

    var pending sync.WaitGroup
    // forward copies everything from src to merged and then reports completion.
    forward := func(src <-chan string) {
        for {
            v, open := <-src
            if !open {
                break
            }
            merged <- v
        }
        pending.Done()
    }

    // Register every forwarder before any of them starts so that Wait cannot
    // return before all of them have been accounted for.
    pending.Add(len(cs))
    for i := range cs {
        go forward(cs[i])
    }

    // Close the output only after every input has been fully drained.
    go func() {
        pending.Wait()
        close(merged)
    }()
    return merged
}

Check it out on the playground

09-10 08:31
查看更多