From a9e36708e8d3459fbdc803f655d3f199b7be1b44 Mon Sep 17 00:00:00 2001 From: Roger Ferdinan Date: Mon, 2 Mar 2026 19:57:35 +0700 Subject: [PATCH] fix: adding monitoring --- internal/format.go | 16 ++++++ internal/hub.go | 102 +++++++++++++++++++++++++++++++------ internal/memory_monitor.go | 60 ++++++++++++++++++++++ v1/client/client.go | 57 ++++++++++++++------- v1/examples/server/main.go | 72 +++++++++++++++++++------- v1/server/server.go | 54 ++++++++++++++++---- 6 files changed, 301 insertions(+), 60 deletions(-) create mode 100644 internal/format.go create mode 100644 internal/memory_monitor.go diff --git a/internal/format.go b/internal/format.go new file mode 100644 index 0000000..1a737d8 --- /dev/null +++ b/internal/format.go @@ -0,0 +1,16 @@ +package internal + +import "fmt" + +func FormatBytes(b uint64) string { + const unit = 1024 + if b < unit { + return fmt.Sprintf("%d B", b) + } + div, exp := uint64(unit), 0 + for n := b / unit; n >= unit; n /= unit { + div *= unit + exp++ + } + return fmt.Sprintf("%.2f %cB", float64(b)/float64(div), "KMGTPE"[exp]) +} diff --git a/internal/hub.go b/internal/hub.go index 82e3d2c..0e6d97e 100644 --- a/internal/hub.go +++ b/internal/hub.go @@ -1,6 +1,7 @@ package internal import ( + "context" "fmt" "log" "sync" @@ -17,6 +18,11 @@ const ( maxMessageSize = 512 ) +const ( + minHighWaterMarkMapRebuild = 100 + mapRebuildThreshold = 4 +) + type Client struct { ID string Conn *websocket.Conn @@ -36,34 +42,86 @@ func NewClient(conn *websocket.Conn, subscribedPath string) *Client { } type Hub struct { - Clients map[*Client]bool - Broadcast chan []byte - Register chan *Client - Unregister chan *Client + path string + maxClients int + Clients map[*Client]bool + Broadcast chan []byte + Register chan *Client + Unregister chan *Client + monitor *MemoryMonitor + highWaterMark int } -func NewHub() *Hub { +func NewHub(path string, maxClients int) *Hub { + log.Printf("[%s] Hub created with max clients: %d", path, maxClients) return &Hub{ + path: path, + maxClients: maxClients, + monitor: NewMemoryMonitor(), Broadcast: make(chan []byte, 256), - Register: make(chan *Client, 10), - Unregister: make(chan *Client, 10), + Register: make(chan *Client, maxClients), + Unregister: make(chan *Client, maxClients), Clients: make(map[*Client]bool), } } -func (h *Hub) Run() { +// Run starts the hub event loop. It exits when ctx is cancelled. +func (h *Hub) Run(ctx context.Context) { + monitorTicker := time.NewTicker(MonitorInterval) go func() { + defer func() { + monitorTicker.Stop() + // On shutdown, close every client's Send channel so WritePump sends + // a WebSocket close frame and exits. Also expire the read deadline so + // blocked ReadMessage() calls in ReadPump return immediately instead + // of waiting up to pongWait (60 s). + for client := range h.Clients { + close(client.Send) + client.Conn.SetReadDeadline(time.Now()) + } + log.Printf("[%s] Hub stopped\n", h.path) + }() + for { select { + case <-ctx.Done(): + log.Printf("[%s] Hub shutting down\n", h.path) + return + case client := <-h.Register: + if len(h.Clients) >= h.maxClients { + close(client.Send) + client.Conn.Close() + log.Printf("[%s] Rejected client %s (max %d reached)\n", h.path, client.ID, h.maxClients) + break + } h.Clients[client] = true - log.Printf("Client registered %s\n", client.ID) + if len(h.Clients) > h.highWaterMark { + h.highWaterMark = len(h.Clients) + } + log.Printf("[%s] Client registered %s\n", h.path, client.ID) + case client := <-h.Unregister: if _, ok := h.Clients[client]; ok { delete(h.Clients, client) close(client.Send) + + // Rebuild the map when the live set has dropped to less than + // 1/mapRebuildThreshold of the peak, so the old backing buckets + // are released to the GC. + if h.highWaterMark >= minHighWaterMarkMapRebuild && + len(h.Clients) < h.highWaterMark/mapRebuildThreshold { + rebuilt := make(map[*Client]bool, len(h.Clients)) + for c := range h.Clients { + rebuilt[c] = true + } + h.Clients = rebuilt + h.highWaterMark = len(h.Clients) + log.Printf("[%s] Clients map rebuilt: %d active clients\n", h.path, len(h.Clients)) + } + log.Printf("[%s] Client Unregistered %s\n", h.path, client.ID) } - log.Printf("Client Unregistered %s\n", client.ID) + case message := <-h.Broadcast: for client := range h.Clients { select { @@ -71,9 +129,18 @@ func (h *Hub) Run() { default: close(client.Send) delete(h.Clients, client) - log.Printf("Client %s removed (slow/disconnected)", client.ID) + log.Printf("[%s] Client %s removed (slow/disconnected)\n", h.path, client.ID) } } + + case <-monitorTicker.C: + current, peak := h.monitor.Snapshot() + clientLength := len(h.Clients) + if clientLength > 0 { + log.Printf("[%s] connected clients: %d | heap alloc: %s | peak heap alloc: %s", + h.path, clientLength, FormatBytes(current), FormatBytes(peak), + ) + } } } }() @@ -82,7 +149,10 @@ func (h *Hub) Run() { func WritePump(c *Client, h *Hub) { pingTicker := time.NewTicker(pingPeriod) defer func() { - h.Unregister <- c + select { + case h.Unregister <- c: + default: + } pingTicker.Stop() c.Conn.Close() }() @@ -103,7 +173,7 @@ func WritePump(c *Client, h *Hub) { } w.Write(message) - // Queue queued messages in the same buffer (optional optimization) + // Flush any messages that queued up while we were writing. n := len(c.Send) for i := 0; i < n; i++ { w.Write(<-c.Send) @@ -120,13 +190,15 @@ func WritePump(c *Client, h *Hub) { return } } - } } func ReadPump(c *Client, h *Hub) { defer func() { - h.Unregister <- c + select { + case h.Unregister <- c: + default: + } c.Conn.Close() }() diff --git a/internal/memory_monitor.go b/internal/memory_monitor.go new file mode 100644 index 0000000..ae51f2b --- /dev/null +++ b/internal/memory_monitor.go @@ -0,0 +1,60 @@ +package internal + +import ( + "context" + "log" + "runtime" + "sync/atomic" + "time" +) + +const MonitorInterval = 30 * time.Second + +type MemoryMonitor struct { + peakAlloc atomic.Uint64 +} + +func NewMemoryMonitor() *MemoryMonitor { + return &MemoryMonitor{} +} + +// Snapshot reads current heap allocation, updates the peak, and returns both. +func (m *MemoryMonitor) Snapshot() (currentAlloc, peakAlloc uint64) { + var ms runtime.MemStats + runtime.ReadMemStats(&ms) + currentAlloc = ms.HeapAlloc + for { + peak := m.peakAlloc.Load() + if currentAlloc <= peak { + return currentAlloc, peak + } + if m.peakAlloc.CompareAndSwap(peak, currentAlloc) { + return currentAlloc, currentAlloc + } + } +} + +// Run starts a periodic monitor loop that calls logFn with each snapshot. +// It blocks until ctx is cancelled. +func (m *MemoryMonitor) Run(ctx context.Context, logFn func(currentAlloc, peakAlloc uint64)) { + ticker := time.NewTicker(MonitorInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + current, peak := m.Snapshot() + logFn(current, peak) + } + } +} + +// DefaultLogFn returns a log function with the given prefix. +func DefaultLogFn(prefix string) func(currentAlloc, peakAlloc uint64) { + return func(currentAlloc, peakAlloc uint64) { + log.Printf("[%s] heap alloc: %s | peak heap alloc: %s", + prefix, FormatBytes(currentAlloc), FormatBytes(peakAlloc)) + } +} diff --git a/v1/client/client.go b/v1/client/client.go index 90b5582..054236d 100644 --- a/v1/client/client.go +++ b/v1/client/client.go @@ -7,6 +7,7 @@ import ( "net/http" "net/url" "strings" + "sync" "time" customrwmutex "git.neurocipta.com/rogerferdinan/custom-rwmutex" @@ -137,9 +138,11 @@ func (b *SafeWebsocketClientBuilder) Build(ctx context.Context) (*SafeWebsocketC isConnected: false, doneMap: safemap.NewSafeMap[string, chan struct{}](), writeChan: make(chan Message, *b.writeChannelSize), + monitor: internal.NewMemoryMonitor(), } go wsClient.reconnectHandler() + go wsClient.monitor.Run(ctx, internal.DefaultLogFn("client-monitor")) if err := wsClient.connect(); err != nil { return nil, fmt.Errorf("failed to establish initial connection: %v", err) @@ -158,11 +161,12 @@ type SafeWebsocketClient struct { path *string rawQuery *string - mu *customrwmutex.CustomRwMutex - conn *websocket.Conn - ctx context.Context - cancelFuncs []context.CancelFunc - dataChannel chan []byte + mu *customrwmutex.CustomRwMutex + conn *websocket.Conn + ctx context.Context + cancelFuncs []context.CancelFunc + dataChannel chan []byte + dataChannelOnce sync.Once reconnectCh chan struct{} reconnectChans []chan struct{} @@ -170,8 +174,10 @@ type SafeWebsocketClient struct { doneMap *safemap.SafeMap[string, chan struct{}] writeChan chan Message + monitor *internal.MemoryMonitor } + func (wsClient *SafeWebsocketClient) connect() error { var scheme string if wsClient.useTLS { @@ -190,7 +196,7 @@ func (wsClient *SafeWebsocketClient) connect() error { newURL.RawQuery = *wsClient.rawQuery } - header := make(http.Header) + header := make(http.Header, 0) if wsClient.headers != nil { for k, v := range *wsClient.headers { header.Set(k, v) @@ -226,9 +232,13 @@ func (wsClient *SafeWebsocketClient) connect() error { return err } - wsClient.writeChan <- Message{ + select { + case wsClient.writeChan <- Message{ MessageType: MessageTypePong, Data: []byte(pingData), + }: + default: + log.Println("writeChan full, dropping pong") } return nil }) @@ -291,14 +301,10 @@ func (wsClient *SafeWebsocketClient) reconnectHandler() { for _, reconnectCh := range wsClient.reconnectChans { select { case reconnectCh <- struct{}{}: - default: // prevent blocking if chan is full + default: } } - if len(wsClient.reconnectChans) > 1 { - wsClient.reconnectChans = wsClient.reconnectChans[1:] - } else { - wsClient.reconnectChans = nil - } + wsClient.reconnectChans = nil } case <-wsClient.ctx.Done(): log.Println("reconnect handler stopped due to client shutdown") @@ -334,8 +340,11 @@ func (wsClient *SafeWebsocketClient) writePump(ctx context.Context, c *websocket } func (wsClient *SafeWebsocketClient) readPump(ctx context.Context, c *websocket.Conn) { + canceled := false defer func() { - wsClient.triggerReconnect() + if !canceled { + wsClient.triggerReconnect() + } c.Close() }() @@ -343,6 +352,7 @@ func (wsClient *SafeWebsocketClient) readPump(ctx context.Context, c *websocket. select { case <-ctx.Done(): log.Println("Reader canceled by context") + canceled = true return default: // Set read deadline @@ -363,6 +373,7 @@ func (wsClient *SafeWebsocketClient) readPump(ctx context.Context, c *websocket. select { case wsClient.dataChannel <- data: case <-ctx.Done(): + canceled = true return default: if wsClient.isDrop { @@ -383,9 +394,14 @@ func (wsClient *SafeWebsocketClient) startPingTicker(ctx context.Context) { log.Println("ping ticker canceled by context") return case <-ticker.C: - wsClient.writeChan <- Message{ + select { + case wsClient.writeChan <- Message{ MessageType: websocket.PingMessage, Data: []byte{}, + }: + case <-ctx.Done(): + log.Println("ping ticker canceled by context") + return } } } @@ -413,7 +429,9 @@ func (wsClient *SafeWebsocketClient) DataChannel() <-chan []byte { } func (wsClient *SafeWebsocketClient) CloseDataChannel() { - close(wsClient.dataChannel) + wsClient.dataChannelOnce.Do(func() { + close(wsClient.dataChannel) + }) } func (wsClient *SafeWebsocketClient) Write(data []byte) error { @@ -425,7 +443,7 @@ func (wsClient *SafeWebsocketClient) Write(data []byte) error { } func (wsClient *SafeWebsocketClient) Close() error { - wsClient.mu.ReadHandler(func() error { + wsClient.mu.WriteHandler(func() error { if wsClient.cancelFuncs != nil { for _, cancel := range wsClient.cancelFuncs { cancel() @@ -443,7 +461,10 @@ func (wsClient *SafeWebsocketClient) Close() error { wsClient.conn.Close() } wsClient.isConnected = false - close(wsClient.dataChannel) + + wsClient.dataChannelOnce.Do(func() { + close(wsClient.dataChannel) + }) return nil } diff --git a/v1/examples/server/main.go b/v1/examples/server/main.go index 9e34b72..9297eeb 100644 --- a/v1/examples/server/main.go +++ b/v1/examples/server/main.go @@ -1,8 +1,12 @@ package main import ( + "context" "encoding/json" "log" + "os" + "os/signal" + "syscall" "time" "git.neurocipta.com/rogerferdinan/safe-web-socket/v1/server" @@ -14,34 +18,66 @@ type ExampleData struct { } func main() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) + go func() { + <-sigChan + log.Println("Received shutdown signal") + cancel() + }() + s, err := server.NewSafeWebsocketServerBuilder(). BaseHost("localhost"). BasePort(8080). ApiKey("abcd"). - HandleFuncWebsocket("/ws/test/", "data_1", func(c chan []byte) { + Context(ctx). + HandleFuncWebsocket("/ws/test/", "data_1", 5_000, func(ctx context.Context, c chan []byte) { ticker := time.NewTicker(100 * time.Millisecond) - for range ticker.C { - jsonBytes, err := json.Marshal(ExampleData{ - Time: time.Now(), - Data: "data_1", - }) - if err != nil { - continue + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + jsonBytes, err := json.Marshal(ExampleData{ + Time: time.Now(), + Data: "data_1", + }) + if err != nil { + continue + } + select { + case c <- jsonBytes: + case <-ctx.Done(): + return + } } - c <- jsonBytes } }). - HandleFuncWebsocket("/ws/test/", "data_2", func(c chan []byte) { + HandleFuncWebsocket("/ws/test/", "data_2", 5_000, func(ctx context.Context, c chan []byte) { ticker := time.NewTicker(100 * time.Millisecond) - for range ticker.C { - jsonBytes, err := json.Marshal(ExampleData{ - Time: time.Now(), - Data: "data_2", - }) - if err != nil { - continue + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + jsonBytes, err := json.Marshal(ExampleData{ + Time: time.Now(), + Data: "data_2", + }) + if err != nil { + continue + } + select { + case c <- jsonBytes: + case <-ctx.Done(): + return + } } - c <- jsonBytes } }). Build() diff --git a/v1/server/server.go b/v1/server/server.go index 3aa315a..d686fd0 100644 --- a/v1/server/server.go +++ b/v1/server/server.go @@ -1,12 +1,14 @@ package server import ( + "context" "crypto/subtle" "fmt" "log" "net/http" "strings" "syscall" + "time" "git.neurocipta.com/rogerferdinan/safe-web-socket/internal" "github.com/gorilla/websocket" @@ -29,6 +31,7 @@ type SafeWebsocketServerBuilder struct { apiKey *string `nil_checker:"required"` upgrader *websocket.Upgrader `nil_checker:"required"` mux *http.ServeMux `nil_checker:"required"` + ctx context.Context } func NewSafeWebsocketServerBuilder() *SafeWebsocketServerBuilder { @@ -41,6 +44,7 @@ func NewSafeWebsocketServerBuilder() *SafeWebsocketServerBuilder { }, }, mux: http.NewServeMux(), + ctx: context.Background(), } } @@ -59,16 +63,26 @@ func (b *SafeWebsocketServerBuilder) ApiKey(apiKey string) *SafeWebsocketServerB return b } +// Context sets the lifecycle context for all hub and writeFunc goroutines. +// When ctx is cancelled, hubs stop dispatching and all active connections +// are unblocked so their goroutines can exit cleanly. +// Call this before HandleFuncWebsocket. Defaults to context.Background(). +func (b *SafeWebsocketServerBuilder) Context(ctx context.Context) *SafeWebsocketServerBuilder { + b.ctx = ctx + return b +} + func (b *SafeWebsocketServerBuilder) HandleFunc(pattern string, fn func(http.ResponseWriter, *http.Request)) *SafeWebsocketServerBuilder { b.mux.HandleFunc(pattern, fn) return b } -func (b *SafeWebsocketServerBuilder) HandleFuncWebsocket(pattern string, subscribedPath string, writeFunc func(writeChannel chan []byte)) *SafeWebsocketServerBuilder { - h := internal.NewHub() - h.Run() +// HandleFuncWebsocket registers a WebSocket endpoint. +func (b *SafeWebsocketServerBuilder) HandleFuncWebsocket(pattern string, subscribedPath string, maxClients int, writeFunc func(ctx context.Context, writeChannel chan []byte)) *SafeWebsocketServerBuilder { + h := internal.NewHub(pattern+subscribedPath, maxClients) + h.Run(b.ctx) - go writeFunc(h.Broadcast) + go writeFunc(b.ctx, h.Broadcast) b.mux.HandleFunc(pattern+subscribedPath, func(w http.ResponseWriter, r *http.Request) { conn, err := b.upgrader.Upgrade(w, r, nil) @@ -87,7 +101,6 @@ func (b *SafeWebsocketServerBuilder) HandleFuncWebsocket(pattern string, subscri go internal.WritePump(c, h) go internal.ReadPump(c, h) - }) return b } @@ -102,6 +115,7 @@ func (b *SafeWebsocketServerBuilder) Build() (*SafeWebsocketServer, error) { mux: b.mux, url: fmt.Sprintf("%s:%d", *b.baseHost, *b.basePort), apiKey: *b.apiKey, + ctx: b.ctx, } return &safeServer, nil } @@ -110,6 +124,7 @@ type SafeWebsocketServer struct { mux *http.ServeMux url string apiKey string + ctx context.Context } func (s *SafeWebsocketServer) AuthMiddleware(next http.Handler) http.Handler { @@ -130,9 +145,30 @@ func (s *SafeWebsocketServer) AuthMiddleware(next http.Handler) http.Handler { } func (s *SafeWebsocketServer) ListenAndServe() error { - log.Printf("HTTP serve on %s\n", s.url) - if err := http.ListenAndServe(s.url, s.AuthMiddleware(s.mux)); err != nil { - return fmt.Errorf("failed to serve websocket: %w", err) + srv := &http.Server{ + Addr: s.url, + Handler: s.AuthMiddleware(s.mux), + } + + errCh := make(chan error, 1) + go func() { + log.Printf("HTTP serve on %s\n", s.url) + if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { + errCh <- fmt.Errorf("failed to serve websocket: %w", err) + } + close(errCh) + }() + + select { + case err := <-errCh: + return err + case <-s.ctx.Done(): + log.Println("Server shutting down...") + shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := srv.Shutdown(shutdownCtx); err != nil { + return fmt.Errorf("server shutdown: %w", err) + } + return <-errCh } - return nil }