From bc7e9a3fae365a080e4ca58d5aa2c418d615f721 Mon Sep 17 00:00:00 2001 From: Tomasz Date: Thu, 5 Oct 2023 14:39:56 +0200 Subject: [PATCH 01/12] Implement suspend and resume endpoints of supervisor API --- supervisor.go | 29 ++++++++++++++++----- supervisor_test.go | 24 ++++++++++++++++- supervisor_types.go | 56 ++++++++++++++++++++++++++-------------- supervisor_types_test.go | 6 ++--- 4 files changed, 85 insertions(+), 30 deletions(-) diff --git a/supervisor.go b/supervisor.go index 7305296..8d22d3c 100644 --- a/supervisor.go +++ b/supervisor.go @@ -2,6 +2,7 @@ package druid import ( "errors" + "net/http" "strings" ) @@ -119,9 +120,17 @@ func (s *SupervisorService) GetAuditHistoryAll() (map[string]SupervisorAuditHist // Suspend calls druid Supervisor service's Suspend API. // https://druid.apache.org/docs/latest/api-reference/supervisor-api/#suspend-a-running-supervisor -func (s *SupervisorService) Suspend(string) (OutputIngestionSpec, error) { - var res OutputIngestionSpec - return res, errors.New("method Suspend not implemented") +func (s *SupervisorService) Suspend(supervisorID string) (OutputIngestionSpec, error) { + r, err := s.client.NewRequest(http.MethodPost, applySupervisorId(supervisorSuspendEndpoint, supervisorID), nil) + var result OutputIngestionSpec + if err != nil { + return result, err + } + _, err = s.client.Do(r, &result) + if err != nil { + return result, err + } + return result, nil } // SuspendAll calls druid Supervisor service's SuspendAll API. @@ -132,9 +141,17 @@ func (s *SupervisorService) SuspendAll() (string, error) { // Resume calls druid Supervisor service's Resume API. // https://druid.apache.org/docs/latest/api-reference/supervisor-api/#resume-a-supervisor -func (s *SupervisorService) Resume(string) (OutputIngestionSpec, error) { - var res OutputIngestionSpec - return res, errors.New("method Resume not implemented") +func (s *SupervisorService) Resume(supervisorID string) (OutputIngestionSpec, error) { + r, err := s.client.NewRequest(http.MethodPost, applySupervisorId(supervisorResumeEndpoint, supervisorID), nil) + var result OutputIngestionSpec + if err != nil { + return result, err + } + _, err = s.client.Do(r, &result) + if err != nil { + return result, err + } + return result, nil } // ResumeAll calls druid Supervisor service's ResumeAll API. diff --git a/supervisor_test.go b/supervisor_test.go index 2e565ba..a521083 100644 --- a/supervisor_test.go +++ b/supervisor_test.go @@ -33,7 +33,8 @@ func TestSupervisorService(t *testing.T) { var spec = NewIngestionSpec(SetType("kafka"), SetBrokers("telemetry-kafka.skaffold-telemetry-victorzaytsev.svc.cluster.local:9092"), SetTopic("test-topic"), - SetDataSource("test-datasource")) + SetDataSource("test-datasource"), + SetDimensions([]Dimension{{Name: "dim1"}, {Name: "dim2"}})) assert.NoError(t, err, "error should be nil") assert.NotNil(t, spec, "specification should not be nil") @@ -50,6 +51,27 @@ func TestSupervisorService(t *testing.T) { status, err := d.Supervisor().GetStatus(spec.DataSchema.DataSource) assert.NoError(t, err, "error should be nil") assert.Equal(t, "PENDING", status.Payload.State) + assert.False(t, status.Payload.Suspended) + + // suspend and check status + suspendedSpec, err := d.Supervisor().Suspend(spec.DataSchema.DataSource) + assert.True(t, suspendedSpec.Suspended) + assert.NoError(t, err, "error should be nil") + + status, err = d.Supervisor().GetStatus(spec.DataSchema.DataSource) + assert.NoError(t, err, "error should be nil") + assert.True(t, status.Payload.Suspended) + + // resume and check status + _, err = d.Supervisor().Resume(spec.DataSchema.DataSource) + assert.NoError(t, err, "error should be nil") + + status, err = d.Supervisor().GetStatus(spec.DataSchema.DataSource) + assert.NoError(t, err, "error should be nil") + assert.Equal(t, "PENDING", status.Payload.State) + assert.False(t, status.Payload.Suspended) + + // terminate id, err = d.Supervisor().Terminate(spec.DataSchema.DataSource) assert.NoError(t, err, "error should be nil") assert.Equal(t, id, spec.DataSchema.DataSource) diff --git a/supervisor_types.go b/supervisor_types.go index 8fe2ae6..6191ed6 100644 --- a/supervisor_types.go +++ b/supervisor_types.go @@ -126,33 +126,48 @@ type SpatialDimension struct { // TransformSet is a unique set of transforms applied to the input. type TransformSet []Transform -// DimensionSet is a unique set of druid datasource dimensions(labels). -type DimensionSet []string +// Dimension represents druid dimension. +type Dimension struct { + Type string `json:"type,omitempty"` + Name string `json:"name,omitempty"` + MultiValueHandling string `json:"multiValueHandling,omitempty"` + CreateBitmapIndex bool `json:"createBitmapIndex,omitempty"` +} + +// DimensionsSet is a unique set of druid datasource dimensions(labels). +type DimensionsSet []Dimension // SpatialDimensionSet is a unique set of druid datasource spatial dimensions. type SpatialDimensionSet []SpatialDimension +// DimensionExclusionsSet represents set of excluded dimensions. +type DimensionExclusionsSet []string + // DimensionsSpec is responsible for configuring Druid's dimensions. They're a // set of columns in Druid's data model that can be used for grouping, filtering // or applying aggregations. // https://druid.apache.org/docs/latest/ingestion/ingestion-spec#dimensionsspec type DimensionsSpec struct { - Dimensions DimensionSet `json:"dimensions,omitempty"` - DimansionExclusions DimensionSet `json:"dimansionExclusions,omitempty"` - SpatialDimensions SpatialDimensionSet `json:"spatialDimensions,omitempty"` - IncludeAllDimensions bool `json:"includeAllDimensions,omitempty"` - UseSchemaDiscovery bool `json:"useSchemaDiscovery,omitempty"` + Dimensions DimensionsSet `json:"dimensions,omitempty"` + DimensionExclusions DimensionExclusionsSet `json:"dimensionExclusions,omitempty"` + SpatialDimensions SpatialDimensionSet `json:"spatialDimensions,omitempty"` + IncludeAllDimensions bool `json:"includeAllDimensions,omitempty"` + UseSchemaDiscovery bool `json:"useSchemaDiscovery,omitempty"` +} + +type QueryGranularity struct { + Type string `json:"type,omitempty"` } // GranularitySpec allows for configuring operations such as data segment // partitioning, truncating timestamps, time chunk segmentation or roll-up. // https://druid.apache.org/docs/latest/ingestion/ingestion-spec#granularityspec type GranularitySpec struct { - Type string `json:"type"` - SegmentGranularity string `json:"segmentGranularity,omitempty"` - QueryGranularity string `json:"queryGranularity,omitempty"` - Rollup bool `json:"rollup,omitempty"` - Intervals []string `json:"intervals,omitempty"` + Type string `json:"type"` + SegmentGranularity string `json:"segmentGranularity,omitempty"` + QueryGranularity QueryGranularity `json:"queryGranularity,omitempty"` + Rollup bool `json:"rollup,omitempty"` + Intervals []string `json:"intervals,omitempty"` } // AutoScalerConfig is part of IOConfig that controls ingestion auto-scaling. @@ -173,7 +188,7 @@ type AutoScalerConfig struct { MinTriggerScaleActionFrequencyMillis int `json:"minTriggerScaleActionFrequencyMillis"` } -// Defines if and when stream Supervisor can become idle. +// IdleConfig defines if and when stream Supervisor can become idle. type IdleConfig struct { Enabled bool `json:"enabled"` InactiveAfterMillis int64 `json:"inactiveAfterMillis"` @@ -335,6 +350,7 @@ type SupervisorStatusPayload struct { Partitions int `json:"partitions"` Replicas int `json:"replicas"` DurationSeconds int `json:"durationSeconds"` + Suspended bool `json:"suspended"` } // SupervisorStatus is a response object containing status of a supervisor alongside @@ -380,16 +396,16 @@ func defaultKafkaIngestionSpec() *InputIngestionSpec { }, }, DimensionsSpec: &DimensionsSpec{ - Dimensions: DimensionSet{ - "id", - "ts", - "payload", + Dimensions: DimensionsSet{ + {Name: "id"}, + {Name: "ts"}, + {Name: "payload"}, }, }, GranularitySpec: &GranularitySpec{ Type: "uniform", SegmentGranularity: "DAY", - QueryGranularity: "none", + QueryGranularity: QueryGranularity{Type: "none"}, Rollup: false, }, }, @@ -481,7 +497,7 @@ func SetTaskDuration(duration string) IngestionSpecOptions { } // SetDimensions sets druid datasource dimensions. -func SetDimensions(dimensions DimensionSet) IngestionSpecOptions { +func SetDimensions(dimensions DimensionsSet) IngestionSpecOptions { return func(spec *InputIngestionSpec) { spec.DataSchema.DimensionsSpec.Dimensions = dimensions } @@ -515,7 +531,7 @@ func SetTimestampColumn(column string) IngestionSpecOptions { } // SetGranularitySpec sets granularity spec settings that are applied at druid ingestion partitioning stage. -func SetGranularitySpec(segmentGranularity, queryGranularity string, rollup bool) IngestionSpecOptions { +func SetGranularitySpec(segmentGranularity string, queryGranularity QueryGranularity, rollup bool) IngestionSpecOptions { return func(spec *InputIngestionSpec) { spec.DataSchema.GranularitySpec = &GranularitySpec{ Type: "uniform", diff --git a/supervisor_types_test.go b/supervisor_types_test.go index 759d540..10b943d 100644 --- a/supervisor_types_test.go +++ b/supervisor_types_test.go @@ -34,11 +34,11 @@ func TestKafkaIngestionSpec(t *testing.T) { { name: "set labels", options: []IngestionSpecOptions{ - SetDimensions([]string{"ts", "user_name", "payload"}), + SetDimensions([]Dimension{{Name: "ts"}, {Name: "user_name"}, {Name: "payload"}}), }, expected: func() *InputIngestionSpec { out := defaultKafkaIngestionSpec() - out.DataSchema.DimensionsSpec.Dimensions = []string{"ts", "user_name", "payload"} + out.DataSchema.DimensionsSpec.Dimensions = []Dimension{{Name: "ts"}, {Name: "user_name"}, {Name: "payload"}} return out }(), }, @@ -106,7 +106,7 @@ func TestKafkaIngestionSpec_MarshalJSON(t *testing.T) { SetDataSource("test_datasource"), SetTopic("test_topic"), SetBrokers("test_brokers"), - SetDimensions([]string{"ts", "user_name", "payload"}), + SetDimensions([]Dimension{{Name: "ts"}, {Name: "user_name"}, {Name: "payload"}}), ) actual, err := json.MarshalIndent(spec, "", " ") if err != nil { From b2369aa3510da69fb499fff0b8190bb6a5539235 Mon Sep 17 00:00:00 2001 From: Tomasz Date: Thu, 5 Oct 2023 16:14:07 +0200 Subject: [PATCH 02/12] Added TODO tag --- supervisor_types.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/supervisor_types.go b/supervisor_types.go index 6191ed6..05e424f 100644 --- a/supervisor_types.go +++ b/supervisor_types.go @@ -154,7 +154,6 @@ type DimensionsSpec struct { IncludeAllDimensions bool `json:"includeAllDimensions,omitempty"` UseSchemaDiscovery bool `json:"useSchemaDiscovery,omitempty"` } - type QueryGranularity struct { Type string `json:"type,omitempty"` } @@ -163,11 +162,12 @@ type QueryGranularity struct { // partitioning, truncating timestamps, time chunk segmentation or roll-up. // https://druid.apache.org/docs/latest/ingestion/ingestion-spec#granularityspec type GranularitySpec struct { - Type string `json:"type"` - SegmentGranularity string `json:"segmentGranularity,omitempty"` - QueryGranularity QueryGranularity `json:"queryGranularity,omitempty"` - Rollup bool `json:"rollup,omitempty"` - Intervals []string `json:"intervals,omitempty"` + Type string `json:"type"` + SegmentGranularity string `json:"segmentGranularity,omitempty"` + // TODO: this field is problematic as depending on value druid returns string or object + QueryGranularity QueryGranularity `json:"queryGranularity,omitempty"` + Rollup bool `json:"rollup,omitempty"` + Intervals []string `json:"intervals,omitempty"` } // AutoScalerConfig is part of IOConfig that controls ingestion auto-scaling. From 6bc0bca96b7e08b22224eeba55baa6ec0e5f9276 Mon Sep 17 00:00:00 2001 From: vzaytsev Date: Mon, 4 Dec 2023 22:22:13 -0500 Subject: [PATCH 03/12] WIP: fix build --- supervisor_types.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/supervisor_types.go b/supervisor_types.go index 579a9ca..1fbc917 100644 --- a/supervisor_types.go +++ b/supervisor_types.go @@ -144,7 +144,7 @@ type DimensionExclusionsSet []string // or applying aggregations. // https://druid.apache.org/docs/latest/ingestion/ingestion-spec#dimensionsspec type DimensionsSpec struct { - Dimensions DimensionsSet `json:"dimensions,omitempty"` + Dimensions DimensionSet `json:"dimensions,omitempty"` DimensionExclusions DimensionExclusionsSet `json:"dimensionExclusions,omitempty"` SpatialDimensions SpatialDimensionSet `json:"spatialDimensions,omitempty"` IncludeAllDimensions bool `json:"includeAllDimensions,omitempty"` @@ -509,7 +509,7 @@ func SetTaskDuration(duration string) IngestionSpecOptions { } // SetDimensions sets druid datasource dimensions. -func SetDimensions(dimensions DimensionsSet) IngestionSpecOptions { +func SetDimensions(dimensions DimensionSet) IngestionSpecOptions { return func(spec *InputIngestionSpec) { spec.DataSchema.DimensionsSpec.Dimensions = dimensions } From c226455ba5ba236b1c5367c5483e0c52a2df8a07 Mon Sep 17 00:00:00 2001 From: vzaytsev Date: Mon, 4 Dec 2023 22:29:26 -0500 Subject: [PATCH 04/12] WIP: fix build --- supervisor_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/supervisor_test.go b/supervisor_test.go index a521083..22b497a 100644 --- a/supervisor_test.go +++ b/supervisor_test.go @@ -34,7 +34,7 @@ func TestSupervisorService(t *testing.T) { SetBrokers("telemetry-kafka.skaffold-telemetry-victorzaytsev.svc.cluster.local:9092"), SetTopic("test-topic"), SetDataSource("test-datasource"), - SetDimensions([]Dimension{{Name: "dim1"}, {Name: "dim2"}})) + SetDimensions(DimensionSet{Dimension{Name: "dim1"}, Dimension{Name: "dim2"}})) assert.NoError(t, err, "error should be nil") assert.NotNil(t, spec, "specification should not be nil") From 9f0b91740081c4e0526d95383b6f10f1e70b935d Mon Sep 17 00:00:00 2001 From: vzaytsev Date: Mon, 4 Dec 2023 23:20:15 -0500 Subject: [PATCH 05/12] WIP: fix test --- supervisor_types_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/supervisor_types_test.go b/supervisor_types_test.go index 80a9716..a40933b 100644 --- a/supervisor_types_test.go +++ b/supervisor_types_test.go @@ -74,7 +74,7 @@ var jsonBasic = `{ ] }, "granularitySpec": { - "type": "uniform", + "queryGranularity": { "type": "none" }, "segmentGranularity": "DAY", "queryGranularity": "none" } From 3ea7ac6e08fceea012f1e33de66c81210203dc91 Mon Sep 17 00:00:00 2001 From: vzaytsev Date: Tue, 5 Dec 2023 10:27:59 -0500 Subject: [PATCH 06/12] WIP: fix merge/rebase test issues --- supervisor_types_test.go | 29 +++++++++++++++++++---------- 1 file changed, 19 insertions(+), 10 deletions(-) diff --git a/supervisor_types_test.go b/supervisor_types_test.go index a40933b..9fd3f64 100644 --- a/supervisor_types_test.go +++ b/supervisor_types_test.go @@ -74,9 +74,9 @@ var jsonBasic = `{ ] }, "granularitySpec": { - "queryGranularity": { "type": "none" }, + "type": "uniform", "segmentGranularity": "DAY", - "queryGranularity": "none" + "queryGranularity": { "type": "none" } } }, "ioConfig": { @@ -141,9 +141,9 @@ var jsonWithTypedDimensions = `{ ] }, "granularitySpec": { - "type": "uniform", + "type": "uniform", "segmentGranularity": "DAY", - "queryGranularity": "none" + "queryGranularity": {"type": "none"} } }, "ioConfig": { @@ -193,15 +193,20 @@ var jsonWithSqlInputSource = `{ }, "dimensionsSpec": { "dimensions": [ - "ts", - "user_name", - "payload" + { + "type": "string", + "name": "ts" + }, + { + "type": "json", + "name": "payload" + } ] }, "granularitySpec": { - "type": "uniform", - "segmentGranularity": "DAY", - "queryGranularity": "none" + "type": "uniform", + "segmentGranularity": "DAY", + "queryGranularity: {type": "none"} } }, "ioConfig": { @@ -253,6 +258,10 @@ func TestIngestionSpecWithSqlInputSource_MarshalJSON(t *testing.T) { t.Fatalf("unexpected error while marshalling: %v", err) } expected := []byte(jsonWithSqlInputSource) + + fmt.Println("Expected: " + string(expected)) + fmt.Println("Actual : " + string(actual)) + require.JSONEq(t, string(expected), string(actual), fmt.Sprintf("expected: %s\nactual: %s", string(expected), string(actual))) var checkSpec *InputIngestionSpec From 54b09ed9f3c65afd9d4f25ac867caa9916554b67 Mon Sep 17 00:00:00 2001 From: vzaytsev Date: Tue, 5 Dec 2023 11:46:20 -0500 Subject: [PATCH 07/12] WIP: fix merge/rebase test issues and remove debug print --- supervisor_types_test.go | 16 ++++------------ 1 file changed, 4 insertions(+), 12 deletions(-) diff --git a/supervisor_types_test.go b/supervisor_types_test.go index 9fd3f64..7c98070 100644 --- a/supervisor_types_test.go +++ b/supervisor_types_test.go @@ -193,20 +193,15 @@ var jsonWithSqlInputSource = `{ }, "dimensionsSpec": { "dimensions": [ - { - "type": "string", - "name": "ts" - }, - { - "type": "json", - "name": "payload" - } + "ts", + "user_name", + "payload" ] }, "granularitySpec": { "type": "uniform", "segmentGranularity": "DAY", - "queryGranularity: {type": "none"} + "queryGranularity": {"type": "none"} } }, "ioConfig": { @@ -259,9 +254,6 @@ func TestIngestionSpecWithSqlInputSource_MarshalJSON(t *testing.T) { } expected := []byte(jsonWithSqlInputSource) - fmt.Println("Expected: " + string(expected)) - fmt.Println("Actual : " + string(actual)) - require.JSONEq(t, string(expected), string(actual), fmt.Sprintf("expected: %s\nactual: %s", string(expected), string(actual))) var checkSpec *InputIngestionSpec From b87f9a634cd3e8af8be8685db73141b96618acff Mon Sep 17 00:00:00 2001 From: vzaytsev Date: Wed, 6 Dec 2023 00:10:47 -0500 Subject: [PATCH 08/12] WIP fix query granularity --- supervisor_test.go | 4 ++-- supervisor_types.go | 8 ++++---- supervisor_types_test.go | 6 +++--- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/supervisor_test.go b/supervisor_test.go index 22b497a..66ee0fe 100644 --- a/supervisor_test.go +++ b/supervisor_test.go @@ -28,10 +28,10 @@ func TestSupervisorService(t *testing.T) { // Set up druid service and client. var druidOpts []ClientOption - d, err := NewClient("http://localhost:8081", druidOpts...) + d, err := NewClient("http://localhost:8888", druidOpts...) assert.NoError(t, err, "error should be nil") var spec = NewIngestionSpec(SetType("kafka"), - SetBrokers("telemetry-kafka.skaffold-telemetry-victorzaytsev.svc.cluster.local:9092"), + SetBrokers("kafka:9092"), SetTopic("test-topic"), SetDataSource("test-datasource"), SetDimensions(DimensionSet{Dimension{Name: "dim1"}, Dimension{Name: "dim2"}})) diff --git a/supervisor_types.go b/supervisor_types.go index 1fbc917..3bafb85 100644 --- a/supervisor_types.go +++ b/supervisor_types.go @@ -161,9 +161,9 @@ type GranularitySpec struct { Type string `json:"type"` SegmentGranularity string `json:"segmentGranularity,omitempty"` // TODO: this field is problematic as depending on value druid returns string or object - QueryGranularity QueryGranularity `json:"queryGranularity,omitempty"` - Rollup bool `json:"rollup,omitempty"` - Intervals []string `json:"intervals,omitempty"` + QueryGranularity any `json:"queryGranularity,omitempty"` + Rollup bool `json:"rollup,omitempty"` + Intervals []string `json:"intervals,omitempty"` } // AutoScalerConfig is part of IOConfig that controls ingestion auto-scaling. @@ -408,7 +408,7 @@ func defaultKafkaIngestionSpec() *InputIngestionSpec { GranularitySpec: &GranularitySpec{ Type: "uniform", SegmentGranularity: "DAY", - QueryGranularity: QueryGranularity{Type: "none"}, + QueryGranularity: "none", Rollup: false, }, }, diff --git a/supervisor_types_test.go b/supervisor_types_test.go index 7c98070..e45223a 100644 --- a/supervisor_types_test.go +++ b/supervisor_types_test.go @@ -76,7 +76,7 @@ var jsonBasic = `{ "granularitySpec": { "type": "uniform", "segmentGranularity": "DAY", - "queryGranularity": { "type": "none" } + "queryGranularity": "none" } }, "ioConfig": { @@ -143,7 +143,7 @@ var jsonWithTypedDimensions = `{ "granularitySpec": { "type": "uniform", "segmentGranularity": "DAY", - "queryGranularity": {"type": "none"} + "queryGranularity": "none" } }, "ioConfig": { @@ -201,7 +201,7 @@ var jsonWithSqlInputSource = `{ "granularitySpec": { "type": "uniform", "segmentGranularity": "DAY", - "queryGranularity": {"type": "none"} + "queryGranularity": "none" } }, "ioConfig": { From fa66989c715964a7af92ea2adcd9f80d8e6fe55a Mon Sep 17 00:00:00 2001 From: vzaytsev Date: Mon, 11 Dec 2023 13:23:21 -0500 Subject: [PATCH 09/12] Address CR comments --- supervisor.go | 6 ++-- supervisor_test.go | 2 +- supervisor_types.go | 70 ++++++++++++++++++++++++++++++++++------ supervisor_types_test.go | 22 +++++++++---- 4 files changed, 80 insertions(+), 20 deletions(-) diff --git a/supervisor.go b/supervisor.go index 8d22d3c..cdc0428 100644 --- a/supervisor.go +++ b/supervisor.go @@ -93,8 +93,8 @@ func (s *SupervisorService) GetSpec(supervisorId string) (OutputIngestionSpec, e // GetStatus calls druid Supervisor service's Get status API. // https://druid.apache.org/docs/latest/api-reference/supervisor-api/#get-supervisor-status func (s *SupervisorService) GetStatus(supervisorId string) (SupervisorStatus, error) { - r, err := s.client.NewRequest("GET", applySupervisorId(supervisorStatusEndpoint, supervisorId), nil) var result SupervisorStatus + r, err := s.client.NewRequest("GET", applySupervisorId(supervisorStatusEndpoint, supervisorId), nil) if err != nil { return result, err } @@ -121,8 +121,8 @@ func (s *SupervisorService) GetAuditHistoryAll() (map[string]SupervisorAuditHist // Suspend calls druid Supervisor service's Suspend API. // https://druid.apache.org/docs/latest/api-reference/supervisor-api/#suspend-a-running-supervisor func (s *SupervisorService) Suspend(supervisorID string) (OutputIngestionSpec, error) { - r, err := s.client.NewRequest(http.MethodPost, applySupervisorId(supervisorSuspendEndpoint, supervisorID), nil) var result OutputIngestionSpec + r, err := s.client.NewRequest(http.MethodPost, applySupervisorId(supervisorSuspendEndpoint, supervisorID), nil) if err != nil { return result, err } @@ -142,8 +142,8 @@ func (s *SupervisorService) SuspendAll() (string, error) { // Resume calls druid Supervisor service's Resume API. // https://druid.apache.org/docs/latest/api-reference/supervisor-api/#resume-a-supervisor func (s *SupervisorService) Resume(supervisorID string) (OutputIngestionSpec, error) { - r, err := s.client.NewRequest(http.MethodPost, applySupervisorId(supervisorResumeEndpoint, supervisorID), nil) var result OutputIngestionSpec + r, err := s.client.NewRequest(http.MethodPost, applySupervisorId(supervisorResumeEndpoint, supervisorID), nil) if err != nil { return result, err } diff --git a/supervisor_test.go b/supervisor_test.go index 66ee0fe..ebbf65f 100644 --- a/supervisor_test.go +++ b/supervisor_test.go @@ -34,7 +34,7 @@ func TestSupervisorService(t *testing.T) { SetBrokers("kafka:9092"), SetTopic("test-topic"), SetDataSource("test-datasource"), - SetDimensions(DimensionSet{Dimension{Name: "dim1"}, Dimension{Name: "dim2"}})) + SetDimensions(DimensionSet{{"dim1"}, {"dim2"}})) assert.NoError(t, err, "error should be nil") assert.NotNil(t, spec, "specification should not be nil") diff --git a/supervisor_types.go b/supervisor_types.go index 3bafb85..ff3c4c3 100644 --- a/supervisor_types.go +++ b/supervisor_types.go @@ -1,5 +1,10 @@ package druid +import ( + "encoding/json" + "fmt" +) + // InputIngestionSpec is the root-level type defining an ingestion spec used // by Apache Druid. type InputIngestionSpec struct { @@ -125,7 +130,12 @@ type SpatialDimension struct { type TransformSet []Transform // DimensionSet is a unique set of druid datasource dimensions(labels). -type DimensionSet []any +type DimensionSet []SingleDimension + +// SingleDimension is a single dataset dimension that can be represented by a typed Dimension or a string value +type SingleDimension struct { + Value any +} // Dimension is a typed definition of a datasource dimension. type Dimension struct { @@ -150,6 +160,11 @@ type DimensionsSpec struct { IncludeAllDimensions bool `json:"includeAllDimensions,omitempty"` UseSchemaDiscovery bool `json:"useSchemaDiscovery,omitempty"` } + +type Granularity struct { + Value any +} + type QueryGranularity struct { Type string `json:"type,omitempty"` } @@ -158,12 +173,11 @@ type QueryGranularity struct { // partitioning, truncating timestamps, time chunk segmentation or roll-up. // https://druid.apache.org/docs/latest/ingestion/ingestion-spec#granularityspec type GranularitySpec struct { - Type string `json:"type"` - SegmentGranularity string `json:"segmentGranularity,omitempty"` - // TODO: this field is problematic as depending on value druid returns string or object - QueryGranularity any `json:"queryGranularity,omitempty"` - Rollup bool `json:"rollup,omitempty"` - Intervals []string `json:"intervals,omitempty"` + Type string `json:"type"` + SegmentGranularity string `json:"segmentGranularity,omitempty"` + QueryGranularity Granularity `json:"queryGranularity,omitempty"` + Rollup bool `json:"rollup,omitempty"` + Intervals []string `json:"intervals,omitempty"` } // AutoScalerConfig is part of IOConfig that controls ingestion auto-scaling. @@ -408,7 +422,7 @@ func defaultKafkaIngestionSpec() *InputIngestionSpec { GranularitySpec: &GranularitySpec{ Type: "uniform", SegmentGranularity: "DAY", - QueryGranularity: "none", + QueryGranularity: Granularity{"none"}, Rollup: false, }, }, @@ -543,7 +557,7 @@ func SetTimestampColumn(column string) IngestionSpecOptions { } // SetGranularitySpec sets granularity spec settings that are applied at druid ingestion partitioning stage. -func SetGranularitySpec(segmentGranularity string, queryGranularity QueryGranularity, rollup bool) IngestionSpecOptions { +func SetGranularitySpec(segmentGranularity string, queryGranularity Granularity, rollup bool) IngestionSpecOptions { return func(spec *InputIngestionSpec) { spec.DataSchema.GranularitySpec = &GranularitySpec{ Type: "uniform", @@ -571,3 +585,41 @@ func SetSQLInputSource(dbType, connectURI, user, password string, sqls []string) } } } + +func (g *Granularity) UnmarshalJSON(b []byte) error { + var str string + if err := json.Unmarshal(b, &str); err == nil { + g.Value = str + return nil + } + + var qg QueryGranularity + if err := json.Unmarshal(b, &qg); err == nil { + g.Value = qg + return nil + } + return fmt.Errorf("unsupported query granularity: %s", b) +} + +func (g *Granularity) MarshalJSON() ([]byte, error) { + return json.Marshal(&g.Value) +} + +func (g *SingleDimension) UnmarshalJSON(b []byte) error { + var str string + if err := json.Unmarshal(b, &str); err == nil { + g.Value = str + return nil + } + + var qg Dimension + if err := json.Unmarshal(b, &qg); err == nil { + g.Value = qg + return nil + } + return fmt.Errorf("unsupported dimension value: %s", b) +} + +func (g *SingleDimension) MarshalJSON() ([]byte, error) { + return json.Marshal(&g.Value) +} diff --git a/supervisor_types_test.go b/supervisor_types_test.go index e45223a..67ba82e 100644 --- a/supervisor_types_test.go +++ b/supervisor_types_test.go @@ -35,11 +35,15 @@ func TestKafkaIngestionSpec(t *testing.T) { { name: "set labels", options: []IngestionSpecOptions{ - SetDimensions([]any{"ts", "user_name", "payload"}), + SetDimensions(DimensionSet{ + {"ts"}, + {"user_name"}, + {"payload"}, + }), }, expected: func() *InputIngestionSpec { out := defaultKafkaIngestionSpec() - out.DataSchema.DimensionsSpec.Dimensions = []any{"ts", "user_name", "payload"} + out.DataSchema.DimensionsSpec.Dimensions = DimensionSet{{"ts"}, {"user_name"}, {"payload"}} return out }(), }, @@ -100,7 +104,11 @@ func TestKafkaIngestionSpec_MarshalJSON(t *testing.T) { SetDataSource("test_datasource"), SetTopic("test_topic"), SetBrokers("test_brokers"), - SetDimensions([]any{"ts", "user_name", "payload"}), + SetDimensions(DimensionSet{ + {"ts"}, + {"user_name"}, + {"payload"}, + }), ) actual, err := json.Marshal(spec) if err != nil { @@ -167,9 +175,9 @@ func TestIngestionSpecWithTypedDimensions_MarshalJSON(t *testing.T) { SetDataSource("test_datasource"), SetTopic("test_topic"), SetBrokers("test_brokers"), - SetDimensions([]any{ - Dimension{Type: "string", Name: "ts"}, - Dimension{Type: "json", Name: "payload"}, + SetDimensions(DimensionSet{ + {Dimension{Type: "string", Name: "ts"}}, + {Dimension{Type: "json", Name: "payload"}}, }), ) actual, err := json.Marshal(spec) @@ -238,7 +246,7 @@ func TestIngestionSpecWithSqlInputSource_MarshalJSON(t *testing.T) { SetType("index_parallel"), SetIOConfigType("index_parallel"), SetDataSource("test_datasource"), - SetDimensions([]any{"ts", "user_name", "payload"}), + SetDimensions(DimensionSet{{"ts"}, {"user_name"}, {"payload"}}), SetSQLInputSource("mysql", "jdbc:mysql://host:port/schema", "username", From b543780081d764aed48b94426aa595960b03cb99 Mon Sep 17 00:00:00 2001 From: vzaytsev Date: Mon, 11 Dec 2023 14:45:17 -0500 Subject: [PATCH 10/12] Fix comment and rename types. --- supervisor_types.go | 33 ++++++++++++++++++--------------- 1 file changed, 18 insertions(+), 15 deletions(-) diff --git a/supervisor_types.go b/supervisor_types.go index ff3c4c3..ebebd9c 100644 --- a/supervisor_types.go +++ b/supervisor_types.go @@ -130,10 +130,10 @@ type SpatialDimension struct { type TransformSet []Transform // DimensionSet is a unique set of druid datasource dimensions(labels). -type DimensionSet []SingleDimension +type DimensionSet []DimensionSpec -// SingleDimension is a single dataset dimension that can be represented by a typed Dimension or a string value -type SingleDimension struct { +// DimensionSpec is a single dataset dimension that can be represented by a typed Dimension or a string value. +type DimensionSpec struct { Value any } @@ -161,10 +161,13 @@ type DimensionsSpec struct { UseSchemaDiscovery bool `json:"useSchemaDiscovery,omitempty"` } -type Granularity struct { +// QueryGranularitySpec is an umbrella type for different representations of query granularity, can be string or +// QueryGranularity value. +type QueryGranularitySpec struct { Value any } +// QueryGranularity is a typed representation of query granularity. type QueryGranularity struct { Type string `json:"type,omitempty"` } @@ -173,11 +176,11 @@ type QueryGranularity struct { // partitioning, truncating timestamps, time chunk segmentation or roll-up. // https://druid.apache.org/docs/latest/ingestion/ingestion-spec#granularityspec type GranularitySpec struct { - Type string `json:"type"` - SegmentGranularity string `json:"segmentGranularity,omitempty"` - QueryGranularity Granularity `json:"queryGranularity,omitempty"` - Rollup bool `json:"rollup,omitempty"` - Intervals []string `json:"intervals,omitempty"` + Type string `json:"type"` + SegmentGranularity string `json:"segmentGranularity,omitempty"` + QueryGranularity QueryGranularitySpec `json:"queryGranularity,omitempty"` + Rollup bool `json:"rollup,omitempty"` + Intervals []string `json:"intervals,omitempty"` } // AutoScalerConfig is part of IOConfig that controls ingestion auto-scaling. @@ -422,7 +425,7 @@ func defaultKafkaIngestionSpec() *InputIngestionSpec { GranularitySpec: &GranularitySpec{ Type: "uniform", SegmentGranularity: "DAY", - QueryGranularity: Granularity{"none"}, + QueryGranularity: QueryGranularitySpec{"none"}, Rollup: false, }, }, @@ -557,7 +560,7 @@ func SetTimestampColumn(column string) IngestionSpecOptions { } // SetGranularitySpec sets granularity spec settings that are applied at druid ingestion partitioning stage. -func SetGranularitySpec(segmentGranularity string, queryGranularity Granularity, rollup bool) IngestionSpecOptions { +func SetGranularitySpec(segmentGranularity string, queryGranularity QueryGranularitySpec, rollup bool) IngestionSpecOptions { return func(spec *InputIngestionSpec) { spec.DataSchema.GranularitySpec = &GranularitySpec{ Type: "uniform", @@ -586,7 +589,7 @@ func SetSQLInputSource(dbType, connectURI, user, password string, sqls []string) } } -func (g *Granularity) UnmarshalJSON(b []byte) error { +func (g *QueryGranularitySpec) UnmarshalJSON(b []byte) error { var str string if err := json.Unmarshal(b, &str); err == nil { g.Value = str @@ -601,11 +604,11 @@ func (g *Granularity) UnmarshalJSON(b []byte) error { return fmt.Errorf("unsupported query granularity: %s", b) } -func (g *Granularity) MarshalJSON() ([]byte, error) { +func (g *QueryGranularitySpec) MarshalJSON() ([]byte, error) { return json.Marshal(&g.Value) } -func (g *SingleDimension) UnmarshalJSON(b []byte) error { +func (g *DimensionSpec) UnmarshalJSON(b []byte) error { var str string if err := json.Unmarshal(b, &str); err == nil { g.Value = str @@ -620,6 +623,6 @@ func (g *SingleDimension) UnmarshalJSON(b []byte) error { return fmt.Errorf("unsupported dimension value: %s", b) } -func (g *SingleDimension) MarshalJSON() ([]byte, error) { +func (g *DimensionSpec) MarshalJSON() ([]byte, error) { return json.Marshal(&g.Value) } From 2510094a2aab5c7e7b7b6f118c4acaf1e69dda53 Mon Sep 17 00:00:00 2001 From: vzaytsev Date: Mon, 11 Dec 2023 15:43:54 -0500 Subject: [PATCH 11/12] Make QueryGranularity field a pointer field. --- supervisor_types.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/supervisor_types.go b/supervisor_types.go index ebebd9c..01cb663 100644 --- a/supervisor_types.go +++ b/supervisor_types.go @@ -176,11 +176,11 @@ type QueryGranularity struct { // partitioning, truncating timestamps, time chunk segmentation or roll-up. // https://druid.apache.org/docs/latest/ingestion/ingestion-spec#granularityspec type GranularitySpec struct { - Type string `json:"type"` - SegmentGranularity string `json:"segmentGranularity,omitempty"` - QueryGranularity QueryGranularitySpec `json:"queryGranularity,omitempty"` - Rollup bool `json:"rollup,omitempty"` - Intervals []string `json:"intervals,omitempty"` + Type string `json:"type"` + SegmentGranularity string `json:"segmentGranularity,omitempty"` + QueryGranularity *QueryGranularitySpec `json:"queryGranularity,omitempty"` + Rollup bool `json:"rollup,omitempty"` + Intervals []string `json:"intervals,omitempty"` } // AutoScalerConfig is part of IOConfig that controls ingestion auto-scaling. @@ -425,7 +425,7 @@ func defaultKafkaIngestionSpec() *InputIngestionSpec { GranularitySpec: &GranularitySpec{ Type: "uniform", SegmentGranularity: "DAY", - QueryGranularity: QueryGranularitySpec{"none"}, + QueryGranularity: &QueryGranularitySpec{"none"}, Rollup: false, }, }, @@ -560,7 +560,7 @@ func SetTimestampColumn(column string) IngestionSpecOptions { } // SetGranularitySpec sets granularity spec settings that are applied at druid ingestion partitioning stage. -func SetGranularitySpec(segmentGranularity string, queryGranularity QueryGranularitySpec, rollup bool) IngestionSpecOptions { +func SetGranularitySpec(segmentGranularity string, queryGranularity *QueryGranularitySpec, rollup bool) IngestionSpecOptions { return func(spec *InputIngestionSpec) { spec.DataSchema.GranularitySpec = &GranularitySpec{ Type: "uniform", From 24498ae9490cf5f000d096f53cb91f8310cc827b Mon Sep 17 00:00:00 2001 From: Victor Zaytsev <94850767+vzaytsev1981@users.noreply.github.com> Date: Tue, 12 Dec 2023 08:10:41 -0500 Subject: [PATCH 12/12] Update supervisor.go MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Luboš Pokorný --- supervisor.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/supervisor.go b/supervisor.go index cdc0428..2d76589 100644 --- a/supervisor.go +++ b/supervisor.go @@ -143,7 +143,7 @@ func (s *SupervisorService) SuspendAll() (string, error) { // https://druid.apache.org/docs/latest/api-reference/supervisor-api/#resume-a-supervisor func (s *SupervisorService) Resume(supervisorID string) (OutputIngestionSpec, error) { var result OutputIngestionSpec - r, err := s.client.NewRequest(http.MethodPost, applySupervisorId(supervisorResumeEndpoint, supervisorID), nil) + r, err := s.client.NewRequest(http.MethodPost, applySupervisorId(supervisorResumeEndpoint, supervisorID), nil /*no data*/) if err != nil { return result, err }