Skip to content

Commit

Permalink
stdlib: Significantly refactor the Runner api (#1640)
Browse files Browse the repository at this point in the history
* stdlib: Partially implement runner refactor

* Extract local prims into typed functions

* Refactor mkdirRunner and writeRunner

* Update cache runners

* Rename functions

* format

* Simplify prims with named types

* fix test

* Fix tests

* address comments

* Apply suggestions from code review

Co-authored-by: Colin Schmidt <[email protected]>

* address comments

* address comments

---------

Co-authored-by: Colin Schmidt <[email protected]>
  • Loading branch information
V-FEXrt and colinschmidt authored Sep 9, 2024
1 parent a2ad5bb commit 82da532
Show file tree
Hide file tree
Showing 11 changed files with 559 additions and 571 deletions.
1 change: 0 additions & 1 deletion .wakemanifest
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ share/wake/lib/system/io.wake
share/wake/lib/system/job_cache_runner.wake
share/wake/lib/system/job.wake
share/wake/lib/system/path.wake
share/wake/lib/system/plan_scorer.wake
share/wake/lib/system/plan.wake
share/wake/lib/system/runner.wake
share/wake/lib/system/remote_cache_api.wake
Expand Down
107 changes: 70 additions & 37 deletions share/wake/lib/system/io.wake
Original file line number Diff line number Diff line change
Expand Up @@ -129,22 +129,44 @@ export def read (path: Path): Result String Error =
Pass body -> Pass body
Fail f -> Fail (makeError f)

target writeImp inputs mode path content =
def writeRunner =
def imp m p c = prim "write"
def pre input = Pair input Unit
# writeRunner: A runner that processes special write jobs
#
# Allows for calls to the write prim to be tracked in the database as any other job.
# Ideally content would be part of RunnerInputCmd; however, that gets tracked exactly in the database,
# which means all writes would use 2x the total storage in the database.
def writeRunner (content: String) =
# Typed wrapper around the "write" prim.
# Takes the mode, the destination path and the content (in that order) and yields a Result String String.
# Note: the `content` parameter here shadows the `content` parameter of the enclosing writeRunner.
def primWrite (mode: Integer) (path: String) (content: String): Result String String =
(\_ \_ \_ prim "write") mode path content

def run (job: Job) ((RunnerInput _ cmd vis _ _ _ _ _ predict _): RunnerInput): Result RunnerOutput Error =
# Command must be ("<write>", "-m", "{string mode}", "{string path}", Nil)
require "<write>", "-m", smode, path, Nil = cmd
else panic "writeImp violated command-line contract"

# Insert the <write> job into the database
def _ = primJobVirtual job "" "" predict

# Actually trigger the effects required by the job
require Some mode = int smode
else failWithError "write {path}: Unable to convert mode to Integer ({smode})"

def post = match _
Pair (Fail f) _ -> Fail f
Pair (Pass output) Unit ->
if mode < 0 || mode > 0x1ff then
Fail (makeError "write {path}: Invalid mode ({strOctal mode})")
else match (imp mode path content)
Fail f -> Fail (makeError f)
Pass path -> Pass (editRunnerOutputOutputs (path, _) output)
require True = mode >= 0 && mode <= 0x1ff
else failWithError "write {path}: Invalid mode ({strOctal mode})"

makeRunner "write" (\_ Pass 0.0) pre post virtualRunner
def writeTask = primWrite mode path content

# Wait for the virtual job to complete
require Pass reality = job.getJobReality

match writeTask
Fail f -> failWithError f
Pass path ->
RunnerOutput (vis | map getPathName) (path,) reality
| Pass

makeRunner "write" run

target writeImp inputs mode path content =
# There are a couple likely bad paths that we don't want the user writing to
# so we give good error messages for these cases
require False = path ==* ""
Expand Down Expand Up @@ -174,11 +196,11 @@ target writeImp inputs mode path content =

# If all those checks pass we go ahead and perform the write. The write will
# overwrite single files but it will not overwrite a whole directory with a file.
makeExecPlan ("<write>", "0{strOctal mode}", path, Nil) inputs
makeExecPlan ("<write>", "-m", "0{strOctal mode}", path, Nil) inputs
| setPlanLabel "write: {path} 0{strOctal mode}"
| setPlanOnce False
| setPlanEnvironment Nil
| runJobWith writeRunner
| runJobWith (writeRunner content)
| setJobInspectVisibilityHidden
| getJobOutput

Expand Down Expand Up @@ -248,29 +270,40 @@ export def installIn (toRoot: String) (fromRoot: String) (sourcePath: Path): Res
else
installAs (in toRoot rel) sourcePath

# mkdirRunner: A runner that processes special mkdir jobs
#
# Allows for calls to the mkdir prim to be tracked in the database as any other job
def mkdirRunner: Runner =
def imp m p = prim "mkdir"

def pre = match _
Fail f -> Pair (Fail f) (Pair "" "")
Pass input -> match input.getRunnerInputCommand
_, _, mode, dir, Nil -> Pair (Pass input) (Pair mode dir)
_ -> unreachable "mkdirImp violated command-line contract"

def post = match _
Pair (Fail f) _ -> Fail f
Pair (Pass output) (Pair smode dir) ->
def mode =
int smode
| getOrElse 0x200

if mode < 0 || mode > 0x1ff then
Fail (makeError "mkdir {dir}: Invalid mode ({smode})")
else match (imp mode dir)
Fail f -> Fail (makeError f)
Pass path -> Pass (editRunnerOutputOutputs (path, _) output)

makeRunner "mkdir" (\_ Pass 0.0) pre post virtualRunner
# Typed wrapper around the "mkdir" prim.
# Takes the mode and the directory path (in that order) and yields a Result String String.
def primMkdir (mode: Integer) (path: String): Result String String =
(\_ \_ prim "mkdir") mode path

# run: Executes a single <mkdir> job on behalf of mkdirRunner.
#
# Records the job in the database as a virtual job, validates the requested mode and then
# calls the mkdir prim. On success the path returned by the prim is reported as the job's
# only output and the visible files are reported as its inputs.
def run (job: Job) ((RunnerInput _ cmd vis _ _ _ _ _ predict _): RunnerInput): Result RunnerOutput Error =
    # Command must be ("<mkdir>", "-m", "{string mode}", "{string path}", Nil)
    require "<mkdir>", "-m", smode, path, Nil = cmd
    else panic "mkdirImp violated command-line contract"

    # Insert the <mkdir> job into the database
    def _ = primJobVirtual job "" "" predict

    # Actually trigger the effects required by the job
    require Some mode = int smode
    else failWithError "mkdir {path}: Unable to convert mode to Integer ({smode})"

    # Only the nine permission bits (0 through 0x1ff, i.e. octal 0777) are accepted
    require True = mode >= 0 && mode <= 0x1ff
    else failWithError "mkdir {path}: Invalid mode ({smode})"

    def mkdirTask = primMkdir mode path

    # Wait for the virtual job to complete
    require Pass reality = job.getJobReality

    match mkdirTask
        Fail f -> failWithError f
        Pass path ->
            RunnerOutput (vis | map getPathName) (path,) reality
            | Pass

makeRunner "mkdir" run

def mkdirImp inputs mode path =
makeExecPlan ("<mkdir>", "-m", "0{strOctal mode}", path, Nil) inputs
Expand Down
165 changes: 130 additions & 35 deletions share/wake/lib/system/job.wake
Original file line number Diff line number Diff line change
Expand Up @@ -15,41 +15,134 @@

package wake

# JobKey: The values that the database uses to discern a unique job
#
# If all values of two jobs are identical, then the jobs are considered identical.
# Used to determine reuse eligibility.
export tuple JobKey =
# The working directory of the job
export dir: String
# A string path to a file which should be passed as the stdin to a job
export stdin: String
# The environment that the job runs in
export env: String
# The command of the job
export cmd: String
# A unique hash used to discern two nearly identical jobs with some environmental change
export signature: Integer
# The list of files (separated by \0) that the job can see when running
export visible: String
# Boolean integer representing if the job should be launched such that it appears to be
# launched directly by a human (ie launched interactively)
export isatty: Integer

# Create/reserve a job handle, parameters aren't necessarily finalized
#
# Unpacks the JobKey and forwards its fields, together with the label, keep flag, echo and
# stdout/stderr settings, to the "job_create" prim. The twelve arguments below line up
# one-to-one with the twelve lambda placeholders, so their order must not be changed.
export def primJobCreate (label: String) (jobKey: JobKey) (keep: Integer) (echo: String) (stdout: String) (stderr: String): Job =
def JobKey dir stdin env cmd signature visible isatty = jobKey

(\_ \_ \_ \_ \_ \_ \_ \_ \_ \_ \_ \_ prim "job_create")
label
dir
stdin
env
cmd
signature
visible
keep
echo
stdout
stderr
isatty

# Immediately complete a job with the provided outputs without launching a process
#
# Unpacks the Usage record and forwards its six fields, after the job handle and the
# stdout/stderr strings, to the "job_virtual" prim (nine arguments for nine placeholders).
export def primJobVirtual (job: Job) (stdout: String) (stderr: String) (usage: Usage): Unit =
def Usage status runtime cputime membytes ibytes obytes = usage

(\_ \_ \_ \_ \_ \_ \_ \_ \_ prim "job_virtual")
job
stdout
stderr
status
runtime
cputime
membytes
ibytes
obytes

# Launch the job via a child process. Values such as command or environment can be freely changed from the initial reservation.
#
# The signature and visible fields of the JobKey are deliberately ignored here; only
# dir, stdin, env, cmd and isatty are forwarded, along with the six Usage fields, to the
# "job_launch" prim (twelve arguments for twelve placeholders).
export def primJobLaunch (job: Job) (jobKey: JobKey) (usage: Usage): Unit =
def JobKey dir stdin env cmd _signature _visible isatty = jobKey
def Usage status runtime cputime membytes ibytes obytes = usage

(\_ \_ \_ \_ \_ \_ \_ \_ \_ \_ \_ \_ prim "job_launch")
job
dir
stdin
env
cmd
status
runtime
cputime
membytes
ibytes
obytes
isatty

# Complete a job before launch with userland defined failure
#
# Forwards the job handle and the error, in that order, to the "job_fail_launch" prim.
export def primJobFailLaunch (job: Job) (error: Error): Unit =
(\_ \_ prim "job_fail_launch") job error

# Complete a job after launch with userland defined failure
#
# Forwards the job handle and the error, in that order, to the "job_fail_finish" prim.
export def primJobFailFinish (job: Job) (error: Error): Unit =
(\_ \_ prim "job_fail_finish") job error

# Complete a job successfully by providing to the runtime the inputs/outputs/usage of the job
#
# inputs, outputs and all_outputs are passed through as single strings; the Usage record
# is unpacked into its six fields before being handed to the "job_finish" prim
# (ten arguments for ten placeholders).
export def primJobFinish (job: Job) (inputs: String) (outputs: String) (all_outputs: String) (usage: Usage): Unit =
def Usage status runtime cputime membytes ibytes obytes = usage

(\_ \_ \_ \_ \_ \_ \_ \_ \_ \_ prim "job_finish")
job
inputs
outputs
all_outputs
status
runtime
cputime
membytes
ibytes
obytes

# Look up a job in the local database. Returns a completed Job handle with outputs already resolved if it is already cached
#
# All seven JobKey fields are forwarded, in declaration order, to the "job_cache" prim.
# The result pairs a list of jobs with a list of string pairs; runAlways treats a non-empty
# job list as a cache hit and passes the second component on to `confirm`.
export def primJobCache (jobKey: JobKey): Pair (List Job) (List (Pair String String)) =
def JobKey dir stdin env cmd signature visible isatty = jobKey

(\_ \_ \_ \_ \_ \_ \_ prim "job_cache") dir stdin env cmd signature visible isatty

# Creates a hash of 5 elements
#
# The five arguments are left untyped; only the Integer result is annotated.
# They are forwarded in order to the "hash" prim.
export def primHash5 a b c d e: Integer =
(\_ \_ \_ \_ \_ prim "hash") a b c d e

# Helper function similar to `cat`, except that a null byte is appended after each string
# before everything is concatenated. For a non-empty list the result therefore ends
# with a null byte.
def implode strings =
    strings
    | foldr (_, "\0", _) Nil
    | cat

# Helper function that hashes the signature parts of a job
#
# Passes the command, resources, the two filter functions and the keep flag straight
# through to primHash5.
def jobSignature cmd res fni fno keep =
primHash5 cmd res fni fno keep

def runAlways cmd env dir stdin res uusage finputs foutputs vis keep run echo stdout stderr label isatty: Job =
def create label dir stdin env cmd signature visible keep echo stdout stderr isatty =
prim "job_create"
def hash = jobSignature cmd res finputs foutputs keep

def finish job inputs outputs all_outputs status runtime cputime membytes ibytes obytes =
prim "job_finish"
def visKey =
vis
| map getPathName
| implode

def badfinish job error = prim "job_fail_finish"
def cache dir stdin env cmd signature visible isatty = prim "job_cache"
def signature cmd res fni fno keep = prim "hash"
def hash = signature cmd res finputs foutputs keep
def jobKey = JobKey dir stdin env.implode cmd.implode hash visKey isatty.booleanToInteger

def build Unit =
def visStrings = map getPathName vis

def job =
create
label
dir
stdin
env.implode
cmd.implode
hash
visStrings.implode
(booleanToInteger keep)
echo
stdout
stderr
(booleanToInteger isatty)

def job = primJobCreate label jobKey keep.booleanToInteger echo stdout stderr
def prefix = str (getJobId job)

def usage =
Expand All @@ -60,8 +153,8 @@ def runAlways cmd env dir stdin res uusage finputs foutputs vis keep run echo st
run job (Pass (RunnerInput label cmd vis env dir stdin res prefix usage isatty))

def final _ = match output
Fail e -> badfinish job e
Pass (RunnerOutput inputs outputs (Usage status runtime cputime mem in out)) ->
Fail e -> primJobFailLaunch job e
Pass (RunnerOutput inputs outputs reality) ->
def input =
finputs inputs
| map simplify
Expand All @@ -72,7 +165,7 @@ def runAlways cmd env dir stdin res uusage finputs foutputs vis keep run echo st
| computeHashes prefix
| implode

finish job input output (implode outputs) status runtime cputime mem in out
primJobFinish job input output (implode outputs) reality

# Make sure we don't hash files before the job has stopped running
def _ = waitJobMerged final job
Expand All @@ -96,12 +189,14 @@ def runAlways cmd env dir stdin res uusage finputs foutputs vis keep run echo st

job

match keep
False -> build Unit
True ->
match (cache dir stdin env.implode cmd.implode hash (map getPathName vis).implode (booleanToInteger isatty))
Pair (job, _) last -> confirm True last job
Pair Nil last -> confirm False last (build Unit)
require True = keep
else build Unit

def cache = primJobCache jobKey

match cache
Pair (job, _) last -> confirm True last job
Pair Nil last -> confirm False last (build Unit)

# Only run if any of the arguments before the `\` differ
target runOnce cmd env dir stdin vis isatty run \ res usage finputs foutputs keep echo stdout stderr label =
Expand Down Expand Up @@ -145,7 +240,7 @@ export def runJobImp label cmd env dir stdin res usage finputs foutputs vis pers
label
isatty

export def runJobWith (Runner _ _ run) (Plan label cmd vis env dir stdin stdout stderr echo pers res usage finputs foutputs isatty) =
export def runJobWith (Runner _ run) (Plan label cmd vis env dir stdin stdout stderr echo pers res usage finputs foutputs isatty) =
runJobImp label cmd env dir stdin res usage finputs foutputs vis pers run echo stdout stderr isatty

# Set the value of a tag on a Job
Expand Down
Loading

0 comments on commit 82da532

Please sign in to comment.