-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathcores.ml
119 lines (108 loc) · 3.97 KB
/
cores.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
(**************************************************************************)
(* *)
(* Functory: a distributed computing library for OCaml *)
(* Copyright (C) 2010- Jean-Christophe Filliatre and Kalyan Krishnamani *)
(* *)
(* This software is free software; you can redistribute it and/or *)
(* modify it under the terms of the GNU Library General Public *)
(* License version 2.1, with the special exception on linking *)
(* described in file LICENSE. *)
(* *)
(* This software is distributed in the hope that it will be useful, *)
(* but WITHOUT ANY WARRANTY; without even the implied warranty of *)
(* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. *)
(* *)
(**************************************************************************)
open Format
open Control
open Unix
(* Main scheduling loop.

   [run ~create_job ~wait workers tasks] hands tasks to idle workers until
   there is neither a pending task nor a running job left.
   - [create_job w t] is called to start task [t] on worker [w];
   - [wait ()] must block until some job completes and return the worker
     that has become free together with the new tasks to schedule.

   An assertion fails if tasks are pending while no job is running and no
   worker is idle to start one (e.g. when [workers] is empty). *)
let run
    ~(create_job : 'worker -> 'task -> unit)
    ~(wait : unit -> 'worker * 'task list)
    (workers : 'worker list)
    (tasks : 'task list)
  =
  let pending = Queue.create () and available = Queue.create () in
  let enqueue_all q xs = List.iter (fun x -> Queue.push x q) xs in
  enqueue_all pending tasks;
  enqueue_all available workers;
  (* number of jobs started and not yet collected through [wait] *)
  let running = ref 0 in
  (* start as many jobs as idle workers and pending tasks allow *)
  let rec dispatch () =
    if not (Queue.is_empty available || Queue.is_empty pending) then begin
      let task = Queue.pop pending in
      let worker = Queue.pop available in
      create_job worker task;
      incr running;
      dispatch ()
    end
  in
  let rec loop () =
    if !running > 0 || not (Queue.is_empty pending) then begin
      dispatch ();
      assert (!running > 0);
      (* nothing more can be started: block until a job completes *)
      let worker, new_tasks = wait () in
      decr running;
      Queue.push worker available;
      enqueue_all pending new_tasks;
      loop ()
    end
  in
  loop ();
  assert (Queue.is_empty pending && !running = 0)
(* Upper bound of the worker identifiers built by [workers]; defaults to 1. *)
let ncores : int ref = ref 1

(* [set_number_of_cores n] records [n] as the new value of [ncores]. *)
let set_number_of_cores count = ncores := count
(* [listij acc i j] is the list [i; i+1; ...; j] prepended to [acc];
   it is [acc] itself when [i > j]. *)
let listij acc i j =
  let rec prepend hi tail =
    if hi < i then tail else prepend (hi - 1) (hi :: tail)
  in
  prepend j acc
(* [workers ()] is the list of worker identifiers [1; ...; n], where [n] is
   the current value of [ncores]. *)
let workers () =
  let n = !ncores in
  listij [] 1 n
(*** using local files ***************************************************)
(* Description of a task being computed by a forked child process. *)
type 'a job = {
(* identifier of the worker the task was assigned to *)
worker : int;
(* PID of the child process, as returned by [fork] *)
pid : int;
(* temporary file into which the child marshals its result *)
file : string;
(* the task being computed *)
task : 'a;
}
(* [create_worker w f t] forks a child process that applies [f] to the first
   component of [t] and marshals the result into a fresh temporary file.
   The parent immediately gets back the job record describing that child:
   worker identifier [w], child PID, result file and the task [t].

   The child never returns from this function: it exits with status 0 once
   the result has been written, or with status 2 if computing or writing the
   result raised an exception. *)
let create_worker w (f : 'a -> 'b) (t : 'a * 'c) : ('a * 'c) job =
  let file = Filename.temp_file "mapfold" "output" in
  match fork () with
  | 0 -> (* child *)
    let status =
      try
        let r = f (fst t) in
        let c = open_out file in
        output_value c r;
        close_out c;
        0
      with e ->
        (* The child holds a copy of the master's state. Letting the
           exception unwind would run the master's handlers in this process
           (the one in [compute] kills every PID in the job table, i.e. the
           sibling workers). Report the failure and exit instead. *)
        Format.eprintf "worker %d: uncaught exception %s@."
          w (Printexc.to_string e);
        2
    in
    exit status
  | pid -> (* parent *)
    { worker = w;
      pid = pid;
      file = file;
      task = t }
(* [compute ~worker ~master tasks] processes [tasks] with forked child
   processes, one per identifier returned by [workers ()].

   Each task is a pair; [worker] is applied, in a child process, to its
   first component (see [create_worker]). When a child exits, its result is
   read back from the job's temporary file and [master task result] is called
   in the master process; the tasks it returns are scheduled in turn.

   Any exception raised in the master (including one raised by [master]) is
   re-raised after a SIGKILL has been sent to every PID still registered in
   the job table.

   NOTE(review): when a child is reported as killed or stopped, a message is
   printed but its job stays registered and is never reported to [run] as
   finished; confirm that this is the intended behaviour. *)
let compute
    ~(worker : 'a -> 'b) ~(master : ('a * 'c) -> 'b -> ('a * 'c) list) tasks =
  let jobs = Hashtbl.create 17 in (* PID -> job *)
  (* Reads back the value marshalled by the child of job [j]; the channel is
     closed even if unmarshalling fails. *)
  let read_result j : 'b =
    let c = open_in j.file in
    let r =
      try input_value c
      with e -> close_in_noerr c; raise e
    in
    close_in c;
    r
  in
  (* Blocks until one of our jobs terminates; returns its worker together
     with the new tasks produced by [master]. *)
  let rec wait () =
    match Unix.wait () with
    | p, WEXITED _ ->
      dprintf "master: got result from worker PID %d@." p;
      (* Only the lookup is guarded: a [Not_found] raised later on (e.g. by
         [master]) must propagate instead of being mistaken for an unknown
         PID. *)
      let job = try Some (Hashtbl.find jobs p) with Not_found -> None in
      begin match job with
        | Some j ->
          Hashtbl.remove jobs p;
          dprintf "master: got result from worker %d@." j.worker;
          let r : 'b = read_result j in
          Sys.remove j.file;
          let l = master j.task r in
          j.worker, l
        | None ->
          (* If the pid is unknown to us, it's probably a process created
             by one of the workers. In this case, simply continue to wait. *)
          wait ()
      end
    | p, _ ->
      Format.eprintf "master: ** PID %d killed or stopped! **@." p;
      wait ()
  in
  try
    run
      ~create_job:(fun w t ->
          let j = create_worker w worker t in
          dprintf "master: started worker %d (PID %d)@." w j.pid;
          Hashtbl.add jobs j.pid j)
      ~wait (workers ()) tasks
  with e ->
    (* best effort: a registered child may already be gone *)
    Hashtbl.iter
      (fun p _ -> try Unix.kill p Sys.sigkill with Unix.Unix_error _ -> ())
      jobs;
    raise e
(* Derived API: applies the [Map_fold.Make] functor to a structure holding
   the [compute] function defined in this file, and includes the resulting
   definitions in this module.
   NOTE(review): which functions this exports depends on [Map_fold.Make],
   which is defined elsewhere. *)
include Map_fold.Make(struct let compute = compute end)