我正在实现一个服务器以流式处理许多浮点数。我需要一些帮助来设计系统以实现以下目的:

  • 音频进程必须独立并且即使没有任何请求进入也可以正常工作。我当前的方法是使DataProcess函数等待直到有请求。
  • 因为 channel 只能将数据提供给1个请求,所以2个或更多请求如何获得我在DataProcess中准备的数据?
  • 要实际流式传输数据,请求处理程序不能等待整个DataProcess完成,无论如何,只要在DataProcess中完成每次迭代,就可以立即提供处理程序数据吗?

  • 任何答复表示赞赏。这是我目前的想法:
    package main
    
    import (
    "fmt"
    "io"
    "net/http"
    "strconv"
    "time"
    )
    
    func main() {
        c := AudioProcess()
        handleHello := makeHello(c)
    
        http.HandleFunc("/", handleHello)
        http.ListenAndServe(":8000", nil)
    }
    
    func makeHello(c chan string) func(http.ResponseWriter, *http.Request) {
        return func(w http.ResponseWriter, r *http.Request) {
            for item := range c { // this loop runs when channel c is closed
                io.WriteString(w, item)
            }
        }
    }
    
    func AudioProcess() chan string {
        c := make(chan string)
        go func() {
            for i := 0; i <= 10; i++ { // Iterate the audio file
                c <- strconv.Itoa(i) // have my frame of samples, send to channel c
                time.Sleep(time.Second)
                fmt.Println("send ", i) // logging
            }
            close(c) // done processing, close channel c
            }()
            return c
        }
    

    最佳答案

    我不完全确定这是否可以解决您的问题,因为我还不完全了解您的用例,但是尽管如此,我还是在下面提出了一个解决方案。

    我将Gin用于HTTP路由器是因为它对我来说比较舒适,但是我敢肯定您可以修改代码以适合您的代码。我很着急(抱歉),所以可能有我不知道的问题,但是请告诉我是否有问题。

    简而言之:

  • 我创建了一个Manager,它处理了几个Client。它还包含一个sync.Mutex,以确保在任何给定时间只有一个线程在修改clients
  • 有一个InitBackgroundTask(),它将生成一个随机的float64号,并将其传递给clients中的所有Manager(如果有)。如果没有clients,我们就 sleep ,然后继续...
  • 索引处理程序处理添加和删除客户端。通过UUID标识客户端;
  • 3件事现在可以发生。当客户端通过<-c.Writer.CloseNotify() channel 断开连接时,客户端将被自动删除(因为该方法返回从而调用了defer)。我们还可以在下一个后台任务滴答中接收随机的float64号。最后,如果我们在20秒钟之内未收到任何通知,我们也可以终止。

  • 我在这里对您的需求做了一些假设(例如,后台任务每Y分钟返回X)。如果您正在寻找更精细的流式传输,我建议改为使用websockets(并且仍然可以使用以下模式)。

    如果您有任何问题,请告诉我。

    代码:
    package main
    
    import (
        "github.com/gin-gonic/gin"
        "github.com/satori/go.uuid"
        "log"
        "math/rand"
        "net/http"
        "sync"
        "time"
    )
    
    type Client struct {
        uuid string
        out  chan float64
    }
    
    type Manager struct {
        clients map[string]*Client
        mutex   sync.Mutex
    }
    
    func NewManager() *Manager {
        m := new(Manager)
        m.clients = make(map[string]*Client)
        return m
    }
    
    func (m *Manager) AddClient(c *Client) {
        m.mutex.Lock()
        defer m.mutex.Unlock()
        log.Printf("add client: %s\n", c.uuid)
        m.clients[c.uuid] = c
    }
    
    func (m *Manager) DeleteClient(id string) {
        m.mutex.Lock()
        defer m.mutex.Unlock()
        // log.Println("delete client: %s", c.uuid)
        delete(m.clients, id)
    }
    
    func (m *Manager) InitBackgroundTask() {
        for {
            f64 := rand.Float64()
            log.Printf("active clients: %d\n", len(m.clients))
            for _, c := range m.clients {
                c.out <- f64
            }
            log.Printf("sent output (%+v), sleeping for 10s...\n", f64)
            time.Sleep(time.Second * 10)
        }
    }
    
    func main() {
        r := gin.Default()
        m := NewManager()
    
        go m.InitBackgroundTask()
    
        r.GET("/", func(c *gin.Context) {
            cl := new(Client)
            cl.uuid = uuid.NewV4().String()
            cl.out = make(chan float64)
    
            defer m.DeleteClient(cl.uuid)
            m.AddClient(cl)
    
            select {
            case <-c.Writer.CloseNotify():
                log.Printf("%s : disconnected\n", cl.uuid)
            case out := <-cl.out:
                log.Printf("%s : received %+v\n", out)
                c.JSON(http.StatusOK, gin.H{
                    "output": out,
                })
            case <-time.After(time.Second * 20):
                log.Println("timed out")
            }
        })
    
        r.Run()
    }
    

    注意:如果您要在Chrome上进行测试,则可能必须在URL的末尾附加一个随机参数,以便实际发出请求,例如?rand=001?rand=002等。

    08-17 15:59