WIP: extend tasks apis with tests; refactor to extract common spec types
vzayts committed Dec 8, 2023
1 parent d709f05 commit abb6448
Showing 6 changed files with 440 additions and 368 deletions.
325 changes: 325 additions & 0 deletions common_spec_types.go
@@ -0,0 +1,325 @@
package druid

// BitmapFactory is a field of IndexSpec.
type BitmapFactory struct {
Type string `json:"type"`
}

// StringEncodingStrategy specifies the string encoding strategy to use at indexing time.
type StringEncodingStrategy struct {
Type string `json:"type"`
// FrontCoded fields
BucketSize int `json:"bucketSize,omitempty"`
FormatVersion int `json:"formatVersion,omitempty"`
}

// IndexSpec defines segment storage format options to be used at indexing time.
// https://druid.apache.org/docs/latest/ingestion/ingestion-spec#indexspec
type IndexSpec struct {
Bitmap *BitmapFactory `json:"bitmap,omitempty"`
DimensionCompression string `json:"dimensionCompression"`
StringEncodingStrategy *StringEncodingStrategy `json:"stringEncodingStrategy,omitempty"`
MetricCompression string `json:"metricCompression,omitempty"`
LongEncoding string `json:"longEncoding,omitempty"`
JsonCompression string `json:"jsonCompression,omitempty"`
SegmentLoader string `json:"segmentLoader,omitempty"`
}

// TuningConfig controls various tuning parameters specific to each ingestion method.
// https://druid.apache.org/docs/latest/ingestion/ingestion-spec#tuningconfig
type TuningConfig struct {
Type string `json:"type"`
IntermediatePersistPeriod string `json:"intermediatePersistPeriod,omitempty"`
MaxRowsPerSegment int `json:"maxRowsPerSegment,omitempty"`
MaxRowsInMemory int `json:"maxRowsInMemory,omitempty"`
IndexSpecForIntermediatePersists *IndexSpec `json:"indexSpecForIntermediatePersists,omitempty"`
}

// Metric is a Druid aggregator that is applied at ingestion time.
// https://druid.apache.org/docs/latest/ingestion/ingestion-spec#metricsspec
type Metric struct {
Name string `json:"name"`
Type string `json:"type"`
FieldName string `json:"fieldName"`
}

// DataSchema represents the Druid dataSchema spec.
type DataSchema struct {
DataSource string `json:"dataSource"`
Parser string `json:"parser,omitempty"`
TimeStampSpec *TimestampSpec `json:"timestampSpec,omitempty"`
TransformSpec *TransformSpec `json:"transformSpec,omitempty"`
DimensionsSpec *DimensionsSpec `json:"dimensionsSpec,omitempty"`
GranularitySpec *GranularitySpec `json:"granularitySpec,omitempty"`
MetricSpec []Metric `json:"metricSpec,omitempty"`
}

// FlattenSpec is responsible for flattening nested input JSON data into Druid's flat data model.
type FlattenSpec struct {
UseFieldDiscovery bool `json:"useFieldDiscovery,omitempty"`
Fields FieldList `json:"fields"`
}

// TimestampSpec is responsible for configuring the primary timestamp.
type TimestampSpec struct {
Column string `json:"column"`
Format string `json:"format"`
}

// FieldList is a list of Fields for ingestion FlattenSpec.
type FieldList []Field

// Field defines a single field configuration of the FlattenSpec.
type Field struct {
Type string `json:"type"`
Name string `json:"name"`
Expr string `json:"expr"`
}

// Transform defines a single field transformation of the TransformSpec.
type Transform struct {
Type string `json:"type"`
Name string `json:"name"`
Expr string `json:"expression"`
}

// SpatialDimension represents a single spatial dimension.
// https://druid.apache.org/docs/latest/querying/geo/#spatial-indexing
type SpatialDimension struct {
DimensionName string `json:"dimName"`
Dimensions []string `json:"dims,omitempty"`
}

// TransformSet is a unique set of transforms applied to the input.
type TransformSet []Transform

// DimensionSet is a unique set of druid datasource dimensions (labels).
type DimensionSet []any

// Dimension is a typed definition of a datasource dimension.
type Dimension struct {
Type string `json:"type"`
Name string `json:"name"`
}

// SpatialDimensionSet is a unique set of druid datasource spatial dimensions.
type SpatialDimensionSet []SpatialDimension

// DimensionExclusionsSet represents set of excluded dimensions.
type DimensionExclusionsSet []string

// DimensionsSpec is responsible for configuring Druid's dimensions. They're a
// set of columns in Druid's data model that can be used for grouping, filtering
// or applying aggregations.
// https://druid.apache.org/docs/latest/ingestion/ingestion-spec#dimensionsspec
type DimensionsSpec struct {
Dimensions DimensionSet `json:"dimensions,omitempty"`
DimensionExclusions DimensionExclusionsSet `json:"dimensionExclusions,omitempty"`
SpatialDimensions SpatialDimensionSet `json:"spatialDimensions,omitempty"`
IncludeAllDimensions bool `json:"includeAllDimensions,omitempty"`
UseSchemaDiscovery bool `json:"useSchemaDiscovery,omitempty"`
}
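
// QueryGranularity is an object form of the query granularity (see the TODO on
// GranularitySpec.QueryGranularity, which Druid may return as a string or an object).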
type QueryGranularity struct {
Type string `json:"type,omitempty"`
}

// GranularitySpec allows for configuring operations such as data segment
// partitioning, truncating timestamps, time chunk segmentation or roll-up.
// https://druid.apache.org/docs/latest/ingestion/ingestion-spec#granularityspec
type GranularitySpec struct {
Type string `json:"type"`
SegmentGranularity string `json:"segmentGranularity,omitempty"`
// TODO: this field is problematic: depending on the value, druid returns either a string or an object
QueryGranularity any `json:"queryGranularity,omitempty"`
Rollup bool `json:"rollup"`
Intervals []string `json:"intervals,omitempty"`
}

// AutoScalerConfig is part of IOConfig that controls ingestion auto-scaling.
type AutoScalerConfig struct {
EnableTaskAutoScaler bool `json:"enableTaskAutoScaler"`
LagCollectionIntervalMillis int `json:"lagCollectionIntervalMillis"`
LagCollectionRangeMillis int `json:"lagCollectionRangeMillis"`
ScaleOutThreshold int `json:"scaleOutThreshold"`
TriggerScaleOutFractionThreshold float64 `json:"triggerScaleOutFractionThreshold"`
ScaleInThreshold int `json:"scaleInThreshold"`
TriggerScaleInFractionThreshold float64 `json:"triggerScaleInFractionThreshold"`
ScaleActionStartDelayMillis int `json:"scaleActionStartDelayMillis"`
ScaleActionPeriodMillis int `json:"scaleActionPeriodMillis"`
TaskCountMax int `json:"taskCountMax"`
TaskCountMin int `json:"taskCountMin"`
ScaleInStep int `json:"scaleInStep"`
ScaleOutStep int `json:"scaleOutStep"`
MinTriggerScaleActionFrequencyMillis int `json:"minTriggerScaleActionFrequencyMillis"`
}

// IdleConfig defines if and when stream Supervisor can become idle.
type IdleConfig struct {
Enabled bool `json:"enabled"`
InactiveAfterMillis int64 `json:"inactiveAfterMillis"`
}

// Firehose is an IOConfig firehose configuration.
type Firehose struct {
Type string `json:"type,omitempty"`

// EventReceiverFirehoseFactory fields
ServiceName string `json:"serviceName,omitempty"`
BufferSize int `json:"bufferSize,omitempty"`
MaxIdleTime int64 `json:"maxIdleTime,omitempty"`

// FixedCountFirehoseFactory / ClippedFirehoseFactory / TimedShutoffFirehoseFactory fields
Delegate []Firehose `json:"delegate,omitempty"`
Count int `json:"count,omitempty"`
Interval string `json:"interval,omitempty"`
ShutoffTime string `json:"shutoffTime,omitempty"`
}

// CompactionInputSpec is a specification for compaction task.
type CompactionInputSpec struct {
Type string `json:"type"`
// CompactionIntervalSpec fields
Interval string `json:"interval,omitempty"`
Sha256OfSortedSegmentIds string `json:"sha256OfSortedSegmentIds,omitempty"`
// SpecificSegmentsSpec fields
Segments []string `json:"segments,omitempty"`
}

// MetadataStorageUpdaterJobSpec is a specification of endpoint for HadoopIOConfig.
type MetadataStorageUpdaterJobSpec struct {
Type string `json:"type"`
ConnectURI string `json:"connectURI"`
User string `json:"user"`
Password string `json:"password"`
SegmentTable string `json:"segmentTable"`
CreteTable bool `json:"creteTable"`
Host string `json:"host"`
Port string `json:"port"`
DBCPProperties map[string]any `json:"dbcp"`
}

// IOConfig influences how data is read into Druid from a source system.
// https://druid.apache.org/docs/latest/ingestion/ingestion-spec/#ioconfig
type IOConfig struct {
Type string `json:"type,omitempty"`

// IndexIOConfig / RealtimeIOConfig shared field
Firehose *Firehose `json:"firehose,omitempty"`
// IndexIOConfig field
InputSource *InputSource `json:"inputSource,omitempty"`
AppendToExisting bool `json:"appendToExisting,omitempty"`
// IndexIOConfig / CompactionIOConfig shared field
DropExisting bool `json:"dropExisting,omitempty"`

// CompactionIOConfig / HadoopIOConfig fields
InputSpec map[string]any `json:"inputSpec,omitempty"`

// CompactionIOConfig field
AllowNonAlignedInterval bool `json:"allowNonAlignedInterval,omitempty"`

// HadoopIOConfig fields
MetadataUpdateSpec *MetadataStorageUpdaterJobSpec `json:"metadataUpdateSpec,omitempty"`
SegmentOutputPath string `json:"segmentOutputPath,omitempty"`

// KafkaIndexTaskIOConfig / KinesisIndexTaskIOConfig fields
Topic string `json:"topic,omitempty"`
ConsumerProperties *ConsumerProperties `json:"consumerProperties,omitempty"`
TaskDuration string `json:"taskDuration,omitempty"`
Replicas int `json:"replicas,omitempty"`
TaskCount int `json:"taskCount,omitempty"`
UseEarliestOffset bool `json:"useEarliestOffset"`
AutoScalerConfig *AutoScalerConfig `json:"autoScalerConfig,omitempty"`
TaskGroupID int `json:"taskGroupID,omitempty"`
BaseSequenceName string `json:"baseSequenceName,omitempty"`
CompletionTimeout string `json:"completionTimeout,omitempty"`
PollTimeout int `json:"pollTimeout,omitempty"`
StartDelay string `json:"startDelay,omitempty"`
Period string `json:"period,omitempty"`
Stream string `json:"stream,omitempty"`
UseEarliestSequenceNumber bool `json:"useEarliestSequenceNumber,omitempty"`

// common fields
FlattenSpec *FlattenSpec `json:"flattenSpec,omitempty"`
InputFormat *InputFormat `json:"inputFormat,omitempty"`
IdleConfig *IdleConfig `json:"idleConfig,omitempty"`
}

// ConsumerProperties is a set of properties that is passed to a specific
// consumer, e.g. a Kafka consumer.
type ConsumerProperties struct {
BootstrapServers string `json:"bootstrap.servers,omitempty"`
}

// InputFormat specifies the format of the input data and describes any conversions
// applied to it while parsing.
// Type can take values such as 'csv', 'tsv', 'json', 'protobuf' or 'kafka'.
type InputFormat struct {
Type string `json:"type"`

// FlatTextInputFormat / DelimitedInputFormat fields
Delimiter string `json:"delimiter,omitempty"`
ListDelimiter string `json:"listDelimiter,omitempty"`
FindColumnsHeader string `json:"findColumnsHeader,omitempty"`
SkipHeaderRows int `json:"skipHeaderRows,omitempty"`
Columns []string `json:"columns,omitempty"`

// JsonInputFormat fields
FlattenSpec *FlattenSpec `json:"flattenSpec,omitempty"`
FeatureSpec map[string]bool `json:"featureSpec,omitempty"`

// Common CsvInputFormat / JsonInputFormat fields
KeepNullColumns bool `json:"keepNullColumns,omitempty"`
AssumeNewlineDelimited bool `json:"assumeNewlineDelimited,omitempty"`
UseJsonNodeReader bool `json:"useJsonNodeReader,omitempty"`
}

// HttpInputSourceConfig is a field of HttpInputSource specification.
type HttpInputSourceConfig struct {
AllowedProtocols []string `json:"allowedProtocols,omitempty"`
}

// ConnectorConfig is connection configuration for Database.
type ConnectorConfig struct {
ConnectURI string `json:"connectURI"`
User string `json:"user"`
Password string `json:"password"`
}

// Database is the connection configuration for the "sql" InputSource.
type Database struct {
Type string `json:"type"`
ConnectorConfig *ConnectorConfig `json:"connectorConfig"`
}

// InputSource is a specification of the storage system where input data is stored.
type InputSource struct {
Type string `json:"type"`

// LocalInputSource fields
BaseDir string `json:"baseDir,omitempty"`
Filter string `json:"filter,omitempty"`
Files []string `json:"files,omitempty"`

// HttpInputSource fields
URIs []string `json:"uris,omitempty"`
HttpAuthenticationUsername string `json:"httpAuthenticationUsername,omitempty"`
HttpAuthenticationPassword string `json:"httpAuthenticationPassword,omitempty"`
HttpSourceConfig *HttpInputSourceConfig `json:"config,omitempty"`

// InlineInputSource fields
Data string `json:"data,omitempty"`

// CombiningInputSource fields
Delegates []InputSource `json:"delegates,omitempty"`

// SqlInputSource
SQLs []string `json:"sqls,omitempty"`
Database *Database `json:"database,omitempty"`
}

// TransformSpec is responsible for transforming druid input data
// after it is read from the input source and after the flattenSpec is applied.
// https://druid.apache.org/docs/latest/ingestion/ingestion-spec#transformspec
type TransformSpec struct {
Transforms TransformSet `json:"transforms"`
}
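
The shared types above are plain Go structs whose json tags mirror Druid's native ingestion spec, so a spec can be assembled field by field and serialized with encoding/json. The sketch below is illustrative only and not part of this commit; it assumes it compiles inside the same druid package, and the datasource, column and dimension names are placeholders.

package druid

import (
	"encoding/json"
	"fmt"
)

// exampleDataSchemaJSON is an illustrative sketch showing how the common spec
// types serialize into Druid-style dataSchema JSON. All names are placeholders.
func exampleDataSchemaJSON() {
	schema := DataSchema{
		DataSource:    "example-datasource", // placeholder datasource name
		TimeStampSpec: &TimestampSpec{Column: "ts", Format: "auto"},
		DimensionsSpec: &DimensionsSpec{
			// DimensionSet is []any, so plain string dimensions and typed
			// Dimension entries can be mixed, as in Druid's dimensionsSpec.
			Dimensions: DimensionSet{
				"channel",
				Dimension{Type: "long", Name: "added"},
			},
		},
		GranularitySpec: &GranularitySpec{
			Type:               "uniform",
			SegmentGranularity: "DAY",
			QueryGranularity:   "none", // string form; see the TODO on GranularitySpec
			Rollup:             false,
		},
	}
	out, err := json.MarshalIndent(schema, "", "  ")
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(string(out))
}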
4 changes: 3 additions & 1 deletion go.mod
@@ -6,8 +6,11 @@ toolchain go1.21.1

require (
github.com/davecgh/go-spew v1.1.1
github.com/gocarina/gocsv v0.0.0-20231116093920-b87c2d0e983a
github.com/google/go-querystring v1.1.0
github.com/google/uuid v1.3.1
github.com/hashicorp/go-retryablehttp v0.7.2
github.com/jmoiron/sqlx v1.3.5
github.com/magefile/mage v1.11.0
github.com/stretchr/testify v1.8.4
github.com/testcontainers/testcontainers-go v0.24.1
@@ -74,7 +77,6 @@ require (
github.com/google/go-cmp v0.5.9 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect
github.com/google/uuid v1.3.1 // indirect
github.com/gorilla/mux v1.8.0 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.11.3 // indirect
4 changes: 4 additions & 0 deletions go.sum
@@ -250,6 +250,8 @@ github.com/go-sql-driver/mysql v1.3.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG
github.com/go-sql-driver/mysql v1.6.0 h1:BCTh4TKNUYmOmMUcQ3IipzF5prigylS7XXjEkfCHuOE=
github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/gocarina/gocsv v0.0.0-20231116093920-b87c2d0e983a h1:RYfmiM0zluBJOiPDJseKLEN4BapJ42uSi9SZBQ2YyiA=
github.com/gocarina/gocsv v0.0.0-20231116093920-b87c2d0e983a/go.mod h1:5YoVOkjYAQumqlV356Hj3xeYh4BdZuLE0/nRkf2NKkI=
github.com/gofrs/flock v0.8.1 h1:+gYjHKf32LDeiEEFhQaotPbLuUXjY5ZqxKgXy7n59aw=
github.com/gofrs/flock v0.8.1/go.mod h1:F1TvTiK9OcQqauNUHlbJvyl9Qa1QvF/gOUDKA14jxHU=
github.com/gofrs/uuid v4.2.0+incompatible h1:yyYWMnhkhrKwwr8gAOcOCYxOOscHgDS9yZgBrnJfGa0=
@@ -423,6 +425,7 @@ github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/lib/pq v0.0.0-20150723085316-0dad96c0b94f/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ81pIr0yLvtUWk2if982qA3F3QD6H4=
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I=
github.com/magefile/mage v1.11.0 h1:C/55Ywp9BpgVVclD3lRnSYCwXTYxmSppIgLeDYlNuls=
@@ -446,6 +449,7 @@ github.com/mattn/go-runewidth v0.0.14/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh
github.com/mattn/go-shellwords v1.0.12 h1:M2zGm7EW6UQJvDeQxo4T51eKPurbeFbe8WtebGE2xrk=
github.com/mattn/go-shellwords v1.0.12/go.mod h1:EZzvwXDESEeg03EKmM+RmDnNOPKG4lLtQsUlTZDWQ8Y=
github.com/mattn/go-sqlite3 v1.6.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc=
github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo=
github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4=
324 changes: 0 additions & 324 deletions supervisor_types.go
@@ -31,330 +31,6 @@ type SupervisorAuditHistory struct {
Version string `json:"version"`
}

// BitmapFactory is a field of IndexSpec.
type BitmapFactory struct {
Type string `json:"type"`
}

// StringEncodingStrategy type for specifying string encoding at indexing stage.
type StringEncodingStrategy struct {
Type string `json:"type"`
// FrontCoded fields
BucketSize int `json:"bucketSize,omitempty"`
FormatVersion int `json:"formatVersion,omitempty"`
}

// IndexSpec defines segment storage format options to be used at indexing time.
// https://druid.apache.org/docs/latest/ingestion/ingestion-spec#indexspec
type IndexSpec struct {
Bitmap *BitmapFactory `json:"bitmap,omitempty"`
DimensionCompression string `json:"dimensionCompression"`
StringEncodingStrategy *StringEncodingStrategy `json:"stringEncodingStrategy,omitempty"`
MetricCompression string `json:"metricCompression,omitempty"`
LongEncoding string `json:"longEncoding,omitempty"`
JsonCompression string `json:"jsonCompression,omitempty"`
SegmentLoader string `json:"segmentLoader,omitempty"`
}

// TuningConfig controls various tuning parameters specific to each ingestion method.
// https://druid.apache.org/docs/latest/ingestion/ingestion-spec#tuningconfig
type TuningConfig struct {
Type string `json:"type"`
IntermediatePersistPeriod string `json:"intermediatePersistPeriod,omitempty"`
MaxRowsPerSegment int `json:"maxRowsPerSegment,omitempty"`
MaxRowsInMemory int `json:"maxRowsInMemory,omitempty"`
IndexSpecForIntermediatePersists *IndexSpec `json:"indexSpecForIntermediatePersists"`
}

// Metric is a Druid aggregator that is applied at ingestion time.
// https://druid.apache.org/docs/latest/ingestion/ingestion-spec#metricsspec
type Metric struct {
Name string `json:"name"`
Type string `json:"type"`
FieldName string `json:"fieldName"`
}

// DataSchema represents the Druid dataSchema spec.
type DataSchema struct {
DataSource string `json:"dataSource"`
Parser string `json:"parser,omitempty"`
TimeStampSpec *TimestampSpec `json:"timestampSpec,omitempty"`
TransformSpec *TransformSpec `json:"transformSpec,omitempty"`
DimensionsSpec *DimensionsSpec `json:"dimensionsSpec,omitempty"`
GranularitySpec *GranularitySpec `json:"granularitySpec,omitempty"`
MetricSpec []Metric `json:"metricSpec,omitempty"`
}

// FlattenSpec is responsible for flattening nested input JSON data into Druid's flat data model.
type FlattenSpec struct {
UseFieldDiscovery bool `json:"useFieldDiscovery,omitempty"`
Fields FieldList `json:"fields"`
}

// TimestampSpec is responsible for configuring the primary timestamp.
type TimestampSpec struct {
Column string `json:"column"`
Format string `json:"format"`
}

// FieldList is a list of Fields for ingestion FlattenSpec.
type FieldList []Field

// Field defines a single filed configuration of the FlattenSpec.
type Field struct {
Type string `json:"type"`
Name string `json:"name"`
Expr string `json:"expr"`
}

// Transform defines a single filed transformation of the TransformSpec.
type Transform struct {
Type string `json:"type"`
Name string `json:"name"`
Expr string `json:"expression"`
}

// SpatialDimension represents single spatial dimension datum.
// https://druid.apache.org/docs/latest/querying/geo/#spatial-indexing
type SpatialDimension struct {
DimensionName string `json:"dimName"`
Dimensions []string `json:"dims,omitempty"`
}

// TransformSet is a unique set of transforms applied to the input.
type TransformSet []Transform

// DimensionSet is a unique set of druid datasource dimensions(labels).
type DimensionSet []any

// Dimension is a typed definition of a datasource dimension.
type Dimension struct {
Type string `json:"type"`
Name string `json:"name"`
}

// SpatialDimensionSet is a unique set of druid datasource spatial dimensions.
type SpatialDimensionSet []SpatialDimension

// DimensionExclusionsSet represents set of excluded dimensions.
type DimensionExclusionsSet []string

// DimensionsSpec is responsible for configuring Druid's dimensions. They're a
// set of columns in Druid's data model that can be used for grouping, filtering
// or applying aggregations.
// https://druid.apache.org/docs/latest/ingestion/ingestion-spec#dimensionsspec
type DimensionsSpec struct {
Dimensions DimensionSet `json:"dimensions,omitempty"`
DimensionExclusions DimensionExclusionsSet `json:"dimensionExclusions,omitempty"`
SpatialDimensions SpatialDimensionSet `json:"spatialDimensions,omitempty"`
IncludeAllDimensions bool `json:"includeAllDimensions,omitempty"`
UseSchemaDiscovery bool `json:"useSchemaDiscovery,omitempty"`
}
type QueryGranularity struct {
Type string `json:"type,omitempty"`
}

// GranularitySpec allows for configuring operations such as data segment
// partitioning, truncating timestamps, time chunk segmentation or roll-up.
// https://druid.apache.org/docs/latest/ingestion/ingestion-spec#granularityspec
type GranularitySpec struct {
Type string `json:"type"`
SegmentGranularity string `json:"segmentGranularity,omitempty"`
// TODO: this field is problematic as depending on value druid returns string or object
QueryGranularity any `json:"queryGranularity,omitempty"`
Rollup bool `json:"rollup,omitempty"`
Intervals []string `json:"intervals,omitempty"`
}

// AutoScalerConfig is part of IOConfig that controls ingestion auto-scaling.
type AutoScalerConfig struct {
EnableTaskAutoScaler bool `json:"enableTaskAutoScaler"`
LagCollectionIntervalMillis int `json:"lagCollectionIntervalMillis"`
LagCollectionRangeMillis int `json:"lagCollectionRangeMillis"`
ScaleOutThreshold int `json:"scaleOutThreshold"`
TriggerScaleOutFractionThreshold float64 `json:"triggerScaleOutFractionThreshold"`
ScaleInThreshold int `json:"scaleInThreshold"`
TriggerScaleInFractionThreshold float64 `json:"triggerScaleInFractionThreshold"`
ScaleActionStartDelayMillis int `json:"scaleActionStartDelayMillis"`
ScaleActionPeriodMillis int `json:"scaleActionPeriodMillis"`
TaskCountMax int `json:"taskCountMax"`
TaskCountMin int `json:"taskCountMin"`
ScaleInStep int `json:"scaleInStep"`
ScaleOutStep int `json:"scaleOutStep"`
MinTriggerScaleActionFrequencyMillis int `json:"minTriggerScaleActionFrequencyMillis"`
}

// IdleConfig defines if and when stream Supervisor can become idle.
type IdleConfig struct {
Enabled bool `json:"enabled"`
InactiveAfterMillis int64 `json:"inactiveAfterMillis"`
}

// Firehose is an IOConfig firehose configuration.
type Firehose struct {
Type string `json:"type,omitempty"`

// EventReceiverFirehoseFactory fields
ServiceName string `json:"serviceName,omitempty"`
BufferSize int `json:"bufferSize,omitempty"`
MaxIdleTime int64 `json:"maxIdleTime,omitempty"`

// FixedCountFirehoseFactory / ClippedFirehoseFactory / TimedShutoffFirehoseFactory fields
Delegate []Firehose `json:"delegate,omitempty"`
Count int `json:"count,omitempty"`
Interval string `json:"interval,omitempty"`
ShutoffTime string `json:"shutoffTime,omitempty"`
}

// CompactionInputSpec is a specification for compaction task.
type CompactionInputSpec struct {
Type string `json:"type"`
// CompactionIntervalSpec fields
Interval string `json:"interval,omitempty"`
Sha256OfSortedSegmentIds string `json:"sha256OfSortedSegmentIds,omitempty"`
// SpecificSegmentsSpec fields
Segments []string `json:"segments,omitempty"`
}

// MetadataStorageUpdaterJobSpec is a specification of endpoint for HadoopIOConfig.
type MetadataStorageUpdaterJobSpec struct {
Type string `json:"type"`
ConnectURI string `json:"connectURI"`
User string `json:"user"`
Password string `json:"password"`
SegmentTable string `json:"segmentTable"`
CreteTable bool `json:"creteTable"`
Host string `json:"host"`
Port string `json:"port"`
DBCPProperties map[string]any `json:"dbcp"`
}

// IOConfig influences how data is read into Druid from a source system.
// https://druid.apache.org/docs/latest/ingestion/ingestion-spec/#ioconfig
type IOConfig struct {
Type string `json:"type,omitempty"`

// IndexIOConfig / RealtimeIOConfig shared field
Firehose *Firehose `json:"firehose,omitempty"`
// IndexIOConfig field
InputSource *InputSource `json:"inputSource,omitempty"`
AppendToExisting bool `json:"appendToExisting,omitempty"`
// IndexIOConfig / CompactionIOConfig shared field
DropExisting bool `json:"dropExisting,omitempty"`

// CompactionIOConfig / HadoopIOConfig fields
InputSpec map[string]any `json:"inputSpec,omitempty"`

// CompactionIOConfig field
AllowNonAlignedInterval bool `json:"allowNonAlignedInterval,omitempty"`

// HadoopIOConfig fields
MetadataUpdateSpec *MetadataStorageUpdaterJobSpec `json:"metadataUpdateSpec,omitempty"`
SegmentOutputPath string `json:"segmentOutputPath,omitempty"`

// KafkaIndexTaskIOConfig / KinesisIndexTaskIOConfig fields
Topic string `json:"topic,omitempty"`
ConsumerProperties *ConsumerProperties `json:"consumerProperties,omitempty"`
TaskDuration string `json:"taskDuration,omitempty"`
Replicas int `json:"replicas,omitempty"`
TaskCount int `json:"taskCount,omitempty"`
UseEarliestOffset bool `json:"useEarliestOffset"`
AutoScalerConfig *AutoScalerConfig `json:"autoScalerConfig,omitempty"`
TaskGroupID int `json:"taskGroupID,omitempty"`
BaseSequenceName string `json:"baseSequenceName,omitempty"`
CompletionTimeout string `json:"completionTimeout,omitempty"`
PollTimeout int `json:"pollTimeout,omitempty"`
StartDelay string `json:"startDelay,omitempty"`
Period string `json:"period,omitempty"`
Stream string `json:"stream,omitempty"`
UseEarliestSequenceNumber bool `json:"useEarliestSequenceNumber,omitempty"`

// common fields
FlattenSpec *FlattenSpec `json:"flattenSpec,omitempty"`
InputFormat *InputFormat `json:"inputFormat,omitempty"`
IdleConfig *IdleConfig `json:"idleConfig,omitempty"`
}

// ConsumerProperties is a set of properties that is passed to a specific
// consumer, i.e. Kafka consumer.
type ConsumerProperties struct {
BootstrapServers string `json:"bootstrap.servers,omitempty"`
}

// InputFormat specifies kafka messages format type and describes any conversions applied to
// the input data while parsing.
// Type can take values 'json', 'protobuf' or 'kafka'.
type InputFormat struct {
Type string `json:"type"`

// FlatTextInputFormat / DelimitedInputFormat fields
Delimiter string `json:"delimiter,omitempty"`
ListDelimiter string `json:"listDelimiter,omitempty"`
FindColumnsHeader string `json:"findColumnsHeader,omitempty"`
SkipHeaderRows int `json:"skipHeaderRows,omitempty"`
Columns []string `json:"columns,omitempty"`

// JsonInputFormat fields
FlattenSpec *FlattenSpec `json:"flattenSpec,omitempty"`
FeatureSpec map[string]bool `json:"featureSpec,omitempty"`

// Common CsvInputFormat / JsonInputFormat fields
KeepNullColumns bool `json:"keepNullColumns,omitempty"`
AssumeNewlineDelimited bool `json:"assumeNewlineDelimited,omitempty"`
UseJsonNodeReader bool `json:"useJsonNodeReader,omitempty"`
}

// HttpInputSourceConfig is a field of HttpInputSource specification.
type HttpInputSourceConfig struct {
AllowedProtocols []string `json:" allowedProtocols,omitempty"`
}

// ConnectorConfig is connection configuration for Database.
type ConnectorConfig struct {
ConnectURI string `json:"connectURI"`
User string `json:"user"`
Password string `json:"password"`
}

// Database configuration for InputSource "sql".
type Database struct {
Type string `json:"type"`
ConnectorConfig *ConnectorConfig `json:"connectorConfig"`
}

// InputSource is a specification of the storage system where input data is stored.
type InputSource struct {
Type string `json:"type"`

// LocalInputSource fields
BaseDir string `json:"baseDir,omitempty"`
Filter string `json:"filter,omitempty"`
Files []string `json:"files,omitempty"`

// HttpInputSource fields
URIs []string `json:"uris,omitempty"`
HttpAuthenticationUsername string `json:"httpAuthenticationUsername,omitempty"`
HttpAuthenticationPassword string `json:"httpAuthenticationPassword,omitempty"`
HttpSourceConfig *HttpInputSourceConfig `json:"config,omitempty"`

// InlineInputSource fields
Data string `json:"data,omitempty"`

// CombiningInputSource fields
Delegates []InputSource `json:"delegates,omitempty"`

// SqlInputSource
SQLs []string `json:"sqls,omitempty"`
Database *Database `json:"database,omitempty"`
}

// TransformSpec is responsible for transforming druid input data
// after it was read from kafka and after flattenSpec was applied.
// https://druid.apache.org/docs/latest/ingestion/ingestion-spec#transformspec
type TransformSpec struct {
Transforms TransformSet `json:"transforms"`
}

// SupervisorStatusPayload is an object representing the status of supervisor.
type SupervisorStatusPayload struct {
Datasource string `json:"dataSource"`
64 changes: 59 additions & 5 deletions task_types.go
@@ -53,7 +53,11 @@ func defaultTaskIngestionSpec() *TaskIngestionSpec {
QueryGranularity: "none",
},
DimensionsSpec: &DimensionsSpec{
Dimensions: DimensionSet{},
UseSchemaDiscovery: true,
Dimensions: DimensionSet{},
},
TransformSpec: &TransformSpec{
Transforms: []Transform{},
},
},
IOConfig: &IOConfig{
@@ -68,7 +72,7 @@ func defaultTaskIngestionSpec() *TaskIngestionSpec {
Password: "password",
},
},
SQLs: []string{"SELECT * FROM some_table"},
SQLs: []string{"SELECT * FROM druid"},
},
InputFormat: &InputFormat{
Type: "json",
@@ -104,16 +108,66 @@ func SetTaskDataSource(datasource string) TaskIngestionSpecOptions {
}

// SetTaskTuningConfig sets the tuning configuration of the ingestion task.
func SetTuningConfig(ttype string, maxRowsInMemory, maxRowsPerSegment int) TaskIngestionSpecOptions {
func SetTaskTuningConfig(typ string, maxRowsInMemory, maxRowsPerSegment int) TaskIngestionSpecOptions {
return func(spec *TaskIngestionSpec) {
if ttype != "" {
spec.Spec.TuningConfig.Type = ttype
if typ != "" {
spec.Spec.TuningConfig.Type = typ
spec.Spec.TuningConfig.MaxRowsInMemory = maxRowsInMemory
spec.Spec.TuningConfig.MaxRowsPerSegment = maxRowsPerSegment
}
}
}

// SetTaskDataDimensions sets druid datasource dimensions.
func SetTaskDataDimensions(dimensions DimensionSet) TaskIngestionSpecOptions {
return func(spec *TaskIngestionSpec) {
spec.Spec.DataSchema.DimensionsSpec.Dimensions = dimensions
}
}

// SetTaskSQLInputSource configures a SQL input source.
func SetTaskSQLInputSource(typ, connectURI, user, password string, sqls []string) TaskIngestionSpecOptions {
return func(spec *TaskIngestionSpec) {
spec.Spec.IOConfig.InputSource = &InputSource{
Type: "sql",
SQLs: sqls,
Database: &Database{
Type: typ,
ConnectorConfig: &ConnectorConfig{
ConnectURI: connectURI,
User: user,
Password: password,
},
},
}
}
}

// SetTaskIOConfigType sets the type of the ingestion task IOConfig.
func SetTaskIOConfigType(typ string) TaskIngestionSpecOptions {
return func(spec *TaskIngestionSpec) {
if typ != "" {
spec.Spec.IOConfig.Type = typ
}
}
}

// SetTaskInputFormat sets the input format (type, column header handling and columns) of the task IOConfig.
func SetTaskInputFormat(typ string, findColumnsHeader string, columns []string) TaskIngestionSpecOptions {
return func(spec *TaskIngestionSpec) {
spec.Spec.IOConfig.InputFormat.Type = typ
spec.Spec.IOConfig.InputFormat.FindColumnsHeader = findColumnsHeader
spec.Spec.IOConfig.InputFormat.Columns = columns
}
}

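// SetTaskInlineInputData sets the ingestion task's input source type to "inline"
// and uses the given data as its payload.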
func SetTaskInlineInputData(data string) TaskIngestionSpecOptions {
return func(spec *TaskIngestionSpec) {
spec.Spec.IOConfig.InputSource.Type = "inline"
spec.Spec.IOConfig.InputSource.Data = data
}
}

// NewTaskIngestionSpec returns a default TaskIngestionSpec and applies any
// options passed to it.
func NewTaskIngestionSpec(options ...TaskIngestionSpecOptions) *TaskIngestionSpec {
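Taken together with the existing SetTaskType and SetTaskDataSource options, the helpers added above compose into a full task spec. The sketch below is not part of the commit: it assumes the same druid package and a reachable Druid router at the given address, and the database type, connect URI, credentials, query and datasource name are placeholders.

// submitSQLIngestionSketch is an illustrative sketch (not part of this commit)
// that builds a SQL-backed index_parallel task spec with the new options and
// submits it. The router URL and connection details are placeholders.
func submitSQLIngestionSketch() (string, error) {
	d, err := NewClient("http://localhost:8888")
	if err != nil {
		return "", err
	}
	spec := NewTaskIngestionSpec(
		SetTaskType("index_parallel"),
		SetTaskDataSource("example-datasource"),
		SetTaskIOConfigType("index_parallel"),
		SetTaskTuningConfig("index_parallel", 25000, 5000000),
		// Replace the default input source with a SQL-backed one.
		SetTaskSQLInputSource("mysql", "jdbc:mysql://localhost:3306/example",
			"user", "password", []string{"SELECT * FROM example_table"}),
	)
	return d.Tasks().SubmitTask(spec)
}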
87 changes: 49 additions & 38 deletions tasks_test.go
@@ -1,15 +1,59 @@
package druid

import (
"bytes"
"context"
"testing"
"time"

"github.com/gocarina/gocsv"
"github.com/google/uuid"
"github.com/jmoiron/sqlx/types"
"github.com/stretchr/testify/assert"
tc "github.com/testcontainers/testcontainers-go/modules/compose"
"github.com/testcontainers/testcontainers-go/wait"
)

// TestDO represents a test data entry with a JSON payload.
type TestDO struct {
Timestamp time.Time `db:"ts"`
Id uuid.UUID `db:"id"` // output only
Payload types.JSONText `db:"payload"`
}

var testObjects = []TestDO{
{
Id: uuid.New(),
Timestamp: time.Now(),
Payload: types.JSONText("{\"test\": \"json\"}"),
},
{
Id: uuid.New(),
Timestamp: time.Now().Add(time.Hour),
Payload: types.JSONText("{\"test\": \"json2\"}"),
},
}

func triggerIngestionTask(d *Client, dataSourceName string, entries []TestDO) (string, error) {
csvEntriesBuff := &bytes.Buffer{}

err := gocsv.MarshalWithoutHeaders(entries, csvEntriesBuff)
if err != nil {
return "", err
}

var spec = NewTaskIngestionSpec(
SetTaskType("index_parallel"),
SetTaskDataSource(dataSourceName),
SetTaskTuningConfig("index_parallel", 25000, 5000000),
SetTaskIOConfigType("index_parallel"),
SetTaskInputFormat("csv", "false", []string{}),
SetTaskInlineInputData(csvEntriesBuff.String()),
)
taskID, err := d.Tasks().SubmitTask(spec)
return taskID, err
}

func TestTaskService(t *testing.T) {
// Set up druid containers using docker-compose.
compose, err := tc.NewDockerCompose("testdata/docker-compose.yaml")
@@ -28,51 +72,18 @@ func TestTaskService(t *testing.T) {

// Set up druid service and client.
var druidOpts []ClientOption
d, err := NewClient("http://localhost:8888", druidOpts...)
assert.NoError(t, err, "error should be nil")
var spec = NewTaskIngestionSpec(
SetTaskType("index_parallel"),
SetTaskDataSource("test-datasource"),
SetTuningConfig("index_parallel", 25000, 5000000),
)
d, err := NewClient("http://localhost:8091", druidOpts...)
assert.NoError(t, err, "error should be nil")
assert.NotNil(t, spec, "specification should not be nil")

// Waiting for druid coordinator service to start.
err = compose.
WaitForService("coordinator", wait.NewHTTPStrategy(processInformationPathPrefix).WithPort("8081/tcp").WithStartupTimeout(60*time.Second)).
WaitForService("coordinator", wait.NewHTTPStrategy(processInformationPathPrefix).WithPort("8081/tcp").WithStartupTimeout(180*time.Second)).
WaitForService("router", wait.NewHTTPStrategy(processInformationPathPrefix).WithPort("8888/tcp").WithStartupTimeout(180*time.Second)).
WaitForService("broker", wait.NewHTTPStrategy(processInformationPathPrefix).WithPort("8082/tcp").WithStartupTimeout(180*time.Second)).
Up(ctx, tc.Wait(true))
assert.NoError(t, err, "coordinator should be up with no error")

// Submit an ingestion task for the test datasource.
_, err = d.Tasks().SubmitTask(spec)
_, err = triggerIngestionTask(d, "test-datasource", testObjects)
assert.NoError(t, err, "error should be nil")
//assert.Equal(t, id, spec.DataSchema.DataSource)
//status, err := d.Supervisor().GetStatus(spec.DataSchema.DataSource)
//assert.NoError(t, err, "error should be nil")
//assert.Equal(t, "PENDING", status.Payload.State)
//assert.False(t, status.Payload.Suspended)
//
//// suspend and check status
//suspendedSpec, err := d.Supervisor().Suspend(spec.DataSchema.DataSource)
//assert.True(t, suspendedSpec.Suspended)
//assert.NoError(t, err, "error should be nil")
//
//status, err = d.Supervisor().GetStatus(spec.DataSchema.DataSource)
//assert.NoError(t, err, "error should be nil")
//assert.True(t, status.Payload.Suspended)
//
//// resume and check status
//_, err = d.Supervisor().Resume(spec.DataSchema.DataSource)
//assert.NoError(t, err, "error should be nil")
//
//status, err = d.Supervisor().GetStatus(spec.DataSchema.DataSource)
//assert.NoError(t, err, "error should be nil")
//assert.Equal(t, "PENDING", status.Payload.State)
//assert.False(t, status.Payload.Suspended)
//
//// terminate
//id, err = d.Supervisor().Terminate(spec.DataSchema.DataSource)
//assert.NoError(t, err, "error should be nil")
//assert.Equal(t, id, spec.DataSchema.DataSource)
}
