Skip to content

Commit

Permalink
address CR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
vzayts committed Nov 30, 2023
1 parent 09d4260 commit 459f99c
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 71 deletions.
12 changes: 6 additions & 6 deletions supervisor_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,14 +302,14 @@ type HttpInputSourceConfig struct {
AllowedProtocols []string `json:" allowedProtocols,omitempty"`
}

// ConnectorConfig is connection configuration for Database
// ConnectorConfig is connection configuration for Database.
type ConnectorConfig struct {
ConnectURI string `json:"connectURI"`
User string `json:"user,omitempty"`
Password string `json:"password,omitempty"`
User string `json:"user"`
Password string `json:"password"`
}

// Database configuration for InputSource "sql"
// Database configuration for InputSource "sql".
type Database struct {
Type string `json:"type"`
ConnectorConfig *ConnectorConfig `json:"connectorConfig"`
Expand Down Expand Up @@ -445,7 +445,7 @@ func SetType(stype string) IngestionSpecOptions {
}
}

// SetType sets the type of the supervisor (IOConfig).
// SetIOConfigType sets the type of the supervisor IOConfig.
func SetIOConfigType(ioctype string) IngestionSpecOptions {
return func(spec *InputIngestionSpec) {
if ioctype != "" {
Expand Down Expand Up @@ -546,7 +546,7 @@ func SetGranularitySpec(segmentGranularity, queryGranularity string, rollup bool
}
}

// SetSqlInputSource configures input source dimensions.
// SetSqlInputSource configures sql input source.
func SetSqlInputSource(dbType, connectURI, user, password string, sqls []string) IngestionSpecOptions {
return func(spec *InputIngestionSpec) {
spec.IOConfig.InputSource = &InputSource{
Expand Down
124 changes: 59 additions & 65 deletions supervisor_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,27 +96,25 @@ var jsonBasic = `{
}`

func TestKafkaIngestionSpec_MarshalJSON(t *testing.T) {
t.Run("jsonBasic", func(t *testing.T) {
spec := NewIngestionSpec(
SetDataSource("test_datasource"),
SetTopic("test_topic"),
SetBrokers("test_brokers"),
SetDimensions([]any{"ts", "user_name", "payload"}),
)
actual, err := json.MarshalIndent(spec, "", " ")
if err != nil {
t.Fatalf("unexpected error while marshalling: %v", err)
}
expected := []byte(jsonBasic)
require.Equal(t, string(expected), string(actual), fmt.Sprintf("expected: %s\nactual: %s", string(expected), string(actual)))
spec := NewIngestionSpec(
SetDataSource("test_datasource"),
SetTopic("test_topic"),
SetBrokers("test_brokers"),
SetDimensions([]any{"ts", "user_name", "payload"}),
)
actual, err := json.Marshal(spec)
if err != nil {
t.Fatalf("unexpected error while marshalling: %v", err)
}
expected := []byte(jsonBasic)
require.JSONEq(t, string(expected), string(actual), fmt.Sprintf("expected: %s\nactual: %s", string(expected), string(actual)))

var checkSpec *InputIngestionSpec
err = json.Unmarshal(actual, &checkSpec)
if err != nil {
t.Fatalf("unexpected error while unmarshalling: %v", err)
}
require.Equal(t, spec, checkSpec)
})
var checkSpec *InputIngestionSpec
err = json.Unmarshal(actual, &checkSpec)
if err != nil {
t.Fatalf("unexpected error while unmarshalling: %v", err)
}
require.Equal(t, spec, checkSpec)
}

var jsonWithTypedDimensions = `{
Expand Down Expand Up @@ -165,23 +163,21 @@ var jsonWithTypedDimensions = `{
}`

func TestIngestionSpecWithTypedDimensions_MarshalJSON(t *testing.T) {
t.Run("jsonWithTypedDimensions", func(t *testing.T) {
spec := NewIngestionSpec(
SetDataSource("test_datasource"),
SetTopic("test_topic"),
SetBrokers("test_brokers"),
SetDimensions([]any{
Dimension{Type: "string", Name: "ts"},
Dimension{Type: "json", Name: "payload"},
}),
)
actual, err := json.MarshalIndent(spec, "", " ")
if err != nil {
t.Fatalf("unexpected error while marshalling: %v", err)
}
expected := []byte(jsonWithTypedDimensions)
require.Equal(t, string(expected), string(actual), fmt.Sprintf("expected: %s\nactual: %s", string(expected), string(actual)))
})
spec := NewIngestionSpec(
SetDataSource("test_datasource"),
SetTopic("test_topic"),
SetBrokers("test_brokers"),
SetDimensions([]any{
Dimension{Type: "string", Name: "ts"},
Dimension{Type: "json", Name: "payload"},
}),
)
actual, err := json.Marshal(spec)
if err != nil {
t.Fatalf("unexpected error while marshalling: %v", err)
}
expected := []byte(jsonWithTypedDimensions)
require.JSONEq(t, string(expected), string(actual), fmt.Sprintf("expected: %s\nactual: %s", string(expected), string(actual)))
}

var jsonWithSqlInputSource = `{
Expand Down Expand Up @@ -238,33 +234,31 @@ var jsonWithSqlInputSource = `{
}`

func TestIngestionSpecWithSqlInputSource_MarshalJSON(t *testing.T) {
t.Run("jsonBasicWithSq", func(t *testing.T) {
spec := NewIngestionSpec(
SetType("index_parallel"),
SetIOConfigType("index_parallel"),
SetDataSource("test_datasource"),
SetDimensions([]any{"ts", "user_name", "payload"}),
SetSqlInputSource("mysql",
"jdbc:mysql://host:port/schema",
"username",
"password",
[]string{
"SELECT * FROM table1 WHERE timestamp BETWEEN '2013-01-01 00:00:00' AND '2013-01-01 11:59:59'",
"SELECT * FROM table2 WHERE timestamp BETWEEN '2013-01-01 00:00:00' AND '2013-01-01 11:59:59'",
}),
)
actual, err := json.MarshalIndent(spec, "", " ")
if err != nil {
t.Fatalf("unexpected error while marshalling: %v", err)
}
expected := []byte(jsonWithSqlInputSource)
require.Equal(t, string(expected), string(actual), fmt.Sprintf("expected: %s\nactual: %s", string(expected), string(actual)))
spec := NewIngestionSpec(
SetType("index_parallel"),
SetIOConfigType("index_parallel"),
SetDataSource("test_datasource"),
SetDimensions([]any{"ts", "user_name", "payload"}),
SetSqlInputSource("mysql",
"jdbc:mysql://host:port/schema",
"username",
"password",
[]string{
"SELECT * FROM table1 WHERE timestamp BETWEEN '2013-01-01 00:00:00' AND '2013-01-01 11:59:59'",
"SELECT * FROM table2 WHERE timestamp BETWEEN '2013-01-01 00:00:00' AND '2013-01-01 11:59:59'",
}),
)
actual, err := json.Marshal(spec)
if err != nil {
t.Fatalf("unexpected error while marshalling: %v", err)
}
expected := []byte(jsonWithSqlInputSource)
require.JSONEq(t, string(expected), string(actual), fmt.Sprintf("expected: %s\nactual: %s", string(expected), string(actual)))

var checkSpec *InputIngestionSpec
err = json.Unmarshal(actual, &checkSpec)
if err != nil {
t.Fatalf("unexpected error while unmarshalling: %v", err)
}
require.Equal(t, spec, checkSpec)
})
var checkSpec *InputIngestionSpec
err = json.Unmarshal(actual, &checkSpec)
if err != nil {
t.Fatalf("unexpected error while unmarshalling: %v", err)
}
require.Equal(t, spec, checkSpec)
}

0 comments on commit 459f99c

Please sign in to comment.