From 752804cc5808e5f5275076ba6a79e02511f3c009 Mon Sep 17 00:00:00 2001 From: Roger Ferdinan Date: Sat, 27 Sep 2025 04:48:01 +0700 Subject: [PATCH] fix: restructuring safe websocket client --- v1/client/client.go | 100 +++++++++++++++++++++++++++----------------- 1 file changed, 62 insertions(+), 38 deletions(-) diff --git a/v1/client/client.go b/v1/client/client.go index 1071736..b7fcf77 100644 --- a/v1/client/client.go +++ b/v1/client/client.go @@ -124,6 +124,8 @@ func (b *SafeWebsocketClientBuilder) Build() (*SafeWebsocketClient, error) { doneMap: NewSafeMap[string, chan struct{}](), } + go wsClient.reconnectHandler() + if err := wsClient.connect(); err != nil { return nil, fmt.Errorf("failed to establish initial connection: %v", err) } @@ -199,63 +201,65 @@ func (wsClient *SafeWebsocketClient) connect() error { wsClient.conn = conn wsClient.isConnected = true + go wsClient.startPingTicker(ctx) + go wsClient.startReceiveHandler(ctx) + return nil }) - - go wsClient.startPingTicker() - go wsClient.startReceiveHandler() - go wsClient.reconnectHandler() - return nil } -func (wsClient *SafeWebsocketClient) startPingTicker() { +func (wsClient *SafeWebsocketClient) startPingTicker(ctx context.Context) { ticker := time.NewTicker(pingPeriod) defer ticker.Stop() - doneKey := "startPingTicker" - wsClient.doneMap.Store(doneKey, make(chan struct{})) - done, _ := wsClient.doneMap.Load(doneKey) for { select { case <-ticker.C: wsClient.mu.WriteHandler(func() error { + if wsClient.conn == nil { + return fmt.Errorf("connecrtion closed") + } if err := wsClient.conn.WriteMessage(websocket.PingMessage, []byte{}); err != nil { log.Printf("Ping failed: %v. Will attempt reconnect.", err) wsClient.triggerReconnect() } return nil }) - case <-done: - log.Println("Ping ticker stopped") + case <-ctx.Done(): + log.Println("Ping ticker stopped due to context cancellation") return } } } -func (wsClient *SafeWebsocketClient) startReceiveHandler() { - doneKey := "startReceiveHandler" - wsClient.doneMap.Store(doneKey, make(chan struct{})) - done, _ := wsClient.doneMap.Load(doneKey) - +func (wsClient *SafeWebsocketClient) startReceiveHandler(ctx context.Context) { for { select { - case <-done: + case <-ctx.Done(): log.Println("Reconnect handler stopped") return default: - conn := wsClient.conn + wsClient.mu.ReadHandler(func() error { + conn := wsClient.conn + + if conn == nil { + wsClient.triggerReconnect() + return fmt.Errorf("connection closed") + } + _, message, err := conn.ReadMessage() + if err != nil { + wsClient.triggerReconnect() + return fmt.Errorf("failed to read message: %v", err) + } + select { + case wsClient.dataChannel <- message: + default: + log.Println("Data channel full, dropping message") + } + return nil + }) - if conn == nil { - wsClient.triggerReconnect() - return - } - _, message, err := conn.ReadMessage() - if err != nil { - wsClient.triggerReconnect() - return - } - wsClient.dataChannel <- message } } } @@ -268,20 +272,40 @@ func (wsClient *SafeWebsocketClient) triggerReconnect() { } func (wsClient *SafeWebsocketClient) reconnectHandler() { - doneKey := "reconnectHandler" - wsClient.doneMap.Store(doneKey, make(chan struct{})) - done, _ := wsClient.doneMap.Load(doneKey) + backoff := 1 * time.Second + maxBackoff := 30 * time.Second for { select { case <-wsClient.reconnectCh: - wsClient.cancel() - wsClient.connect() - - wsClient.doneMap.Range(func(s string, c chan struct{}) bool { - c <- struct{}{} - return true + log.Println("Reconnect triggered") + wsClient.mu.WriteHandler(func() error { + if wsClient.cancel != nil { + wsClient.cancel() + } + wsClient.isConnected = false + return nil }) - case <-done: + + time.Sleep(100 * time.Millisecond) + + for { + log.Println("Attempting reconnect in %v...", backoff) + select { + case <-time.After(backoff): + if err := wsClient.connect(); err != nil { + log.Println("Reconnect failed: %v", err) + if backoff < maxBackoff { + backoff *= 2 + } + continue + } + log.Println("Reconnected successfully") + backoff = 1 * time.Second + break + } + } + case <-wsClient.ctx.Done(): + log.Println("Reconnect handler stopped due to client shutdown") return } }