Skip to content

Commit

Permalink
WIP add tasks based ingestion
Browse files Browse the repository at this point in the history
  • Loading branch information
vzayts committed Dec 6, 2023
1 parent b87f9a6 commit 2b1f24d
Show file tree
Hide file tree
Showing 4 changed files with 255 additions and 0 deletions.
4 changes: 4 additions & 0 deletions druid.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,10 @@ func (c *Client) Supervisor() *SupervisorService {
return &SupervisorService{client: c}
}

func (c *Client) Tasks() *TasksService {
return &TasksService{client: c}
}

func WithBasicAuth(username, password string) ClientOption {
return func(opts *clientOptions) {
opts.username = username
Expand Down
110 changes: 110 additions & 0 deletions task_types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package druid

// TaskStatusResponse is a response object containing status of a task.
type TaskStatusResponse struct {
Task string `json:"task"`
Status TaskStatus `json:"status"`
}

// TaskLocation holds location of the task execution.
type TaskLocation struct {
Host string `json:"host"`
Port int `json:"port"`
TlsPort int `json:"tlsPort"`
}

// TaskStatus is an object representing status of a druid task.
type TaskStatus struct {
ID string `json:"id"`
Type string `json:"type"`
CreatedTime string `json:"createdTime"`
QueueInsertionTime string `json:"queueInsertionTime"`
StatusCode string `json:"statusCode"`
Status string `json:"status"`
RunnerStatusCode string `json:"runnerStatusCode"`
Duration int `json:"duration"`
GroupId string `json:"groupId"`
Location *TaskLocation `json:"location|omitempty"`
Datasource string `json:"datasource"`
ErrorMessage string `json:"errorMessage"`
}

// TaskIngestionSpec is a specification for a druid task execution.
type TaskIngestionSpec struct {
Type string `json:"type"`
Spec *IngestionSpecData `json:"spec"`
}

// defaultKafkaIngestionSpec returns a default InputIngestionSpec with basic ingestion
// specification fields initialized.
func defaultTaskIngestionSpec() *TaskIngestionSpec {
spec := &TaskIngestionSpec{
Type: "index_parallel",
Spec: &IngestionSpecData{
DataSchema: &DataSchema{
DataSource: "",
TimeStampSpec: &TimestampSpec{
Column: "ts",
Format: "auto",
},
GranularitySpec: &GranularitySpec{
Type: "uniform",
SegmentGranularity: "DAY",
QueryGranularity: "none",
},
DimensionsSpec: &DimensionsSpec{
Dimensions: DimensionSet{},
},
},
IOConfig: &IOConfig{
Type: "index_parallel",
InputSource: &InputSource{
Type: "inline",
Data: "",
},
InputFormat: &InputFormat{
Type: "csv",
FindColumnsHeader: "true",
Columns: []string{},
},
},
TuningConfig: &TuningConfig{
Type: "index_parallel",
MaxRowsPerSegment: 5000000,
MaxRowsInMemory: 25000,
},
},
}
return spec
}

// IngestionSpecOptions allows for configuring a InputIngestionSpec.
type TaskIngestionSpecOptions func(*TaskIngestionSpec)

// SetType sets the type of the supervisor (IOConfig).
func SetTaskType(stype string) TaskIngestionSpecOptions {
return func(spec *TaskIngestionSpec) {
if stype != "" {
spec.Type = stype
}
}
}

// SetType sets the type of the supervisor (IOConfig).
func SetTaskDataSource(datasource string) TaskIngestionSpecOptions {
return func(spec *TaskIngestionSpec) {
if datasource != "" {
spec.Spec.DataSchema.DataSource = datasource
}
}
}

// NewTaskIngestionSpec returns a default TaskIngestionSpec and applies any
// options passed to it.
func NewTaskIngestionSpec(options ...TaskIngestionSpecOptions) *TaskIngestionSpec {
spec := defaultTaskIngestionSpec()
for _, fn := range options {
fn(spec)
}
return spec
}
64 changes: 64 additions & 0 deletions tasks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package druid

import "strings"

const (
tasksEndpoint = "druid/indexer/v1/tasks"
taskStatusEndpoint = "druid/indexer/v1/task/:taskId/status"
tasksCompleteEndpoint = "druid/indexer/v1/completeTasks"
tasksRunningEndpoint = "druid/indexer/v1/runningTasks"
tasksWaitingEndpoint = "druid/indexer/v1/waitingTasks"
tasksPendingEndpoint = "druid/indexer/v1/pendingTasks"
taskPayloadEndpoint = "druid/indexer/v1/task/:taskId"
taskSegmentsEndpoint = "druid/indexer/v1/task/:taskId/segments"
taskReportEndpoint = "druid/indexer/v1/task/:taskId/reports"
taskSubmitEndpoint = "druid/indexer/v1/task"
taskShutdownEndpoint = "druid/indexer/v1/task/:taskId/shutdown"
tasksShutdownAllEndpoint = "druid/indexer/v1/datasources/:datasource/shutdownAllTasks"
tasksStatusesEndpoint = "druid/indexer/v1/taskStatus"
taskDeletePendingSegments = "druid/indexer/v1/pendingSegments/:datasource"
)

// TasksService is a service that runs requests to druid tasks API.
type TasksService struct {
client *Client
}

// SubmitTaskResponse is a response object of Druid SupervisorService's Terminate method.
type SubmitTaskResponse struct {
Task string `json:"task"`
}

// SubmitTask submits an ingestion specification to druid tasks API with a pre-configured druid client.
// https://druid.apache.org/docs/latest/api-reference/tasks-api/#submit-a-task
func (s *TasksService) SubmitTask(spec *TaskIngestionSpec) (string, error) {
r, err := s.client.NewRequest("POST", taskSubmitEndpoint, spec)
if err != nil {
return "", err
}
var result SubmitTaskResponse
_, err = s.client.Do(r, &result)
if err != nil {
return "", err
}
return result.Task, nil
}

// GetStatus calls druid tasks service's Get status API.
// https://druid.apache.org/docs/latest/api-reference/tasks-api/#get-task-status
func (s *TasksService) GetStatus(taskId string) (TaskStatusResponse, error) {
r, err := s.client.NewRequest("GET", applyTaskId(taskStatusEndpoint, taskId), nil)
var result TaskStatusResponse
if err != nil {
return result, err
}
_, err = s.client.Do(r, &result)
if err != nil {
return result, err
}
return result, nil
}

func applyTaskId(input string, taskId string) string {
return strings.Replace(input, ":taskId", taskId, 1)
}
77 changes: 77 additions & 0 deletions tasks_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package druid

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
tc "github.com/testcontainers/testcontainers-go/modules/compose"
"github.com/testcontainers/testcontainers-go/wait"
)

func TestTaskService(t *testing.T) {
// Set up druid containers using docker-compose.
compose, err := tc.NewDockerCompose("testdata/docker-compose.yaml")
assert.NoError(t, err, "NewDockerComposeAPI()")

// Set up cleanup for druid containers.
t.Cleanup(func() {
assert.NoError(t, compose.Down(context.Background(), tc.RemoveOrphans(true), tc.RemoveImagesLocal), "compose.Down()")
})

ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

// Wait for druid contaners to start.
assert.NoError(t, compose.Up(ctx, tc.Wait(true)), "compose.Up()")

// Set up druid service and client.
var druidOpts []ClientOption
d, err := NewClient("http://localhost:8888", druidOpts...)
assert.NoError(t, err, "error should be nil")
var spec = NewTaskIngestionSpec(
SetTaskType("index_parallel"),
SetTaskDataSource("test-datasource"),
)
assert.NoError(t, err, "error should be nil")
assert.NotNil(t, spec, "specification should not be nil")

// Waiting for druid coordinator service to start.
err = compose.
WaitForService("coordinator", wait.NewHTTPStrategy(processInformationPathPrefix).WithPort("8081/tcp").WithStartupTimeout(60*time.Second)).
Up(ctx, tc.Wait(true))
assert.NoError(t, err, "coordinator should be up with no error")

// Test create supervisor -> get status -> terminate sequence.
_, err = d.Tasks().SubmitTask(spec)
assert.NoError(t, err, "error should be nil")
//assert.Equal(t, id, spec.DataSchema.DataSource)
//status, err := d.Supervisor().GetStatus(spec.DataSchema.DataSource)
//assert.NoError(t, err, "error should be nil")
//assert.Equal(t, "PENDING", status.Payload.State)
//assert.False(t, status.Payload.Suspended)
//
//// suspend and check status
//suspendedSpec, err := d.Supervisor().Suspend(spec.DataSchema.DataSource)
//assert.True(t, suspendedSpec.Suspended)
//assert.NoError(t, err, "error should be nil")
//
//status, err = d.Supervisor().GetStatus(spec.DataSchema.DataSource)
//assert.NoError(t, err, "error should be nil")
//assert.True(t, status.Payload.Suspended)
//
//// resume and check status
//_, err = d.Supervisor().Resume(spec.DataSchema.DataSource)
//assert.NoError(t, err, "error should be nil")
//
//status, err = d.Supervisor().GetStatus(spec.DataSchema.DataSource)
//assert.NoError(t, err, "error should be nil")
//assert.Equal(t, "PENDING", status.Payload.State)
//assert.False(t, status.Payload.Suspended)
//
//// terminate
//id, err = d.Supervisor().Terminate(spec.DataSchema.DataSource)
//assert.NoError(t, err, "error should be nil")
//assert.Equal(t, id, spec.DataSchema.DataSource)
}

0 comments on commit 2b1f24d

Please sign in to comment.