// Package internal implements a small WebSocket fan-out: a Hub that tracks
// connected clients and copies every broadcast message to each of them, plus
// the per-connection read and write loops.
package internal

import (
	"fmt"
	"log"
	"sync"
	"time"

	"github.com/google/uuid"
	"github.com/gorilla/websocket"
)

const (
	// writeWait is the deadline applied to each individual write to the peer.
	writeWait = 10 * time.Second

	// pongWait is the read deadline; ReadPump renews it whenever a pong arrives.
	pongWait = 60 * time.Second

	// pingPeriod is the interval between pings sent by WritePump. It is nine
	// tenths of pongWait so a ping goes out before the read deadline elapses.
	pingPeriod = (pongWait * 9) / 10

	// maxMessageSize is the read limit, in bytes, set on every connection.
	maxMessageSize = 512
)

// Client is a single WebSocket connection tracked by a Hub.
type Client struct {
	// ID is a random UUID assigned by NewClient; it is used in log lines.
	ID string
	// Conn is the underlying WebSocket connection.
	Conn *websocket.Conn
	// Send queues outbound messages for WritePump. The Hub closes it when it
	// drops the client.
	Send chan []byte
	// SubscribedPath is stored exactly as passed to NewClient. The code
	// visible here never reads it; broadcasts are not filtered by path.
	SubscribedPath string
	// mu is not used by the code visible here.
	mu sync.Mutex
}

// NewClient wraps conn in a Client with a fresh UUID and a Send queue that
// buffers up to 256 messages.
func NewClient(conn *websocket.Conn, subscribedPath string) *Client {
	return &Client{
		ID:             uuid.NewString(),
		Conn:           conn,
		Send:           make(chan []byte, 256),
		SubscribedPath: subscribedPath,
	}
}

// Hub keeps the set of registered clients and fans broadcast messages out to
// them.
type Hub struct {
	// Clients is the set of registered clients. Within this file it is only
	// accessed by the goroutine started by Run.
	Clients map[*Client]bool
	// Broadcast carries messages to be copied to every registered client.
	Broadcast chan []byte
	// Register carries clients to be added to Clients.
	Register chan *Client
	// Unregister carries clients to be removed from Clients.
	Unregister chan *Client
}

// NewHub returns a Hub with an empty client set and buffered channels.
func NewHub() *Hub {
	return &Hub{
		Broadcast:  make(chan []byte, 256),
		Register:   make(chan *Client, 10),
		Unregister: make(chan *Client, 10),
		Clients:    make(map[*Client]bool),
	}
}

// Run starts the hub's event loop in a new goroutine and returns immediately.
// The loop has no exit condition, so the goroutine lives for the rest of the
// process.
func (h *Hub) Run() {
	go func() {
		for {
			select {
			case client := <-h.Register:
				h.Clients[client] = true
				log.Printf("Client registered %s\n", client.ID)

			case client := <-h.Unregister:
				// Both pumps unregister on exit, so the same client normally
				// arrives here twice; only the first arrival removes it and
				// is logged.
				if h.remove(client) {
					log.Printf("Client Unregistered %s\n", client.ID)
				}

			case message := <-h.Broadcast:
				for client := range h.Clients {
					select {
					case client.Send <- message:
					default:
						// The client's queue is full: drop the client rather
						// than block the hub loop.
						h.remove(client)
						log.Printf("Client %s removed (slow/disconnected)", client.ID)
					}
				}
			}
		}
	}()
}

// remove deletes c from the client set and closes its Send channel. It
// reports whether c was registered; calling it again for the same client is a
// no-op, which keeps Send from being closed twice.
func (h *Hub) remove(c *Client) bool {
	if _, ok := h.Clients[c]; !ok {
		return false
	}
	delete(h.Clients, c)
	close(c.Send)
	return true
}

// writeFrame sets the write deadline on c's connection and then sends payload
// as one WebSocket message of the given type. Errors are wrapped with the
// step that failed.
func writeFrame(c *Client, messageType int, payload []byte) error {
	if err := c.Conn.SetWriteDeadline(time.Now().Add(writeWait)); err != nil {
		return fmt.Errorf("setting write deadline: %w", err)
	}
	if err := c.Conn.WriteMessage(messageType, payload); err != nil {
		return fmt.Errorf("writing message type %d: %w", messageType, err)
	}
	return nil
}

// WritePump forwards messages from c.Send to the connection and sends a ping
// every pingPeriod. It returns when c.Send is closed or a write fails; on
// return it stops the ticker, closes the connection and unregisters c from h.
//
// Each queued message is sent as its own text message so that the peer always
// sees the original message boundaries.
func WritePump(c *Client, h *Hub) {
	pingTicker := time.NewTicker(pingPeriod)
	defer func() {
		// Release local resources before the channel send so they are freed
		// even if the send has to wait.
		pingTicker.Stop()
		c.Conn.Close()
		h.Unregister <- c
	}()

	for {
		select {
		case message, ok := <-c.Send:
			if !ok {
				// The hub closed the queue. The close frame is best effort:
				// the connection is torn down right after, whatever happens.
				_ = writeFrame(c, websocket.CloseMessage, []byte{})
				return
			}
			if err := writeFrame(c, websocket.TextMessage, message); err != nil {
				log.Printf("Client %s: %v", c.ID, err)
				return
			}

		case <-pingTicker.C:
			if err := writeFrame(c, websocket.PingMessage, nil); err != nil {
				log.Printf("Client %s: %v", c.ID, err)
				return
			}
		}
	}
}

// ReadPump reads messages from the connection and logs them until a read
// fails. It caps message size at maxMessageSize and renews the read deadline
// on every pong. On return it closes the connection and unregisters c from h.
func ReadPump(c *Client, h *Hub) {
	defer func() {
		// Close first so the socket is released even if the send has to wait.
		c.Conn.Close()
		h.Unregister <- c
	}()

	c.Conn.SetReadLimit(maxMessageSize)
	if err := c.Conn.SetReadDeadline(time.Now().Add(pongWait)); err != nil {
		log.Printf("Client %s: setting read deadline: %v", c.ID, err)
		return
	}
	c.Conn.SetPongHandler(func(string) error {
		// An error returned here is handed back to the websocket library.
		return c.Conn.SetReadDeadline(time.Now().Add(pongWait))
	})

	for {
		_, message, err := c.Conn.ReadMessage()
		if err != nil {
			if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
				log.Printf("WebSocket error: %v", err)
			}
			return
		}
		log.Printf("Received: %s\n", message)
	}
}