Commit

Merge pull request #6025 from oasisprotocol/peternose/trivial/add-manager-ctx

go/runtime/host/sandbox: Add ctx to manager method
peternose authored Jan 29, 2025
2 parents 444f3bc + 016677a commit 77a3175
Showing 2 changed files with 29 additions and 44 deletions.
Empty file added .changelog/6025.trivial.md
Empty file.
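
Note: this change replaces the sandboxed runtime's hand-rolled lifecycle plumbing (a sync.Once pair plus a stopCh channel) with the cmSync.One helper from go/common/sync. Start now calls startOne.TryStart(r.manager), which runs the manager goroutine under a cancellable context, and Stop calls startOne.TryStop() to cancel that context. The context is then threaded through startProcess, handleAbortRequest, and watchdogPing, which lets the bridging goroutine that used to translate stopCh into context cancellation be deleted. The sketch below shows one way such a helper could look; it is inferred from how the diff uses it (NewOne, TryStart, TryStop) and is not the actual go/common/sync implementation.

```go
// Hypothetical stand-in for cmSync.One, inferred from its usage in this diff.
package cmsync

import (
	"context"
	"sync"
)

// One runs at most one instance of a function and cancels that function's
// context when stopped. Like the sync.Once pair it replaces here, this
// minimal version is start-once rather than restartable.
type One struct {
	mu     sync.Mutex
	cancel context.CancelFunc
}

// NewOne creates a new One.
func NewOne() One {
	return One{}
}

// TryStart launches fn in its own goroutine with a fresh context, unless it
// has already been started.
func (o *One) TryStart(fn func(ctx context.Context)) bool {
	o.mu.Lock()
	defer o.mu.Unlock()
	if o.cancel != nil {
		return false // Already started.
	}
	ctx, cancel := context.WithCancel(context.Background())
	o.cancel = cancel
	go fn(ctx)
	return true
}

// TryStop cancels the started function's context; it returns false if
// TryStart was never called. Cancelling more than once is harmless.
func (o *One) TryStop() bool {
	o.mu.Lock()
	defer o.mu.Unlock()
	if o.cancel == nil {
		return false // Never started.
	}
	o.cancel()
	return true
}
```

With a helper of this shape, the `case <-ctx.Done()` branches added below play the role that `case <-r.stopCh` used to, and the downstream calls (startProcess, handleAbortRequest, watchdogPing) inherit the same cancellation through their new ctx parameter.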
go/runtime/host/sandbox/sandbox.go (73 changes: 29 additions & 44 deletions)
@@ -16,6 +16,7 @@ import (
"github.com/oasisprotocol/oasis-core/go/common/logging"
"github.com/oasisprotocol/oasis-core/go/common/node"
"github.com/oasisprotocol/oasis-core/go/common/pubsub"
+ cmSync "github.com/oasisprotocol/oasis-core/go/common/sync"
"github.com/oasisprotocol/oasis-core/go/common/version"
"github.com/oasisprotocol/oasis-core/go/runtime/host"
"github.com/oasisprotocol/oasis-core/go/runtime/host/protocol"
@@ -28,7 +29,7 @@ const (
runtimeInitTimeout = 1 * time.Second
runtimeExtendedInitTimeout = 120 * time.Second
runtimeInterruptTimeout = 1 * time.Second
- resetTickerTimeout = 15 * time.Minute
+ stopTickerTimeout = 15 * time.Minute
watchdogInterval = 15 * time.Second
watchdogPingTimeout = 5 * time.Second

@@ -87,7 +88,7 @@ func (p *provisioner) NewRuntime(cfg host.Config) (host.Runtime, error) {
cfg: p.cfg,
rtCfg: cfg,
id: cfg.ID,
- stopCh: make(chan struct{}),
+ startOne: cmSync.NewOne(),
ctrlCh: make(chan interface{}, ctrlChannelBufferSize),
notifier: pubsub.NewBroker(false),
notifyUpdateCapabilityTEECh: make(chan struct{}, 1),
@@ -121,10 +122,8 @@ type sandboxedRuntime struct {
rtCfg host.Config
id common.Namespace

- startOnce sync.Once
- stopOnce sync.Once
- stopCh chan struct{}
- ctrlCh chan interface{}
+ startOne cmSync.One
+ ctrlCh chan interface{}

process process.Process
conn protocol.Connection
@@ -230,9 +229,7 @@ func (r *sandboxedRuntime) WatchEvents() (<-chan *host.Event, pubsub.ClosableSub

// Implements host.Runtime.
func (r *sandboxedRuntime) Start() {
- r.startOnce.Do(func() {
- go r.manager()
- })
+ r.startOne.TryStart(r.manager)
}

// Implements host.Runtime.
@@ -265,17 +262,15 @@ func (r *sandboxedRuntime) Abort(ctx context.Context, force bool) error {

// Implements host.Runtime.
func (r *sandboxedRuntime) Stop() {
- r.stopOnce.Do(func() {
- close(r.stopCh)
- })
+ r.startOne.TryStop()
}

// Implements host.EmitEvent.
func (r *sandboxedRuntime) EmitEvent(ev *host.Event) {
r.notifier.Broadcast(ev)
}

- func (r *sandboxedRuntime) startProcess() (err error) {
+ func (r *sandboxedRuntime) startProcess(ctx context.Context) (err error) {
// Create a temporary directory.
runtimeDir, err := os.MkdirTemp("", "oasis-runtime")
if err != nil {
@@ -353,17 +348,6 @@ func (r *sandboxedRuntime) startProcess() (err error) {
}
}()

- // Create a context that gets cancelled if runtime is stopped.
- ctx, cancel := context.WithCancel(context.Background())
- go func() {
- select {
- case <-ctx.Done():
- case <-r.stopCh:
- cancel()
- }
- }()
- defer cancel()

// Populate the runtime-specific parts of host information.
hi := r.cfg.HostInfo.Clone()
hi.LocalConfig = r.rtCfg.LocalConfig
@@ -397,6 +381,7 @@ func (r *sandboxedRuntime) startProcess() (err error) {
// Perform configuration-specific host initialization.
exInitCtx, cancelExInit := context.WithTimeout(ctx, runtimeExtendedInitTimeout)
defer cancelExInit()

ev, err := r.cfg.HostInitializer(exInitCtx, hp)
if err != nil {
return fmt.Errorf("failed to initialize connection: %w", err)
@@ -434,14 +419,14 @@ drainLoop:
return nil
}

- func (r *sandboxedRuntime) handleAbortRequest(rq *abortRequest) error {
+ func (r *sandboxedRuntime) handleAbortRequest(ctx context.Context, rq *abortRequest) error {
r.logger.Warn("interrupting runtime")

// First attempt to gracefully interrupt the runtime by sending a request.
- ctx, cancel := context.WithTimeout(context.Background(), runtimeInterruptTimeout)
- defer cancel()
+ callCtx, cancelCall := context.WithTimeout(ctx, runtimeInterruptTimeout)
+ defer cancelCall()

- response, err := r.conn.Call(ctx, &protocol.Body{RuntimeAbortRequest: &protocol.Empty{}})
+ response, err := r.conn.Call(callCtx, &protocol.Body{RuntimeAbortRequest: &protocol.Empty{}})
if err == nil && response.RuntimeAbortResponse != nil && !rq.force {
// Successful response, and no force restart required.
return nil
@@ -457,8 +442,8 @@ func (r *sandboxedRuntime) handleAbortRequest(rq *abortRequest) error {
// request is only sent after the new runtime has been respawned and is ready to use.
select {
case <-r.process.Wait():
- case <-r.stopCh:
- return context.Canceled
+ case <-ctx.Done():
+ return ctx.Err()
}

r.logger.Warn("runtime terminated due to restart request")
@@ -481,8 +466,8 @@ func (r *sandboxedRuntime) handleAbortRequest(rq *abortRequest) error {

// watchdogPing pings the runtime for liveness and terminates the process in case response is not
// received in time.
- func (r *sandboxedRuntime) watchdogPing() {
- ctx, cancel := context.WithTimeout(context.Background(), watchdogPingTimeout)
+ func (r *sandboxedRuntime) watchdogPing(ctx context.Context) {
+ ctx, cancel := context.WithTimeout(ctx, watchdogPingTimeout)
defer cancel()

// Send a single ping request and expect a response.
@@ -497,7 +482,7 @@ func (r *sandboxedRuntime) handleAbortRequest(rq *abortRequest) error {
r.process.Kill()
}

- func (r *sandboxedRuntime) manager() {
+ func (r *sandboxedRuntime) manager(ctx context.Context) {
var ticker *backoff.Ticker

defer func() {
@@ -528,9 +513,9 @@ func (r *sandboxedRuntime) manager() {
defer evSub.Close()

var (
- attempt int
- resetTickerCh <-chan time.Time
- watchdogCh <-chan time.Time
+ attempt int
+ stopTickerCh <-chan time.Time
+ watchdogCh <-chan time.Time
)
for {
// Make sure to restart the process if terminated.
@@ -546,7 +531,7 @@ func (r *sandboxedRuntime) manager() {
}

select {
- case <-r.stopCh:
+ case <-ctx.Done():
r.logger.Warn("termination requested")
return
case <-firstTickCh:
Expand All @@ -558,7 +543,7 @@ func (r *sandboxedRuntime) manager() {
"attempt", attempt,
)

- if err := r.startProcess(); err != nil {
+ if err := r.startProcess(ctx); err != nil {
r.logger.Error("failed to start runtime",
"err", err,
)
@@ -574,7 +559,7 @@ func (r *sandboxedRuntime) manager() {
}

// After the process has been (re)started, set up fresh tickers.
- resetTickerCh = time.After(resetTickerTimeout)
+ stopTickerCh = time.After(stopTickerTimeout)
watchdogCh = time.Tick(watchdogInterval)
}

@@ -584,14 +569,14 @@ func (r *sandboxedRuntime) manager() {
switch rq := grq.(type) {
case *abortRequest:
// Request to abort the runtime.
- rq.ch <- r.handleAbortRequest(rq)
+ rq.ch <- r.handleAbortRequest(ctx, rq)
close(rq.ch)
default:
r.logger.Error("received unknown request type",
"request_type", fmt.Sprintf("%T", rq),
)
}
- case <-r.stopCh:
+ case <-ctx.Done():
r.logger.Warn("termination requested")
return
case <-r.process.Wait():
@@ -610,8 +595,8 @@ func (r *sandboxedRuntime) manager() {

// Notify subscribers that the runtime has stopped.
r.notifier.Broadcast(&host.Event{Stopped: &host.StoppedEvent{}})
- case <-resetTickerCh:
- // Reset the ticker if things work smoothly. Otherwise, keep on using the old ticker as
+ case <-stopTickerCh:
+ // Stop the ticker if things work smoothly. Otherwise, keep on using the old ticker as
// it can happen that the runtime constantly terminates after a successful start.
if ticker != nil {
ticker.Stop()
@@ -626,7 +611,7 @@ func (r *sandboxedRuntime) manager() {
}
case <-watchdogCh:
// Check for runtime liveness.
- r.watchdogPing()
+ r.watchdogPing(ctx)
}
}
}
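
Note: from the host's point of view the runtime lifecycle is unchanged by this refactor; only the internal stop signalling moved from a channel to context cancellation. A brief, hypothetical caller-side sketch follows, assuming host.Provisioner exposes NewRuntime with the signature shown in the diff; the provisioner construction and config wiring are not part of this change.

```go
package sandboxnotes // illustrative only

import "github.com/oasisprotocol/oasis-core/go/runtime/host"

// runSandboxed sketches the caller-side lifecycle after this change; p is the
// sandbox provisioner from this package (construction not shown).
func runSandboxed(p host.Provisioner, cfg host.Config) error {
	rt, err := p.NewRuntime(cfg) // Builds a sandboxedRuntime with startOne: cmSync.NewOne().
	if err != nil {
		return err
	}
	rt.Start()      // Spawns manager(ctx) exactly once via startOne.TryStart.
	defer rt.Stop() // Cancels manager's ctx; manager sees <-ctx.Done() and returns.

	// ... interact with the runtime (calls, events) while it is running ...
	return nil
}
```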
