Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add tasks based ingestion #11

Merged
merged 35 commits into from
Dec 14, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
bc7e9a3
Implement suspend and resume endpoints of supervisor API
tomasz-h2o Oct 5, 2023
b2369aa
Added TODO tag
tomasz-h2o Oct 5, 2023
d481f9b
Merge branch 'master' into tomasz/implement-suspend-resume
vzayts Dec 5, 2023
6bc0bca
WIP: fix build
vzayts Dec 5, 2023
c226455
WIP: fix build
vzayts Dec 5, 2023
9f0b917
WIP: fix test
vzayts Dec 5, 2023
3ea7ac6
WIP: fix merge/rebase test issues
vzayts Dec 5, 2023
54b09ed
WIP: fix merge/rebase test issues and remove debug print
vzayts Dec 5, 2023
b87f9a6
WIP fix query granularity
vzayts Dec 6, 2023
fa66989
Address CR comments
vzayts Dec 11, 2023
b543780
Fix comment and rename types.
vzayts Dec 11, 2023
2510094
Make QueryGranularity field a pointer field.
vzayts Dec 11, 2023
956e7f7
WIP add tasks based ingestion
vzayts Dec 6, 2023
7120677
WIP: improve cleanup and configuration of the tasks spec
vzayts Dec 6, 2023
0f2f302
WIP: extend tasks apis with tests; refactor to extract common spec types
vzayts Dec 8, 2023
7dacb6b
WIP: fix tests
vzayts Dec 8, 2023
4bddc63
WIP: trim down loaded druid extensions
vzayts Dec 8, 2023
107a4c9
WIP: add metadata service and extend test coverage
vzayts Dec 8, 2023
7b77cb9
WIP: add terminate method and test
vzayts Dec 8, 2023
51b3ce0
merge-rebase with the parent branch
vzayts Dec 11, 2023
bc46dc3
address CR comments
vzayts Dec 11, 2023
af3e097
fix dead code findings
vzayts Dec 12, 2023
bd49dd9
fix test await queries, fix sql injection
vzayts Dec 12, 2023
4c698ab
add timeout for Metadata service methods
vzayts Dec 12, 2023
d146b40
fix lint findings
vzayts Dec 12, 2023
01f699d
Merge branch 'master' into vzaytsev/add-tasks-based-ingestion
vzayts Dec 12, 2023
1e83336
fix merge conflict and minor CR comment
vzayts Dec 12, 2023
3379e83
address CR comments
vzayts Dec 12, 2023
362fa95
fix tests
vzayts Dec 12, 2023
ebd1195
expose trigger inline ingestion and run inline ingestion test functions
vzayts Dec 12, 2023
8bede5c
CR: fix await timeout
vzayts Dec 12, 2023
2bf2f85
CR: reimplement await timeout without context
vzayts Dec 12, 2023
920e3b0
Address CR comments
vzayts Dec 13, 2023
b658587
Address CR comments: hide metadata and other tasks helper methods
vzayts Dec 13, 2023
d585945
Address CR comments
vzayts Dec 13, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions druid.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,10 +292,6 @@ func (c *Client) Tasks() *TasksService {
return &TasksService{client: c}
}

func (c *Client) Metadata(options ...metadataOption) *MetadataService {
return NewMetadataService(c, options...)
}

