fix: fixing data race
This commit is contained in:
@@ -3,7 +3,6 @@ package client
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net/url"
|
||||
"strings"
|
||||
@@ -235,12 +234,13 @@ func (wsClient *SafeWebsocketClient) connect() error {
|
||||
})
|
||||
|
||||
pingCtx, pingCancel := context.WithCancel(context.Background())
|
||||
go wsClient.startPingTicker(pingCtx)
|
||||
wsClient.mu.WriteHandler(func() error {
|
||||
wsClient.cancelFuncs = append(wsClient.cancelFuncs, pingCancel)
|
||||
return nil
|
||||
})
|
||||
|
||||
go wsClient.startPingTicker(pingCtx)
|
||||
|
||||
if wsClient.conn != nil {
|
||||
wsClient.conn.Close()
|
||||
}
|
||||
@@ -260,14 +260,10 @@ func (wsClient *SafeWebsocketClient) connect() error {
|
||||
return nil
|
||||
})
|
||||
|
||||
writer, err := conn.NextWriter(websocket.TextMessage)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
fmt.Println("Writer stopped due to client shutdown")
|
||||
log.Println("Writer stopped due to client shutdown")
|
||||
return
|
||||
case data := <-wsClient.writeChan:
|
||||
if conn == nil {
|
||||
@@ -275,7 +271,8 @@ func (wsClient *SafeWebsocketClient) connect() error {
|
||||
return
|
||||
}
|
||||
|
||||
if _, err := writer.Write(data.Data); err != nil {
|
||||
if err := conn.WriteMessage(int(data.MessageType), data.Data); err != nil {
|
||||
log.Printf("error on write message: %v\n", err)
|
||||
wsClient.triggerReconnect()
|
||||
return
|
||||
}
|
||||
@@ -293,44 +290,37 @@ func (wsClient *SafeWebsocketClient) connect() error {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
fmt.Println("Reader stopped due to client shutdown")
|
||||
log.Println("Reader stopped due to client shutdown")
|
||||
return
|
||||
default:
|
||||
if conn == nil {
|
||||
wsClient.triggerReconnect()
|
||||
return
|
||||
}
|
||||
if err := conn.SetReadDeadline(time.Now().Add(readDeadline)); err != nil {
|
||||
wsClient.triggerReconnect()
|
||||
fmt.Printf("error on read deadline: %v\n", err)
|
||||
log.Printf("error on read deadline: %v\n", err)
|
||||
return
|
||||
}
|
||||
|
||||
mt, reader, err := conn.NextReader()
|
||||
messageType, data, err := conn.ReadMessage()
|
||||
if err != nil {
|
||||
log.Printf("error on read message: %v\n", err)
|
||||
wsClient.triggerReconnect()
|
||||
return
|
||||
}
|
||||
|
||||
if mt != websocket.TextMessage {
|
||||
if messageType != websocket.TextMessage {
|
||||
continue
|
||||
}
|
||||
|
||||
readerBytes, err := io.ReadAll(reader)
|
||||
if err != nil {
|
||||
fmt.Printf("io reader failed: %v\n", err)
|
||||
wsClient.triggerReconnect()
|
||||
return
|
||||
}
|
||||
select {
|
||||
case wsClient.dataChannel <- readerBytes:
|
||||
case wsClient.dataChannel <- data:
|
||||
case <-ctx.Done():
|
||||
return
|
||||
default:
|
||||
fmt.Println("Data channel full, dropping message")
|
||||
log.Println("Data channel full, dropping message")
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
}()
|
||||
|
||||
@@ -458,7 +448,7 @@ func (wsClient *SafeWebsocketClient) Close() error {
|
||||
wsClient.conn.Close()
|
||||
}
|
||||
wsClient.isConnected = false
|
||||
close(wsClient.dataChannel)
|
||||
// close(wsClient.dataChannel)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user