From c00829bfc08c37042915f66334d09b8ca61785d5 Mon Sep 17 00:00:00 2001
From: Victor Zaytsev <94850767+vzaytsev1981@users.noreply.github.com>
Date: Thu, 30 Nov 2023 10:48:47 -0500
Subject: [PATCH] add sql input source config and relevant IngestionSpecOptions (#10)

---
 supervisor_types.go      |  44 ++++++++++++
 supervisor_types_test.go | 151 +++++++++++++++++++++++++++++----------
 2 files changed, 159 insertions(+), 36 deletions(-)

diff --git a/supervisor_types.go b/supervisor_types.go
index 576b571..d6fb624 100644
--- a/supervisor_types.go
+++ b/supervisor_types.go
@@ -302,6 +302,19 @@ type HttpInputSourceConfig struct {
 	AllowedProtocols []string `json:" allowedProtocols,omitempty"`
 }
 
+// ConnectorConfig is the connection configuration for a Database.
+type ConnectorConfig struct {
+	ConnectURI string `json:"connectURI"`
+	User       string `json:"user"`
+	Password   string `json:"password"`
+}
+
+// Database is the database configuration for the "sql" InputSource.
+type Database struct {
+	Type            string           `json:"type"`
+	ConnectorConfig *ConnectorConfig `json:"connectorConfig"`
+}
+
 // InputSource is a specification of the storage system where input data is stored.
 type InputSource struct {
 	Type string `json:"type"`
@@ -322,6 +335,10 @@ type InputSource struct {
 
 	// CombiningInputSource fields
 	Delegates []InputSource `json:"delegates,omitempty"`
+
+	// SqlInputSource fields
+	SQLs     []string  `json:"sqls,omitempty"`
+	Database *Database `json:"database,omitempty"`
 }
 
 // TransformSpec is responsible for transforming druid input data
@@ -428,6 +445,15 @@ func SetType(stype string) IngestionSpecOptions {
 	}
 }
 
+// SetIOConfigType sets the type of the supervisor IOConfig.
+func SetIOConfigType(ioctype string) IngestionSpecOptions {
+	return func(spec *InputIngestionSpec) {
+		if ioctype != "" {
+			spec.IOConfig.Type = ioctype
+		}
+	}
+}
+
 // SetTopic sets the Kafka topic to consume data from.
 func SetTopic(topic string) IngestionSpecOptions {
 	return func(spec *InputIngestionSpec) {
@@ -519,3 +545,21 @@ func SetGranularitySpec(segmentGranularity, queryGranularity string, rollup bool
 		}
 	}
 }
+
+// SetSQLInputSource configures an SQL input source.
+func SetSQLInputSource(dbType, connectURI, user, password string, sqls []string) IngestionSpecOptions {
+	return func(spec *InputIngestionSpec) {
+		spec.IOConfig.InputSource = &InputSource{
+			Type: "sql",
+			SQLs: sqls,
+			Database: &Database{
+				Type: dbType,
+				ConnectorConfig: &ConnectorConfig{
+					ConnectURI: connectURI,
+					User:       user,
+					Password:   password,
+				},
+			},
+		}
+	}
+}
diff --git a/supervisor_types_test.go b/supervisor_types_test.go
index 360d837..80a9716 100644
--- a/supervisor_types_test.go
+++ b/supervisor_types_test.go
@@ -96,27 +96,25 @@ var jsonBasic = `{
 }`
 
 func TestKafkaIngestionSpec_MarshalJSON(t *testing.T) {
-	t.Run("jsonBasic", func(t *testing.T) {
-		spec := NewIngestionSpec(
-			SetDataSource("test_datasource"),
-			SetTopic("test_topic"),
-			SetBrokers("test_brokers"),
-			SetDimensions([]any{"ts", "user_name", "payload"}),
-		)
-		actual, err := json.MarshalIndent(spec, "", " ")
-		if err != nil {
-			t.Fatalf("unexpected error while marshalling: %v", err)
-		}
-		expected := []byte(jsonBasic)
-		require.Equal(t, string(expected), string(actual), fmt.Sprintf("expected: %s\nactual: %s", string(expected), string(actual)))
+	spec := NewIngestionSpec(
+		SetDataSource("test_datasource"),
+		SetTopic("test_topic"),
+		SetBrokers("test_brokers"),
+		SetDimensions([]any{"ts", "user_name", "payload"}),
+	)
+	actual, err := json.Marshal(spec)
+	if err != nil {
+		t.Fatalf("unexpected error while marshalling: %v", err)
+	}
+	expected := []byte(jsonBasic)
+	require.JSONEq(t, string(expected), string(actual), fmt.Sprintf("expected: %s\nactual: %s", string(expected), string(actual)))
 
-		var checkSpec *InputIngestionSpec
-		err = json.Unmarshal(actual, &checkSpec)
-		if err != nil {
-			t.Fatalf("unexpected error while unmarshalling: %v", err)
-		}
-		require.Equal(t, spec, checkSpec)
-	})
+	var checkSpec *InputIngestionSpec
+	err = json.Unmarshal(actual, &checkSpec)
+	if err != nil {
+		t.Fatalf("unexpected error while unmarshalling: %v", err)
+	}
+	require.Equal(t, spec, checkSpec)
 }
 
 var jsonWithTypedDimensions = `{
@@ -165,21 +163,102 @@ var jsonWithTypedDimensions = `{
 }`
 
 func TestIngestionSpecWithTypedDimensions_MarshalJSON(t *testing.T) {
-	t.Run("jsonWithTypedDimensions", func(t *testing.T) {
-		spec := NewIngestionSpec(
-			SetDataSource("test_datasource"),
-			SetTopic("test_topic"),
-			SetBrokers("test_brokers"),
-			SetDimensions([]any{
-				Dimension{Type: "string", Name: "ts"},
-				Dimension{Type: "json", Name: "payload"},
+	spec := NewIngestionSpec(
+		SetDataSource("test_datasource"),
+		SetTopic("test_topic"),
+		SetBrokers("test_brokers"),
+		SetDimensions([]any{
+			Dimension{Type: "string", Name: "ts"},
+			Dimension{Type: "json", Name: "payload"},
+		}),
+	)
+	actual, err := json.Marshal(spec)
+	if err != nil {
+		t.Fatalf("unexpected error while marshalling: %v", err)
+	}
+	expected := []byte(jsonWithTypedDimensions)
+	require.JSONEq(t, string(expected), string(actual), fmt.Sprintf("expected: %s\nactual: %s", string(expected), string(actual)))
+}
+
+var jsonWithSqlInputSource = `{
+	"type": "index_parallel",
+	"dataSchema": {
+		"dataSource": "test_datasource",
+		"timestampSpec": {
+			"column": "ts",
+			"format": "auto"
+		},
+		"transformSpec": {
+			"transforms": []
+		},
+		"dimensionsSpec": {
+			"dimensions": [
+				"ts",
+				"user_name",
+				"payload"
+			]
+		},
+		"granularitySpec": {
+			"type": "uniform",
+			"segmentGranularity": "DAY",
+			"queryGranularity": "none"
+		}
+	},
+	"ioConfig": {
+		"type": "index_parallel",
+		"inputSource": {
+			"type": "sql",
+			"sqls": [
+				"SELECT * FROM table1 WHERE timestamp BETWEEN '2013-01-01 00:00:00' AND '2013-01-01 11:59:59'",
+				"SELECT * FROM table2 WHERE timestamp BETWEEN '2013-01-01 00:00:00' AND '2013-01-01 11:59:59'"
+			],
+			"database": {
+				"type": "mysql",
+				"connectorConfig": {
+					"connectURI": "jdbc:mysql://host:port/schema",
+					"user": "username",
+					"password": "password"
+				}
+			}
+		},
+		"consumerProperties": {},
+		"taskDuration": "PT1H",
+		"useEarliestOffset": false,
+		"flattenSpec": {
+			"fields": []
+		},
+		"inputFormat": {
+			"type": "json"
+		}
+	}
+}`
+
+func TestIngestionSpecWithSqlInputSource_MarshalJSON(t *testing.T) {
+	spec := NewIngestionSpec(
+		SetType("index_parallel"),
+		SetIOConfigType("index_parallel"),
+		SetDataSource("test_datasource"),
+		SetDimensions([]any{"ts", "user_name", "payload"}),
+		SetSQLInputSource("mysql",
+			"jdbc:mysql://host:port/schema",
+			"username",
+			"password",
+			[]string{
+				"SELECT * FROM table1 WHERE timestamp BETWEEN '2013-01-01 00:00:00' AND '2013-01-01 11:59:59'",
+				"SELECT * FROM table2 WHERE timestamp BETWEEN '2013-01-01 00:00:00' AND '2013-01-01 11:59:59'",
 			}),
-		)
-		actual, err := json.MarshalIndent(spec, "", " ")
-		if err != nil {
-			t.Fatalf("unexpected error while marshalling: %v", err)
-		}
-		expected := []byte(jsonWithTypedDimensions)
-		require.Equal(t, string(expected), string(actual), fmt.Sprintf("expected: %s\nactual: %s", string(expected), string(actual)))
-	})
+	)
+	actual, err := json.Marshal(spec)
+	if err != nil {
+		t.Fatalf("unexpected error while marshalling: %v", err)
+	}
+	expected := []byte(jsonWithSqlInputSource)
+	require.JSONEq(t, string(expected), string(actual), fmt.Sprintf("expected: %s\nactual: %s", string(expected), string(actual)))
+
+	var checkSpec *InputIngestionSpec
+	err = json.Unmarshal(actual, &checkSpec)
+	if err != nil {
+		t.Fatalf("unexpected error while unmarshalling: %v", err)
+	}
+	require.Equal(t, spec, checkSpec)
 }
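
Reviewer note: below is a minimal sketch of how a caller would combine the options added in this patch, mirroring TestIngestionSpecWithSqlInputSource_MarshalJSON. The import path and the `druid` package alias are placeholders (use this repository's real module path), and the connection values are dummies; only NewIngestionSpec, SetType, SetIOConfigType, SetDataSource, SetDimensions and SetSQLInputSource come from the library itself.

package main

import (
	"encoding/json"
	"fmt"

	druid "example.com/go-druid" // placeholder import path for this module
)

func main() {
	// Build an index_parallel ingestion spec that reads from MySQL through the
	// new "sql" input source added in this patch.
	spec := druid.NewIngestionSpec(
		druid.SetType("index_parallel"),
		druid.SetIOConfigType("index_parallel"),
		druid.SetDataSource("example_datasource"),
		druid.SetDimensions([]any{"ts", "user_name", "payload"}),
		druid.SetSQLInputSource("mysql",
			"jdbc:mysql://host:port/schema", // dummy connect URI
			"username",                      // dummy credentials
			"password",
			[]string{"SELECT * FROM table1 WHERE timestamp BETWEEN '2013-01-01 00:00:00' AND '2013-01-01 11:59:59'"},
		),
	)

	// Marshal to the JSON task payload that would be submitted to Druid.
	payload, err := json.Marshal(spec)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(payload))
}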