From fa66989c715964a7af92ea2adcd9f80d8e6fe55a Mon Sep 17 00:00:00 2001 From: vzaytsev Date: Mon, 11 Dec 2023 13:23:21 -0500 Subject: [PATCH] Address CR comments --- supervisor.go | 6 ++-- supervisor_test.go | 2 +- supervisor_types.go | 70 ++++++++++++++++++++++++++++++++++------ supervisor_types_test.go | 22 +++++++++---- 4 files changed, 80 insertions(+), 20 deletions(-) diff --git a/supervisor.go b/supervisor.go index 8d22d3c..cdc0428 100644 --- a/supervisor.go +++ b/supervisor.go @@ -93,8 +93,8 @@ func (s *SupervisorService) GetSpec(supervisorId string) (OutputIngestionSpec, e // GetStatus calls druid Supervisor service's Get status API. // https://druid.apache.org/docs/latest/api-reference/supervisor-api/#get-supervisor-status func (s *SupervisorService) GetStatus(supervisorId string) (SupervisorStatus, error) { - r, err := s.client.NewRequest("GET", applySupervisorId(supervisorStatusEndpoint, supervisorId), nil) var result SupervisorStatus + r, err := s.client.NewRequest("GET", applySupervisorId(supervisorStatusEndpoint, supervisorId), nil) if err != nil { return result, err } @@ -121,8 +121,8 @@ func (s *SupervisorService) GetAuditHistoryAll() (map[string]SupervisorAuditHist // Suspend calls druid Supervisor service's Suspend API. // https://druid.apache.org/docs/latest/api-reference/supervisor-api/#suspend-a-running-supervisor func (s *SupervisorService) Suspend(supervisorID string) (OutputIngestionSpec, error) { - r, err := s.client.NewRequest(http.MethodPost, applySupervisorId(supervisorSuspendEndpoint, supervisorID), nil) var result OutputIngestionSpec + r, err := s.client.NewRequest(http.MethodPost, applySupervisorId(supervisorSuspendEndpoint, supervisorID), nil) if err != nil { return result, err } @@ -142,8 +142,8 @@ func (s *SupervisorService) SuspendAll() (string, error) { // Resume calls druid Supervisor service's Resume API. // https://druid.apache.org/docs/latest/api-reference/supervisor-api/#resume-a-supervisor func (s *SupervisorService) Resume(supervisorID string) (OutputIngestionSpec, error) { - r, err := s.client.NewRequest(http.MethodPost, applySupervisorId(supervisorResumeEndpoint, supervisorID), nil) var result OutputIngestionSpec + r, err := s.client.NewRequest(http.MethodPost, applySupervisorId(supervisorResumeEndpoint, supervisorID), nil) if err != nil { return result, err } diff --git a/supervisor_test.go b/supervisor_test.go index 66ee0fe..ebbf65f 100644 --- a/supervisor_test.go +++ b/supervisor_test.go @@ -34,7 +34,7 @@ func TestSupervisorService(t *testing.T) { SetBrokers("kafka:9092"), SetTopic("test-topic"), SetDataSource("test-datasource"), - SetDimensions(DimensionSet{Dimension{Name: "dim1"}, Dimension{Name: "dim2"}})) + SetDimensions(DimensionSet{{"dim1"}, {"dim2"}})) assert.NoError(t, err, "error should be nil") assert.NotNil(t, spec, "specification should not be nil") diff --git a/supervisor_types.go b/supervisor_types.go index 3bafb85..ff3c4c3 100644 --- a/supervisor_types.go +++ b/supervisor_types.go @@ -1,5 +1,10 @@ package druid +import ( + "encoding/json" + "fmt" +) + // InputIngestionSpec is the root-level type defining an ingestion spec used // by Apache Druid. type InputIngestionSpec struct { @@ -125,7 +130,12 @@ type SpatialDimension struct { type TransformSet []Transform // DimensionSet is a unique set of druid datasource dimensions(labels). 
-type DimensionSet []any
+type DimensionSet []SingleDimension
+
+// SingleDimension is a single datasource dimension, represented either by a typed Dimension or by a plain string value.
+type SingleDimension struct {
+	Value any
+}

// Dimension is a typed definition of a datasource dimension.
type Dimension struct {
@@ -150,6 +160,11 @@ type DimensionsSpec struct {
	IncludeAllDimensions bool        `json:"includeAllDimensions,omitempty"`
	UseSchemaDiscovery   bool        `json:"useSchemaDiscovery,omitempty"`
}
+
+type Granularity struct {
+	Value any
+}
+
type QueryGranularity struct {
	Type string `json:"type,omitempty"`
}

// GranularitySpec allows for configuring segment
// partitioning, truncating timestamps, time chunk segmentation or roll-up.
// https://druid.apache.org/docs/latest/ingestion/ingestion-spec#granularityspec
type GranularitySpec struct {
-	Type               string `json:"type"`
-	SegmentGranularity string `json:"segmentGranularity,omitempty"`
-	// TODO: this field is problematic as depending on value druid returns string or object
-	QueryGranularity any      `json:"queryGranularity,omitempty"`
-	Rollup           bool     `json:"rollup,omitempty"`
-	Intervals        []string `json:"intervals,omitempty"`
+	Type               string      `json:"type"`
+	SegmentGranularity string      `json:"segmentGranularity,omitempty"`
+	QueryGranularity   Granularity `json:"queryGranularity,omitempty"`
+	Rollup             bool        `json:"rollup,omitempty"`
+	Intervals          []string    `json:"intervals,omitempty"`
}

// AutoScalerConfig is part of IOConfig that controls ingestion auto-scaling.
@@ -408,7 +422,7 @@ func defaultKafkaIngestionSpec() *InputIngestionSpec {
			GranularitySpec: &GranularitySpec{
				Type:               "uniform",
				SegmentGranularity: "DAY",
-				QueryGranularity:   "none",
+				QueryGranularity:   Granularity{"none"},
				Rollup:             false,
			},
		},
@@ -543,7 +557,7 @@ func SetTimestampColumn(column string) IngestionSpecOptions {
}

// SetGranularitySpec sets granularity spec settings that are applied at druid ingestion partitioning stage.
-func SetGranularitySpec(segmentGranularity string, queryGranularity QueryGranularity, rollup bool) IngestionSpecOptions {
+func SetGranularitySpec(segmentGranularity string, queryGranularity Granularity, rollup bool) IngestionSpecOptions {
	return func(spec *InputIngestionSpec) {
		spec.DataSchema.GranularitySpec = &GranularitySpec{
			Type:               "uniform",
@@ -571,3 +585,41 @@ func SetSQLInputSource(dbType, connectURI, user, password string, sqls []string)
		}
	}
}
+
+func (g *Granularity) UnmarshalJSON(b []byte) error {
+	var str string
+	if err := json.Unmarshal(b, &str); err == nil {
+		g.Value = str
+		return nil
+	}
+
+	var qg QueryGranularity
+	if err := json.Unmarshal(b, &qg); err == nil {
+		g.Value = qg
+		return nil
+	}
+	return fmt.Errorf("unsupported query granularity: %s", b)
+}
+
+func (g *Granularity) MarshalJSON() ([]byte, error) {
+	return json.Marshal(&g.Value)
+}
+
+func (d *SingleDimension) UnmarshalJSON(b []byte) error {
+	var str string
+	if err := json.Unmarshal(b, &str); err == nil {
+		d.Value = str
+		return nil
+	}
+
+	var dim Dimension
+	if err := json.Unmarshal(b, &dim); err == nil {
+		d.Value = dim
+		return nil
+	}
+	return fmt.Errorf("unsupported dimension value: %s", b)
+}
+
+func (d *SingleDimension) MarshalJSON() ([]byte, error) {
+	return json.Marshal(&d.Value)
+}
diff --git a/supervisor_types_test.go b/supervisor_types_test.go
index e45223a..67ba82e 100644
--- a/supervisor_types_test.go
+++ b/supervisor_types_test.go
@@ -35,11 +35,15 @@ func TestKafkaIngestionSpec(t *testing.T) {
		{
			name: "set labels",
			options: []IngestionSpecOptions{
-				SetDimensions([]any{"ts", "user_name", "payload"}),
+				SetDimensions(DimensionSet{
+					{"ts"},
+					{"user_name"},
+					{"payload"},
+				}),
			},
			expected: func() *InputIngestionSpec {
				out := defaultKafkaIngestionSpec()
-				out.DataSchema.DimensionsSpec.Dimensions = []any{"ts", "user_name", "payload"}
+				out.DataSchema.DimensionsSpec.Dimensions = DimensionSet{{"ts"}, {"user_name"}, {"payload"}}
				return out
			}(),
		},
@@ -100,7 +104,11 @@ func TestKafkaIngestionSpec_MarshalJSON(t *testing.T) {
		SetDataSource("test_datasource"),
		SetTopic("test_topic"),
		SetBrokers("test_brokers"),
-		SetDimensions([]any{"ts", "user_name", "payload"}),
+		SetDimensions(DimensionSet{
+			{"ts"},
+			{"user_name"},
+			{"payload"},
+		}),
	)
	actual, err := json.Marshal(spec)
	if err != nil {
@@ -167,9 +175,9 @@ func TestIngestionSpecWithTypedDimensions_MarshalJSON(t *testing.T) {
		SetDataSource("test_datasource"),
		SetTopic("test_topic"),
		SetBrokers("test_brokers"),
-		SetDimensions([]any{
-			Dimension{Type: "string", Name: "ts"},
-			Dimension{Type: "json", Name: "payload"},
+		SetDimensions(DimensionSet{
+			{Dimension{Type: "string", Name: "ts"}},
+			{Dimension{Type: "json", Name: "payload"}},
		}),
	)
	actual, err := json.Marshal(spec)
@@ -238,7 +246,7 @@ func TestIngestionSpecWithSqlInputSource_MarshalJSON(t *testing.T) {
		SetType("index_parallel"),
		SetIOConfigType("index_parallel"),
		SetDataSource("test_datasource"),
-		SetDimensions([]any{"ts", "user_name", "payload"}),
+		SetDimensions(DimensionSet{{"ts"}, {"user_name"}, {"payload"}}),
		SetSQLInputSource("mysql", "jdbc:mysql://host:port/schema", "username",
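
For reviewers, a minimal sketch of the round-trip behavior the new (un)marshalers enable, written as a test in this package; the test name, JSON payloads, and assertions below are illustrative assumptions, not part of the patch:

package druid

import (
	"encoding/json"
	"testing"
)

// TestGranularityAndDimensionRoundTrip sketches the flexible decoding added
// in this patch: queryGranularity may arrive as a string or an object, and a
// dimension may arrive as a string label or a typed Dimension.
func TestGranularityAndDimensionRoundTrip(t *testing.T) {
	for _, payload := range []string{
		`{"type":"uniform","queryGranularity":"none"}`,
		`{"type":"uniform","queryGranularity":{"type":"none"}}`,
	} {
		var spec GranularitySpec
		if err := json.Unmarshal([]byte(payload), &spec); err != nil {
			t.Fatalf("unmarshal %s: %v", payload, err)
		}
		// Marshal a pointer so the pointer-receiver MarshalJSON is used;
		// whichever representation was decoded is written back.
		out, err := json.Marshal(&spec)
		if err != nil {
			t.Fatalf("marshal: %v", err)
		}
		t.Logf("round-tripped granularity: %s", out)
	}

	// A DimensionSet may mix plain string labels and typed dimensions.
	var dims DimensionSet
	if err := json.Unmarshal([]byte(`["ts",{"type":"json","name":"payload"}]`), &dims); err != nil {
		t.Fatalf("unmarshal dimensions: %v", err)
	}
	if _, ok := dims[0].Value.(string); !ok {
		t.Errorf("expected string label, got %T", dims[0].Value)
	}
	if _, ok := dims[1].Value.(Dimension); !ok {
		t.Errorf("expected typed Dimension, got %T", dims[1].Value)
	}
}

Note that both MarshalJSON methods are declared on pointer receivers, so encoding/json only picks them up when the value being marshaled is addressable; marshaling &spec rather than spec, as above, guarantees that.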