Implement suspend and resume endpoints of supervisor API #7

Merged: 13 commits, Dec 12, 2023
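For context, a minimal usage sketch of the two new methods against a local Druid router. The import path, address, and datasource name are illustrative assumptions, not taken from this PR:

package main

import (
	"log"

	druid "github.com/example/go-druid" // hypothetical import path
)

func main() {
	client, err := druid.NewClient("http://localhost:8888")
	if err != nil {
		log.Fatal(err)
	}
	// Suspend a running supervisor; the returned spec reflects the new state.
	spec, err := client.Supervisor().Suspend("test-datasource")
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("suspended: %v", spec.Suspended)
	// Resume it again.
	if _, err := client.Supervisor().Resume("test-datasource"); err != nil {
		log.Fatal(err)
	}
}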
31 changes: 24 additions & 7 deletions supervisor.go
@@ -2,6 +2,7 @@ package druid

import (
"errors"
"net/http"
"strings"
)

@@ -92,8 +93,8 @@ func (s *SupervisorService) GetSpec(supervisorId string) (OutputIngestionSpec, e
// GetStatus calls druid Supervisor service's Get status API.
// https://druid.apache.org/docs/latest/api-reference/supervisor-api/#get-supervisor-status
func (s *SupervisorService) GetStatus(supervisorId string) (SupervisorStatus, error) {
r, err := s.client.NewRequest("GET", applySupervisorId(supervisorStatusEndpoint, supervisorId), nil)
var result SupervisorStatus
r, err := s.client.NewRequest("GET", applySupervisorId(supervisorStatusEndpoint, supervisorId), nil)
Review comment (collaborator, author):
nit: please consider using http.MethodGet
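Applied to the request above, the suggestion would be a one-line change; http.MethodGet is simply the constant "GET", so behavior is identical:

r, err := s.client.NewRequest(http.MethodGet, applySupervisorId(supervisorStatusEndpoint, supervisorId), nil)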

if err != nil {
return result, err
}
@@ -119,9 +120,17 @@ func (s *SupervisorService) GetAuditHistoryAll() (map[string]SupervisorAuditHist

// Suspend calls druid Supervisor service's Suspend API.
// https://druid.apache.org/docs/latest/api-reference/supervisor-api/#suspend-a-running-supervisor
func (s *SupervisorService) Suspend(string) (OutputIngestionSpec, error) {
var res OutputIngestionSpec
return res, errors.New("method Suspend not implemented")
func (s *SupervisorService) Suspend(supervisorID string) (OutputIngestionSpec, error) {
var result OutputIngestionSpec
r, err := s.client.NewRequest(http.MethodPost, applySupervisorId(supervisorSuspendEndpoint, supervisorID), nil)
if err != nil {
return result, err
}
_, err = s.client.Do(r, &result)
if err != nil {
return result, err
}
return result, nil
}
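The supervisorSuspendEndpoint and supervisorResumeEndpoint constants are defined outside this diff. Based on the Druid API docs linked above, they presumably template the documented POST paths, along these lines (a hypothetical sketch, including the placeholder syntax):

// Hypothetical sketch of the endpoint templates; the actual definitions
// (and the placeholder substituted by applySupervisorId) live outside this diff.
const (
	supervisorSuspendEndpoint = "druid/indexer/v1/supervisor/:supervisorId/suspend"
	supervisorResumeEndpoint  = "druid/indexer/v1/supervisor/:supervisorId/resume"
)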

// SuspendAll calls druid Supervisor service's SuspendAll API.
@@ -132,9 +141,17 @@ func (s *SupervisorService) SuspendAll() (string, error) {

// Resume calls druid Supervisor service's Resume API.
// https://druid.apache.org/docs/latest/api-reference/supervisor-api/#resume-a-supervisor
func (s *SupervisorService) Resume(string) (OutputIngestionSpec, error) {
var res OutputIngestionSpec
return res, errors.New("method Resume not implemented")
func (s *SupervisorService) Resume(supervisorID string) (OutputIngestionSpec, error) {
var result OutputIngestionSpec
r, err := s.client.NewRequest(http.MethodPost, applySupervisorId(supervisorResumeEndpoint, supervisorID), nil /*no data*/)
if err != nil {
return result, err
}
_, err = s.client.Do(r, &result)
if err != nil {
return result, err
}
return result, nil
}

// ResumeAll calls druid Supervisor service's ResumeAll API.
28 changes: 25 additions & 3 deletions supervisor_test.go
@@ -28,12 +28,13 @@ func TestSupervisorService(t *testing.T) {

// Set up druid service and client.
var druidOpts []ClientOption
d, err := NewClient("http://localhost:8081", druidOpts...)
d, err := NewClient("http://localhost:8888", druidOpts...)
assert.NoError(t, err, "error should be nil")
var spec = NewIngestionSpec(SetType("kafka"),
SetBrokers("telemetry-kafka.skaffold-telemetry-victorzaytsev.svc.cluster.local:9092"),
SetBrokers("kafka:9092"),
SetTopic("test-topic"),
SetDataSource("test-datasource"))
SetDataSource("test-datasource"),
SetDimensions(DimensionSet{{"dim1"}, {"dim2"}}))
assert.NoError(t, err, "error should be nil")
assert.NotNil(t, spec, "specification should not be nil")

@@ -50,6 +51,27 @@ func TestSupervisorService(t *testing.T) {
status, err := d.Supervisor().GetStatus(spec.DataSchema.DataSource)
assert.NoError(t, err, "error should be nil")
assert.Equal(t, "PENDING", status.Payload.State)
assert.False(t, status.Payload.Suspended)

// suspend and check status
suspendedSpec, err := d.Supervisor().Suspend(spec.DataSchema.DataSource)
assert.True(t, suspendedSpec.Suspended)
assert.NoError(t, err, "error should be nil")

status, err = d.Supervisor().GetStatus(spec.DataSchema.DataSource)
assert.NoError(t, err, "error should be nil")
assert.True(t, status.Payload.Suspended)

// resume and check status
_, err = d.Supervisor().Resume(spec.DataSchema.DataSource)
assert.NoError(t, err, "error should be nil")

status, err = d.Supervisor().GetStatus(spec.DataSchema.DataSource)
assert.NoError(t, err, "error should be nil")
assert.Equal(t, "PENDING", status.Payload.State)
assert.False(t, status.Payload.Suspended)

// terminate
id, err = d.Supervisor().Terminate(spec.DataSchema.DataSource)
assert.NoError(t, err, "error should be nil")
assert.Equal(t, id, spec.DataSchema.DataSource)
91 changes: 77 additions & 14 deletions supervisor_types.go
@@ -1,5 +1,10 @@
package druid

import (
"encoding/json"
"fmt"
)

// InputIngestionSpec is the root-level type defining an ingestion spec used
// by Apache Druid.
type InputIngestionSpec struct {
@@ -125,7 +130,12 @@ type SpatialDimension struct {
type TransformSet []Transform

// DimensionSet is a unique set of druid datasource dimensions(labels).
type DimensionSet []any
type DimensionSet []DimensionSpec

// DimensionSpec is a single dataset dimension that can be represented by a typed Dimension or a string value.
type DimensionSpec struct {
Value any
}

// Dimension is a typed definition of a datasource dimension.
type Dimension struct {
@@ -136,27 +146,41 @@ type Dimension struct {
// SpatialDimensionSet is a unique set of druid datasource spatial dimensions.
type SpatialDimensionSet []SpatialDimension

// DimensionExclusionsSet represents set of excluded dimensions.
type DimensionExclusionsSet []string

// DimensionsSpec is responsible for configuring Druid's dimensions. They're a
// set of columns in Druid's data model that can be used for grouping, filtering
// or applying aggregations.
// https://druid.apache.org/docs/latest/ingestion/ingestion-spec#dimensionsspec
type DimensionsSpec struct {
Dimensions DimensionSet `json:"dimensions,omitempty"`
DimansionExclusions DimensionSet `json:"dimansionExclusions,omitempty"`
SpatialDimensions SpatialDimensionSet `json:"spatialDimensions,omitempty"`
IncludeAllDimensions bool `json:"includeAllDimensions,omitempty"`
UseSchemaDiscovery bool `json:"useSchemaDiscovery,omitempty"`
Dimensions DimensionSet `json:"dimensions,omitempty"`
DimensionExclusions DimensionExclusionsSet `json:"dimensionExclusions,omitempty"`
SpatialDimensions SpatialDimensionSet `json:"spatialDimensions,omitempty"`
IncludeAllDimensions bool `json:"includeAllDimensions,omitempty"`
UseSchemaDiscovery bool `json:"useSchemaDiscovery,omitempty"`
}
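As a sketch of how the corrected dimensionExclusions key serializes alongside the union-typed dimensions. Field values are illustrative, and Dimension's lowercase json tags are assumed from the test fixtures below:

package druid

import (
	"encoding/json"
	"fmt"
)

// Sketch: a DimensionsSpec mixing plain and typed dimensions.
func ExampleDimensionsSpec() {
	spec := DimensionsSpec{
		Dimensions: DimensionSet{
			{"ts"}, // plain string dimension
			{Dimension{Type: "json", Name: "payload"}}, // typed dimension
		},
		DimensionExclusions: DimensionExclusionsSet{"internal_id"},
	}
	b, _ := json.Marshal(&spec)
	fmt.Println(string(b))
	// Prints something like:
	// {"dimensions":["ts",{"type":"json","name":"payload"}],"dimensionExclusions":["internal_id"]}
}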

// QueryGranularitySpec is an umbrella type for different representations of query granularity, can be string or
// QueryGranularity value.
type QueryGranularitySpec struct {
Value any
}

// QueryGranularity is a typed representation of query granularity.
type QueryGranularity struct {
Type string `json:"type,omitempty"`
}
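Both JSON encodings populate the umbrella type via the UnmarshalJSON defined at the end of this file; a sketch (the "all" granularity value is illustrative):

package druid

import "encoding/json"

// Sketch: string and typed forms both land in QueryGranularitySpec.Value.
func parseGranularityForms() (QueryGranularitySpec, QueryGranularitySpec, error) {
	var plain, typed QueryGranularitySpec
	if err := json.Unmarshal([]byte(`"none"`), &plain); err != nil { // plain.Value == "none"
		return plain, typed, err
	}
	err := json.Unmarshal([]byte(`{"type": "all"}`), &typed) // typed.Value == QueryGranularity{Type: "all"}
	return plain, typed, err
}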
Review comment (collaborator) on lines +171 to 173, with a suggested change to the QueryGranularity declaration: "And add a comment please."
// GranularitySpec allows for configuring operations such as data segment
// partitioning, truncating timestamps, time chunk segmentation or roll-up.
// https://druid.apache.org/docs/latest/ingestion/ingestion-spec#granularityspec
type GranularitySpec struct {
Type string `json:"type"`
SegmentGranularity string `json:"segmentGranularity,omitempty"`
QueryGranularity string `json:"queryGranularity,omitempty"`
Rollup bool `json:"rollup,omitempty"`
Intervals []string `json:"intervals,omitempty"`
Type string `json:"type"`
SegmentGranularity string `json:"segmentGranularity,omitempty"`
QueryGranularity *QueryGranularitySpec `json:"queryGranularity,omitempty"`
Rollup bool `json:"rollup,omitempty"`
Intervals []string `json:"intervals,omitempty"`
}
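With the matching change to SetGranularitySpec further down, callers now wrap the query granularity in the union type; a sketch reusing the option names from this repository:

package druid

// Sketch: the option now takes *QueryGranularitySpec instead of a plain string.
var kafkaSpec = NewIngestionSpec(
	SetType("kafka"),
	SetTopic("test-topic"),
	SetDataSource("test-datasource"),
	SetGranularitySpec("DAY", &QueryGranularitySpec{Value: "none"}, false),
)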

// AutoScalerConfig is part of IOConfig that controls ingestion auto-scaling.
@@ -177,7 +201,7 @@ type AutoScalerConfig struct {
MinTriggerScaleActionFrequencyMillis int `json:"minTriggerScaleActionFrequencyMillis"`
}

// Defines if and when stream Supervisor can become idle.
// IdleConfig defines if and when stream Supervisor can become idle.
vzayts marked this conversation as resolved.
type IdleConfig struct {
Enabled bool `json:"enabled"`
InactiveAfterMillis int64 `json:"inactiveAfterMillis"`
@@ -356,6 +380,7 @@ type SupervisorStatusPayload struct {
Partitions int `json:"partitions"`
Replicas int `json:"replicas"`
DurationSeconds int `json:"durationSeconds"`
Suspended bool `json:"suspended"`
}

// SupervisorStatus is a response object containing status of a supervisor alongside
@@ -400,7 +425,7 @@ func defaultKafkaIngestionSpec() *InputIngestionSpec {
GranularitySpec: &GranularitySpec{
Type: "uniform",
SegmentGranularity: "DAY",
QueryGranularity: "none",
QueryGranularity: &QueryGranularitySpec{"none"},
Rollup: false,
},
},
@@ -535,7 +560,7 @@ func SetTimestampColumn(column string) IngestionSpecOptions {
}

// SetGranularitySpec sets granularity spec settings that are applied at druid ingestion partitioning stage.
func SetGranularitySpec(segmentGranularity, queryGranularity string, rollup bool) IngestionSpecOptions {
func SetGranularitySpec(segmentGranularity string, queryGranularity *QueryGranularitySpec, rollup bool) IngestionSpecOptions {
return func(spec *InputIngestionSpec) {
spec.DataSchema.GranularitySpec = &GranularitySpec{
Type: "uniform",
Expand Down Expand Up @@ -563,3 +588,41 @@ func SetSQLInputSource(dbType, connectURI, user, password string, sqls []string)
}
}
}

func (g *QueryGranularitySpec) UnmarshalJSON(b []byte) error {
var str string
if err := json.Unmarshal(b, &str); err == nil {
g.Value = str
return nil
}

var qg QueryGranularity
if err := json.Unmarshal(b, &qg); err == nil {
g.Value = qg
return nil
}
return fmt.Errorf("unsupported query granularity: %s", b)
}

func (g *QueryGranularitySpec) MarshalJSON() ([]byte, error) {
return json.Marshal(&g.Value)
}

func (g *DimensionSpec) UnmarshalJSON(b []byte) error {
var str string
if err := json.Unmarshal(b, &str); err == nil {
g.Value = str
return nil
}

var qg Dimension
if err := json.Unmarshal(b, &qg); err == nil {
g.Value = qg
return nil
}
return fmt.Errorf("unsupported dimension value: %s", b)
}

func (g *DimensionSpec) MarshalJSON() ([]byte, error) {
return json.Marshal(&g.Value)
}
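And the decode direction for dimensions, as a sketch: a mixed array from a fetched spec lands in the union type, string entries as plain strings and objects as Dimension values:

package druid

import "encoding/json"

// Sketch: decoding a mixed dimensions array into DimensionSet.
func decodeDimensions() (DimensionSet, error) {
	raw := []byte(`["ts", {"type": "json", "name": "payload"}]`)
	var dims DimensionSet
	if err := json.Unmarshal(raw, &dims); err != nil {
		return nil, err
	}
	// dims[0].Value == "ts"; dims[1].Value == Dimension{Type: "json", Name: "payload"}
	return dims, nil
}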
33 changes: 21 additions & 12 deletions supervisor_types_test.go
@@ -35,11 +35,15 @@ func TestKafkaIngestionSpec(t *testing.T) {
{
name: "set labels",
options: []IngestionSpecOptions{
SetDimensions([]any{"ts", "user_name", "payload"}),
SetDimensions(DimensionSet{
{"ts"},
{"user_name"},
{"payload"},
}),
},
expected: func() *InputIngestionSpec {
out := defaultKafkaIngestionSpec()
out.DataSchema.DimensionsSpec.Dimensions = []any{"ts", "user_name", "payload"}
out.DataSchema.DimensionsSpec.Dimensions = DimensionSet{{"ts"}, {"user_name"}, {"payload"}}
return out
}(),
},
@@ -74,9 +78,9 @@ var jsonBasic = `{
]
},
"granularitySpec": {
"type": "uniform",
"type": "uniform",
"segmentGranularity": "DAY",
"queryGranularity": "none"
"queryGranularity": "none"
}
},
"ioConfig": {
@@ -100,7 +104,11 @@ func TestKafkaIngestionSpec_MarshalJSON(t *testing.T) {
SetDataSource("test_datasource"),
SetTopic("test_topic"),
SetBrokers("test_brokers"),
SetDimensions([]any{"ts", "user_name", "payload"}),
SetDimensions(DimensionSet{
{"ts"},
{"user_name"},
{"payload"},
}),
)
actual, err := json.Marshal(spec)
if err != nil {
@@ -141,7 +149,7 @@ var jsonWithTypedDimensions = `{
]
},
"granularitySpec": {
"type": "uniform",
"type": "uniform",
"segmentGranularity": "DAY",
"queryGranularity": "none"
}
@@ -167,9 +175,9 @@ func TestIngestionSpecWithTypedDimensions_MarshalJSON(t *testing.T) {
SetDataSource("test_datasource"),
SetTopic("test_topic"),
SetBrokers("test_brokers"),
SetDimensions([]any{
Dimension{Type: "string", Name: "ts"},
Dimension{Type: "json", Name: "payload"},
SetDimensions(DimensionSet{
{Dimension{Type: "string", Name: "ts"}},
{Dimension{Type: "json", Name: "payload"}},
}),
)
actual, err := json.Marshal(spec)
@@ -199,8 +207,8 @@ var jsonWithSqlInputSource = `{
]
},
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "DAY",
"type": "uniform",
"segmentGranularity": "DAY",
"queryGranularity": "none"
}
},
@@ -238,7 +246,7 @@ func TestIngestionSpecWithSqlInputSource_MarshalJSON(t *testing.T) {
SetType("index_parallel"),
SetIOConfigType("index_parallel"),
SetDataSource("test_datasource"),
SetDimensions([]any{"ts", "user_name", "payload"}),
SetDimensions(DimensionSet{{"ts"}, {"user_name"}, {"payload"}}),
SetSQLInputSource("mysql",
"jdbc:mysql://host:port/schema",
"username",
@@ -253,6 +261,7 @@ func TestIngestionSpecWithSqlInputSource_MarshalJSON(t *testing.T) {
t.Fatalf("unexpected error while marshalling: %v", err)
}
expected := []byte(jsonWithSqlInputSource)

require.JSONEq(t, string(expected), string(actual), fmt.Sprintf("expected: %s\nactual: %s", string(expected), string(actual)))

var checkSpec *InputIngestionSpec