SSE stops working after deploying on Kubernetes and connecting through an Ingress

Question

I have a ReactJS client that uses EventSource and a golang backend that implements SSE.

Everything seemed to work when I connected my browser to the backend running on localhost, and also when the backend was running on k8s with port forwarding.

As soon as I created an ingress with a hostname (so that I don't have to port-forward all the time), SSE stopped working. I can still see that the client sends the request and that the backend receives and registers it. However, when an event is sent, it never arrives at my ReactJS app.

Here is the code for my backend SSE implementation:

package sse

import (
    "encoding/json"
    "fmt"
    "net/http"
    "time"

    "go.uber.org/zap"

    "github.com/talon-one/towers/controller/api/log"
)

// the amount of time to wait when pushing a message to
// a slow client or a client that closed after `range clients` started.
const patience time.Duration = time.Second * 2

type customerStateUpdate struct {
    sseEvent
    CustomerName  string `json:"customer_name"`
    CustomerState string `json:"customer_state"`
}

type contentUpdate struct {
    sseEvent
}
type sseEvent struct {
    EventType string `json:"event_type"`
}
type Broker struct {

    // Events are pushed to this channel by the main events-gathering routine
    Notifier chan []byte

    // New client connections
    newClients chan chan []byte

    // Closed client connections
    closingClients chan chan []byte

    // Client connections registry
    clients map[chan []byte]bool

    log *log.Logger
}

func NewBroker(log *log.Logger) (broker *Broker) {
    // Instantiate a broker
    broker = &Broker{
        Notifier:       make(chan []byte, 1),
        newClients:     make(chan chan []byte),
        closingClients: make(chan chan []byte),
        clients:        make(map[chan []byte]bool),
        log:            log.With(zap.String("component", "SSE")),
    }

    // Set it running - listening and broadcasting events
    go broker.listen()
    return
}

func (broker *Broker) HandleContentChange() error {
    event := contentUpdate{
        sseEvent: sseEvent{EventType: "contentUpdate"},
    }
    payload, err := json.Marshal(&event)
    if err != nil {
        return err
    }
    broker.Notifier <- payload
    return nil
}

func (broker *Broker) HandleCustomerStateChange(name, state string) error {
    event := customerStateUpdate{
        sseEvent:      sseEvent{EventType: "customerStateUpdate"},
        CustomerName:  name,
        CustomerState: state,
    }

    broker.log.Info("Sending SSE to registered clients", zap.String("name", name), zap.String("state", state))

    payload, err := json.Marshal(&event)
    if err != nil {
        return err
    }
    broker.Notifier <- payload
    return nil
}

func (broker *Broker) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
    // Make sure that the writer supports flushing.
    flusher, ok := rw.(http.Flusher)

    if !ok {
        http.Error(rw, "Streaming unsupported!", http.StatusInternalServerError)
        return
    }

    rw.Header().Set("Content-Type", "text/event-stream")
    rw.Header().Set("Cache-Control", "no-cache")
    rw.Header().Set("Connection", "keep-alive")
    rw.Header().Set("Access-Control-Allow-Origin", "*")

    // Each connection registers its own message channel with the Broker's connections registry
    messageChan := make(chan []byte)

    // Signal the broker that we have a new connection
    broker.newClients <- messageChan

    // Remove this client from the map of connected clients
    // when this handler exits.
    defer func() {
        broker.closingClients <- messageChan
    }()
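    // http.CloseNotifier is deprecated and the unchecked type assertion can panic;
    // the request context is cancelled when the client disconnects.
    notify := req.Context().Done()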

    for {
        select {
        case <-notify:
            return
        case msg := <-messageChan:
            // Write to the ResponseWriter
            // Server Sent Events compatible
            fmt.Fprintf(rw, "data: %s\n\n", msg)

            // Flush the data immediately instead of buffering it for later.
            flusher.Flush()
        }
    }

}

func (broker *Broker) listen() {
    for {
        select {
        case s := <-broker.newClients:

            // A new client has connected.
            // Register their message channel
            broker.clients[s] = true
            broker.log.Info("Client added", zap.Int("current_count", len(broker.clients)))
        case s := <-broker.closingClients:

            // A client has detached and we want to
            // stop sending them messages.
            delete(broker.clients, s)
            broker.log.Info("Client removed", zap.Int("current_count", len(broker.clients)))

        case event := <-broker.Notifier:
            // We got a new event from the outside!
            // Send event to all connected clients
            for clientMessageChan := range broker.clients {
                select {
                case clientMessageChan <- event:
                case <-time.After(patience):
                    broker.log.Info("Skipping client")
                }
            }
        }
    }

}

And in my ReactJS app:

export default class CustomersTable extends Component {
  constructor(props) {
    super(props)
    this.eventSource = new EventSource('/v1/events')
  }

  updateCustomerState(e) {
    let event = JSON.parse(e.data)
    switch (event.event_type) {
      case 'customerStateUpdate':
        let newData = this.state.customers.map(item => {
          if (item.name === event.customer_name) {
            item.k8sState = event.customer_state
          }
          return item
        })
        this.setState(Object.assign({}, { customers: newData }))
        break
      case 'contentUpdate':
        this.reload()
        break
      default:
        break
    }
  }

  componentDidMount() {
    this.setState({ isLoading: true })
    ReactModal.setAppElement('body')
    this.reload()
    this.eventSource.onmessage = e => this.updateCustomerState(e)
  }

  componentWillUnmount() {
    this.eventSource.close()
  }
...

Recommended answer

I got my SSE app working on Nginx Ingress using these annotations:

  annotations:
    nginx.ingress.kubernetes.io/proxy-read-timeout: "21600"
    nginx.ingress.kubernetes.io/eventsource: "true"

