Skip to content

Commit

Permalink
Adding concurrency
Browse files Browse the repository at this point in the history
Signed-off-by: Kedar Vijay Kulkarni <[email protected]>
  • Loading branch information
Kedar Vijay Kulkarni committed Nov 10, 2021
1 parent f94187a commit b6ffbc4
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 37 deletions.
Binary file removed bin/cpa
Binary file not shown.
87 changes: 51 additions & 36 deletions cmd/analyze/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"io/ioutil"
"log"
"strconv"
"strings"
"sync"

prometheus "github.com/kedark3/cpa/cmd/prometheus"
exutil "github.com/openshift/openshift-tests/test/extended/util"
Expand All @@ -19,6 +21,7 @@ This package will read the queries from relevant config file under `config/` dir
and run relevant queries using prometheus package. Then retrieve the result and
analyze it against the threshold.
*/
var wg sync.WaitGroup

const configPath = "./config/"

Expand Down Expand Up @@ -58,42 +61,54 @@ func ReadPrometheusQueries(queriesFile string) (queriesList queryList, err error
}

func Queries(queryList queryList, oc *exutil.CLI, baseURL, bearerToken string, c chan string) {
for _, items := range queryList {
log.Printf("\nQuery:%s\n", items.Query)
result, err := prometheus.RunQuery(items.Query, oc, baseURL, bearerToken)
if err != nil {
log.Println(err)
continue
}
opMap := map[string]string{"eq": "==", "lt": "<", "gt": ">", "lte": "<=", "gte": ">="}
for _, metric := range result.Data.Result {
for _, watchItems := range items.WatchFor {
// log.Println(watchItems.Key, watchItems.Val, watchItems.Threshold)
// log.Println(metric.Metric[model.LabelName(watchItems.Key)], model.LabelValue(watchItems.Val), metric.Value)
// e.g. if "metric.Metric[model.LabelName(watchItems.Key)]" --> metric.Metric["phase"] == model.LabelValue(watchItems.Val) --> "Running"
// or watchItems key is nil - meaning its a numerical query such as max()
if metric.Metric[model.LabelName(watchItems.Key)] == model.LabelValue(watchItems.Val) || watchItems.Key == "nil" {
// log.Println(metric.Metric[model.LabelName(watchItems.Key)], metric.Value, watchItems.Threshold, watchItems.Operator)
v1, _ := strconv.ParseFloat(metric.Value.String(), 64)
v2, _ := strconv.ParseFloat(watchItems.Threshold, 64)
b := true // if this becomes false we send message on go channel
switch watchItems.Operator {
case "eq":
b = v1 == v2
case "gt":
b = v1 > v2
case "lt":
b = v1 < v2
case "lte":
b = v1 <= v2
case "gte":
b = v1 >= v2
}
log.Printf("\nValue: %.4f %s Threshold: %.4f is %t\n", v1, opMap[watchItems.Operator], v2, b)
if !b {
log.Printf("\n Comparison of Value and Threshold is %t. Notifying...\n", b)
c <- fmt.Sprintf("\nValue: %.4f %s Threshold: %.4f is %t\n", v1, opMap[watchItems.Operator], v2, b)
}
// start := time.Now()
for _, item := range queryList {
go runQuery(item, oc, baseURL, bearerToken, c)
}
wg.Wait()
// end := time.Since(start)
// log.Printf("\n It takes %s time to run queries", end)
}

func runQuery(q queries, oc *exutil.CLI, baseURL, bearerToken string, c chan string) {
wg.Add(1)
defer wg.Done()
result, err := prometheus.RunQuery(q.Query, oc, baseURL, bearerToken)
if err != nil {
log.Println(err)
return
}
opMap := map[string]string{"eq": "==", "lt": "<", "gt": ">", "lte": "<=", "gte": ">="}
for _, metric := range result.Data.Result {
for _, watchItems := range q.WatchFor {
// log.Println(watchItems.Key, watchItems.Val, watchItems.Threshold)
// log.Println(metric.Metric[model.LabelName(watchItems.Key)], model.LabelValue(watchItems.Val), metric.Value)
// e.g. if "metric.Metric[model.LabelName(watchItems.Key)]" --> metric.Metric["phase"] == model.LabelValue(watchItems.Val) --> "Running"
// or watchItems key is nil - meaning its a numerical query such as max()
if metric.Metric[model.LabelName(watchItems.Key)] == model.LabelValue(watchItems.Val) || watchItems.Key == "nil" {
// log.Println(metric.Metric[model.LabelName(watchItems.Key)], metric.Value, watchItems.Threshold, watchItems.Operator)
v1, _ := strconv.ParseFloat(metric.Value.String(), 64)
v2, _ := strconv.ParseFloat(watchItems.Threshold, 64)
b := true // if this becomes false we send message on go channel
switch watchItems.Operator {
case "eq":
b = v1 == v2
case "gt":
b = v1 > v2
case "lt":
b = v1 < v2
case "lte":
b = v1 <= v2
case "gte":
b = v1 >= v2
}
log.Printf(`
Query:%s
Value: %.4f %s Threshold: %.4f is %t
`, q.Query, v1, opMap[watchItems.Operator], v2, b)
if !b {
log.Printf("\n%[2]s\n Comparison of Value and Threshold is %[1]t. Notifying...\n%[2]s\n", b, strings.Repeat("~", 80))
c <- fmt.Sprintf("\nValue: %.4f %s Threshold: %.4f is %t\n", v1, opMap[watchItems.Operator], v2, b)
}
}
}
Expand Down
1 change: 0 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,6 @@ func main() {
return
}
analyze.Queries(queryList, oc, url, bearerToken, c)
log.Printf("\n Sleeping for %.2f mins.\n\n\n\n", args.QueryFrequency.Minutes())
time.Sleep(args.QueryFrequency)
if !args.NoClrscr {
log.Print("\033[H\033[2J") // clears screen before printing next iteration
Expand Down

0 comments on commit b6ffbc4

Please sign in to comment.