diff --git a/druid.go b/druid.go index d6047c2..bba3b9d 100644 --- a/druid.go +++ b/druid.go @@ -288,6 +288,10 @@ func (c *Client) Supervisor() *SupervisorService { return &SupervisorService{client: c} } +func (c *Client) Tasks() *TasksService { + return &TasksService{client: c} +} + func WithBasicAuth(username, password string) ClientOption { return func(opts *clientOptions) { opts.username = username diff --git a/task_types.go b/task_types.go new file mode 100644 index 0000000..ada960d --- /dev/null +++ b/task_types.go @@ -0,0 +1,110 @@ +package druid + +// TaskStatusResponse is a response object containing status of a task. +type TaskStatusResponse struct { + Task string `json:"task"` + Status TaskStatus `json:"status"` +} + +// TaskLocation holds location of the task execution. +type TaskLocation struct { + Host string `json:"host"` + Port int `json:"port"` + TlsPort int `json:"tlsPort"` +} + +// TaskStatus is an object representing status of a druid task. +type TaskStatus struct { + ID string `json:"id"` + Type string `json:"type"` + CreatedTime string `json:"createdTime"` + QueueInsertionTime string `json:"queueInsertionTime"` + StatusCode string `json:"statusCode"` + Status string `json:"status"` + RunnerStatusCode string `json:"runnerStatusCode"` + Duration int `json:"duration"` + GroupId string `json:"groupId"` + Location *TaskLocation `json:"location|omitempty"` + Datasource string `json:"datasource"` + ErrorMessage string `json:"errorMessage"` +} + +// TaskIngestionSpec is a specification for a druid task execution. +type TaskIngestionSpec struct { + Type string `json:"type"` + Spec *IngestionSpecData `json:"spec"` +} + +// defaultKafkaIngestionSpec returns a default InputIngestionSpec with basic ingestion +// specification fields initialized. +func defaultTaskIngestionSpec() *TaskIngestionSpec { + spec := &TaskIngestionSpec{ + Type: "index_parallel", + Spec: &IngestionSpecData{ + DataSchema: &DataSchema{ + DataSource: "", + TimeStampSpec: &TimestampSpec{ + Column: "ts", + Format: "auto", + }, + GranularitySpec: &GranularitySpec{ + Type: "uniform", + SegmentGranularity: "DAY", + QueryGranularity: "none", + }, + DimensionsSpec: &DimensionsSpec{ + Dimensions: DimensionSet{}, + }, + }, + IOConfig: &IOConfig{ + Type: "index_parallel", + InputSource: &InputSource{ + Type: "inline", + Data: "", + }, + InputFormat: &InputFormat{ + Type: "csv", + FindColumnsHeader: "true", + Columns: []string{}, + }, + }, + TuningConfig: &TuningConfig{ + Type: "index_parallel", + MaxRowsPerSegment: 5000000, + MaxRowsInMemory: 25000, + }, + }, + } + return spec +} + +// IngestionSpecOptions allows for configuring a InputIngestionSpec. +type TaskIngestionSpecOptions func(*TaskIngestionSpec) + +// SetType sets the type of the supervisor (IOConfig). +func SetTaskType(stype string) TaskIngestionSpecOptions { + return func(spec *TaskIngestionSpec) { + if stype != "" { + spec.Type = stype + } + } +} + +// SetType sets the type of the supervisor (IOConfig). +func SetTaskDataSource(datasource string) TaskIngestionSpecOptions { + return func(spec *TaskIngestionSpec) { + if datasource != "" { + spec.Spec.DataSchema.DataSource = datasource + } + } +} + +// NewTaskIngestionSpec returns a default TaskIngestionSpec and applies any +// options passed to it. +func NewTaskIngestionSpec(options ...TaskIngestionSpecOptions) *TaskIngestionSpec { + spec := defaultTaskIngestionSpec() + for _, fn := range options { + fn(spec) + } + return spec +} diff --git a/tasks.go b/tasks.go new file mode 100644 index 0000000..c9faf22 --- /dev/null +++ b/tasks.go @@ -0,0 +1,64 @@ +package druid + +import "strings" + +const ( + tasksEndpoint = "druid/indexer/v1/tasks" + taskStatusEndpoint = "druid/indexer/v1/task/:taskId/status" + tasksCompleteEndpoint = "druid/indexer/v1/completeTasks" + tasksRunningEndpoint = "druid/indexer/v1/runningTasks" + tasksWaitingEndpoint = "druid/indexer/v1/waitingTasks" + tasksPendingEndpoint = "druid/indexer/v1/pendingTasks" + taskPayloadEndpoint = "druid/indexer/v1/task/:taskId" + taskSegmentsEndpoint = "druid/indexer/v1/task/:taskId/segments" + taskReportEndpoint = "druid/indexer/v1/task/:taskId/reports" + taskSubmitEndpoint = "druid/indexer/v1/task" + taskShutdownEndpoint = "druid/indexer/v1/task/:taskId/shutdown" + tasksShutdownAllEndpoint = "druid/indexer/v1/datasources/:datasource/shutdownAllTasks" + tasksStatusesEndpoint = "druid/indexer/v1/taskStatus" + taskDeletePendingSegments = "druid/indexer/v1/pendingSegments/:datasource" +) + +// TasksService is a service that runs requests to druid tasks API. +type TasksService struct { + client *Client +} + +// SubmitTaskResponse is a response object of Druid SupervisorService's Terminate method. +type SubmitTaskResponse struct { + Task string `json:"task"` +} + +// SubmitTask submits an ingestion specification to druid tasks API with a pre-configured druid client. +// https://druid.apache.org/docs/latest/api-reference/tasks-api/#submit-a-task +func (s *TasksService) SubmitTask(spec *TaskIngestionSpec) (string, error) { + r, err := s.client.NewRequest("POST", taskSubmitEndpoint, spec) + if err != nil { + return "", err + } + var result SubmitTaskResponse + _, err = s.client.Do(r, &result) + if err != nil { + return "", err + } + return result.Task, nil +} + +// GetStatus calls druid tasks service's Get status API. +// https://druid.apache.org/docs/latest/api-reference/tasks-api/#get-task-status +func (s *TasksService) GetStatus(taskId string) (TaskStatusResponse, error) { + r, err := s.client.NewRequest("GET", applyTaskId(taskStatusEndpoint, taskId), nil) + var result TaskStatusResponse + if err != nil { + return result, err + } + _, err = s.client.Do(r, &result) + if err != nil { + return result, err + } + return result, nil +} + +func applyTaskId(input string, taskId string) string { + return strings.Replace(input, ":taskId", taskId, 1) +} diff --git a/tasks_test.go b/tasks_test.go new file mode 100644 index 0000000..66528dd --- /dev/null +++ b/tasks_test.go @@ -0,0 +1,77 @@ +package druid + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + tc "github.com/testcontainers/testcontainers-go/modules/compose" + "github.com/testcontainers/testcontainers-go/wait" +) + +func TestTaskService(t *testing.T) { + // Set up druid containers using docker-compose. + compose, err := tc.NewDockerCompose("testdata/docker-compose.yaml") + assert.NoError(t, err, "NewDockerComposeAPI()") + + // Set up cleanup for druid containers. + t.Cleanup(func() { + assert.NoError(t, compose.Down(context.Background(), tc.RemoveOrphans(true), tc.RemoveImagesLocal), "compose.Down()") + }) + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + // Wait for druid contaners to start. + assert.NoError(t, compose.Up(ctx, tc.Wait(true)), "compose.Up()") + + // Set up druid service and client. + var druidOpts []ClientOption + d, err := NewClient("http://localhost:8888", druidOpts...) + assert.NoError(t, err, "error should be nil") + var spec = NewTaskIngestionSpec( + SetTaskType("index_parallel"), + SetTaskDataSource("test-datasource"), + ) + assert.NoError(t, err, "error should be nil") + assert.NotNil(t, spec, "specification should not be nil") + + // Waiting for druid coordinator service to start. + err = compose. + WaitForService("coordinator", wait.NewHTTPStrategy(processInformationPathPrefix).WithPort("8081/tcp").WithStartupTimeout(60*time.Second)). + Up(ctx, tc.Wait(true)) + assert.NoError(t, err, "coordinator should be up with no error") + + // Test create supervisor -> get status -> terminate sequence. + _, err = d.Tasks().SubmitTask(spec) + assert.NoError(t, err, "error should be nil") + //assert.Equal(t, id, spec.DataSchema.DataSource) + //status, err := d.Supervisor().GetStatus(spec.DataSchema.DataSource) + //assert.NoError(t, err, "error should be nil") + //assert.Equal(t, "PENDING", status.Payload.State) + //assert.False(t, status.Payload.Suspended) + // + //// suspend and check status + //suspendedSpec, err := d.Supervisor().Suspend(spec.DataSchema.DataSource) + //assert.True(t, suspendedSpec.Suspended) + //assert.NoError(t, err, "error should be nil") + // + //status, err = d.Supervisor().GetStatus(spec.DataSchema.DataSource) + //assert.NoError(t, err, "error should be nil") + //assert.True(t, status.Payload.Suspended) + // + //// resume and check status + //_, err = d.Supervisor().Resume(spec.DataSchema.DataSource) + //assert.NoError(t, err, "error should be nil") + // + //status, err = d.Supervisor().GetStatus(spec.DataSchema.DataSource) + //assert.NoError(t, err, "error should be nil") + //assert.Equal(t, "PENDING", status.Payload.State) + //assert.False(t, status.Payload.Suspended) + // + //// terminate + //id, err = d.Supervisor().Terminate(spec.DataSchema.DataSource) + //assert.NoError(t, err, "error should be nil") + //assert.Equal(t, id, spec.DataSchema.DataSource) +}