Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[chore] Add logs and metrics k8s attributes verification tests #1610

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
177 changes: 176 additions & 1 deletion functional_tests/configuration_switching_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ func Test_Functions(t *testing.T) {
t.Run("agent logs and metrics enabled or disabled", testAgentLogsAndMetrics)
t.Run("logs and metrics index switch", testIndexSwitch)
t.Run("cluster receiver enabled or disabled", testClusterReceiverEnabledOrDisabled)
t.Run("resource attributes verification", testVerifyResourceAttributes)

}

Expand Down Expand Up @@ -347,6 +348,103 @@ func testClusterReceiverEnabledOrDisabled(t *testing.T) {
resetLogsSink(t, logsObjectsConsumer)
}

func testVerifyResourceAttributes(t *testing.T) {
attributesList := [4]string{"k8s.node.name", "k8s.pod.name", "k8s.pod.uid", "k8s.namespace.name"}

hostEp := hostEndpoint(t)
if len(hostEp) == 0 {
require.Fail(t, "Host endpoint not found")
}

t.Run("verify cluster receiver resource attributes", func(t *testing.T) {
valuesFileName := "values_cluster_receiver_only.yaml.tmpl"
logsObjectsConsumer := setupOnce(t).logsObjectsConsumer
logsObjectsHecEndpoint := fmt.Sprintf("http://%s:%d/services/collector", hostEp, hecLogsObjectsReceiverPort)

replacements := map[string]interface{}{
"ClusterReceiverEnabled": true,
"LogObjectsHecEndpoint": logsObjectsHecEndpoint,
}
deployChartsAndApps(t, valuesFileName, replacements)
resetLogsSink(t, logsObjectsConsumer)
waitForLogs(t, 5, logsObjectsConsumer)
t.Logf("===> >>>> Logs: %v", len(logsObjectsConsumer.AllLogs()))

for _, attr := range attributesList {
t.Log("Checking resourceAttribute: ", attr)
resourceAttrValues, notFoundCounter := getLogsResourceAttribute(logsObjectsConsumer.AllLogs(), attr)
assert.True(t, len(resourceAttrValues) >= 1)
assert.Equal(t, 0, notFoundCounter)
t.Logf("Resource Attributes: %v", resourceAttrValues)
}
})

t.Run("verify cluster receiver metrics resource attributes", func(t *testing.T) {
valuesFileName := "values_cluster_receiver_only.yaml.tmpl"
hecMetricsConsumer := setupOnce(t).hecMetricsConsumer
logsObjectsHecEndpoint := fmt.Sprintf("http://%s:%d/services/collector", hostEp, hecLogsObjectsReceiverPort)

replacements := map[string]interface{}{
"ClusterReceiverEnabled": true,
"LogObjectsHecEndpoint": logsObjectsHecEndpoint,
}
deployChartsAndApps(t, valuesFileName, replacements)
resetMetricsSink(t, hecMetricsConsumer)
t.Logf("===> >>>> Metrics: %v", len(hecMetricsConsumer.AllMetrics()))

waitForMetrics(t, 5, hecMetricsConsumer)
for _, attr := range attributesList {
t.Log("Checking resourceAttribute: ", attr)
resourceAttrValues, notFoundCounter := getMetricsResourceAttribute(hecMetricsConsumer.AllMetrics(), attr)
assert.True(t, len(resourceAttrValues) >= 1)
assert.Equal(t, 0, notFoundCounter)
t.Logf("Resource Attributes for %s: %v", attr, resourceAttrValues)
}
})

t.Run("verify agent logs resource attributes", func(t *testing.T) {
valuesFileName := "values_logs_and_metrics_switching.yaml.tmpl"
agentLogsConsumer := setupOnce(t).logsConsumer

replacements := map[string]interface{}{
"MetricsEnabled": true,
"LogsEnabled": true,
}
deployChartsAndApps(t, valuesFileName, replacements)
resetLogsSink(t, agentLogsConsumer)

waitForLogs(t, 5, agentLogsConsumer)
for _, attr := range attributesList {
t.Log("Checking resourceAttribute: ", attr)
resourceAttrValues, notFoundCounter := getLogsResourceAttribute(agentLogsConsumer.AllLogs(), attr)
assert.True(t, len(resourceAttrValues) >= 1)
assert.Equal(t, 0, notFoundCounter)
t.Logf("Resource Attributes: %v", resourceAttrValues)
}
})

t.Run("verify metrics resource attributes", func(t *testing.T) {
valuesFileName := "values_logs_and_metrics_switching.yaml.tmpl"
hecMetricsConsumer := setupOnce(t).hecMetricsConsumer

replacements := map[string]interface{}{
"MetricsEnabled": true,
"LogsEnabled": true,
}
deployChartsAndApps(t, valuesFileName, replacements)
resetMetricsSink(t, hecMetricsConsumer)

waitForMetrics(t, 5, hecMetricsConsumer)
for _, attr := range attributesList {
t.Log("Checking resourceAttribute: ", attr)
resourceAttrValues, notFoundCounter := getMetricsResourceAttribute(hecMetricsConsumer.AllMetrics(), attr)
assert.True(t, len(resourceAttrValues) >= 1)
assert.Equal(t, 0, notFoundCounter)
t.Logf("Resource Attributes for %s: %v", attr, resourceAttrValues)
}
})
}

