diff --git a/supervisor.go b/supervisor.go
index 7305296..2d76589 100644
--- a/supervisor.go
+++ b/supervisor.go
@@ -2,6 +2,7 @@ package druid
 
 import (
 	"errors"
+	"net/http"
 	"strings"
 )
 
@@ -92,8 +93,8 @@ func (s *SupervisorService) GetSpec(supervisorId string) (OutputIngestionSpec, error) {
 // GetStatus calls druid Supervisor service's Get status API.
 // https://druid.apache.org/docs/latest/api-reference/supervisor-api/#get-supervisor-status
 func (s *SupervisorService) GetStatus(supervisorId string) (SupervisorStatus, error) {
-	r, err := s.client.NewRequest("GET", applySupervisorId(supervisorStatusEndpoint, supervisorId), nil)
 	var result SupervisorStatus
+	r, err := s.client.NewRequest("GET", applySupervisorId(supervisorStatusEndpoint, supervisorId), nil)
 	if err != nil {
 		return result, err
 	}
@@ -119,9 +120,17 @@ func (s *SupervisorService) GetAuditHistoryAll() (map[string]SupervisorAuditHistory, error) {
 
 // Suspend calls druid Supervisor service's Suspend API.
 // https://druid.apache.org/docs/latest/api-reference/supervisor-api/#suspend-a-running-supervisor
-func (s *SupervisorService) Suspend(string) (OutputIngestionSpec, error) {
-	var res OutputIngestionSpec
-	return res, errors.New("method Suspend not implemented")
+func (s *SupervisorService) Suspend(supervisorID string) (OutputIngestionSpec, error) {
+	var result OutputIngestionSpec
+	r, err := s.client.NewRequest(http.MethodPost, applySupervisorId(supervisorSuspendEndpoint, supervisorID), nil)
+	if err != nil {
+		return result, err
+	}
+	_, err = s.client.Do(r, &result)
+	if err != nil {
+		return result, err
+	}
+	return result, nil
 }
 
 // SuspendAll calls druid Supervisor service's SuspendAll API.
@@ -132,9 +141,17 @@ func (s *SupervisorService) SuspendAll() (string, error) {
 
 // Resume calls druid Supervisor service's Resume API.
 // https://druid.apache.org/docs/latest/api-reference/supervisor-api/#resume-a-supervisor
-func (s *SupervisorService) Resume(string) (OutputIngestionSpec, error) {
-	var res OutputIngestionSpec
-	return res, errors.New("method Resume not implemented")
+func (s *SupervisorService) Resume(supervisorID string) (OutputIngestionSpec, error) {
+	var result OutputIngestionSpec
+	r, err := s.client.NewRequest(http.MethodPost, applySupervisorId(supervisorResumeEndpoint, supervisorID), nil)
+	if err != nil {
+		return result, err
+	}
+	_, err = s.client.Do(r, &result)
+	if err != nil {
+		return result, err
+	}
+	return result, nil
 }
 
 // ResumeAll calls druid Supervisor service's ResumeAll API.
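With the two stubs above filled in, Suspend and Resume follow the same NewRequest/Do pattern as the rest of the supervisor service. A minimal usage sketch, assuming a reachable Druid router and an existing supervisor; the module import path and the supervisor id are placeholders, not taken from this repository:

    package main

    import (
    	"log"

    	druid "example.com/go-druid" // placeholder: substitute this repository's module path
    )

    func main() {
    	d, err := druid.NewClient("http://localhost:8888")
    	if err != nil {
    		log.Fatal(err)
    	}

    	// Suspend returns the supervisor's ingestion spec with Suspended set to true.
    	spec, err := d.Supervisor().Suspend("test-datasource") // placeholder supervisor id
    	if err != nil {
    		log.Fatal(err)
    	}
    	log.Printf("suspended: %v", spec.Suspended)

    	// Resume puts the supervisor back into a running state.
    	if _, err := d.Supervisor().Resume("test-datasource"); err != nil {
    		log.Fatal(err)
    	}
    }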
diff --git a/supervisor_test.go b/supervisor_test.go
index 2e565ba..ebbf65f 100644
--- a/supervisor_test.go
+++ b/supervisor_test.go
@@ -28,12 +28,13 @@ func TestSupervisorService(t *testing.T) {
 
 	// Set up druid service and client.
 	var druidOpts []ClientOption
-	d, err := NewClient("http://localhost:8081", druidOpts...)
+	d, err := NewClient("http://localhost:8888", druidOpts...)
 	assert.NoError(t, err, "error should be nil")
 	var spec = NewIngestionSpec(SetType("kafka"),
-		SetBrokers("telemetry-kafka.skaffold-telemetry-victorzaytsev.svc.cluster.local:9092"),
+		SetBrokers("kafka:9092"),
 		SetTopic("test-topic"),
-		SetDataSource("test-datasource"))
+		SetDataSource("test-datasource"),
+		SetDimensions(DimensionSet{{"dim1"}, {"dim2"}}))
 	assert.NoError(t, err, "error should be nil")
 	assert.NotNil(t, spec, "specification should not be nil")
 
@@ -50,6 +51,27 @@
 	status, err := d.Supervisor().GetStatus(spec.DataSchema.DataSource)
 	assert.NoError(t, err, "error should be nil")
 	assert.Equal(t, "PENDING", status.Payload.State)
+	assert.False(t, status.Payload.Suspended)
+
+	// suspend and check status
+	suspendedSpec, err := d.Supervisor().Suspend(spec.DataSchema.DataSource)
+	assert.NoError(t, err, "error should be nil")
+	assert.True(t, suspendedSpec.Suspended)
+
+	status, err = d.Supervisor().GetStatus(spec.DataSchema.DataSource)
+	assert.NoError(t, err, "error should be nil")
+	assert.True(t, status.Payload.Suspended)
+
+	// resume and check status
+	_, err = d.Supervisor().Resume(spec.DataSchema.DataSource)
+	assert.NoError(t, err, "error should be nil")
+
+	status, err = d.Supervisor().GetStatus(spec.DataSchema.DataSource)
+	assert.NoError(t, err, "error should be nil")
+	assert.Equal(t, "PENDING", status.Payload.State)
+	assert.False(t, status.Payload.Suspended)
+
+	// terminate
 	id, err = d.Supervisor().Terminate(spec.DataSchema.DataSource)
 	assert.NoError(t, err, "error should be nil")
 	assert.Equal(t, id, spec.DataSchema.DataSource)
diff --git a/supervisor_types.go b/supervisor_types.go
index d6fb624..01cb663 100644
--- a/supervisor_types.go
+++ b/supervisor_types.go
@@ -1,5 +1,10 @@
 package druid
 
+import (
+	"encoding/json"
+	"fmt"
+)
+
 // InputIngestionSpec is the root-level type defining an ingestion spec used
 // by Apache Druid.
 type InputIngestionSpec struct {
@@ -125,7 +130,12 @@ type SpatialDimension struct {
 type TransformSet []Transform
 
 // DimensionSet is a unique set of druid datasource dimensions(labels).
-type DimensionSet []any
+type DimensionSet []DimensionSpec
+
+// DimensionSpec is a single datasource dimension that can be represented
+// either by a typed Dimension or by a plain string value.
+type DimensionSpec struct {
+	Value any
+}
 
 // Dimension is a typed definition of a datasource dimension.
 type Dimension struct {
@@ -136,27 +146,41 @@ type Dimension struct {
 // SpatialDimensionSet is a unique set of druid datasource spatial dimensions.
 type SpatialDimensionSet []SpatialDimension
 
+// DimensionExclusionsSet represents a set of excluded dimensions.
+type DimensionExclusionsSet []string
+
 // DimensionsSpec is responsible for configuring Druid's dimensions. They're a
 // set of columns in Druid's data model that can be used for grouping, filtering
 // or applying aggregations.
 // https://druid.apache.org/docs/latest/ingestion/ingestion-spec#dimensionsspec
 type DimensionsSpec struct {
-	Dimensions           DimensionSet        `json:"dimensions,omitempty"`
-	DimansionExclusions  DimensionSet        `json:"dimansionExclusions,omitempty"`
-	SpatialDimensions    SpatialDimensionSet `json:"spatialDimensions,omitempty"`
-	IncludeAllDimensions bool                `json:"includeAllDimensions,omitempty"`
-	UseSchemaDiscovery   bool                `json:"useSchemaDiscovery,omitempty"`
+	Dimensions           DimensionSet           `json:"dimensions,omitempty"`
+	DimensionExclusions  DimensionExclusionsSet `json:"dimensionExclusions,omitempty"`
+	SpatialDimensions    SpatialDimensionSet    `json:"spatialDimensions,omitempty"`
+	IncludeAllDimensions bool                   `json:"includeAllDimensions,omitempty"`
+	UseSchemaDiscovery   bool                   `json:"useSchemaDiscovery,omitempty"`
+}
+
+// QueryGranularitySpec is an umbrella type for the different representations
+// of query granularity: the value can be either a plain string or a typed
+// QueryGranularity.
+type QueryGranularitySpec struct {
+	Value any
+}
+
+// QueryGranularity is a typed representation of query granularity.
+type QueryGranularity struct {
+	Type string `json:"type,omitempty"`
 }
 
 // GranularitySpec allows for configuring operations such as data segment
 // partitioning, truncating timestamps, time chunk segmentation or roll-up.
 // https://druid.apache.org/docs/latest/ingestion/ingestion-spec#granularityspec
 type GranularitySpec struct {
-	Type               string   `json:"type"`
-	SegmentGranularity string   `json:"segmentGranularity,omitempty"`
-	QueryGranularity   string   `json:"queryGranularity,omitempty"`
-	Rollup             bool     `json:"rollup,omitempty"`
-	Intervals          []string `json:"intervals,omitempty"`
+	Type               string                `json:"type"`
+	SegmentGranularity string                `json:"segmentGranularity,omitempty"`
+	QueryGranularity   *QueryGranularitySpec `json:"queryGranularity,omitempty"`
+	Rollup             bool                  `json:"rollup,omitempty"`
+	Intervals          []string              `json:"intervals,omitempty"`
 }
 
 // AutoScalerConfig is part of IOConfig that controls ingestion auto-scaling.
@@ -177,7 +201,7 @@ type AutoScalerConfig struct {
 	MinTriggerScaleActionFrequencyMillis int `json:"minTriggerScaleActionFrequencyMillis"`
 }
 
-// Defines if and when stream Supervisor can become idle.
+// IdleConfig defines if and when a stream Supervisor can become idle.
 type IdleConfig struct {
 	Enabled             bool  `json:"enabled"`
 	InactiveAfterMillis int64 `json:"inactiveAfterMillis"`
@@ -356,6 +380,7 @@ type SupervisorStatusPayload struct {
 	Partitions      int  `json:"partitions"`
 	Replicas        int  `json:"replicas"`
 	DurationSeconds int  `json:"durationSeconds"`
+	Suspended       bool `json:"suspended"`
 }
 
 // SupervisorStatus is a response object containing status of a supervisor alongside
@@ -400,7 +425,7 @@ func defaultKafkaIngestionSpec() *InputIngestionSpec {
 			GranularitySpec: &GranularitySpec{
 				Type:               "uniform",
 				SegmentGranularity: "DAY",
-				QueryGranularity:   "none",
+				QueryGranularity:   &QueryGranularitySpec{"none"},
 				Rollup:             false,
 			},
 		},
@@ -535,7 +560,7 @@ func SetTimestampColumn(column string) IngestionSpecOptions {
 }
 
 // SetGranularitySpec sets granularity spec settings that are applied at druid ingestion partitioning stage.
-func SetGranularitySpec(segmentGranularity, queryGranularity string, rollup bool) IngestionSpecOptions {
+func SetGranularitySpec(segmentGranularity string, queryGranularity *QueryGranularitySpec, rollup bool) IngestionSpecOptions {
 	return func(spec *InputIngestionSpec) {
 		spec.DataSchema.GranularitySpec = &GranularitySpec{
 			Type:               "uniform",
@@ -563,3 +588,41 @@ func SetSQLInputSource(dbType, connectURI, user, password string, sqls []string) IngestionSpecOptions {
 		}
 	}
 }
+
+func (g *QueryGranularitySpec) UnmarshalJSON(b []byte) error {
+	var str string
+	if err := json.Unmarshal(b, &str); err == nil {
+		g.Value = str
+		return nil
+	}
+
+	var qg QueryGranularity
+	if err := json.Unmarshal(b, &qg); err == nil {
+		g.Value = qg
+		return nil
+	}
+	return fmt.Errorf("unsupported query granularity: %s", b)
+}
+
+func (g *QueryGranularitySpec) MarshalJSON() ([]byte, error) {
+	return json.Marshal(&g.Value)
+}
+
+func (g *DimensionSpec) UnmarshalJSON(b []byte) error {
+	var str string
+	if err := json.Unmarshal(b, &str); err == nil {
+		g.Value = str
+		return nil
+	}
+
+	var dim Dimension
+	if err := json.Unmarshal(b, &dim); err == nil {
+		g.Value = dim
+		return nil
+	}
+	return fmt.Errorf("unsupported dimension value: %s", b)
+}
+
+func (g *DimensionSpec) MarshalJSON() ([]byte, error) {
+	return json.Marshal(&g.Value)
+}
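To see what the custom (un)marshalling above buys: the same Go field now accepts both the plain-string and the typed JSON forms Druid uses. A small round-trip sketch, with error handling elided; the exact JSON tags on Dimension are assumed from the tests below:

    spec := DimensionsSpec{
    	Dimensions: DimensionSet{
    		{"ts"},                                     // plain string dimension
    		{Dimension{Type: "json", Name: "payload"}}, // typed dimension
    	},
    }
    b, _ := json.Marshal(spec)
    // b is roughly {"dimensions":["ts",{"type":"json","name":"payload"}]}

    var round DimensionsSpec
    _ = json.Unmarshal(b, &round)
    // round.Dimensions[0].Value holds the string "ts";
    // round.Dimensions[1].Value holds Dimension{Type: "json", Name: "payload"}.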
diff --git a/supervisor_types_test.go b/supervisor_types_test.go
index 80a9716..67ba82e 100644
--- a/supervisor_types_test.go
+++ b/supervisor_types_test.go
@@ -35,11 +35,15 @@ func TestKafkaIngestionSpec(t *testing.T) {
 		{
 			name: "set labels",
 			options: []IngestionSpecOptions{
-				SetDimensions([]any{"ts", "user_name", "payload"}),
+				SetDimensions(DimensionSet{
+					{"ts"},
+					{"user_name"},
+					{"payload"},
+				}),
 			},
 			expected: func() *InputIngestionSpec {
 				out := defaultKafkaIngestionSpec()
-				out.DataSchema.DimensionsSpec.Dimensions = []any{"ts", "user_name", "payload"}
+				out.DataSchema.DimensionsSpec.Dimensions = DimensionSet{{"ts"}, {"user_name"}, {"payload"}}
 				return out
 			}(),
 		},
@@ -74,9 +78,9 @@ var jsonBasic = `{
 			]
 		},
 		"granularitySpec": {
-			"type": "uniform", 
+			"type": "uniform",
 			"segmentGranularity": "DAY",
-			"queryGranularity": "none" 
+			"queryGranularity": "none"
 		}
 	},
 	"ioConfig": {
@@ -100,7 +104,11 @@ func TestKafkaIngestionSpec_MarshalJSON(t *testing.T) {
 		SetDataSource("test_datasource"),
 		SetTopic("test_topic"),
 		SetBrokers("test_brokers"),
-		SetDimensions([]any{"ts", "user_name", "payload"}),
+		SetDimensions(DimensionSet{
+			{"ts"},
+			{"user_name"},
+			{"payload"},
+		}),
 	)
 	actual, err := json.Marshal(spec)
 	if err != nil {
@@ -141,7 +149,7 @@ var jsonWithTypedDimensions = `{
 			]
 		},
 		"granularitySpec": {
-			"type": "uniform", 
+			"type": "uniform",
 			"segmentGranularity": "DAY",
 			"queryGranularity": "none"
 		}
@@ -167,9 +175,9 @@ func TestIngestionSpecWithTypedDimensions_MarshalJSON(t *testing.T) {
 		SetDataSource("test_datasource"),
 		SetTopic("test_topic"),
 		SetBrokers("test_brokers"),
-		SetDimensions([]any{
-			Dimension{Type: "string", Name: "ts"},
-			Dimension{Type: "json", Name: "payload"},
+		SetDimensions(DimensionSet{
+			{Dimension{Type: "string", Name: "ts"}},
+			{Dimension{Type: "json", Name: "payload"}},
 		}),
 	)
 	actual, err := json.Marshal(spec)
@@ -199,8 +207,8 @@ var jsonWithSqlInputSource = `{
 			]
 		},
 		"granularitySpec": {
-			"type": "uniform", 
-			"segmentGranularity": "DAY", 
+			"type": "uniform",
+			"segmentGranularity": "DAY",
 			"queryGranularity": "none"
 		}
 	},
@@ -238,7 +246,7 @@ func TestIngestionSpecWithSqlInputSource_MarshalJSON(t *testing.T) {
 		SetType("index_parallel"),
 		SetIOConfigType("index_parallel"),
 		SetDataSource("test_datasource"),
-		SetDimensions([]any{"ts", "user_name", "payload"}),
+		SetDimensions(DimensionSet{{"ts"}, {"user_name"}, {"payload"}}),
 		SetSQLInputSource("mysql",
 			"jdbc:mysql://host:port/schema",
 			"username",
@@ -253,6 +261,7 @@ func TestIngestionSpecWithSqlInputSource_MarshalJSON(t *testing.T) {
 		t.Fatalf("unexpected error while marshalling: %v", err)
 	}
 	expected := []byte(jsonWithSqlInputSource)
+	require.JSONEq(t, string(expected), string(actual), fmt.Sprintf("expected: %s\nactual: %s", string(expected), string(actual)))
 
 	var checkSpec *InputIngestionSpec