Skip to content

Commit

Permalink
add sql input source config and relevant IngestionSpecOptions
Browse files Browse the repository at this point in the history
  • Loading branch information
vzayts committed Nov 30, 2023
1 parent b0fa8ee commit 09d4260
Show file tree
Hide file tree
Showing 2 changed files with 129 additions and 0 deletions.
44 changes: 44 additions & 0 deletions supervisor_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,19 @@ type HttpInputSourceConfig struct {
AllowedProtocols []string `json:" allowedProtocols,omitempty"`
}

// ConnectorConfig is connection configuration for Database
type ConnectorConfig struct {
ConnectURI string `json:"connectURI"`
User string `json:"user,omitempty"`
Password string `json:"password,omitempty"`
}

// Database configuration for InputSource "sql"
type Database struct {
Type string `json:"type"`
ConnectorConfig *ConnectorConfig `json:"connectorConfig"`
}

// InputSource is a specification of the storage system where input data is stored.
type InputSource struct {
Type string `json:"type"`
Expand All @@ -322,6 +335,10 @@ type InputSource struct {

// CombiningInputSource fields
Delegates []InputSource `json:"delegates,omitempty"`

// SqlInputSource
SQLs []string `json:"sqls,omitempty"`
Database *Database `json:"database,omitempty"`
}

// TransformSpec is responsible for transforming druid input data
Expand Down Expand Up @@ -428,6 +445,15 @@ func SetType(stype string) IngestionSpecOptions {
}
}

// SetType sets the type of the supervisor (IOConfig).
func SetIOConfigType(ioctype string) IngestionSpecOptions {
return func(spec *InputIngestionSpec) {
if ioctype != "" {
spec.IOConfig.Type = ioctype
}
}
}

// SetTopic sets the Kafka topic to consume data from.
func SetTopic(topic string) IngestionSpecOptions {
return func(spec *InputIngestionSpec) {
Expand Down Expand Up @@ -519,3 +545,21 @@ func SetGranularitySpec(segmentGranularity, queryGranularity string, rollup bool
}
}
}

// SetSqlInputSource configures input source dimensions.
func SetSqlInputSource(dbType, connectURI, user, password string, sqls []string) IngestionSpecOptions {
return func(spec *InputIngestionSpec) {
spec.IOConfig.InputSource = &InputSource{
Type: "sql",
SQLs: sqls,
Database: &Database{
Type: dbType,
ConnectorConfig: &ConnectorConfig{
ConnectURI: connectURI,
User: user,
Password: password,
},
},
}
}
}
85 changes: 85 additions & 0 deletions supervisor_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,3 +183,88 @@ func TestIngestionSpecWithTypedDimensions_MarshalJSON(t *testing.T) {
require.Equal(t, string(expected), string(actual), fmt.Sprintf("expected: %s\nactual: %s", string(expected), string(actual)))
})
}

var jsonWithSqlInputSource = `{
"type": "index_parallel",
"dataSchema": {
"dataSource": "test_datasource",
"timestampSpec": {
"column": "ts",
"format": "auto"
},
"transformSpec": {
"transforms": []
},
"dimensionsSpec": {
"dimensions": [
"ts",
"user_name",
"payload"
]
},
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "DAY",
"queryGranularity": "none"
}
},
"ioConfig": {
"type": "index_parallel",
"inputSource": {
"type": "sql",
"sqls": [
"SELECT * FROM table1 WHERE timestamp BETWEEN '2013-01-01 00:00:00' AND '2013-01-01 11:59:59'",
"SELECT * FROM table2 WHERE timestamp BETWEEN '2013-01-01 00:00:00' AND '2013-01-01 11:59:59'"
],
"database": {
"type": "mysql",
"connectorConfig": {
"connectURI": "jdbc:mysql://host:port/schema",
"user": "username",
"password": "password"
}
}
},
"consumerProperties": {},
"taskDuration": "PT1H",
"useEarliestOffset": false,
"flattenSpec": {
"fields": []
},
"inputFormat": {
"type": "json"
}
}
}`

func TestIngestionSpecWithSqlInputSource_MarshalJSON(t *testing.T) {
t.Run("jsonBasicWithSq", func(t *testing.T) {
spec := NewIngestionSpec(
SetType("index_parallel"),
SetIOConfigType("index_parallel"),
SetDataSource("test_datasource"),
SetDimensions([]any{"ts", "user_name", "payload"}),
SetSqlInputSource("mysql",
"jdbc:mysql://host:port/schema",
"username",
"password",
[]string{
"SELECT * FROM table1 WHERE timestamp BETWEEN '2013-01-01 00:00:00' AND '2013-01-01 11:59:59'",
"SELECT * FROM table2 WHERE timestamp BETWEEN '2013-01-01 00:00:00' AND '2013-01-01 11:59:59'",
}),
)
actual, err := json.MarshalIndent(spec, "", " ")
if err != nil {
t.Fatalf("unexpected error while marshalling: %v", err)
}
expected := []byte(jsonWithSqlInputSource)
require.Equal(t, string(expected), string(actual), fmt.Sprintf("expected: %s\nactual: %s", string(expected), string(actual)))

var checkSpec *InputIngestionSpec
err = json.Unmarshal(actual, &checkSpec)
if err != nil {
t.Fatalf("unexpected error while unmarshalling: %v", err)
}
require.Equal(t, spec, checkSpec)
})
}

0 comments on commit 09d4260

Please sign in to comment.