package rsp import ( "errors" "fmt" "github.com/labstack/echo/v4" "net/http" "strings" "time" ) type SseOptions struct { Id string Event string Data string Retry int } func Echo(c echo.Context, e <-chan *SseOptions) error { w := c.Response().Writer f, ok := w.(http.Flusher) if !ok { return errors.New("unable to get http.Flusher interface; this is probably due " + "to nginx buffering the response") } if want, have := "text/event-stream", c.Request().Header.Get("Accept"); want != have { return fmt.Errorf("accept header: want %q, have %q; seems like the browser doesn't "+ "support server-side events", want, have) } // Instruct nginx to NOT buffer the response w.Header().Set("X-Accel-Buffering", "no") w.Header().Set("Content-Type", "text/event-stream") w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Connection", "keep-alive") w.WriteHeader(http.StatusOK) // Send heartbeats to ensure the connection stays up heartbeat := time.NewTicker(30 * time.Second) defer heartbeat.Stop() for { select { case <-c.Request().Context().Done(): // When the browser closes the connection f.Flush() return nil case <-heartbeat.C: sendEvent(w, "", "heartbeat", "{}", 2000) case opts := <-e: sendEvent(w, cleanNewline(opts.Id), cleanNewline(opts.Event), cleanNewline(opts.Data), opts.Retry) } } } func cleanNewline(str string) string { return strings.ReplaceAll(str, "\n", "") } func sendEvent(w http.ResponseWriter, id, event, data string, retry int) { f := w.(http.Flusher) if id != "" { fmt.Fprintf(w, "id: %s\n", id) } if event != "" { fmt.Fprintf(w, "event: %s\n", event) } if data != "" { fmt.Fprintf(w, "data: %s\n", data) } if retry > 0 { fmt.Fprintf(w, "retry: %d\n", retry) } fmt.Fprint(w, "\n") f.Flush() }