From b4e7238b0b1d0fa85478cfd5277c29e57229d9cb Mon Sep 17 00:00:00 2001 From: Roger Ferdinan Date: Tue, 30 Sep 2025 09:59:46 +0700 Subject: [PATCH] fix: fixing data race --- internal/hub.go | 3 +-- v1/client/client.go | 38 ++++++++++++++------------------------ 2 files changed, 15 insertions(+), 26 deletions(-) diff --git a/internal/hub.go b/internal/hub.go index 9ee124d..1eb3e19 100644 --- a/internal/hub.go +++ b/internal/hub.go @@ -1,7 +1,6 @@ package internal import ( - "fmt" "log" "time" @@ -125,7 +124,7 @@ func ReadPump(c *Client, h *Hub) { } if messageType == websocket.TextMessage { - fmt.Printf("Received: %s\n", message) + log.Printf("Received: %s\n", message) } } } diff --git a/v1/client/client.go b/v1/client/client.go index d629a1c..f6a311b 100644 --- a/v1/client/client.go +++ b/v1/client/client.go @@ -3,7 +3,6 @@ package client import ( "context" "fmt" - "io" "log" "net/url" "strings" @@ -235,12 +234,13 @@ func (wsClient *SafeWebsocketClient) connect() error { }) pingCtx, pingCancel := context.WithCancel(context.Background()) - go wsClient.startPingTicker(pingCtx) wsClient.mu.WriteHandler(func() error { wsClient.cancelFuncs = append(wsClient.cancelFuncs, pingCancel) return nil }) + go wsClient.startPingTicker(pingCtx) + if wsClient.conn != nil { wsClient.conn.Close() } @@ -260,14 +260,10 @@ func (wsClient *SafeWebsocketClient) connect() error { return nil }) - writer, err := conn.NextWriter(websocket.TextMessage) - if err != nil { - return - } for { select { case <-ctx.Done(): - fmt.Println("Writer stopped due to client shutdown") + log.Println("Writer stopped due to client shutdown") return case data := <-wsClient.writeChan: if conn == nil { @@ -275,7 +271,8 @@ func (wsClient *SafeWebsocketClient) connect() error { return } - if _, err := writer.Write(data.Data); err != nil { + if err := conn.WriteMessage(int(data.MessageType), data.Data); err != nil { + log.Printf("error on write message: %v\n", err) wsClient.triggerReconnect() return } @@ -293,44 +290,37 @@ func (wsClient *SafeWebsocketClient) connect() error { for { select { case <-ctx.Done(): - fmt.Println("Reader stopped due to client shutdown") + log.Println("Reader stopped due to client shutdown") return default: if conn == nil { + wsClient.triggerReconnect() return } if err := conn.SetReadDeadline(time.Now().Add(readDeadline)); err != nil { - wsClient.triggerReconnect() - fmt.Printf("error on read deadline: %v\n", err) + log.Printf("error on read deadline: %v\n", err) return } - mt, reader, err := conn.NextReader() + messageType, data, err := conn.ReadMessage() if err != nil { + log.Printf("error on read message: %v\n", err) wsClient.triggerReconnect() return } - if mt != websocket.TextMessage { + if messageType != websocket.TextMessage { continue } - readerBytes, err := io.ReadAll(reader) - if err != nil { - fmt.Printf("io reader failed: %v\n", err) - wsClient.triggerReconnect() - return - } select { - case wsClient.dataChannel <- readerBytes: + case wsClient.dataChannel <- data: case <-ctx.Done(): return default: - fmt.Println("Data channel full, dropping message") + log.Println("Data channel full, dropping message") } - } - } }() @@ -458,7 +448,7 @@ func (wsClient *SafeWebsocketClient) Close() error { wsClient.conn.Close() } wsClient.isConnected = false - close(wsClient.dataChannel) + // close(wsClient.dataChannel) return nil }