func checkPodExists(pods *corev1.PodList, podNamePrefix string) bool {
for _, pod := range pods.Items {
if strings.HasPrefix(pod.Name, podNamePrefix) {
Expand Down Expand Up @@ -410,7 +508,6 @@ func getMetricsIndex(metrics []pmetric.Metrics) []string {
var indices []string
for i := 0; i < len(metrics); i++ {
m := metrics[i]
fmt.Printf("Metrics: %v", m.ResourceMetrics().At(0).Resource().Attributes())
if value, ok := m.ResourceMetrics().At(0).Resource().Attributes().Get("com.splunk.index"); ok {
index := value.AsString()
if !contains(indices, index) {
Expand All @@ -430,6 +527,84 @@ func contains(list []string, newValue string) bool {
return false
}

func getLogsResourceAttribute(logs []plog.Logs, attributeName string) ([]string, int) {
var resourceAttributes []string
var notFoundCounter int = 0

for i := 0; i < len(logs); i++ {
l := logs[i]
for j := 0; j < l.ResourceLogs().Len(); j++ {
rl := l.ResourceLogs().At(j)
for k := 0; k < rl.ScopeLogs().Len(); k++ {
sl := rl.ScopeLogs().At(k)
for m := 0; m < sl.LogRecords().Len(); m++ {
tmpAttribute, ok := sl.LogRecords().At(m).Attributes().Get(attributeName)
if ok {
if !contains(resourceAttributes, tmpAttribute.AsString()) {
resourceAttributes = append(resourceAttributes, tmpAttribute.AsString())
}
} else {
fmt.Println("== Resource Attribute not found: ", attributeName)
notFoundCounter++
}
}
}
}
}
return resourceAttributes, notFoundCounter
}

func getMetricsResourceAttribute(metrics []pmetric.Metrics, attributeName string) ([]string, int) {
jvoravong marked this conversation as resolved.
Show resolved Hide resolved
var resourceAttributes []string
var notFoundCounter int = 0
var foundCounter int = 0
var skippedCounter int = 0
prefixesForMetricsToSkip := []string{
// agent metrics
"system.", "k8s.node.",
// cluster receiver metrics
"k8s.deployment.", "k8s.namespace.", "k8s.replicaset.", "k8s.daemonset.", "k8s.node.",
}

for i := 0; i < len(metrics); i++ {
m := metrics[i]
for j := 0; j < m.ResourceMetrics().Len(); j++ {
rm := m.ResourceMetrics().At(j)
for k := 0; k < rm.ScopeMetrics().Len(); k++ {
sm := rm.ScopeMetrics().At(k)
for l := 0; l < sm.Metrics().Len(); l++ {
skip := false
for _, prefix := range prefixesForMetricsToSkip {
if strings.HasPrefix(sm.Metrics().At(l).Name(), prefix) {
skip = true
break
}
}
if skip {
skippedCounter++
continue
}
for m := 0; m < sm.Metrics().At(l).Gauge().DataPoints().Len(); m++ {
tmpAttribute, ok := sm.Metrics().At(l).Gauge().DataPoints().At(m).Attributes().Get(attributeName)

if ok {
if !contains(resourceAttributes, tmpAttribute.AsString()) {
resourceAttributes = append(resourceAttributes, tmpAttribute.AsString())
}
foundCounter++
} else {
fmt.Printf("Resource Attribute %s not found for metric: %v \n", attributeName, sm.Metrics().At(l).Name())
notFoundCounter++
}
}
}
}
}
}
fmt.Printf("Counters: Found: %d, Skipped: %d, not Found: %d\n", foundCounter, skippedCounter, notFoundCounter)
return resourceAttributes, notFoundCounter
}

func uninstallDeployment(t *testing.T) {
testKubeConfig, setKubeConfig := os.LookupEnv("KUBECONFIG")
require.True(t, setKubeConfig, "the environment variable KUBECONFIG must be set")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
---
splunkPlatform:
token: foobar
endpoint: {{ .LogHecEndpoint }}
metricsEnabled: true
metricsIndex: myMetricsIndex

agent:
enabled: false

clusterReceiver:
enabled: {{ .ClusterReceiverEnabled }}
config:
exporters:
splunk_hec/platform_logs:
endpoint: {{ .LogObjectsHecEndpoint }}
splunk_hec/platform_metrics:
endpoint: {{ .MetricHecEndpoint }}
k8sObjects:
- name: pods
mode: pull
interval: 5s
- name: events
mode: watch

clusterName: dev-operator
Loading