Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions service/domain_worker.ml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ type request = {
cancelled : unit Eio.Promise.t option;
}

type reply = ((OpamPackage.t list, string) result * float, [`Msg of string]) result
type reply = ((OpamPackage.t list, string) result * float, [`Msg of string | `Cancelled]) result

let env (vars : Worker.Vars.t) v =
match v with
Expand All @@ -31,7 +31,7 @@ let env (vars : Worker.Vars.t) v =

let solve { packages; root_pkgs; pinned_pkgs; vars; cancelled } =
match cancelled with
| Some p when Promise.is_resolved p -> Error (`Msg "Cancelled")
| Some p when Promise.is_resolved p -> Error `Cancelled
| _ ->
try
let pins = root_pkgs @ pinned_pkgs |> OpamPackage.Name.Map.of_list in
Expand Down
5 changes: 3 additions & 2 deletions service/domain_worker.mli
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@ type request = {
(** If resolved, the result is not needed. *)
}

type reply = ((OpamPackage.t list, string) result * float, [`Msg of string]) result
type reply = ((OpamPackage.t list, string) result * float, [`Msg of string | `Cancelled]) result
(** [Ok (Ok selection)] if there is a solution.
[Ok (Error msg)] if there is no solution.
[Error msg] if the request was invalid. *)
[Error msg] if the request was invalid.
[Error `Cancelled] if the request was cancelled before it started. *)

val env : Solver_service_api.Worker.Vars.t -> string -> OpamVariable.variable_contents option
(** [env vars name] is the value of [name] in [vars]. *)
Expand Down
16 changes: 12 additions & 4 deletions service/pool.ml
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
open Eio.Std

type ('request, 'reply) t = ('request * 'reply Promise.u) Eio.Stream.t
type ('request, 'reply) t = {
requests: ('request * 'reply Promise.u) Eio.Stream.t; running: int Atomic.t; n_workers: int
}

let rec run_worker t handle =
let request, set_reply = Eio.Stream.take t in
let request, set_reply = Eio.Stream.take t.requests in
Atomic.incr t.running;
handle request |> Promise.resolve set_reply;
Atomic.decr t.running;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This pattern (incr, handle, decr) feels unsafe in the presence of exceptions from `handle request`. cc @talex5

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

`handle request` needs to catch those exceptions; otherwise we lose a worker, and that will result in a bug.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, if a worker crashes then the whole service exits, so it's not strictly necessary to handle exceptions here, although it would make the code more robust to future changes. However, I don't think you need this counter. The number of running workers is always min n_workers n_jobs, so you can just calculate it as needed.

Copy link
Copy Markdown
Contributor Author

@moyodiallo moyodiallo Oct 4, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes you're right, I did not get this min n_workers n_jobs. Thanks.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, we don't have n_jobs in the pool. All we have is the waiting jobs (the jobs in the Eio.Stream.t). It won't be precise if we consider the waiting jobs as n_jobs, e.g. 2 jobs waiting and all 8 workers busy.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What would be useful to see is:

  1. n_workers - static capacity of this solver_worker
  2. queued_requests - requests waiting for a Fiber to run on
  3. running_workers - Fibers actively running a solve job

That would let us answer questions like total solver_worker capacity available, utilisation of the total capacity, and saturation of the total capacity and per solver_worker saturation.

The number of running workers is always min n_workers n_jobs, so you can just calculate it as needed.

I thought from reading the code and https://github.com/ocaml-multicore/eio?search=1#multicore-support, Pool pre_forked a number of OS threads (Domains?) which would wake up when work was added to the Eio.Stream? Is that accurate @talex5 ?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's right - there is a fixed pool of workers and they will all be busy unless there just aren't any jobs in the queue.

The issue here is that run_worker is running in a worker domain, and so can't (currently) update Prometheus metrics itself (which I guess is why it's updating an atomic instead and waiting for the main domain to report that). But the main domain can work out how many workers are running just by knowing how many jobs are outstanding, so this isn't necessary.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's right, there's a way to do that.

run_worker t handle

let create ~sw ~domain_mgr ~n_workers handle =
let t = Eio.Stream.create 0 in
let t = { requests = Eio.Stream.create 0; running = Atomic.make 0; n_workers } in
for _i = 1 to n_workers do
Fiber.fork_daemon ~sw (fun () ->
Eio.Domain_manager.run domain_mgr (fun () -> run_worker t handle)
Expand All @@ -18,5 +22,9 @@ let create ~sw ~domain_mgr ~n_workers handle =

let use t request =
let reply, set_reply = Promise.create () in
Eio.Stream.add t (request, set_reply);
Eio.Stream.add t.requests (request, set_reply);
Promise.await reply

(* Read the pool's [running] counter. [run_worker] increments it just before
   handling a request and decrements it once the reply has been resolved. *)
let running_workers { running; _ } = Atomic.get running

(* The [n_workers] value stored in the pool record when it was created. *)
let n_workers { n_workers; _ } = n_workers
61 changes: 60 additions & 1 deletion service/solver.ml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,49 @@ type t = {

(* The opam package name "ocaml", built once when this module is initialised. *)
let ocaml = OpamPackage.Name.of_string "ocaml"

module Metrics = struct
open Prometheus

(* Passed as [~namespace] and [~subsystem] to every [Counter.v] and
   [Gauge.v_label] call in this module. *)
let namespace = "ocluster"
let subsystem = "worker"
Comment thread
moyodiallo marked this conversation as resolved.
Outdated

(* Counter of solve requests taken on; [solve] bumps it once for each
   platform before calling [solve_for_platform]. *)
let request_handling_total =
  Counter.v ~namespace ~subsystem
    ~help:"Total number of handled solve requests"
    "requests_handled_total"

(* Gauge family keyed by a ["state"] label; applying it to a label value
   yields the gauge for that state. *)
let request_handling =
  Gauge.v_label ~namespace ~subsystem ~label_name:"state"
    ~help:"Number of handled requests by state"
    "solve_request_state"

(* Refresh the two [request_handling] gauges from the pool's current state.
   "running" is the pool's running-worker count. "waiting" is published as
   [n_workers - running], i.e. the workers that are not running a request
   (not the number of queued requests). *)
let update_request_handling pool =
  let busy = Pool.running_workers pool in
  let idle = Pool.n_workers pool - busy in
  let publish state count =
    Gauge.set (request_handling state) (float_of_int count)
  in
  publish "running" busy;
  publish "waiting" idle


(* Counter bumped by [solve] for each platform whose result is [Ok _]. *)
let request_ok =
  Counter.v ~namespace ~subsystem
    ~help:"Total number of success solve requests"
    "success_solve"

(* Counter bumped by [solve] for each platform whose result is
   [Error (`Msg _)]. *)
