Skip to content

Commit

Permalink
shared_client: Drain requests on handler close
Browse files Browse the repository at this point in the history
Drain requests on handler close, so that pending requests are terminated
immediately when handler needs to close for an error condition, rather
than having the requests time out. This allows the handler to be recycled
faster for new requests.

Signed-off-by: Jarno Rajahalme <[email protected]>
  • Loading branch information
jrajahalme committed Apr 29, 2024
1 parent b4c25b7 commit 8f2d006
Showing 1 changed file with 15 additions and 2 deletions.
17 changes: 15 additions & 2 deletions shared_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,15 +206,27 @@ func handler(wg *sync.WaitGroup, client *Client, conn *Conn, requests chan reque
conn.Close()
close(receiverTrigger)

// Drain responses send by receive loop to allow it to exit.
// Drain responses sent by receive loop to allow it to exit.
// It may be repeatedly reading after an i/o timeout, for example.
// This range will be done only after the receive loop has returned
// and closed 'responses' in its defer function.
for range responses {
}

// Now cancel all the remaining waiters
for _, waiter := range waitingResponses {
waiter.ch <- sharedClientResponse{nil, 0, net.ErrClosed}
close(waiter.ch)
}

// Drain requests in case they come in while we are closing
// down. This loop is done only after 'requests' channel is closed in
// SharedClient.close() and it is not possible for new requests or timeouts
// to be sent on those closed channels.
for req := range requests {
req.ch <- sharedClientResponse{nil, 0, net.ErrClosed}
close(req.ch)
}
}()

for {
Expand All @@ -226,7 +238,6 @@ func handler(wg *sync.WaitGroup, client *Client, conn *Conn, requests chan reque
// requests to be sent.
return
}
start := time.Now()

// Check if we already have a request with the same id
// Due to birthday paradox and the fact that ID is uint16
Expand All @@ -247,6 +258,7 @@ func handler(wg *sync.WaitGroup, client *Client, conn *Conn, requests chan reque
}
}

start := time.Now()
err := client.SendContext(req.ctx, req.msg, conn, start)
if err != nil {
req.ch <- sharedClientResponse{nil, 0, err}
Expand Down Expand Up @@ -309,6 +321,7 @@ func (c *SharedClient) ExchangeSharedContext(ctx context.Context, m *Msg) (r *Ms
select {
case c.requests <- request{ctx: ctx, msg: m, ch: respCh}:
case <-ctx.Done():
// request was not sent, no cleanup to do
return nil, 0, ctx.Err()
}

Expand Down

0 comments on commit 8f2d006

Please sign in to comment.