Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add log agent #1786

Draft
wants to merge 7 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions controllers/telemetry/logpipeline_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ limitations under the License.

import (
"context"
"github.com/kyma-project/telemetry-manager/internal/otelcollector/config/log/agent"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -83,9 +84,11 @@ type LogPipelineControllerConfig struct {
OTelCollectorImage string
FluentBitPriorityClassName string
LogGatewayPriorityClassName string
LogAgentPriorityClassName string
RestConfig *rest.Config
SelfMonitorName string
TelemetryNamespace string
ModuleVersion string
}

func NewLogPipelineController(client client.Client, reconcileTriggerChan <-chan event.GenericEvent, config LogPipelineControllerConfig) (*LogPipelineController, error) {
Expand Down Expand Up @@ -207,9 +210,19 @@ func configureOtelReconciler(client client.Client, config LogPipelineControllerC
return nil, err
}

agentConfigBuilder := &agent.Builder{
Config: agent.BuilderConfig{
GatewayOTLPServiceName: types.NamespacedName{Namespace: config.TelemetryNamespace, Name: otelcollector.LogOTLPServiceName},
},
}

otelReconciler := logpipelineotel.New(
client,
config.TelemetryNamespace,
config.ModuleVersion,
agentConfigBuilder,
otelcollector.NewLogAgentApplierDeleter(config.OTelCollectorImage, config.TelemetryNamespace, config.LogAgentPriorityClassName),
&workloadstatus.DaemonSetProber{Client: client},
otelcollector.NewLogGatewayApplierDeleter(config.OTelCollectorImage, config.TelemetryNamespace, config.LogGatewayPriorityClassName),
&gateway.Builder{Reader: client},
&workloadstatus.DeploymentProber{Client: client},
Expand Down
15 changes: 15 additions & 0 deletions internal/otelcollector/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ type Base struct {
Service Service `yaml:"service"`
}

type BaseExtensions struct {
HealthCheck Endpoint `yaml:"health_check,omitempty"`
Pprof Endpoint `yaml:"pprof,omitempty"`
}
type Extensions struct {
HealthCheck Endpoint `yaml:"health_check,omitempty"`
Pprof Endpoint `yaml:"pprof,omitempty"`
Expand Down Expand Up @@ -104,3 +108,14 @@ func DefaultExtensions() Extensions {
},
}
}

func DefaultBaseExtensions() BaseExtensions {
return BaseExtensions{
HealthCheck: Endpoint{
Endpoint: fmt.Sprintf("${%s}:%d", EnvVarCurrentPodIP, ports.HealthCheck),
},
Pprof: Endpoint{
Endpoint: fmt.Sprintf("127.0.0.1:%d", ports.Pprof),
},
}
}
59 changes: 59 additions & 0 deletions internal/otelcollector/config/log/agent/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package agent

import (
"github.com/kyma-project/telemetry-manager/internal/otelcollector/config"
"github.com/kyma-project/telemetry-manager/internal/otelcollector/config/log"
)

type Config struct {
Service config.Service `yaml:"service"`
Extensions Extensions `yaml:"extensions"`

Receivers Receivers `yaml:"receivers"`
Processors Processors `yaml:"processors"`
Exporters Exporters `yaml:"exporters"`
}

type Receivers struct {
FileLog *FileLog `yaml:"filelog"`
}

type FileLog struct {
Exclude []string `yaml:"exclude,omitempty"`
Include []string `yaml:"include,omitempty"`
IncludeFileName bool `yaml:"include_file_name,omitempty"`
IncludeFilePath bool `yaml:"include_file_path,omitempty"`
StartAt string `yaml:"start_at,omitempty"`
Storage string `yaml:"storage,omitempty"`
Operators []Operator `yaml:"operators,omitempty"`
}

type Operator struct {
Id string `yaml:"id,omitempty"`
Type string `yaml:"type,omitempty"`
AddMetadataFromFilePath *bool `yaml:"add_metadata_from_file_path"`
Format string `yaml:"format,omitempty"`
From string `yaml:"from,omitempty"`
To string `yaml:"to,omitempty"`
IfExpr string `yaml:"if,omitempty"`
ParseFrom string `yaml:"parse_from,omitempty"`
ParseTo string `yaml:"parse_to,omitempty"`
}

type Processors struct {
config.BaseProcessors `yaml:",inline"`
SetInstrumentationScopeRuntime *log.TransformProcessor `yaml:"transform/set-instrumentation-scope-runtime,omitempty"`
}

type Exporters struct {
OTLP *config.OTLPExporter `yaml:"otlp"`
}

type Extensions struct {
config.BaseExtensions `yaml:",inline"`
FileStorage *FileStorage `yaml:"file_storage,omitempty"`
}

type FileStorage struct {
Directory string `yaml:"directory,omitempty"`
}
64 changes: 64 additions & 0 deletions internal/otelcollector/config/log/agent/config_builder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package agent

import (
"fmt"
"github.com/kyma-project/telemetry-manager/internal/otelcollector/config"
"github.com/kyma-project/telemetry-manager/internal/otelcollector/ports"
"k8s.io/apimachinery/pkg/types"
)

type BuilderConfig struct {
GatewayOTLPServiceName types.NamespacedName
}
type Builder struct {
Config BuilderConfig
}

type BuildOptions struct {
InstrumentationScopeVersion string
AgentNamespace string
}

func (b *Builder) Build(opts BuildOptions) *Config {
logService := config.DefaultService(makePipelinesConfig())
// Overwrite the extension from default service name
logService.Extensions = []string{"health_check", "pprof", "file_storage"}
return &Config{
Service: logService,
Extensions: makeExtensionsConfig(),

Receivers: makeReceivers(),
Processors: makeProcessorsConfig(opts.InstrumentationScopeVersion),
Exporters: makeExportersConfig(b.Config.GatewayOTLPServiceName),
}
}

func makePipelinesConfig() config.Pipelines {
pipelinesConfig := make(config.Pipelines)
pipelinesConfig["logs"] = config.Pipeline{
Receivers: []string{"filelog"},
Processors: []string{"memory_limiter", "transform/set-instrumentation-scope-runtime"},
Exporters: []string{"otlp"},
}
return pipelinesConfig
}

func makeExportersConfig(gatewayServiceName types.NamespacedName) Exporters {
return Exporters{
OTLP: &config.OTLPExporter{
Endpoint: fmt.Sprintf("%s.%s.svc.cluster.local:%d", gatewayServiceName.Name, gatewayServiceName.Namespace, ports.OTLPGRPC),
TLS: config.TLS{
Insecure: true,
},
SendingQueue: config.SendingQueue{
Enabled: false,
},
RetryOnFailure: config.RetryOnFailure{
Enabled: true,
InitialInterval: "5s",
MaxInterval: "30s",
MaxElapsedTime: "300s",
},
},
}
}
81 changes: 81 additions & 0 deletions internal/otelcollector/config/log/agent/config_builder_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package agent

import (
"github.com/kyma-project/telemetry-manager/internal/otelcollector/config"
"github.com/kyma-project/telemetry-manager/internal/otelcollector/ports"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/types"
"testing"
)

func TestBuildAgentConfig(t *testing.T) {
gatewayServiceName := types.NamespacedName{Name: "logs", Namespace: "telemetry-system"}
sut := Builder{
Config: BuilderConfig{
GatewayOTLPServiceName: gatewayServiceName,
},
}

t.Run("otlp exporter endpoint", func(t *testing.T) {
collectorConfig := sut.Build(BuildOptions{})
actualExporterConfig := collectorConfig.Exporters.OTLP
require.Equal(t, "logs.telemetry-system.svc.cluster.local:4317", actualExporterConfig.Endpoint)
})

t.Run("insecure", func(t *testing.T) {
t.Run("otlp exporter endpoint", func(t *testing.T) {
collectorConfig := sut.Build(BuildOptions{})

actualExporterConfig := collectorConfig.Exporters.OTLP
require.True(t, actualExporterConfig.TLS.Insecure)
})
})

t.Run("extensions", func(t *testing.T) {
collectorConfig := sut.Build(BuildOptions{})

require.NotEmpty(t, collectorConfig.Extensions.HealthCheck.Endpoint)
require.Contains(t, collectorConfig.Service.Extensions, "health_check")

require.NotEmpty(t, t, collectorConfig.Extensions.Pprof.Endpoint)
require.Contains(t, collectorConfig.Service.Extensions, "pprof")

require.NotEmpty(t, collectorConfig.Extensions.FileStorage)
require.Contains(t, collectorConfig.Service.Extensions, "file_storage")
})

t.Run("telemetry", func(t *testing.T) {
collectorConfig := sut.Build(BuildOptions{})

metricreaders := []config.MetricReader{
{
Pull: config.PullMetricReader{
Exporter: config.MetricExporter{
Prometheus: config.PrometheusMetricExporter{
Host: "${MY_POD_IP}",
Port: ports.Metrics,
},
},
},
},
}
require.Equal(t, "info", collectorConfig.Service.Telemetry.Logs.Level)
require.Equal(t, "json", collectorConfig.Service.Telemetry.Logs.Encoding)
require.Equal(t, metricreaders, collectorConfig.Service.Telemetry.Metrics.Readers)
})

t.Run("single pipeline topology", func(t *testing.T) {
t.Run("application log input enabled", func(t *testing.T) {
collectorConfig := sut.Build(BuildOptions{})

require.Len(t, collectorConfig.Service.Pipelines, 1)
require.Contains(t, collectorConfig.Service.Pipelines, "logs")

require.Equal(t, []string{"filelog"}, collectorConfig.Service.Pipelines["logs"].Receivers)
require.Equal(t, []string{"memory_limiter", "transform/set-instrumentation-scope-runtime"}, collectorConfig.Service.Pipelines["logs"].Processors)
require.Equal(t, []string{"otlp"}, collectorConfig.Service.Pipelines["logs"].Exporters)

})

})
}
15 changes: 15 additions & 0 deletions internal/otelcollector/config/log/agent/extension.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package agent

import (
"github.com/kyma-project/telemetry-manager/internal/otelcollector/config"
"github.com/kyma-project/telemetry-manager/internal/resources/otelcollector"
)

func makeExtensionsConfig() Extensions {
return Extensions{
BaseExtensions: config.DefaultBaseExtensions(),
FileStorage: &FileStorage{
Directory: otelcollector.CheckpointVolumePath,
},
}
}
13 changes: 13 additions & 0 deletions internal/otelcollector/config/log/agent/extension_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package agent

import (
"github.com/stretchr/testify/require"
"testing"
)

func TestMakeExtension(t *testing.T) {
ext := makeExtensionsConfig()
require.Equal(t, "/var/log/otel", ext.FileStorage.Directory)
require.Equal(t, "${MY_POD_IP}:13133", ext.HealthCheck.Endpoint)
require.Equal(t, "127.0.0.1:1777", ext.Pprof.Endpoint)
}
39 changes: 39 additions & 0 deletions internal/otelcollector/config/log/agent/processors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package agent

import (
"fmt"
"github.com/kyma-project/telemetry-manager/internal/otelcollector/config"
"github.com/kyma-project/telemetry-manager/internal/otelcollector/config/log"
)

func makeProcessorsConfig(instrumentationScopeVersion string) Processors {
return Processors{
BaseProcessors: config.BaseProcessors{
MemoryLimiter: makeMemoryLimiterConfig(),
},
SetInstrumentationScopeRuntime: makeInstrumentationScopeRuntime(instrumentationScopeVersion),
}
}

func makeMemoryLimiterConfig() *config.MemoryLimiter {
return &config.MemoryLimiter{
CheckInterval: "5s",
LimitPercentage: 80,
SpikeLimitPercentage: 25,
}
}

func makeInstrumentationScopeRuntime(instrumentationScopeVersion string) *log.TransformProcessor {
return &log.TransformProcessor{
ErrorMode: "ignore",
LogStatements: []config.TransformProcessorStatements{
{
Context: "scope",
Statements: []string{
fmt.Sprintf("set(version, \"%s\")", instrumentationScopeVersion),
fmt.Sprintf("set(name, \"%s\")", "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/filelogreceiver"),
},
},
},
}
}
19 changes: 19 additions & 0 deletions internal/otelcollector/config/log/agent/processors_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package agent

import (
"github.com/stretchr/testify/require"
"testing"
)

func TestProcessorConfig(t *testing.T) {
processorsConfig := makeProcessorsConfig("v1.0.0")
require.Equal(t, "scope", processorsConfig.SetInstrumentationScopeRuntime.Context)

Check failure on line 10 in internal/otelcollector/config/log/agent/processors_test.go

View workflow job for this annotation

GitHub Actions / lint

processorsConfig.SetInstrumentationScopeRuntime.Context undefined (type *"github.com/kyma-project/telemetry-manager/internal/otelcollector/config/log".TransformProcessor has no field or method Context)

Check failure on line 10 in internal/otelcollector/config/log/agent/processors_test.go

View workflow job for this annotation

GitHub Actions / coverage

processorsConfig.SetInstrumentationScopeRuntime.Context undefined (type *"github.com/kyma-project/telemetry-manager/internal/otelcollector/config/log".TransformProcessor has no field or method Context)
require.Len(t, processorsConfig.SetInstrumentationScopeRuntime.Statements, 2)

Check failure on line 11 in internal/otelcollector/config/log/agent/processors_test.go

View workflow job for this annotation

GitHub Actions / lint

processorsConfig.SetInstrumentationScopeRuntime.Statements undefined (type *"github.com/kyma-project/telemetry-manager/internal/otelcollector/config/log".TransformProcessor has no field or method Statements)

Check failure on line 11 in internal/otelcollector/config/log/agent/processors_test.go

View workflow job for this annotation

GitHub Actions / coverage

processorsConfig.SetInstrumentationScopeRuntime.Statements undefined (type *"github.com/kyma-project/telemetry-manager/internal/otelcollector/config/log".TransformProcessor has no field or method Statements)
require.Equal(t, "set(version, \"v1.0.0\")", processorsConfig.SetInstrumentationScopeRuntime.Statements[0])

Check failure on line 12 in internal/otelcollector/config/log/agent/processors_test.go

View workflow job for this annotation

GitHub Actions / lint

processorsConfig.SetInstrumentationScopeRuntime.Statements undefined (type *"github.com/kyma-project/telemetry-manager/internal/otelcollector/config/log".TransformProcessor has no field or method Statements)

Check failure on line 12 in internal/otelcollector/config/log/agent/processors_test.go

View workflow job for this annotation

GitHub Actions / coverage

processorsConfig.SetInstrumentationScopeRuntime.Statements undefined (type *"github.com/kyma-project/telemetry-manager/internal/otelcollector/config/log".TransformProcessor has no field or method Statements)
require.Equal(t, "set(name, \"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/filelogreceiver\")", processorsConfig.SetInstrumentationScopeRuntime.Statements[1])

Check failure on line 13 in internal/otelcollector/config/log/agent/processors_test.go

View workflow job for this annotation

GitHub Actions / lint

processorsConfig.SetInstrumentationScopeRuntime.Statements undefined (type *"github.com/kyma-project/telemetry-manager/internal/otelcollector/config/log".TransformProcessor has no field or method Statements) (typecheck)

Check failure on line 13 in internal/otelcollector/config/log/agent/processors_test.go

View workflow job for this annotation

GitHub Actions / coverage

processorsConfig.SetInstrumentationScopeRuntime.Statements undefined (type *"github.com/kyma-project/telemetry-manager/internal/otelcollector/config/log".TransformProcessor has no field or method Statements)

require.Equal(t, "5s", processorsConfig.BaseProcessors.MemoryLimiter.CheckInterval)
require.Equal(t, 80, processorsConfig.BaseProcessors.MemoryLimiter.LimitPercentage)
require.Equal(t, 25, processorsConfig.BaseProcessors.MemoryLimiter.SpikeLimitPercentage)

}
Loading
Loading