Skip to content

Commit

Permalink
Updated code to run queries and analyze results, using goroutines, c…
Browse files Browse the repository at this point in the history
…hannels for concurrency.

Signed-off-by: Kedar Vijay Kulkarni <[email protected]>
  • Loading branch information
Kedar Vijay Kulkarni committed Nov 3, 2021
1 parent 2c7d3a4 commit b88950c
Show file tree
Hide file tree
Showing 6 changed files with 153 additions and 23 deletions.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,7 @@ This tool allows OpenShift users to run a watcher for Prometheus queries and def
* [x] Determine Prometheus url, bearerToken for OpenShift
* [x] If Prometheus url, bearerToken already included in the yaml, use that
* [x] Create yaml format for queries, and expected outcomes (Use a struct to read that in)
* [x] Spawn goroutine to run queries and analyze results
* [x] Spawn goroutine to receive notification when a query yields "False" value
* [ ] Notify/Do Something when results don't match conditions
* [ ] Spawn goroutines to keep running queries and evaluating results to handle scale
* [ ] Spawn goroutines to keep running queries and evaluating results to handle scale - e.g. when we have very large number of queries in the yaml file, we can divide and concurrently run queries
Binary file modified bin/cpa
Binary file not shown.
69 changes: 69 additions & 0 deletions cmd/analyze/analyze.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,17 @@
package analyze

import (
"fmt"
"io/ioutil"
"log"
"math/rand"
"strconv"
"time"

prometheus "github.com/kedark3/cpa/cmd/prometheus"
exutil "github.com/openshift/openshift-tests/test/extended/util"

"github.com/prometheus/common/model"

"sigs.k8s.io/yaml"
)
Expand All @@ -26,6 +35,7 @@ type watchList struct {
Key string `json:"Key"`
Val string `json:"Val"`
Threshold string `json:"threshold"`
Operator string `json:"operator"`
}

