Skip to content

Commit

Permalink
Added code to terminate a given process by ID, useful to terminate ru…
Browse files Browse the repository at this point in the history
…nning benchmark when queries fail

Signed-off-by: Kedar Vijay Kulkarni <[email protected]>
  • Loading branch information
Kedar Vijay Kulkarni committed Nov 10, 2021
1 parent b6ffbc4 commit 5af281c
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 16 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ This tool allows OpenShift users to run a watcher for Prometheus queries and def
* [x] Print output to screen even when logging enabled - simultaneously
* [x] Let user decide query frequency
* [x] Slack Notification
* [ ] Notify/Do Something(e.g. Pause/Kill benchmark jobs to preserve cluster) when results don't match conditions
* [ ] Spawn goroutines to keep running queries and evaluating results to handle scale - e.g. when we have very large number of queries in the yaml file, we can divide and concurrently run queries
* [x] Notify/Do Something (e.g. Pause/Kill benchmark jobs to preserve cluster) when results don't match conditions
* [x] Spawn goroutines to keep running queries and evaluating results to handle scale - e.g. when we have a very large number of queries in the yaml file, we can divide them up and run them concurrently



Expand Down
Binary file added bin/cpa
Binary file not shown.
9 changes: 6 additions & 3 deletions cmd/analyze/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,17 +60,17 @@ func ReadPrometheusQueries(queriesFile string) (queriesList queryList, err error
return queriesList, nil
}

func Queries(queryList queryList, oc *exutil.CLI, baseURL, bearerToken string, c chan string) {
func Queries(queryList queryList, oc *exutil.CLI, baseURL, bearerToken string, c chan string, tb chan bool, terminateBenchmark string) {
// start := time.Now()
for _, item := range queryList {
go runQuery(item, oc, baseURL, bearerToken, c)
go runQuery(item, oc, baseURL, bearerToken, c, tb, terminateBenchmark)
}
wg.Wait()
// end := time.Since(start)
// log.Printf("\n It takes %s time to run queries", end)
}

func runQuery(q queries, oc *exutil.CLI, baseURL, bearerToken string, c chan string) {
func runQuery(q queries, oc *exutil.CLI, baseURL, bearerToken string, c chan string, tb chan bool, terminateBenchmark string) {
wg.Add(1)
defer wg.Done()
result, err := prometheus.RunQuery(q.Query, oc, baseURL, bearerToken)
Expand Down Expand Up @@ -109,6 +109,9 @@ Value: %.4f %s Threshold: %.4f is %t
if !b {
log.Printf("\n%[2]s\n Comparison of Value and Threshold is %[1]t. Notifying...\n%[2]s\n", b, strings.Repeat("~", 80))
c <- fmt.Sprintf("\nValue: %.4f %s Threshold: %.4f is %t\n", v1, opMap[watchItems.Operator], v2, b)
if terminateBenchmark != "" {
tb <- true // send signal to terminate benchmark channel
}
}
}
}
Expand Down
43 changes: 42 additions & 1 deletion cmd/notify/notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"io/ioutil"
"log"
"math/rand"
"os"
"os/exec"
"strings"
"time"

Expand Down Expand Up @@ -69,7 +71,7 @@ func (s slackConfig) Notify(c chan string, thread_ts string) {
Received following on the channel: %s
%[1]s
`, strings.Repeat("~", 80), msg)
fmt.Println(msgFmt)
log.Println(msgFmt)
s.SlackNotify("Following query failed:"+msg, thread_ts)
default:
fmt.Printf("\r%s Please Wait. No new message received on the channel....", waitChars[rand.Intn(4)])
Expand All @@ -78,3 +80,42 @@ Received following on the channel: %s
}

}

// TerminateBenchmark waits for a signal on tb and then asks the process
// identified by processID to stop by running `kill -SIGTERM <processID>`.
//
// The call blocks until a value is received on tb (a receive from a closed
// channel also counts and yields false). After the kill attempt, whether it
// succeeded or not, the current program exits with status 1, so this function
// never returns to its caller once a signal has arrived.
//
// processID is handed to the kill command verbatim; no validation is done
// here, so callers are expected to pass a sensible PID string.
func TerminateBenchmark(tb chan bool, processID string) {
	// A plain blocking receive replaces the former select/default polling
	// loop: the process exits after the first signal anyway, so there is never
	// a second iteration, and blocking reacts immediately instead of up to
	// half a second later.
	b := <-tb

	msgFmt := fmt.Sprintf(`
%s
Received signal %t to kill -SIGTERM %s
%[1]s
`, strings.Repeat("~", 80), b, processID)
	log.Println(msgFmt)

	// TODO: decide whether CPA itself should terminate once the benchmark has
	// been terminated; for now it always does (see os.Exit below).
	if err := exec.Command("kill", "-SIGTERM", processID).Run(); err != nil {
		// kill exits non-zero when, for example, the target process is
		// already gone; report it and still shut down.
		log.Println("Failed to kill the process:", processID, err)
	}
	os.Exit(1)
}
17 changes: 7 additions & 10 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"io"
"log"
"os"
"os/exec"
"time"

"github.com/alexflint/go-arg"
Expand Down Expand Up @@ -44,14 +43,6 @@ func main() {
//set output of logs to f
log.SetOutput(multiWriter)
}
if args.TerminateBenchmark != "" {
// TODO Add logic to handle running benchmark processes when analyze sends notification on the channel.
err := exec.Command("kill", "-SIGTERM", args.TerminateBenchmark).Run()
if err != nil {
log.Fatal(err)
}
return
}

oc := exutil.NewCLI("prometheus-cpa", exutil.KubeConfigPath())
// secrets, err := oc.AdminKubeClient().CoreV1().Secrets("openshift-monitoring").List(metav1.ListOptions{})
Expand Down Expand Up @@ -96,6 +87,7 @@ func main() {
// fmt.Println(prometheus.RunQuery(query, oc, url, bearerToken))
// fmt.Println()
// }
tb := make(chan bool)
c := make(chan string)

thread_ts := slackConfig.SlackNotify("New benchmark started, we will monitor it for performance and notify here with the issues.", "")
Expand All @@ -107,14 +99,19 @@ func main() {
log.Println(err)
return
}
analyze.Queries(queryList, oc, url, bearerToken, c)
analyze.Queries(queryList, oc, url, bearerToken, c, tb, args.TerminateBenchmark)
time.Sleep(args.QueryFrequency)
if !args.NoClrscr {
log.Print("\033[H\033[2J") // clears screen before printing next iteration
}
}
}(c)
go slackConfig.Notify(c, thread_ts)

if args.TerminateBenchmark != "" {
go notify.TerminateBenchmark(tb, args.TerminateBenchmark)
}

d, err := time.ParseDuration(args.Timeout.String())
if err != nil {
log.Println(err)
Expand Down

0 comments on commit 5af281c

Please sign in to comment.