package internal

import (
	"context"
	"fmt"
	"log"
	"sync"
	"time"

	"github.com/google/uuid"
	"github.com/gorilla/websocket"
)

// Timing and size limits applied to every client connection.
const (
	// writeWait is the deadline given to each write to the peer.
	writeWait = 10 * time.Second
	// pongWait is the read deadline; ReadPump re-arms it on every pong.
	pongWait = 60 * time.Second
	// pingPeriod is the interval between pings sent by WritePump. It is kept
	// below pongWait so a ping goes out before the read deadline lapses.
	pingPeriod = (pongWait * 9) / 10
	// maxMessageSize is the read limit, in bytes, set on each connection.
	maxMessageSize = 512
)

// Tuning for the Clients map rebuild performed by the hub on unregister.
const (
	// minHighWaterMarkMapRebuild is the smallest peak client count at which
	// a rebuild is considered at all.
	minHighWaterMarkMapRebuild = 100
	// mapRebuildThreshold is the divisor applied to the peak: the map is
	// rebuilt once the live count drops below peak/mapRebuildThreshold.
	mapRebuildThreshold = 4
)

// Client is a single WebSocket connection attached to a Hub.
type Client struct {
	// ID is a random UUID string assigned by NewClient.
	ID string
	// Conn is the underlying WebSocket connection.
	Conn *websocket.Conn
	// Send is the outbound message queue drained by WritePump. The hub
	// closes it to tell WritePump to send a close frame and exit.
	Send chan []byte
	// SubscribedPath is the path passed to NewClient.
	SubscribedPath string
	// mu is not used by any code in this file.
	mu sync.Mutex
}

// NewClient wraps conn in a Client with a fresh UUID and a Send queue
// buffered to 256 messages.
func NewClient(conn *websocket.Conn, subscribedPath string) *Client {
	return &Client{
		ID:             uuid.NewString(),
		Conn:           conn,
		Send:           make(chan []byte, 256),
		SubscribedPath: subscribedPath,
	}
}

// Hub tracks the clients connected for one path and fans broadcast messages
// out to them. The Clients map is mutated only by the goroutine started in
// Run; the pumps in this file talk to it through the channels.
type Hub struct {
	path       string
	maxClients int
	// Clients is the set of currently registered clients.
	Clients map[*Client]bool
	// Broadcast carries messages to be delivered to every registered client.
	Broadcast chan []byte
	// Register carries clients asking to join the hub.
	Register chan *Client
	// Unregister carries clients that should be removed from the hub.
	Unregister chan *Client
	monitor    *MemoryMonitor
	// highWaterMark is the largest size Clients has reached since the map
	// was last rebuilt.
	highWaterMark int
}

// NewHub creates a Hub for path that accepts at most maxClients clients.
//
// The Register and Unregister queues are buffered to maxClients. A negative
// maxClients is treated as a zero-sized buffer (make would otherwise panic);
// such a hub, like one created with maxClients == 0, rejects every
// registration.
func NewHub(path string, maxClients int) *Hub {
	log.Printf("[%s] Hub created with max clients: %d", path, maxClients)

	queueSize := maxClients
	if queueSize < 0 {
		queueSize = 0
	}

	return &Hub{
		path:       path,
		maxClients: maxClients,
		monitor:    NewMemoryMonitor(),
		Broadcast:  make(chan []byte, 256),
		Register:   make(chan *Client, queueSize),
		Unregister: make(chan *Client, queueSize),
		Clients:    make(map[*Client]bool),
	}
}

