我正在实现一个服务器以流式处理许多浮点数。我需要一些帮助来设计系统以实现以下目的:
任何答复表示赞赏。这是我目前的想法:
package main
import (
"fmt"
"io"
"net/http"
"strconv"
"time"
)
func main() {
c := AudioProcess()
handleHello := makeHello(c)
http.HandleFunc("/", handleHello)
http.ListenAndServe(":8000", nil)
}
func makeHello(c chan string) func(http.ResponseWriter, *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
for item := range c { // this loop runs when channel c is closed
io.WriteString(w, item)
}
}
}
func AudioProcess() chan string {
c := make(chan string)
go func() {
for i := 0; i <= 10; i++ { // Iterate the audio file
c <- strconv.Itoa(i) // have my frame of samples, send to channel c
time.Sleep(time.Second)
fmt.Println("send ", i) // logging
}
close(c) // done processing, close channel c
}()
return c
}
最佳答案
我不完全确定这是否可以解决您的问题,因为我还不完全了解您的用例,但是尽管如此,我还是在下面提出了一个解决方案。
我将Gin用于HTTP路由器是因为它对我来说比较舒适,但是我敢肯定您可以修改代码以适合您的代码。我很着急(抱歉),所以可能有我不知道的问题,但是请告诉我是否有问题。
简而言之:
Manager
,它处理了几个Client
。它还包含一个sync.Mutex
,以确保在任何给定时间只有一个线程在修改clients
。 InitBackgroundTask()
,它将生成一个随机的float64
号,并将其传递给clients
中的所有Manager
(如果有)。如果没有clients
,我们就 sleep ,然后继续... <-c.Writer.CloseNotify()
channel 断开连接时,客户端将被自动删除(因为该方法返回从而调用了defer
)。我们还可以在下一个后台任务滴答中接收随机的float64
号。最后,如果我们在20秒钟之内未收到任何通知,我们也可以终止。 我在这里对您的需求做了一些假设(例如,后台任务每Y分钟返回X)。如果您正在寻找更精细的流式传输,我建议改为使用websockets(并且仍然可以使用以下模式)。
如果您有任何问题,请告诉我。
代码:
package main
import (
"github.com/gin-gonic/gin"
"github.com/satori/go.uuid"
"log"
"math/rand"
"net/http"
"sync"
"time"
)
type Client struct {
uuid string
out chan float64
}
type Manager struct {
clients map[string]*Client
mutex sync.Mutex
}
func NewManager() *Manager {
m := new(Manager)
m.clients = make(map[string]*Client)
return m
}
func (m *Manager) AddClient(c *Client) {
m.mutex.Lock()
defer m.mutex.Unlock()
log.Printf("add client: %s\n", c.uuid)
m.clients[c.uuid] = c
}
func (m *Manager) DeleteClient(id string) {
m.mutex.Lock()
defer m.mutex.Unlock()
// log.Println("delete client: %s", c.uuid)
delete(m.clients, id)
}
func (m *Manager) InitBackgroundTask() {
for {
f64 := rand.Float64()
log.Printf("active clients: %d\n", len(m.clients))
for _, c := range m.clients {
c.out <- f64
}
log.Printf("sent output (%+v), sleeping for 10s...\n", f64)
time.Sleep(time.Second * 10)
}
}
func main() {
r := gin.Default()
m := NewManager()
go m.InitBackgroundTask()
r.GET("/", func(c *gin.Context) {
cl := new(Client)
cl.uuid = uuid.NewV4().String()
cl.out = make(chan float64)
defer m.DeleteClient(cl.uuid)
m.AddClient(cl)
select {
case <-c.Writer.CloseNotify():
log.Printf("%s : disconnected\n", cl.uuid)
case out := <-cl.out:
log.Printf("%s : received %+v\n", out)
c.JSON(http.StatusOK, gin.H{
"output": out,
})
case <-time.After(time.Second * 20):
log.Println("timed out")
}
})
r.Run()
}
注意:如果您要在Chrome上进行测试,则可能必须在URL的末尾附加一个随机参数,以便实际发出请求,例如
?rand=001
,?rand=002
等。