From 07f7893a26ee7dd24066d30faaccf3b10fd8cbe4 Mon Sep 17 00:00:00 2001 From: Roger Ferdinan Date: Wed, 4 Feb 2026 22:20:19 +0700 Subject: [PATCH] fix: fixing ping & memory leak --- internal/hub.go | 77 ++++++++++++++++++++++++++------------------- v1/server/server.go | 8 ++++- 2 files changed, 52 insertions(+), 33 deletions(-) diff --git a/internal/hub.go b/internal/hub.go index 0485e7d..82e3d2c 100644 --- a/internal/hub.go +++ b/internal/hub.go @@ -3,6 +3,7 @@ package internal import ( "fmt" "log" + "sync" "time" "github.com/google/uuid" @@ -10,9 +11,10 @@ import ( ) const ( - writeWait = 10 * time.Second - pongWait = 60 * time.Second - pingPeriod = 25 * time.Second + writeWait = 10 * time.Second + pongWait = 60 * time.Second + pingPeriod = (pongWait * 9) / 10 + maxMessageSize = 512 ) type Client struct { @@ -20,16 +22,16 @@ type Client struct { Conn *websocket.Conn Send chan []byte SubscribedPath string - done chan struct{} + mu sync.Mutex } func NewClient(conn *websocket.Conn, subscribedPath string) *Client { return &Client{ ID: uuid.NewString(), Conn: conn, - Send: make(chan []byte, 1), + Send: make(chan []byte, 256), SubscribedPath: subscribedPath, - done: make(chan struct{}, 1), + mu: sync.Mutex{}, } } @@ -42,9 +44,9 @@ type Hub struct { func NewHub() *Hub { return &Hub{ - Broadcast: make(chan []byte, 1), - Register: make(chan *Client, 1), - Unregister: make(chan *Client, 1), + Broadcast: make(chan []byte, 256), + Register: make(chan *Client, 10), + Unregister: make(chan *Client, 10), Clients: make(map[*Client]bool), } } @@ -55,23 +57,22 @@ func (h *Hub) Run() { select { case client := <-h.Register: h.Clients[client] = true - log.Println("Client registered") - case c := <-h.Unregister: - if v, ok := h.Clients[c]; ok { - fmt.Println(v, c) - delete(h.Clients, c) - close(c.Send) + log.Printf("Client registered %s\n", client.ID) + case client := <-h.Unregister: + if _, ok := h.Clients[client]; ok { + delete(h.Clients, client) + close(client.Send) } - log.Println("Client Unregistered") + log.Printf("Client Unregistered %s\n", client.ID) case message := <-h.Broadcast: for client := range h.Clients { - client.Send <- message - // select { - // case client.Send <- message: - // default: - // close(client.Send) - // delete(h.Clients, client) - // } + select { + case client.Send <- message: + default: + close(client.Send) + delete(h.Clients, client) + log.Printf("Client %s removed (slow/disconnected)", client.ID) + } } } } @@ -90,23 +91,36 @@ func WritePump(c *Client, h *Hub) { select { case message, ok := <-c.Send: c.Conn.SetWriteDeadline(time.Now().Add(writeWait)) + if !ok { c.Conn.WriteMessage(websocket.CloseMessage, []byte{}) - fmt.Println(ok) return } - if err := c.Conn.WriteMessage(websocket.TextMessage, message); err != nil { - fmt.Println(err) + w, err := c.Conn.NextWriter(websocket.TextMessage) + if err != nil { + return + } + w.Write(message) + + // Queue queued messages in the same buffer (optional optimization) + n := len(c.Send) + for i := 0; i < n; i++ { + w.Write(<-c.Send) + } + + if err := w.Close(); err != nil { return } case <-pingTicker.C: c.Conn.SetWriteDeadline(time.Now().Add(writeWait)) - if err := c.Conn.WriteMessage(websocket.PingMessage, []byte{}); err != nil { + + if err := c.Conn.WriteMessage(websocket.PingMessage, nil); err != nil { fmt.Println(err) return } } + } } @@ -116,15 +130,16 @@ func ReadPump(c *Client, h *Hub) { c.Conn.Close() }() - c.Conn.SetReadLimit(1024) + c.Conn.SetReadLimit(maxMessageSize) c.Conn.SetReadDeadline(time.Now().Add(pongWait)) + c.Conn.SetPongHandler(func(string) error { c.Conn.SetReadDeadline(time.Now().Add(pongWait)) return nil }) for { - messageType, message, err := c.Conn.ReadMessage() + _, message, err := c.Conn.ReadMessage() if err != nil { if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { log.Printf("WebSocket error: %v", err) @@ -132,8 +147,6 @@ func ReadPump(c *Client, h *Hub) { break } - if messageType == websocket.TextMessage { - log.Printf("Received: %s\n", message) - } + log.Printf("Received: %s\n", message) } } diff --git a/v1/server/server.go b/v1/server/server.go index 60a61e4..61673cf 100644 --- a/v1/server/server.go +++ b/v1/server/server.go @@ -1,6 +1,7 @@ package server import ( + "crypto/subtle" "fmt" "log" "net/http" @@ -81,6 +82,7 @@ func (b *SafeWebsocketServerBuilder) HandleFuncWebsocket(pattern string, subscri } c := internal.NewClient(conn, subscribedPath) h.Register <- c + go internal.WritePump(c, h) go internal.ReadPump(c, h) go writeFunc(h.Broadcast) @@ -110,13 +112,17 @@ type SafeWebsocketServer struct { func (s *SafeWebsocketServer) AuthMiddleware(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if r.Header.Get("X-MBX-APIKEY") != s.apiKey { + providedKey := r.Header.Get("X-MBX-APIKEY") + expectedKey := s.apiKey + + if subtle.ConstantTimeCompare([]byte(providedKey), []byte(expectedKey)) != 1 { internal.ErrorResponse(w, internal.NewStatusMessage(). StatusCode(http.StatusForbidden). Message("X-MBX-APIKEY is missing"). Build()) return } + next.ServeHTTP(w, r) }) }