Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement suspend and resume endpoints of supervisor API #7

Merged
merged 13 commits into from
Dec 12, 2023
6 changes: 3 additions & 3 deletions supervisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@ func (s *SupervisorService) GetSpec(supervisorId string) (OutputIngestionSpec, e
// GetStatus calls druid Supervisor service's Get status API.
// https://druid.apache.org/docs/latest/api-reference/supervisor-api/#get-supervisor-status
func (s *SupervisorService) GetStatus(supervisorId string) (SupervisorStatus, error) {
r, err := s.client.NewRequest("GET", applySupervisorId(supervisorStatusEndpoint, supervisorId), nil)
var result SupervisorStatus
r, err := s.client.NewRequest("GET", applySupervisorId(supervisorStatusEndpoint, supervisorId), nil)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: please consider using http.MethodGet

if err != nil {
return result, err
}
Expand All @@ -121,8 +121,8 @@ func (s *SupervisorService) GetAuditHistoryAll() (map[string]SupervisorAuditHist
// Suspend calls druid Supervisor service's Suspend API.
// https://druid.apache.org/docs/latest/api-reference/supervisor-api/#suspend-a-running-supervisor
func (s *SupervisorService) Suspend(supervisorID string) (OutputIngestionSpec, error) {
r, err := s.client.NewRequest(http.MethodPost, applySupervisorId(supervisorSuspendEndpoint, supervisorID), nil)
var result OutputIngestionSpec
r, err := s.client.NewRequest(http.MethodPost, applySupervisorId(supervisorSuspendEndpoint, supervisorID), nil)
if err != nil {
return result, err
}
Expand All @@ -142,8 +142,8 @@ func (s *SupervisorService) SuspendAll() (string, error) {
// Resume calls druid Supervisor service's Resume API.
// https://druid.apache.org/docs/latest/api-reference/supervisor-api/#resume-a-supervisor
func (s *SupervisorService) Resume(supervisorID string) (OutputIngestionSpec, error) {
r, err := s.client.NewRequest(http.MethodPost, applySupervisorId(supervisorResumeEndpoint, supervisorID), nil)
var result OutputIngestionSpec
r, err := s.client.NewRequest(http.MethodPost, applySupervisorId(supervisorResumeEndpoint, supervisorID), nil)
vzayts marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return result, err
}
Expand Down
2 changes: 1 addition & 1 deletion supervisor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestSupervisorService(t *testing.T) {
SetBrokers("kafka:9092"),
SetTopic("test-topic"),
SetDataSource("test-datasource"),
SetDimensions(DimensionSet{Dimension{Name: "dim1"}, Dimension{Name: "dim2"}}))
SetDimensions(DimensionSet{{"dim1"}, {"dim2"}}))
assert.NoError(t, err, "error should be nil")
assert.NotNil(t, spec, "specification should not be nil")

Expand Down
73 changes: 64 additions & 9 deletions supervisor_types.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
package druid

import (
"encoding/json"
"fmt"
)

// InputIngestionSpec is the root-level type defining an ingestion spec used
// by Apache Druid.
type InputIngestionSpec struct {
Expand Down Expand Up @@ -125,7 +130,12 @@ type SpatialDimension struct {
type TransformSet []Transform

// DimensionSet is a unique set of druid datasource dimensions(labels).
type DimensionSet []any
type DimensionSet []DimensionSpec

// DimensionSpec is a single dataset dimension that can be represented by a typed Dimension or a string value.
type DimensionSpec struct {
Value any
}

// Dimension is a typed definition of a datasource dimension.
type Dimension struct {
Expand All @@ -150,6 +160,14 @@ type DimensionsSpec struct {
IncludeAllDimensions bool `json:"includeAllDimensions,omitempty"`
UseSchemaDiscovery bool `json:"useSchemaDiscovery,omitempty"`
}

// QueryGranularitySpec is an umbrella type for different representations of query granularity, can be string or
// QueryGranularity value.
type QueryGranularitySpec struct {
Value any
}

// QueryGranularity is a typed representation of query granularity.
type QueryGranularity struct {
Type string `json:"type,omitempty"`
}
Expand All @@ -158,12 +176,11 @@ type QueryGranularity struct {
// partitioning, truncating timestamps, time chunk segmentation or roll-up.
// https://druid.apache.org/docs/latest/ingestion/ingestion-spec#granularityspec
type GranularitySpec struct {
Type string `json:"type"`
SegmentGranularity string `json:"segmentGranularity,omitempty"`
// TODO: this field is problematic as depending on value druid returns string or object
QueryGranularity any `json:"queryGranularity,omitempty"`
Rollup bool `json:"rollup,omitempty"`
Intervals []string `json:"intervals,omitempty"`
Type string `json:"type"`
SegmentGranularity string `json:"segmentGranularity,omitempty"`
QueryGranularity *QueryGranularitySpec `json:"queryGranularity,omitempty"`
Rollup bool `json:"rollup,omitempty"`
Intervals []string `json:"intervals,omitempty"`
}

// AutoScalerConfig is part of IOConfig that controls ingestion auto-scaling.
Expand Down Expand Up @@ -408,7 +425,7 @@ func defaultKafkaIngestionSpec() *InputIngestionSpec {
GranularitySpec: &GranularitySpec{
Type: "uniform",
SegmentGranularity: "DAY",
QueryGranularity: "none",
QueryGranularity: &QueryGranularitySpec{"none"},
Rollup: false,
},
},
Expand Down Expand Up @@ -543,7 +560,7 @@ func SetTimestampColumn(column string) IngestionSpecOptions {
}

// SetGranularitySpec sets granularity spec settings that are applied at druid ingestion partitioning stage.
func SetGranularitySpec(segmentGranularity string, queryGranularity QueryGranularity, rollup bool) IngestionSpecOptions {
func SetGranularitySpec(segmentGranularity string, queryGranularity *QueryGranularitySpec, rollup bool) IngestionSpecOptions {
return func(spec *InputIngestionSpec) {
spec.DataSchema.GranularitySpec = &GranularitySpec{
Type: "uniform",
Expand Down Expand Up @@ -571,3 +588,41 @@ func SetSQLInputSource(dbType, connectURI, user, password string, sqls []string)
}
}
}

// UnmarshalJSON decodes a query granularity that druid may return either as a
// plain JSON string (e.g. "none") or as a typed QueryGranularity object.
// The decoded form is stored in Value; an error is returned for any other shape.
func (g *QueryGranularitySpec) UnmarshalJSON(b []byte) error {
	// Try the simple string representation first.
	var asString string
	if strErr := json.Unmarshal(b, &asString); strErr == nil {
		g.Value = asString
		return nil
	}

	// Fall back to the typed object representation.
	var asTyped QueryGranularity
	if typedErr := json.Unmarshal(b, &asTyped); typedErr == nil {
		g.Value = asTyped
		return nil
	}
	return fmt.Errorf("unsupported query granularity: %s", b)
}

// MarshalJSON emits the wrapped granularity value exactly as it was stored,
// i.e. either a JSON string or a typed QueryGranularity object.
func (g *QueryGranularitySpec) MarshalJSON() ([]byte, error) {
	encoded, err := json.Marshal(&g.Value)
	if err != nil {
		return nil, err
	}
	return encoded, nil
}

// UnmarshalJSON decodes a datasource dimension that druid may return either as
// a plain JSON string (the dimension name) or as a typed Dimension object.
// The decoded form is stored in Value; an error is returned for any other shape.
func (g *DimensionSpec) UnmarshalJSON(b []byte) error {
	// Try the simple string representation first.
	var name string
	if nameErr := json.Unmarshal(b, &name); nameErr == nil {
		g.Value = name
		return nil
	}

	// Fall back to the typed object representation.
	var dim Dimension
	if dimErr := json.Unmarshal(b, &dim); dimErr == nil {
		g.Value = dim
		return nil
	}
	return fmt.Errorf("unsupported dimension value: %s", b)
}

// MarshalJSON emits the wrapped dimension value exactly as it was stored,
// i.e. either a JSON string or a typed Dimension object.
func (g *DimensionSpec) MarshalJSON() ([]byte, error) {
	encoded, err := json.Marshal(&g.Value)
	if err != nil {
		return nil, err
	}
	return encoded, nil
}
22 changes: 15 additions & 7 deletions supervisor_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,15 @@ func TestKafkaIngestionSpec(t *testing.T) {
{
name: "set labels",
options: []IngestionSpecOptions{
SetDimensions([]any{"ts", "user_name", "payload"}),
SetDimensions(DimensionSet{
{"ts"},
{"user_name"},
{"payload"},
}),
},
expected: func() *InputIngestionSpec {
out := defaultKafkaIngestionSpec()
out.DataSchema.DimensionsSpec.Dimensions = []any{"ts", "user_name", "payload"}
out.DataSchema.DimensionsSpec.Dimensions = DimensionSet{{"ts"}, {"user_name"}, {"payload"}}
return out
}(),
},
Expand Down Expand Up @@ -100,7 +104,11 @@ func TestKafkaIngestionSpec_MarshalJSON(t *testing.T) {
SetDataSource("test_datasource"),
SetTopic("test_topic"),
SetBrokers("test_brokers"),
SetDimensions([]any{"ts", "user_name", "payload"}),
SetDimensions(DimensionSet{
{"ts"},
{"user_name"},
{"payload"},
}),
)
actual, err := json.Marshal(spec)
if err != nil {
Expand Down Expand Up @@ -167,9 +175,9 @@ func TestIngestionSpecWithTypedDimensions_MarshalJSON(t *testing.T) {
SetDataSource("test_datasource"),
SetTopic("test_topic"),
SetBrokers("test_brokers"),
SetDimensions([]any{
Dimension{Type: "string", Name: "ts"},
Dimension{Type: "json", Name: "payload"},
SetDimensions(DimensionSet{
{Dimension{Type: "string", Name: "ts"}},
{Dimension{Type: "json", Name: "payload"}},
}),
)
actual, err := json.Marshal(spec)
Expand Down Expand Up @@ -238,7 +246,7 @@ func TestIngestionSpecWithSqlInputSource_MarshalJSON(t *testing.T) {
SetType("index_parallel"),
SetIOConfigType("index_parallel"),
SetDataSource("test_datasource"),
SetDimensions([]any{"ts", "user_name", "payload"}),
SetDimensions(DimensionSet{{"ts"}, {"user_name"}, {"payload"}}),
SetSQLInputSource("mysql",
"jdbc:mysql://host:port/schema",
"username",
Expand Down
Loading