Skip to content

Commit

Permalink
CR: fix await timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
vzayts committed Dec 12, 2023
1 parent ebd1195 commit 8bede5c
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 4 deletions.
9 changes: 7 additions & 2 deletions metadata.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package druid

import (
"context"
_ "embed"
"errors"
"strings"
Expand Down Expand Up @@ -64,6 +65,8 @@ func fillDataSourceName(in string, ds string) string {

// AwaitDataSourceAvailable awaits for a datasource to be visible in druid table listing.
func (md *MetadataService) AwaitDataSourceAvailable(dataSourceName string) error {
ctx, cancel := context.WithTimeout(context.Background(), md.awaitTimeout)
defer cancel()
ticker := time.NewTicker(md.tickerDuration)
defer ticker.Stop()
q := query.
Expand All @@ -81,7 +84,7 @@ func (md *MetadataService) AwaitDataSourceAvailable(dataSourceName string) error
if len(res) >= 1 && res[0].Cnt == 1 {
return nil
}
case <-time.After(md.awaitTimeout):
case <-ctx.Done():
return errors.New("AwaitDataSourceAvailable timeout")
}
}
Expand All @@ -92,6 +95,8 @@ var datasourceRecordsQuery string

// AwaitRecordsCount awaits for specific recordsCount in a given datasource.
func (md *MetadataService) AwaitRecordsCount(dataSourceName string, recordsCount int) error {
ctx, cancel := context.WithTimeout(context.Background(), md.awaitTimeout)
defer cancel()
ticker := time.NewTicker(md.tickerDuration)
defer ticker.Stop()
q := query.NewSQL()
Expand All @@ -108,7 +113,7 @@ func (md *MetadataService) AwaitRecordsCount(dataSourceName string, recordsCount
if len(res) >= 1 && res[0].Cnt == recordsCount {
return nil
}
case <-time.After(md.awaitTimeout):
case <-ctx.Done():
return errors.New("AwaitRecordsCount timeout")
}
}
Expand Down
10 changes: 8 additions & 2 deletions tasks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ func TriggerIngestionTask[T any](d *Client, dataSourceName string, entries []T)

// AwaitTaskCompletion waits for the task to complete. Function timeouts with an error after awaitTimeout nanoseconds.
func AwaitTaskCompletion(client *Client, taskID string, awaitTimeout time.Duration, tickerDuration time.Duration) error {
ctx, cancel := context.WithTimeout(context.Background(), awaitTimeout)
defer cancel()

ticker := time.NewTicker(tickerDuration)
defer ticker.Stop()
L:
Expand All @@ -73,7 +76,7 @@ L:
continue
}
break L
case <-time.After(awaitTimeout):
case <-ctx.Done():
return errors.New("AwaitTaskRunning timeout")
}
}
Expand All @@ -82,6 +85,9 @@ L:

// AwaitTaskStatus waits for the druid task status for the maximum of awaitTimeout duration, querying druid task API.
func AwaitTaskStatus(client *Client, taskID string, status string, awaitTimeout time.Duration, tickerDuration time.Duration) error {
ctx, cancel := context.WithTimeout(context.Background(), awaitTimeout)
defer cancel()

ticker := time.NewTicker(tickerDuration)
defer ticker.Stop()
for {
Expand All @@ -95,7 +101,7 @@ func AwaitTaskStatus(client *Client, taskID string, status string, awaitTimeout
if res.Status.Status == status {
return nil
}
case <-time.After(awaitTimeout):
case <-ctx.Done():
return errors.New("AwaitTaskRunning timeout")
}
}
Expand Down

0 comments on commit 8bede5c

Please sign in to comment.