Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add sql input source config and relevant IngestionSpecOptions #10

Merged
merged 3 commits into from
Nov 30, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions supervisor_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,19 @@ type HttpInputSourceConfig struct {
AllowedProtocols []string `json:" allowedProtocols,omitempty"`
}

// ConnectorConfig is connection configuration for Database.
type ConnectorConfig struct {
ConnectURI string `json:"connectURI"`
User string `json:"user"`
Password string `json:"password"`
}

// Database configuration for InputSource "sql".
type Database struct {
Type string `json:"type"`
ConnectorConfig *ConnectorConfig `json:"connectorConfig"`
}

// InputSource is a specification of the storage system where input data is stored.
type InputSource struct {
Type string `json:"type"`
Expand All @@ -322,6 +335,10 @@ type InputSource struct {

// CombiningInputSource fields
Delegates []InputSource `json:"delegates,omitempty"`

// SqlInputSource
SQLs []string `json:"sqls,omitempty"`
Database *Database `json:"database,omitempty"`
}

// TransformSpec is responsible for transforming druid input data
Expand Down Expand Up @@ -428,6 +445,15 @@ func SetType(stype string) IngestionSpecOptions {
}
}

// SetIOConfigType sets the type of the supervisor IOConfig.
func SetIOConfigType(ioctype string) IngestionSpecOptions {
return func(spec *InputIngestionSpec) {
if ioctype != "" {
spec.IOConfig.Type = ioctype
}
}
}

// SetTopic sets the Kafka topic to consume data from.
func SetTopic(topic string) IngestionSpecOptions {
return func(spec *InputIngestionSpec) {
Expand Down Expand Up @@ -519,3 +545,21 @@ func SetGranularitySpec(segmentGranularity, queryGranularity string, rollup bool
}
}
}

// SetSqlInputSource configures sql input source.
func SetSqlInputSource(dbType, connectURI, user, password string, sqls []string) IngestionSpecOptions {
return func(spec *InputIngestionSpec) {
spec.IOConfig.InputSource = &InputSource{
Type: "sql",
SQLs: sqls,
Database: &Database{
Type: dbType,
ConnectorConfig: &ConnectorConfig{
ConnectURI: connectURI,
User: user,
Password: password,
},
},
}
}
}
151 changes: 115 additions & 36 deletions supervisor_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,27 +96,25 @@ var jsonBasic = `{
}`

func TestKafkaIngestionSpec_MarshalJSON(t *testing.T) {
t.Run("jsonBasic", func(t *testing.T) {
spec := NewIngestionSpec(
SetDataSource("test_datasource"),
SetTopic("test_topic"),
SetBrokers("test_brokers"),
SetDimensions([]any{"ts", "user_name", "payload"}),
)
actual, err := json.MarshalIndent(spec, "", " ")
if err != nil {
t.Fatalf("unexpected error while marshalling: %v", err)
}
expected := []byte(jsonBasic)
require.Equal(t, string(expected), string(actual), fmt.Sprintf("expected: %s\nactual: %s", string(expected), string(actual)))
spec := NewIngestionSpec(
SetDataSource("test_datasource"),
SetTopic("test_topic"),
SetBrokers("test_brokers"),
SetDimensions([]any{"ts", "user_name", "payload"}),
)
actual, err := json.Marshal(spec)
if err != nil {
t.Fatalf("unexpected error while marshalling: %v", err)
}
expected := []byte(jsonBasic)
require.JSONEq(t, string(expected), string(actual), fmt.Sprintf("expected: %s\nactual: %s", string(expected), string(actual)))

var checkSpec *InputIngestionSpec
err = json.Unmarshal(actual, &checkSpec)
if err != nil {
t.Fatalf("unexpected error while unmarshalling: %v", err)
}
require.Equal(t, spec, checkSpec)
})
var checkSpec *InputIngestionSpec
err = json.Unmarshal(actual, &checkSpec)
if err != nil {
t.Fatalf("unexpected error while unmarshalling: %v", err)
}
require.Equal(t, spec, checkSpec)
}

