Address CR comments

vzayts committed Dec 11, 2023
1 parent b87f9a6 commit fa66989

Showing 4 changed files with 80 additions and 20 deletions.
6 changes: 3 additions & 3 deletions supervisor.go
@@ -93,8 +93,8 @@ func (s *SupervisorService) GetSpec(supervisorId string) (OutputIngestionSpec, e
 // GetStatus calls druid Supervisor service's Get status API.
 // https://druid.apache.org/docs/latest/api-reference/supervisor-api/#get-supervisor-status
 func (s *SupervisorService) GetStatus(supervisorId string) (SupervisorStatus, error) {
-	r, err := s.client.NewRequest("GET", applySupervisorId(supervisorStatusEndpoint, supervisorId), nil)
 	var result SupervisorStatus
+	r, err := s.client.NewRequest("GET", applySupervisorId(supervisorStatusEndpoint, supervisorId), nil)
 	if err != nil {
 		return result, err
 	}
@@ -121,8 +121,8 @@ func (s *SupervisorService) GetAuditHistoryAll() (map[string]SupervisorAuditHist
 // Suspend calls druid Supervisor service's Suspend API.
 // https://druid.apache.org/docs/latest/api-reference/supervisor-api/#suspend-a-running-supervisor
 func (s *SupervisorService) Suspend(supervisorID string) (OutputIngestionSpec, error) {
-	r, err := s.client.NewRequest(http.MethodPost, applySupervisorId(supervisorSuspendEndpoint, supervisorID), nil)
 	var result OutputIngestionSpec
+	r, err := s.client.NewRequest(http.MethodPost, applySupervisorId(supervisorSuspendEndpoint, supervisorID), nil)
 	if err != nil {
 		return result, err
 	}
@@ -142,8 +142,8 @@ func (s *SupervisorService) SuspendAll() (string, error) {
 // Resume calls druid Supervisor service's Resume API.
 // https://druid.apache.org/docs/latest/api-reference/supervisor-api/#resume-a-supervisor
 func (s *SupervisorService) Resume(supervisorID string) (OutputIngestionSpec, error) {
-	r, err := s.client.NewRequest(http.MethodPost, applySupervisorId(supervisorResumeEndpoint, supervisorID), nil)
 	var result OutputIngestionSpec
+	r, err := s.client.NewRequest(http.MethodPost, applySupervisorId(supervisorResumeEndpoint, supervisorID), nil)
 	if err != nil {
 		return result, err
 	}
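All three hunks in supervisor.go make the same reviewer-requested change: the result variable is declared before the request is built, so the early return on a request-construction error hands back a zero value that is already in scope. A minimal, self-contained sketch of the pattern (the type and function below are illustrative stand-ins, not the library's API):

package main

import (
	"errors"
	"fmt"
)

type SupervisorStatus struct{ State string }

// getStatus mirrors the reordering above: declare the zero-valued result
// first, then attempt the fallible call, so every early return has it ready.
func getStatus(fail bool) (SupervisorStatus, error) {
	var result SupervisorStatus
	if fail { // stands in for: r, err := s.client.NewRequest(...); err != nil
		return result, errors.New("building request failed")
	}
	result.State = "RUNNING"
	return result, nil
}

func main() {
	_, err := getStatus(true)
	fmt.Println(err) // prints: building request failed
}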
2 changes: 1 addition & 1 deletion supervisor_test.go
@@ -34,7 +34,7 @@ func TestSupervisorService(t *testing.T) {
 		SetBrokers("kafka:9092"),
 		SetTopic("test-topic"),
 		SetDataSource("test-datasource"),
-		SetDimensions(DimensionSet{Dimension{Name: "dim1"}, Dimension{Name: "dim2"}}))
+		SetDimensions(DimensionSet{{"dim1"}, {"dim2"}}))
 	assert.NoError(t, err, "error should be nil")
 	assert.NotNil(t, spec, "specification should not be nil")
 
70 changes: 61 additions & 9 deletions supervisor_types.go
@@ -1,5 +1,10 @@
 package druid
 
+import (
+	"encoding/json"
+	"fmt"
+)
+
 // InputIngestionSpec is the root-level type defining an ingestion spec used
 // by Apache Druid.
 type InputIngestionSpec struct {
@@ -125,7 +130,12 @@ type SpatialDimension struct {
 type TransformSet []Transform
 
 // DimensionSet is a unique set of druid datasource dimensions(labels).
-type DimensionSet []any
+type DimensionSet []SingleDimension
+
+// SingleDimension is a single dataset dimension that can be represented by a typed Dimension or a string value
+type SingleDimension struct {
+	Value any
+}
 
 // Dimension is a typed definition of a datasource dimension.
 type Dimension struct {
@@ -150,6 +160,11 @@ type DimensionsSpec struct {
 	IncludeAllDimensions bool `json:"includeAllDimensions,omitempty"`
 	UseSchemaDiscovery   bool `json:"useSchemaDiscovery,omitempty"`
 }
 
+type Granularity struct {
+	Value any
+}
+
 type QueryGranularity struct {
 	Type string `json:"type,omitempty"`
 }
@@ -158,12 +173,11 @@ type QueryGranularity struct {
 // partitioning, truncating timestamps, time chunk segmentation or roll-up.
 // https://druid.apache.org/docs/latest/ingestion/ingestion-spec#granularityspec
 type GranularitySpec struct {
-	Type string `json:"type"`
-	SegmentGranularity string `json:"segmentGranularity,omitempty"`
-	// TODO: this field is problematic as depending on value druid returns string or object
-	QueryGranularity any `json:"queryGranularity,omitempty"`
-	Rollup bool `json:"rollup,omitempty"`
-	Intervals []string `json:"intervals,omitempty"`
+	Type               string      `json:"type"`
+	SegmentGranularity string      `json:"segmentGranularity,omitempty"`
+	QueryGranularity   Granularity `json:"queryGranularity,omitempty"`
+	Rollup             bool        `json:"rollup,omitempty"`
+	Intervals          []string    `json:"intervals,omitempty"`
 }
 
 // AutoScalerConfig is part of IOConfig that controls ingestion auto-scaling.
@@ -408,7 +422,7 @@ func defaultKafkaIngestionSpec() *InputIngestionSpec {
 		GranularitySpec: &GranularitySpec{
 			Type:               "uniform",
 			SegmentGranularity: "DAY",
-			QueryGranularity:   "none",
+			QueryGranularity:   Granularity{"none"},
 			Rollup:             false,
 		},
 	},
@@ -543,7 +557,7 @@ func SetTimestampColumn(column string) IngestionSpecOptions {
 }
 
 // SetGranularitySpec sets granularity spec settings that are applied at druid ingestion partitioning stage.
-func SetGranularitySpec(segmentGranularity string, queryGranularity QueryGranularity, rollup bool) IngestionSpecOptions {
+func SetGranularitySpec(segmentGranularity string, queryGranularity Granularity, rollup bool) IngestionSpecOptions {
 	return func(spec *InputIngestionSpec) {
 		spec.DataSchema.GranularitySpec = &GranularitySpec{
 			Type: "uniform",
@@ -571,3 +585,41 @@ func SetSQLInputSource(dbType, connectURI, user, password string, sqls []string)
 		}
 	}
 }
+
+func (g *Granularity) UnmarshalJSON(b []byte) error {
+	var str string
+	if err := json.Unmarshal(b, &str); err == nil {
+		g.Value = str
+		return nil
+	}
+
+	var qg QueryGranularity
+	if err := json.Unmarshal(b, &qg); err == nil {
+		g.Value = qg
+		return nil
+	}
+	return fmt.Errorf("unsupported query granularity: %s", b)
+}
+
+func (g *Granularity) MarshalJSON() ([]byte, error) {
+	return json.Marshal(&g.Value)
+}
+
+func (g *SingleDimension) UnmarshalJSON(b []byte) error {
+	var str string
+	if err := json.Unmarshal(b, &str); err == nil {
+		g.Value = str
+		return nil
+	}
+
+	var qg Dimension
+	if err := json.Unmarshal(b, &qg); err == nil {
+		g.Value = qg
+		return nil
+	}
+	return fmt.Errorf("unsupported dimension value: %s", b)
+}
+
+func (g *SingleDimension) MarshalJSON() ([]byte, error) {
+	return json.Marshal(&g.Value)
+}
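The new Granularity wrapper resolves the TODO removed above: Druid returns queryGranularity either as a bare string ("none") or as an object ({"type": "none"}), so UnmarshalJSON tries the string form first and falls back to the object form, and MarshalJSON writes back whichever value is held. A standalone sketch of that round trip; the two types are copied from this diff, while the main function is only illustrative:

package main

import (
	"encoding/json"
	"fmt"
)

// QueryGranularity and Granularity are copied from the diff above.
type QueryGranularity struct {
	Type string `json:"type,omitempty"`
}

type Granularity struct {
	Value any
}

func (g *Granularity) UnmarshalJSON(b []byte) error {
	var str string
	if err := json.Unmarshal(b, &str); err == nil { // string form: "none"
		g.Value = str
		return nil
	}
	var qg QueryGranularity
	if err := json.Unmarshal(b, &qg); err == nil { // object form: {"type":"none"}
		g.Value = qg
		return nil
	}
	return fmt.Errorf("unsupported query granularity: %s", b)
}

func (g *Granularity) MarshalJSON() ([]byte, error) {
	return json.Marshal(&g.Value)
}

func main() {
	var fromString, fromObject Granularity
	_ = json.Unmarshal([]byte(`"none"`), &fromString)          // Value holds the string "none"
	_ = json.Unmarshal([]byte(`{"type":"none"}`), &fromObject) // Value holds a QueryGranularity
	out, _ := json.Marshal(&fromString)
	fmt.Printf("%T %T %s\n", fromString.Value, fromObject.Value, out)
	// prints: string main.QueryGranularity "none"
}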
22 changes: 15 additions & 7 deletions supervisor_types_test.go
@@ -35,11 +35,15 @@ func TestKafkaIngestionSpec(t *testing.T) {
 		{
 			name: "set labels",
 			options: []IngestionSpecOptions{
-				SetDimensions([]any{"ts", "user_name", "payload"}),
+				SetDimensions(DimensionSet{
+					{"ts"},
+					{"user_name"},
+					{"payload"},
+				}),
 			},
 			expected: func() *InputIngestionSpec {
 				out := defaultKafkaIngestionSpec()
-				out.DataSchema.DimensionsSpec.Dimensions = []any{"ts", "user_name", "payload"}
+				out.DataSchema.DimensionsSpec.Dimensions = DimensionSet{{"ts"}, {"user_name"}, {"payload"}}
 				return out
 			}(),
 		},
@@ -100,7 +104,11 @@ func TestKafkaIngestionSpec_MarshalJSON(t *testing.T) {
 		SetDataSource("test_datasource"),
 		SetTopic("test_topic"),
 		SetBrokers("test_brokers"),
-		SetDimensions([]any{"ts", "user_name", "payload"}),
+		SetDimensions(DimensionSet{
+			{"ts"},
+			{"user_name"},
+			{"payload"},
+		}),
 	)
 	actual, err := json.Marshal(spec)
 	if err != nil {
@@ -167,9 +175,9 @@ func TestIngestionSpecWithTypedDimensions_MarshalJSON(t *testing.T) {
 		SetDataSource("test_datasource"),
 		SetTopic("test_topic"),
 		SetBrokers("test_brokers"),
-		SetDimensions([]any{
-			Dimension{Type: "string", Name: "ts"},
-			Dimension{Type: "json", Name: "payload"},
+		SetDimensions(DimensionSet{
+			{Dimension{Type: "string", Name: "ts"}},
+			{Dimension{Type: "json", Name: "payload"}},
 		}),
 	)
 	actual, err := json.Marshal(spec)
@@ -238,7 +246,7 @@ func TestIngestionSpecWithSqlInputSource_MarshalJSON(t *testing.T) {
 		SetType("index_parallel"),
 		SetIOConfigType("index_parallel"),
 		SetDataSource("test_datasource"),
-		SetDimensions([]any{"ts", "user_name", "payload"}),
+		SetDimensions(DimensionSet{{"ts"}, {"user_name"}, {"payload"}}),
 		SetSQLInputSource("mysql",
 			"jdbc:mysql://host:port/schema",
 			"username",
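The rewritten test literals lean on Go's elided composite-literal element types: with DimensionSet defined as []SingleDimension, {"ts"} is shorthand for SingleDimension{Value: "ts"} and {Dimension{...}} wraps a typed dimension, while the custom MarshalJSON makes the wrapper invisible in the JSON output. A standalone sketch of the mixed form (Dimension is reduced here to the two fields these tests use):

package main

import (
	"encoding/json"
	"fmt"
)

// Reduced copy of the library's Dimension; only the fields used above.
type Dimension struct {
	Type string `json:"type"`
	Name string `json:"name"`
}

// SingleDimension and DimensionSet are copied from the diff above.
type SingleDimension struct {
	Value any
}

func (g *SingleDimension) MarshalJSON() ([]byte, error) {
	return json.Marshal(&g.Value)
}

type DimensionSet []SingleDimension

func main() {
	// The inner braces elide the SingleDimension element type,
	// so plain strings and typed dimensions can be mixed.
	dims := DimensionSet{
		{"ts"},
		{Dimension{Type: "json", Name: "payload"}},
	}
	out, _ := json.Marshal(dims)
	fmt.Println(string(out)) // ["ts",{"type":"json","name":"payload"}]
}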
