Skip to content

Commit

Permalink
Address CR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
vzayts committed Dec 13, 2023
1 parent 2bf2f85 commit 920e3b0
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 16 deletions.
2 changes: 1 addition & 1 deletion sql/datasource_available.sql
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
SELECT count(*) cnt
FROM INFORMATION_SCHEMA.TABLES
WHERE TABLE_SCHEMA = 'druid' AND TABLE_NAME=?
WHERE TABLE_SCHEMA = 'druid' AND TABLE_NAME=?
2 changes: 1 addition & 1 deletion sql/datasource_records.sql
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
SELECT count(*) cnt
FROM "${{ datasource }}" ds
FROM "${{ datasource }}" ds
29 changes: 15 additions & 14 deletions task_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,6 @@ func defaultTaskIngestionSpec() *TaskIngestionSpec {
Spec: &IngestionSpecData{
DataSchema: &DataSchema{
DataSource: "some_datasource",
TimeStampSpec: &TimestampSpec{
Column: "ts",
Format: "auto",
},
GranularitySpec: &GranularitySpec{
Type: "uniform",
SegmentGranularity: "DAY",
Expand All @@ -63,16 +59,9 @@ func defaultTaskIngestionSpec() *TaskIngestionSpec {
IOConfig: &IOConfig{
Type: "index_parallel",
InputSource: &InputSource{
Type: "sql",
Database: &Database{
Type: "postgresql",
ConnectorConfig: &ConnectorConfig{
ConnectURI: "jdbc:postgresql://host:port/schema",
User: "user",
Password: "password",
},
},
SQLs: []string{},
Type: "sql",
Database: &Database{},
SQLs: []string{},
},
InputFormat: &InputFormat{
Type: "json",
Expand All @@ -98,6 +87,18 @@ func SetTaskType(stype string) TaskIngestionSpecOptions {
}
}

// SetTaskTimestampColumn sets the type of the task IOConfig.
func SetTaskTimestampColumn(column string) TaskIngestionSpecOptions {
return func(spec *TaskIngestionSpec) {
if column != "" {
spec.Spec.DataSchema.TimeStampSpec = &TimestampSpec{
Column: column,
Format: "auto",
}
}
}
}

// SetTaskDataSource sets the destination datasource of the task IOConfig.
func SetTaskDataSource(datasource string) TaskIngestionSpecOptions {
return func(spec *TaskIngestionSpec) {
Expand Down
1 change: 1 addition & 0 deletions tasks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ func TriggerIngestionTask[T any](d *Client, dataSourceName string, entries []T)

var spec = NewTaskIngestionSpec(
SetTaskType("index_parallel"),
SetTaskTimestampColumn("ts"),
SetTaskDataSource(dataSourceName),
SetTaskTuningConfig("index_parallel", 25000, 5000000),
SetTaskIOConfigType("index_parallel"),
Expand Down

0 comments on commit 920e3b0

Please sign in to comment.