// Run starts the hub event loop in a new goroutine and returns immediately.
// The goroutine exits when ctx is cancelled; on the way out it closes the
// Send channel of every registered client and expires their read deadlines.
func (h *Hub) Run(ctx context.Context) {
	monitorTicker := time.NewTicker(MonitorInterval)

	go func() {
		defer func() {
			monitorTicker.Stop()
			// On shutdown, close every client's Send channel so WritePump sends
			// a WebSocket close frame and exits. Also expire the read deadline so
			// blocked ReadMessage() calls in ReadPump return immediately instead
			// of waiting up to pongWait (60 s).
			for client := range h.Clients {
				close(client.Send)
				client.Conn.SetReadDeadline(time.Now())
			}
			log.Printf("[%s] Hub stopped\n", h.path)
		}()

		for {
			select {
			case <-ctx.Done():
				log.Printf("[%s] Hub shutting down\n", h.path)
				return

			case client := <-h.Register:
				if len(h.Clients) >= h.maxClients {
					// Over capacity: tear the connection down without ever
					// adding it to the map.
					close(client.Send)
					client.Conn.Close()
					log.Printf("[%s] Rejected client %s (max %d reached)\n", h.path, client.ID, h.maxClients)
					break
				}
				h.Clients[client] = true
				if len(h.Clients) > h.highWaterMark {
					h.highWaterMark = len(h.Clients)
				}
				log.Printf("[%s] Client registered %s\n", h.path, client.ID)

			case client := <-h.Unregister:
				// Both pumps of a client may send an unregister; the map
				// membership check makes the second one a no-op and keeps
				// Send from being closed twice.
				if _, ok := h.Clients[client]; ok {
					delete(h.Clients, client)
					close(client.Send)
					// Rebuild the map when the live set has dropped to less than
					// 1/mapRebuildThreshold of the peak, so the old backing buckets
					// are released to the GC.
					if h.highWaterMark >= minHighWaterMarkMapRebuild && len(h.Clients) < h.highWaterMark/mapRebuildThreshold {
						rebuilt := make(map[*Client]bool, len(h.Clients))
						for c := range h.Clients {
							rebuilt[c] = true
						}
						h.Clients = rebuilt
						h.highWaterMark = len(h.Clients)
						log.Printf("[%s] Clients map rebuilt: %d active clients\n", h.path, len(h.Clients))
					}
					log.Printf("[%s] Client Unregistered %s\n", h.path, client.ID)
				}

			case message := <-h.Broadcast:
				for client := range h.Clients {
					select {
					case client.Send <- message:
					default:
						// The client's queue is full: drop the client rather
						// than block the whole hub on it.
						close(client.Send)
						delete(h.Clients, client)
						log.Printf("[%s] Client %s removed (slow/disconnected)\n", h.path, client.ID)
					}
				}

			case <-monitorTicker.C:
				current, peak := h.monitor.Snapshot()
				clientLength := len(h.Clients)
				if clientLength > 0 {
					log.Printf("[%s] connected clients: %d | heap alloc: %s | peak heap alloc: %s",
						h.path,
						clientLength,
						FormatBytes(current),
						FormatBytes(peak),
					)
				}
			}
		}
	}()
}

// WritePump forwards messages from c.Send to the WebSocket connection and
// sends a ping every pingPeriod. It returns when c.Send is closed (after
// sending a close frame) or when a write fails. On return it makes a
// best-effort attempt to unregister c from h and closes the connection.
//
// Each queued message is written as its own text message so that message
// boundaries survive even when several messages are waiting in the queue.
func WritePump(c *Client, h *Hub) {
	pingTicker := time.NewTicker(pingPeriod)
	defer func() {
		// Non-blocking: the only receiver of Unregister in this file is the
		// hub loop, and once that has stopped a blocking send could hang.
		select {
		case h.Unregister <- c:
		default:
		}
		pingTicker.Stop()
		c.Conn.Close()
	}()

	for {
		select {
		case message, ok := <-c.Send:
			c.Conn.SetWriteDeadline(time.Now().Add(writeWait))
			if !ok {
				// The hub closed the channel; say goodbye to the peer. The
				// connection is going away, so the write error is ignored.
				c.Conn.WriteMessage(websocket.CloseMessage, []byte{})
				return
			}
			if err := c.Conn.WriteMessage(websocket.TextMessage, message); err != nil {
				return
			}

		case <-pingTicker.C:
			c.Conn.SetWriteDeadline(time.Now().Add(writeWait))
			if err := c.Conn.WriteMessage(websocket.PingMessage, nil); err != nil {
				log.Println(fmt.Errorf("[%s] ping client %s: %w", h.path, c.ID, err))
				return
			}
		}
	}
}

// ReadPump reads messages from the connection until a read fails, logging
// each one. It sets the read limit to maxMessageSize and re-arms the read
// deadline to pongWait on every pong. On return it makes a best-effort
// attempt to unregister c from h and closes the connection.
func ReadPump(c *Client, h *Hub) {
	defer func() {
		// Non-blocking for the same reason as in WritePump: nothing may be
		// draining Unregister any more.
		select {
		case h.Unregister <- c:
		default:
		}
		c.Conn.Close()
	}()

	c.Conn.SetReadLimit(maxMessageSize)
	c.Conn.SetReadDeadline(time.Now().Add(pongWait))
	c.Conn.SetPongHandler(func(string) error {
		c.Conn.SetReadDeadline(time.Now().Add(pongWait))
		return nil
	})

	for {
		_, message, err := c.Conn.ReadMessage()
		if err != nil {
			// Going-away and abnormal closures are treated as routine; any
			// other close error is logged.
			if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
				log.Printf("WebSocket error: %v", err)
			}
			break
		}
		// %q keeps peer-controlled bytes (newlines, control characters)
		// from forging or splitting log lines.
		log.Printf("Received: %q\n", message)
	}
}