func WithBasicAuth(username, password string) ClientOption {
return func(opts *clientOptions) {
opts.username = username
Expand Down
27 changes: 17 additions & 10 deletions metadata.go → metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ import (
"github.com/h2oai/go-druid/builder/query"
)

func (c *Client) metadata(options ...metadataOption) *metadataService {
return newMetadataService(c, options...)
}

type count struct {
Cnt int `json:"cnt"`
}
Expand All @@ -20,22 +24,23 @@ type metadataOptions struct {

type metadataOption func(*metadataOptions)

// MetadataService is a service that runs druid metadata requests using druid SQL API.
type MetadataService struct {
// metadataService is a service that runs druid metadata requests using druid SQL API.
// NOTE: for internal tests use only, not representing official druid API.
type metadataService struct {
client *Client
tickerDuration time.Duration
awaitTimeout time.Duration
}

func NewMetadataService(client *Client, options ...metadataOption) *MetadataService {
func newMetadataService(client *Client, options ...metadataOption) *metadataService {
opts := &metadataOptions{
tickerDuration: 500 * time.Millisecond,
awaitTimeout: 180 * time.Second,
}
for _, opt := range options {
opt(opts)
}
md := &MetadataService{
md := &metadataService{
client: client,
tickerDuration: opts.tickerDuration,
awaitTimeout: opts.awaitTimeout,
Expand All @@ -62,8 +67,9 @@ func fillDataSourceName(in string, ds string) string {
return strings.Replace(in, "${{ datasource }}", ds, 1)
}

// AwaitDataSourceAvailable awaits for a datasource to be visible in druid table listing.
func (md *MetadataService) AwaitDataSourceAvailable(dataSourceName string) error {
// awaitDataSourceAvailable awaits for a datasource to be visible in druid table listing.
// NOTE: for internal tests use only, not representing official druid API.
func (md *metadataService) awaitDataSourceAvailable(dataSourceName string) error {
ticker := time.NewTicker(md.tickerDuration)
defer ticker.Stop()
afterTimeout := time.After(md.awaitTimeout)
Expand All @@ -83,16 +89,17 @@ func (md *MetadataService) AwaitDataSourceAvailable(dataSourceName string) error
return nil
}
case <-afterTimeout:
return errors.New("AwaitDataSourceAvailable timeout")
return errors.New("awaitDataSourceAvailable timeout")
}
}
}

//go:embed sql/datasource_records.sql
var datasourceRecordsQuery string

// AwaitRecordsCount awaits for specific recordsCount in a given datasource.
func (md *MetadataService) AwaitRecordsCount(dataSourceName string, recordsCount int) error {
// awaitRecordsCount awaits for specific recordsCount in a given datasource.
// NOTE: not safe and intended for internal tests use only. Not representing official druid API.
func (md *metadataService) awaitRecordsCount(dataSourceName string, recordsCount int) error {
ticker := time.NewTicker(md.tickerDuration)
defer ticker.Stop()
q := query.NewSQL()
Expand All @@ -111,7 +118,7 @@ func (md *MetadataService) AwaitRecordsCount(dataSourceName string, recordsCount
return nil
}
case <-afterTimeout:
return errors.New("AwaitRecordsCount timeout")
return errors.New("awaitRecordsCount timeout")
}
}
}
2 changes: 1 addition & 1 deletion sql/datasource_available.sql
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
SELECT count(*) cnt
FROM INFORMATION_SCHEMA.TABLES
WHERE TABLE_SCHEMA = 'druid' AND TABLE_NAME=?
WHERE TABLE_SCHEMA = 'druid' AND TABLE_NAME=?
2 changes: 1 addition & 1 deletion sql/datasource_records.sql
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
SELECT count(*) cnt
FROM "${{ datasource }}" ds
FROM "${{ datasource }}" ds
29 changes: 15 additions & 14 deletions task_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,6 @@ func defaultTaskIngestionSpec() *TaskIngestionSpec {
Spec: &IngestionSpecData{
DataSchema: &DataSchema{
DataSource: "some_datasource",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMHO, this can also be confusing. What about "unspecified" or something similar?

TimeStampSpec: &TimestampSpec{
Column: "ts",
Format: "auto",
},
GranularitySpec: &GranularitySpec{
Type: "uniform",
SegmentGranularity: "DAY",
Expand All @@ -63,16 +59,9 @@ func defaultTaskIngestionSpec() *TaskIngestionSpec {
IOConfig: &IOConfig{
Type: "index_parallel",
InputSource: &InputSource{
Type: "sql",
Database: &Database{
Type: "postgresql",
ConnectorConfig: &ConnectorConfig{
ConnectURI: "jdbc:postgresql://host:port/schema",
User: "user",
Password: "password",
},
},
SQLs: []string{},
Type: "sql",
Database: &Database{},
SQLs: []string{},
},
InputFormat: &InputFormat{
Type: "json",
Expand All @@ -98,6 +87,18 @@ func SetTaskType(stype string) TaskIngestionSpecOptions {
}
}

// SetTaskTimestampColumn sets the type of the task IOConfig.
func SetTaskTimestampColumn(column string) TaskIngestionSpecOptions {
return func(spec *TaskIngestionSpec) {
if column != "" {
spec.Spec.DataSchema.TimeStampSpec = &TimestampSpec{
Column: column,
Format: "auto",
}
}
}
}

// SetTaskDataSource sets the destination datasource of the task IOConfig.
func SetTaskDataSource(datasource string) TaskIngestionSpecOptions {
return func(spec *TaskIngestionSpec) {
Expand Down
31 changes: 16 additions & 15 deletions tasks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ var testObjects = []testDO{
},
}

// TriggerIngestionTask initiates inline ingestion task with druid client.
func TriggerIngestionTask[T any](d *Client, dataSourceName string, entries []T) (string, error) {
// triggerIngestionTask initiates inline ingestion task with druid client.
func triggerIngestionTask[T any](d *Client, dataSourceName string, entries []T) (string, error) {
csvEntriesBuff := &bytes.Buffer{}

err := gocsv.MarshalWithoutHeaders(entries, csvEntriesBuff)
Expand All @@ -46,6 +46,7 @@ func TriggerIngestionTask[T any](d *Client, dataSourceName string, entries []T)

var spec = NewTaskIngestionSpec(
SetTaskType("index_parallel"),
SetTaskTimestampColumn("ts"),
SetTaskDataSource(dataSourceName),
SetTaskTuningConfig("index_parallel", 25000, 5000000),
SetTaskIOConfigType("index_parallel"),
Expand All @@ -56,8 +57,8 @@ func TriggerIngestionTask[T any](d *Client, dataSourceName string, entries []T)
return taskID, err
}

// AwaitTaskCompletion waits for the task to complete. Function timeouts with an error after awaitTimeout nanoseconds.
func AwaitTaskCompletion(client *Client, taskID string, awaitTimeout time.Duration, tickerDuration time.Duration) error {
// awaitTaskCompletion waits for the task to complete. Function timeouts with an error after awaitTimeout nanoseconds.
func awaitTaskCompletion(client *Client, taskID string, awaitTimeout time.Duration, tickerDuration time.Duration) error {
ticker := time.NewTicker(tickerDuration)
defer ticker.Stop()
afterTimeout := time.After(awaitTimeout)
Expand All @@ -79,8 +80,8 @@ func AwaitTaskCompletion(client *Client, taskID string, awaitTimeout time.Durati
}
}

// AwaitTaskStatus waits for the druid task status for the maximum of awaitTimeout duration, querying druid task API.
func AwaitTaskStatus(client *Client, taskID string, status string, awaitTimeout time.Duration, tickerDuration time.Duration) error {
// awaitTaskStatus waits for the druid task status for the maximum of awaitTimeout duration, querying druid task API.
func awaitTaskStatus(client *Client, taskID string, status string, awaitTimeout time.Duration, tickerDuration time.Duration) error {
ticker := time.NewTicker(tickerDuration)
defer ticker.Stop()
afterTimeout := time.After(awaitTimeout)
Expand All @@ -101,24 +102,24 @@ func AwaitTaskStatus(client *Client, taskID string, status string, awaitTimeout
}
}

// RunInlineIngestionTask initiates inline ingestion task with druid client and runs until it is complete.
func RunInlineIngestionTask[T any](client *Client, dataSourceName string, entries []T, recordsCount int) error {
taskID, err := TriggerIngestionTask(client, dataSourceName, entries)
// runInlineIngestionTask initiates inline ingestion task with druid client and runs until it is complete.
func runInlineIngestionTask[T any](client *Client, dataSourceName string, entries []T, recordsCount int) error {
taskID, err := triggerIngestionTask(client, dataSourceName, entries)
if err != nil {
return err
}

err = AwaitTaskCompletion(client, taskID, 180*time.Second, 500*time.Millisecond)
err = awaitTaskCompletion(client, taskID, 180*time.Second, 500*time.Millisecond)
if err != nil {
return err
}

err = client.Metadata(WithMetadataQueryTicker(500*time.Millisecond), WithMetadataQueryTimeout(180*time.Second)).AwaitDataSourceAvailable(dataSourceName)
err = client.metadata(WithMetadataQueryTicker(500*time.Millisecond), WithMetadataQueryTimeout(180*time.Second)).awaitDataSourceAvailable(dataSourceName)
if err != nil {
return err
}

err = client.Metadata().AwaitRecordsCount(dataSourceName, recordsCount)
err = client.metadata().awaitRecordsCount(dataSourceName, recordsCount)
if err != nil {
return err
}
Expand Down Expand Up @@ -153,7 +154,7 @@ func TestTaskService(t *testing.T) {
require.NoError(t, err, "druid services should be up with no error")

// Test create ingestion task -> get status -> complete sequence.
RunInlineIngestionTask(d, "test-submit-task-datasource", testObjects, 2)
runInlineIngestionTask(d, "test-submit-task-datasource", testObjects, 2)
require.NoError(t, err, "error should be nil")
}

Expand Down Expand Up @@ -184,10 +185,10 @@ func TestTerminateTask(t *testing.T) {
require.NoError(t, err, "druid services should be up with no error")

// Test create ingestion task -> get status -> terminate sequence.
taskID, err := TriggerIngestionTask(d, "test-terminate-task-datasource", testObjects)
taskID, err := triggerIngestionTask(d, "test-terminate-task-datasource", testObjects)
require.NoError(t, err, "error should be nil")

err = AwaitTaskStatus(d, taskID, "RUNNING", 180*time.Second, 200*time.Millisecond)
err = awaitTaskStatus(d, taskID, "RUNNING", 180*time.Second, 200*time.Millisecond)
require.NoError(t, err, "error should be nil")

shutdownTaskID, err := d.Tasks().Shutdown(taskID)
Expand Down
Loading