在这里已经获得了一些帮助,这使我朝着我正在尝试的概念前进,但是它仍然无法正常工作,并且遇到了我似乎无法解决的冲突。

我在这里尝试在流程图中说明我想要的内容-请注意,该客户端可以有许多将通过printjobs发送的客户端,因此我们此时无法答复该 worker 正在处理我们的工作,但是对于大多数情况(高峰时间不会,因为打印的处理工作会花费时间)。

go - 后台打印程序概念/API和 channel : issue passing jobs to a queue from serveHTTP-LMLPHP

type QueueElement struct {
    jobid string
    rw   http.ResponseWriter
  doneChan chan struct{}
}

type GlobalVars struct {
    db   *sql.DB
    wg   sync.WaitGroup
    jobs chan QueueElement
}

func (gv *GlobalVars) ServeHTTP(w http.ResponseWriter, r *http.Request) {

    switch r.URL.Path {
    case "/StartJob":
        fmt.Printf ("incoming\r\n")

            doneC := make(chan struct{}, 1) //Buffered channel in order not to block the worker routine
            newPrintJob := QueueElement{
                    doneChan: doneC,
                    jobid:    "jobid",
            }

            gv.jobs <- newPrintJob
            func(doneChan chan struct{},w http.ResponseWriter) {

                  ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
                    defer cancel()
                    select {
                    //If this triggers first, then this waiting goroutine would exit
                    //and nobody would be listeding the 'doneChan'. This is why it has to be buffered.
                    case <-ctx.Done():
                            fmt.Fprintf(w, "job is taking more than 5 seconds to complete\r\n")
                            fmt.Printf ("took longer than 5 secs\r\n")
                    case <-doneChan:
                            fmt.Fprintf(w, "instant reply from serveHTTP\r\n")
                            fmt.Printf ("instant\r\n")
                    }
            }(doneC,w)

    default:
            fmt.Fprintf(w, "No such Api")
    }
}

func worker(jobs <-chan QueueElement) {
    for {
            job := <-jobs
            processExec ("START /i /b try.cmd")
            fmt.Printf ("job done")
        //  processExec("START /i /b processAndPrint.exe -" + job.jobid)
            job.doneChan <- struct{}{}

    }
}

func main() {
      db, err := sql.Open("sqlite3", "jobs.sqlite")
      if err := db.Ping(); err != nil {
        log.Fatal(err)
    }

      db.SetMaxOpenConns(1) // prevents locked database error
      _, err = db.Exec(setupSQL)
      if err != nil {
          log.Fatal(err)
      }

    // create a GlobalVars instance
    gv := GlobalVars{
        db  :   db,
        jobs: make(chan QueueElement),
    }
    go worker (gv.jobs)
    // create an http.Server instance and specify our job manager as
    // the handler for requests.
    server := http.Server{
        Handler: &gv,
        Addr : ":8888",
    }
    // start server and accept connections.
    log.Fatal(server.ListenAndServe())
}

上面的代码是serveHTTP的代码,工作人员在这里提供了帮助,最初,ServeHTTP内的func是一个go例程,这对我来说是整个冲突的发生-概念是在serveHTTP中它产生一个将得到回复的进程如果 worker 能够在5秒钟之内及时处理工作,则向 worker 求助。

如果工作能够在1秒钟内完成,我想在那1秒钟后立即回覆给客户,如果需要3秒钟,我想在3秒钟后回复,如果花费超过5秒钟,我将在5秒钟后回复(如果这项工作需要13秒,但我仍然想在5秒后回复)。从现在开始,客户必须对工作进行轮询-但冲突是:

a)当ServeHTTP退出时-然后ResponseWriter关闭-为了能够回复到客户端,我们必须将答案写到ResponseWriter。

b)如果我阻止serveHTTP(如下面的代码示例,其中我未将func称为go例程),那么它不仅会影响单个API调用,而且似乎还会影响此后的所有其他API,因此第一个调用将及时得到正确处理,但是在第一次调用之后的同一时间将被阻塞操作延迟。

c)如果我不阻止它-并将其更改为执行例程:
    gv.jobs <- newPrintJob
    go func(doneChan chan struct{},w http.ResponseWriter) {

这样就没有延迟了-可以调用许多API-但问题是在这里serveHTTP立即存在,从而杀死了ResponseWriter,然后我无法回复给客户端。

我不确定如何解决这种冲突,在这种情况下我不会造成任何服务HTTP阻塞,因此我可以并行处理所有请求,但仍然能够回复有问题的ResponseWriter。

即使函数退出,是否有任何方法可以防止serveHTTP关闭响应编写器?

最佳答案

我已经为您的代码添加了一些更新。现在,它可以按照您描述的那样工作。

package main

import (
    "database/sql"
    "fmt"
    "log"
    "math/rand"
    "net/http"
    "sync"
    "time"
)

type QueueElement struct {
    jobid    string
    rw       http.ResponseWriter
    doneChan chan struct{}
}

type GlobalVars struct {
    db   *sql.DB
    wg   sync.WaitGroup
    jobs chan QueueElement
}

func (gv *GlobalVars) ServeHTTP(w http.ResponseWriter, r *http.Request) {

    switch r.URL.Path {
    case "/StartJob":
        fmt.Printf("incoming\r\n")

        doneC := make(chan struct{}, 1) //Buffered channel in order not to block the worker routine

        go func(doneChan chan struct{}, w http.ResponseWriter) {
            gv.jobs <- QueueElement{
                doneChan: doneC,
                jobid:    "jobid",
            }
        }(doneC, w)

        select {
        case <-time.Tick(time.Second * 5):
            fmt.Fprintf(w, "job is taking more than 5 seconds to complete\r\n")
            fmt.Printf("took longer than 5 secs\r\n")
        case <-doneC:
            fmt.Fprintf(w, "instant reply from serveHTTP\r\n")
            fmt.Printf("instant\r\n")
        }
    default:
        fmt.Fprintf(w, "No such Api")
    }
}

func worker(jobs <-chan QueueElement) {
    for {
        job := <-jobs
        fmt.Println("START /i /b try.cmd")
        fmt.Printf("job done")

        randTimeDuration := time.Second * time.Duration(rand.Intn(7))

        time.Sleep(randTimeDuration)

        //  processExec("START /i /b processAndPrint.exe -" + job.jobid)
        job.doneChan <- struct{}{}
    }
}

func main() {

    // create a GlobalVars instance
    gv := GlobalVars{
        //db:   db,
        jobs: make(chan QueueElement),
    }
    go worker(gv.jobs)
    // create an http.Server instance and specify our job manager as
    // the handler for requests.
    server := http.Server{
        Handler: &gv,
        Addr:    ":8888",
    }
    // start server and accept connections.
    log.Fatal(server.ListenAndServe())
}

关于go - 后台打印程序概念/API和 channel : issue passing jobs to a queue from serveHTTP,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/56405281/

10-11 09:14