diff --git a/common_spec_types.go b/common_spec_types.go new file mode 100644 index 0000000..4d963fb --- /dev/null +++ b/common_spec_types.go @@ -0,0 +1,325 @@ +package druid + +// BitmapFactory is a field of IndexSpec. +type BitmapFactory struct { + Type string `json:"type"` +} + +// StringEncodingStrategy type for specifying string encoding at indexing stage. +type StringEncodingStrategy struct { + Type string `json:"type"` + // FrontCoded fields + BucketSize int `json:"bucketSize,omitempty"` + FormatVersion int `json:"formatVersion,omitempty"` +} + +// IndexSpec defines segment storage format options to be used at indexing time. +// https://druid.apache.org/docs/latest/ingestion/ingestion-spec#indexspec +type IndexSpec struct { + Bitmap *BitmapFactory `json:"bitmap,omitempty"` + DimensionCompression string `json:"dimensionCompression"` + StringEncodingStrategy *StringEncodingStrategy `json:"stringEncodingStrategy,omitempty"` + MetricCompression string `json:"metricCompression,omitempty"` + LongEncoding string `json:"longEncoding,omitempty"` + JsonCompression string `json:"jsonCompression,omitempty"` + SegmentLoader string `json:"segmentLoader,omitempty"` +} + +// TuningConfig controls various tuning parameters specific to each ingestion method. +// https://druid.apache.org/docs/latest/ingestion/ingestion-spec#tuningconfig +type TuningConfig struct { + Type string `json:"type"` + IntermediatePersistPeriod string `json:"intermediatePersistPeriod,omitempty"` + MaxRowsPerSegment int `json:"maxRowsPerSegment,omitempty"` + MaxRowsInMemory int `json:"maxRowsInMemory,omitempty"` + IndexSpecForIntermediatePersists *IndexSpec `json:"indexSpecForIntermediatePersists,omitempty"` +} + +// Metric is a Druid aggregator that is applied at ingestion time. +// https://druid.apache.org/docs/latest/ingestion/ingestion-spec#metricsspec +type Metric struct { + Name string `json:"name"` + Type string `json:"type"` + FieldName string `json:"fieldName"` +} + +// DataSchema represents the Druid dataSchema spec. +type DataSchema struct { + DataSource string `json:"dataSource"` + Parser string `json:"parser,omitempty"` + TimeStampSpec *TimestampSpec `json:"timestampSpec,omitempty"` + TransformSpec *TransformSpec `json:"transformSpec,omitempty"` + DimensionsSpec *DimensionsSpec `json:"dimensionsSpec,omitempty"` + GranularitySpec *GranularitySpec `json:"granularitySpec,omitempty"` + MetricSpec []Metric `json:"metricSpec,omitempty"` +} + +// FlattenSpec is responsible for flattening nested input JSON data into Druid's flat data model. +type FlattenSpec struct { + UseFieldDiscovery bool `json:"useFieldDiscovery,omitempty"` + Fields FieldList `json:"fields"` +} + +// TimestampSpec is responsible for configuring the primary timestamp. +type TimestampSpec struct { + Column string `json:"column"` + Format string `json:"format"` +} + +// FieldList is a list of Fields for ingestion FlattenSpec. +type FieldList []Field + +// Field defines a single filed configuration of the FlattenSpec. +type Field struct { + Type string `json:"type"` + Name string `json:"name"` + Expr string `json:"expr"` +} + +// Transform defines a single filed transformation of the TransformSpec. +type Transform struct { + Type string `json:"type"` + Name string `json:"name"` + Expr string `json:"expression"` +} + +// SpatialDimension represents single spatial dimension datum. 
+// https://druid.apache.org/docs/latest/querying/geo/#spatial-indexing +type SpatialDimension struct { + DimensionName string `json:"dimName"` + Dimensions []string `json:"dims,omitempty"` +} + +// TransformSet is a unique set of transforms applied to the input. +type TransformSet []Transform + +// DimensionSet is a unique set of druid datasource dimensions(labels). +type DimensionSet []any + +// Dimension is a typed definition of a datasource dimension. +type Dimension struct { + Type string `json:"type"` + Name string `json:"name"` +} + +// SpatialDimensionSet is a unique set of druid datasource spatial dimensions. +type SpatialDimensionSet []SpatialDimension + +// DimensionExclusionsSet represents set of excluded dimensions. +type DimensionExclusionsSet []string + +// DimensionsSpec is responsible for configuring Druid's dimensions. They're a +// set of columns in Druid's data model that can be used for grouping, filtering +// or applying aggregations. +// https://druid.apache.org/docs/latest/ingestion/ingestion-spec#dimensionsspec +type DimensionsSpec struct { + Dimensions DimensionSet `json:"dimensions,omitempty"` + DimensionExclusions DimensionExclusionsSet `json:"dimensionExclusions,omitempty"` + SpatialDimensions SpatialDimensionSet `json:"spatialDimensions,omitempty"` + IncludeAllDimensions bool `json:"includeAllDimensions,omitempty"` + UseSchemaDiscovery bool `json:"useSchemaDiscovery,omitempty"` +} +type QueryGranularity struct { + Type string `json:"type,omitempty"` +} + +// GranularitySpec allows for configuring operations such as data segment +// partitioning, truncating timestamps, time chunk segmentation or roll-up. +// https://druid.apache.org/docs/latest/ingestion/ingestion-spec#granularityspec +type GranularitySpec struct { + Type string `json:"type"` + SegmentGranularity string `json:"segmentGranularity,omitempty"` + // TODO: this field is problematic as depending on value druid returns string or object + QueryGranularity any `json:"queryGranularity,omitempty"` + Rollup bool `json:"rollup"` + Intervals []string `json:"intervals,omitempty"` +} + +// AutoScalerConfig is part of IOConfig that controls ingestion auto-scaling. +type AutoScalerConfig struct { + EnableTaskAutoScaler bool `json:"enableTaskAutoScaler"` + LagCollectionIntervalMillis int `json:"lagCollectionIntervalMillis"` + LagCollectionRangeMillis int `json:"lagCollectionRangeMillis"` + ScaleOutThreshold int `json:"scaleOutThreshold"` + TriggerScaleOutFractionThreshold float64 `json:"triggerScaleOutFractionThreshold"` + ScaleInThreshold int `json:"scaleInThreshold"` + TriggerScaleInFractionThreshold float64 `json:"triggerScaleInFractionThreshold"` + ScaleActionStartDelayMillis int `json:"scaleActionStartDelayMillis"` + ScaleActionPeriodMillis int `json:"scaleActionPeriodMillis"` + TaskCountMax int `json:"taskCountMax"` + TaskCountMin int `json:"taskCountMin"` + ScaleInStep int `json:"scaleInStep"` + ScaleOutStep int `json:"scaleOutStep"` + MinTriggerScaleActionFrequencyMillis int `json:"minTriggerScaleActionFrequencyMillis"` +} + +// IdleConfig defines if and when stream Supervisor can become idle. +type IdleConfig struct { + Enabled bool `json:"enabled"` + InactiveAfterMillis int64 `json:"inactiveAfterMillis"` +} + +// Firehose is an IOConfig firehose configuration. 
+type Firehose struct { + Type string `json:"type,omitempty"` + + // EventReceiverFirehoseFactory fields + ServiceName string `json:"serviceName,omitempty"` + BufferSize int `json:"bufferSize,omitempty"` + MaxIdleTime int64 `json:"maxIdleTime,omitempty"` + + // FixedCountFirehoseFactory / ClippedFirehoseFactory / TimedShutoffFirehoseFactory fields + Delegate []Firehose `json:"delegate,omitempty"` + Count int `json:"count,omitempty"` + Interval string `json:"interval,omitempty"` + ShutoffTime string `json:"shutoffTime,omitempty"` +} + +// CompactionInputSpec is a specification for compaction task. +type CompactionInputSpec struct { + Type string `json:"type"` + // CompactionIntervalSpec fields + Interval string `json:"interval,omitempty"` + Sha256OfSortedSegmentIds string `json:"sha256OfSortedSegmentIds,omitempty"` + // SpecificSegmentsSpec fields + Segments []string `json:"segments,omitempty"` +} + +// MetadataStorageUpdaterJobSpec is a specification of endpoint for HadoopIOConfig. +type MetadataStorageUpdaterJobSpec struct { + Type string `json:"type"` + ConnectURI string `json:"connectURI"` + User string `json:"user"` + Password string `json:"password"` + SegmentTable string `json:"segmentTable"` + CreteTable bool `json:"creteTable"` + Host string `json:"host"` + Port string `json:"port"` + DBCPProperties map[string]any `json:"dbcp"` +} + +// IOConfig influences how data is read into Druid from a source system. +// https://druid.apache.org/docs/latest/ingestion/ingestion-spec/#ioconfig +type IOConfig struct { + Type string `json:"type,omitempty"` + + // IndexIOConfig / RealtimeIOConfig shared field + Firehose *Firehose `json:"firehose,omitempty"` + // IndexIOConfig field + InputSource *InputSource `json:"inputSource,omitempty"` + AppendToExisting bool `json:"appendToExisting,omitempty"` + // IndexIOConfig / CompactionIOConfig shared field + DropExisting bool `json:"dropExisting,omitempty"` + + // CompactionIOConfig / HadoopIOConfig fields + InputSpec map[string]any `json:"inputSpec,omitempty"` + + // CompactionIOConfig field + AllowNonAlignedInterval bool `json:"allowNonAlignedInterval,omitempty"` + + // HadoopIOConfig fields + MetadataUpdateSpec *MetadataStorageUpdaterJobSpec `json:"metadataUpdateSpec,omitempty"` + SegmentOutputPath string `json:"segmentOutputPath,omitempty"` + + // KafkaIndexTaskIOConfig / KinesisIndexTaskIOConfig fields + Topic string `json:"topic,omitempty"` + ConsumerProperties *ConsumerProperties `json:"consumerProperties,omitempty"` + TaskDuration string `json:"taskDuration,omitempty"` + Replicas int `json:"replicas,omitempty"` + TaskCount int `json:"taskCount,omitempty"` + UseEarliestOffset bool `json:"useEarliestOffset"` + AutoScalerConfig *AutoScalerConfig `json:"autoScalerConfig,omitempty"` + TaskGroupID int `json:"taskGroupID,omitempty"` + BaseSequenceName string `json:"baseSequenceName,omitempty"` + CompletionTimeout string `json:"completionTimeout,omitempty"` + PollTimeout int `json:"pollTimeout,omitempty"` + StartDelay string `json:"startDelay,omitempty"` + Period string `json:"period,omitempty"` + Stream string `json:"stream,omitempty"` + UseEarliestSequenceNumber bool `json:"useEarliestSequenceNumber,omitempty"` + + // common fields + FlattenSpec *FlattenSpec `json:"flattenSpec,omitempty"` + InputFormat *InputFormat `json:"inputFormat,omitempty"` + IdleConfig *IdleConfig `json:"idleConfig,omitempty"` +} + +// ConsumerProperties is a set of properties that is passed to a specific +// consumer, i.e. Kafka consumer. 
+type ConsumerProperties struct { + BootstrapServers string `json:"bootstrap.servers,omitempty"` +} + +// InputFormat specifies kafka messages format type and describes any conversions applied to +// the input data while parsing. +// Type can take values 'json', 'protobuf' or 'kafka'. +type InputFormat struct { + Type string `json:"type"` + + // FlatTextInputFormat / DelimitedInputFormat fields + Delimiter string `json:"delimiter,omitempty"` + ListDelimiter string `json:"listDelimiter,omitempty"` + FindColumnsHeader string `json:"findColumnsHeader,omitempty"` + SkipHeaderRows int `json:"skipHeaderRows,omitempty"` + Columns []string `json:"columns,omitempty"` + + // JsonInputFormat fields + FlattenSpec *FlattenSpec `json:"flattenSpec,omitempty"` + FeatureSpec map[string]bool `json:"featureSpec,omitempty"` + + // Common CsvInputFormat / JsonInputFormat fields + KeepNullColumns bool `json:"keepNullColumns,omitempty"` + AssumeNewlineDelimited bool `json:"assumeNewlineDelimited,omitempty"` + UseJsonNodeReader bool `json:"useJsonNodeReader,omitempty"` +} + +// HttpInputSourceConfig is a field of HttpInputSource specification. +type HttpInputSourceConfig struct { + AllowedProtocols []string `json:" allowedProtocols,omitempty"` +} + +// ConnectorConfig is connection configuration for Database. +type ConnectorConfig struct { + ConnectURI string `json:"connectURI"` + User string `json:"user"` + Password string `json:"password"` +} + +// Database configuration for InputSource "sql". +type Database struct { + Type string `json:"type"` + ConnectorConfig *ConnectorConfig `json:"connectorConfig"` +} + +// InputSource is a specification of the storage system where input data is stored. +type InputSource struct { + Type string `json:"type"` + + // LocalInputSource fields + BaseDir string `json:"baseDir,omitempty"` + Filter string `json:"filter,omitempty"` + Files []string `json:"files,omitempty"` + + // HttpInputSource fields + URIs []string `json:"uris,omitempty"` + HttpAuthenticationUsername string `json:"httpAuthenticationUsername,omitempty"` + HttpAuthenticationPassword string `json:"httpAuthenticationPassword,omitempty"` + HttpSourceConfig *HttpInputSourceConfig `json:"config,omitempty"` + + // InlineInputSource fields + Data string `json:"data,omitempty"` + + // CombiningInputSource fields + Delegates []InputSource `json:"delegates,omitempty"` + + // SqlInputSource + SQLs []string `json:"sqls,omitempty"` + Database *Database `json:"database,omitempty"` +} + +// TransformSpec is responsible for transforming druid input data +// after it was read from kafka and after flattenSpec was applied. 
+// https://druid.apache.org/docs/latest/ingestion/ingestion-spec#transformspec +type TransformSpec struct { + Transforms TransformSet `json:"transforms"` +} diff --git a/go.mod b/go.mod index dc1f1e0..3aabb28 100644 --- a/go.mod +++ b/go.mod @@ -6,8 +6,11 @@ toolchain go1.21.1 require ( github.com/davecgh/go-spew v1.1.1 + github.com/gocarina/gocsv v0.0.0-20231116093920-b87c2d0e983a github.com/google/go-querystring v1.1.0 + github.com/google/uuid v1.3.1 github.com/hashicorp/go-retryablehttp v0.7.2 + github.com/jmoiron/sqlx v1.3.5 github.com/magefile/mage v1.11.0 github.com/stretchr/testify v1.8.4 github.com/testcontainers/testcontainers-go v0.24.1 @@ -74,7 +77,6 @@ require ( github.com/google/go-cmp v0.5.9 // indirect github.com/google/gofuzz v1.2.0 // indirect github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect - github.com/google/uuid v1.3.1 // indirect github.com/gorilla/mux v1.8.0 // indirect github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.11.3 // indirect diff --git a/go.sum b/go.sum index 4ae924b..8123f2e 100644 --- a/go.sum +++ b/go.sum @@ -250,6 +250,8 @@ github.com/go-sql-driver/mysql v1.3.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG github.com/go-sql-driver/mysql v1.6.0 h1:BCTh4TKNUYmOmMUcQ3IipzF5prigylS7XXjEkfCHuOE= github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= +github.com/gocarina/gocsv v0.0.0-20231116093920-b87c2d0e983a h1:RYfmiM0zluBJOiPDJseKLEN4BapJ42uSi9SZBQ2YyiA= +github.com/gocarina/gocsv v0.0.0-20231116093920-b87c2d0e983a/go.mod h1:5YoVOkjYAQumqlV356Hj3xeYh4BdZuLE0/nRkf2NKkI= github.com/gofrs/flock v0.8.1 h1:+gYjHKf32LDeiEEFhQaotPbLuUXjY5ZqxKgXy7n59aw= github.com/gofrs/flock v0.8.1/go.mod h1:F1TvTiK9OcQqauNUHlbJvyl9Qa1QvF/gOUDKA14jxHU= github.com/gofrs/uuid v4.2.0+incompatible h1:yyYWMnhkhrKwwr8gAOcOCYxOOscHgDS9yZgBrnJfGa0= @@ -423,6 +425,7 @@ github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/lib/pq v0.0.0-20150723085316-0dad96c0b94f/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= +github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ81pIr0yLvtUWk2if982qA3F3QD6H4= github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I= github.com/magefile/mage v1.11.0 h1:C/55Ywp9BpgVVclD3lRnSYCwXTYxmSppIgLeDYlNuls= @@ -446,6 +449,7 @@ github.com/mattn/go-runewidth v0.0.14/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh github.com/mattn/go-shellwords v1.0.12 h1:M2zGm7EW6UQJvDeQxo4T51eKPurbeFbe8WtebGE2xrk= github.com/mattn/go-shellwords v1.0.12/go.mod h1:EZzvwXDESEeg03EKmM+RmDnNOPKG4lLtQsUlTZDWQ8Y= github.com/mattn/go-sqlite3 v1.6.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc= +github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo= github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= diff --git 
a/supervisor_types.go b/supervisor_types.go index 3bafb85..0a01ed7 100644 --- a/supervisor_types.go +++ b/supervisor_types.go @@ -31,330 +31,6 @@ type SupervisorAuditHistory struct { Version string `json:"version"` } -// BitmapFactory is a field of IndexSpec. -type BitmapFactory struct { - Type string `json:"type"` -} - -// StringEncodingStrategy type for specifying string encoding at indexing stage. -type StringEncodingStrategy struct { - Type string `json:"type"` - // FrontCoded fields - BucketSize int `json:"bucketSize,omitempty"` - FormatVersion int `json:"formatVersion,omitempty"` -} - -// IndexSpec defines segment storage format options to be used at indexing time. -// https://druid.apache.org/docs/latest/ingestion/ingestion-spec#indexspec -type IndexSpec struct { - Bitmap *BitmapFactory `json:"bitmap,omitempty"` - DimensionCompression string `json:"dimensionCompression"` - StringEncodingStrategy *StringEncodingStrategy `json:"stringEncodingStrategy,omitempty"` - MetricCompression string `json:"metricCompression,omitempty"` - LongEncoding string `json:"longEncoding,omitempty"` - JsonCompression string `json:"jsonCompression,omitempty"` - SegmentLoader string `json:"segmentLoader,omitempty"` -} - -// TuningConfig controls various tuning parameters specific to each ingestion method. -// https://druid.apache.org/docs/latest/ingestion/ingestion-spec#tuningconfig -type TuningConfig struct { - Type string `json:"type"` - IntermediatePersistPeriod string `json:"intermediatePersistPeriod,omitempty"` - MaxRowsPerSegment int `json:"maxRowsPerSegment,omitempty"` - MaxRowsInMemory int `json:"maxRowsInMemory,omitempty"` - IndexSpecForIntermediatePersists *IndexSpec `json:"indexSpecForIntermediatePersists"` -} - -// Metric is a Druid aggregator that is applied at ingestion time. -// https://druid.apache.org/docs/latest/ingestion/ingestion-spec#metricsspec -type Metric struct { - Name string `json:"name"` - Type string `json:"type"` - FieldName string `json:"fieldName"` -} - -// DataSchema represents the Druid dataSchema spec. -type DataSchema struct { - DataSource string `json:"dataSource"` - Parser string `json:"parser,omitempty"` - TimeStampSpec *TimestampSpec `json:"timestampSpec,omitempty"` - TransformSpec *TransformSpec `json:"transformSpec,omitempty"` - DimensionsSpec *DimensionsSpec `json:"dimensionsSpec,omitempty"` - GranularitySpec *GranularitySpec `json:"granularitySpec,omitempty"` - MetricSpec []Metric `json:"metricSpec,omitempty"` -} - -// FlattenSpec is responsible for flattening nested input JSON data into Druid's flat data model. -type FlattenSpec struct { - UseFieldDiscovery bool `json:"useFieldDiscovery,omitempty"` - Fields FieldList `json:"fields"` -} - -// TimestampSpec is responsible for configuring the primary timestamp. -type TimestampSpec struct { - Column string `json:"column"` - Format string `json:"format"` -} - -// FieldList is a list of Fields for ingestion FlattenSpec. -type FieldList []Field - -// Field defines a single filed configuration of the FlattenSpec. -type Field struct { - Type string `json:"type"` - Name string `json:"name"` - Expr string `json:"expr"` -} - -// Transform defines a single filed transformation of the TransformSpec. -type Transform struct { - Type string `json:"type"` - Name string `json:"name"` - Expr string `json:"expression"` -} - -// SpatialDimension represents single spatial dimension datum. 
-// https://druid.apache.org/docs/latest/querying/geo/#spatial-indexing -type SpatialDimension struct { - DimensionName string `json:"dimName"` - Dimensions []string `json:"dims,omitempty"` -} - -// TransformSet is a unique set of transforms applied to the input. -type TransformSet []Transform - -// DimensionSet is a unique set of druid datasource dimensions(labels). -type DimensionSet []any - -// Dimension is a typed definition of a datasource dimension. -type Dimension struct { - Type string `json:"type"` - Name string `json:"name"` -} - -// SpatialDimensionSet is a unique set of druid datasource spatial dimensions. -type SpatialDimensionSet []SpatialDimension - -// DimensionExclusionsSet represents set of excluded dimensions. -type DimensionExclusionsSet []string - -// DimensionsSpec is responsible for configuring Druid's dimensions. They're a -// set of columns in Druid's data model that can be used for grouping, filtering -// or applying aggregations. -// https://druid.apache.org/docs/latest/ingestion/ingestion-spec#dimensionsspec -type DimensionsSpec struct { - Dimensions DimensionSet `json:"dimensions,omitempty"` - DimensionExclusions DimensionExclusionsSet `json:"dimensionExclusions,omitempty"` - SpatialDimensions SpatialDimensionSet `json:"spatialDimensions,omitempty"` - IncludeAllDimensions bool `json:"includeAllDimensions,omitempty"` - UseSchemaDiscovery bool `json:"useSchemaDiscovery,omitempty"` -} -type QueryGranularity struct { - Type string `json:"type,omitempty"` -} - -// GranularitySpec allows for configuring operations such as data segment -// partitioning, truncating timestamps, time chunk segmentation or roll-up. -// https://druid.apache.org/docs/latest/ingestion/ingestion-spec#granularityspec -type GranularitySpec struct { - Type string `json:"type"` - SegmentGranularity string `json:"segmentGranularity,omitempty"` - // TODO: this field is problematic as depending on value druid returns string or object - QueryGranularity any `json:"queryGranularity,omitempty"` - Rollup bool `json:"rollup,omitempty"` - Intervals []string `json:"intervals,omitempty"` -} - -// AutoScalerConfig is part of IOConfig that controls ingestion auto-scaling. -type AutoScalerConfig struct { - EnableTaskAutoScaler bool `json:"enableTaskAutoScaler"` - LagCollectionIntervalMillis int `json:"lagCollectionIntervalMillis"` - LagCollectionRangeMillis int `json:"lagCollectionRangeMillis"` - ScaleOutThreshold int `json:"scaleOutThreshold"` - TriggerScaleOutFractionThreshold float64 `json:"triggerScaleOutFractionThreshold"` - ScaleInThreshold int `json:"scaleInThreshold"` - TriggerScaleInFractionThreshold float64 `json:"triggerScaleInFractionThreshold"` - ScaleActionStartDelayMillis int `json:"scaleActionStartDelayMillis"` - ScaleActionPeriodMillis int `json:"scaleActionPeriodMillis"` - TaskCountMax int `json:"taskCountMax"` - TaskCountMin int `json:"taskCountMin"` - ScaleInStep int `json:"scaleInStep"` - ScaleOutStep int `json:"scaleOutStep"` - MinTriggerScaleActionFrequencyMillis int `json:"minTriggerScaleActionFrequencyMillis"` -} - -// IdleConfig defines if and when stream Supervisor can become idle. -type IdleConfig struct { - Enabled bool `json:"enabled"` - InactiveAfterMillis int64 `json:"inactiveAfterMillis"` -} - -// Firehose is an IOConfig firehose configuration. 
-type Firehose struct { - Type string `json:"type,omitempty"` - - // EventReceiverFirehoseFactory fields - ServiceName string `json:"serviceName,omitempty"` - BufferSize int `json:"bufferSize,omitempty"` - MaxIdleTime int64 `json:"maxIdleTime,omitempty"` - - // FixedCountFirehoseFactory / ClippedFirehoseFactory / TimedShutoffFirehoseFactory fields - Delegate []Firehose `json:"delegate,omitempty"` - Count int `json:"count,omitempty"` - Interval string `json:"interval,omitempty"` - ShutoffTime string `json:"shutoffTime,omitempty"` -} - -// CompactionInputSpec is a specification for compaction task. -type CompactionInputSpec struct { - Type string `json:"type"` - // CompactionIntervalSpec fields - Interval string `json:"interval,omitempty"` - Sha256OfSortedSegmentIds string `json:"sha256OfSortedSegmentIds,omitempty"` - // SpecificSegmentsSpec fields - Segments []string `json:"segments,omitempty"` -} - -// MetadataStorageUpdaterJobSpec is a specification of endpoint for HadoopIOConfig. -type MetadataStorageUpdaterJobSpec struct { - Type string `json:"type"` - ConnectURI string `json:"connectURI"` - User string `json:"user"` - Password string `json:"password"` - SegmentTable string `json:"segmentTable"` - CreteTable bool `json:"creteTable"` - Host string `json:"host"` - Port string `json:"port"` - DBCPProperties map[string]any `json:"dbcp"` -} - -// IOConfig influences how data is read into Druid from a source system. -// https://druid.apache.org/docs/latest/ingestion/ingestion-spec/#ioconfig -type IOConfig struct { - Type string `json:"type,omitempty"` - - // IndexIOConfig / RealtimeIOConfig shared field - Firehose *Firehose `json:"firehose,omitempty"` - // IndexIOConfig field - InputSource *InputSource `json:"inputSource,omitempty"` - AppendToExisting bool `json:"appendToExisting,omitempty"` - // IndexIOConfig / CompactionIOConfig shared field - DropExisting bool `json:"dropExisting,omitempty"` - - // CompactionIOConfig / HadoopIOConfig fields - InputSpec map[string]any `json:"inputSpec,omitempty"` - - // CompactionIOConfig field - AllowNonAlignedInterval bool `json:"allowNonAlignedInterval,omitempty"` - - // HadoopIOConfig fields - MetadataUpdateSpec *MetadataStorageUpdaterJobSpec `json:"metadataUpdateSpec,omitempty"` - SegmentOutputPath string `json:"segmentOutputPath,omitempty"` - - // KafkaIndexTaskIOConfig / KinesisIndexTaskIOConfig fields - Topic string `json:"topic,omitempty"` - ConsumerProperties *ConsumerProperties `json:"consumerProperties,omitempty"` - TaskDuration string `json:"taskDuration,omitempty"` - Replicas int `json:"replicas,omitempty"` - TaskCount int `json:"taskCount,omitempty"` - UseEarliestOffset bool `json:"useEarliestOffset"` - AutoScalerConfig *AutoScalerConfig `json:"autoScalerConfig,omitempty"` - TaskGroupID int `json:"taskGroupID,omitempty"` - BaseSequenceName string `json:"baseSequenceName,omitempty"` - CompletionTimeout string `json:"completionTimeout,omitempty"` - PollTimeout int `json:"pollTimeout,omitempty"` - StartDelay string `json:"startDelay,omitempty"` - Period string `json:"period,omitempty"` - Stream string `json:"stream,omitempty"` - UseEarliestSequenceNumber bool `json:"useEarliestSequenceNumber,omitempty"` - - // common fields - FlattenSpec *FlattenSpec `json:"flattenSpec,omitempty"` - InputFormat *InputFormat `json:"inputFormat,omitempty"` - IdleConfig *IdleConfig `json:"idleConfig,omitempty"` -} - -// ConsumerProperties is a set of properties that is passed to a specific -// consumer, i.e. Kafka consumer. 
-type ConsumerProperties struct { - BootstrapServers string `json:"bootstrap.servers,omitempty"` -} - -// InputFormat specifies kafka messages format type and describes any conversions applied to -// the input data while parsing. -// Type can take values 'json', 'protobuf' or 'kafka'. -type InputFormat struct { - Type string `json:"type"` - - // FlatTextInputFormat / DelimitedInputFormat fields - Delimiter string `json:"delimiter,omitempty"` - ListDelimiter string `json:"listDelimiter,omitempty"` - FindColumnsHeader string `json:"findColumnsHeader,omitempty"` - SkipHeaderRows int `json:"skipHeaderRows,omitempty"` - Columns []string `json:"columns,omitempty"` - - // JsonInputFormat fields - FlattenSpec *FlattenSpec `json:"flattenSpec,omitempty"` - FeatureSpec map[string]bool `json:"featureSpec,omitempty"` - - // Common CsvInputFormat / JsonInputFormat fields - KeepNullColumns bool `json:"keepNullColumns,omitempty"` - AssumeNewlineDelimited bool `json:"assumeNewlineDelimited,omitempty"` - UseJsonNodeReader bool `json:"useJsonNodeReader,omitempty"` -} - -// HttpInputSourceConfig is a field of HttpInputSource specification. -type HttpInputSourceConfig struct { - AllowedProtocols []string `json:" allowedProtocols,omitempty"` -} - -// ConnectorConfig is connection configuration for Database. -type ConnectorConfig struct { - ConnectURI string `json:"connectURI"` - User string `json:"user"` - Password string `json:"password"` -} - -// Database configuration for InputSource "sql". -type Database struct { - Type string `json:"type"` - ConnectorConfig *ConnectorConfig `json:"connectorConfig"` -} - -// InputSource is a specification of the storage system where input data is stored. -type InputSource struct { - Type string `json:"type"` - - // LocalInputSource fields - BaseDir string `json:"baseDir,omitempty"` - Filter string `json:"filter,omitempty"` - Files []string `json:"files,omitempty"` - - // HttpInputSource fields - URIs []string `json:"uris,omitempty"` - HttpAuthenticationUsername string `json:"httpAuthenticationUsername,omitempty"` - HttpAuthenticationPassword string `json:"httpAuthenticationPassword,omitempty"` - HttpSourceConfig *HttpInputSourceConfig `json:"config,omitempty"` - - // InlineInputSource fields - Data string `json:"data,omitempty"` - - // CombiningInputSource fields - Delegates []InputSource `json:"delegates,omitempty"` - - // SqlInputSource - SQLs []string `json:"sqls,omitempty"` - Database *Database `json:"database,omitempty"` -} - -// TransformSpec is responsible for transforming druid input data -// after it was read from kafka and after flattenSpec was applied. -// https://druid.apache.org/docs/latest/ingestion/ingestion-spec#transformspec -type TransformSpec struct { - Transforms TransformSet `json:"transforms"` -} - // SupervisorStatusPayload is an object representing the status of supervisor. 
type SupervisorStatusPayload struct { Datasource string `json:"dataSource"` diff --git a/task_types.go b/task_types.go index e1de81f..55849c8 100644 --- a/task_types.go +++ b/task_types.go @@ -53,7 +53,11 @@ func defaultTaskIngestionSpec() *TaskIngestionSpec { QueryGranularity: "none", }, DimensionsSpec: &DimensionsSpec{ - Dimensions: DimensionSet{}, + UseSchemaDiscovery: true, + Dimensions: DimensionSet{}, + }, + TransformSpec: &TransformSpec{ + Transforms: []Transform{}, }, }, IOConfig: &IOConfig{ @@ -68,7 +72,7 @@ func defaultTaskIngestionSpec() *TaskIngestionSpec { Password: "password", }, }, - SQLs: []string{"SELECT * FROM some_table"}, + SQLs: []string{"SELECT * FROM druid"}, }, InputFormat: &InputFormat{ Type: "json", @@ -104,16 +108,66 @@ func SetTaskDataSource(datasource string) TaskIngestionSpecOptions { } // SetTuningConfig sets the type of the supervisor (IOConfig). -func SetTuningConfig(ttype string, maxRowsInMemory, maxRowsPerSegment int) TaskIngestionSpecOptions { +func SetTaskTuningConfig(typ string, maxRowsInMemory, maxRowsPerSegment int) TaskIngestionSpecOptions { return func(spec *TaskIngestionSpec) { - if ttype != "" { - spec.Spec.TuningConfig.Type = ttype + if typ != "" { + spec.Spec.TuningConfig.Type = typ spec.Spec.TuningConfig.MaxRowsInMemory = maxRowsInMemory spec.Spec.TuningConfig.MaxRowsPerSegment = maxRowsPerSegment } } } +// SetDimensions sets druid datasource dimensions. +func SetTaskDataDimensions(dimensions DimensionSet) TaskIngestionSpecOptions { + return func(spec *TaskIngestionSpec) { + spec.Spec.DataSchema.DimensionsSpec.Dimensions = dimensions + } +} + +// SetSQLInputSource configures sql input source. +func SetTaskSQLInputSource(typ, connectURI, user, password string, sqls []string) TaskIngestionSpecOptions { + return func(spec *TaskIngestionSpec) { + spec.Spec.IOConfig.InputSource = &InputSource{ + Type: "sql", + SQLs: sqls, + Database: &Database{ + Type: typ, + ConnectorConfig: &ConnectorConfig{ + ConnectURI: connectURI, + User: user, + Password: password, + }, + }, + } + } +} + +// SetTaskIOConfigType sets the type of the supervisor IOConfig. +func SetTaskIOConfigType(typ string) TaskIngestionSpecOptions { + return func(spec *TaskIngestionSpec) { + if typ != "" { + spec.Spec.IOConfig.Type = typ + } + } +} + +// SetTaskInputFormat +func SetTaskInputFormat(typ string, findColumnsHeader string, columns []string) TaskIngestionSpecOptions { + return func(spec *TaskIngestionSpec) { + spec.Spec.IOConfig.InputFormat.Type = typ + spec.Spec.IOConfig.InputFormat.FindColumnsHeader = findColumnsHeader + spec.Spec.IOConfig.InputFormat.Columns = columns + } +} + +func SetTaskInlineInputData(data string) TaskIngestionSpecOptions { + return func(spec *TaskIngestionSpec) { + spec.Spec.IOConfig.InputSource.Type = "inline" + spec.Spec.IOConfig.InputSource.Data = data + } +} + // NewTaskIngestionSpec returns a default TaskIngestionSpec and applies any // options passed to it. func NewTaskIngestionSpec(options ...TaskIngestionSpecOptions) *TaskIngestionSpec { diff --git a/tasks_test.go b/tasks_test.go index 4bd5469..618c9e2 100644 --- a/tasks_test.go +++ b/tasks_test.go @@ -1,15 +1,59 @@ package druid import ( + "bytes" "context" "testing" "time" + "github.com/gocarina/gocsv" + "github.com/google/uuid" + "github.com/jmoiron/sqlx/types" "github.com/stretchr/testify/assert" tc "github.com/testcontainers/testcontainers-go/modules/compose" "github.com/testcontainers/testcontainers-go/wait" ) +// TestDO represents entry with payload. 
+type TestDO struct { + Timestamp time.Time `db:"ts"` + Id uuid.UUID `db:"id"` // output only + Payload types.JSONText `db:"payload"` +} + +var testObjects = []TestDO{ + { + Id: uuid.New(), + Timestamp: time.Now(), + Payload: types.JSONText("{\"test\": \"json\"}"), + }, + { + Id: uuid.New(), + Timestamp: time.Now().Add(time.Hour), + Payload: types.JSONText("{\"test\": \"json2\"}"), + }, +} + +func triggerIngestionTask(d *Client, dataSourceName string, entries []TestDO) (string, error) { + csvEntriesBuff := &bytes.Buffer{} + + err := gocsv.MarshalWithoutHeaders(entries, csvEntriesBuff) + if err != nil { + return "", err + } + + var spec = NewTaskIngestionSpec( + SetTaskType("index_parallel"), + SetTaskDataSource(dataSourceName), + SetTaskTuningConfig("index_parallel", 25000, 5000000), + SetTaskIOConfigType("index_parallel"), + SetTaskInputFormat("csv", "false", []string{}), + SetTaskInlineInputData(csvEntriesBuff.String()), + ) + taskID, err := d.Tasks().SubmitTask(spec) + return taskID, err +} + func TestTaskService(t *testing.T) { // Set up druid containers using docker-compose. compose, err := tc.NewDockerCompose("testdata/docker-compose.yaml") @@ -28,51 +72,18 @@ func TestTaskService(t *testing.T) { // Set up druid service and client. var druidOpts []ClientOption - d, err := NewClient("http://localhost:8888", druidOpts...) - assert.NoError(t, err, "error should be nil") - var spec = NewTaskIngestionSpec( - SetTaskType("index_parallel"), - SetTaskDataSource("test-datasource"), - SetTuningConfig("index_parallel", 25000, 5000000), - ) + d, err := NewClient("http://localhost:8091", druidOpts...) assert.NoError(t, err, "error should be nil") - assert.NotNil(t, spec, "specification should not be nil") // Waiting for druid coordinator service to start. err = compose. - WaitForService("coordinator", wait.NewHTTPStrategy(processInformationPathPrefix).WithPort("8081/tcp").WithStartupTimeout(60*time.Second)). + WaitForService("coordinator", wait.NewHTTPStrategy(processInformationPathPrefix).WithPort("8081/tcp").WithStartupTimeout(180*time.Second)). + WaitForService("router", wait.NewHTTPStrategy(processInformationPathPrefix).WithPort("8888/tcp").WithStartupTimeout(180*time.Second)). + WaitForService("broker", wait.NewHTTPStrategy(processInformationPathPrefix).WithPort("8082/tcp").WithStartupTimeout(180*time.Second)). Up(ctx, tc.Wait(true)) assert.NoError(t, err, "coordinator should be up with no error") // Test create supervisor -> get status -> terminate sequence. 
- _, err = d.Tasks().SubmitTask(spec) + _, err = triggerIngestionTask(d, "test-datasource", testObjects) assert.NoError(t, err, "error should be nil") -//assert.Equal(t, id, spec.DataSchema.DataSource) -//status, err := d.Supervisor().GetStatus(spec.DataSchema.DataSource) -//assert.NoError(t, err, "error should be nil") -//assert.Equal(t, "PENDING", status.Payload.State) -//assert.False(t, status.Payload.Suspended) -// -//// suspend and check status -//suspendedSpec, err := d.Supervisor().Suspend(spec.DataSchema.DataSource) -//assert.True(t, suspendedSpec.Suspended) -//assert.NoError(t, err, "error should be nil") -// -//status, err = d.Supervisor().GetStatus(spec.DataSchema.DataSource) -//assert.NoError(t, err, "error should be nil") -//assert.True(t, status.Payload.Suspended) -// -//// resume and check status -//_, err = d.Supervisor().Resume(spec.DataSchema.DataSource) -//assert.NoError(t, err, "error should be nil") -// -//status, err = d.Supervisor().GetStatus(spec.DataSchema.DataSource) -//assert.NoError(t, err, "error should be nil") -//assert.Equal(t, "PENDING", status.Payload.State) -//assert.False(t, status.Payload.Suspended) -// -//// terminate -//id, err = d.Supervisor().Terminate(spec.DataSchema.DataSource) -//assert.NoError(t, err, "error should be nil") -//assert.Equal(t, id, spec.DataSchema.DataSource) }
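
Usage sketch (not part of the patch above): how the new SetTask* option functions from task_types.go compose into an inline-CSV ingestion task and get submitted through Tasks().SubmitTask. It is written as if it lived inside package druid so no module import path has to be assumed; the router URL, datasource name, column names and CSV rows are placeholders, not values taken from the diff.

package druid

// ExampleInlineIngestion sketches building and submitting an inline-CSV
// ingestion task with the option functions added in this change.
// URL, datasource name and CSV rows below are illustrative only.
func ExampleInlineIngestion() error {
	spec := NewTaskIngestionSpec(
		SetTaskType("index_parallel"),
		SetTaskDataSource("example-datasource"),
		SetTaskTuningConfig("index_parallel", 25000, 5000000),
		SetTaskIOConfigType("index_parallel"),
		// findColumnsHeader is "false", so the column names are given explicitly.
		SetTaskInputFormat("csv", "false", []string{"ts", "id", "payload"}),
		SetTaskInlineInputData("2023-01-01T00:00:00Z,1,foo\n2023-01-01T01:00:00Z,2,bar\n"),
	)

	d, err := NewClient("http://localhost:8888")
	if err != nil {
		return err
	}

	// SubmitTask returns the ID of the created ingestion task.
	taskID, err := d.Tasks().SubmitTask(spec)
	if err != nil {
		return err
	}
	_ = taskID
	return nil
}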
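
Similarly, a hedged sketch of the new SetTaskSQLInputSource option, which wires up the Database and ConnectorConfig types from common_spec_types.go. Again assumed to sit in package druid; the database type, connection URI, credentials and query are placeholders.

// buildSQLIngestionSpec sketches a task spec that reads rows from a SQL
// database instead of inline data. Every connection detail here is a
// placeholder, not a value taken from the patch.
func buildSQLIngestionSpec() *TaskIngestionSpec {
	return NewTaskIngestionSpec(
		SetTaskType("index_parallel"),
		SetTaskDataSource("example-sql-datasource"),
		SetTaskIOConfigType("index_parallel"),
		SetTaskSQLInputSource(
			"mysql",
			"jdbc:mysql://localhost:3306/druid",
			"druid",
			"password",
			[]string{"SELECT * FROM druid"},
		),
	)
}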