// Package internal contains a small WebSocket hub: it tracks connected
// clients, fans broadcast messages out to them, and keeps a registry of named
// data channels.
package internal

import (
	"fmt"
	"log"
	"time"

	"github.com/gorilla/websocket"
)

// Client pairs one WebSocket connection with its outbound message queue.
type Client struct {
	// Conn is the underlying WebSocket connection.
	Conn *websocket.Conn
	// Send queues outbound messages. WritePump drains it; the Hub closes it
	// when the client is unregistered or dropped.
	Send chan []byte
	// SubscribedPath is the path handed to NewClient.
	SubscribedPath string
	// done is allocated by NewClient; nothing in this file reads or closes it.
	done chan struct{}
	// mu wraps every write to Conn made in this file.
	// NOTE(review): CustomRwMutex is declared elsewhere; WriteHandler is
	// assumed to run its callback under an exclusive lock and to return the
	// callback's error - confirm.
	mu *CustomRwMutex
}

// NewClient returns a Client for conn with a Send queue buffered to 1024
// messages.
func NewClient(conn *websocket.Conn, subscribedPath string) *Client {
	return &Client{
		Conn:           conn,
		Send:           make(chan []byte, 1024),
		SubscribedPath: subscribedPath,
		done:           make(chan struct{}),
		mu:             NewCustomRwMutex(),
	}
}

// Hub tracks registered clients and a registry of named data channels.
type Hub struct {
	// Clients is the set of registered clients. In this file it is only
	// touched by the goroutine started in Run.
	Clients map[*Client]bool
	// Broadcast receives messages to fan out to every registered client.
	Broadcast chan []byte
	// Register receives clients to add to Clients.
	Register chan *Client
	// Unregister receives clients to remove from Clients and shut down.
	Unregister chan *Client
	// ClientData maps a data ID to its channel; access goes through writeMu.
	ClientData map[string]chan []byte
	// writeMu guards ClientData for both the read and the write accessors.
	writeMu *CustomRwMutex
	// readMu is not used anywhere in this file.
	readMu *CustomRwMutex
}

// NewHub returns a Hub with unbuffered control channels and empty registries.
// Call Run to start processing.
func NewHub() *Hub {
	return &Hub{
		Broadcast:  make(chan []byte),
		Register:   make(chan *Client),
		Unregister: make(chan *Client),
		Clients:    make(map[*Client]bool),
		ClientData: make(map[string]chan []byte),
		writeMu:    NewCustomRwMutex(),
		// Initialized so the field is never a nil pointer if code outside this
		// file starts using it.
		readMu: NewCustomRwMutex(),
	}
}

// AddDataChannel returns the data channel registered under dataID, creating a
// new one buffered to 256 messages when none exists yet.
func (h *Hub) AddDataChannel(dataID string) chan []byte {
	ch := make(chan []byte, 256)
	h.writeMu.WriteHandler(func() error {
		if existing, ok := h.ClientData[dataID]; ok {
			// Already registered: hand back the existing channel and let the
			// freshly allocated one be garbage collected.
			ch = existing
			return nil
		}
		h.ClientData[dataID] = ch
		log.Printf("Created data channel for: %s\n", dataID)
		return nil
	})
	return ch
}

// GetDataChannel looks up the data channel registered under dataID. The
// boolean reports whether one exists.
func (h *Hub) GetDataChannel(dataID string) (chan []byte, bool) {
	var (
		ch chan []byte
		ok bool
	)
	h.writeMu.ReadHandler(func() error {
		ch, ok = h.ClientData[dataID]
		return nil
	})
	return ch, ok
}

// RemoveDataChannel closes and forgets the data channel registered under
// dataID. It does nothing when no such channel exists.
func (h *Hub) RemoveDataChannel(dataID string) {
	h.writeMu.WriteHandler(func() error {
		if ch, ok := h.ClientData[dataID]; ok {
			close(ch)
			delete(h.ClientData, dataID)
			log.Printf("Removed data channel for: %s\n", dataID)
		}
		return nil
	})
}

// Run starts a goroutine that serves Register, Unregister and Broadcast. The
// goroutine never exits; Run itself returns immediately.
func (h *Hub) Run() {
	go func() {
		for {
			select {
			case c := <-h.Register:
				h.Clients[c] = true
				log.Println("Client registered")
			case c := <-h.Unregister:
				// The membership check makes unregistering idempotent, so a
				// client already dropped by the broadcast path below does not
				// get its Send channel closed twice.
				if _, ok := h.Clients[c]; ok {
					delete(h.Clients, c)
					close(c.Send)
					c.Conn.Close()
					log.Println("Client unregistered")
				}
			case message := <-h.Broadcast:
				for c := range h.Clients {
					select {
					case c.Send <- message:
					default:
						// Send queue is full: drop the client rather than
						// block the hub. Closing Send tells its WritePump to
						// shut the connection down.
						close(c.Send)
						delete(h.Clients, c)
					}
				}
			}
		}
	}()
}

// WritePump starts a goroutine that forwards messages from c.Send to the
// WebSocket connection as text frames and sends a ping every 30 seconds.
//
// The goroutine stops when c.Send is closed or any write fails. On exit it
// unregisters c from h and closes the connection.
func WritePump(c *Client, h *Hub) {
	go func() {
		defer func() {
			h.Unregister <- c
			c.Conn.Close()
		}()

		c.Conn.SetReadLimit(1024)
		c.Conn.SetPongHandler(func(string) error {
			// The handler runs on whichever goroutine is reading from Conn, so
			// the write goes through c.mu like every other write here; the
			// connection supports only one concurrent writer.
			return c.mu.WriteHandler(func() error {
				if err := c.Conn.WriteMessage(websocket.PongMessage, []byte{}); err != nil {
					return fmt.Errorf("failed to send pong: %w", err)
				}
				return nil
			})
		})

		ticker := time.NewTicker(30 * time.Second)
		defer ticker.Stop()

		for {
			select {
			case message, ok := <-c.Send:
				if !ok {
					// The hub closed Send. Say goodbye once and stop: a closed
					// channel is always ready to receive, so looping again
					// would spin. The close frame is best effort.
					_ = c.mu.WriteHandler(func() error {
						return c.Conn.WriteMessage(websocket.CloseMessage, []byte{})
					})
					return
				}
				err := c.mu.WriteHandler(func() error {
					w, err := c.Conn.NextWriter(websocket.TextMessage)
					if err != nil {
						return fmt.Errorf("failed to get writer: %w", err)
					}
					if _, err := w.Write(message); err != nil {
						return fmt.Errorf("failed to write message: %w", err)
					}
					return w.Close()
				})
				if err != nil {
					// A failed write means the connection is unusable.
					return
				}
			case <-ticker.C:
				err := c.mu.WriteHandler(func() error {
					return c.Conn.WriteMessage(websocket.PingMessage, nil)
				})
				if err != nil {
					return
				}
			}
		}
	}()
}

// ReadPump starts a goroutine that reads messages from the client's
// connection and prints each one to standard output. It stops on the first
// read error, logging it when it is an unexpected close, and closes the
// connection on the way out.
func ReadPump(c *Client) {
	go func() {
		// Once reading has failed the connection is done; closing it releases
		// the socket and makes the next write on it fail.
		defer c.Conn.Close()

		for {
			_, message, err := c.Conn.ReadMessage()
			if err != nil {
				if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
					log.Printf("WebSocket error: %v", err)
				}
				return
			}
			fmt.Println(string(message))
		}
	}()
}