This commit is contained in:
Jeremie Dimino 2017-05-26 17:23:49 +01:00
parent 9f9660b91a
commit 550a8d8f57
1 changed files with 108 additions and 71 deletions

View File

@ -406,10 +406,75 @@ module Scheduler = struct
; log : Log.t
}
let running = Hashtbl.create 128
module Running_jobs : sig
val add : running_job -> unit
val wait : unit -> running_job * Unix.process_status
val wait_nonblocking : unit -> (running_job * Unix.process_status) option
val count : unit -> int
val all : unit -> running_job list
end = struct
let all = Hashtbl.create 128
let add job = Hashtbl.add all ~key:job.pid ~data:job
let count () = Hashtbl.length all
let resolve_and_remove_job pid =
let job =
Hashtbl.find_exn all pid ~string_of_key:(sprintf "<pid:%d>")
~table_desc:(fun _ -> "<running-jobs>")
in
Hashtbl.remove all pid;
job
exception Finished of running_job * Unix.process_status
let wait_nonblocking_win32 () =
match
Hashtbl.iter all ~f:(fun ~key:pid ~data:job ->
let pid, status = Unix.waitpid [WNOHANG] pid in
if pid <> 0 then
raise_notrace (Finished (job, status)))
with
| () -> None
| exception (Finished (job, status)) ->
Hashtbl.remove all job.pid;
Some (job, status)
let wait_nonblocking_unix () =
let pid, status = Unix.waitpid [WNOHANG] (-1) in
if pid = 0 then
None
else
Some (resolve_and_remove_job pid, status)
let wait_nonblocking =
if Sys.win32 then
wait_nonblocking_win32
else
wait_nonblocking_unix
let rec wait_win32 () =
match wait_nonblocking_win32 () with
| None ->
ignore (Unix.select [] [] [] 0.001);
wait_win32 ()
| Some x -> x
let wait_unix () =
let pid, status = Unix.wait () in
(resolve_and_remove_job pid, status)
let wait =
if Sys.win32 then
wait_win32
else
wait_unix
let all () = Hashtbl.fold all ~init:[] ~f:(fun ~key:_ ~data:job acc -> job :: acc)
end
let process_done ?(exiting=false) job (status : Unix.process_status) =
Hashtbl.remove running job.pid;
let output =
match job.output_filename with
| None -> ""
@ -473,30 +538,12 @@ module Scheduler = struct
end;
die ""
| WSTOPPED _ -> assert false;
end
let gen_id =
let next = ref (-1) in
fun () -> incr next; !next
let rec wait_win32 () =
let finished =
Hashtbl.fold running ~init:[] ~f:(fun ~key:pid ~data:job acc ->
let pid, status = Unix.waitpid [WNOHANG] pid in
if pid <> 0 then begin
(job, status) :: acc
end else
acc)
in
match finished with
| [] ->
ignore (Unix.select [] [] [] 0.001);
wait_win32 ()
| _ ->
List.iter finished ~f:(fun (job, status) ->
process_done job status)
let at_exit_handlers = Queue.create ()
let at_exit_after_waiting_for_commands f = Queue.push f at_exit_handlers
let exec_at_exit_handlers () =
@ -505,39 +552,37 @@ module Scheduler = struct
done
let wait_for_unfinished_jobs () =
let jobs =
Hashtbl.fold running ~init:[] ~f:(fun ~key:_ ~data:job acc -> job :: acc)
let rec loop n =
if Running_jobs.count () > 0 && n > 0 then
match Running_jobs.wait_nonblocking () with
| None ->
ignore (Unix.select [] [] [] 0.05 : _ * _ * _);
loop (n - 1)
| Some (job, status) ->
process_done job status ~exiting:true;
loop n
in
let rec wait_for_jobs msg_time jobs = match jobs with
| [] -> ()
| job :: jobs when msg_time > 0. ->
let pid, status = Unix.waitpid [WNOHANG] job.pid in
if pid <> 0 then begin
process_done job status ~exiting:true;
wait_for_jobs msg_time jobs
end else begin
let dt = 0.05 in
let _ = Unix.select [] [] [] dt in
wait_for_jobs (msg_time -. dt) (job :: jobs)
end
| jobs ->
if !Clflags.verbose then begin
let pp_job ppf job =
let (_, name, _) = split_prog job.job.prog in
Format.fprintf ppf "%s [@{<id>%d@}]" name job.id in
Format.eprintf "\nWaiting for the following jobs to finish: %a@."
(Format.pp_print_list ~pp_sep:(fun ppf () -> Format.fprintf ppf ", ") pp_job)
jobs;
end else begin
let n = List.length jobs in
Format.eprintf "\nWaiting for %d %s to finish.@."
n
(if n = 1 then "job" else "jobs")
end;
List.iter jobs ~f:(fun job ->
let _, status = Unix.waitpid [] job.pid in
process_done job status ~exiting:true) in
wait_for_jobs 0.5 jobs
loop 10;
if Running_jobs.count () > 0 then begin
if !Clflags.verbose then begin
let pp_job ppf job =
let _, name, _ = split_prog job.job.prog in
Format.fprintf ppf "%s [@{<id>%d@}]" name job.id
in
Format.eprintf "\nWaiting for the following jobs to finish: %a@."
(Format.pp_print_list ~pp_sep:(fun ppf () -> Format.fprintf ppf ", ") pp_job)
(Running_jobs.all ());
end else begin
let n = Running_jobs.count () in
Format.eprintf "\nWaiting for %d %s to finish.@."
n
(if n = 1 then "job" else "jobs")
end;
while Running_jobs.count () > 0 do
let job, status = Running_jobs.wait () in
process_done job status ~exiting:true
done
end
let () =
at_exit (fun () ->
@ -566,7 +611,7 @@ module Scheduler = struct
match (repr t).state with
| Return v -> v
| _ ->
while Hashtbl.length running < !Clflags.concurrency &&
while Running_jobs.count () < !Clflags.concurrency &&
not (Queue.is_empty to_run) do
let job = Queue.pop to_run in
let id = gen_id () in
@ -599,25 +644,17 @@ module Scheduler = struct
if Option.is_some output_filename then Unix.close output_fd;
close_std_output close_stdout;
close_std_output close_stderr;
Hashtbl.add running ~key:pid
~data:{ id
; job
; pid
; output_filename
; command_line
; log
}
Running_jobs.add
{ id
; job
; pid
; output_filename
; command_line
; log
}
done;
if Sys.win32 then
wait_win32 ()
else begin
let pid, status = Unix.wait () in
let job =
Hashtbl.find_exn running pid ~string_of_key:(sprintf "<pid:%d>")
~table_desc:(fun _ -> "<running-jobs>")
in
process_done job status
end;
let job, status = Running_jobs.wait () in
process_done job status;
go_rec cwd log t
let go ?(log=Log.no_log) t =