Skip to content

Commit

Permalink
Adding analyzer code - so far reading query list, next add actual analysis
Browse files Browse the repository at this point in the history

Signed-off-by: Kedar Vijay Kulkarni <[email protected]>
  • Loading branch information
Kedar Vijay Kulkarni committed Nov 1, 2021
1 parent 291d0db commit 2c7d3a4
Show file tree
Hide file tree
Showing 6 changed files with 157 additions and 68 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@ This tool allows OpenShift users to run a watcher for Prometheus queries and def
* [x] Create oc cli connection to OpenShift/Kubernetes using Kubeconfig
* [x] Determine Prometheus url, bearerToken for OpenShift
* [x] If Prometheus url, bearerToken already included in the yaml, use that
* [ ] Create yaml format for queries, and expected outcomes (Use a struct to read that in)
* [ ] Spawn goroutines to keep running queries and evaluating results
* [ ] Notify/Do Something when results don't match conditions
* [x] Create yaml format for queries, and expected outcomes (Use a struct to read that in)
* [ ] Notify/Do Something when results don't match conditions
* [ ] Spawn goroutines to keep running queries and evaluating results to handle scale
50 changes: 50 additions & 0 deletions cmd/analyze/analyze.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package analyze

import (
"io/ioutil"
"log"

"sigs.k8s.io/yaml"
)

/*
This package will read the queries from relevant config file under `config/` dir
and run relevant queries using prometheus package. Then retrieve the result and
analyze it against the threshold.
*/

const configPath = "./config/"

type queryList []queries

type queries struct {
Query string `json:"query"`
WatchFor []watchList
}

type watchList struct {
Key string `json:"Key"`
Val string `json:"Val"`
Threshold string `json:"threshold"`
}

// Parse unmarshals YAML-encoded data into the receiver's query list and
// reports any decoding error.
func (q *queryList) Parse(data []byte) error {
	err := yaml.Unmarshal(data, q)
	return err
}

// ReadPrometheusQueries loads the Prometheus query definitions from
// config/queries.yaml and parses them into a queryList.
//
// It returns the parsed list (which may be empty if the file has no entries)
// together with any read or parse error, leaving the decision of how to
// handle a bad or missing config file to the caller.
func ReadPrometheusQueries() (queriesList queryList, err error) {
	// configPath already ends in "/", so plain concatenation yields the
	// correct path (the old log message doubled the slash).
	path := configPath + "queries.yaml"
	data, err := ioutil.ReadFile(path)
	if err != nil {
		log.Printf("Couldn't read %s", path)
		return queriesList, err
	}
	// Return the parse error instead of log.Fatal: exiting the whole process
	// from a library helper takes the choice away from the caller.
	if err := queriesList.Parse(data); err != nil {
		log.Printf("Couldn't parse %s", path)
		return queriesList, err
	}
	return queriesList, nil
}
96 changes: 48 additions & 48 deletions cmd/prometheus.go → cmd/prometheus/prometheus.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
package cmd

/*
Program to read the prometheus config, and use it to run queries against OCP Cluster prometheus.
You can either set the Prometheus URL and BearerToken in config/prometheus.yaml or with the
KUBECONFIG env var being set, the program can use oc get routes and oc get secrets to find the URL
and bearer Token.
*/

