You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
devops/routes/echo.go

118 lines
2.3 KiB

3 months ago
package routes
import (
"bytes"
"time"
"github.com/gorilla/websocket"
"zestack.dev/slim"
)
// Limits applied to every WebSocket connection served by this package.
const (
	// maxMessageSize is the largest message, in bytes, accepted from the peer.
	maxMessageSize = 512

	// writeWait bounds how long a single write to the peer may take.
	writeWait = 10 * time.Second

	// pongWait bounds how long to wait for the peer's next pong message.
	pongWait = 60 * time.Second

	// pingPeriod is the interval between pings sent to the peer. It is
	// derived as nine tenths of pongWait so that it stays below pongWait.
	pingPeriod = pongWait * 9 / 10
)
// handleNotify upgrades the request to a WebSocket connection and serves it
// with two goroutines: a reader that forwards every incoming message to the
// package-level broadcast channel, and a writer that relays messages received
// from broadcast to the peer and sends a ping every pingPeriod.
//
// It blocks until one of the goroutines finishes and returns that goroutine's
// result: nil for an ordinary shutdown, otherwise the read or write error.
// An error from the upgrade itself is returned immediately. On return the
// connection is closed and both goroutines are signalled to exit.
func handleNotify(c slim.Context) error {
	conn, err := upgrader.Upgrade(c.Response(), c.Request(), nil)
	if err != nil {
		return err
	}
	// Closing here (rather than only in the writer) guarantees that a reader
	// blocked in ReadMessage is released whichever goroutine finishes first.
	defer conn.Close()

	clients.Store(conn, struct{}{})
	defer clients.Delete(conn)

	// TODO: reuse these per-connection resources through a pool.

	// exit has room for one result per goroutine and is never closed, so a
	// goroutine that stops after the handler has returned can neither block
	// on the send nor panic with "send on closed channel".
	exit := make(chan error, 2)
	// done is closed when the handler returns and tells both goroutines to
	// give up any pending channel operation.
	done := make(chan struct{})
	defer close(done)

	stop := func(err error) {
		select {
		case exit <- err:
		default:
			// Buffer already holds a result for each goroutine; drop it.
		}
	}

	// Reader: peer -> broadcast.
	go func() {
		conn.SetReadLimit(maxMessageSize)
		conn.SetReadDeadline(time.Now().Add(pongWait))
		conn.SetPongHandler(func(string) error {
			// Every pong extends the read deadline by another pongWait.
			conn.SetReadDeadline(time.Now().Add(pongWait))
			return nil
		})
		for {
			_, message, readErr := conn.ReadMessage()
			if readErr != nil {
				// Only unexpected close codes are reported as errors; any
				// other read failure ends the handler with a nil result.
				if websocket.IsUnexpectedCloseError(readErr, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
					stop(readErr)
				} else {
					stop(nil)
				}
				return
			}
			message = bytes.TrimSpace(bytes.ReplaceAll(message, newline, space))
			select {
			case broadcast <- message:
			case <-done:
				// Handler already returned; do not block on broadcast.
				return
			}
		}
	}()

	// Writer: broadcast -> peer, plus periodic pings.
	go func() {
		ticker := time.NewTicker(pingPeriod)
		defer ticker.Stop()
		for {
			select {
			case <-done:
				return
			case message, ok := <-broadcast:
				conn.SetWriteDeadline(time.Now().Add(writeWait))
				if !ok {
					// The broadcast channel was closed: tell the peer we
					// are going away (best effort) and finish cleanly.
					conn.WriteMessage(websocket.CloseMessage, []byte{})
					stop(nil)
					return
				}
				w, writeErr := conn.NextWriter(websocket.TextMessage)
				if writeErr != nil {
					stop(writeErr)
					return
				}
				if _, writeErr = w.Write(message); writeErr != nil {
					stop(writeErr)
					return
				}
				// Append messages that are already queued to the current
				// websocket message. The receive is non-blocking because
				// other connections consume the same channel and may have
				// emptied it since len was sampled.
			drain:
				for queued := len(broadcast); queued > 0; queued-- {
					select {
					case next, open := <-broadcast:
						if !open {
							// Closed channel: the outer select handles it
							// on the next iteration.
							break drain
						}
						if _, writeErr = w.Write(newline); writeErr != nil {
							stop(writeErr)
							return
						}
						if _, writeErr = w.Write(next); writeErr != nil {
							stop(writeErr)
							return
						}
					default:
						break drain
					}
				}
				if closeErr := w.Close(); closeErr != nil {
					stop(closeErr)
					return
				}
			case <-ticker.C:
				conn.SetWriteDeadline(time.Now().Add(writeWait))
				if pingErr := conn.WriteMessage(websocket.PingMessage, nil); pingErr != nil {
					stop(pingErr)
					return
				}
			}
		}
	}()

	return <-exit
}