在这里已经获得了一些帮助,这使我朝着我正在尝试的概念前进,但是它仍然无法正常工作,并且遇到了我似乎无法解决的冲突。
我在这里尝试在流程图中说明我想要的内容-请注意,该客户端可以有许多将通过printjobs发送的客户端,因此我们此时无法答复该 worker 正在处理我们的工作,但是对于大多数情况(高峰时间不会,因为打印的处理工作会花费时间)。
type QueueElement struct {
jobid string
rw http.ResponseWriter
doneChan chan struct{}
}
type GlobalVars struct {
db *sql.DB
wg sync.WaitGroup
jobs chan QueueElement
}
func (gv *GlobalVars) ServeHTTP(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/StartJob":
fmt.Printf ("incoming\r\n")
doneC := make(chan struct{}, 1) //Buffered channel in order not to block the worker routine
newPrintJob := QueueElement{
doneChan: doneC,
jobid: "jobid",
}
gv.jobs <- newPrintJob
func(doneChan chan struct{},w http.ResponseWriter) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
select {
//If this triggers first, then this waiting goroutine would exit
//and nobody would be listeding the 'doneChan'. This is why it has to be buffered.
case <-ctx.Done():
fmt.Fprintf(w, "job is taking more than 5 seconds to complete\r\n")
fmt.Printf ("took longer than 5 secs\r\n")
case <-doneChan:
fmt.Fprintf(w, "instant reply from serveHTTP\r\n")
fmt.Printf ("instant\r\n")
}
}(doneC,w)
default:
fmt.Fprintf(w, "No such Api")
}
}
func worker(jobs <-chan QueueElement) {
for {
job := <-jobs
processExec ("START /i /b try.cmd")
fmt.Printf ("job done")
// processExec("START /i /b processAndPrint.exe -" + job.jobid)
job.doneChan <- struct{}{}
}
}
func main() {
db, err := sql.Open("sqlite3", "jobs.sqlite")
if err := db.Ping(); err != nil {
log.Fatal(err)
}
db.SetMaxOpenConns(1) // prevents locked database error
_, err = db.Exec(setupSQL)
if err != nil {
log.Fatal(err)
}
// create a GlobalVars instance
gv := GlobalVars{
db : db,
jobs: make(chan QueueElement),
}
go worker (gv.jobs)
// create an http.Server instance and specify our job manager as
// the handler for requests.
server := http.Server{
Handler: &gv,
Addr : ":8888",
}
// start server and accept connections.
log.Fatal(server.ListenAndServe())
}
上面的代码是serveHTTP的代码,工作人员在这里提供了帮助,最初,ServeHTTP内的func是一个go例程,这对我来说是整个冲突的发生-概念是在serveHTTP中它产生一个将得到回复的进程如果 worker 能够在5秒钟之内及时处理工作,则向 worker 求助。
如果工作能够在1秒钟内完成,我想在那1秒钟后立即回覆给客户,如果需要3秒钟,我想在3秒钟后回复,如果花费超过5秒钟,我将在5秒钟后回复(如果这项工作需要13秒,但我仍然想在5秒后回复)。从现在开始,客户必须对工作进行轮询-但冲突是:
a)当ServeHTTP退出时-然后ResponseWriter关闭-为了能够回复到客户端,我们必须将答案写到ResponseWriter。
b)如果我阻止serveHTTP(如下面的代码示例,其中我未将func称为go例程),那么它不仅会影响单个API调用,而且似乎还会影响此后的所有其他API,因此第一个调用将及时得到正确处理,但是在第一次调用之后的同一时间将被阻塞操作延迟。
c)如果我不阻止它-并将其更改为执行例程:
gv.jobs <- newPrintJob
go func(doneChan chan struct{},w http.ResponseWriter) {
这样就没有延迟了-可以调用许多API-但问题是在这里serveHTTP立即存在,从而杀死了ResponseWriter,然后我无法回复给客户端。
我不确定如何解决这种冲突,在这种情况下我不会造成任何服务HTTP阻塞,因此我可以并行处理所有请求,但仍然能够回复有问题的ResponseWriter。
即使函数退出,是否有任何方法可以防止serveHTTP关闭响应编写器?
最佳答案
我已经为您的代码添加了一些更新。现在,它可以按照您描述的那样工作。
package main
import (
"database/sql"
"fmt"
"log"
"math/rand"
"net/http"
"sync"
"time"
)
type QueueElement struct {
jobid string
rw http.ResponseWriter
doneChan chan struct{}
}
type GlobalVars struct {
db *sql.DB
wg sync.WaitGroup
jobs chan QueueElement
}
func (gv *GlobalVars) ServeHTTP(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/StartJob":
fmt.Printf("incoming\r\n")
doneC := make(chan struct{}, 1) //Buffered channel in order not to block the worker routine
go func(doneChan chan struct{}, w http.ResponseWriter) {
gv.jobs <- QueueElement{
doneChan: doneC,
jobid: "jobid",
}
}(doneC, w)
select {
case <-time.Tick(time.Second * 5):
fmt.Fprintf(w, "job is taking more than 5 seconds to complete\r\n")
fmt.Printf("took longer than 5 secs\r\n")
case <-doneC:
fmt.Fprintf(w, "instant reply from serveHTTP\r\n")
fmt.Printf("instant\r\n")
}
default:
fmt.Fprintf(w, "No such Api")
}
}
func worker(jobs <-chan QueueElement) {
for {
job := <-jobs
fmt.Println("START /i /b try.cmd")
fmt.Printf("job done")
randTimeDuration := time.Second * time.Duration(rand.Intn(7))
time.Sleep(randTimeDuration)
// processExec("START /i /b processAndPrint.exe -" + job.jobid)
job.doneChan <- struct{}{}
}
}
func main() {
// create a GlobalVars instance
gv := GlobalVars{
//db: db,
jobs: make(chan QueueElement),
}
go worker(gv.jobs)
// create an http.Server instance and specify our job manager as
// the handler for requests.
server := http.Server{
Handler: &gv,
Addr: ":8888",
}
// start server and accept connections.
log.Fatal(server.ListenAndServe())
}
关于go - 后台打印程序概念/API和 channel : issue passing jobs to a queue from serveHTTP,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/56405281/