// Package internal implements a small WebSocket fan-out hub: clients register
// with a Hub, and every message sent on the hub's Broadcast channel is relayed
// to each registered client by that client's write pump.
package internal

import (
	"fmt"
	"log"
	"time"

	"github.com/google/uuid"
	"github.com/gorilla/websocket"
)

const (
	// writeWait is the time allowed for writing a single frame to the peer.
	writeWait = 10 * time.Second

	// pongWait is how long the read side waits before its deadline expires;
	// every pong received pushes the deadline out by this much again.
	pongWait = 60 * time.Second

	// pingPeriod is the interval between pings. It must stay below pongWait
	// so a pong can arrive before the read deadline expires.
	pingPeriod = 25 * time.Second

	// maxMessageSize is the largest inbound message, in bytes, accepted from
	// a peer.
	maxMessageSize = 1024

	// sendBufferSize is the number of outbound messages queued per client.
	// The hub drops a client whose queue is full, so the queue needs enough
	// headroom to absorb short bursts without disconnecting healthy peers.
	sendBufferSize = 256
)

// Client is one WebSocket connection together with its outbound queue.
type Client struct {
	// ID is a random UUID assigned by NewClient.
	ID string

	// Conn is the underlying WebSocket connection.
	Conn *websocket.Conn

	// Send carries messages from the hub to WritePump. Only the hub closes
	// it, which tells WritePump to shut the connection down.
	Send chan []byte

	// SubscribedPath is the path passed to NewClient.
	SubscribedPath string

	// done is allocated by NewClient; nothing in this file reads or writes it.
	done chan struct{}
}

// NewClient wraps conn in a Client with a fresh ID and an empty send queue.
func NewClient(conn *websocket.Conn, subscribedPath string) *Client {
	return &Client{
		ID:             uuid.NewString(),
		Conn:           conn,
		Send:           make(chan []byte, sendBufferSize),
		SubscribedPath: subscribedPath,
		done:           make(chan struct{}, 1),
	}
}

// Hub tracks the registered clients and fans broadcast messages out to them.
//
// Clients is read and written only by the goroutine started by Run; other
// goroutines must go through the Register, Unregister and Broadcast channels.
type Hub struct {
	Clients    map[*Client]bool
	Broadcast  chan []byte
	Register   chan *Client
	Unregister chan *Client
}

// NewHub returns a Hub with no clients. Call Run to start serving it.
func NewHub() *Hub {
	return &Hub{
		Broadcast:  make(chan []byte, 1),
		Register:   make(chan *Client, 1),
		Unregister: make(chan *Client, 1),
		Clients:    make(map[*Client]bool),
	}
}

// Run starts the hub's event loop in a new goroutine and returns immediately.
// The loop runs for the lifetime of the process; there is no way to stop it.
func (h *Hub) Run() {
	go func() {
		for {
			select {
			case c := <-h.Register:
				h.Clients[c] = true
				log.Println("Client registered")
			case c := <-h.Unregister:
				// Both pumps unregister on exit, so a second request for
				// the same client is expected and is ignored silently.
				if h.removeClient(c) {
					log.Println("Client Unregistered")
				}
			case message := <-h.Broadcast:
				for c := range h.Clients {
					// Never block here: a blocking send would let a single
					// stalled client freeze the hub, and with it every
					// register, unregister and broadcast that follows.
					select {
					case c.Send <- message:
					default:
						h.removeClient(c)
						log.Printf("Client %s dropped: send queue full", c.ID)
					}
				}
			}
		}
	}()
}

// removeClient forgets c and closes its Send channel. It reports whether c
// was registered, which makes repeated removal of the same client safe. It
// must only be called from the hub goroutine.
func (h *Hub) removeClient(c *Client) bool {
	if _, ok := h.Clients[c]; !ok {
		return false
	}
	delete(h.Clients, c)
	close(c.Send)
	return true
}

// writeWithDeadline arms a fresh write deadline on conn and writes a single
// frame of the given message type.
func writeWithDeadline(conn *websocket.Conn, messageType int, payload []byte) error {
	if err := conn.SetWriteDeadline(time.Now().Add(writeWait)); err != nil {
		return fmt.Errorf("setting write deadline: %w", err)
	}
	if err := conn.WriteMessage(messageType, payload); err != nil {
		return fmt.Errorf("writing message type %d: %w", messageType, err)
	}
	return nil
}

// WritePump relays messages from c.Send to the connection and sends a ping
// every pingPeriod. It returns when the hub closes c.Send or when a write
// fails; on return it closes the connection and unregisters c from h.
//
// Run it in its own goroutine, one per client.
func WritePump(c *Client, h *Hub) {
	pingTicker := time.NewTicker(pingPeriod)
	defer func() {
		// Release local resources before the channel send so they are freed
		// even if the hub is slow to pick the request up.
		pingTicker.Stop()
		c.Conn.Close()
		h.Unregister <- c
	}()

	for {
		select {
		case message, ok := <-c.Send:
			if !ok {
				// The hub closed the queue. The close frame is best effort:
				// the connection may already have been closed by ReadPump,
				// so a failure here is not worth reporting.
				_ = writeWithDeadline(c.Conn, websocket.CloseMessage, []byte{})
				return
			}
			if err := writeWithDeadline(c.Conn, websocket.TextMessage, message); err != nil {
				log.Printf("write pump: client %s: %v", c.ID, err)
				return
			}
		case <-pingTicker.C:
			if err := writeWithDeadline(c.Conn, websocket.PingMessage, []byte{}); err != nil {
				log.Printf("write pump: client %s: %v", c.ID, err)
				return
			}
		}
	}
}

// ReadPump reads messages from the connection until a read fails, logging
// each text message it receives. Pongs from the peer extend the read
// deadline by pongWait. On return it unregisters c from h and closes the
// connection.
//
// Run it in its own goroutine, one per client.
func ReadPump(c *Client, h *Hub) {
	defer func() {
		h.Unregister <- c
		c.Conn.Close()
	}()

	c.Conn.SetReadLimit(maxMessageSize)
	if err := c.Conn.SetReadDeadline(time.Now().Add(pongWait)); err != nil {
		log.Printf("read pump: client %s: setting read deadline: %v", c.ID, err)
		return
	}
	c.Conn.SetPongHandler(func(string) error {
		return c.Conn.SetReadDeadline(time.Now().Add(pongWait))
	})

	for {
		messageType, message, err := c.Conn.ReadMessage()
		if err != nil {
			if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
				log.Printf("WebSocket error: %v", err)
			}
			return
		}
		if messageType == websocket.TextMessage {
			// %q keeps peer-controlled bytes from injecting newlines or
			// control characters into the log.
			log.Printf("Received: %q", message)
		}
	}
}