Skip to content

Commit

Permalink
slim down to watch single service
Browse files Browse the repository at this point in the history
  • Loading branch information
tmacro committed Jan 6, 2025
1 parent 99f393c commit 05dadf7
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 69 deletions.
35 changes: 12 additions & 23 deletions pkg/cmd/supervise/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,22 +67,27 @@ func runCommon(ctx context.Context, cmd *cobra.Command, args []string, leaderRun
return
}

monitor := supervisord.NewMonitor(cmd.InOrStdin(), cmd.OutOrStdout())
serviceName := conf.GetTargetService()
monitor := supervisord.NewMonitor(cmd.InOrStdin(), cmd.OutOrStdout(), serviceName)

go func() {
if err := monitor.Listen(ctx); err != nil {
logger.Error(err)
}
}()

serviceName := conf.GetTargetService()

runner := func() <-chan process.RunStatus {
c := make(chan process.RunStatus)
go func() {
defer close(c)
serviceEvs, stop := monitor.Watch(serviceName)
defer stop()
ctx, cancel := context.WithCancel(ctx)

go func() {
defer close(c)
defer cancel()
proc := monitor.WaitForNextExit(ctx)
logger.Infof("Process %s exited", proc.Name)
c <- process.RunStatus{}
}()

started, err := supervd.StartProcess(serviceName, false)
if err != nil {
Expand All @@ -91,27 +96,11 @@ func runCommon(ctx context.Context, cmd *cobra.Command, args []string, leaderRun
ExitCode: 1,
Err: err,
}
cancel()
return
}

logger.Infof("Started %s: %v", serviceName, started)
// fmt.Fprintf(os.Stderr, "Started %s: %v\n", serviceName, started)

for ev := range serviceEvs {
if ev.State == supervisord.Stopped {
c <- process.RunStatus{
ExitCode: 0,
Err: nil,
}
return
} else if ev.State == supervisord.Exited {
c <- process.RunStatus{
ExitCode: 1,
Err: nil,
}
return
}
}
}()
return c
}
Expand Down
77 changes: 37 additions & 40 deletions pkg/supervisord/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,57 +2,44 @@ package supervisord

import (
"context"
"fmt"
"io"
"os"
"strings"
"sync"
)

type Monitor struct {
l *Listener
procs map[string]*Process
watches map[string][]chan<- *Process
l *Listener
hooks map[chan<- Process]interface{}
mu sync.Mutex
}

func NewMonitor(r io.Reader, w io.Writer) *Monitor {
func NewMonitor(r io.Reader, w io.Writer, serviceName string) *Monitor {
return &Monitor{
l: NewListener(r, w),
procs: make(map[string]*Process),
watches: make(map[string][]chan<- *Process),
}
}

func (m *Monitor) removeWatch(service string, ch chan<- *Process) {
watches := m.watches[service]
for i, c := range watches {
if c == ch {
m.watches[service] = append(watches[:i], watches[i+1:]...)
return
}
}
}

func (m *Monitor) Watch(name string) (<-chan *Process, func()) {
ch := make(chan *Process)
m.watches[name] = append(m.watches[name], ch)

return ch, func() {
m.removeWatch(name, ch)
close(ch)
l: NewListener(r, w),
hooks: make(map[chan<- Process]interface{}),
}
}

func (m *Monitor) Listen(ctx context.Context) error {
events := make(chan Event)
defer close(events)
go func() {
for {
select {
case <-ctx.Done():
return
case event := <-events:
fmt.Fprintf(os.Stderr, "event: %v\n", event)
if strings.HasPrefix(event.Name(), "PROCESS_STATE_") {
m.updateProcess(event)
proc, err := ProcessFromEvent(event)
if err != nil {
continue
}

m.mu.Lock()
for ch := range m.hooks {
ch <- proc
}
m.mu.Unlock()
}
}
}
Expand All @@ -61,16 +48,26 @@ func (m *Monitor) Listen(ctx context.Context) error {
return m.l.Listen(events)
}

func (m *Monitor) updateProcess(event Event) {
proc := &Process{}
err := proc.updateFromListener(event)
if err != nil {
return
}
func (m *Monitor) WaitForNextExit(ctx context.Context) Process {
ch := make(chan Process)
m.mu.Lock()
m.hooks[ch] = nil
m.mu.Unlock()

m.procs[proc.Name] = proc
defer func() {
m.mu.Lock()
delete(m.hooks, ch)
m.mu.Unlock()
}()

for _, ch := range m.watches[proc.Name] {
ch <- proc
for {
select {
case <-ctx.Done():
return Process{}
case proc := <-ch:
if proc.State == Exited || proc.State == Stopped {
return proc
}
}
}
}
12 changes: 6 additions & 6 deletions pkg/supervisord/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,29 +16,29 @@ type Process struct {
PID int
}

func (proc *Process) updateFromListener(event Event) error {
func ProcessFromEvent(event Event) (proc Process, err error) {
name, ok := event.Meta["processname"]
if !ok {
return errors.New("processname not found in metadata")
err = errors.New("processname not found in metadata")
return
}

var pid int
var str string
var err error

if str, ok = event.Meta["pid"]; ok {
if pid, err = strconv.Atoi(str); err != nil {
return err
return
}
}

state := event.State()
if state == Stopped {
if state == Stopped || state == Exited {
pid = 0
}

proc.Name = name
proc.State = state
proc.PID = pid
return nil
return
}

0 comments on commit 05dadf7

Please sign in to comment.