Skip to content

Commit

Permalink
pmap - Ensure all worker threads are joined
Browse files Browse the repository at this point in the history
  • Loading branch information
shirok committed Jan 11, 2025
1 parent 5bfaa35 commit a2980c2
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 15 deletions.
5 changes: 5 additions & 0 deletions ChangeLog
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
2025-01-10 Shiro Kawai <[email protected]>

* lib/control/pmap.scm: Make sure all worker threads are joined,
even some of them result uncaught/timeout exceptions.

2025-01-07 Shiro Kawai <[email protected]>

* src/libnum.scm (number->string): Allow the first optional argument
Expand Down
60 changes: 45 additions & 15 deletions lib/control/pmap.scm
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,31 @@
(define (%start-threads threads)
(for-each thread-try-start! threads))

;; thread-join! may raise <uncaught-exception>. We want to join
;; all the threads, though, so we should wrap thread-join! to keep
;; going while recording the thrown exception, and re-raise it
;; when all threads are joined.
;; NB: There may be multiple exceptions, but we only keep one. It's
;; the same situation that 'map' throws just one error even multiple
;; elements could cause errors.

(define join-exc (make-parameter #f))

(define (%make-joiner r)
(^[thread :optional (timeout #f) (timeout-val #f)]
(guard (e [(terminated-thread-exception? e) r]
[else (join-exc e) '()])
(thread-join! thread timeout timeout-val))))

(define-syntax %with-wrapped-join
(syntax-rules ()
[(_ body ...)
(parameterize ([join-exc #f])
(receive rs (begin body ...)
(if (join-exc)
(raise (join-exc))
(apply values rs))))]))

;;
;; sequential-mapper
;;
Expand Down Expand Up @@ -137,7 +162,8 @@
(make-thread (cut map proc c))))
cols)])
(%start-threads ts)
(append-map thread-join! ts)))
(%with-wrapped-join
(append-map (%make-joiner '()) ts))))

;; (define (%split-collection coll n)
;; (define qs (list-tabulate n (^_ (make-queue))))
Expand Down Expand Up @@ -180,10 +206,11 @@
(cut with-stopper proc c ts))))
cols)])
(%start-threads ts)
(do ([ts ts (cdr ts)]
[r #f (guard (e [(<terminated-thread-exception> e) r])
(or (thread-join! (car ts)) r))])
[(null? ts) r])))
(%with-wrapped-join
(let1 join! (%make-joiner #f)
(do ([ts ts (cdr ts)]
[r #f (or (join! (car ts)) r)])
[(null? ts) r])))))

;;
;; pool mapper
Expand Down Expand Up @@ -261,12 +288,14 @@
[timeout (absolute-time (~ mapper'timeout))]
[timeout-val (~ mapper'timeout-val)])
(%start-threads ts)
(if timeout
($ map (^r (if (and (pair? r) (eq? (car r) unique))
(begin (thread-terminate! (cdr r)) timeout-val)
r))
$ map (^t (thread-join! t timeout (cons unique t))) ts)
(map thread-join! ts))))
(%with-wrapped-join
(let1 join! (%make-joiner #f)
(if timeout
($ map (^r (if (and (pair? r) (eq? (car r) unique))
(begin (thread-terminate! (cdr r)) timeout-val)
r))
$ map (^t (join! t timeout (cons unique t))) ts)
(map join! ts))))))

(define-method run-select ((mapper <fully-concurrent-mapper>) proc coll)
(define signaled (atom #f))
Expand All @@ -285,10 +314,11 @@
#f)))
(letrec ([ts (map (^e (make-thread (^[] (task e ts)))) coll)])
(%start-threads ts)
(do ([ts ts (cdr ts)]
[r #f (guard (e [(<terminated-thread-exception> e) r])
(or (thread-join! (car ts)) r))])
[(null? ts) r])))
(%with-wrapped-join
(let1 join! (%make-joiner #f)
(do ([ts ts (cdr ts)]
[r #f (or (join! (car ts)) r)])
[(null? ts) r])))))

;;
;; default mapper
Expand Down

0 comments on commit a2980c2

Please sign in to comment.