let request_fail =
  Counter.v ~namespace ~subsystem
    ~help:"Total number of fail solve requests"
    "fail_solve"

(* Counter bumped by [solve] for each platform whose result is
   [Error (`No_solution _)]. The help text is kept exactly as published,
   trailing space included. *)
let request_no_solution =
  Counter.v ~namespace ~subsystem
    ~help:"Total number of no solution solve requests "
    "no_solution_solve"

(* Counter bumped by [solve] for each platform whose result is
   [Error `Cancelled]. *)
let request_cancelled =
  Counter.v ~namespace ~subsystem
    ~help:"Total number of cancel without running solve requests"
    "cancel_without_running_solve"

(* Counter increased by [solve] when the [cancelled] promise is found resolved
   after all platforms have been processed; the increment is the number of
   platforms minus those that reported [Error `Cancelled]. *)
let request_cancelled_after =
  Counter.v ~namespace ~subsystem
    ~help:"Total number of cancel when running solve requests"
    "cancel_when_running_solve"

end

(* If a local package has a literal constraint on OCaml's version and it doesn't match
the platform, we just remove that package from the set to test, so other packages
can still be tested. *)
Expand Down Expand Up @@ -47,6 +90,7 @@ let solve_for_platform ?cancelled t ~log ~opam_repository_commits ~packages ~roo
) else (
let slice = { Domain_worker.vars; root_pkgs; packages; pinned_pkgs; cancelled } in
match Pool.use t.pool slice with
| Error `Cancelled -> Error `Cancelled
| Error (`Msg m) -> Error (`Msg m)
| Ok (results, time) ->
match results with
Expand Down Expand Up @@ -115,12 +159,15 @@ let solve ?cancelled t ~log request =
in
Log.info log "Solving for %a" Fmt.(list ~sep:comma string) root_pkgs;
let serious_errors = ref [] in
let cancels_without_running = ref 0 in
let*! root_pkgs = parse_opams request.root_pkgs in
let*! pinned_pkgs = parse_opams request.pinned_pkgs in
let*! packages = Stores.packages t.stores opam_repository_commits in
let results =
platforms
|> Fiber.List.map (fun (id, vars) ->
Prometheus.Counter.inc_one Metrics.request_handling_total;
Metrics.update_request_handling t.pool;
let result =
solve_for_platform t id
?cancelled
Expand All @@ -132,30 +179,42 @@ let solve ?cancelled t ~log request =
~pins
~vars
in
Metrics.update_request_handling t.pool;
(id, result)
)
|> List.filter_map (fun (id, result) ->
Log.info log "= %s =" id;
match result with
| Ok result ->
Prometheus.Counter.inc_one Metrics.request_ok;
Log.info log "-> @[<hov>%a@]"
Fmt.(list ~sep:sp string)
result.Selection.packages;
Log.info log "(valid since opam-repository commit(s): @[%a@])"
Fmt.(list ~sep:semi (pair ~sep:comma string string))
result.Selection.commits;
Some result
| Error `Cancelled ->
Prometheus.Counter.inc_one Metrics.request_cancelled;
incr cancels_without_running;
Log.info log "%s" "Cancelled";
None
| Error (`No_solution msg) ->
Prometheus.Counter.inc_one Metrics.request_no_solution;
Log.info log "%s" msg;
None
| Error (`Msg msg) ->
Prometheus.Counter.inc_one Metrics.request_fail;
Log.info log "%s" msg;
serious_errors := msg :: !serious_errors;
None
)
in
match cancelled with
| Some p when Promise.is_resolved p -> Error `Cancelled
| Some p when Promise.is_resolved p ->
let cancels = (List.length platforms) - (!cancels_without_running) in
Prometheus.Counter.inc Metrics.request_cancelled_after (float_of_int cancels);
Error `Cancelled
| _ ->
match !serious_errors with
| [] -> Ok results
Expand Down