diff --git a/internal/controller/telemetry/collector_controller.go b/internal/controller/telemetry/collector_controller.go index c2dd298..6b6d078 100644 --- a/internal/controller/telemetry/collector_controller.go +++ b/internal/controller/telemetry/collector_controller.go @@ -50,7 +50,7 @@ import ( const ( otelCollectorKind = "OpenTelemetryCollector" requeueDelayOnFailedTenant = 20 * time.Second - axoflowOtelCollectorImageRef = "ghcr.io/axoflow/axoflow-otel-collector/axoflow-otel-collector:0.112.0-dev10" + axoflowOtelCollectorImageRef = "ghcr.io/axoflow/axoflow-otel-collector/axoflow-otel-collector:0.112.0-dev12" ) var ( diff --git a/internal/controller/telemetry/otel_conf_gen/otel_col_conf_test_fixtures/complex.yaml b/internal/controller/telemetry/otel_conf_gen/otel_col_conf_test_fixtures/complex.yaml index 8e2c489..fffe987 100644 --- a/internal/controller/telemetry/otel_conf_gen/otel_col_conf_test_fixtures/complex.yaml +++ b/internal/controller/telemetry/otel_conf_gen/otel_col_conf_test_fixtures/complex.yaml @@ -1,4 +1,11 @@ connectors: + bytes/exporter: + logs: + otelcol_exporter_sent_log_records_bytes: + description: + Bytes of log records successfully sent to destination + attributes: + - key: exporter count/output_metrics: logs: telemetry_controller_output_log_count: @@ -371,6 +378,7 @@ service: exporters: - otlphttp/collector_loki-test-output - count/output_metrics + - bytes/exporter processors: - memory_limiter - attributes/exporter_name_loki-test-output @@ -380,6 +388,7 @@ service: exporters: - otlp/collector_otlp-test-output - count/output_metrics + - bytes/exporter processors: - memory_limiter - attributes/exporter_name_otlp-test-output @@ -390,6 +399,7 @@ service: exporters: - otlp/collector_otlp-test-output-2 - count/output_metrics + - bytes/exporter processors: - memory_limiter - attributes/exporter_name_otlp-test-output-2 @@ -399,6 +409,7 @@ service: exporters: - fluentforwardexporter/collector_fluentforward-test-output - count/output_metrics + - bytes/exporter processors: - memory_limiter - attributes/exporter_name_fluentforward-test-output @@ -408,6 +419,7 @@ service: exporters: - otlp/collector_otlp-test-output-3 - count/output_metrics + - bytes/exporter processors: - memory_limiter - attributes/exporter_name_otlp-test-output-3 @@ -468,6 +480,15 @@ service: - attributes/metricattributes receivers: - count/output_metrics + metrics/output_bytes: + exporters: + - prometheus/message_metrics_exporter + processors: + - memory_limiter + - deltatocumulative + - attributes/metricattributes + receivers: + - bytes/exporter metrics/tenant: exporters: - prometheus/message_metrics_exporter diff --git a/internal/controller/telemetry/otel_conf_gen/otel_conf_gen.go b/internal/controller/telemetry/otel_conf_gen/otel_conf_gen.go index 66bd737..c372773 100644 --- a/internal/controller/telemetry/otel_conf_gen/otel_conf_gen.go +++ b/internal/controller/telemetry/otel_conf_gen/otel_conf_gen.go @@ -162,6 +162,7 @@ func (cfgInput *OtelColConfigInput) generateReceivers() map[string]any { func (cfgInput *OtelColConfigInput) generateConnectors() map[string]any { connectors := make(map[string]any) maps.Copy(connectors, connector.GenerateCountConnectors()) + maps.Copy(connectors, connector.GenerateBytesConnectors()) for _, tenant := range cfgInput.Tenants { // Generate routing connector for the tenant's subscription if it has any @@ -189,6 +190,7 @@ func (cfgInput *OtelColConfigInput) generateConnectors() map[string]any { func (cfgInput *OtelColConfigInput) generateNamedPipelines() map[string]*otelv1beta1.Pipeline { const outputCountConnectorName = "count/output_metrics" + const outputBytesConnectorName = "bytes/exporter" var namedPipelines = make(map[string]*otelv1beta1.Pipeline) tenants := []string{} @@ -235,15 +237,15 @@ func (cfgInput *OtelColConfigInput) generateNamedPipelines() map[string]*otelv1b var exporters []string if output.Output.Spec.OTLPGRPC != nil { - exporters = []string{components.GetExporterNameForOutput(output.Output), outputCountConnectorName} + exporters = []string{components.GetExporterNameForOutput(output.Output), outputCountConnectorName, outputBytesConnectorName} } if output.Output.Spec.OTLPHTTP != nil { - exporters = []string{components.GetExporterNameForOutput(output.Output), outputCountConnectorName} + exporters = []string{components.GetExporterNameForOutput(output.Output), outputCountConnectorName, outputBytesConnectorName} } if output.Output.Spec.Fluentforward != nil { - exporters = []string{components.GetExporterNameForOutput(output.Output), outputCountConnectorName} + exporters = []string{components.GetExporterNameForOutput(output.Output), outputCountConnectorName, outputBytesConnectorName} } if cfgInput.Debug { exporters = append(exporters, "debug") diff --git a/internal/controller/telemetry/otel_conf_gen/otel_conf_gen_test.go b/internal/controller/telemetry/otel_conf_gen/otel_conf_gen_test.go index 2e602a1..7bb985f 100644 --- a/internal/controller/telemetry/otel_conf_gen/otel_conf_gen_test.go +++ b/internal/controller/telemetry/otel_conf_gen/otel_conf_gen_test.go @@ -497,6 +497,11 @@ func TestOtelColConfigInput_generateNamedPipelines(t *testing.T) { []string{"deltatocumulative", "attributes/metricattributes"}, []string{"prometheus/message_metrics_exporter"}, ), + "metrics/output_bytes": pipeline.GeneratePipeline( + []string{"bytes/exporter"}, + []string{"deltatocumulative", "attributes/metricattributes"}, + []string{"prometheus/message_metrics_exporter"}, + ), "metrics/tenant": pipeline.GeneratePipeline( []string{"count/tenant_metrics"}, []string{"deltatocumulative", "attributes/metricattributes"}, @@ -708,6 +713,11 @@ func TestOtelColConfigInput_generateNamedPipelines(t *testing.T) { Processors: []string{"deltatocumulative", "attributes/metricattributes"}, Exporters: []string{"prometheus/message_metrics_exporter"}, }, + "metrics/output_bytes": { + Receivers: []string{"bytes/exporter"}, + Processors: []string{"deltatocumulative", "attributes/metricattributes"}, + Exporters: []string{"prometheus/message_metrics_exporter"}, + }, "metrics/tenant": { Receivers: []string{"count/tenant_metrics"}, Processors: []string{"deltatocumulative", "attributes/metricattributes"}, diff --git a/internal/controller/telemetry/pipeline/components/connector/bytes_connector.go b/internal/controller/telemetry/pipeline/components/connector/bytes_connector.go new file mode 100644 index 0000000..6ff1b0a --- /dev/null +++ b/internal/controller/telemetry/pipeline/components/connector/bytes_connector.go @@ -0,0 +1,45 @@ +// Copyright © 2024 Kube logging authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package connector + +import "github.com/kube-logging/telemetry-controller/internal/controller/telemetry/utils" + +type BytesConnectorAttributes struct { + Key *string `json:"key,omitempty"` + DefaultValue *string `json:"default_value,omitempty"` +} + +type BytesConnector struct { + Description string `json:"description,omitempty"` + Attributes []BytesConnectorAttributes `json:"attributes,omitempty"` +} + +func GenerateBytesConnectors() map[string]any { + bytesConnectors := make(map[string]any) + + bytesConnectors["bytes/exporter"] = map[string]any{ + "logs": map[string]BytesConnector{ + "otelcol_exporter_sent_log_records_bytes": { + Description: "Bytes of log records successfully sent to destination", + Attributes: []BytesConnectorAttributes{{ + Key: utils.ToPtr("exporter"), + }}, + }, + }, + } + + return bytesConnectors + +} diff --git a/internal/controller/telemetry/pipeline/pipeline.go b/internal/controller/telemetry/pipeline/pipeline.go index c8366d8..8dbd963 100644 --- a/internal/controller/telemetry/pipeline/pipeline.go +++ b/internal/controller/telemetry/pipeline/pipeline.go @@ -63,6 +63,12 @@ func GenerateMetricsPipelines() map[string]*otelv1beta1.Pipeline { Exporters: []string{"prometheus/message_metrics_exporter"}, } + metricsPipelines["metrics/output_bytes"] = &otelv1beta1.Pipeline{ + Receivers: []string{"bytes/exporter"}, + Processors: []string{"deltatocumulative", "attributes/metricattributes"}, + Exporters: []string{"prometheus/message_metrics_exporter"}, + } + return metricsPipelines }