package routes

import (
	"bytes"
	"time"

	"github.com/gorilla/websocket"
	"zestack.dev/slim"
)

const (
	// Time allowed to write a message to the peer.
	writeWait = 10 * time.Second

	// Time allowed to read the next pong message from the peer.
	pongWait = 60 * time.Second

	// Send pings to peer with this period. Must be less than pongWait.
	pingPeriod = (pongWait * 9) / 10

	// Maximum message size allowed from peer.
	maxMessageSize = 512
)

// handleNotify upgrades the request to a websocket connection, registers the
// connection in clients for the lifetime of the handler, and runs a read loop
// and a write loop on it. It blocks until one of the loops finishes and
// returns that loop's result: nil for an ordinary shutdown, or the error that
// ended the connection.
//
// When the handler returns, the connection is closed and both loops are told
// to stop, so no goroutine outlives the request.
func handleNotify(c slim.Context) error {
	conn, err := upgrader.Upgrade(c.Response(), c.Request(), nil)
	if err != nil {
		return err
	}
	// Closing the connection makes a pending ReadMessage fail, which is what
	// unblocks the read loop once the handler is done.
	defer conn.Close()

	clients.Store(conn, struct{}{})
	defer clients.Delete(conn)

	// TODO: use a pool for reuse.
	//
	// exit has room for one result per loop and is never closed, so a loop
	// that finishes late can neither block on its send nor panic by sending
	// on a closed channel. Only the first result is consumed.
	exit := make(chan error, 2)

	// done is closed when the handler returns and tells both loops to stop.
	done := make(chan struct{})
	defer close(done)

	go notifyReadPump(conn, exit, done)
	go notifyWritePump(conn, exit, done)

	return <-exit
}

// notifyReadPump reads messages from conn, replaces newlines with spaces,
// trims surrounding whitespace, and forwards each message to broadcast. It
// sends exactly one result on exit unless done is closed first.
func notifyReadPump(conn *websocket.Conn, exit chan<- error, done <-chan struct{}) {
	conn.SetReadLimit(maxMessageSize)
	conn.SetReadDeadline(time.Now().Add(pongWait))
	conn.SetPongHandler(func(string) error {
		// Every pong extends the read deadline by another pongWait.
		conn.SetReadDeadline(time.Now().Add(pongWait))
		return nil
	})

	for {
		_, message, err := conn.ReadMessage()
		if err != nil {
			// Only close errors other than "going away" and "abnormal
			// closure" are reported; anything else counts as a normal end.
			if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
				exit <- err
			} else {
				exit <- nil
			}
			return
		}

		message = bytes.TrimSpace(bytes.ReplaceAll(message, newline, space))
		select {
		case broadcast <- message:
		case <-done:
			// The handler already returned; do not block on broadcast.
			return
		}
	}
}

// notifyWritePump writes messages received from broadcast to conn as text
// messages and sends a ping every pingPeriod. It sends exactly one result on
// exit unless done is closed first.
func notifyWritePump(conn *websocket.Conn, exit chan<- error, done <-chan struct{}) {
	ticker := time.NewTicker(pingPeriod)
	defer ticker.Stop()

	for {
		select {
		case <-done:
			return

		case message, ok := <-broadcast:
			conn.SetWriteDeadline(time.Now().Add(writeWait))
			if !ok {
				// The hub closed the channel.
				conn.WriteMessage(websocket.CloseMessage, []byte{})
				exit <- nil
				return
			}

			w, err := conn.NextWriter(websocket.TextMessage)
			if err != nil {
				exit <- err
				return
			}
			if _, err := w.Write(message); err != nil {
				exit <- err
				return
			}

			// Add queued chat messages to the current websocket message.
			// The receive is non-blocking because broadcast is shared:
			// another receiver may take a queued message between the len
			// check and the receive, and blocking here would stall the
			// write while the deadline is running.
		drain:
			for pending := len(broadcast); pending > 0; pending-- {
				select {
				case queued, open := <-broadcast:
					if !open {
						// Closed mid-drain; the next loop iteration sends
						// the close message.
						break drain
					}
					if _, err := w.Write(newline); err != nil {
						exit <- err
						return
					}
					if _, err := w.Write(queued); err != nil {
						exit <- err
						return
					}
				default:
					break drain
				}
			}

			if err := w.Close(); err != nil {
				exit <- err
				return
			}

		case <-ticker.C:
			conn.SetWriteDeadline(time.Now().Add(writeWait))
			if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil {
				exit <- err
				return
			}
		}
	}
}