func (q *queryList) Parse(data []byte) error {
Expand All @@ -48,3 +58,62 @@ func ReadPrometheusQueries() (queriesList queryList, err error) {

return queriesList, nil
}

// Queries runs every query in queryList against Prometheus and evaluates each
// returned sample against that query's WatchFor conditions.
//
// A watch item applies to a sample when the sample's label named by the item's
// Key equals the item's Val, or when Key is the literal string "nil" (used for
// purely numerical queries such as max()). For each applicable item the sample
// value is compared with the item's Threshold using the item's Operator
// (eq, gt, lt, lte, gte). Every comparison is printed; a comparison that
// evaluates to false is additionally sent on c, and that send blocks until
// the message is received (or buffered, if c has capacity).
//
// Queries that fail to run are printed and skipped. Samples or thresholds that
// cannot be parsed as floats, and operators that are not recognised, are
// logged and skipped instead of being silently compared as zero or reported
// as passing.
func Queries(queryList queryList, oc *exutil.CLI, baseURL, bearerToken string, c chan string) {
	// Symbols used when printing a comparison; built once rather than per query.
	opMap := map[string]string{"eq": "==", "lt": "<", "gt": ">", "lte": "<=", "gte": ">="}

	for _, items := range queryList {
		fmt.Printf("\nQuery:%s\n", items.Query)
		result, err := prometheus.RunQuery(items.Query, oc, baseURL, bearerToken)
		if err != nil {
			fmt.Println(err)
			continue
		}
		for _, metric := range result.Data.Result {
			for _, watchItems := range items.WatchFor {
				// e.g. metric.Metric["phase"] == "Running", or the key is "nil",
				// meaning it is a numerical query such as max().
				labelMatches := metric.Metric[model.LabelName(watchItems.Key)] == model.LabelValue(watchItems.Val)
				if !labelMatches && watchItems.Key != "nil" {
					continue
				}

				v1, err := strconv.ParseFloat(metric.Value.String(), 64)
				if err != nil {
					log.Printf("query %q: cannot parse sample value %q: %v", items.Query, metric.Value.String(), err)
					continue
				}
				v2, err := strconv.ParseFloat(watchItems.Threshold, 64)
				if err != nil {
					log.Printf("query %q: cannot parse threshold %q: %v", items.Query, watchItems.Threshold, err)
					continue
				}

				var b bool // if this is false we send a message on the channel
				switch watchItems.Operator {
				case "eq":
					b = v1 == v2
				case "gt":
					b = v1 > v2
				case "lt":
					b = v1 < v2
				case "lte":
					b = v1 <= v2
				case "gte":
					b = v1 >= v2
				default:
					// An unrecognised operator must not be reported as a pass.
					log.Printf("query %q: unknown operator %q, skipping watch item", items.Query, watchItems.Operator)
					continue
				}

				// The same text is printed and, on failure, sent to the notifier.
				msg := fmt.Sprintf("\nValue: %.4f %s Threshold: %.4f is %t\n", v1, opMap[watchItems.Operator], v2, b)
				fmt.Print(msg)
				if !b {
					fmt.Printf("\n Comparison of Value and Threshold is %t. Notifying...\n", b)
					c <- msg
				}
			}
		}
	}
}

// Notify prints every message received on c, framed by separator lines, and
// prints a "please wait" spinner line every 500ms while it is running.
//
// Messages are received as soon as they are sent, so a sender on an
// unbuffered channel is not held up by the spinner interval. Notify returns
// when c is closed; if c is never closed it runs until the program exits.
func Notify(c chan string) {
	waitChars := []string{"/", "-", "\\", "|"}

	ticker := time.NewTicker(time.Millisecond * 500)
	defer ticker.Stop()

	for {
		select {
		case msg, ok := <-c:
			if !ok {
				// Channel closed: nothing more will arrive, so stop instead
				// of spinning on zero-value receives.
				return
			}
			fmt.Println("***************************************")
			fmt.Println("Received following on the channel:", msg)
			fmt.Println("***************************************")
		case <-ticker.C:
			fmt.Printf("\r%s Please Wait. No new message received on the channel....", waitChars[rand.Intn(len(waitChars))])
		}
	}
}
4 changes: 2 additions & 2 deletions config/prometheus.yaml.template
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
---
# You can also set these or program will try to fetch this dynamically from OpenShift
# as long as KUBECONFIG is set
URL: # Use the public Route to prometheus
BearerToken:
url: # Use the public Route to prometheus
bearerToken:
58 changes: 53 additions & 5 deletions config/queries.yaml
Original file line number Diff line number Diff line change
@@ -1,15 +1,63 @@
---
- query: "sum(kube_pod_status_phase{}) by (phase) > 0"
watchFor:
- key: Phase
watchFor: # watchFor can have multiple values for status/count queries
- key: phase
val: "Pending"
threshold: 10
- key: Phase
operator: lte
- key: phase
val: "Failed"
threshold: 0
operator: eq
- key: phase
val: "Succeeded"
threshold: 77
operator: gte
- query: "sum(kube_namespace_status_phase) by (phase)"
watchFor:
- key: Phase
- key: phase
val: "Terminating"
threshold: 0

operator: eq
- query: 'max(sum by (instance) (rate(ovnkube_master_pod_creation_latency_seconds_sum[20m])))' # Pod annotation latency
watchFor: # watchFor will have only 1 value for latency/duration queries
- key: nil
val: nil
threshold: 0.02
operator: lt
- query: 'max(sum by (instance) (rate(ovnkube_node_cni_request_duration_seconds_sum{command="ADD"}[20m])))' # CNI Request duration for "ADD" command over 20m interval
watchFor:
- key: nil
val: nil
threshold: 0.1
operator: lt
- query: 'max(sum by (instance) (rate(ovnkube_node_cni_request_duration_seconds_sum{command="DEL"}[20m])))' # CNI Request duration for "DEL" command over 20m interval
watchFor:
- key: nil
val: nil
threshold: 0.02
operator: lt
- query: 'max(sum(container_memory_working_set_bytes{pod=~"ovnkube-master-.*",namespace="openshift-ovn-kubernetes",container=""}) by (node))'
watchFor:
- key: nil
val: nil
threshold: 209715200
operator: lt
- query: 'max(sum(container_memory_rss{namespace!="",name!="",container="prometheus"}) by (pod))/1073742000' # 1073742000 approximates bytes per GiB (exactly 1073741824)
watchFor:
- key: nil
val: nil
threshold: 2 # GiB
operator: lt
# - query: 'topk(10, rate(container_cpu_usage_seconds_total{pod=~"ovnkube-.*",namespace="openshift-ovn-kubernetes",container="ovn-controller"}[2m])*100)' # top 10 - ovn-controller cpu usage
# watchFor:
# - query: 'topk(10, sum(container_memory_working_set_bytes{pod=~"ovnkube-node-.*",namespace="openshift-ovn-kubernetes",container="ovn-controller"}) by (node))' # top 10 - ovn-controller memory usage
# watchFor:
# - query: 'sum(container_memory_rss{pod="prometheus-k8s-0",namespace!="",name!="",container="prometheus"}) by (pod)' # Prometheus replica 0 rss memory
# watchFor:
# - query: 'sum(container_memory_rss{pod="prometheus-k8s-1",namespace!="",name!="",container="prometheus"}) by (pod)' # Prometheus replica 1 rss memory
# watchFor:
# - query: 'rate(container_cpu_usage_seconds_total{pod=~"ovnkube-master.*",namespace="openshift-ovn-kubernetes",container!=""}[2m])*100' # CPU usage ovnkube-master components over 2m interval
# watchFor:
# - query: 'sum by (condition)(cluster_operator_conditions{condition!=""})'
# watchFor:
41 changes: 26 additions & 15 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@ package main
import (
"fmt"
"log"
"time"

analyze "github.com/kedark3/cpa/cmd/analyze"
prometheus "github.com/kedark3/cpa/cmd/prometheus"
exutil "github.com/openshift/openshift-tests/test/extended/util"

g "github.com/onsi/ginkgo"
o "github.com/onsi/gomega"
Expand All @@ -13,19 +16,19 @@ import (
func main() {
o.RegisterFailHandler(g.Fail)

// oc := exutil.NewCLI("prometheus-cpa", exutil.KubeConfigPath())
oc := exutil.NewCLI("prometheus-cpa", exutil.KubeConfigPath())
// secrets, err := oc.AdminKubeClient().CoreV1().Secrets("openshift-monitoring").List(metav1.ListOptions{})

// if err != nil {
// log.Printf("An Error has occured %s", err)
// return
// }
// log.Printf("Found following secrets %d", secrets.Size())
// url, bearerToken, ok := prometheus.LocatePrometheus(oc)
// if !ok {
// log.Printf("Oops something went wrong while trying to fetch Prometheus url and bearerToken")
// return
// }
url, bearerToken, ok := prometheus.LocatePrometheus(oc)
if !ok {
log.Printf("Oops something went wrong while trying to fetch Prometheus url and bearerToken")
return
}

// queries := []string{
// `sum(kube_pod_status_phase{}) by (phase) > 0`, // pod count by phase
Expand All @@ -48,14 +51,22 @@ func main() {
// fmt.Println(prometheus.RunQuery(query, oc, url, bearerToken))
// fmt.Println()
// }
queryList, err := analyze.ReadPrometheusQueries()
if err != nil {
log.Println(err)
}
for _, items := range queryList {
fmt.Println(items.Query)
for _, watchItems := range items.WatchFor {
fmt.Println(watchItems.Key, watchItems.Val, watchItems.Threshold)
c := make(chan string)

go func(c chan string) {
for i := 1; ; i++ {
fmt.Printf("\n\n\nIteration no. %d\n", i)
queryList, err := analyze.ReadPrometheusQueries()
if err != nil {
log.Println(err)
}
analyze.Queries(queryList, oc, url, bearerToken, c)
d := time.Second * 20
fmt.Printf("\n Sleeping for %.2f mins.\n\n\n\n", d.Minutes())
time.Sleep(d)
fmt.Print("\033[H\033[2J") // clears screen before printing next iteration
}
}
}(c)
go analyze.Notify(c)
time.Sleep(time.Hour * 4)
}

0 comments on commit b88950c

Please sign in to comment.