var jsonWithTypedDimensions = `{
Expand Down Expand Up @@ -165,21 +163,102 @@ var jsonWithTypedDimensions = `{
}`

func TestIngestionSpecWithTypedDimensions_MarshalJSON(t *testing.T) {
t.Run("jsonWithTypedDimensions", func(t *testing.T) {
spec := NewIngestionSpec(
SetDataSource("test_datasource"),
SetTopic("test_topic"),
SetBrokers("test_brokers"),
SetDimensions([]any{
Dimension{Type: "string", Name: "ts"},
Dimension{Type: "json", Name: "payload"},
spec := NewIngestionSpec(
SetDataSource("test_datasource"),
SetTopic("test_topic"),
SetBrokers("test_brokers"),
SetDimensions([]any{
Dimension{Type: "string", Name: "ts"},
Dimension{Type: "json", Name: "payload"},
}),
)
actual, err := json.Marshal(spec)
if err != nil {
t.Fatalf("unexpected error while marshalling: %v", err)
}
expected := []byte(jsonWithTypedDimensions)
require.JSONEq(t, string(expected), string(actual), fmt.Sprintf("expected: %s\nactual: %s", string(expected), string(actual)))
}

var jsonWithSqlInputSource = `{
tomasz-h2o marked this conversation as resolved.
Show resolved Hide resolved
"type": "index_parallel",
"dataSchema": {
"dataSource": "test_datasource",
"timestampSpec": {
"column": "ts",
"format": "auto"
},
"transformSpec": {
"transforms": []
},
"dimensionsSpec": {
"dimensions": [
"ts",
"user_name",
"payload"
]
},
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "DAY",
"queryGranularity": "none"
}
},
"ioConfig": {
"type": "index_parallel",
"inputSource": {
"type": "sql",
"sqls": [
"SELECT * FROM table1 WHERE timestamp BETWEEN '2013-01-01 00:00:00' AND '2013-01-01 11:59:59'",
"SELECT * FROM table2 WHERE timestamp BETWEEN '2013-01-01 00:00:00' AND '2013-01-01 11:59:59'"
],
"database": {
"type": "mysql",
"connectorConfig": {
"connectURI": "jdbc:mysql://host:port/schema",
"user": "username",
"password": "password"
}
}
},
"consumerProperties": {},
"taskDuration": "PT1H",
"useEarliestOffset": false,
"flattenSpec": {
"fields": []
},
"inputFormat": {
"type": "json"
}
}
}`

func TestIngestionSpecWithSqlInputSource_MarshalJSON(t *testing.T) {
spec := NewIngestionSpec(
SetType("index_parallel"),
SetIOConfigType("index_parallel"),
SetDataSource("test_datasource"),
SetDimensions([]any{"ts", "user_name", "payload"}),
SetSqlInputSource("mysql",
"jdbc:mysql://host:port/schema",
"username",
"password",
[]string{
"SELECT * FROM table1 WHERE timestamp BETWEEN '2013-01-01 00:00:00' AND '2013-01-01 11:59:59'",
"SELECT * FROM table2 WHERE timestamp BETWEEN '2013-01-01 00:00:00' AND '2013-01-01 11:59:59'",
}),
)
actual, err := json.MarshalIndent(spec, "", " ")
if err != nil {
t.Fatalf("unexpected error while marshalling: %v", err)
}
expected := []byte(jsonWithTypedDimensions)
require.Equal(t, string(expected), string(actual), fmt.Sprintf("expected: %s\nactual: %s", string(expected), string(actual)))
})
)
actual, err := json.Marshal(spec)
if err != nil {
t.Fatalf("unexpected error while marshalling: %v", err)
}
expected := []byte(jsonWithSqlInputSource)
require.JSONEq(t, string(expected), string(actual), fmt.Sprintf("expected: %s\nactual: %s", string(expected), string(actual)))

var checkSpec *InputIngestionSpec
err = json.Unmarshal(actual, &checkSpec)
if err != nil {
t.Fatalf("unexpected error while unmarshalling: %v", err)
}
require.Equal(t, spec, checkSpec)
}
Loading