import (
"crypto/tls"
"encoding/json"
Expand All @@ -23,8 +30,8 @@ import (
const configPath = "./config/"

type prometheusConfig struct {
Url string `json:"URL"`
BearerToken string `json:"BearerToken"`
Url string `json:"url"`
BearerToken string `json:"bearerToken"`
}

func (c *prometheusConfig) Parse(data []byte) error {
Expand Down Expand Up @@ -127,65 +134,58 @@ func runQueryViaHTTP(url, bearer string) (string, error) {
return string(contents), nil
}

func RunQueries(promQueries map[string]bool, oc *exutil.CLI, baseURL, bearerToken string) {
func RunQuery(promQuery string, oc *exutil.CLI, baseURL, bearerToken string) (prometheusResponse, error) {
// expect all correct metrics within a reasonable time period
queryErrors := make(map[string]error)
passed := make(map[string]struct{})
var result prometheusResponse
for i := 0; i < maxPrometheusQueryAttempts; i++ {
for query, expected := range promQueries {
if _, ok := passed[query]; ok {
continue
}
//TODO when the http/query apis discussed at https://github.com/prometheus/client_golang#client-for-the-prometheus-http-api
// and introduced at https://github.com/prometheus/client_golang/blob/master/api/prometheus/v1/api.go are vendored into
// openshift/origin, look to replace this homegrown http request / query param with that API
url := fmt.Sprintf("%s/api/v1/query?%s", baseURL, (url.Values{"query": []string{query}}).Encode())
contents, err := runQueryViaHTTP(url, bearerToken)
if err != nil {
log.Fatal(err)
}
// check query result, if this is a new error log it, otherwise remain silent
var result prometheusResponse
if err := json.Unmarshal([]byte(contents), &result); err != nil {
e2e.Logf("unable to parse query response for %s: %v", query, err)
continue
}
metrics := result.Data.Result
if result.Status != "success" {
data, _ := json.Marshal(metrics)
msg := fmt.Sprintf("promQL query: %s had reported incorrect status:\n%s", query, data)
if prev, ok := queryErrors[query]; !ok || prev.Error() != msg {
e2e.Logf("%s", msg)
}
queryErrors[query] = fmt.Errorf(msg)
continue
}
if (len(metrics) > 0 && !expected) || (len(metrics) == 0 && expected) {
data, _ := json.Marshal(metrics)
msg := fmt.Sprintf("promQL query: %s had reported incorrect results:\n%s", query, data)
if prev, ok := queryErrors[query]; !ok || prev.Error() != msg {
e2e.Logf("%s", msg)
}
queryErrors[query] = fmt.Errorf(msg)
continue
}
for _, r := range result.Data.Result {
log.Printf("Type is %[1]T \n Metric is: %[1]s\n\n", r.Metric)
log.Printf("Type is %[1]T \n Value is: %[1]s\n\n", r.Value)
if _, ok := passed[promQuery]; ok {
continue
}

url := fmt.Sprintf("%s/api/v1/query?%s", baseURL, (url.Values{"query": []string{promQuery}}).Encode())
contents, err := runQueryViaHTTP(url, bearerToken)
if err != nil {
log.Fatal(err)
}
// check query result, if this is a new error log it, otherwise remain silent
if err := json.Unmarshal([]byte(contents), &result); err != nil {
e2e.Logf("unable to parse query response for %s: %v", promQuery, err)
continue
}
metrics := result.Data.Result
if result.Status != "success" {
data, _ := json.Marshal(metrics)
msg := fmt.Sprintf("promQL query: %s had reported incorrect status:\n%s", promQuery, data)
if prev, ok := queryErrors[promQuery]; !ok || prev.Error() != msg {
e2e.Logf("%s", msg)
}
log.Printf("Type is %[1]T \n Result is: %[1]s\n\n", result.Data.Result[0].Metric)
// query successful
passed[query] = struct{}{}
delete(queryErrors, query)
queryErrors[promQuery] = fmt.Errorf(msg)
continue
}

// for _, r := range result.Data.Result {
// log.Printf("Type is %[1]T \n Metric is: %[1]s\n\n", r.Metric["phase"])
// log.Printf("Type is %[1]T \n Value is: %[1]s\n\n", r.Value)
// if r.Metric["phase"] == "Running" {
// log.Printf("We have %v pod in %s phase", r.Value, r.Metric["phase"])
// }
// }
// query successful
passed[promQuery] = struct{}{}
delete(queryErrors, promQuery)
// if there were no errors let's break out of the loop
if len(queryErrors) == 0 {
break
return result, nil
}
// else sleep for 10 sec
time.Sleep(prometheusQueryRetrySleep)
}

// if there were errors, dump logs from prometheus pod
if len(queryErrors) != 0 {
exutil.DumpPodLogsStartingWith("prometheus-0", oc)
}
return result, queryErrors[promQuery]
}
15 changes: 15 additions & 0 deletions config/queries.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
---
# Each entry pairs a Prometheus query with the result conditions to watch.
# Thresholds are quoted: the Go struct declares Threshold as a string, and
# sigs.k8s.io/yaml (YAML -> JSON -> json.Unmarshal) rejects a bare number
# for a string field.
- query: "sum(kube_pod_status_phase{}) by (phase) > 0"
  watchFor:
    - key: Phase
      val: "Pending"
      threshold: "10"
    - key: Phase
      val: "Failed"
      threshold: "0"
- query: "sum(kube_namespace_status_phase) by (phase)"
  watchFor:
    - key: Phase
      val: "Terminating"
      threshold: "0"

5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@ module github.com/kedark3/cpa
go 1.16

require (
github.com/onsi/ginkgo v1.16.5 // indirect
github.com/onsi/gomega v1.16.0 // indirect
github.com/onsi/ginkgo v1.16.5
github.com/onsi/gomega v1.16.0
github.com/openshift/openshift-tests v0.0.0-20210916082130-4fca21c38ee6
github.com/prometheus/common v0.6.0
gopkg.in/yaml.v2 v2.4.0
k8s.io/api v0.17.1
k8s.io/apimachinery v0.17.1
k8s.io/kubernetes v1.21.0
Expand Down
53 changes: 38 additions & 15 deletions main.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package main

import (
"fmt"
"log"

prometheus "github.com/kedark3/cpa/cmd"
exutil "github.com/openshift/openshift-tests/test/extended/util"
analyze "github.com/kedark3/cpa/cmd/analyze"

g "github.com/onsi/ginkgo"
o "github.com/onsi/gomega"
Expand All @@ -13,26 +13,49 @@ import (
func main() {
o.RegisterFailHandler(g.Fail)

oc := exutil.NewCLI("prometheus-cpa", exutil.KubeConfigPath())
// oc := exutil.NewCLI("prometheus-cpa", exutil.KubeConfigPath())
// secrets, err := oc.AdminKubeClient().CoreV1().Secrets("openshift-monitoring").List(metav1.ListOptions{})

// if err != nil {
// log.Printf("An Error has occured %s", err)
// return
// }
// log.Printf("Found following secrets %d", secrets.Size())
url, bearerToken, ok := prometheus.LocatePrometheus(oc)
if !ok {
log.Printf("Oops something went wrong while trying to fetch Prometheus url and bearerToken")
return
}
// url, bearerToken, ok := prometheus.LocatePrometheus(oc)
// if !ok {
// log.Printf("Oops something went wrong while trying to fetch Prometheus url and bearerToken")
// return
// }

tests := map[string]bool{
`sum(kube_pod_status_phase{}) by (phase) > 0`: true,
`sum(kube_namespace_status_phase) by (phase)`: true,
`sum(kube_node_status_condition{status="true"}) by (condition) > 0`: true,
}
prometheus.RunQueries(tests, oc, url, bearerToken)
// queries := []string{
// `sum(kube_pod_status_phase{}) by (phase) > 0`, // pod count by phase
// `sum(kube_namespace_status_phase) by (phase)`, // namespace count by phase
// `sum(kube_node_status_condition{status="true"}) by (condition) > 0`, // node condition by status
// `sum by (instance) (rate(ovnkube_master_pod_creation_latency_seconds_sum[2m]))`, // OVN pod creation latency
// `sum by (instance) (rate(ovnkube_node_cni_request_duration_seconds_sum{command="ADD"}[2m]))`, // CNI Request duration for "ADD" command over 2m interval
// `sum by (instance) (rate(ovnkube_node_cni_request_duration_seconds_sum{command="DEL"}[2m]))`, // CNI Request duration for "DEL" command over 2m interval
// `sum(container_memory_working_set_bytes{pod=~"ovnkube-master-.*",namespace="openshift-ovn-kubernetes",container=""}) by (pod, node)`, // ovnkube-master Memory Usage
// `sum(container_memory_working_set_bytes{pod=~"ovnkube-master-.*",namespace="openshift-ovn-kubernetes",container!=""}) by (pod, node)`, // ovnkube-master Memory Usage
// `topk(10, rate(container_cpu_usage_seconds_total{pod=~"ovnkube-.*",namespace="openshift-ovn-kubernetes",container="ovn-controller"}[2m])*100)`, // top 10 - ovn-controller cpu usage
// `topk(10, sum(container_memory_working_set_bytes{pod=~"ovnkube-node-.*",namespace="openshift-ovn-kubernetes",container="ovn-controller"}) by (node))`, // top 10 - ovn-controller memory usage
// `sum(container_memory_rss{pod="prometheus-k8s-0",namespace!="",name!="",container="prometheus"}) by (pod)`, // Prometheus replica 0 rss memory
// `sum(container_memory_rss{pod="prometheus-k8s-1",namespace!="",name!="",container="prometheus"}) by (pod)`, // Prometheus replica 1 rss memory
// `rate(container_cpu_usage_seconds_total{pod=~"ovnkube-master.*",namespace="openshift-ovn-kubernetes",container!=""}[2m])*100`, // CPU usage ovnkube-master components over 2m interval
// `sum by (condition)(cluster_operator_conditions{condition!=""})`,
// }
// log.Printf("URL is %s and bearerToken is %s", url, bearerToken)

// for _, query := range queries {
// fmt.Println(prometheus.RunQuery(query, oc, url, bearerToken))
// fmt.Println()
// }
queryList, err := analyze.ReadPrometheusQueries()
if err != nil {
log.Println(err)
}
for _, items := range queryList {
fmt.Println(items.Query)
for _, watchItems := range items.WatchFor {
fmt.Println(watchItems.Key, watchItems.Val, watchItems.Threshold)
}
}
}

0 comments on commit 2c7d3a4

Please sign in to comment.