feat: adding optional drop channel

This commit is contained in:
2025-10-11 22:17:31 +07:00
parent 0d0de32d21
commit cd4a239f14

View File

@@ -38,6 +38,7 @@ type SafeWebsocketClientBuilder struct {
basePort *uint16 `nil_checker:"required"` basePort *uint16 `nil_checker:"required"`
path *string path *string
rawQuery *string rawQuery *string
isDrop *bool
useTLS *bool useTLS *bool
channelSize *int64 channelSize *int64
} }
@@ -71,6 +72,11 @@ func (b *SafeWebsocketClientBuilder) RawQuery(rawQuery string) *SafeWebsocketCli
return b return b
} }
func (b *SafeWebsocketClientBuilder) IsDrop(isDrop bool) *SafeWebsocketClientBuilder {
b.isDrop = &isDrop
return b
}
func (b *SafeWebsocketClientBuilder) ChannelSize(channelSize int64) *SafeWebsocketClientBuilder { func (b *SafeWebsocketClientBuilder) ChannelSize(channelSize int64) *SafeWebsocketClientBuilder {
b.channelSize = &channelSize b.channelSize = &channelSize
return b return b
@@ -81,9 +87,15 @@ func (b *SafeWebsocketClientBuilder) Build(ctx context.Context) (*SafeWebsocketC
return nil, err return nil, err
} }
var useTLS bool // var useTLS bool
if b.useTLS != nil { if b.useTLS == nil {
useTLS = *b.useTLS useTLS := true
b.useTLS = &useTLS
}
if b.isDrop == nil {
isDrop := true
b.isDrop = &isDrop
} }
if b.channelSize == nil { if b.channelSize == nil {
@@ -94,7 +106,8 @@ func (b *SafeWebsocketClientBuilder) Build(ctx context.Context) (*SafeWebsocketC
wsClient := SafeWebsocketClient{ wsClient := SafeWebsocketClient{
baseHost: *b.baseHost, baseHost: *b.baseHost,
basePort: *b.basePort, basePort: *b.basePort,
useTLS: useTLS, useTLS: *b.useTLS,
isDrop: *b.isDrop,
path: b.path, path: b.path,
rawQuery: b.rawQuery, rawQuery: b.rawQuery,
dataChannel: make(chan []byte, *b.channelSize), dataChannel: make(chan []byte, *b.channelSize),
@@ -118,17 +131,16 @@ func (b *SafeWebsocketClientBuilder) Build(ctx context.Context) (*SafeWebsocketC
type SafeWebsocketClient struct { type SafeWebsocketClient struct {
baseHost string baseHost string
basePort uint16 basePort uint16
isDrop bool
useTLS bool useTLS bool
path *string path *string
rawQuery *string rawQuery *string
dataChannel chan []byte
mu *custom_rwmutex.CustomRwMutex mu *custom_rwmutex.CustomRwMutex
conn *websocket.Conn conn *websocket.Conn
cancelFuncs []context.CancelFunc
ctx context.Context ctx context.Context
Cancel context.CancelFunc cancelFuncs []context.CancelFunc
dataChannel chan []byte
reconnectCh chan struct{} reconnectCh chan struct{}
reconnectChans []chan struct{} reconnectChans []chan struct{}
@@ -214,18 +226,15 @@ func (wsClient *SafeWebsocketClient) writePump() {
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
log.Println("Writer stopped due to client shutdown") log.Println("Writer canceled by context")
return return
case data := <-wsClient.writeChan: case data := <-wsClient.writeChan:
if c == nil { if c == nil {
wsClient.triggerReconnect()
return return
} }
if err := c.WriteMessage(int(data.MessageType), data.Data); err != nil { if err := c.WriteMessage(int(data.MessageType), data.Data); err != nil {
log.Printf("error on write message: %v\n", err) log.Printf("error on write message: %v\n", err)
c.Close()
wsClient.triggerReconnect()
if data.MessageType == MessageTypePong { if data.MessageType == MessageTypePong {
wsClient.pongChan <- err wsClient.pongChan <- err
} }
@@ -243,22 +252,22 @@ func (wsClient *SafeWebsocketClient) readPump() {
}) })
var c *websocket.Conn var c *websocket.Conn
wsClient.mu.ReadHandler(func() error { wsClient.mu.ReadHandler(func() error {
c = wsClient.conn c = wsClient.conn
return nil return nil
}) })
if c == nil {
wsClient.triggerReconnect()
return
}
if wsClient.isDrop {
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
log.Println("Reader stopped due to client shutdown") log.Println("Reader canceled by context")
return return
default: default:
if c == nil {
return
}
if err := c.SetReadDeadline(time.Now().Add(readDeadline)); err != nil { if err := c.SetReadDeadline(time.Now().Add(readDeadline)); err != nil {
log.Printf("error on read deadline: %v\n", err) log.Printf("error on read deadline: %v\n", err)
return return
@@ -284,6 +293,42 @@ func (wsClient *SafeWebsocketClient) readPump() {
} }
} }
} }
} else {
for {
select {
case <-ctx.Done():
log.Println("Reader canceled by context")
return
default:
if c == nil {
return
}
if err := c.SetReadDeadline(time.Now().Add(readDeadline)); err != nil {
log.Printf("error on read deadline: %v\n", err)
return
}
messageType, data, err := c.ReadMessage()
if err != nil {
log.Printf("error on read message: %v\n", err)
wsClient.triggerReconnect()
return
}
if messageType != websocket.TextMessage {
continue
}
select {
case wsClient.dataChannel <- data:
case <-ctx.Done():
return
}
}
}
}
} }
func (wsClient *SafeWebsocketClient) startPingTicker(ctx context.Context) { func (wsClient *SafeWebsocketClient) startPingTicker(ctx context.Context) {