Skip to content

Commit

Permalink
Merge pull request #2 from kedark3/working
Browse files Browse the repository at this point in the history
Merging current working branch
  • Loading branch information
kedark3 authored Nov 18, 2021
2 parents a53dcba + d23ba48 commit ac86a4e
Show file tree
Hide file tree
Showing 12 changed files with 395 additions and 125 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1 @@
config/prometheus.yaml
config/*.yaml
13 changes: 13 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Build helpers for the cpa binary.
# OUT_DIR is the directory that receives the compiled binary.
OUT_DIR=bin

# Default target: clean and rebuild.
all: update
.PHONY: all

# Remove the previous binary, then compile a fresh one.
update: clean build
.PHONY: update

# Compile main.go into $(OUT_DIR)/cpa, creating the directory if needed.
build:
	mkdir -p "${OUT_DIR}"
	go build -v -o "${OUT_DIR}/cpa" main.go
.PHONY: build

# Remove the binary produced by `build` (follows OUT_DIR instead of a fixed path).
clean:
	$(RM) "${OUT_DIR}/cpa"
.PHONY: clean
42 changes: 40 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,43 @@ This tool allows OpenShift users to run a watcher for Prometheus queries and def
* [x] Create yaml format for queries, and expected outcomes (Use a struct to read that in)
* [x] Spawn goroutine to run queries and analyze results
* [x] Spawn goroutine to receive notification when a query yields "False" value
* [ ] Notify/Do Something when results don't match conditions
* [ ] Spawn goroutines to keep running queries and evaluating results to handle scale - e.g. when we have very large number of queries in the yaml file, we can divide and concurrently run queries
* [x] Update to latest go and recompile
* [x] Add CLI to the program
* [x] Add a parameter to read different query files in config dir
* [x] Add parameter for clearing/not-clearing screen
* [x] Add Parameter for timeout
* [x] Add a Makefile
* [x] File logging the output
* [x] Print output to screen even when logging enabled - simultaneously
* [x] Let user decide query frequency
* [x] Slack Notification
* [x] Notify/Do Something(e.g. Pause/Kill benchmark jobs to preserve cluster) when results don't match conditions
* [x] Spawn goroutines to keep running queries and evaluating results to handle scale - e.g. when we have very large number of queries in the yaml file, we can divide and concurrently run queries
* [x] If slack config is not set, it is ignored and no attempts will be made to notify via slack
* [ ] Debug mode
* [ ] Use env vars


## Usage:

* Build the binary using the Makefile: `make build`, or update your binary using `make update`. You can clean an existing binary with `make clean`, or clean and rebuild in one step using `make all`.
* Set `KUBECONFIG` envvar, and make sure to review `config/queries.yaml`.
* You can then run the following command:
```sh

./bin/cpa -t 60s -h
Usage: cpa [--noclrscr] [--queries QUERIES] [--query-frequency QUERY-FREQUENCY] [--timeout TIMEOUT] [--log-output] [--terminate-benchmark TERMINATE-BENCHMARK]

Options:
--noclrscr Do not clear screen after each iteration. Clears screen by default. [default: false]
--queries QUERIES, -q QUERIES
queries file to use [default: queries.yaml]
--query-frequency QUERY-FREQUENCY, -f QUERY-FREQUENCY
How often do we run queries. You can pass values like 4h or 1h10m10s [default: 20s]
--timeout TIMEOUT, -t TIMEOUT
Duration to run Continuous Performance Analysis. You can pass values like 4h or 1h10m10s [default: 4h]
--log-output, -l Output will be stored in a log file(cpa.log) in addition to stdout. [default: false]
--terminate-benchmark TERMINATE-BENCHMARK, -k TERMINATE-BENCHMARK
When CPA is running in parallel with benchmark job, let CPA know to kill benchmark if any query fail. (E.g. -k <processID>) Helpful to preserve cluster for further analysis.
--help, -h display this help and exit
```
Binary file modified bin/cpa
Binary file not shown.
116 changes: 58 additions & 58 deletions cmd/analyze/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ import (
"fmt"
"io/ioutil"
"log"
"math/rand"
"strconv"
"time"
"strings"
"sync"

prometheus "github.com/kedark3/cpa/cmd/prometheus"
exutil "github.com/openshift/openshift-tests/test/extended/util"
Expand All @@ -21,6 +21,7 @@ This package will read the queries from relevant config file under `config/` dir
and run relevant queries using prometheus package. Then retrieve the result and
analyze it against the threshold.
*/
var wg sync.WaitGroup

const configPath = "./config/"

