From 82da532335c3cbac9f2ae8d5a3cb5888cbbcc9bd Mon Sep 17 00:00:00 2001 From: Ashley Coleman Date: Mon, 9 Sep 2024 11:13:41 -0600 Subject: [PATCH] stdlib: Significantly refactor the Runner api (#1640) * stdlib: Partially implement runner refactor * Extract local prims into typed functions * Refactor mkdirRunner and writeRunner * Update cache runners * Rename functions * format * Simplify prims with named types * fix test * Fix tests * address comments * Apply suggestions from code review Co-authored-by: Colin Schmidt * address comments * address comments --------- Co-authored-by: Colin Schmidt --- .wakemanifest | 1 - share/wake/lib/system/io.wake | 107 ++++-- share/wake/lib/system/job.wake | 165 +++++++-- share/wake/lib/system/job_cache_runner.wake | 333 +++++++++--------- share/wake/lib/system/plan.wake | 2 +- share/wake/lib/system/plan_scorer.wake | 78 ---- .../wake/lib/system/remote_cache_runner.wake | 106 +++--- share/wake/lib/system/runner.wake | 324 ++++++++--------- src/runtime/job.cpp | 8 +- tests/inspection/canceled/stdout | 2 +- tests/job-cache/runner-hash/test.wake | 4 +- 11 files changed, 559 insertions(+), 571 deletions(-) delete mode 100644 share/wake/lib/system/plan_scorer.wake diff --git a/.wakemanifest b/.wakemanifest index 40a5e7c01..71eca8ff9 100644 --- a/.wakemanifest +++ b/.wakemanifest @@ -31,7 +31,6 @@ share/wake/lib/system/io.wake share/wake/lib/system/job_cache_runner.wake share/wake/lib/system/job.wake share/wake/lib/system/path.wake -share/wake/lib/system/plan_scorer.wake share/wake/lib/system/plan.wake share/wake/lib/system/runner.wake share/wake/lib/system/remote_cache_api.wake diff --git a/share/wake/lib/system/io.wake b/share/wake/lib/system/io.wake index 6fd049ee7..73bd439a2 100644 --- a/share/wake/lib/system/io.wake +++ b/share/wake/lib/system/io.wake @@ -129,22 +129,44 @@ export def read (path: Path): Result String Error = Pass body -> Pass body Fail f -> Fail (makeError f) -target writeImp inputs mode path content = - def 
writeRunner = - def imp m p c = prim "write" - def pre input = Pair input Unit +# writeRunner: A runner that processes special write jobs +# +# Allows for calls to the write prim to be tracked in the database as any other job. +# Ideally content would be part of RunnerInputCmd however this gets tracked exactly in the database +# which means all writes would use 2x the total storage in the database. +def writeRunner (content: String) = + def primWrite (mode: Integer) (path: String) (content: String): Result String String = + (\_ \_ \_ prim "write") mode path content + + def run (job: Job) ((RunnerInput _ cmd vis _ _ _ _ _ predict _): RunnerInput): Result RunnerOutput Error = + # Command must be ("", "-m", "{string mode}", "{string path}", Nil) + require "", "-m", smode, path, Nil = cmd + else panic "writeImp violated command-line contract" + + # Insert the job into the database + def _ = primJobVirtual job "" "" predict + + # Actually trigger the effects required by the job + require Some mode = int smode + else failWithError "write {path}: Unable to convert mode to Integer ({smode})" - def post = match _ - Pair (Fail f) _ -> Fail f - Pair (Pass output) Unit -> - if mode < 0 || mode > 0x1ff then - Fail (makeError "write {path}: Invalid mode ({strOctal mode})") - else match (imp mode path content) - Fail f -> Fail (makeError f) - Pass path -> Pass (editRunnerOutputOutputs (path, _) output) + require True = mode >= 0 && mode <= 0x1ff + else failWithError "write {path}: Invalid mode ({strOctal mode})" - makeRunner "write" (\_ Pass 0.0) pre post virtualRunner + def writeTask = primWrite mode path content + # Wait for the virtual job to complete + require Pass reality = job.getJobReality + + match writeTask + Fail f -> failWithError f + Pass path -> + RunnerOutput (vis | map getPathName) (path,) reality + | Pass + + makeRunner "write" run + +target writeImp inputs mode path content = # There are a couple likely bad paths that we don't want the user writing to # so we 
give good error messages for these cases require False = path ==* "" @@ -174,11 +196,11 @@ target writeImp inputs mode path content = # If all those checks pass we go ahead and perform the write. The write will # overwrite single files but it will not overwrite a whole directory with a file. - makeExecPlan ("", "0{strOctal mode}", path, Nil) inputs + makeExecPlan ("", "-m", "0{strOctal mode}", path, Nil) inputs | setPlanLabel "write: {path} 0{strOctal mode}" | setPlanOnce False | setPlanEnvironment Nil - | runJobWith writeRunner + | runJobWith (writeRunner content) | setJobInspectVisibilityHidden | getJobOutput @@ -248,29 +270,40 @@ export def installIn (toRoot: String) (fromRoot: String) (sourcePath: Path): Res else installAs (in toRoot rel) sourcePath +# mkdirRunner: A runner that processes special mkdir jobs +# +# Allows for calls to the mkdir prim to be tracked in the database as any other job def mkdirRunner: Runner = - def imp m p = prim "mkdir" - - def pre = match _ - Fail f -> Pair (Fail f) (Pair "" "") - Pass input -> match input.getRunnerInputCommand - _, _, mode, dir, Nil -> Pair (Pass input) (Pair mode dir) - _ -> unreachable "mkdirImp violated command-line contract" - - def post = match _ - Pair (Fail f) _ -> Fail f - Pair (Pass output) (Pair smode dir) -> - def mode = - int smode - | getOrElse 0x200 - - if mode < 0 || mode > 0x1ff then - Fail (makeError "mkdir {dir}: Invalid mode ({smode})") - else match (imp mode dir) - Fail f -> Fail (makeError f) - Pass path -> Pass (editRunnerOutputOutputs (path, _) output) - - makeRunner "mkdir" (\_ Pass 0.0) pre post virtualRunner + def primMkdir (mode: Integer) (path: String): Result String String = + (\_ \_ prim "mkdir") mode path + + def run (job: Job) ((RunnerInput _ cmd vis _ _ _ _ _ predict _): RunnerInput): Result RunnerOutput Error = + # Command must be ("", "-m", "{string mode}", "{string path}", Nil) + require "", "-m", smode, path, Nil = cmd + else panic "mkdirImp violated command-line contract" + + # 
Insert the job into the database + def _ = primJobVirtual job "" "" predict + + # Actually trigger the effects required by the job + require Some mode = int smode + else failWithError "write {path}: Unable to convert mode to Integer ({smode})" + + require True = mode >= 0 && mode <= 0x1ff + else failWithError "mkdir {path}: Invalid mode ({smode})" + + def mkdirTask = primMkdir mode path + + # Wait for the virtual job to complete + require Pass reality = job.getJobReality + + match mkdirTask + Fail f -> failWithError f + Pass path -> + RunnerOutput (vis | map getPathName) (path,) reality + | Pass + + makeRunner "mkdir" run def mkdirImp inputs mode path = makeExecPlan ("", "-m", "0{strOctal mode}", path, Nil) inputs diff --git a/share/wake/lib/system/job.wake b/share/wake/lib/system/job.wake index 4110fd2fe..263141c1c 100644 --- a/share/wake/lib/system/job.wake +++ b/share/wake/lib/system/job.wake @@ -15,41 +15,134 @@ package wake +# JobKey: The values that the database uses to discern a unique job +# +# If all values of two jobs are identical, then the jobs are considered identical. +# Used to determine reuse eligibility. 
+export tuple JobKey = + # The working directory of the job + export dir: String + # A string path to a file which should be passed as the stdin to a job + export stdin: String + # The environement that the job runs in + export env: String + # The commmand of the job + export cmd: String + # A unique hash used to discern two nearly identical jobs with some environmental change + export signature: Integer + # The list of files (separated by \0) that the job can see when running + export visible: String + # Boolean integer representing if the job should be launched such that it appears to be + # launched directly by a human (ie launched interactively) + export isatty: Integer + +# Create/reserve a job handle, parameters aren't necessarily finalized +export def primJobCreate (label: String) (jobKey: JobKey) (keep: Integer) (echo: String) (stdout: String) (stderr: String): Job = + def JobKey dir stdin env cmd signature visible isatty = jobKey + + (\_ \_ \_ \_ \_ \_ \_ \_ \_ \_ \_ \_ prim "job_create") + label + dir + stdin + env + cmd + signature + visible + keep + echo + stdout + stderr + isatty + +# Imediatly complete a job with the provided ouputs without launching a process +export def primJobVirtual (job: Job) (stdout: String) (stderr: String) (usage: Usage): Unit = + def Usage status runtime cputime membytes ibytes obytes = usage + + (\_ \_ \_ \_ \_ \_ \_ \_ \_ prim "job_virtual") + job + stdout + stderr + status + runtime + cputime + membytes + ibytes + obytes + +# Launch the job via a child process. Values such as command or environment can be freely changed from the initial reservation. 
+export def primJobLaunch (job: Job) (jobKey: JobKey) (usage: Usage): Unit = + def JobKey dir stdin env cmd _signature _visible isatty = jobKey + def Usage status runtime cputime membytes ibytes obytes = usage + + (\_ \_ \_ \_ \_ \_ \_ \_ \_ \_ \_ \_ prim "job_launch") + job + dir + stdin + env + cmd + status + runtime + cputime + membytes + ibytes + obytes + isatty + +# Complete a job before launch with userland defined failure +export def primJobFailLaunch (job: Job) (error: Error): Unit = + (\_ \_ prim "job_fail_launch") job error + +# Complete a job after launch with userland defined failure +export def primJobFailFinish (job: Job) (error: Error): Unit = + (\_ \_ prim "job_fail_finish") job error + +# Complete a job successfully by providing to the runtime the inputs/outputs/usage of the job +export def primJobFinish (job: Job) (inputs: String) (outputs: String) (all_outputs: String) (usage: Usage): Unit = + def Usage status runtime cputime membytes ibytes obytes = usage + + (\_ \_ \_ \_ \_ \_ \_ \_ \_ \_ prim "job_finish") + job + inputs + outputs + all_outputs + status + runtime + cputime + membytes + ibytes + obytes + +# Look up a job in the local database. Returns a completed Job handle with outputs already resolved if it is already cached +export def primJobCache (jobKey: JobKey): Pair (List Job) (List (Pair String String)) = + def JobKey dir stdin env cmd signature visible isatty = jobKey + + (\_ \_ \_ \_ \_ \_ \_ prim "job_cache") dir stdin env cmd signature visible isatty + +# Creates a hash of 5 elements +export def primHash5 a b c d e: Integer = + (\_ \_ \_ \_ \_ prim "hash") a b c d e + # Helper function similar to cat with that adds a null byte after each string then combines them. 
# Leaves a null byte as the last character of the string def implode strings = cat (foldr (_, "\0", _) Nil strings) +# Helper function that hashs the signature parts of a job +def jobSignature cmd res fni fno keep = + primHash5 cmd res fni fno keep + def runAlways cmd env dir stdin res uusage finputs foutputs vis keep run echo stdout stderr label isatty: Job = - def create label dir stdin env cmd signature visible keep echo stdout stderr isatty = - prim "job_create" + def hash = jobSignature cmd res finputs foutputs keep - def finish job inputs outputs all_outputs status runtime cputime membytes ibytes obytes = - prim "job_finish" + def visKey = + vis + | map getPathName + | implode - def badfinish job error = prim "job_fail_finish" - def cache dir stdin env cmd signature visible isatty = prim "job_cache" - def signature cmd res fni fno keep = prim "hash" - def hash = signature cmd res finputs foutputs keep + def jobKey = JobKey dir stdin env.implode cmd.implode hash visKey isatty.booleanToInteger def build Unit = - def visStrings = map getPathName vis - - def job = - create - label - dir - stdin - env.implode - cmd.implode - hash - visStrings.implode - (booleanToInteger keep) - echo - stdout - stderr - (booleanToInteger isatty) - + def job = primJobCreate label jobKey keep.booleanToInteger echo stdout stderr def prefix = str (getJobId job) def usage = @@ -60,8 +153,8 @@ def runAlways cmd env dir stdin res uusage finputs foutputs vis keep run echo st run job (Pass (RunnerInput label cmd vis env dir stdin res prefix usage isatty)) def final _ = match output - Fail e -> badfinish job e - Pass (RunnerOutput inputs outputs (Usage status runtime cputime mem in out)) -> + Fail e -> primJobFailLaunch job e + Pass (RunnerOutput inputs outputs reality) -> def input = finputs inputs | map simplify @@ -72,7 +165,7 @@ def runAlways cmd env dir stdin res uusage finputs foutputs vis keep run echo st | computeHashes prefix | implode - finish job input output (implode outputs) 
status runtime cputime mem in out + primJobFinish job input output (implode outputs) reality # Make sure we don't hash files before the job has stopped running def _ = waitJobMerged final job @@ -96,12 +189,14 @@ def runAlways cmd env dir stdin res uusage finputs foutputs vis keep run echo st job - match keep - False -> build Unit - True -> - match (cache dir stdin env.implode cmd.implode hash (map getPathName vis).implode (booleanToInteger isatty)) - Pair (job, _) last -> confirm True last job - Pair Nil last -> confirm False last (build Unit) + require True = keep + else build Unit + + def cache = primJobCache jobKey + + match cache + Pair (job, _) last -> confirm True last job + Pair Nil last -> confirm False last (build Unit) # Only run if the first four arguments differ target runOnce cmd env dir stdin vis isatty run \ res usage finputs foutputs keep echo stdout stderr label = @@ -145,7 +240,7 @@ export def runJobImp label cmd env dir stdin res usage finputs foutputs vis pers label isatty -export def runJobWith (Runner _ _ run) (Plan label cmd vis env dir stdin stdout stderr echo pers res usage finputs foutputs isatty) = +export def runJobWith (Runner _ run) (Plan label cmd vis env dir stdin stdout stderr echo pers res usage finputs foutputs isatty) = runJobImp label cmd env dir stdin res usage finputs foutputs vis pers run echo stdout stderr isatty # Set the value of a tag on a Job diff --git a/share/wake/lib/system/job_cache_runner.wake b/share/wake/lib/system/job_cache_runner.wake index 7fc130d62..8ed0c1d36 100644 --- a/share/wake/lib/system/job_cache_runner.wake +++ b/share/wake/lib/system/job_cache_runner.wake @@ -55,199 +55,194 @@ def getPath input = # wakeroot is the absolute sandbox-path from which input and output files will # be interpreted as being relative to if they're in fact relative. 
-export def mkJobCacheRunner (hashFn: Result RunnerInput Error => Result String Error) (wakeroot: String) ((Runner name score baseDoIt): Runner): Runner = - def virtual job stdout stderr status runtime cputime membytes ibytes obytes = prim "job_virtual" - def badlaunch job error = prim "job_fail_launch" +export def mkJobCacheRunner (hashFn: RunnerInput => Result String Error) (wakeroot: String) ((Runner name baseDoIt): Runner): Runner = def job_cache_read str = prim "job_cache_read" def job_cache_add str = prim "job_cache_add" - def doit job runnerInput = match runnerInput - Fail e -> - def _ = badlaunch job e + def run (job: Job) (input: RunnerInput): Result RunnerOutput Error = + def (RunnerInput label cmd vis env dir stdin _ prefix _ _) = input - Fail e - Pass (RunnerInput label cmd vis env dir stdin _ prefix _ _) -> - def mkVisJson (Path path hash) = - JObject ( - "path" :-> JString path, - "hash" :-> JString hash, - ) + def mkVisJson (Path path hash) = + JObject ( + "path" :-> JString path, + "hash" :-> JString hash, + ) - def jobCacheVisible = JArray (map mkVisJson vis) + def jobCacheVisible = JArray (map mkVisJson vis) - require Pass hashKey = hashFn runnerInput + require Pass hashKey = hashFn input - def jobCacheJsonIn = - prettyJSON - $ JObject ( - "wakeroot" :-> JString wakeroot, - "cwd" :-> JString dir, - "command_line" :-> JString cmd.implode, - "environment" :-> JString env.implode, - "stdin" :-> JString stdin, - "input_files" :-> jobCacheVisible, - "client_cwd" :-> JString workspace, - "runner_hash" :-> JString hashKey, - "dir_redirects" :-> JObject (wakeroot :-> JString "./",), - ) + def jobCacheJsonIn = + prettyJSON + $ JObject ( + "wakeroot" :-> JString wakeroot, + "cwd" :-> JString dir, + "command_line" :-> JString cmd.implode, + "environment" :-> JString env.implode, + "stdin" :-> JString stdin, + "input_files" :-> jobCacheVisible, + "client_cwd" :-> JString workspace, + "runner_hash" :-> JString hashKey, + "dir_redirects" :-> JObject (wakeroot 
:-> JString "./",), + ) - require Pass cacheResult = - job_cache_read jobCacheJsonIn - | rmapFail failWithError + require Pass cacheResult = + job_cache_read jobCacheJsonIn + | rmapFail failWithError - require Pass jobCacheJsonOut = parseJSONBody cacheResult + require Pass jobCacheJsonOut = parseJSONBody cacheResult - require Pass (JBoolean cacheHit) = jField jobCacheJsonOut "found" - else failWithError "job-cache returned unexpected json schema" + require Pass (JBoolean cacheHit) = jField jobCacheJsonOut "found" + else failWithError "job-cache returned unexpected json schema" - def isDebugOn = - require Some value = getenv "DEBUG_WAKE_SHARED_CACHE" - else False + def isDebugOn = + require Some value = getenv "DEBUG_WAKE_SHARED_CACHE" + else False - value ==~ "1" - - require False = cacheHit - else - def _ = - require True = isDebugOn - - def _ = write ".cache-hit/read.{prefix}.json" "//{label}\n{jobCacheJsonIn}" - def _ = write ".cache-hit/out.{prefix}.json" "//{label}\n{cacheResult}" - - True - - require Pass match_info = jField jobCacheJsonOut "match" - require Pass output_info = jField match_info "output_info" - - require Pass status = - jField output_info "status" - | jInteger - - require Pass runtime = - jField output_info "runtime" - | jDouble - - require Pass cputime = - jField output_info "cputime" - | jDouble - - require Pass mem = - jField output_info "mem" - | jInteger - - require Pass ibytes = - jField output_info "ibytes" - | jInteger - - require Pass obytes = - jField output_info "obytes" - | jInteger - - require Pass inputs = - jField match_info "input_files" - | jArray jString - - require Pass output_files = - jField match_info "output_files" - | jArray getPath - - require Pass output_dirs = - jField match_info "output_dirs" - | jArray getPath - - require Pass output_symlinks = - jField match_info "output_symlinks" - | jArray getPath - - require Pass stdout = - jField output_info "stdout" - | jString - - require Pass stderr = - jField output_info 
"stderr" - | jString - - def outputs = output_files ++ output_dirs ++ output_symlinks - def predict = Usage status runtime cputime mem ibytes obytes - def _ = virtual job stdout stderr status runtime cputime mem ibytes obytes - - Pass (RunnerOutput inputs outputs predict) + value ==~ "1" + require False = cacheHit + else def _ = require True = isDebugOn - def _ = write ".cache-misses/read.{prefix}.json" "//{label}\n{jobCacheJsonIn}" + def _ = write ".cache-hit/read.{prefix}.json" "//{label}\n{jobCacheJsonIn}" + def _ = write ".cache-hit/out.{prefix}.json" "//{label}\n{cacheResult}" True - # Now we need to run the job - require Pass (RunnerOutput inputs outputs useage) = baseDoIt job runnerInput - - def Usage status runtime cputime mem ibytes obytes = useage - def inputsTree = listToTree scmpCanonical inputs - - def mkOutputFileJson src = - JObject ( - "src" :-> JString src, - "path" :-> JString "{wakeroot}/{src}", - ) - - def jobCacheOutputFiles = JArray (map mkOutputFileJson outputs) - - def jobCacheReadFiles = - def readPaths = filter (tcontains _.getPathName inputsTree) vis - - JArray (map mkVisJson readPaths) - - require Pass stdout = job.getJobFailedStdoutRaw - require Pass stderr = job.getJobFailedStderrRaw - - def jobCacheAddJson = - prettyJSON - $ JObject ( - "wakeroot" :-> JString wakeroot, - "cwd" :-> JString dir, - "command_line" :-> JString cmd.implode, - "environment" :-> JString env.implode, - "stdin" :-> JString stdin, - "runner_hash" :-> JString hashKey, - "input_files" :-> jobCacheReadFiles, - "input_dirs" :-> JArray Nil, # TODO: This will need some fuse work to make good on - "output_files" :-> jobCacheOutputFiles, - "status" :-> JInteger status, - "runtime" :-> JDouble runtime, - "cputime" :-> JDouble cputime, - "mem" :-> JInteger mem, - "ibytes" :-> JInteger ibytes, - "obytes" :-> JInteger obytes, - "stdout" :-> JString stdout, - "stderr" :-> JString stderr, - "client_cwd" :-> JString workspace, - ) - - # We put this in a def so that it does not 
block the return below. - # This ensures that the effect still occurs and blocks wake finishing but the job - # itself is not blocked by writing to the cache. This allows us to tolorate a lot - # of slow down on cache writing. + require Pass match_info = jField jobCacheJsonOut "match" + require Pass output_info = jField match_info "output_info" + + require Pass status = + jField output_info "status" + | jInteger + + require Pass runtime = + jField output_info "runtime" + | jDouble + + require Pass cputime = + jField output_info "cputime" + | jDouble + + require Pass mem = + jField output_info "mem" + | jInteger + + require Pass ibytes = + jField output_info "ibytes" + | jInteger + + require Pass obytes = + jField output_info "obytes" + | jInteger + + require Pass inputs = + jField match_info "input_files" + | jArray jString + + require Pass output_files = + jField match_info "output_files" + | jArray getPath + + require Pass output_dirs = + jField match_info "output_dirs" + | jArray getPath + + require Pass output_symlinks = + jField match_info "output_symlinks" + | jArray getPath + + require Pass stdout = + jField output_info "stdout" + | jString + + require Pass stderr = + jField output_info "stderr" + | jString + + def outputs = output_files ++ output_dirs ++ output_symlinks + def predict = Usage status runtime cputime mem ibytes obytes + def _ = primJobVirtual job stdout stderr predict + + Pass (RunnerOutput inputs outputs predict) + + def _ = + require True = isDebugOn + + def _ = write ".cache-misses/read.{prefix}.json" "//{label}\n{jobCacheJsonIn}" + + True + + # Now we need to run the job + require Pass (RunnerOutput inputs outputs usage) = baseDoIt job (Pass input) + + def Usage status runtime cputime mem ibytes obytes = usage + def inputsTree = listToTree scmpCanonical inputs + + def mkOutputFileJson src = + JObject ( + "src" :-> JString src, + "path" :-> JString "{wakeroot}/{src}", + ) + + def jobCacheOutputFiles = JArray (map mkOutputFileJson outputs) + 
+ def jobCacheReadFiles = + def readPaths = filter (tcontains _.getPathName inputsTree) vis + + JArray (map mkVisJson readPaths) + + require Pass stdout = job.getJobFailedStdoutRaw + require Pass stderr = job.getJobFailedStderrRaw + + def jobCacheAddJson = + prettyJSON + $ JObject ( + "wakeroot" :-> JString wakeroot, + "cwd" :-> JString dir, + "command_line" :-> JString cmd.implode, + "environment" :-> JString env.implode, + "stdin" :-> JString stdin, + "runner_hash" :-> JString hashKey, + "input_files" :-> jobCacheReadFiles, + "input_dirs" :-> JArray Nil, # TODO: This will need some fuse work to make good on + "output_files" :-> jobCacheOutputFiles, + "status" :-> JInteger status, + "runtime" :-> JDouble runtime, + "cputime" :-> JDouble cputime, + "mem" :-> JInteger mem, + "ibytes" :-> JInteger ibytes, + "obytes" :-> JInteger obytes, + "stdout" :-> JString stdout, + "stderr" :-> JString stderr, + "client_cwd" :-> JString workspace, + ) + + # We put this in a def so that it does not block the return below. + # This ensures that the effect still occurs and blocks wake finishing but the job + # itself is not blocked by writing to the cache. This allows us to tolorate a lot + # of slow down on cache writing. + def _ = def _ = - def _ = - require True = isDebugOn + require True = isDebugOn - def _ = write ".cache-misses/write.{prefix}.json" "//{label}\n{jobCacheAddJson}" + def _ = write ".cache-misses/write.{prefix}.json" "//{label}\n{jobCacheAddJson}" - True + True - # Caching a failed job is a waste of space, never do that - require True = status == 0 - else Pass "" + # Caching a failed job is a waste of space, never do that + require True = status == 0 + else Pass "" - # Sometimes we want a read-only cache. For instance read-only pre-merge - # with read-write post-merge. - require None = getenv "WAKE_LOCAL_JOB_CACHE_READ_ONLY" - else Pass "" + # Sometimes we want a read-only cache. For instance read-only pre-merge + # with read-write post-merge. 
+ require None = getenv "WAKE_LOCAL_JOB_CACHE_READ_ONLY" + else Pass "" - job_cache_add jobCacheAddJson + job_cache_add jobCacheAddJson - Pass (RunnerOutput (map getPathName vis) outputs useage) + Pass (RunnerOutput (map getPathName vis) outputs usage) - Runner "job-cache: {name}" score doit + makeRunner "job-cache: {name}" run diff --git a/share/wake/lib/system/plan.wake b/share/wake/lib/system/plan.wake index 1b271c8e9..11accbb50 100644 --- a/share/wake/lib/system/plan.wake +++ b/share/wake/lib/system/plan.wake @@ -16,7 +16,7 @@ package wake export data Persistence = - # Job should be re-executed on every runJob call. + # Job should be re-executed on every runJobWith call. # # In this case, no job deduplication is performed and so it must # *not* write any files (stdout/stderr are fine) or be guaranteed to only be diff --git a/share/wake/lib/system/plan_scorer.wake b/share/wake/lib/system/plan_scorer.wake deleted file mode 100644 index 517fda0a3..000000000 --- a/share/wake/lib/system/plan_scorer.wake +++ /dev/null @@ -1,78 +0,0 @@ -# Copyright 2019 SiFive, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You should have received a copy of LICENSE.Apache2 along with -# this software. If not, you may obtain a copy at -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -package plan_scorer - -from wake import _ - -# Runners usable by plan scorer. -export topic runner: Runner - -publish runner = - defaultRunner, Nil - -# Run a job, via a Runner chosen based on 'score' functions. 
-export def runJob (p: Plan): Job = match p - Plan label cmd vis env dir stdin stdout stderr echo pers res usage finputs foutputs isatty -> - def implode l = cat (foldr (_, "\0", _) Nil l) - def bToInt b = if b then 1 else 0 - - # Transform the 'List Runner' into 'List RunnerOption' - def qualify runner = match runner - Runner name scorefn fn -> match (scorefn p) - Pass x if x <=. 0.0 -> Reject "{name}: non-positive score" - Pass x -> Accept x fn - Fail x -> Reject "{name} {x}" - - def opts = - subscribe runner - | map qualify - - def best acc = match _ acc - (Reject _) _ -> acc - (Accept score fn) (Pair bests _bestr) -> - if score >. bests then - Pair score (Some fn) - else - acc - - match (opts | foldl best (Pair 0.0 None) | getPairSecond) - Some r -> - runJobImp label cmd env dir stdin res usage finputs foutputs vis pers r echo stdout stderr isatty - None -> - def create label dir stdin env cmd signature visible keep echo stdout stderr isatty = - prim "job_create" - - def badfinish job e = prim "job_fail_finish" - def badlaunch job e = prim "job_fail_launch" - - def job = - create label dir stdin env.implode cmd.implode 0 "" 0 "echo" "info" "error" (bToInt isatty) - - def error = - def pretty = match _ - Accept _ _ -> "" - Reject why -> why - - makeError "No runner for '{job.getJobDescription}' available, because: {map pretty opts | catWith ", "}" - - # Make sure badlaunch completes before badfinish - def _ = wait (\_ badfinish job error) (badlaunch job error) - - job - -data RunnerOption = - Accept (score: Double) (runnerFn: Job => Result RunnerInput Error => Result RunnerOutput Error) - Reject (why: String) diff --git a/share/wake/lib/system/remote_cache_runner.wake b/share/wake/lib/system/remote_cache_runner.wake index 001febcdf..b75d159ec 100644 --- a/share/wake/lib/system/remote_cache_runner.wake +++ b/share/wake/lib/system/remote_cache_runner.wake @@ -35,10 +35,7 @@ export target rscRunner (rscApi: RemoteCacheApi): Runner = # ``` # mkRemoteCacheRunner 
(RemoteCacheApi ...) (\_ Pass "") "" baseRunner = (Runner ...) # ``` -export def mkRemoteCacheRunner (rscApi: RemoteCacheApi) (hashFn: Result RunnerInput Error => Result String Error) (wakeroot: String) ((Runner name score baseDoIt): Runner): Runner = - def virtual job stdout stderr status runtime cputime membytes ibytes obytes = prim "job_virtual" - def badlaunch job error = prim "job_fail_launch" - +export def mkRemoteCacheRunner (rscApi: RemoteCacheApi) (hashFn: RunnerInput => Result String Error) (wakeroot: String) ((Runner name baseDoIt): Runner): Runner = def runJobAndUpload job input hashKey = # Run the job to get the results require Pass output = baseDoIt job (Pass input) @@ -63,10 +60,7 @@ export def mkRemoteCacheRunner (rscApi: RemoteCacheApi) (hashFn: Result RunnerIn Pass output - def rehydrateJob response label input job = - require (Match details) = response - else unreachable "two-constructor tuple must have one value" - + def rehydrateJob details label input job = def _ = require True = shouldDebugRemoteCache Unit @@ -220,79 +214,65 @@ export def mkRemoteCacheRunner (rscApi: RemoteCacheApi) (hashFn: Result RunnerIn | findFail | addErrorContext "rsc: Failed to create a symlink" - def _ = virtual job stdout stderr status runtime cputime mem ibytes obytes + def _ = primJobVirtual job stdout stderr predict Pass (RunnerOutput inputs outputs predict) - def doit job runnerInput = match runnerInput - Fail e -> - def _ = badlaunch job e + def run (job: Job) (input: RunnerInput): Result RunnerOutput Error = + def label = input.getRunnerInputLabel - Fail e - Pass input -> - def label = input.getRunnerInputLabel + require Pass hashKey = hashFn input - require Pass hashKey = hashFn runnerInput + # If pulling from the cache is not enabled don't bother searching. + require True = rscApi.getRemoteCacheApiCanPull + else runJobAndUpload job input hashKey - # If pulling from the cache is not enabled don't bother searching. 
- require True = rscApi.getRemoteCacheApiCanPull - else runJobAndUpload job input hashKey + # If the cache server is down or the response is invalid gracefully fallback + def cacheLookupFailed err = + # Always leave a breadcrumb since this should be a rare error. + def _ = breadcrumb "{label}: Failed to search for job in the cache" - # ------------------------------------- - # --- Search the cache for the job --- - # ------------------------------------- + # Leave detailed info if debugging is enabled + def _ = + require True = shouldDebugRemoteCache Unit - # Search the cache for a match - def requestTask = - rscApi - | rscApiFindMatchingJob (mkSearchRequest input hashKey) + def _ = + writeTempFile "remote.cache.lookup.fail" "label: {input.getRunnerInputLabel}\nerror: {err | format}" - require Pass response = requestTask - else - require Fail err = requestTask - else unreachable "Result must be either Pass or Fail" + True - # Always leave a breadcrumb since this should be a rare error. - def _ = breadcrumb "{label}: Failed to search for job in the cache" + # This job isn't getting cached. That's probably preferable since the server + # request failed but good to keep in mind. + baseDoIt job (Pass input) - # Leave detailed info if debugging is enabled + # If a valid response is received from the server then handle it + def cacheLookupSucceeded response = match response + NoMatch -> def _ = require True = shouldDebugRemoteCache Unit + def _ = breadcrumb "{label}: Did not find a match" + def _ = - writeTempFile "remote.cache.lookup.fail" "label: {input.getRunnerInputLabel}\nerror: {err | format}" + writeTempFile "remote.cache.lookup.miss" "label: {input.getRunnerInputLabel}" True - # This job isn't getting cached. That's probably preferable since the server - # request failed but good to keep in mind. 
- baseDoIt job (Pass input) - - # If a match was found use it - require NoMatch = response - else - match (rehydrateJob response label input job) - Pass x -> Pass x - # If the job hydration fails for any reason just run the job as normal. - # There is no point in attempting to push since the server just said its cached - Fail _ -> baseDoIt job (Pass input) - - def _ = - require True = shouldDebugRemoteCache Unit - - def _ = breadcrumb "{label}: Did not find a match" - - def _ = - writeTempFile "remote.cache.lookup.miss" "label: {input.getRunnerInputLabel}" - - True - - # ------------------------------------- - # --- Insert the job into the cache --- - # ------------------------------------- - runJobAndUpload job input hashKey - - Runner "remote-cache: {name}" score doit + # Run the job locally then insert it into the remote cache + runJobAndUpload job input hashKey + Match details -> match (rehydrateJob details label input job) + # If the rehydration succeeds return the job directly + Pass x -> Pass x + # Otherwise if hydration fails for any reason just run the job as normal. 
+ # There is no point in attempting to push since the server just said its cached + Fail _ -> baseDoIt job (Pass input) + + # Search the remote cache for the job + match (rscApi | rscApiFindMatchingJob (mkSearchRequest input hashKey)) + Pass response -> cacheLookupSucceeded response + Fail err -> cacheLookupFailed err + + makeRunner "remote-cache: {name}" run ## --- Helper functions --- diff --git a/share/wake/lib/system/runner.wake b/share/wake/lib/system/runner.wake index 809244b7a..2caf4b23a 100644 --- a/share/wake/lib/system/runner.wake +++ b/share/wake/lib/system/runner.wake @@ -17,7 +17,8 @@ package wake from remote_cache import rscRunner makeRemoteCacheApi -export tuple Usage = # Unknown quantities are 0 +# Unknown quantities are 0 +export tuple Usage = export Status: Integer export Runtime: Double export CPUtime: Double @@ -55,97 +56,74 @@ export tuple RunnerOutput = # A Runner describes a way to invoke a Plan to get a Job export tuple Runner = export Name: String - export Score: Plan => Result Double String Fn: Job => Result RunnerInput Error => Result RunnerOutput Error -# Create new Runner given pre- and post-hooks around an existing Runner -# param name: String -# param score: type Plan → Result Double String -# Called by runJob to produce a score representing the priority of a runner with respect to the given Plan. -# param pre: type Result RunnerInput Error → Pair (Result RunnerInput Error) a -# Called before the job is run, allowing the runner to modify the input to provide the requested resources. -# param post: type Pair (Result RunnerOutput Error) a → Result RunnerOutput Error -# Similar to the pre function but called after the job has run. post is for editing the reported outputs/inputs/usage of the job. -# param (Runner _ _ run): base runner that the current runner is built on top of -# i.e. JSONRunner is built on localRunner. 
-
-export def makeRunner name score pre post (Runner _ _ run) =
-    def doit job preInput = match (pre preInput)
-        Pair runInput state ->
-            def runOutput = run job runInput
-            def final _ = post (Pair runOutput state)
+# makeRunner: Hides some of the boilerplate required to create a runner
+#
+# This function requires very advanced wake experience and should be used with the greatest amount
+# of caution. Callers must ensure at the very least that `run` calls primJobLaunch and one of the
+# many job "wait" functions. Historically runners allowed dispatching to an arbitrary "base" or
+# "inner" runner. This significantly complicated the system and led to very unexpected interactions.
+# It is recommended that runners don't accept an "inner" runner and instead directly call the job
+# primitives. If wrapping is unavoidable then the specific runner being wrapped should be named
+# instead of accepting an arbitrary runner parameter.
+#
+# localRunner is a good reference implementation of the run function.
+export def makeRunner (name: String) (run: Job => RunnerInput => Result RunnerOutput Error): Runner =
+    def do job maybeInput = match maybeInput
+        Fail e ->
+            def _ = primJobFailLaunch job e
 
-            # Don't run any 'post' steps until the Job has stopped running
-            waitJobMerged final job
+            Fail e
+        Pass input -> run job input
 
-    Runner name score doit
+    Runner name do
 
-# This runner does not detect inputs/outputs on it's own
-# You must use Fn{Inputs,Outputs} to fill in this information
+# localRunner: A runner that provides no sandboxing protections and no file tracking.
+# +# You must use Fn{Inputs,Outputs} to fill in this information for wake to maintain safety and reusability +# Advanced usage only, proceed with caution export def localRunner: Runner = - def launch job dir stdin env cmd status runtime cputime membytes ibytes obytes isatty = - prim "job_launch" + def run (job: Job) ((RunnerInput _ cmd vis env dir stdin _ _ predict isatty): RunnerInput): Result RunnerOutput Error = + def jobKey = JobKey dir stdin env.implode cmd.implode 0 "" isatty.booleanToInteger + def _ = primJobLaunch job jobKey predict - def badlaunch job error = prim "job_fail_launch" + job.getJobReality + |< RunnerOutput (vis | map getPathName) Nil - def doit job = match _ - Fail e -> - def _ = badlaunch job e - - Fail e - Pass (RunnerInput _ cmd vis env dir stdin _ _ predict isatty) -> - def Usage status runtime cputime mem in out = predict - - def _ = - launch - job - dir - stdin - env.implode - cmd.implode - status - runtime - cputime - mem - in - out - (booleanToInteger isatty) - - match (getJobReality job) - Pass reality -> Pass (RunnerOutput (map getPathName vis) Nil reality) - Fail f -> Fail f - - def score _ = Pass 1.0 - - Runner "local" score doit + makeRunner "local" run +# virtualRunner: A runner that immediatly marks the job as complete using the predicted usage +# +# This runner is useful for tracking a unit of work that is job like but not launched as a process export def virtualRunner: Runner = - def virtual job stdout stderr status runtime cputime membytes ibytes obytes = prim "job_virtual" - def badlaunch job error = prim "job_fail_launch" + def run (job: Job) ((RunnerInput _ _ vis _ _ _ _ _ predict _): RunnerInput): Result RunnerOutput Error = + def _ = primJobVirtual job "" "" predict - def doit job = match _ - Fail e -> - def _ = badlaunch job e + job.getJobReality + |< RunnerOutput (vis | map getPathName) Nil - Fail e - Pass (RunnerInput _ _ vis _ _ _ _ _ predict _) -> - def Usage status runtime cputime mem in out = predict + makeRunner 
"virtual" run - def _ = - virtual job "" "" status runtime cputime mem in out # sets predict+reality +# wrapRunner: Deprecated. Do not use this function. +# +# It will be deleted in the next release. See makeRunner for migration +export def wrapRunner name pre post (Runner _ run) = + def doit job preInput = match (pre preInput) + Pair runInput state -> + def runOutput = run job runInput + def final _ = post (Pair runOutput state) - match (getJobReality job) - Pass reality -> Pass (RunnerOutput (map getPathName vis) Nil reality) - Fail f -> Fail f + # Don't run any 'post' steps until the Job has stopped running + waitJobMerged final job - Runner "virtual" (\_ Pass 0.0) doit + Runner name doit # Implement FUSE-based Runner export def fuseRunner: Runner = def fuse = "{wakePath}/wakebox" - def score _ = Pass 2.0 - makeJSONRunnerPlan fuse score + makeJSONRunnerPlan fuse | editJSONRunnerPlanExtraEnv (editEnvironment "DEBUG_FUSE_WAKE" (\_ getenv "DEBUG_FUSE_WAKE")) | makeJSONRunner @@ -176,124 +154,112 @@ export def defaultRunner: Runner = # RawScript: the path to the script to run jobs with # ExtraArgs: extra arguments to pass to ``RawScript`` # ExtraEnv: environment variables to pass to the script -# Score: runJob chooses the runner with the largest score for a Plan # Estimate: predict local usage based on prior recorded usage tuple JSONRunnerPlan = export RawScript: String export ExtraArgs: List String export ExtraEnv: List String - export Score: Plan => Result Double String export Estimate: Usage => Usage # make a ``JSONRunnerPlan`` with ``Nil`` and ``(_)`` as defaults for ``ExtraArgs`` and ``Estimate`` respectively # rawScript: String; the path to the script to run jobs with -# score: runJob chooses the runner with the largest score for a Plan -export def makeJSONRunnerPlan (rawScript: String) (score: Plan => Result Double String): JSONRunnerPlan = - JSONRunnerPlan rawScript Nil Nil score (_) +export def makeJSONRunnerPlan (rawScript: String): JSONRunnerPlan = + 
JSONRunnerPlan rawScript Nil Nil (_) # Make a Runner that runs a named script to run jobs # plan: JSONRunnerPlan; a tuple containing the arguments for this function -export def makeJSONRunner (plan: JSONRunnerPlan): Runner = - def rawScript = plan.getJSONRunnerPlanRawScript - def extraArgs = plan.getJSONRunnerPlanExtraArgs - def extraEnv = plan.getJSONRunnerPlanExtraEnv - def score = plan.getJSONRunnerPlanScore - def estimate = plan.getJSONRunnerPlanEstimate +export def makeJSONRunner ((JSONRunnerPlan rawScript extraArgs extraEnv estimate): JSONRunnerPlan): Runner = def script = which (simplify rawScript) - def ok = access script xOK - - def pre = match _ - Fail f -> Pair (Fail f) "" - _ if !ok -> Pair (Fail (makeError "Runner {script} is not executable")) "" - Pass (RunnerInput label command visible environment directory stdin res prefix record isatty) -> - def Usage status runtime cputime membytes inbytes outbytes = record - - def json = - JObject ( - "label" :-> JString label, - "command" :-> command | map JString | JArray, - "environment" :-> environment | map JString | JArray, - "visible" :-> visible | map (_.getPathName.JString) | JArray, - "directory" :-> JString directory, - "stdin" :-> JString stdin, - "resources" :-> res | map JString | JArray, - "version" :-> JString version, - "isolate-network" :-> JBoolean False, - "isolate-pids" :-> JBoolean False, - "mount-ops" :-> JArray (JObject ("type" :-> JString "workspace", "destination" :-> JString ".", Nil), Nil), - "usage" :-> JObject ( - "status" :-> JInteger status, - "runtime" :-> JDouble runtime, - "cputime" :-> JDouble cputime, - "membytes" :-> JInteger membytes, - "inbytes" :-> JInteger inbytes, - "outbytes" :-> JInteger outbytes, - Nil - ), + def executeOk = access script xOK + + def run (job: Job) ((RunnerInput label command visible environment directory stdin res prefix record isatty): RunnerInput): Result RunnerOutput Error = + require True = executeOk + else failWithError "Runner {script} is not 
executable" + + def Usage status runtime cputime membytes inbytes outbytes = record + + def json = + JObject ( + "label" :-> JString label, + "command" :-> command | map JString | JArray, + "environment" :-> environment | map JString | JArray, + "visible" :-> visible | map (_.getPathName.JString) | JArray, + "directory" :-> JString directory, + "stdin" :-> JString stdin, + "resources" :-> res | map JString | JArray, + "version" :-> JString version, + "isolate-network" :-> JBoolean False, + "isolate-pids" :-> JBoolean False, + "mount-ops" :-> JArray (JObject ("type" :-> JString "workspace", "destination" :-> JString ".", Nil), Nil), + "usage" :-> JObject ( + "status" :-> JInteger status, + "runtime" :-> JDouble runtime, + "cputime" :-> JDouble cputime, + "membytes" :-> JInteger membytes, + "inbytes" :-> JInteger inbytes, + "outbytes" :-> JInteger outbytes, Nil - ) - - require Pass build = - mkdir ".build" - | rmap getPathName - else Pair (Fail (makeError "Failed to 'mkdir .build'.")) "" - - def specFilePath = "{build}/spec-{prefix}.json" - - require Pair (Pass inFile) _ = - write specFilePath (prettyJSON json) - | rmap getPathName - | addErrorContext "Failed to 'write {specFilePath}: '" - | (Pair _ "") - - def outFile = resultPath inFile - def cmd = script, "-I", "-p", inFile, "-o", outFile, extraArgs - - def proxy = - RunnerInput label cmd Nil (extraEnv ++ environment) "." 
"" Nil prefix (estimate record) isatty - - Pair (Pass proxy) inFile - - def resultPath specPath = replace `spec-` "result-" specPath - - def post = match _ - Pair (Fail f) _ -> Fail f - Pair (Pass (RunnerOutput _ _ (Usage x _ _ _ _ _))) inFile if x != 0 -> - Fail (makeError "Non-zero exit status ({str x}) for JSON runner {script} on {inFile}") - Pair (Pass _) inFile -> - def outFile = resultPath inFile - def json = parseJSONFile (Path outFile "BadHash") - - match json - Fail f -> Fail f - Pass content -> - def _ = markFileCleanable outFile - - def field name = match _ _ - _ (Fail f) -> Fail f - None (Pass fn) -> - Fail "{script} produced {outFile}, which is missing usage/{name}" - (Some x) (Pass fn) -> Pass (fn x) - - def usage = content // `usage` - - def usageResult = - Pass (Usage _ _ _ _ _ _) - | field "status" (usage // `status` | getJInteger) - | field "runtime" (usage // `runtime` | getJDouble) - | field "cputime" (usage // `cputime` | getJDouble) - | field "membytes" (usage // `membytes` | getJInteger) - | field "inbytes" (usage // `inbytes` | getJInteger) - | field "outbytes" (usage // `outbytes` | getJInteger) - - def getK exp = - content // exp - | getJArray - | getOrElse Nil - | mapPartial getJString - - match usageResult - Fail f -> Fail (makeError f) - Pass usage -> Pass (RunnerOutput (getK `inputs`) (getK `outputs`) usage) - - makeRunner "json-{script}" score pre post localRunner + ), + Nil + ) + + require Pass build = + mkdir ".build" + | addErrorContext "Failed to 'mkdir .build'." + |< getPathName + + def specFile = "{build}/spec-{prefix}.json" + def resultFile = "{build}/result-{prefix}.json" + + require Pass _ = + write specFile (prettyJSON json) + | addErrorContext "Failed to 'write {specFile}: '" + |< getPathName + + def cmd = script, "-I", "-p", specFile, "-o", resultFile, extraArgs + + # Rewrite input so that the local runner can run the job with a configured sandbox + def localInput = + RunnerInput label cmd Nil (extraEnv ++ environment) "." 
"" Nil prefix (estimate record) isatty + + # Dispatch to the local runner via composition and get the outputs + def (Runner _ localRun) = localRunner + + require Pass localOutput = localRun job (Pass localInput) + + def statusCode = localOutput.getRunnerOutputUsage.getUsageStatus + + require 0 = statusCode + else failWithError "Non-zero exit status ({str statusCode}) for JSON runner {script} on {specFile}" + + require Pass content = parseJSONFile (Path resultFile "BadHash") + + def _ = markFileCleanable resultFile + + def field name = match _ _ + _ (Fail f) -> Fail f + None (Pass fn) -> Fail "{script} produced {resultFile}, which is missing usage/{name}" + (Some x) (Pass fn) -> Pass (fn x) + + def usage = content // `usage` + + def usageResult = + Pass (Usage _ _ _ _ _ _) + | field "status" (usage // `status` | getJInteger) + | field "runtime" (usage // `runtime` | getJDouble) + | field "cputime" (usage // `cputime` | getJDouble) + | field "membytes" (usage // `membytes` | getJInteger) + | field "inbytes" (usage // `inbytes` | getJInteger) + | field "outbytes" (usage // `outbytes` | getJInteger) + + def getK exp = + content // exp + | getJArray + | getOrElse Nil + | mapPartial getJString + + match usageResult + Fail f -> Fail (makeError f) + Pass usage -> Pass (RunnerOutput (getK `inputs`) (getK `outputs`) usage) + + makeRunner "json-{script}" run diff --git a/src/runtime/job.cpp b/src/runtime/job.cpp index 6af609b52..cecf29631 100644 --- a/src/runtime/job.cpp +++ b/src/runtime/job.cpp @@ -1880,12 +1880,12 @@ void prim_register_job(JobTable *jobtable, PrimMap &pmap) { // job_virtual. prim_register(pmap, "job_reality", prim_job_reality, type_job_reality, PRIM_PURE); - // The useage reported to job_finish. This is useful because a remote machine or a job + // The usage reported to job_finish. This is useful because a remote machine or a job // that uses caching might appear from observation (e.g. 
job_reality) to consume far // fewer resources than what we actully care about. prim_register(pmap, "job_report", prim_job_report, type_job_report, PRIM_PURE); - // Previous useage (returns Option Usage if no prior use exists) if previouslly in the database + // Previous usage (returns Option Usage if no prior use exists) if previouslly in the database prim_register(pmap, "job_record", prim_job_record, type_job_record, PRIM_PURE); /***************************************************************************************** @@ -1907,7 +1907,7 @@ void prim_register_job(JobTable *jobtable, PrimMap &pmap) { // the created job. prim_register(pmap, "job_virtual", prim_job_virtual, type_job_virtual, PRIM_IMPURE, jobtable); - // This is where you "finish" a job by explaining what its inputs, outputs, useage etc... + // This is where you "finish" a job by explaining what its inputs, outputs, usage etc... // are. This call unblocks things like `job_output` for instance. prim_register(pmap, "job_finish", prim_job_finish, type_job_finish, PRIM_IMPURE); @@ -1920,7 +1920,7 @@ void prim_register_job(JobTable *jobtable, PrimMap &pmap) { // a pre-step of a runner fails in someway for instance. prim_register(pmap, "job_fail_launch", prim_job_fail_launch, type_job_fail, PRIM_IMPURE); - // Explain to the wake runtime that the job failed to finihs. This can happen if a + // Explain to the wake runtime that the job failed to finish. This can happen if a // post-step of a runner fails in someway for instance. 
prim_register(pmap, "job_fail_finish", prim_job_fail_finish, type_job_fail, PRIM_IMPURE); diff --git a/tests/inspection/canceled/stdout b/tests/inspection/canceled/stdout index a30ffd581..61adfa2f4 100644 --- a/tests/inspection/canceled/stdout +++ b/tests/inspection/canceled/stdout @@ -1,2 +1,2 @@ -# (3) +# (1) $ sleep 10 diff --git a/tests/job-cache/runner-hash/test.wake b/tests/job-cache/runner-hash/test.wake index b1b82cd37..d7362246e 100644 --- a/tests/job-cache/runner-hash/test.wake +++ b/tests/job-cache/runner-hash/test.wake @@ -14,9 +14,7 @@ def mkTestPlan (s: String): Result Plan Error = | Pass def resourceAwareCacheRunner = - def hashResource x = - require Pass input = x - + def hashResource input = Pass (catWith "." input.getRunnerInputResources) mkJobCacheRunner hashResource "/workspace" fuseRunner