From 459f99c1e80321f7ffa43669b8d3699c5f3c3b22 Mon Sep 17 00:00:00 2001 From: vzaytsev Date: Thu, 30 Nov 2023 08:59:00 -0500 Subject: [PATCH] address CR comments --- supervisor_types.go | 12 ++-- supervisor_types_test.go | 124 +++++++++++++++++++-------------------- 2 files changed, 65 insertions(+), 71 deletions(-) diff --git a/supervisor_types.go b/supervisor_types.go index 7a562ed..b600201 100644 --- a/supervisor_types.go +++ b/supervisor_types.go @@ -302,14 +302,14 @@ type HttpInputSourceConfig struct { AllowedProtocols []string `json:" allowedProtocols,omitempty"` } -// ConnectorConfig is connection configuration for Database +// ConnectorConfig is connection configuration for Database. type ConnectorConfig struct { ConnectURI string `json:"connectURI"` - User string `json:"user,omitempty"` - Password string `json:"password,omitempty"` + User string `json:"user"` + Password string `json:"password"` } -// Database configuration for InputSource "sql" +// Database configuration for InputSource "sql". type Database struct { Type string `json:"type"` ConnectorConfig *ConnectorConfig `json:"connectorConfig"` @@ -445,7 +445,7 @@ func SetType(stype string) IngestionSpecOptions { } } -// SetType sets the type of the supervisor (IOConfig). +// SetIOConfigType sets the type of the supervisor IOConfig. func SetIOConfigType(ioctype string) IngestionSpecOptions { return func(spec *InputIngestionSpec) { if ioctype != "" { @@ -546,7 +546,7 @@ func SetGranularitySpec(segmentGranularity, queryGranularity string, rollup bool } } -// SetSqlInputSource configures input source dimensions. +// SetSqlInputSource configures sql input source. func SetSqlInputSource(dbType, connectURI, user, password string, sqls []string) IngestionSpecOptions { return func(spec *InputIngestionSpec) { spec.IOConfig.InputSource = &InputSource{ diff --git a/supervisor_types_test.go b/supervisor_types_test.go index aa7981f..015a02c 100644 --- a/supervisor_types_test.go +++ b/supervisor_types_test.go @@ -96,27 +96,25 @@ var jsonBasic = `{ }` func TestKafkaIngestionSpec_MarshalJSON(t *testing.T) { - t.Run("jsonBasic", func(t *testing.T) { - spec := NewIngestionSpec( - SetDataSource("test_datasource"), - SetTopic("test_topic"), - SetBrokers("test_brokers"), - SetDimensions([]any{"ts", "user_name", "payload"}), - ) - actual, err := json.MarshalIndent(spec, "", " ") - if err != nil { - t.Fatalf("unexpected error while marshalling: %v", err) - } - expected := []byte(jsonBasic) - require.Equal(t, string(expected), string(actual), fmt.Sprintf("expected: %s\nactual: %s", string(expected), string(actual))) + spec := NewIngestionSpec( + SetDataSource("test_datasource"), + SetTopic("test_topic"), + SetBrokers("test_brokers"), + SetDimensions([]any{"ts", "user_name", "payload"}), + ) + actual, err := json.Marshal(spec) + if err != nil { + t.Fatalf("unexpected error while marshalling: %v", err) + } + expected := []byte(jsonBasic) + require.JSONEq(t, string(expected), string(actual), fmt.Sprintf("expected: %s\nactual: %s", string(expected), string(actual))) - var checkSpec *InputIngestionSpec - err = json.Unmarshal(actual, &checkSpec) - if err != nil { - t.Fatalf("unexpected error while unmarshalling: %v", err) - } - require.Equal(t, spec, checkSpec) - }) + var checkSpec *InputIngestionSpec + err = json.Unmarshal(actual, &checkSpec) + if err != nil { + t.Fatalf("unexpected error while unmarshalling: %v", err) + } + require.Equal(t, spec, checkSpec) } var jsonWithTypedDimensions = `{ @@ -165,23 +163,21 @@ var jsonWithTypedDimensions = `{ }` func TestIngestionSpecWithTypedDimensions_MarshalJSON(t *testing.T) { - t.Run("jsonWithTypedDimensions", func(t *testing.T) { - spec := NewIngestionSpec( - SetDataSource("test_datasource"), - SetTopic("test_topic"), - SetBrokers("test_brokers"), - SetDimensions([]any{ - Dimension{Type: "string", Name: "ts"}, - Dimension{Type: "json", Name: "payload"}, - }), - ) - actual, err := json.MarshalIndent(spec, "", " ") - if err != nil { - t.Fatalf("unexpected error while marshalling: %v", err) - } - expected := []byte(jsonWithTypedDimensions) - require.Equal(t, string(expected), string(actual), fmt.Sprintf("expected: %s\nactual: %s", string(expected), string(actual))) - }) + spec := NewIngestionSpec( + SetDataSource("test_datasource"), + SetTopic("test_topic"), + SetBrokers("test_brokers"), + SetDimensions([]any{ + Dimension{Type: "string", Name: "ts"}, + Dimension{Type: "json", Name: "payload"}, + }), + ) + actual, err := json.Marshal(spec) + if err != nil { + t.Fatalf("unexpected error while marshalling: %v", err) + } + expected := []byte(jsonWithTypedDimensions) + require.JSONEq(t, string(expected), string(actual), fmt.Sprintf("expected: %s\nactual: %s", string(expected), string(actual))) } var jsonWithSqlInputSource = `{ @@ -238,33 +234,31 @@ var jsonWithSqlInputSource = `{ }` func TestIngestionSpecWithSqlInputSource_MarshalJSON(t *testing.T) { - t.Run("jsonBasicWithSq", func(t *testing.T) { - spec := NewIngestionSpec( - SetType("index_parallel"), - SetIOConfigType("index_parallel"), - SetDataSource("test_datasource"), - SetDimensions([]any{"ts", "user_name", "payload"}), - SetSqlInputSource("mysql", - "jdbc:mysql://host:port/schema", - "username", - "password", - []string{ - "SELECT * FROM table1 WHERE timestamp BETWEEN '2013-01-01 00:00:00' AND '2013-01-01 11:59:59'", - "SELECT * FROM table2 WHERE timestamp BETWEEN '2013-01-01 00:00:00' AND '2013-01-01 11:59:59'", - }), - ) - actual, err := json.MarshalIndent(spec, "", " ") - if err != nil { - t.Fatalf("unexpected error while marshalling: %v", err) - } - expected := []byte(jsonWithSqlInputSource) - require.Equal(t, string(expected), string(actual), fmt.Sprintf("expected: %s\nactual: %s", string(expected), string(actual))) + spec := NewIngestionSpec( + SetType("index_parallel"), + SetIOConfigType("index_parallel"), + SetDataSource("test_datasource"), + SetDimensions([]any{"ts", "user_name", "payload"}), + SetSqlInputSource("mysql", + "jdbc:mysql://host:port/schema", + "username", + "password", + []string{ + "SELECT * FROM table1 WHERE timestamp BETWEEN '2013-01-01 00:00:00' AND '2013-01-01 11:59:59'", + "SELECT * FROM table2 WHERE timestamp BETWEEN '2013-01-01 00:00:00' AND '2013-01-01 11:59:59'", + }), + ) + actual, err := json.Marshal(spec) + if err != nil { + t.Fatalf("unexpected error while marshalling: %v", err) + } + expected := []byte(jsonWithSqlInputSource) + require.JSONEq(t, string(expected), string(actual), fmt.Sprintf("expected: %s\nactual: %s", string(expected), string(actual))) - var checkSpec *InputIngestionSpec - err = json.Unmarshal(actual, &checkSpec) - if err != nil { - t.Fatalf("unexpected error while unmarshalling: %v", err) - } - require.Equal(t, spec, checkSpec) - }) + var checkSpec *InputIngestionSpec + err = json.Unmarshal(actual, &checkSpec) + if err != nil { + t.Fatalf("unexpected error while unmarshalling: %v", err) + } + require.Equal(t, spec, checkSpec) }