Compare commits

...

2 Commits

View File

@@ -101,7 +101,7 @@ func (b *SafeWebsocketClientBuilder) RawQuery(rawQuery string) *SafeWebsocketCli
return b return b
} }
func (b *SafeWebsocketClientBuilder) Build() (*SafeWebsocketClient, error) { func (b *SafeWebsocketClientBuilder) Build(channelSize int) (*SafeWebsocketClient, error) {
if err := internal.NilChecker(b); err != nil { if err := internal.NilChecker(b); err != nil {
return nil, err return nil, err
} }
@@ -117,7 +117,7 @@ func (b *SafeWebsocketClientBuilder) Build() (*SafeWebsocketClient, error) {
useTLS: useTLS, useTLS: useTLS,
path: b.path, path: b.path,
rawQuery: b.rawQuery, rawQuery: b.rawQuery,
dataChannel: make(chan []byte, 1), dataChannel: make(chan []byte, channelSize),
mu: custom_rwmutex.NewCustomRwMutex(), mu: custom_rwmutex.NewCustomRwMutex(),
reconnectCh: make(chan struct{}, 1), reconnectCh: make(chan struct{}, 1),
isConnected: false, isConnected: false,
@@ -275,6 +275,9 @@ func (wsClient *SafeWebsocketClient) reconnectHandler() {
backoff := 1 * time.Second backoff := 1 * time.Second
maxBackoff := 30 * time.Second maxBackoff := 30 * time.Second
for { for {
if wsClient.ctx == nil {
continue
}
select { select {
case <-wsClient.reconnectCh: case <-wsClient.reconnectCh:
log.Println("Reconnect triggered") log.Println("Reconnect triggered")
@@ -288,12 +291,13 @@ func (wsClient *SafeWebsocketClient) reconnectHandler() {
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
for { isInnerLoop := true
log.Println("Attempting reconnect in %v...", backoff) for isInnerLoop {
log.Printf("Attempting reconnect in %v...", backoff)
select { select {
case <-time.After(backoff): case <-time.After(backoff):
if err := wsClient.connect(); err != nil { if err := wsClient.connect(); err != nil {
log.Println("Reconnect failed: %v", err) log.Printf("Reconnect failed: %v", err)
if backoff < maxBackoff { if backoff < maxBackoff {
backoff *= 2 backoff *= 2
} }
@@ -301,7 +305,10 @@ func (wsClient *SafeWebsocketClient) reconnectHandler() {
} }
log.Println("Reconnected successfully") log.Println("Reconnected successfully")
backoff = 1 * time.Second backoff = 1 * time.Second
break isInnerLoop = false
continue
case <-wsClient.ctx.Done():
return
} }
} }
case <-wsClient.ctx.Done(): case <-wsClient.ctx.Done():
@@ -314,3 +321,19 @@ func (wsClient *SafeWebsocketClient) reconnectHandler() {
func (wsClient *SafeWebsocketClient) DataChannel() <-chan []byte { func (wsClient *SafeWebsocketClient) DataChannel() <-chan []byte {
return wsClient.dataChannel return wsClient.dataChannel
} }
func (wsClient *SafeWebsocketClient) Close() error {
wsClient.mu.WriteHandler(func() error {
if wsClient.cancel != nil {
wsClient.cancel()
}
if wsClient.conn != nil {
wsClient.conn.Close()
}
wsClient.isConnected = false
return nil
})
close(wsClient.dataChannel)
return nil
}