Skip to content

Commit

Permalink
Added threading for slack notifications
Browse files Browse the repository at this point in the history
Signed-off-by: Kedar Vijay Kulkarni <[email protected]>
  • Loading branch information
Kedar Vijay Kulkarni committed Nov 10, 2021
1 parent fc96a4d commit f94187a
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 19 deletions.
27 changes: 17 additions & 10 deletions cmd/notify/notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,25 +34,32 @@ func ReadslackConfig() (config slackConfig, err error) {
log.Fatal(err)
return config, err
}
fmt.Println(config)
return config, nil
}
func (s slackConfig) SlackNotify(message string) {
api := slack.New(s.SlackToken, slack.OptionDebug(true))
msgText := slack.NewTextBlockObject("mrkdwn", fmt.Sprintf("Hi <@%s>, following query failed:%s", s.UserID, message), false, false)

func (s slackConfig) SlackNotify(message, thread_ts string) string {
// api := slack.New(s.SlackToken, slack.OptionDebug(true)) // To debug api requests, you uncomment this line and comment the one below
api := slack.New(s.SlackToken)
msgText := slack.NewTextBlockObject("mrkdwn", fmt.Sprintf("Hi <@%s>, %s", s.UserID, message), false, false)
msgSection := slack.NewSectionBlock(msgText, nil, nil)
msgBlock := slack.MsgOptionBlocks(
msgSection,
)
_, _, _, err := api.SendMessage(s.ChannelID, msgBlock)
var err error
if thread_ts != "" {
// if we have thread_ts - use it to send new messages on the thread
_, _, _, err = api.SendMessage(s.ChannelID, msgBlock, slack.MsgOptionTS(thread_ts))
} else {
// if thread_ts was empty, assume that this is first message we are sending and retrieve thread_ts and return it for subsequent requests
_, thread_ts, _, err = api.SendMessage(s.ChannelID, msgBlock)
}
if err != nil {
fmt.Printf("%s\n", err)
return
log.Fatal(err)
}

return thread_ts
}

func (s slackConfig) Notify(c chan string) {
func (s slackConfig) Notify(c chan string, thread_ts string) {
waitChars := []string{"/", "-", "\\", "|"}
for {
select {
Expand All @@ -63,7 +70,7 @@ Received following on the channel: %s
%[1]s
`, strings.Repeat("~", 80), msg)
fmt.Println(msgFmt)
s.SlackNotify(msg)
s.SlackNotify("Following query failed:"+msg, thread_ts)
default:
fmt.Printf("\r%s Please Wait. No new message received on the channel....", waitChars[rand.Intn(4)])
time.Sleep(time.Millisecond * 500)
Expand Down
25 changes: 16 additions & 9 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"io"
"log"
"os"
"os/exec"
"time"

"github.com/alexflint/go-arg"
Expand All @@ -20,12 +21,12 @@ import (

func main() {
var args struct {
NoClrscr bool `arg:"--noclrscr" help:"Do not clear screen after each iteration. Clears screen by default." default:"false"`
Queries string `arg:"-q,--queries" help:"queries file to use" default:"queries.yaml"`
QueryFrequency time.Duration `arg:"-f,--query-frequency" help:"How often do we run queries. You can pass values like 4h or 1h10m10s" default:"20s"`
Timeout time.Duration `arg:"-t,--timeout" help:"Duration to run Continuous Performance Analysis. You can pass values like 4h or 1h10m10s" default:"4h"`
LogOutput bool `arg:"-l,--log-output" help:"Output will be stored in a log file instead of stdout." default:"false"`
KillBenchmark string `arg:"-k,--kill-benchmark" help:"When CPA is running in parallel with benchmark job, let CPA know to kill benchmark if any query fail. E.g. -k run_nodedensity_from_git.sh Helpful to preserve cluster for further analysis." default:""`
NoClrscr bool `arg:"--noclrscr" help:"Do not clear screen after each iteration. Clears screen by default." default:"false"`
Queries string `arg:"-q,--queries" help:"queries file to use" default:"queries.yaml"`
QueryFrequency time.Duration `arg:"-f,--query-frequency" help:"How often do we run queries. You can pass values like 4h or 1h10m10s" default:"20s"`
Timeout time.Duration `arg:"-t,--timeout" help:"Duration to run Continuous Performance Analysis. You can pass values like 4h or 1h10m10s" default:"4h"`
LogOutput bool `arg:"-l,--log-output" help:"Output will be stored in a log file(cpa.log) in addition to stdout." default:"false"`
TerminateBenchmark string `arg:"-k,--terminate-benchmark" help:"When CPA is running in parallel with benchmark job, let CPA know to kill benchmark if any query fail. (E.g. -k <processID>) Helpful to preserve cluster for further analysis." default:""`
}
arg.MustParse(&args)

Expand All @@ -43,8 +44,13 @@ func main() {
//set output of logs to f
log.SetOutput(multiWriter)
}
if args.KillBenchmark != "" {
if args.TerminateBenchmark != "" {
// TODO Add logic to handle running benchmark processes when analyze sends notification on the channel.
err := exec.Command("kill", "-SIGTERM", args.TerminateBenchmark).Run()
if err != nil {
log.Fatal(err)
}
return
}

oc := exutil.NewCLI("prometheus-cpa", exutil.KubeConfigPath())
Expand All @@ -64,7 +70,7 @@ func main() {

slackConfig, err := notify.ReadslackConfig()
if err != nil {
log.Printf("Oops something went wrong while trying to fetch Prometheus url and bearerToken")
log.Printf("Oops something went wrong while trying to fetch Slack Config")
log.Println(err)
return
}
Expand Down Expand Up @@ -92,6 +98,7 @@ func main() {
// }
c := make(chan string)

thread_ts := slackConfig.SlackNotify("New benchmark started, we will monitor it for performance and notify here with the issues.", "")
go func(c chan string) {
for i := 1; ; i++ {
log.Printf("Iteration no. %d\n", i)
Expand All @@ -108,7 +115,7 @@ func main() {
}
}
}(c)
go slackConfig.Notify(c)
go slackConfig.Notify(c, thread_ts)
d, err := time.ParseDuration(args.Timeout.String())
if err != nil {
log.Println(err)
Expand Down

0 comments on commit f94187a

Please sign in to comment.