Skip to content

Commit

Permalink
fix: content deadlines for blocking calls should not be overridden (#666
Browse files Browse the repository at this point in the history
)

* fix blocking call timeouts when context has a deadline

* fix: blocking call timeouts shortly when context has a deadline

Signed-off-by: Rueian <[email protected]>

---------

Signed-off-by: Rueian <[email protected]>
Co-authored-by: Rueian <[email protected]>
  • Loading branch information
zeiler and rueian authored Nov 9, 2024
1 parent 6c926af commit 482bc6e
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 2 deletions.
16 changes: 14 additions & 2 deletions pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -1050,7 +1050,7 @@ func (p *pipe) DoStream(ctx context.Context, pool *pool, cmd Completed) RedisRes
}
dl, ok := ctx.Deadline()
if ok {
if p.timeout > 0 {
if p.timeout > 0 && !cmd.IsBlock() {
defaultDeadline := time.Now().Add(p.timeout)
if dl.After(defaultDeadline) {
dl = defaultDeadline
Expand Down Expand Up @@ -1101,6 +1101,12 @@ func (p *pipe) DoMultiStream(ctx context.Context, pool *pool, multi ...Completed
dl, ok := ctx.Deadline()
if ok {
if p.timeout > 0 {
for _, cmd := range multi {
if cmd.IsBlock() {
p.conn.SetDeadline(dl)
goto process
}
}
defaultDeadline := time.Now().Add(p.timeout)
if dl.After(defaultDeadline) {
dl = defaultDeadline
Expand Down Expand Up @@ -1138,7 +1144,7 @@ func (p *pipe) DoMultiStream(ctx context.Context, pool *pool, multi ...Completed

func (p *pipe) syncDo(dl time.Time, dlOk bool, cmd Completed) (resp RedisResult) {
if dlOk {
if p.timeout > 0 {
if p.timeout > 0 && !cmd.IsBlock() {
defaultDeadline := time.Now().Add(p.timeout)
if dl.After(defaultDeadline) {
dl = defaultDeadline
Expand Down Expand Up @@ -1170,6 +1176,12 @@ func (p *pipe) syncDo(dl time.Time, dlOk bool, cmd Completed) (resp RedisResult)
func (p *pipe) syncDoMulti(dl time.Time, dlOk bool, resp []RedisResult, multi []Completed) {
if dlOk {
if p.timeout > 0 {
for _, cmd := range multi {
if cmd.IsBlock() {
p.conn.SetDeadline(dl)
goto process
}
}
defaultDeadline := time.Now().Add(p.timeout)
if dl.After(defaultDeadline) {
dl = defaultDeadline
Expand Down
82 changes: 82 additions & 0 deletions pipe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3644,6 +3644,27 @@ func TestWriteDeadlineIsShorterThanContextDeadline_DoStream(t *testing.T) {
}
}

func TestWriteDeadlineIsNoShorterThanContextDeadline_DoStreamBlocked(t *testing.T) {
defer ShouldNotLeaked(SetupLeakDetection())
p, _, _, _ := setup(t, ClientOption{ConnWriteTimeout: 5 * time.Millisecond})

ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()

cp := newPool(1, nil, nil)
startTime := time.Now()
s := p.DoStream(ctx, cp, cmds.NewBlockingCompleted([]string{"BLPOP", "a"}))
if err := s.Error(); err != io.EOF && !strings.Contains(err.Error(), "i/o") {
t.Fatalf("unexpected err %v", err)
}
if time.Since(startTime) < 100*time.Millisecond {
t.Fatalf("unexpected time %v", time.Since(startTime))
}
if len(cp.list) != 0 {
t.Fatalf("unexpected pool length %v", len(cp.list))
}
}

func TestCancelContext_Do_Block(t *testing.T) {
defer ShouldNotLeaked(SetupLeakDetection())
p, mock, shutdown, _ := setup(t, ClientOption{})
Expand Down Expand Up @@ -3736,6 +3757,27 @@ func TestWriteDeadlineIsShorterThanContextDeadline_DoMultiStream(t *testing.T) {
}
}

func TestWriteDeadlineIsNoShorterThanContextDeadline_DoMultiStreamBlocked(t *testing.T) {
defer ShouldNotLeaked(SetupLeakDetection())
p, _, _, _ := setup(t, ClientOption{ConnWriteTimeout: 5 * time.Millisecond})

ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()

cp := newPool(1, nil, nil)
startTime := time.Now()
s := p.DoMultiStream(ctx, cp, cmds.NewBlockingCompleted([]string{"BLPOP", "a"}))
if err := s.Error(); err != io.EOF && !strings.Contains(err.Error(), "i/o") {
t.Fatalf("unexpected err %v", err)
}
if time.Since(startTime) < 100*time.Millisecond {
t.Fatalf("unexpected time %v", time.Since(startTime))
}
if len(cp.list) != 0 {
t.Fatalf("unexpected pool length %v", len(cp.list))
}
}

func TestForceClose_Do_Block(t *testing.T) {
defer ShouldNotLeaked(SetupLeakDetection())
p, mock, _, _ := setup(t, ClientOption{})
Expand Down Expand Up @@ -3987,6 +4029,26 @@ func TestWriteDeadlineIsShorterThanContextDeadlineInSyncMode_Do(t *testing.T) {
p.Close()
}

func TestWriteDeadlineIsNoShorterThanContextDeadlineInSyncMode_DoBlocked(t *testing.T) {
defer ShouldNotLeaked(SetupLeakDetection())
p, _, _, closeConn := setup(t, ClientOption{ConnWriteTimeout: 5 * time.Millisecond, Dialer: net.Dialer{KeepAlive: time.Second}})
defer closeConn()

ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()

startTime := time.Now()
if err := p.Do(ctx, cmds.NewBlockingCompleted([]string{"BLPOP", "a"})).NonRedisError(); !errors.Is(err, context.DeadlineExceeded) {
t.Fatalf("unexpected err %v", err)
}

if time.Since(startTime) < 100*time.Millisecond {
t.Fatalf("unexpected time %v", time.Since(startTime))
}

p.Close()
}

func TestOngoingDeadlineContextInSyncMode_DoMulti(t *testing.T) {
defer ShouldNotLeaked(SetupLeakDetection())
p, _, _, closeConn := setup(t, ClientOption{})
Expand Down Expand Up @@ -4032,6 +4094,26 @@ func TestWriteDeadlineIsShorterThanContextDeadlineInSyncMode_DoMulti(t *testing.
p.Close()
}

func TestWriteDeadlineIsNoShorterThanContextDeadlineInSyncMode_DoMultiBlocked(t *testing.T) {
defer ShouldNotLeaked(SetupLeakDetection())
p, _, _, closeConn := setup(t, ClientOption{ConnWriteTimeout: 5 * time.Millisecond, Dialer: net.Dialer{KeepAlive: time.Second}})
defer closeConn()

ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()

startTime := time.Now()
if err := p.DoMulti(ctx, cmds.NewBlockingCompleted([]string{"BLPOP", "a"})).s[0].NonRedisError(); !errors.Is(err, context.DeadlineExceeded) {
t.Fatalf("unexpected err %v", err)
}

if time.Since(startTime) < 100*time.Millisecond {
t.Fatalf("unexpected time %v", time.Since(startTime))
}

p.Close()
}

func TestOngoingCancelContextInPipelineMode_Do(t *testing.T) {
defer ShouldNotLeaked(SetupLeakDetection())
p, mock, close, closeConn := setup(t, ClientOption{})
Expand Down

0 comments on commit 482bc6e

Please sign in to comment.