From 09d42605e86f2cef024b77259dee3da0481aafd5 Mon Sep 17 00:00:00 2001 From: vzaytsev Date: Wed, 29 Nov 2023 22:04:19 -0500 Subject: [PATCH 1/3] add sql input source config and relevant IngestionSpecOptions --- supervisor_types.go | 44 +++++++++++++++++++++ supervisor_types_test.go | 85 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 129 insertions(+) diff --git a/supervisor_types.go b/supervisor_types.go index 576b571..7a562ed 100644 --- a/supervisor_types.go +++ b/supervisor_types.go @@ -302,6 +302,19 @@ type HttpInputSourceConfig struct { AllowedProtocols []string `json:" allowedProtocols,omitempty"` } +// ConnectorConfig is connection configuration for Database +type ConnectorConfig struct { + ConnectURI string `json:"connectURI"` + User string `json:"user,omitempty"` + Password string `json:"password,omitempty"` +} + +// Database configuration for InputSource "sql" +type Database struct { + Type string `json:"type"` + ConnectorConfig *ConnectorConfig `json:"connectorConfig"` +} + // InputSource is a specification of the storage system where input data is stored. type InputSource struct { Type string `json:"type"` @@ -322,6 +335,10 @@ type InputSource struct { // CombiningInputSource fields Delegates []InputSource `json:"delegates,omitempty"` + + // SqlInputSource + SQLs []string `json:"sqls,omitempty"` + Database *Database `json:"database,omitempty"` } // TransformSpec is responsible for transforming druid input data @@ -428,6 +445,15 @@ func SetType(stype string) IngestionSpecOptions { } } +// SetType sets the type of the supervisor (IOConfig). +func SetIOConfigType(ioctype string) IngestionSpecOptions { + return func(spec *InputIngestionSpec) { + if ioctype != "" { + spec.IOConfig.Type = ioctype + } + } +} + // SetTopic sets the Kafka topic to consume data from. func SetTopic(topic string) IngestionSpecOptions { return func(spec *InputIngestionSpec) { @@ -519,3 +545,21 @@ func SetGranularitySpec(segmentGranularity, queryGranularity string, rollup bool } } } + +// SetSqlInputSource configures input source dimensions. +func SetSqlInputSource(dbType, connectURI, user, password string, sqls []string) IngestionSpecOptions { + return func(spec *InputIngestionSpec) { + spec.IOConfig.InputSource = &InputSource{ + Type: "sql", + SQLs: sqls, + Database: &Database{ + Type: dbType, + ConnectorConfig: &ConnectorConfig{ + ConnectURI: connectURI, + User: user, + Password: password, + }, + }, + } + } +} diff --git a/supervisor_types_test.go b/supervisor_types_test.go index 360d837..aa7981f 100644 --- a/supervisor_types_test.go +++ b/supervisor_types_test.go @@ -183,3 +183,88 @@ func TestIngestionSpecWithTypedDimensions_MarshalJSON(t *testing.T) { require.Equal(t, string(expected), string(actual), fmt.Sprintf("expected: %s\nactual: %s", string(expected), string(actual))) }) } + +var jsonWithSqlInputSource = `{ + "type": "index_parallel", + "dataSchema": { + "dataSource": "test_datasource", + "timestampSpec": { + "column": "ts", + "format": "auto" + }, + "transformSpec": { + "transforms": [] + }, + "dimensionsSpec": { + "dimensions": [ + "ts", + "user_name", + "payload" + ] + }, + "granularitySpec": { + "type": "uniform", + "segmentGranularity": "DAY", + "queryGranularity": "none" + } + }, + "ioConfig": { + "type": "index_parallel", + "inputSource": { + "type": "sql", + "sqls": [ + "SELECT * FROM table1 WHERE timestamp BETWEEN '2013-01-01 00:00:00' AND '2013-01-01 11:59:59'", + "SELECT * FROM table2 WHERE timestamp BETWEEN '2013-01-01 00:00:00' AND '2013-01-01 11:59:59'" + ], + "database": { + "type": "mysql", + "connectorConfig": { + "connectURI": "jdbc:mysql://host:port/schema", + "user": "username", + "password": "password" + } + } + }, + "consumerProperties": {}, + "taskDuration": "PT1H", + "useEarliestOffset": false, + "flattenSpec": { + "fields": [] + }, + "inputFormat": { + "type": "json" + } + } +}` + +func TestIngestionSpecWithSqlInputSource_MarshalJSON(t *testing.T) { + t.Run("jsonBasicWithSq", func(t *testing.T) { + spec := NewIngestionSpec( + SetType("index_parallel"), + SetIOConfigType("index_parallel"), + SetDataSource("test_datasource"), + SetDimensions([]any{"ts", "user_name", "payload"}), + SetSqlInputSource("mysql", + "jdbc:mysql://host:port/schema", + "username", + "password", + []string{ + "SELECT * FROM table1 WHERE timestamp BETWEEN '2013-01-01 00:00:00' AND '2013-01-01 11:59:59'", + "SELECT * FROM table2 WHERE timestamp BETWEEN '2013-01-01 00:00:00' AND '2013-01-01 11:59:59'", + }), + ) + actual, err := json.MarshalIndent(spec, "", " ") + if err != nil { + t.Fatalf("unexpected error while marshalling: %v", err) + } + expected := []byte(jsonWithSqlInputSource) + require.Equal(t, string(expected), string(actual), fmt.Sprintf("expected: %s\nactual: %s", string(expected), string(actual))) + + var checkSpec *InputIngestionSpec + err = json.Unmarshal(actual, &checkSpec) + if err != nil { + t.Fatalf("unexpected error while unmarshalling: %v", err) + } + require.Equal(t, spec, checkSpec) + }) +} From 459f99c1e80321f7ffa43669b8d3699c5f3c3b22 Mon Sep 17 00:00:00 2001 From: vzaytsev Date: Thu, 30 Nov 2023 08:59:00 -0500 Subject: [PATCH 2/3] address CR comments --- supervisor_types.go | 12 ++-- supervisor_types_test.go | 124 +++++++++++++++++++-------------------- 2 files changed, 65 insertions(+), 71 deletions(-) diff --git a/supervisor_types.go b/supervisor_types.go index 7a562ed..b600201 100644 --- a/supervisor_types.go +++ b/supervisor_types.go @@ -302,14 +302,14 @@ type HttpInputSourceConfig struct { AllowedProtocols []string `json:" allowedProtocols,omitempty"` } -// ConnectorConfig is connection configuration for Database +// ConnectorConfig is connection configuration for Database. type ConnectorConfig struct { ConnectURI string `json:"connectURI"` - User string `json:"user,omitempty"` - Password string `json:"password,omitempty"` + User string `json:"user"` + Password string `json:"password"` } -// Database configuration for InputSource "sql" +// Database configuration for InputSource "sql". type Database struct { Type string `json:"type"` ConnectorConfig *ConnectorConfig `json:"connectorConfig"` @@ -445,7 +445,7 @@ func SetType(stype string) IngestionSpecOptions { } } -// SetType sets the type of the supervisor (IOConfig). +// SetIOConfigType sets the type of the supervisor IOConfig. func SetIOConfigType(ioctype string) IngestionSpecOptions { return func(spec *InputIngestionSpec) { if ioctype != "" { @@ -546,7 +546,7 @@ func SetGranularitySpec(segmentGranularity, queryGranularity string, rollup bool } } -// SetSqlInputSource configures input source dimensions. +// SetSqlInputSource configures sql input source. func SetSqlInputSource(dbType, connectURI, user, password string, sqls []string) IngestionSpecOptions { return func(spec *InputIngestionSpec) { spec.IOConfig.InputSource = &InputSource{ diff --git a/supervisor_types_test.go b/supervisor_types_test.go index aa7981f..015a02c 100644 --- a/supervisor_types_test.go +++ b/supervisor_types_test.go @@ -96,27 +96,25 @@ var jsonBasic = `{ }` func TestKafkaIngestionSpec_MarshalJSON(t *testing.T) { - t.Run("jsonBasic", func(t *testing.T) { - spec := NewIngestionSpec( - SetDataSource("test_datasource"), - SetTopic("test_topic"), - SetBrokers("test_brokers"), - SetDimensions([]any{"ts", "user_name", "payload"}), - ) - actual, err := json.MarshalIndent(spec, "", " ") - if err != nil { - t.Fatalf("unexpected error while marshalling: %v", err) - } - expected := []byte(jsonBasic) - require.Equal(t, string(expected), string(actual), fmt.Sprintf("expected: %s\nactual: %s", string(expected), string(actual))) + spec := NewIngestionSpec( + SetDataSource("test_datasource"), + SetTopic("test_topic"), + SetBrokers("test_brokers"), + SetDimensions([]any{"ts", "user_name", "payload"}), + ) + actual, err := json.Marshal(spec) + if err != nil { + t.Fatalf("unexpected error while marshalling: %v", err) + } + expected := []byte(jsonBasic) + require.JSONEq(t, string(expected), string(actual), fmt.Sprintf("expected: %s\nactual: %s", string(expected), string(actual))) - var checkSpec *InputIngestionSpec - err = json.Unmarshal(actual, &checkSpec) - if err != nil { - t.Fatalf("unexpected error while unmarshalling: %v", err) - } - require.Equal(t, spec, checkSpec) - }) + var checkSpec *InputIngestionSpec + err = json.Unmarshal(actual, &checkSpec) + if err != nil { + t.Fatalf("unexpected error while unmarshalling: %v", err) + } + require.Equal(t, spec, checkSpec) } var jsonWithTypedDimensions = `{ @@ -165,23 +163,21 @@ var jsonWithTypedDimensions = `{ }` func TestIngestionSpecWithTypedDimensions_MarshalJSON(t *testing.T) { - t.Run("jsonWithTypedDimensions", func(t *testing.T) { - spec := NewIngestionSpec( - SetDataSource("test_datasource"), - SetTopic("test_topic"), - SetBrokers("test_brokers"), - SetDimensions([]any{ - Dimension{Type: "string", Name: "ts"}, - Dimension{Type: "json", Name: "payload"}, - }), - ) - actual, err := json.MarshalIndent(spec, "", " ") - if err != nil { - t.Fatalf("unexpected error while marshalling: %v", err) - } - expected := []byte(jsonWithTypedDimensions) - require.Equal(t, string(expected), string(actual), fmt.Sprintf("expected: %s\nactual: %s", string(expected), string(actual))) - }) + spec := NewIngestionSpec( + SetDataSource("test_datasource"), + SetTopic("test_topic"), + SetBrokers("test_brokers"), + SetDimensions([]any{ + Dimension{Type: "string", Name: "ts"}, + Dimension{Type: "json", Name: "payload"}, + }), + ) + actual, err := json.Marshal(spec) + if err != nil { + t.Fatalf("unexpected error while marshalling: %v", err) + } + expected := []byte(jsonWithTypedDimensions) + require.JSONEq(t, string(expected), string(actual), fmt.Sprintf("expected: %s\nactual: %s", string(expected), string(actual))) } var jsonWithSqlInputSource = `{ @@ -238,33 +234,31 @@ var jsonWithSqlInputSource = `{ }` func TestIngestionSpecWithSqlInputSource_MarshalJSON(t *testing.T) { - t.Run("jsonBasicWithSq", func(t *testing.T) { - spec := NewIngestionSpec( - SetType("index_parallel"), - SetIOConfigType("index_parallel"), - SetDataSource("test_datasource"), - SetDimensions([]any{"ts", "user_name", "payload"}), - SetSqlInputSource("mysql", - "jdbc:mysql://host:port/schema", - "username", - "password", - []string{ - "SELECT * FROM table1 WHERE timestamp BETWEEN '2013-01-01 00:00:00' AND '2013-01-01 11:59:59'", - "SELECT * FROM table2 WHERE timestamp BETWEEN '2013-01-01 00:00:00' AND '2013-01-01 11:59:59'", - }), - ) - actual, err := json.MarshalIndent(spec, "", " ") - if err != nil { - t.Fatalf("unexpected error while marshalling: %v", err) - } - expected := []byte(jsonWithSqlInputSource) - require.Equal(t, string(expected), string(actual), fmt.Sprintf("expected: %s\nactual: %s", string(expected), string(actual))) + spec := NewIngestionSpec( + SetType("index_parallel"), + SetIOConfigType("index_parallel"), + SetDataSource("test_datasource"), + SetDimensions([]any{"ts", "user_name", "payload"}), + SetSqlInputSource("mysql", + "jdbc:mysql://host:port/schema", + "username", + "password", + []string{ + "SELECT * FROM table1 WHERE timestamp BETWEEN '2013-01-01 00:00:00' AND '2013-01-01 11:59:59'", + "SELECT * FROM table2 WHERE timestamp BETWEEN '2013-01-01 00:00:00' AND '2013-01-01 11:59:59'", + }), + ) + actual, err := json.Marshal(spec) + if err != nil { + t.Fatalf("unexpected error while marshalling: %v", err) + } + expected := []byte(jsonWithSqlInputSource) + require.JSONEq(t, string(expected), string(actual), fmt.Sprintf("expected: %s\nactual: %s", string(expected), string(actual))) - var checkSpec *InputIngestionSpec - err = json.Unmarshal(actual, &checkSpec) - if err != nil { - t.Fatalf("unexpected error while unmarshalling: %v", err) - } - require.Equal(t, spec, checkSpec) - }) + var checkSpec *InputIngestionSpec + err = json.Unmarshal(actual, &checkSpec) + if err != nil { + t.Fatalf("unexpected error while unmarshalling: %v", err) + } + require.Equal(t, spec, checkSpec) } From 8f48a3dae048ebb59fb9172999334a2384a91375 Mon Sep 17 00:00:00 2001 From: vzaytsev Date: Thu, 30 Nov 2023 09:46:46 -0500 Subject: [PATCH 3/3] address CR comment --- supervisor_types.go | 4 ++-- supervisor_types_test.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/supervisor_types.go b/supervisor_types.go index b600201..d6fb624 100644 --- a/supervisor_types.go +++ b/supervisor_types.go @@ -546,8 +546,8 @@ func SetGranularitySpec(segmentGranularity, queryGranularity string, rollup bool } } -// SetSqlInputSource configures sql input source. -func SetSqlInputSource(dbType, connectURI, user, password string, sqls []string) IngestionSpecOptions { +// SetSQLInputSource configures sql input source. +func SetSQLInputSource(dbType, connectURI, user, password string, sqls []string) IngestionSpecOptions { return func(spec *InputIngestionSpec) { spec.IOConfig.InputSource = &InputSource{ Type: "sql", diff --git a/supervisor_types_test.go b/supervisor_types_test.go index 015a02c..80a9716 100644 --- a/supervisor_types_test.go +++ b/supervisor_types_test.go @@ -239,7 +239,7 @@ func TestIngestionSpecWithSqlInputSource_MarshalJSON(t *testing.T) { SetIOConfigType("index_parallel"), SetDataSource("test_datasource"), SetDimensions([]any{"ts", "user_name", "payload"}), - SetSqlInputSource("mysql", + SetSQLInputSource("mysql", "jdbc:mysql://host:port/schema", "username", "password",