Problem description
I have a ReactJS client that uses EventSource and a Golang backend that implements SSE.
Everything worked when I connected my browser to the backend running on localhost, and also when the backend was running on k8s with port forwarding.
As soon as I created an ingress with a hostname (so that I don't have to port-forward all the time), SSE stopped working. I can still see that the client sends the request and that the backend receives and registers it. However, when an event is sent, it never reaches my ReactJS app.
Here is the code for my backend SSE implementation:
package sse

import (
	"encoding/json"
	"fmt"
	"net/http"
	"time"

	"go.uber.org/zap"

	"github.com/talon-one/towers/controller/api/log"
)

// the amount of time to wait when pushing a message to
// a slow client or a client that closed after `range clients` started.
const patience time.Duration = time.Second * 2

type customerStateUpdate struct {
	sseEvent
	CustomerName  string `json:"customer_name"`
	CustomerState string `json:"customer_state"`
}

type contentUpdate struct {
	sseEvent
}

type sseEvent struct {
	EventType string `json:"event_type"`
}

type Broker struct {
	// Events are pushed to this channel by the main events-gathering routine
	Notifier chan []byte
	// New client connections
	newClients chan chan []byte
	// Closed client connections
	closingClients chan chan []byte
	// Client connections registry
	clients map[chan []byte]bool
	log     *log.Logger
}

func NewBroker(log *log.Logger) (broker *Broker) {
	// Instantiate a broker
	broker = &Broker{
		Notifier:       make(chan []byte, 1),
		newClients:     make(chan chan []byte),
		closingClients: make(chan chan []byte),
		clients:        make(map[chan []byte]bool),
		log:            log.With(zap.String("component", "SSE")),
	}
	// Set it running - listening and broadcasting events
	go broker.listen()
	return
}

func (broker *Broker) HandleContentChange() error {
	event := contentUpdate{
		sseEvent: sseEvent{EventType: "contentUpdate"},
	}
	payload, err := json.Marshal(&event)
	if err != nil {
		return err
	}
	broker.Notifier <- payload
	return nil
}

func (broker *Broker) HandleCustomerStateChange(name, state string) error {
	event := customerStateUpdate{
		sseEvent:      sseEvent{EventType: "customerStateUpdate"},
		CustomerName:  name,
		CustomerState: state,
	}
	broker.log.Info("Sending SSE to registered clients", zap.String("name", name), zap.String("state", state))
	payload, err := json.Marshal(&event)
	if err != nil {
		return err
	}
	broker.Notifier <- payload
	return nil
}

func (broker *Broker) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
	// Make sure that the writer supports flushing.
	flusher, ok := rw.(http.Flusher)
	if !ok {
		http.Error(rw, "Streaming unsupported!", http.StatusInternalServerError)
		return
	}
	rw.Header().Set("Content-Type", "text/event-stream")
	rw.Header().Set("Cache-Control", "no-cache")
	rw.Header().Set("Connection", "keep-alive")
	rw.Header().Set("Access-Control-Allow-Origin", "*")
	// Each connection registers its own message channel with the Broker's connections registry
	messageChan := make(chan []byte)
	// Signal the broker that we have a new connection
	broker.newClients <- messageChan
	// Remove this client from the map of connected clients
	// when this handler exits.
	defer func() {
		broker.closingClients <- messageChan
	}()
	notify := rw.(http.CloseNotifier).CloseNotify()
	for {
		select {
		case <-notify:
			return
		case msg := <-messageChan:
			// Write to the ResponseWriter
			// Server Sent Events compatible
			fmt.Fprintf(rw, "data: %s\n\n", msg)
			// Flush the data immediately instead of buffering it for later.
			flusher.Flush()
		}
	}
}

func (broker *Broker) listen() {
	for {
		select {
		case s := <-broker.newClients:
			// A new client has connected.
			// Register their message channel
			broker.clients[s] = true
			broker.log.Info("Client added", zap.Int("current_count", len(broker.clients)))
		case s := <-broker.closingClients:
			// A client has detached and we want to
			// stop sending them messages.
			delete(broker.clients, s)
			broker.log.Info("Client removed", zap.Int("current_count", len(broker.clients)))
		case event := <-broker.Notifier:
			// We got a new event from the outside!
			// Send event to all connected clients
			for clientMessageChan := range broker.clients {
				select {
				case clientMessageChan <- event:
				case <-time.After(patience):
					broker.log.Info("Skipping client")
				}
			}
		}
	}
}
And in my ReactJS app:
export default class CustomersTable extends Component {
  constructor(props) {
    super(props)
    this.eventSource = new EventSource('/v1/events')
  }

  updateCustomerState(e) {
    let event = JSON.parse(e.data)
    switch (event.event_type) {
      case 'customerStateUpdate':
        let newData = this.state.customers.map(item => {
          if (item.name === event.customer_name) {
            item.k8sState = event.customer_state
          }
          return item
        })
        this.setState(Object.assign({}, { customers: newData }))
        break
      case 'contentUpdate':
        this.reload()
        break
      default:
        break
    }
  }

  componentDidMount() {
    this.setState({ isLoading: true })
    ReactModal.setAppElement('body')
    this.reload()
    this.eventSource.onmessage = e => this.updateCustomerState(e)
  }

  componentWillUnmount() {
    this.eventSource.close()
  }
...
Recommended answer
I got my SSE app working behind the Nginx Ingress with the following annotations:
annotations:
  nginx.ingress.kubernetes.io/proxy-read-timeout: "21600"
  nginx.ingress.kubernetes.io/eventsource: "true"