Skip to content

Commit

Permalink
Merge pull request #176 from tgross/gh175_coprocess
Browse files Browse the repository at this point in the history
Implementation of co-process hooks
  • Loading branch information
tgross authored Jun 13, 2016
2 parents 8b2dde6 + fedaec3 commit 37b9465
Show file tree
Hide file tree
Showing 15 changed files with 539 additions and 24 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ See the [ContainerPilot documentation](https://www.joyent.com/containerpilot/doc
- [Signals and operations](https://www.joyent.com/containerpilot/docs/signals)
- [Periodic tasks](https://www.joyent.com/containerpilot/docs/tasks)
- [Telemetry](https://www.joyent.com/containerpilot/docs/telemetry)
- [Coprocesses](https://www.joyent.com/containerpilot/docs/coprocesses)
- [Frequently asked questions](https://www.joyent.com/containerpilot/docs/faq)
- [Support](https://www.joyent.com/containerpilot/docs/support)

Expand All @@ -62,4 +63,4 @@ We've published a number of example applications demonstrating how ContainerPilo

Please report any issues you encounter with ContainerPilot or its documentation by [opening a Github issue](https://github.com/joyent/containerpilot/issues). Roadmap items will be maintained as [enhancements](https://github.com/joyent/containerpilot/issues?q=is%3Aopen+is%3Aissue+label%3Aenhancement). PRs are welcome on any issue.

Details about contributing to documentation are in [documentation/CONTRIBUTING.md](https://github.com/joyent/containerpilot/blob/master/documentation/CONTRIBUTING.md)
Details about contributing to documentation are in [documentation/CONTRIBUTING.md](https://github.com/joyent/containerpilot/blob/master/documentation/CONTRIBUTING.md)
57 changes: 38 additions & 19 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

log "github.com/Sirupsen/logrus"
"github.com/joyent/containerpilot/backends"
"github.com/joyent/containerpilot/coprocesses"
"github.com/joyent/containerpilot/discovery"
"github.com/joyent/containerpilot/services"
"github.com/joyent/containerpilot/tasks"
Expand All @@ -21,30 +22,32 @@ import (
)

type rawConfig struct {
logConfig *LogConfig
onStart interface{}
preStart interface{}
preStop interface{}
postStop interface{}
stopTimeout int
servicesConfig []interface{}
backendsConfig []interface{}
tasksConfig []interface{}
telemetryConfig interface{}
logConfig *LogConfig
onStart interface{}
preStart interface{}
preStop interface{}
postStop interface{}
stopTimeout int
coprocessesConfig []interface{}
servicesConfig []interface{}
backendsConfig []interface{}
tasksConfig []interface{}
telemetryConfig interface{}
}

// Config contains the parsed config elements
type Config struct {
ServiceBackend discovery.ServiceBackend
LogConfig *LogConfig
PreStart *exec.Cmd
PreStop *exec.Cmd
PostStop *exec.Cmd
StopTimeout int
Services []*services.Service
Backends []*backends.Backend
Tasks []*tasks.Task
Telemetry *telemetry.Telemetry
LogConfig *LogConfig
PreStart *exec.Cmd
PreStop *exec.Cmd
PostStop *exec.Cmd
StopTimeout int
Coprocesses []*coprocesses.Coprocess
Services []*services.Service
Backends []*backends.Backend
Tasks []*tasks.Task
Telemetry *telemetry.Telemetry
}

const (
Expand Down Expand Up @@ -102,6 +105,14 @@ func (cfg *rawConfig) parseServices(discoveryService discovery.ServiceBackend) (
return services, nil
}

func (cfg *rawConfig) parseCoprocesses() ([]*coprocesses.Coprocess, error) {
coprocesses, err := coprocesses.NewCoprocesses(cfg.coprocessesConfig)
if err != nil {
return nil, err
}
return coprocesses, nil
}

// parseStopTimeout ...
func (cfg *rawConfig) parseStopTimeout() (int, error) {
if cfg.stopTimeout == 0 {
Expand Down Expand Up @@ -249,6 +260,12 @@ func ParseConfig(configFlag string) (*Config, error) {
}
cfg.Tasks = tasks

coprocesses, err := raw.parseCoprocesses()
if err != nil {
return nil, err
}
cfg.Coprocesses = coprocesses

return cfg, nil
}

Expand Down Expand Up @@ -342,6 +359,7 @@ func decodeConfig(configMap map[string]interface{}, result *rawConfig) error {
result.servicesConfig = decodeArray(configMap["services"])
result.backendsConfig = decodeArray(configMap["backends"])
result.tasksConfig = decodeArray(configMap["tasks"])
result.coprocessesConfig = decodeArray(configMap["coprocesses"])
result.telemetryConfig = configMap["telemetry"]

delete(configMap, "logging")
Expand All @@ -353,6 +371,7 @@ func decodeConfig(configMap map[string]interface{}, result *rawConfig) error {
delete(configMap, "services")
delete(configMap, "backends")
delete(configMap, "tasks")
delete(configMap, "coprocesses")
delete(configMap, "telemetry")
var unused []string
for key := range configMap {
Expand Down
168 changes: 168 additions & 0 deletions coprocesses/coprocess.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
package coprocesses

import (
"fmt"
"io"
"os/exec"
"strconv"
"strings"

log "github.com/Sirupsen/logrus"
"github.com/joyent/containerpilot/utils"
)

// Some magic numbers used internally by the coprocess restartLimits
const (
haltRestarts = -1
unlimitedRestarts = -2
)

// Coprocess configures a process that will run alongside the main process
type Coprocess struct {
Name string `mapstructure:"name"`
Command interface{} `mapstructure:"command"`
Restarts interface{} `mapstructure:"restarts"`

Args []string
restart bool
restartLimit int
restartsRemain int
cmd *exec.Cmd
logWriters []io.WriteCloser
}

// NewCoprocesses parses json config into an array of Coprocesses
func NewCoprocesses(raw []interface{}) ([]*Coprocess, error) {
var coprocesses []*Coprocess
if raw == nil {
return coprocesses, nil
}
var configs []*Coprocess
if err := utils.DecodeRaw(raw, &configs); err != nil {
return nil, fmt.Errorf("Coprocess configuration error: %v", err)
}
for _, t := range configs {
if err := parseCoprocess(t); err != nil {
return nil, err
}
coprocesses = append(coprocesses, t)
}
return coprocesses, nil
}

func parseCoprocess(coprocess *Coprocess) error {
args, err := utils.ToStringArray(coprocess.Command)
if err != nil {
return err
}
coprocess.Args = args
if coprocess.Args == nil || len(coprocess.Args) == 0 {
return fmt.Errorf("Coprocess did not provide a command")
}
cmd := utils.ArgsToCmd(coprocess.Args)
coprocess.cmd = cmd

if coprocess.Name == "" {
coprocess.Name = strings.Join(coprocess.Args, " ")
}

return parseCoprocessRestarts(coprocess)
}

func parseCoprocessRestarts(coprocess *Coprocess) error {

// defaults if omitted
if coprocess.Restarts == nil {
coprocess.restart = false
coprocess.restartLimit = 0
coprocess.restartsRemain = 0
return nil
}

const msg = `Invalid 'restarts' field "%v": accepts positive integers, "unlimited" or "never"`

switch t := coprocess.Restarts.(type) {
case string:
if t == "unlimited" {
coprocess.restartLimit = unlimitedRestarts
} else if t == "never" {
coprocess.restartLimit = 0
} else if i, err := strconv.Atoi(t); err == nil && i >= 0 {
coprocess.restartLimit = i
} else {
return fmt.Errorf(msg, coprocess.Restarts)
}
case float64, int:
// mapstructure can figure out how to decode strings into int fields
// but doesn't try to guess and just gives us a float64 if it's got
// a number that it's decoding to an interface{}. We'll only return
// an error if we can't cast it to an int. This means that an end-user
// can pass in `restarts: 1.2` and have undocumented truncation but the
// wtf would be all on them at that point.
if i, ok := t.(int); ok && i >= 0 {
coprocess.restartLimit = i
} else if i, ok := t.(float64); ok && i >= 0 {
coprocess.restartLimit = int(i)
} else {
return fmt.Errorf(msg, coprocess.Restarts)
}
default:
return fmt.Errorf(msg, coprocess.Restarts)
}

coprocess.restart = coprocess.restartLimit > 0 || coprocess.restartLimit == unlimitedRestarts
coprocess.restartsRemain = coprocess.restartLimit
return nil
}

// Start runs the coprocess
func (coprocess *Coprocess) Start() {
log.Debugf("coprocess[%s].Start", coprocess.Name)
fields := log.Fields{"process": "coprocess", "coprocess": coprocess.Name}
stdout := utils.NewLogWriter(fields, log.InfoLevel)
stderr := utils.NewLogWriter(fields, log.DebugLevel)
coprocess.logWriters = []io.WriteCloser{stdout, stderr}
defer coprocess.closeLogs()

// always reset restartsRemain when we load the config
coprocess.restartsRemain = coprocess.restartLimit
for {
if coprocess.restartLimit != unlimitedRestarts &&
coprocess.restartsRemain <= haltRestarts {
break
}
cmd := utils.ArgsToCmd(coprocess.Args)
coprocess.cmd = cmd
cmd.Stdout = stdout
cmd.Stderr = stderr
if _, err := utils.ExecuteAndWait(cmd); err != nil {
log.Errorf("coprocess[%s] exited: %s", coprocess.Name, err)
}
log.Debugf("coprocess[%s] exited", coprocess.Name)
if !coprocess.restart {
break
}
coprocess.restartsRemain--
}
}

// Stop kills a running coprocess
func (coprocess *Coprocess) Stop() {
log.Debugf("coprocess[%s].Stop", coprocess.Name)
coprocess.restartsRemain = haltRestarts
if coprocess.cmd != nil && coprocess.cmd.Process != nil {
log.Warnf("Killing coprocess %s: %d", coprocess.Name, coprocess.cmd.Process.Pid)
coprocess.cmd.Process.Kill()
}
}

func (coprocess *Coprocess) closeLogs() {
if coprocess.logWriters == nil {
return
}
for _, w := range coprocess.logWriters {
if err := w.Close(); err != nil {
log.Errorf("Unable to close log writer : %v", err)
}
}
}
Loading

0 comments on commit 37b9465

Please sign in to comment.