From ac7e5a943bb53d7fe11075aefc3e6292a56d666a Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Thu, 22 Oct 2020 19:02:01 +0300 Subject: [PATCH 01/10] Chunk worker output to 1024 characters When we output too much data on the zpty fd it can become corrupt in some cases, this commit protects against this by only printing 1024 bytes at a time. Further, when notifying via kill signas, we notify the parent process after every chunk so that the fd can be emptied. In my testing, 1024 bytes seems to be the maximum safe limit that can be used. Perhaps evidence to support this conclusion could be found in the zsh code base. --- async.zsh | 53 ++++++++++++++++++++++++++++++++++------------------- 1 file changed, 34 insertions(+), 19 deletions(-) diff --git a/async.zsh b/async.zsh index de13e75..d5e4c60 100644 --- a/async.zsh +++ b/async.zsh @@ -12,6 +12,9 @@ typeset -g ASYNC_VERSION=1.8.6 # Produce debug output from zsh-async when set to 1. typeset -g ASYNC_DEBUG=${ASYNC_DEBUG:-0} +# The maximum buffer size when outputing to zpty. +typeset -g ASYNC_MAX_BUFFER_SIZE=$((1024)) + # Execute commands that can manipulate the environment inside the async worker. Return output via callback. _async_eval() { local ASYNC_JOB_NAME @@ -20,7 +23,7 @@ _async_eval() { # simplicity, this could be improved in the future. { eval "$@" - } &> >(ASYNC_JOB_NAME=[async/eval] _async_job 'command -p cat') + } &> >(ASYNC_JOB_NAME=[async/eval] _async_job 0 'command -p cat') } # Wrapper for jobs executed by the async worker, gives output in parseable format with execution time @@ -31,6 +34,9 @@ _async_job() { # Store start time for job. float -F duration=$EPOCHREALTIME + # Parent pid for notifications via kill signal. + local parent_pid=$1; shift + # Run the command and capture both stdout (`eval`) and stderr (`cat`) in # separate subshells. When the command is complete, we grab write lock # (mutex token) and output everything except stderr inside the command @@ -56,8 +62,28 @@ _async_job() { # Grab mutex lock, stalls until token is available. read -r -k 1 -p tok || return 1 - # Return output ( ). - print -r -n - "$out" + # Chunk up the output so as to not fill up the entire fd. + for ((i = 1; i < $#out; i += ASYNC_MAX_BUFFER_SIZE)); do + # Return output ( ). + if ! print -r -n - "${out[$i,$((i + ASYNC_MAX_BUFFER_SIZE - 1))]}"; then + # BUG(mafredri): The worker and parent process should be informed. + break + fi + + # When notifications are enabled, inform the parent that the + # buffer is filling up and must be consumed. + if ((parent_pid)); then + # On older version of zsh (pre 5.2) we notify the parent through a + # SIGWINCH signal because `zpty` did not return a file descriptor (fd) + # prior to that. + if (( parent_pid )); then + # We use SIGWINCH for compatibility with older versions of zsh + # (pre 5.1.1) where other signals (INFO, ALRM, USR1, etc.) could + # cause a deadlock in the shell under certain circumstances. + kill -WINCH $parent_pid + fi + fi + done # Unlock mutex by inserting a token. print -n -p $tok @@ -115,22 +141,8 @@ _async_worker() { fi } - child_exit() { - close_idle_coproc - - # On older version of zsh (pre 5.2) we notify the parent through a - # SIGWINCH signal because `zpty` did not return a file descriptor (fd) - # prior to that. - if (( notify_parent )); then - # We use SIGWINCH for compatibility with older versions of zsh - # (pre 5.1.1) where other signals (INFO, ALRM, USR1, etc.) could - # cause a deadlock in the shell under certain circumstances. - kill -WINCH $parent_pid - fi - } - # Register a SIGCHLD trap to handle the completion of child processes. - trap child_exit CHLD + trap close_idle_coproc CHLD # Process option parameters passed to worker. while getopts "np:uz" opt; do @@ -141,6 +153,9 @@ _async_worker() { z) notify_parent=0;; # Uses ZLE watcher instead. esac done + if ((!notify_parent)) { + parent_pid=0 + } # Terminate all running jobs, note that this function does not # reinstall the child trap. @@ -243,7 +258,7 @@ _async_worker() { _async_eval $cmd else # Run job in background, completed jobs are printed to stdout. - _async_job $cmd & + _async_job $parent_pid $cmd & # Store pid because zsh job manager is extremely unflexible (show jobname as non-unique '$job')... storage[$job]="$!" fi From 2f79a01514bfc1837799ad6c884c8993bd84dc0e Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Thu, 22 Oct 2020 19:09:08 +0300 Subject: [PATCH 02/10] Force zpty fd to behave by wrapping message in newlines There seems to be a chance that, when the zpty simultaneously outputs and receives data, the output data is lost. Introducing newlines seems to fix the issue. Simply introducing trailing newlines did not. Reason is not entirely clear, perhaps there is something special about the zpty fd or perhaps this is a property of `zpty -r` / `read`. --- async.zsh | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/async.zsh b/async.zsh index d5e4c60..bf4e0e5 100644 --- a/async.zsh +++ b/async.zsh @@ -13,7 +13,8 @@ typeset -g ASYNC_VERSION=1.8.6 typeset -g ASYNC_DEBUG=${ASYNC_DEBUG:-0} # The maximum buffer size when outputing to zpty. -typeset -g ASYNC_MAX_BUFFER_SIZE=$((1024)) +# Note: Subtract 4 to accomodate "\r\n" times two. +typeset -g ASYNC_MAX_BUFFER_SIZE=$((1024 - 4)) # Execute commands that can manipulate the environment inside the async worker. Return output via callback. _async_eval() { @@ -64,8 +65,13 @@ _async_job() { # Chunk up the output so as to not fill up the entire fd. for ((i = 1; i < $#out; i += ASYNC_MAX_BUFFER_SIZE)); do + # Note: We are surrounding the message in newlines here in an + # attempt to force zpty to behave. Literal newlines will be + # filtered by async_process_results. Any newlines in the job + # output will survive, as they are quoted. + # # Return output ( ). - if ! print -r -n - "${out[$i,$((i + ASYNC_MAX_BUFFER_SIZE - 1))]}"; then + if ! print -r -n - $'\n'"${out[$i,$((i + ASYNC_MAX_BUFFER_SIZE - 1))]}"$'\n'; then # BUG(mafredri): The worker and parent process should be informed. break fi @@ -307,7 +313,8 @@ async_process_results() { # Read output from zpty and parse it if available. while zpty -r -t $worker data 2>/dev/null; do - ASYNC_PROCESS_BUFFER[$worker]+=$data + # Trim newlines that are not part of the data. + ASYNC_PROCESS_BUFFER[$worker]+=${${data//$'\r'/}//$'\n'/} len=${#ASYNC_PROCESS_BUFFER[$worker]} pos=${ASYNC_PROCESS_BUFFER[$worker][(i)$null]} # Get index of NULL-character (delimiter). From 6e842999cdf655e05b3428fcf0d8b60a4e1a2e33 Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Thu, 22 Oct 2020 19:16:20 +0300 Subject: [PATCH 03/10] Allow debugging the async worker --- async.zsh | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/async.zsh b/async.zsh index bf4e0e5..e9c8b02 100644 --- a/async.zsh +++ b/async.zsh @@ -11,6 +11,8 @@ typeset -g ASYNC_VERSION=1.8.6 # Produce debug output from zsh-async when set to 1. typeset -g ASYNC_DEBUG=${ASYNC_DEBUG:-0} +# When ASYNC_DEBUG=1, worker stderr output will be redirected here. +typeset -g ASYNC_DEBUG_WORKER_STDERR=${ASYNC_DEBUG_WORKER_STDERR:-/dev/null} # The maximum buffer size when outputing to zpty. # Note: Subtract 4 to accomodate "\r\n" times two. @@ -29,9 +31,6 @@ _async_eval() { # Wrapper for jobs executed by the async worker, gives output in parseable format with execution time _async_job() { - # Disable xtrace as it would mangle the output. - setopt localoptions noxtrace - # Store start time for job. float -F duration=$EPOCHREALTIME @@ -47,6 +46,10 @@ _async_job() { local jobname=${ASYNC_JOB_NAME:-$1} out out="$( local stdout stderr ret tok + + # Disable xtrace as it would mangle the stderr. The user can + # still enable xtrace inside the async job, if required. + setopt noxtrace { stdout=$(eval "$@") ret=$? @@ -108,7 +111,14 @@ _async_worker() { # worker. For example: `fork failed: resource temporarily unavailable`. # Some older versions of zsh might also print malloc errors (know to happen # on at least zsh 5.0.2 and 5.0.8) likely due to kill signals. - exec 2>/dev/null + if ((ASYNC_DEBUG)); then + exec 2>>${ASYNC_DEBUG_WORKER_STDERR} + if [[ $ASYNC_DEBUG_WORKER_STDERR != /dev/null ]]; then + setopt xtrace + fi + else + exec 2>/dev/null + fi # When a zpty is deleted (using -d) all the zpty instances created before # the one being deleted receive a SIGHUP, unless we catch it, the async From dfa87a7cc454b9ee9360344719f597f0ce5756f8 Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Thu, 22 Oct 2020 19:16:37 +0300 Subject: [PATCH 04/10] Fix tests --- async_test.zsh | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/async_test.zsh b/async_test.zsh index a89064e..51fe8ae 100644 --- a/async_test.zsh +++ b/async_test.zsh @@ -1,13 +1,16 @@ #!/usr/bin/env zsh +autoload -Uz is-at-least + test__async_job_print_hi() { coproc cat print -n -p t # Insert token into coproc. local line local -a out - line=$(_async_job print hi) + line=$(_async_job 0 print hi) # Remove leading/trailing null, parse, unquote and interpret as array. + line=${${line//$'\r'}//$'\n'} line=$line[2,$#line-1] out=("${(@Q)${(z)line}}") @@ -24,8 +27,9 @@ test__async_job_stderr() { local line local -a out - line=$(_async_job print 'hi 1>&2') + line=$(_async_job 0 print 'hi 1>&2') # Remove trailing null, parse, unquote and interpret as array. + line=${${line//$'\r'}//$'\n'} line=$line[1,$#line-1] out=("${(@Q)${(z)line}}") @@ -63,8 +67,9 @@ test__async_job_multiple_commands() { local line local -a out - line="$(_async_job print '-n hi; for i in "1 2" 3 4; do print -n $i; done')" + line="$(_async_job 0 print '-n hi; for i in "1 2" 3 4; do print -n $i; done')" # Remove trailing null, parse, unquote and interpret as array. + line=${${line//$'\r'}//$'\n'} line=$line[1,$#line-1] out=("${(@Q)${(z)line}}") @@ -507,7 +512,7 @@ setopt_helper() { test_all_options() { local -a opts exclude - if [[ $ZSH_VERSION == 5.0.? ]]; then + if ! is-at-least 5.1; then t_skip "Test is not reliable on zsh 5.0.X" fi @@ -593,7 +598,9 @@ zpty_deinit() { } test_zle_watcher() { - t_skip "Test is not reliable on zsh 5.0.X" + if ! is-at-least 5.1; then + t_skip "Test is not reliable on zsh 5.0.X" + fi setopt localoptions zpty_init ' From d22cd19cb338f13eec0e0ab997fc9ac2a4f3b067 Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Fri, 23 Oct 2020 17:37:50 +0300 Subject: [PATCH 05/10] Fix wrong named child trap --- async.zsh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/async.zsh b/async.zsh index e9c8b02..5600ae7 100644 --- a/async.zsh +++ b/async.zsh @@ -204,7 +204,7 @@ _async_worker() { (( coproc_pid )) && read -r -k 1 -p tok terminate_jobs - trap child_exit CHLD # Reinstall child trap. + trap close_idle_coproc CHLD # Reinstall child trap. } local request do_eval=0 From 945e267bde32a68f1ec32a642a24ed15e99d5821 Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Fri, 23 Oct 2020 17:38:09 +0300 Subject: [PATCH 06/10] Fix redefine of local job every loop in worker --- async.zsh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/async.zsh b/async.zsh index 5600ae7..f9ea5f5 100644 --- a/async.zsh +++ b/async.zsh @@ -207,7 +207,7 @@ _async_worker() { trap close_idle_coproc CHLD # Reinstall child trap. } - local request do_eval=0 + local request job do_eval=0 local -a cmd while :; do # Wait for jobs sent by async_job. @@ -242,7 +242,7 @@ _async_worker() { cmd=("${(z)request}") # Name of the job (first argument). - local job=$cmd[1] + job=$cmd[1] # Check if a worker should perform unique jobs, unless # this is an eval since they run synchronously. From 99e0d656c7bf15eb4c05489d3b5ca12428152e12 Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Fri, 6 Jan 2023 00:11:52 +0200 Subject: [PATCH 07/10] Fix comment and remove superfluous if --- async.zsh | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/async.zsh b/async.zsh index f9ea5f5..cd339ad 100644 --- a/async.zsh +++ b/async.zsh @@ -83,14 +83,11 @@ _async_job() { # buffer is filling up and must be consumed. if ((parent_pid)); then # On older version of zsh (pre 5.2) we notify the parent through a - # SIGWINCH signal because `zpty` did not return a file descriptor (fd) - # prior to that. - if (( parent_pid )); then - # We use SIGWINCH for compatibility with older versions of zsh - # (pre 5.1.1) where other signals (INFO, ALRM, USR1, etc.) could - # cause a deadlock in the shell under certain circumstances. - kill -WINCH $parent_pid - fi + # SIGWINCH signal because `zpty` did not return a file descriptor + # (fd) prior to that. We use SIGWINCH for because other signals + # (INFO, ALRM, USR1, etc.) can cause a deadlock in some situations. + # (The deadlock was fixed in zsh 5.1.1.) + kill -WINCH $parent_pid fi done From a7336228f8bc2b6a4e253db55ec513074bc6c214 Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Fri, 6 Jan 2023 01:32:03 +0200 Subject: [PATCH 08/10] Improve comments, fix typos, unlock mutex --- async.zsh | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/async.zsh b/async.zsh index cd339ad..9d88614 100644 --- a/async.zsh +++ b/async.zsh @@ -14,9 +14,17 @@ typeset -g ASYNC_DEBUG=${ASYNC_DEBUG:-0} # When ASYNC_DEBUG=1, worker stderr output will be redirected here. typeset -g ASYNC_DEBUG_WORKER_STDERR=${ASYNC_DEBUG_WORKER_STDERR:-/dev/null} -# The maximum buffer size when outputing to zpty. -# Note: Subtract 4 to accomodate "\r\n" times two. -typeset -g ASYNC_MAX_BUFFER_SIZE=$((1024 - 4)) +# The maximum buffer size when outputting to zpty. +# Note: Subtract 4 to accommodate "\r\n" times two. +# +# When processing large amounts of data, the limit of 1024 bytes is +# slow. If you're going to output a lot more than that, consider +# increasing the buffer size. +# +# This value was chosen as a safe limit for macOS and other systems that +# have a low limit (1024) for the buffer, on Linux this can likely be +# raised significantly. +typeset -g ASYNC_MAX_BUFFER_SIZE=${ASYNC_MAX_BUFFER_SIZE:-$((1024 - 4))} # Execute commands that can manipulate the environment inside the async worker. Return output via callback. _async_eval() { @@ -76,6 +84,8 @@ _async_job() { # Return output ( ). if ! print -r -n - $'\n'"${out[$i,$((i + ASYNC_MAX_BUFFER_SIZE - 1))]}"$'\n'; then # BUG(mafredri): The worker and parent process should be informed. + # Unlock mutex to prevent a deadlock. + print -n -p $tok break fi @@ -104,7 +114,7 @@ _async_worker() { # pids of child processes. unsetopt monitor - # Redirect stderr to `/dev/null` in case unforseen errors produced by the + # Redirect stderr to `/dev/null` in case unforeseen errors produced by the # worker. For example: `fork failed: resource temporarily unavailable`. # Some older versions of zsh might also print malloc errors (know to happen # on at least zsh 5.0.2 and 5.0.8) likely due to kill signals. From d7faf08c75c82652cb47d4e5e87d4dda3e365fa1 Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Fri, 6 Jan 2023 01:32:57 +0200 Subject: [PATCH 09/10] Add more stress tests --- async_test.zsh | 77 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 77 insertions(+) diff --git a/async_test.zsh b/async_test.zsh index 51fe8ae..bbf59f4 100644 --- a/async_test.zsh +++ b/async_test.zsh @@ -634,6 +634,83 @@ test_zle_watcher() { } } +test_lorem_ipsum_stress() { + local -a result + cb() { result=("$@") } + + # About 10k characters. + local want='Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.' + local times=80 + + async_start_worker test + async_job test "out=; for i in {1..$times}; do out+=\$'$want\n'; done; print \"\$out\";" + while ! async_process_results test cb; do :; done + async_stop_worker test + + [[ $result[1] = 'out=' ]] || t_error "want command name: out=, got" $result[1] + [[ $result[2] = 0 ]] || t_error "want exit code: 0, got" $result[2] + + local want_full=$want + for i in {2..$times}; do + want_full+=$'\n'$want + done + + [[ $result[3] = $want_full ]] || { + t_error "want output: ${(Vq-)want} * $times, got" ${(Vq-)result[3]} + } +} + +test_lorem_ipsum_stress_zle() { + if ! is-at-least 5.1; then + t_skip "Test is not reliable on zsh 5.0.X" + fi + + setopt localoptions + zpty_init ' + emulate -R zsh + setopt zle + stty 38400 columns 80 rows 24 tabs -icanon -iexten + TERM=vt100 + + . "'$PWD'/async.zsh" + async_init + + print_result_cb() { print $3 } + async_start_worker test + async_register_callback test print_result_cb + ' || { + zpty_deinit + t_fatal "failed to init zpty" + } + + t_defer zpty_deinit # Deinit after test completion. + + # About 10k characters. + local want='Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.' + local times=80 + + cmd="out=; for i in {1..$times}; do out+=\$'$want\n'; done; print \"\${out}EOF\"" + zpty_run async_job test ${(q)cmd} || t_fatal "could not send async_job command" + + zpty -r -m zsh result "*EOF" || { + t_fatal "want lorem ipsum followed by \"EOF\", got output ${(Vq-)result}" + } + + # Remove terminal codes preceding the output. + result=Lorem${result#*Lorem} + result=${result//$'\r'/} + result=${result%$'\n'EOF} + + local want_full=$want + for i in {2..$times}; do + want_full+=$'\n'$want + done + + [[ $result = $want_full ]] || { + t_error "want output: ${(Vq-)want} * $times, got ${(Vq-)result}" + } +} + test_main() { # Load zsh-async before running each test. zmodload zsh/datetime From a082cba8489bad470b9c586df294676587b861bb Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Fri, 6 Jan 2023 02:01:47 +0200 Subject: [PATCH 10/10] Documentation and comments --- README.md | 1 + async.zsh | 12 +++++++----- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index dd640c0..e3cdbdc 100644 --- a/README.md +++ b/README.md @@ -170,6 +170,7 @@ ok ./async_test.zsh 0.070s ## Limitations +* Processing a lot of output (>10240 bytes) can be slow due to the default value of `ASYNC_MAX_BUFFER_SIZE` (default `1020`). The low default value is a safety precaution, raising it is unlikely to cause problems on e.g. Linux, but caution is warranted. * A NULL-character (`$'\0'`) is used by `async_job` to signify the end of the command, it is recommended not to pass them as arguments, although they should work when passing multiple arguments to `async_job` (because of quoting). * Tell me? :) diff --git a/async.zsh b/async.zsh index 9d88614..ebc9d8a 100644 --- a/async.zsh +++ b/async.zsh @@ -15,7 +15,6 @@ typeset -g ASYNC_DEBUG=${ASYNC_DEBUG:-0} typeset -g ASYNC_DEBUG_WORKER_STDERR=${ASYNC_DEBUG_WORKER_STDERR:-/dev/null} # The maximum buffer size when outputting to zpty. -# Note: Subtract 4 to accommodate "\r\n" times two. # # When processing large amounts of data, the limit of 1024 bytes is # slow. If you're going to output a lot more than that, consider @@ -24,6 +23,8 @@ typeset -g ASYNC_DEBUG_WORKER_STDERR=${ASYNC_DEBUG_WORKER_STDERR:-/dev/null} # This value was chosen as a safe limit for macOS and other systems that # have a low limit (1024) for the buffer, on Linux this can likely be # raised significantly. +# +# Note: Subtract 4 to accommodate "\r\n" times two. typeset -g ASYNC_MAX_BUFFER_SIZE=${ASYNC_MAX_BUFFER_SIZE:-$((1024 - 4))} # Execute commands that can manipulate the environment inside the async worker. Return output via callback. @@ -76,10 +77,11 @@ _async_job() { # Chunk up the output so as to not fill up the entire fd. for ((i = 1; i < $#out; i += ASYNC_MAX_BUFFER_SIZE)); do - # Note: We are surrounding the message in newlines here in an - # attempt to force zpty to behave. Literal newlines will be - # filtered by async_process_results. Any newlines in the job - # output will survive, as they are quoted. + # Note: We are surrounding the (potentially partial) message in newlines + # here in an attempt to flush the file descriptor and prevent behavior + # that could cause zpty to hang. Literal newlines will be filtered by + # async_process_results. Any newlines in the job output will survive, as + # they are quoted. # # Return output ( ). if ! print -r -n - $'\n'"${out[$i,$((i + ASYNC_MAX_BUFFER_SIZE - 1))]}"$'\n'; then