Skip to content

Commit

Permalink
WIP: add terminate method and test
Browse files Browse the repository at this point in the history
  • Loading branch information
vzayts committed Dec 8, 2023
1 parent fc5f92e commit 76de2f7
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 20 deletions.
10 changes: 5 additions & 5 deletions common_spec_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,11 +257,11 @@ type InputFormat struct {
Type string `json:"type"`

// FlatTextInputFormat / DelimitedInputFormat fields
Delimiter string `json:"delimiter,omitempty"`
ListDelimiter string `json:"listDelimiter,omitempty"`
FindColumnsHeader string `json:"findColumnsHeader,omitempty"`
SkipHeaderRows int `json:"skipHeaderRows,omitempty"`
Columns []string `json:"columns,omitempty"`
Delimiter string `json:"delimiter,omitempty"`
ListDelimiter string `json:"listDelimiter,omitempty"`
FindColumnsFromHeader string `json:"findColumnsFromHeader,omitempty"`
SkipHeaderRows int `json:"skipHeaderRows,omitempty"`
Columns []string `json:"columns,omitempty"`

// JsonInputFormat fields
FlattenSpec *FlattenSpec `json:"flattenSpec,omitempty"`
Expand Down
2 changes: 1 addition & 1 deletion task_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func SetTaskIOConfigType(typ string) TaskIngestionSpecOptions {
func SetTaskInputFormat(typ string, findColumnsHeader string, columns []string) TaskIngestionSpecOptions {
return func(spec *TaskIngestionSpec) {
spec.Spec.IOConfig.InputFormat.Type = typ
spec.Spec.IOConfig.InputFormat.FindColumnsHeader = findColumnsHeader
spec.Spec.IOConfig.InputFormat.FindColumnsFromHeader = findColumnsHeader
spec.Spec.IOConfig.InputFormat.Columns = columns
}
}
Expand Down
22 changes: 21 additions & 1 deletion tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,16 @@ type TasksService struct {
client *Client
}

// SubmitTaskResponse is a response object of Druid SupervisorService's Terminate method.
// SubmitTaskResponse is a response object of Druid Task API Submit task method.
type SubmitTaskResponse struct {
Task string `json:"task"`
}

// ShutdownTaskResponse is a response object of Druid SupervisorService's Terminate method.
type ShutdownTaskResponse struct {
Task string `json:"task"`
}

// SubmitTask submits an ingestion specification to druid tasks API with a pre-configured druid client.
// https://druid.apache.org/docs/latest/api-reference/tasks-api/#submit-a-task
func (s *TasksService) SubmitTask(spec *TaskIngestionSpec) (string, error) {
Expand Down Expand Up @@ -59,6 +64,21 @@ func (s *TasksService) GetStatus(taskId string) (TaskStatusResponse, error) {
return result, nil
}

// Shutdown calls druid task service's shutdown task API.
// https://druid.apache.org/docs/latest/api-reference/tasks-api/#shut-down-a-task
func (s *TasksService) Shutdown(taskId string) (string, error) {
r, err := s.client.NewRequest("POST", applyTaskId(taskShutdownEndpoint, taskId), "")
var result ShutdownTaskResponse
if err != nil {
return "", err
}
_, err = s.client.Do(r, &result)
if err != nil {
return result.Task, err
}
return result.Task, nil
}

func applyTaskId(input string, taskId string) string {
return strings.Replace(input, ":taskId", taskId, 1)
}
78 changes: 65 additions & 13 deletions tasks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,22 @@ package druid
import (
"bytes"
"context"
"errors"
"testing"
"time"

"github.com/gocarina/gocsv"
"github.com/google/uuid"
"github.com/jmoiron/sqlx/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
tc "github.com/testcontainers/testcontainers-go/modules/compose"
"github.com/testcontainers/testcontainers-go/wait"
)

// TestDO represents entry with payload.
type TestDO struct {
Timestamp time.Time `db:"ts"`
Id uuid.UUID `db:"id"` // output only
Id uuid.UUID `db:"id"`
Payload types.JSONText `db:"payload"`
}

Expand Down Expand Up @@ -47,7 +48,7 @@ func triggerIngestionTask(d *Client, dataSourceName string, entries []TestDO) (s
SetTaskDataSource(dataSourceName),
SetTaskTuningConfig("index_parallel", 25000, 5000000),
SetTaskIOConfigType("index_parallel"),
SetTaskInputFormat("csv", "false", []string{}),
SetTaskInputFormat("csv", "false", []string{"ts", "id", "payload"}),
SetTaskInlineInputData(csvEntriesBuff.String()),
)
taskID, err := d.Tasks().SubmitTask(spec)
Expand All @@ -69,6 +70,20 @@ func awaitTaskCompletion(client *Client, taskID string) error {
return nil
}

func awaitTaskRunning(client *Client, taskID string) error {
for range time.Tick(100 * time.Millisecond) {
res, err := client.Tasks().GetStatus(taskID)
if err != nil {
return err
}

if res.Status.Status == "RUNNING" {
return nil
}
}
return errors.New("task has not started")
}

func runInlineIngestionTask(client *Client, dataSourceName string, entries []TestDO, recordsCount int) error {
taskID, err := triggerIngestionTask(client, dataSourceName, entries)
if err != nil {
Expand Down Expand Up @@ -96,33 +111,70 @@ func runInlineIngestionTask(client *Client, dataSourceName string, entries []Tes
func TestTaskService(t *testing.T) {
// Set up druid containers using docker-compose.
compose, err := tc.NewDockerCompose("testdata/docker-compose.yaml")
assert.NoError(t, err, "NewDockerComposeAPI()")
require.NoError(t, err, "NewDockerComposeAPI()")

// Set up cleanup for druid containers.
t.Cleanup(func() {
assert.NoError(t, compose.Down(context.Background(), tc.RemoveOrphans(true), tc.RemoveVolumes(true), tc.RemoveImagesLocal), "compose.Down()")
require.NoError(t, compose.Down(context.Background(), tc.RemoveOrphans(true), tc.RemoveVolumes(true), tc.RemoveImagesLocal), "compose.Down()")
})

ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

// Wait for druid contaners to start.
assert.NoError(t, compose.Up(ctx, tc.Wait(true)), "compose.Up()")
// Set up druid service and client.
var druidOpts []ClientOption
d, err := NewClient("http://localhost:8888", druidOpts...)
require.NoError(t, err, "error should be nil")

// Waiting for druid services to start.
err = compose.
WaitForService("coordinator", wait.NewHTTPStrategy(processInformationPathPrefix).WithPort("8081/tcp").WithStartupTimeout(180*time.Second)).
WaitForService("router", wait.NewHTTPStrategy(processInformationPathPrefix).WithPort("8888/tcp").WithStartupTimeout(180*time.Second)).
WaitForService("broker", wait.NewHTTPStrategy(processInformationPathPrefix).WithPort("8082/tcp").WithStartupTimeout(180*time.Second)).
WaitForService("middlemanager", wait.NewHTTPStrategy(processInformationPathPrefix).WithPort("8091/tcp").WithStartupTimeout(180*time.Second)).
Up(ctx, tc.Wait(true))
require.NoError(t, err, "druid services should be up with no error")

// Test create ingestion task -> get status -> complete sequence.
runInlineIngestionTask(d, "test-submit-task-datasource", testObjects, 2)
require.NoError(t, err, "error should be nil")
}

func TestTerminateTask(t *testing.T) {
// Set up druid containers using docker-compose.
compose, err := tc.NewDockerCompose("testdata/docker-compose.yaml")
require.NoError(t, err, "NewDockerComposeAPI()")

// Set up cleanup for druid containers.
t.Cleanup(func() {
require.NoError(t, compose.Down(context.Background(), tc.RemoveOrphans(true), tc.RemoveVolumes(true), tc.RemoveImagesLocal), "compose.Down()")
})

ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

// Set up druid service and client.
var druidOpts []ClientOption
d, err := NewClient("http://localhost:8091", druidOpts...)
assert.NoError(t, err, "error should be nil")
d, err := NewClient("http://localhost:8888", druidOpts...)
require.NoError(t, err, "error should be nil")

// Waiting for druid coordinator service to start.
// Waiting for druid services to start.
err = compose.
WaitForService("coordinator", wait.NewHTTPStrategy(processInformationPathPrefix).WithPort("8081/tcp").WithStartupTimeout(180*time.Second)).
WaitForService("router", wait.NewHTTPStrategy(processInformationPathPrefix).WithPort("8888/tcp").WithStartupTimeout(180*time.Second)).
WaitForService("broker", wait.NewHTTPStrategy(processInformationPathPrefix).WithPort("8082/tcp").WithStartupTimeout(180*time.Second)).
WaitForService("middlemanager", wait.NewHTTPStrategy(processInformationPathPrefix).WithPort("8091/tcp").WithStartupTimeout(180*time.Second)).
Up(ctx, tc.Wait(true))
assert.NoError(t, err, "coordinator should be up with no error")
require.NoError(t, err, "druid services should be up with no error")

// Test create ingestion task -> get status -> terminate sequence.
runInlineIngestionTask(d, "test-datasource", testObjects, 2)
assert.NoError(t, err, "error should be nil")
taskID, err := triggerIngestionTask(d, "test-terminate-task-datasource", testObjects)
require.NoError(t, err, "error should be nil")

err = awaitTaskRunning(d, taskID)
require.NoError(t, err, "error should be nil")

shutdownTaskID, err := d.Tasks().Shutdown(taskID)
require.NoError(t, err, "error should be nil")
require.Equal(t, shutdownTaskID, taskID)
}

0 comments on commit 76de2f7

Please sign in to comment.