Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions service/domain_worker.ml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ type request = {
cancelled : unit Eio.Promise.t option;
}

type reply = ((OpamPackage.t list, string) result * float, [`Msg of string]) result
type reply = ((OpamPackage.t list, string) result * float, [`Msg of string | `Cancelled]) result

let env (vars : Worker.Vars.t) v =
match v with
Expand All @@ -31,7 +31,7 @@ let env (vars : Worker.Vars.t) v =

let solve { packages; root_pkgs; pinned_pkgs; vars; cancelled } =
match cancelled with
| Some p when Promise.is_resolved p -> Error (`Msg "Cancelled")
| Some p when Promise.is_resolved p -> Error `Cancelled
| _ ->
try
let pins = root_pkgs @ pinned_pkgs |> OpamPackage.Name.Map.of_list in
Expand Down
5 changes: 3 additions & 2 deletions service/domain_worker.mli
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@ type request = {
(** If resolved, the result is not needed. *)
}

type reply = ((OpamPackage.t list, string) result * float, [`Msg of string]) result
type reply = ((OpamPackage.t list, string) result * float, [`Msg of string | `Cancelled]) result
(** [Ok (Ok selection)] if there is a solution.
[Ok (Error msg)] if there is no solution.
[Error msg] if the request was invalid. *)
[Error msg] if the request was invalid.
[Error Cancelled] if the request was cancelled before started. *)

val env : Solver_service_api.Worker.Vars.t -> string -> OpamVariable.variable_contents option
(** [env vars name] is the value of [name] in [vars]. *)
Expand Down
14 changes: 10 additions & 4 deletions service/pool.ml
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
open Eio.Std

type ('request, 'reply) t = ('request * 'reply Promise.u) Eio.Stream.t
type ('request, 'reply) t = {
requests: ('request * 'reply Promise.u) Eio.Stream.t; running: int Atomic.t; n_workers: int
}

let rec run_worker t handle =
let request, set_reply = Eio.Stream.take t in
let request, set_reply = Eio.Stream.take t.requests in
handle request |> Promise.resolve set_reply;
run_worker t handle

let create ~sw ~domain_mgr ~n_workers handle =
let t = Eio.Stream.create 0 in
let t = { requests = Eio.Stream.create 0; running = Atomic.make 0; n_workers } in
for _i = 1 to n_workers do
Fiber.fork_daemon ~sw (fun () ->
Eio.Domain_manager.run domain_mgr (fun () -> run_worker t handle)
Expand All @@ -18,5 +20,9 @@ let create ~sw ~domain_mgr ~n_workers handle =

let use t request =
let reply, set_reply = Promise.create () in
Eio.Stream.add t (request, set_reply);
Eio.Stream.add t.requests (request, set_reply);
Promise.await reply

let n_workers t = t.n_workers

let wait_requests t = Eio.Stream.length t.requests
65 changes: 64 additions & 1 deletion service/solver.ml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,50 @@ type t = {

let ocaml = OpamPackage.Name.of_string "ocaml"


module Metrics = struct
open Prometheus

let namespace = "ocluster"
let subsystem = "worker"
Comment thread
moyodiallo marked this conversation as resolved.
Outdated

let request_handling_total =
let help = "Total number of handled solve requests" in
Counter.v ~help ~namespace ~subsystem "requests_handled_total"

let request_handling =
let help = "Number of handled requests by state" in
Gauge.v_label ~label_name:"state" ~help ~namespace ~subsystem "solve_request_state"

let update_request_handling pool n_requests =
let workers = Pool.n_workers pool in
let waiting = Pool.wait_requests pool in
let running = min !n_requests workers in
Gauge.set (request_handling "running") (float_of_int running);
Gauge.set (request_handling "waiting") (float_of_int waiting)

let request_ok =
let help = "Total number of success solve requests" in
Counter.v ~help ~namespace ~subsystem "success_solve"

let request_fail =
let help = "Total number of fail solve requests" in
Counter.v ~help ~namespace ~subsystem "fail_solve"

let request_no_solution =
let help = "Total number of no solution solve requests " in
Counter.v ~help ~namespace ~subsystem "no_solution_solve"

let request_cancelled =
let help = "Total number of cancel without running solve requests" in
Counter.v ~help ~namespace ~subsystem "cancel_without_running_solve"

let request_cancelled_after =
let help = "Total number of cancel when running solve requests" in
Counter.v ~help ~namespace ~subsystem "cancel_when_running_solve"

end

(* If a local package has a literal constraint on OCaml's version and it doesn't match
the platform, we just remove that package from the set to test, so other packages
can still be tested. *)
Expand Down Expand Up @@ -47,6 +91,7 @@ let solve_for_platform ?cancelled t ~log ~opam_repository_commits ~packages ~roo
) else (
let slice = { Domain_worker.vars; root_pkgs; packages; pinned_pkgs; cancelled } in
match Pool.use t.pool slice with
| Error `Cancelled -> Error `Cancelled
| Error (`Msg m) -> Error (`Msg m)
| Ok (results, time) ->
match results with
Expand Down Expand Up @@ -115,12 +160,17 @@ let solve ?cancelled t ~log request =
in
Log.info log "Solving for %a" Fmt.(list ~sep:comma string) root_pkgs;
let serious_errors = ref [] in
let cancels_without_running = ref 0 in
let n_requests = ref 0 in
let*! root_pkgs = parse_opams request.root_pkgs in
let*! pinned_pkgs = parse_opams request.pinned_pkgs in
let*! packages = Stores.packages t.stores opam_repository_commits in
let results =
platforms
|> Fiber.List.map (fun (id, vars) ->
Prometheus.Counter.inc_one Metrics.request_handling_total;
incr n_requests;
Metrics.update_request_handling t.pool n_requests;
let result =
solve_for_platform t id
?cancelled
Expand All @@ -132,30 +182,43 @@ let solve ?cancelled t ~log request =
~pins
~vars
in
decr n_requests;
Metrics.update_request_handling t.pool n_requests;
(id, result)
)
|> List.filter_map (fun (id, result) ->
Log.info log "= %s =" id;
match result with
| Ok result ->
Prometheus.Counter.inc_one Metrics.request_ok;
Log.info log "-> @[<hov>%a@]"
Fmt.(list ~sep:sp string)
result.Selection.packages;
Log.info log "(valid since opam-repository commit(s): @[%a@])"
Fmt.(list ~sep:semi (pair ~sep:comma string string))
result.Selection.commits;
Some result
| Error `Cancelled ->
Prometheus.Counter.inc_one Metrics.request_cancelled;
incr cancels_without_running;
Log.info log "%s" "Cancelled";
None
| Error (`No_solution msg) ->
Prometheus.Counter.inc_one Metrics.request_no_solution;
Log.info log "%s" msg;
None
| Error (`Msg msg) ->
Prometheus.Counter.inc_one Metrics.request_fail;
Log.info log "%s" msg;
serious_errors := msg :: !serious_errors;
None
)
in
match cancelled with
| Some p when Promise.is_resolved p -> Error `Cancelled
| Some p when Promise.is_resolved p ->
let cancels = (List.length platforms) - (!cancels_without_running) in
Prometheus.Counter.inc Metrics.request_cancelled_after (float_of_int cancels);
Error `Cancelled
| _ ->
match !serious_errors with
| [] -> Ok results
Expand Down