Skip to content

Commit

Permalink
the log agent runs
Browse files Browse the repository at this point in the history
  • Loading branch information
rakesh-garimella committed Jan 27, 2025
1 parent e17aef3 commit 448516b
Show file tree
Hide file tree
Showing 8 changed files with 221 additions and 46 deletions.
2 changes: 2 additions & 0 deletions controllers/telemetry/logpipeline_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ type LogPipelineControllerConfig struct {
OTelCollectorImage string
FluentBitPriorityClassName string
LogGatewayPriorityClassName string
LogAgentPriorityClassName string
RestConfig *rest.Config
SelfMonitorName string
TelemetryNamespace string
Expand Down Expand Up @@ -220,6 +221,7 @@ func configureOtelReconciler(client client.Client, config LogPipelineControllerC
config.TelemetryNamespace,
config.ModuleVersion,
agentConfigBuilder,
otelcollector.NewLogAgentApplierDeleter(config.OTelCollectorImage, config.TelemetryNamespace, config.LogAgentPriorityClassName),
&workloadstatus.DaemonSetProber{Client: client},
otelcollector.NewLogGatewayApplierDeleter(config.OTelCollectorImage, config.TelemetryNamespace, config.LogGatewayPriorityClassName),
&gateway.Builder{Reader: client},
Expand Down
7 changes: 5 additions & 2 deletions internal/otelcollector/config/log/agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package agent

import (
"github.com/kyma-project/telemetry-manager/internal/otelcollector/config"
"github.com/kyma-project/telemetry-manager/internal/otelcollector/config/log"
)

type Config struct {
Expand Down Expand Up @@ -41,11 +42,11 @@ type Operator struct {

type Processors struct {
config.BaseProcessors `yaml:",inline"`
SetInstrumentationScopeRuntime *config.TransformProcessorStatements `yaml:"transform/set-instrumentation-scope-runtime,omitempty"`
SetInstrumentationScopeRuntime *log.TransformProcessor `yaml:"transform/set-instrumentation-scope-runtime,omitempty"`
}

type Exporters struct {
OTLP *config.OTLPExporter `yaml:",inline,omitempty"`
OTLP *config.OTLPExporter `yaml:"otlp"`
}

type Extensions struct {
Expand All @@ -54,5 +55,7 @@ type Extensions struct {
}

type FileStorage struct {
// Create directory if it does not exist
//CreateDirectory bool `yaml:"create_directory,omitempty"`
Directory string `yaml:"directory,omitempty"`
}
3 changes: 2 additions & 1 deletion internal/otelcollector/config/log/agent/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ func makeExtensionsConfig() Extensions {
return Extensions{
BaseExtensions: config.DefaultBaseExtensions(),
FileStorage: &FileStorage{
Directory: "/var/log/otel",
//CreateDirectory: true,
Directory: "/var/lib/otelcol",
},
}
}
18 changes: 12 additions & 6 deletions internal/otelcollector/config/log/agent/processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package agent
import (
"fmt"
"github.com/kyma-project/telemetry-manager/internal/otelcollector/config"
"github.com/kyma-project/telemetry-manager/internal/otelcollector/config/log"
)

func makeProcessorsConfig(instrumentationScopeVersion string) Processors {
Expand All @@ -22,12 +23,17 @@ func makeMemoryLimiterConfig() *config.MemoryLimiter {
}
}

func makeInstrumentationScopeRuntime(instrumentationScopeVersion string) *config.TransformProcessorStatements {
return &config.TransformProcessorStatements{
Context: "scope",
Statements: []string{
fmt.Sprintf("set(version, \"%s\")", instrumentationScopeVersion),
fmt.Sprintf("set(name, \"%s\")", "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/filelogreceiver"),
func makeInstrumentationScopeRuntime(instrumentationScopeVersion string) *log.TransformProcessor {
return &log.TransformProcessor{
ErrorMode: "ignore",
LogStatements: []config.TransformProcessorStatements{
{
Context: "scope",
Statements: []string{
fmt.Sprintf("set(version, \"%s\")", instrumentationScopeVersion),
fmt.Sprintf("set(name, \"%s\")", "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/filelogreceiver"),
},
},
},
}
}
9 changes: 9 additions & 0 deletions internal/otelcollector/config/log/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package log

import "github.com/kyma-project/telemetry-manager/internal/otelcollector/config"

// TODO: abstract it out to common config as metrics has the same setup
type TransformProcessor struct {
ErrorMode string `yaml:"error_mode"`
LogStatements []config.TransformProcessorStatements `yaml:"log_statements"`
}
2 changes: 2 additions & 0 deletions internal/reconciler/logpipeline/otel/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ func New(
telemetryNamespace string,
moduleVersion string,
agentConfigBuilder AgentConfigBuilder,
agentApplierDeleter AgentApplierDeleter,
agentProber commonstatus.Prober,
gatewayApplierDeleter GatewayApplierDeleter,
gatewayConfigBuilder GatewayConfigBuilder,
Expand All @@ -90,6 +91,7 @@ func New(
telemetryNamespace: telemetryNamespace,
moduleVersion: moduleVersion,
agentConfigBuilder: agentConfigBuilder,
agentApplierDeleter: agentApplierDeleter,
agentProber: agentProber,
gatewayApplierDeleter: gatewayApplierDeleter,
gatewayConfigBuilder: gatewayConfigBuilder,
Expand Down
186 changes: 166 additions & 20 deletions internal/resources/otelcollector/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"k8s.io/utils/ptr"
"maps"
"strconv"

Expand All @@ -24,33 +25,72 @@ import (
const (
IstioCertPath = "/etc/istio-output-certs"
MetricAgentName = "telemetry-metric-agent"
LogAgentName = "telemetry-log-agent"

istioCertVolumeName = "istio-certs"
metricAgentScrapeKey = "telemetry.kyma-project.io/metric-scrape"
logAgentScrapeKey = "telemetry.kyma-project.io/log-scrape"
)

var (
metricAgentMemoryLimit = resource.MustParse("1200Mi")
metricAgentCPURequest = resource.MustParse("15m")
metricAgentMemoryRequest = resource.MustParse("50Mi")

logAgentMemoryLimit = resource.MustParse("1200Mi")
logAgentCPURequest = resource.MustParse("15m")
logAgentMemoryRequest = resource.MustParse("50Mi")
)

func NewMetricAgentApplierDeleter(image, namespace, priorityClassName string) *AgentApplierDeleter {
func NewLogAgentApplierDeleter(image, namespace, priorityClassName string) *AgentApplierDeleter {
extraLabels := map[string]string{
metricAgentScrapeKey: "true",
logAgentScrapeKey: "true",
istioSidecarInjectKey: "true", // inject Istio sidecar for SDS certificates and agent-to-gateway communication
}

return &AgentApplierDeleter{
baseName: MetricAgentName,
baseName: LogAgentName,
extraPodLabel: extraLabels,
image: image,
namespace: namespace,
priorityClassName: priorityClassName,
rbac: makeMetricAgentRBAC(namespace),
memoryLimit: metricAgentMemoryLimit,
cpuRequest: metricAgentCPURequest,
memoryRequest: metricAgentMemoryRequest,
memoryLimit: logAgentMemoryLimit,
cpuRequest: logAgentCPURequest,
memoryRequest: logAgentMemoryRequest,
volumes: []corev1.Volume{
makeIstioCertVolume(),
makePodLogsVolume(),
makeFileLogCheckpointVolume(),
},
volumeMounts: []corev1.VolumeMount{
makeIstioCertVolumeMount(),
makePodLogsVolumeMount(),
makeFileLogCheckPointVolumeMount(),
},
securityContext: makeLogAgentSecurityContext(),
podSecurityContext: makeLogAgentPodSecurityContext(),
}
}

func NewMetricAgentApplierDeleter(image, namespace, priorityClassName string) *AgentApplierDeleter {
extraLabels := map[string]string{
metricAgentScrapeKey: "true",
istioSidecarInjectKey: "true", // inject Istio sidecar for SDS certificates and agent-to-gateway communication
}

return &AgentApplierDeleter{
baseName: MetricAgentName,
extraPodLabel: extraLabels,
image: image,
namespace: namespace,
priorityClassName: priorityClassName,
rbac: makeMetricAgentRBAC(namespace),
memoryLimit: metricAgentMemoryLimit,
cpuRequest: metricAgentCPURequest,
memoryRequest: metricAgentMemoryRequest,
volumes: []corev1.Volume{makeIstioCertVolume()},
volumeMounts: []corev1.VolumeMount{makeIstioCertVolumeMount()},
securityContext: makeMetricAgentSecurityContext(),
podSecurityContext: makeMetricAgentPodSecurityContext(),
}
}

Expand All @@ -62,6 +102,11 @@ type AgentApplierDeleter struct {
priorityClassName string
rbac rbac

volumes []corev1.Volume
volumeMounts []corev1.VolumeMount
securityContext *corev1.SecurityContext
podSecurityContext *corev1.PodSecurityContext

memoryLimit resource.Quantity
cpuRequest resource.Quantity
memoryRequest resource.Quantity
Expand Down Expand Up @@ -134,18 +179,10 @@ func (aad *AgentApplierDeleter) makeAgentDaemonSet(configChecksum string) *appsv
withEnvVarFromSource(config.EnvVarCurrentNodeName, fieldPathNodeName),
commonresources.WithGoMemLimitEnvVar(aad.memoryLimit),

// emptyDir volume for Istio certificates
withVolume(corev1.Volume{
Name: istioCertVolumeName,
VolumeSource: corev1.VolumeSource{
EmptyDir: &corev1.EmptyDirVolumeSource{},
},
}),
withVolumeMount(corev1.VolumeMount{
Name: istioCertVolumeName,
MountPath: IstioCertPath,
ReadOnly: true,
}),
withVolumes(aad.volumes),
withVolumeMounts(aad.volumeMounts),
withSecurityContext(aad.securityContext),
withPodSecurityContext(aad.podSecurityContext),
}

podSpec := makePodSpec(aad.baseName, aad.image, opts...)
Expand Down Expand Up @@ -200,3 +237,112 @@ proxyMetadata:
"traffic.sidecar.istio.io/includeOutboundIPRanges": "",
}
}

func makeIstioCertVolume() corev1.Volume {
// emptyDir volume for Istio certificates
return corev1.Volume{
Name: istioCertVolumeName,
VolumeSource: corev1.VolumeSource{
EmptyDir: &corev1.EmptyDirVolumeSource{},
},
}
}

func makeIstioCertVolumeMount() corev1.VolumeMount {
return corev1.VolumeMount{
Name: istioCertVolumeName,
MountPath: IstioCertPath,
ReadOnly: true,
}
}

func makePodLogsVolume() corev1.Volume {
return corev1.Volume{
Name: "varlogpods",
VolumeSource: corev1.VolumeSource{
HostPath: &corev1.HostPathVolumeSource{
Path: "/var/log/pods",
Type: nil,
},
},
}
}

func makePodLogsVolumeMount() corev1.VolumeMount {
return corev1.VolumeMount{
Name: "varlogpods",
MountPath: "/var/log/pods",
ReadOnly: true,
}
}

func makeFileLogCheckpointVolume() corev1.Volume {
return corev1.Volume{
Name: "varlibotelcol",
VolumeSource: corev1.VolumeSource{
HostPath: &corev1.HostPathVolumeSource{
Path: "/var/lib/otelcol",
Type: ptr.To(corev1.HostPathDirectoryOrCreate),
},
},
}
}

func makeFileLogCheckPointVolumeMount() corev1.VolumeMount {
return corev1.VolumeMount{
Name: "varlibotelcol",
MountPath: "/var/lib/otelcol",
}
}

func makeMetricAgentSecurityContext() *corev1.SecurityContext {
return &corev1.SecurityContext{
Privileged: ptr.To(false),
RunAsUser: ptr.To(collectorUser),
RunAsNonRoot: ptr.To(true),
ReadOnlyRootFilesystem: ptr.To(true),
AllowPrivilegeEscalation: ptr.To(false),
SeccompProfile: &corev1.SeccompProfile{
Type: corev1.SeccompProfileTypeRuntimeDefault,
},
Capabilities: &corev1.Capabilities{
Drop: []corev1.Capability{"ALL"},
},
}
}

func makeLogAgentSecurityContext() *corev1.SecurityContext {
return &corev1.SecurityContext{
Privileged: ptr.To(false),
RunAsUser: ptr.To(int64(0)),
RunAsNonRoot: ptr.To(false),
ReadOnlyRootFilesystem: ptr.To(true),
AllowPrivilegeEscalation: ptr.To(false),
SeccompProfile: &corev1.SeccompProfile{
Type: corev1.SeccompProfileTypeRuntimeDefault,
},
Capabilities: &corev1.Capabilities{
Add: []corev1.Capability{"FOWNER"},
Drop: []corev1.Capability{"ALL"},
},
}
}

func makeMetricAgentPodSecurityContext() *corev1.PodSecurityContext {
return &corev1.PodSecurityContext{
RunAsUser: ptr.To(collectorUser),
RunAsNonRoot: ptr.To(true),
SeccompProfile: &corev1.SeccompProfile{
Type: corev1.SeccompProfileTypeRuntimeDefault,
},
}
}

func makeLogAgentPodSecurityContext() *corev1.PodSecurityContext {
return &corev1.PodSecurityContext{
RunAsNonRoot: ptr.To(false),
SeccompProfile: &corev1.SeccompProfile{
Type: corev1.SeccompProfileTypeRuntimeDefault,
},
}
}
Loading

0 comments on commit 448516b

Please sign in to comment.