You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
118 lines
2.3 KiB
118 lines
2.3 KiB
3 months ago
|
package routes
|
||
|
|
||
|
import (
|
||
|
"bytes"
|
||
|
"time"
|
||
|
|
||
|
"github.com/gorilla/websocket"
|
||
|
"zestack.dev/slim"
|
||
|
)
|
||
|
|
||
|
// Timing and size limits applied to every WebSocket connection.
const (
	// writeWait is the time allowed to write a single message to the peer.
	writeWait = 10 * time.Second

	// pongWait is the time allowed to read the next pong message from the
	// peer before the read deadline expires.
	pongWait = 60 * time.Second

	// pingPeriod is the interval at which pings are sent to the peer. It must
	// be less than pongWait so a pong can arrive before the read deadline.
	pingPeriod = (pongWait * 9) / 10

	// maxMessageSize is the maximum message size, in bytes, accepted from the
	// peer.
	maxMessageSize = 512
)
|
||
|
|
||
|
func handleNotify(c slim.Context) error {
|
||
|
conn, err := upgrader.Upgrade(c.Response(), c.Request(), nil)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
clients.Store(conn, struct{}{})
|
||
|
defer clients.Delete(conn)
|
||
|
|
||
|
// TODO 使用 pool 实现复用
|
||
|
exit := make(chan error)
|
||
|
defer close(exit)
|
||
|
|
||
|
stop := func(err error) {
|
||
|
select {
|
||
|
case <-exit:
|
||
|
default:
|
||
|
exit <- err
|
||
|
}
|
||
|
}
|
||
|
|
||
|
go func() {
|
||
|
conn.SetReadLimit(maxMessageSize)
|
||
|
conn.SetReadDeadline(time.Now().Add(pongWait))
|
||
|
conn.SetPongHandler(func(string) error {
|
||
|
conn.SetReadDeadline(time.Now().Add(pongWait))
|
||
|
return nil
|
||
|
})
|
||
|
for {
|
||
|
_, message, ex := conn.ReadMessage()
|
||
|
if ex != nil {
|
||
|
if websocket.IsUnexpectedCloseError(ex, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
|
||
|
stop(ex)
|
||
|
} else {
|
||
|
stop(nil)
|
||
|
}
|
||
|
return
|
||
|
}
|
||
|
message = bytes.TrimSpace(bytes.Replace(message, newline, space, -1))
|
||
|
broadcast <- message
|
||
|
}
|
||
|
}()
|
||
|
|
||
|
go func() {
|
||
|
ticker := time.NewTicker(pingPeriod)
|
||
|
defer func() {
|
||
|
ticker.Stop()
|
||
|
conn.Close()
|
||
|
}()
|
||
|
|
||
|
for {
|
||
|
for {
|
||
|
select {
|
||
|
case message, ok := <-broadcast:
|
||
|
conn.SetWriteDeadline(time.Now().Add(writeWait))
|
||
|
if !ok {
|
||
|
// The hub closed the channel.
|
||
|
conn.WriteMessage(websocket.CloseMessage, []byte{})
|
||
|
stop(nil)
|
||
|
return
|
||
|
}
|
||
|
|
||
|
w, err2 := conn.NextWriter(websocket.TextMessage)
|
||
|
if err2 != nil {
|
||
|
stop(err2)
|
||
|
return
|
||
|
}
|
||
|
w.Write(message)
|
||
|
|
||
|
// Add queued chat messages to the current websocket message.
|
||
|
n := len(broadcast)
|
||
|
for i := 0; i < n; i++ {
|
||
|
w.Write(newline)
|
||
|
w.Write(<-broadcast)
|
||
|
}
|
||
|
|
||
|
if err3 := w.Close(); err3 != nil {
|
||
|
stop(err3)
|
||
|
return
|
||
|
}
|
||
|
case <-ticker.C:
|
||
|
conn.SetWriteDeadline(time.Now().Add(writeWait))
|
||
|
if err4 := conn.WriteMessage(websocket.PingMessage, nil); err4 != nil {
|
||
|
stop(err4)
|
||
|
return
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}()
|
||
|
|
||
|
return <-exit
|
||
|
}
|