Expand All @@ -42,78 +43,77 @@ func (q *queryList) Parse(data []byte) error {
return yaml.Unmarshal(data, q)
}

func ReadPrometheusQueries() (queriesList queryList, err error) {
data, err := ioutil.ReadFile(configPath + "queries.yaml")
func ReadPrometheusQueries(queriesFile string) (queriesList queryList, err error) {
data, err := ioutil.ReadFile(configPath + queriesFile)
if err != nil {
log.Printf("Cound't read %s/queries.yaml", configPath)
return queriesList, err
}
if err := queriesList.Parse(data); err != nil {
log.Fatal(err)
}
// fmt.Println(queriesList)
// log.Println(queriesList)
if len(queriesList) == 0 {
return queriesList, nil
return queriesList, fmt.Errorf("query list is empty: %v", queriesList)
}

return queriesList, nil
}

func Queries(queryList queryList, oc *exutil.CLI, baseURL, bearerToken string, c chan string) {
for _, items := range queryList {
fmt.Printf("\nQuery:%s\n", items.Query)
result, err := prometheus.RunQuery(items.Query, oc, baseURL, bearerToken)
if err != nil {
fmt.Println(err)
continue
}
opMap := map[string]string{"eq": "==", "lt": "<", "gt": ">", "lte": "<=", "gte": ">="}
for _, metric := range result.Data.Result {
for _, watchItems := range items.WatchFor {
// fmt.Println(watchItems.Key, watchItems.Val, watchItems.Threshold)
// fmt.Println(metric.Metric[model.LabelName(watchItems.Key)], model.LabelValue(watchItems.Val), metric.Value)
// e.g. if "metric.Metric[model.LabelName(watchItems.Key)]" --> metric.Metric["phase"] == model.LabelValue(watchItems.Val) --> "Running"
// or watchItems key is nil - meaning its a numerical query such as max()
if metric.Metric[model.LabelName(watchItems.Key)] == model.LabelValue(watchItems.Val) || watchItems.Key == "nil" {
// fmt.Println(metric.Metric[model.LabelName(watchItems.Key)], metric.Value, watchItems.Threshold, watchItems.Operator)
v1, _ := strconv.ParseFloat(metric.Value.String(), 64)
v2, _ := strconv.ParseFloat(watchItems.Threshold, 64)
b := true // if this becomes false we send message on go channel
switch watchItems.Operator {
case "eq":
b = v1 == v2
case "gt":
b = v1 > v2
case "lt":
b = v1 < v2
case "lte":
b = v1 <= v2
case "gte":
b = v1 >= v2
}
fmt.Printf("\nValue: %.4f %s Threshold: %.4f is %t\n", v1, opMap[watchItems.Operator], v2, b)
if !b {
fmt.Printf("\n Comparison of Value and Threshold is %t. Notifying...\n", b)
c <- fmt.Sprintf("\nValue: %.4f %s Threshold: %.4f is %t\n", v1, opMap[watchItems.Operator], v2, b)
}
}
}
}
// Queries runs every entry of queryList concurrently, one goroutine per query,
// and returns only after all of them have finished.
//
// oc, baseURL and bearerToken are handed unchanged to runQuery, as are the
// notification channel c, the terminate-benchmark channel tb and the
// terminateBenchmark value.
func Queries(queryList queryList, oc *exutil.CLI, baseURL, bearerToken string, c chan string, tb chan bool, terminateBenchmark string) {
	// runQuery registers itself on the package-level wg from inside its own
	// goroutine, so wg.Wait() on its own may observe a zero counter and return
	// before any query has started. Track the goroutines launched here with a
	// local WaitGroup whose Add happens before the go statement.
	var launched sync.WaitGroup
	for _, item := range queryList {
		launched.Add(1)
		go func(q queries) {
			defer launched.Done()
			runQuery(q, oc, baseURL, bearerToken, c, tb, terminateBenchmark)
		}(item)
	}
	launched.Wait()
	// Every runQuery call has returned by now; this keeps the package-level
	// WaitGroup drained as well.
	wg.Wait()
}

func Notify(c chan string) {
waitChars := []string{"/", "-", "\\", "|"}
for {
select {
case msg := <-c:
fmt.Println("***************************************")
fmt.Println("Received following on the channel:", msg)
fmt.Println("***************************************")
default:
fmt.Printf("\r%s Please Wait. No new message received on the channel....", waitChars[rand.Intn(4)])
time.Sleep(time.Millisecond * 500)
// runQuery executes one Prometheus query and checks every returned sample
// against the query's WatchFor rules.
//
// A rule applies to a sample when the sample's label named by the rule's Key
// equals the rule's Val, or when Key is the literal string "nil" (a purely
// numerical query such as max()). The sample value is then compared with the
// rule's Threshold using the rule's Operator (eq, gt, lt, lte, gte).
//
// Each comparison is logged. When a comparison is false, a description is sent
// on c and, if terminateBenchmark is non-empty, true is sent on tb. Samples or
// rules whose value, threshold or operator cannot be interpreted are logged and
// skipped instead of being compared against a silent zero value.
func runQuery(q queries, oc *exutil.CLI, baseURL, bearerToken string, c chan string, tb chan bool, terminateBenchmark string) {
	// NOTE(review): Add runs inside the goroutine, so a caller that relies only
	// on wg.Wait() may see a zero counter before this line executes; callers
	// need their own synchronization for launch tracking.
	wg.Add(1)
	defer wg.Done()
	result, err := prometheus.RunQuery(q.Query, oc, baseURL, bearerToken)
	if err != nil {
		log.Println(err)
		return
	}
	// Human-readable symbols for the log and notification messages.
	opMap := map[string]string{"eq": "==", "lt": "<", "gt": ">", "lte": "<=", "gte": ">="}
	for _, metric := range result.Data.Result {
		for _, watchItems := range q.WatchFor {
			// e.g. metric.Metric["phase"] == "Running", or Key is "nil" for a
			// numerical query that carries no label to match on.
			if metric.Metric[model.LabelName(watchItems.Key)] != model.LabelValue(watchItems.Val) && watchItems.Key != "nil" {
				continue
			}
			v1, err := strconv.ParseFloat(metric.Value.String(), 64)
			if err != nil {
				log.Printf("skipping sample for query %q: cannot parse value %q: %v", q.Query, metric.Value.String(), err)
				continue
			}
			v2, err := strconv.ParseFloat(watchItems.Threshold, 64)
			if err != nil {
				log.Printf("skipping watch rule for query %q: cannot parse threshold %q: %v", q.Query, watchItems.Threshold, err)
				continue
			}
			var b bool // when false, a message is sent on the notification channel
			switch watchItems.Operator {
			case "eq":
				b = v1 == v2
			case "gt":
				b = v1 > v2
			case "lt":
				b = v1 < v2
			case "lte":
				b = v1 <= v2
			case "gte":
				b = v1 >= v2
			default:
				log.Printf("skipping watch rule for query %q: unknown operator %q", q.Query, watchItems.Operator)
				continue
			}
			log.Printf(`
Query:%s
Value: %.4f %s Threshold: %.4f is %t
`, q.Query, v1, opMap[watchItems.Operator], v2, b)
			if b {
				continue
			}
			log.Printf("\n%[2]s\n Comparison of Value and Threshold is %[1]t. Notifying...\n%[2]s\n", b, strings.Repeat("~", 80))
			c <- fmt.Sprintf("\nQuery: %s\nValue: %.4f %s Threshold: %.4f is %t for key: %q and val: %q\n", q.Query, v1, opMap[watchItems.Operator], v2, b, watchItems.Key, watchItems.Val)
			if terminateBenchmark != "" {
				tb <- true // send signal to terminate benchmark channel
			}
		}
	}
}
123 changes: 123 additions & 0 deletions cmd/notify/notifications.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package Notify

import (
"fmt"
"io/ioutil"
"log"
"math/rand"
"os"
"os/exec"
"strings"
"time"

"github.com/slack-go/slack"
"gopkg.in/yaml.v2"
)

const configPath = "./config/"

// slackConfig holds the Slack settings decoded from slack.yaml.
//
// The struct is decoded with yaml.v2, which ignores json tags and falls back to
// the lowercased field name; the explicit yaml tags pin the same keys so that
// the mapping no longer depends on that fallback. The json tags are kept for
// backward compatibility.
type slackConfig struct {
	// UserID is the Slack user mentioned at the start of every message.
	UserID string `json:"userid" yaml:"userid"`
	// ChannelID is the channel that messages are sent to.
	ChannelID string `json:"channelid" yaml:"channelid"`
	// SlackToken is the API token used to create the Slack client.
	SlackToken string `json:"slacktoken" yaml:"slacktoken"`
}

// Parse decodes the YAML document in data into c and reports any decoding
// error returned by yaml.Unmarshal.
func (c *slackConfig) Parse(data []byte) error {
	if err := yaml.Unmarshal(data, c); err != nil {
		return err
	}
	return nil
}

// ReadslackConfig loads the Slack settings from slack.yaml under configPath.
//
// On success it returns the decoded configuration and a nil error. If the file
// cannot be read or its contents cannot be parsed, it returns a zero-value
// slackConfig together with an error that wraps the underlying cause, leaving
// the decision to continue or abort to the caller.
func ReadslackConfig() (config slackConfig, err error) {
	path := configPath + "slack.yaml"
	data, err := ioutil.ReadFile(path)
	if err != nil {
		return slackConfig{}, fmt.Errorf("couldn't read %s: %w", path, err)
	}
	if err := config.Parse(data); err != nil {
		// Return the failure instead of exiting the process from library code.
		return slackConfig{}, fmt.Errorf("couldn't parse %s: %w", path, err)
	}
	return config, nil
}

// SlackNotify posts message to the configured Slack channel, prefixed with a
// mention of the configured user.
//
// When thread_ts is non-empty the message is sent with that thread timestamp
// and thread_ts is returned unchanged. When thread_ts is empty the message is
// treated as the first one, and the timestamp returned by Slack is handed back
// so that subsequent calls can reuse it.
//
// A failed send is logged rather than treated as fatal, so that a Slack outage
// does not stop the process that is doing the monitoring. In that case the
// incoming thread_ts (possibly empty) is returned.
func (s slackConfig) SlackNotify(message, thread_ts string) string {
	// api := slack.New(s.SlackToken, slack.OptionDebug(true)) // To debug api requests, you uncomment this line and comment the one below
	api := slack.New(s.SlackToken)
	msgText := slack.NewTextBlockObject("mrkdwn", fmt.Sprintf("Hi <@%s>, %s", s.UserID, message), false, false)
	msgBlock := slack.MsgOptionBlocks(slack.NewSectionBlock(msgText, nil, nil))

	if thread_ts != "" {
		// Send the new message with the existing thread timestamp.
		if _, _, _, err := api.SendMessage(s.ChannelID, msgBlock, slack.MsgOptionTS(thread_ts)); err != nil {
			log.Printf("slack notification failed: %v", err)
		}
		return thread_ts
	}

	// First message: capture the timestamp Slack assigns to it.
	_, ts, _, err := api.SendMessage(s.ChannelID, msgBlock)
	if err != nil {
		log.Printf("slack notification failed: %v", err)
		return thread_ts
	}
	return ts
}

// Notify loops forever relaying messages received on c.
//
// Each received message is logged between two rules of 80 "~" characters and,
// when ChannelID, UserID and SlackToken are all set, forwarded through
// SlackNotify using thread_ts. While no message is pending, a wait indicator
// is printed to stdout and the loop sleeps for 500ms before checking again.
func (s slackConfig) Notify(c chan string, thread_ts string) {
	spinner := []string{"/", "-", "\\", "|"}
	for {
		select {
		case failure := <-c:
			rule := strings.Repeat("~", 80)
			report := fmt.Sprintf(`
%s
Received following on the channel: %s
%[1]s
`, rule, failure)
			log.Println(report)
			// Skip Slack entirely unless every setting is present.
			if s.ChannelID == "" || s.UserID == "" || s.SlackToken == "" {
				continue
			}
			s.SlackNotify("Following query failed:"+failure, thread_ts)
		default:
			frame := spinner[rand.Intn(4)]
			fmt.Printf("\r%s Please Wait. No new message received on the channel....", frame)
			time.Sleep(time.Millisecond * 500)
		}
	}

}

// TerminateBenchmark waits for a value on tb, then sends SIGTERM to the
// process identified by processID by running the external `kill` command, and
// finally exits the current program with status 1.
//
// It never returns: it either blocks waiting for a signal or terminates the
// program. A failure of the kill command is logged and does not prevent the
// exit.
func TerminateBenchmark(tb chan bool, processID string) {
	// Block until a signal arrives. Polling is unnecessary because the program
	// exits right after the first signal has been handled.
	b := <-tb
	msgFmt := fmt.Sprintf(`
%s
Received signal %t to kill -SIGTERM %s
%[1]s
`, strings.Repeat("~", 80), b, processID)
	log.Println(msgFmt)
	// TODO: should we terminate CPA as a result of termination of benchmark
	if err := exec.Command("kill", "-SIGTERM", processID).Run(); err != nil {
		// e.g. the process has already exited, in which case kill fails with exit status 1
		log.Println("Failed to kill the process:", processID, err)
	}
	os.Exit(1)
}
Loading

0 comments on commit ac86a4e

Please sign in to comment.