From e92ce9cc21663056520b7b2b2032d360cd6e1ab8 Mon Sep 17 00:00:00 2001 From: David vonThenen <12752197+dvonthenen@users.noreply.github.com> Date: Wed, 5 Jun 2024 14:09:26 -0700 Subject: [PATCH] Fix Repeated/Excessive Read Failure on WS --- examples/streaming/http/main.go | 4 +- examples/streaming/microphone/main.go | 4 +- examples/streaming/replay/main.go | 4 +- examples/streaming/test/main.go | 4 +- pkg/client/live/client.go | 71 ++++++++++++++--------- tests/edge_cases/cancel/main.go | 4 +- tests/edge_cases/failed_retry/main.go | 4 +- tests/edge_cases/keepalive/main.go | 4 +- tests/edge_cases/reconnect_client/main.go | 4 +- tests/edge_cases/timeout/main.go | 4 +- 10 files changed, 62 insertions(+), 45 deletions(-) diff --git a/examples/streaming/http/main.go b/examples/streaming/http/main.go index 0c43941c..4b98f261 100644 --- a/examples/streaming/http/main.go +++ b/examples/streaming/http/main.go @@ -55,8 +55,8 @@ func main() { fmt.Printf("Stream is up and running %s\n", reflect.TypeOf(res)) // connect the websocket to Deepgram - wsconn := dgClient.Connect() - if wsconn == nil { + bConnected := dgClient.Connect() + if !bConnected { fmt.Println("Client.Connect failed") os.Exit(1) } diff --git a/examples/streaming/microphone/main.go b/examples/streaming/microphone/main.go index 8dd8a2ec..3b9c6db2 100644 --- a/examples/streaming/microphone/main.go +++ b/examples/streaming/microphone/main.go @@ -157,8 +157,8 @@ func main() { } // connect the websocket to Deepgram - wsconn := dgClient.Connect() - if wsconn == nil { + bConnected := dgClient.Connect() + if !bConnected { fmt.Println("Client.Connect failed") os.Exit(1) } diff --git a/examples/streaming/replay/main.go b/examples/streaming/replay/main.go index 477bc89f..dfa279e3 100644 --- a/examples/streaming/replay/main.go +++ b/examples/streaming/replay/main.go @@ -44,8 +44,8 @@ func main() { } // connect the websocket to Deepgram - wsconn := dgClient.Connect() - if wsconn == nil { + bConnected := dgClient.Connect() + if !bConnected { log.Println("Client.Connect failed") os.Exit(1) } diff --git a/examples/streaming/test/main.go b/examples/streaming/test/main.go index 16ef17a6..6c0892c9 100644 --- a/examples/streaming/test/main.go +++ b/examples/streaming/test/main.go @@ -59,8 +59,8 @@ func main() { fmt.Print("\n\nPress ENTER to exit!\n\n") // connect the websocket to Deepgram - wsconn := dgClient.Connect() - if wsconn == nil { + bConnected := dgClient.Connect() + if !bConnected { fmt.Println("Client.Connect failed") os.Exit(1) } diff --git a/pkg/client/live/client.go b/pkg/client/live/client.go index 51530116..366ba1cf 100644 --- a/pkg/client/live/client.go +++ b/pkg/client/live/client.go @@ -115,32 +115,45 @@ func NewWithCancel(ctx context.Context, ctxCancel context.CancelFunc, apiKey str } // Connect performs a websocket connection with "DefaultConnectRetry" number of retries. -func (c *Client) Connect() *websocket.Conn { - return c.ConnectWithRetry(c.ctx, c.ctxCancel, int(DefaultConnectRetry)) +func (c *Client) Connect() bool { + // set the retry count + if c.retryCnt == 0 { + c.retryCnt = DefaultConnectRetry + } + return c.internalConnectWithCancel(c.ctx, c.ctxCancel, int(c.retryCnt)) != nil +} + +// ConnectWithCancel performs a websocket connection with specified number of retries and providing a +// cancel function to stop the connection +func (c *Client) ConnectWithCancel(ctx context.Context, ctxCancel context.CancelFunc, retryCnt int) bool { + return c.internalConnectWithCancel(ctx, ctxCancel, retryCnt) != nil } // AttemptReconnect performs a reconnect after failing retries -func (c *Client) AttemptReconnect(ctx context.Context, retries int64) *websocket.Conn { +func (c *Client) AttemptReconnect(ctx context.Context, retries int64) bool { c.retry = true c.ctx, c.ctxCancel = context.WithCancel(ctx) - return c.ConnectWithRetry(c.ctx, c.ctxCancel, int(retries)) + return c.internalConnectWithCancel(c.ctx, c.ctxCancel, int(retries)) != nil } -// AttemptReconnect performs a reconnect after failing retries -func (c *Client) AttemptReconnectWithCancel(ctx context.Context, ctxCancel context.CancelFunc, retries int64) *websocket.Conn { +// AttemptReconnect performs a reconnect after failing retries and providing a cancel function +func (c *Client) AttemptReconnectWithCancel(ctx context.Context, ctxCancel context.CancelFunc, retries int64) bool { c.retry = true - return c.ConnectWithRetry(ctx, ctxCancel, int(retries)) + return c.internalConnectWithCancel(ctx, ctxCancel, int(retries)) != nil +} + +func (c *Client) internalConnect() *websocket.Conn { + return c.internalConnectWithCancel(c.ctx, c.ctxCancel, int(c.retryCnt)) } -// ConnectWithRetry allows for connecting with specified retry attempts -// //nolint:funlen // this is a complex function. keep as is -func (c *Client) ConnectWithRetry(ctx context.Context, ctxCancel context.CancelFunc, retryCnt int) *websocket.Conn { +func (c *Client) internalConnectWithCancel(ctx context.Context, ctxCancel context.CancelFunc, retryCnt int) *websocket.Conn { klog.V(7).Infof("live.Connect() ENTER\n") // set the context c.ctx = ctx c.ctxCancel = ctxCancel + c.retryCnt = int64(retryCnt) // we explicitly stopped and should not attempt to reconnect if !c.retry { @@ -149,13 +162,6 @@ func (c *Client) ConnectWithRetry(ctx context.Context, ctxCancel context.CancelF return nil } - // set the retry count - if retryCnt <= 0 { - c.retryCnt = DefaultConnectRetry - } else { - c.retryCnt = int64(retryCnt) - } - // if the connection is good, return it otherwise, attempt reconnect if c.wsconn != nil { select { @@ -285,7 +291,7 @@ func (c *Client) listen() { klog.V(6).Infof("live.listen() LEAVE\n") return case <-ticker.C: - ws := c.Connect() + ws := c.internalConnect() if ws == nil { klog.V(3).Infof("listen: Connection is not valid\n") klog.V(6).Infof("live.listen() LEAVE\n") @@ -348,8 +354,19 @@ func (c *Client) listen() { return default: klog.V(1).Infof("listen: Cannot read websocket message. Err: %v\n", err) + + // send error on callback + sendErr := c.sendError(err) + if sendErr != nil { + klog.V(1).Infof("listen: EOF error. Err: %v\n", sendErr) + } + + // close the connection + c.closeWs(true) + + klog.V(6).Infof("live.listen() LEAVE\n") + return } - continue } if len(byMsg) == 0 { @@ -393,13 +410,13 @@ func (c *Client) Stream(r io.Reader) error { case strings.Contains(errStr, FatalReadSocketErr): klog.V(1).Infof("Fatal socket error: %v\n", err) klog.V(6).Infof("live.Stream() LEAVE\n") - return nil + return err case (err == io.EOF || err == io.ErrUnexpectedEOF) && !c.retry: klog.V(3).Infof("stream object EOF\n") klog.V(6).Infof("live.Stream() LEAVE\n") - return nil - case err != nil: - klog.V(1).Infof("r.Read encountered EOF. Err: %v\n", err) + return err + default: + klog.V(1).Infof("r.Read error. Err: %v\n", err) klog.V(6).Infof("live.Stream() LEAVE\n") return err } @@ -426,7 +443,7 @@ func (c *Client) WriteBinary(byData []byte) error { klog.V(7).Infof("live.WriteBinary() ENTER\n") // doing a write, need to lock - ws := c.Connect() + ws := c.internalConnect() if ws == nil { err := ErrInvalidConnection klog.V(4).Infof("c.Connect() is nil. Err: %v\n", err) @@ -463,7 +480,7 @@ func (c *Client) WriteJSON(payload interface{}) error { klog.V(7).Infof("live.WriteJSON() ENTER\n") // doing a write, need to lock - ws := c.Connect() + ws := c.internalConnect() if ws == nil { err := ErrInvalidConnection klog.V(4).Infof("c.Connect() is nil. Err: %v\n", err) @@ -522,7 +539,7 @@ func (c *Client) Finalize() error { klog.V(7).Infof("live.Finalize() ENTER\n") // doing a write, need to lock - ws := c.Connect() + ws := c.internalConnect() if ws == nil { err := ErrInvalidConnection klog.V(4).Infof("c.Connect() is nil. Err: %v\n", err) @@ -620,7 +637,7 @@ func (c *Client) ping() { klog.V(5).Infof("Starting ping...") counter++ - ws := c.Connect() + ws := c.internalConnect() if ws == nil { klog.V(1).Infof("ping Connection is not valid\n") klog.V(6).Infof("live.ping() LEAVE\n") diff --git a/tests/edge_cases/cancel/main.go b/tests/edge_cases/cancel/main.go index 39781066..94bd5451 100644 --- a/tests/edge_cases/cancel/main.go +++ b/tests/edge_cases/cancel/main.go @@ -43,8 +43,8 @@ func main() { } // connect the websocket to Deepgram - wsconn := dgClient.Connect() - if wsconn == nil { + bConnected := dgClient.Connect() + if !bConnected { fmt.Println("Client.Connect failed") os.Exit(1) } diff --git a/tests/edge_cases/failed_retry/main.go b/tests/edge_cases/failed_retry/main.go index 8d55f267..88945392 100644 --- a/tests/edge_cases/failed_retry/main.go +++ b/tests/edge_cases/failed_retry/main.go @@ -41,8 +41,8 @@ func main() { } // connect the websocket to Deepgram - wsconn := dgClient.Connect() - if wsconn == nil { + bConnected := dgClient.Connect() + if !bConnected { fmt.Println("Client.Connect failed") os.Exit(1) } diff --git a/tests/edge_cases/keepalive/main.go b/tests/edge_cases/keepalive/main.go index 9e002c9a..d4fc7408 100644 --- a/tests/edge_cases/keepalive/main.go +++ b/tests/edge_cases/keepalive/main.go @@ -40,8 +40,8 @@ func main() { } // connect the websocket to Deepgram - wsconn := dgClient.Connect() - if wsconn == nil { + bConnected := dgClient.Connect() + if !bConnected { fmt.Println("Client.Connect failed") os.Exit(1) } diff --git a/tests/edge_cases/reconnect_client/main.go b/tests/edge_cases/reconnect_client/main.go index 0da5399d..3e31bff4 100644 --- a/tests/edge_cases/reconnect_client/main.go +++ b/tests/edge_cases/reconnect_client/main.go @@ -154,8 +154,8 @@ func main() { } // connect the websocket to Deepgram - wsconn := dgClient.AttemptReconnect(context.Background(), 3) - if wsconn == nil { + bConnected := dgClient.AttemptReconnect(context.Background(), 3) + if !bConnected { fmt.Println("Client.AttemptReconnect failed") os.Exit(1) } diff --git a/tests/edge_cases/timeout/main.go b/tests/edge_cases/timeout/main.go index 3390a939..2a017e77 100644 --- a/tests/edge_cases/timeout/main.go +++ b/tests/edge_cases/timeout/main.go @@ -39,8 +39,8 @@ func main() { fmt.Printf("\n\nYou will first see an OpenResponse message followed by CloseResponse in 12 seconds.\n") // connect the websocket to Deepgram - wsconn := dgClient.Connect() - if wsconn == nil { + bConnected := dgClient.Connect() + if !bConnected { fmt.Println("Client.Connect failed") os.Exit(1) }