diff --git a/docs/src/.vitepress/config.mts b/docs/src/.vitepress/config.mts index 1921fdd0..e77256c8 100644 --- a/docs/src/.vitepress/config.mts +++ b/docs/src/.vitepress/config.mts @@ -131,6 +131,7 @@ export default defineConfig({ text: 'Advanced', items: [ { text: 'Tuning', link: '/tuning' }, + { text: 'Slurm', link: '/slurm' }, { text: 'Backend', link: '/backend' }, ] } diff --git a/docs/src/slurm.md b/docs/src/slurm.md new file mode 100644 index 00000000..bfb14c06 --- /dev/null +++ b/docs/src/slurm.md @@ -0,0 +1,51 @@ +# Slurm (Multi-node) + +PySR supports running across multiple nodes on Slurm via `cluster_manager="slurm"`. +This backend is **allocation-based**: you request resources with Slurm (`sbatch`/`salloc`), then PySR launches Julia workers inside that allocation (using `SlurmClusterManager.jl`). + +Here is a minimal `sbatch` example using 3 workers on each of 2 nodes (6 workers total). + +Save this as `pysr_job.sh`: + +```bash +#!/bin/bash +#SBATCH --job-name=pysr +#SBATCH --partition=normal +#SBATCH --nodes=2 +#SBATCH --ntasks-per-node=3 +#SBATCH --time=01:00:00 + +set -euo pipefail +python pysr_script.py +``` + +Save this as `pysr_script.py` in the same directory: + +```python +import numpy as np +from pysr import PySRRegressor + +X = np.random.RandomState(0).randn(1000, 2) +y = X[:, 0] + 2 * X[:, 1] + +model = PySRRegressor( + niterations=200, + populations=2, + parallelism="multiprocessing", + cluster_manager="slurm", + procs=6, # must match the Slurm allocation's total task count +) +model.fit(X, y) +print(model) +``` + +Submit it with: + +```bash +sbatch pysr_job.sh +``` + +## Notes + +- `procs` is the number of Julia worker processes. It must match the Slurm allocation's total tasks (e.g., `--ntasks` or `--nodes * --ntasks-per-node`). +- Run the Python script once (as the master) inside the allocation; do not wrap it in `srun`. diff --git a/docs/src/tuning.md b/docs/src/tuning.md index 56ef1e5d..7d2be354 100644 --- a/docs/src/tuning.md +++ b/docs/src/tuning.md @@ -6,7 +6,7 @@ First, my general tips would be to avoid using redundant operators, like how `po When running PySR, I usually do the following: -I run from IPython (Jupyter Notebooks don't work as well[^1]) on the head node of a slurm cluster. Passing `cluster_manager="slurm"` will make PySR set up a run over the entire allocation. I set `procs` equal to the total number of cores over my entire allocation. +I run from IPython (Jupyter Notebooks don't work as well[^1]) on the head node of a slurm cluster. Passing `cluster_manager="slurm"` will make PySR set up a run over the entire allocation. I set `procs` equal to the total number of tasks across my entire allocation (see the [Slurm page](slurm.md) for a complete multi-node example). I use the [tensorboard feature](https://ai.damtp.cam.ac.uk/pysr/examples/#12-using-tensorboard-for-logging) for experiment tracking. diff --git a/pysr/julia_extensions.py b/pysr/julia_extensions.py index 6d16756e..cf6532d9 100644 --- a/pysr/julia_extensions.py +++ b/pysr/julia_extensions.py @@ -4,10 +4,20 @@ from typing import Literal +from .julia_helpers import KNOWN_CLUSTERMANAGER_BACKENDS from .julia_import import Pkg, jl from .julia_registry_helpers import try_with_registry_fallback from .logger_specs import AbstractLoggerSpec, TensorBoardLoggerSpec +PACKAGE_UUIDS = { + "LoopVectorization": "bdcacae8-1622-11e9-2a5c-532679323890", + "Bumper": "8ce10254-0962-460f-a3d8-1f77fea1446e", + "Zygote": "e88e6eb3-aa80-5325-afca-941959d7151f", + "SlurmClusterManager": "c82cd089-7bf7-41d7-976b-6b5d413cbe0a", + "ClusterManagers": "34f1f09b-3a8b-5176-ab39-66d58a4d544e", + "TensorBoardLogger": "899adc3e-224a-11e9-021f-63837185c80f", +} + def load_required_packages( *, @@ -18,7 +28,7 @@ def load_required_packages( logger_spec: AbstractLoggerSpec | None = None, ): if turbo: - load_package("LoopVectorization", "bdcacae8-1622-11e9-2a5c-532679323890") + load_package("LoopVectorization") if bumper: load_package("Bumper", "8ce10254-0962-460f-a3d8-1f77fea1446e") if autodiff_backend == "Zygote": @@ -28,20 +38,18 @@ def load_required_packages( elif autodiff_backend == "Enzyme": load_package("Enzyme", "7da242da-08ed-463a-9acd-ee780be4f1d9") if cluster_manager is not None: - load_package("ClusterManagers", "34f1f09b-3a8b-5176-ab39-66d58a4d544e") + if cluster_manager == "slurm": + load_package("SlurmClusterManager") + elif cluster_manager in KNOWN_CLUSTERMANAGER_BACKENDS: + load_package("ClusterManagers") if isinstance(logger_spec, TensorBoardLoggerSpec): - load_package("TensorBoardLogger", "899adc3e-224a-11e9-021f-63837185c80f") + load_package("TensorBoardLogger") def load_all_packages(): """Install and load all Julia extensions available to PySR.""" - load_required_packages( - turbo=True, - bumper=True, - autodiff_backend="Zygote", - cluster_manager="slurm", - logger_spec=TensorBoardLoggerSpec(log_dir="logs"), - ) + for package_name, uuid_s in PACKAGE_UUIDS.items(): + load_package(package_name, uuid_s) # TODO: Refactor this file so we can install all packages at once using `juliapkg`, @@ -52,7 +60,8 @@ def isinstalled(uuid_s: str): return jl.haskey(Pkg.dependencies(), jl.Base.UUID(uuid_s)) -def load_package(package_name: str, uuid_s: str) -> None: +def load_package(package_name: str, uuid_s: str | None = None) -> None: + uuid_s = uuid_s or PACKAGE_UUIDS[package_name] if not isinstalled(uuid_s): def _add_package(): diff --git a/pysr/julia_helpers.py b/pysr/julia_helpers.py index b7279310..b2684d53 100644 --- a/pysr/julia_helpers.py +++ b/pysr/julia_helpers.py @@ -29,9 +29,46 @@ def _escape_filename(filename): return str_repr -def _load_cluster_manager(cluster_manager: str): - jl.seval(f"using ClusterManagers: addprocs_{cluster_manager}") - return jl.seval(f"addprocs_{cluster_manager}") +KNOWN_CLUSTERMANAGER_BACKENDS = ["slurm", "pbs", "lsf", "sge", "qrsh", "scyld", "htc"] + + +def load_cluster_manager(cluster_manager: str) -> AnyValue: + if cluster_manager == "slurm": + jl.seval("using SlurmClusterManager: SlurmManager") + jl.seval("using Distributed") + jl.seval( + """ + function addprocs_slurm(numprocs::Integer; exeflags=``, lazy=false, kws...) + procs = Distributed.addprocs(SlurmManager(); exeflags=exeflags, lazy=lazy, kws...) + # SymbolicRegression may serialize the addprocs function to workers. Defining a + # stub on the new workers avoids deserialization failures if it gets captured. + Distributed.@everywhere procs begin + function addprocs_slurm( + numprocs::Integer; + exeflags=``, + lazy=false, + kws..., + ) + error("addprocs_slurm should only be called on the master process.") + end + end + if length(procs) != numprocs + error( + "Requested $numprocs processes, but Slurm allocation has $(length(procs)) tasks. " * + "Set Slurm `--ntasks`/`--ntasks-per-node` to match, and set `procs` accordingly." + ) + end + return procs + end + """ + ) + return jl.addprocs_slurm + elif cluster_manager in KNOWN_CLUSTERMANAGER_BACKENDS: + jl.seval(f"using ClusterManagers: addprocs_{cluster_manager}") + return jl.seval(f"addprocs_{cluster_manager}") + else: + # Assume it's a function + return jl.seval(cluster_manager) def jl_array(x, dtype=None): diff --git a/pysr/sr.py b/pysr/sr.py index 8f8b9616..970b6cb2 100644 --- a/pysr/sr.py +++ b/pysr/sr.py @@ -45,12 +45,12 @@ from .julia_extensions import load_required_packages from .julia_helpers import ( _escape_filename, - _load_cluster_manager, jl_array, jl_deserialize, jl_is_function, jl_named_tuple, jl_serialize, + load_cluster_manager, ) from .julia_import import AnyValue, SymbolicRegression, VectorValue, jl from .logger_specs import AbstractLoggerSpec @@ -605,8 +605,8 @@ class PySRRegressor(MultiOutputMixin, RegressorMixin, BaseEstimator): Default is `None`. cluster_manager : str For distributed computing, this sets the job queue system. Set - to one of "slurm", "pbs", "lsf", "sge", "qrsh", "scyld", or - "htc". If set to one of these, PySR will run in distributed + to one of "slurm", "pbs", "lsf", "sge", "qrsh", "scyld", or "htc". + If set to one of these, PySR will run in distributed mode, and use `procs` to figure out how many processes to launch. Default is `None`. heap_size_hint_in_bytes : int @@ -926,13 +926,11 @@ def __init__( probability_negate_constant: float = 0.00743, tournament_selection_n: int = 15, tournament_selection_p: float = 0.982, - parallelism: ( - Literal["serial", "multithreading", "multiprocessing"] | None - ) = None, + # fmt: off + parallelism: Literal["serial", "multithreading", "multiprocessing"] | None = None, procs: int | None = None, - cluster_manager: ( - Literal["slurm", "pbs", "lsf", "sge", "qrsh", "scyld", "htc"] | None - ) = None, + cluster_manager: Literal["slurm", "pbs", "lsf", "sge", "qrsh", "scyld", "htc"] | str | None = None, + # fmt: on heap_size_hint_in_bytes: int | None = None, worker_timeout: float | None = None, worker_imports: list[str] | None = None, @@ -2066,13 +2064,12 @@ def _run( if cluster_manager is not None: active_project = jl.seval("Base.active_project()") if isinstance(active_project, str) and len(active_project) > 0: - # `ClusterManagers.addprocs_slurm` launches new Julia workers via `srun`. - # The project (environment) is propagated via `JULIA_PROJECT` rather - # than a `--project=...` flag, so ensure it is set. + # Some distributed worker launchers propagate the project (environment) + # via `JULIA_PROJECT` rather than a `--project=...` flag. os.environ.setdefault( "JULIA_PROJECT", str(Path(active_project).resolve().parent) ) - cluster_manager = _load_cluster_manager(cluster_manager) + cluster_manager = load_cluster_manager(cluster_manager) if self.autodiff_backend is not None: autodiff_backend = jl.Symbol(self.autodiff_backend) diff --git a/pysr/test/slurm_docker_cluster/config/slurm.conf b/pysr/test/slurm_docker_cluster/config/slurm.conf index 3603a010..4f282b83 100644 --- a/pysr/test/slurm_docker_cluster/config/slurm.conf +++ b/pysr/test/slurm_docker_cluster/config/slurm.conf @@ -37,6 +37,6 @@ SlurmctldLogFile=/var/log/slurm/slurmctld.log SlurmdDebug=info SlurmdLogFile=/var/log/slurm/slurmd.log -NodeName=c1 CPUs=2 RealMemory=1000 State=UNKNOWN -NodeName=c2 CPUs=2 RealMemory=1000 State=UNKNOWN +NodeName=c1 CPUs=4 RealMemory=1000 State=UNKNOWN +NodeName=c2 CPUs=4 RealMemory=1000 State=UNKNOWN PartitionName=normal Nodes=c1,c2 Default=YES MaxTime=INFINITE State=UP diff --git a/pysr/test/slurm_docker_cluster/docker-compose.yml b/pysr/test/slurm_docker_cluster/docker-compose.yml index ec2d719e..628c6e76 100644 --- a/pysr/test/slurm_docker_cluster/docker-compose.yml +++ b/pysr/test/slurm_docker_cluster/docker-compose.yml @@ -17,7 +17,7 @@ services: command: ["slurmd"] hostname: c1 working_dir: /data - cpus: 2 + cpus: 4 privileged: true volumes: - etc_munge:/etc/munge @@ -32,7 +32,7 @@ services: command: ["slurmd"] hostname: c2 working_dir: /data - cpus: 2 + cpus: 4 privileged: true volumes: - etc_munge:/etc/munge diff --git a/pysr/test/test_dev.py b/pysr/test/test_dev.py index ce967590..4225f75c 100644 --- a/pysr/test/test_dev.py +++ b/pysr/test/test_dev.py @@ -1,5 +1,6 @@ import os import subprocess +import tempfile import unittest from pathlib import Path @@ -11,41 +12,45 @@ def test_simple_change_to_backend(self): PYSR_TEST_PYTHON_VERSION = os.environ.get("PYSR_TEST_PYTHON_VERSION", "3.12") repo_root = Path(__file__).parent.parent.parent - build_result = subprocess.run( - [ - "docker", - "buildx", - "bake", - "-f", - "docker-bake.hcl", - "pysr-dev", - "--set", - f"pysr-dev.args.JLVERSION={PYSR_TEST_JULIA_VERSION}", - "--set", - f"pysr-dev.args.PYVERSION={PYSR_TEST_PYTHON_VERSION}", - ], - env=os.environ, - cwd=repo_root, - universal_newlines=True, - ) - self.assertEqual(build_result.returncode, 0) - test_result = subprocess.run( - [ - "docker", - "run", - "--rm", - "pysr-dev", - "python3", - "-c", - "from pysr import SymbolicRegression as SR; print(SR.__test_function())", - ], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - env=os.environ, - cwd=repo_root, - ) - self.assertEqual(test_result.returncode, 0) - self.assertEqual(test_result.stdout.decode("utf-8").strip(), "2.3") + with tempfile.TemporaryDirectory(prefix="pysr-docker-config-") as docker_config: + env = dict(os.environ) + env["DOCKER_CONFIG"] = docker_config + + build_result = subprocess.run( + [ + "docker", + "buildx", + "bake", + "-f", + "docker-bake.hcl", + "pysr-dev", + "--set", + f"pysr-dev.args.JLVERSION={PYSR_TEST_JULIA_VERSION}", + "--set", + f"pysr-dev.args.PYVERSION={PYSR_TEST_PYTHON_VERSION}", + ], + env=env, + cwd=repo_root, + universal_newlines=True, + ) + self.assertEqual(build_result.returncode, 0) + test_result = subprocess.run( + [ + "docker", + "run", + "--rm", + "pysr-dev", + "python3", + "-c", + "from pysr import SymbolicRegression as SR; print(SR.__test_function())", + ], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + env=env, + cwd=repo_root, + ) + self.assertEqual(test_result.returncode, 0) + self.assertEqual(test_result.stdout.decode("utf-8").strip(), "2.3") def runtests(just_tests=False): diff --git a/pysr/test/test_dev_pysr.dockerfile b/pysr/test/test_dev_pysr.dockerfile index 92eed59b..45f88dde 100644 --- a/pysr/test/test_dev_pysr.dockerfile +++ b/pysr/test/test_dev_pysr.dockerfile @@ -31,18 +31,18 @@ ADD ./pysr/_cli/*.py /pysr/pysr/_cli/ RUN mkdir /pysr/pysr/test # Now, we create a custom version of SymbolicRegression.jl -# First, we get the version or rev from juliapkg.json: -RUN python3 -c 'import json; pkg = json.load(open("/pysr/pysr/juliapkg.json", "r"))["packages"]["SymbolicRegression"]; print(pkg.get("version", pkg.get("rev", "")))' > /pysr/sr_version - -# Remove any = or ^ or ~ from the version: -RUN cat /pysr/sr_version | sed 's/[\^=~]//g' > /pysr/sr_version_processed - -# Now, we check out the version of SymbolicRegression.jl that PySR is using: -# If sr_version starts with 'v', use it as-is; otherwise prepend 'v' -RUN if grep -q '^v' /pysr/sr_version_processed; then \ - git clone -b "$(cat /pysr/sr_version_processed)" --single-branch https://github.com/MilesCranmer/SymbolicRegression.jl /srjl; \ +# If PySR pins a backend version/rev, use it; otherwise fall back to the default branch. +RUN set -eu; \ + sr_version="$(python3 -c 'import json; pkg = json.load(open("/pysr/pysr/juliapkg.json", "r"))["packages"]["SymbolicRegression"]; print(pkg.get("version") or pkg.get("rev") or "")')"; \ + sr_version="$(echo "$sr_version" | sed 's/[\\^=~]//g')"; \ + if [ -n "$sr_version" ]; then \ + if echo "$sr_version" | grep -q '^v'; then \ + git clone -b "$sr_version" --single-branch https://github.com/MilesCranmer/SymbolicRegression.jl /srjl; \ + else \ + git clone -b "v$sr_version" --single-branch https://github.com/MilesCranmer/SymbolicRegression.jl /srjl; \ + fi; \ else \ - git clone -b "v$(cat /pysr/sr_version_processed)" --single-branch https://github.com/MilesCranmer/SymbolicRegression.jl /srjl; \ + git clone --single-branch https://github.com/MilesCranmer/SymbolicRegression.jl /srjl; \ fi # Edit SymbolicRegression.jl to create a new function. diff --git a/pysr/test/test_slurm.py b/pysr/test/test_slurm.py index bda248c9..a0ec1610 100644 --- a/pysr/test/test_slurm.py +++ b/pysr/test/test_slurm.py @@ -8,9 +8,12 @@ import time import unittest from pathlib import Path +from typing import cast -def _run(cmd: list[str], *, cwd: Path | None = None, env: dict[str, str] | None = None): +def _run( + cmd: list[str], *, cwd: Path | None = None, env: dict[str, str] | None = None +) -> subprocess.CompletedProcess[str]: return subprocess.run( cmd, cwd=cwd, @@ -42,6 +45,11 @@ def setUp(self): self.addCleanup(self.data_dir_obj.cleanup) self.data_dir = Path(self.data_dir_obj.name).resolve() + self.docker_config_obj = tempfile.TemporaryDirectory( + prefix="pysr-slurm-docker-config-" + ) + self.addCleanup(self.docker_config_obj.cleanup) + self.julia_depot_dir_obj = None julia_depot_dir = os.environ.get("PYSR_SLURM_TEST_JULIA_DEPOT_DIR") if julia_depot_dir is None: @@ -56,6 +64,7 @@ def setUp(self): self.compose_env["COMPOSE_PROJECT_NAME"] = ( f"pysrslurm{os.getpid()}_{time.time_ns()}" ) + self.compose_env["DOCKER_CONFIG"] = self.docker_config_obj.name self.compose_env["PYSR_SLURM_TEST_DATA_DIR"] = str(self.data_dir) self.compose_env["PYSR_SLURM_TEST_JULIA_DEPOT_DIR"] = str( Path(julia_depot_dir).resolve() @@ -91,96 +100,12 @@ def setUp(self): self._wait_for_cluster_ready(timeout_s=300) - def _wait_for_cluster_ready(self, *, timeout_s: int): - start = time.time() - last_out = "" - while True: - ping = _run( - [ - "docker", - "compose", - "exec", - "-T", - "slurmctld", - "bash", - "-lc", - "scontrol ping", - ], - cwd=self.cluster_dir, - env=self.compose_env, - ) - last_out = ping.stdout - if ping.returncode == 0 and "Slurmctld" in ping.stdout: - sinfo = _run( - [ - "docker", - "compose", - "exec", - "-T", - "slurmctld", - "bash", - "-lc", - "sinfo -N -h -o '%N %T'", - ], - cwd=self.cluster_dir, - env=self.compose_env, - ) - if sinfo.returncode == 0: - states = {} - for line in sinfo.stdout.splitlines(): - parts = line.split() - if len(parts) >= 2: - states[parts[0]] = parts[1].lower() - if states.get("c1") == "idle" and states.get("c2") == "idle": - return - last_out = sinfo.stdout - if time.time() - start > timeout_s: - logs = _run( - ["docker", "compose", "logs", "--no-color", "c1", "c2"], - cwd=self.cluster_dir, - env=self.compose_env, - ).stdout - raise RuntimeError( - f"Slurm cluster did not become ready in {timeout_s}s:\n{last_out}\n\n{logs}" - ) - time.sleep(2) - - def test_pysr_slurm_cluster_manager(self): - job_script = self.data_dir / "pysr_slurm_job.sh" - job_script.write_text( - "\n".join( - [ - "#!/bin/bash", - "#SBATCH --job-name=pysr-slurm-test", - "#SBATCH --partition=normal", - "#SBATCH --nodes=2", - "#SBATCH --ntasks=4", - "#SBATCH --time=40:00", - "set -euo pipefail", - "python3 - <<'PY'", - "import numpy as np", - "from pysr import PySRRegressor", - "X = np.random.RandomState(0).randn(30, 2)", - "y = X[:, 0] + 1.0", - "model = PySRRegressor(", - " niterations=3,", - " populations=3,", - " progress=False,", - " temp_equation_file=True,", - " parallelism='multiprocessing',", - " procs=2,", - " cluster_manager='slurm',", - " verbosity=0,", - ")", - "model.fit(X, y)", - "print('PYSR_SLURM_OK')", - "PY", - ] - ) - + "\n" - ) - job_script.chmod(0o755) - + def _run_sbatch( + self, + job_script: Path, + *, + timeout_s: int = 2400, + ) -> str: submit = _run( [ "docker", @@ -197,8 +122,9 @@ def test_pysr_slurm_cluster_manager(self): ) self.assertEqual(submit.returncode, 0, msg=submit.stdout) match = re.search(r"Submitted batch job (\d+)", submit.stdout) - self.assertIsNotNone(match, msg=f"Could not parse job id:\n{submit.stdout}") - job_id = match.group(1) + if match is None: + self.fail(f"Could not parse job id:\n{submit.stdout}") + job_id = cast(str, match.group(1)) start = time.time() while True: @@ -240,7 +166,7 @@ def test_pysr_slurm_cluster_manager(self): self.fail( f"Slurm job {job_id} did not complete successfully:\n{state.stdout}\n\n{out}" ) - if time.time() - start > 2400: + if time.time() - start > timeout_s: out = _run( [ "docker", @@ -275,7 +201,253 @@ def test_pysr_slurm_cluster_manager(self): env=self.compose_env, ) self.assertEqual(output.returncode, 0, msg=output.stdout) - self.assertIn("PYSR_SLURM_OK", output.stdout) + return output.stdout + + def _assert_scontrol_step_usage( + self, + output: str, + *, + expected_tasks: int, + expected_nodes: int, + expected_nodelist: set[str] | None = None, + label: str, + ) -> None: + marker = "PYSR_SCONTROL_STEP_SAMPLE" + self.assertIn( + marker, output, msg=f"{label}: did not capture any scontrol samples." + ) + + saw_expected = False + for sample in output.split(marker)[1:]: + step_lines = [ + line.strip() + for line in sample.splitlines() + if line.strip().startswith("StepId=") + ] + steps = [dict(re.findall(r"(\w+)=([^\s]+)", line)) for line in step_lines] + running_steps: dict[str, dict[str, str]] = {} + for s in steps: + step_id = s.get("StepId", "") + if step_id.endswith(".batch") or s.get("State", "") != "RUNNING": + continue + running_steps[step_id] = s + + total_tasks = 0 + for step in running_steps.values(): + tasks = int(step.get("Tasks", "0")) + nodes = int(step.get("Nodes", "0")) + if tasks > expected_tasks: + self.fail( + f"{label}: too many tasks in Slurm step: expected <= {expected_tasks}, got {tasks}.\n\n{sample}" + ) + if tasks == expected_tasks and nodes != expected_nodes: + self.fail( + f"{label}: expected {expected_tasks} tasks across {expected_nodes} nodes, got Nodes={nodes}.\n\n{sample}" + ) + if tasks == expected_tasks and nodes == expected_nodes: + if expected_tasks == expected_nodes and expected_tasks > 0: + # With Tasks==Nodes, Slurm must be distributing exactly 1 task per node. + pass + if expected_nodelist is not None: + node_list = step.get("NodeList") + if not node_list: + self.fail( + f"{label}: expected NodeList to be present in scontrol step output.\n\n{sample}" + ) + if node_list not in expected_nodelist: + self.fail( + f"{label}: expected NodeList in {sorted(expected_nodelist)}, got {node_list!r}.\n\n{sample}" + ) + saw_expected = True + total_tasks += tasks + + if total_tasks > expected_tasks: + self.fail( + f"{label}: too many concurrent tasks across steps: expected <= {expected_tasks}, got {total_tasks}.\n\n{sample}" + ) + + tail = "\n".join(output.splitlines()[-200:]) + self.assertTrue( + saw_expected, + msg=( + f"{label}: never observed a RUNNING Slurm step with Tasks={expected_tasks} " + f"and Nodes={expected_nodes}.\n\nLast 200 lines:\n{tail}" + ), + ) + + def _wait_for_cluster_ready(self, *, timeout_s: int): + start = time.time() + last_out = "" + while True: + ping = _run( + [ + "docker", + "compose", + "exec", + "-T", + "slurmctld", + "bash", + "-lc", + "scontrol ping", + ], + cwd=self.cluster_dir, + env=self.compose_env, + ) + last_out = ping.stdout + if ping.returncode == 0 and "Slurmctld" in ping.stdout: + sinfo = _run( + [ + "docker", + "compose", + "exec", + "-T", + "slurmctld", + "bash", + "-lc", + "sinfo -N -h -o '%N %T'", + ], + cwd=self.cluster_dir, + env=self.compose_env, + ) + if sinfo.returncode == 0: + states = {} + for line in sinfo.stdout.splitlines(): + parts = line.split() + if len(parts) >= 2: + states[parts[0]] = parts[1].lower() + if states.get("c1") == "idle" and states.get("c2") == "idle": + return + last_out = sinfo.stdout + if time.time() - start > timeout_s: + logs = _run( + ["docker", "compose", "logs", "--no-color", "c1", "c2"], + cwd=self.cluster_dir, + env=self.compose_env, + ).stdout + raise RuntimeError( + f"Slurm cluster did not become ready in {timeout_s}s:\n{last_out}\n\n{logs}" + ) + time.sleep(2) + + def test_pysr_slurm_cluster_manager(self): + def _assert_worker_distribution( + output: str, + *, + expected_procs: int, + expected_nodes: int, + expected_per_node: int, + ) -> None: + worker_hosts = re.findall( + r"Worker \d+ ready on host ([^,]+), port \d+", + output, + ) + self.assertEqual( + len(worker_hosts), + expected_procs, + msg=f"Expected {expected_procs} workers.\n\n{output}", + ) + counts: dict[str, int] = {} + for host in worker_hosts: + counts[host] = counts.get(host, 0) + 1 + self.assertEqual( + len(counts), + expected_nodes, + msg=f"Expected workers on {expected_nodes} nodes.\n\n{output}", + ) + self.assertTrue( + all(v == expected_per_node for v in counts.values()), + msg=f"Expected exactly {expected_per_node} workers per node.\n\n{output}", + ) + + def _run_case(*, ntasks_per_node: int, procs: int, seed: int) -> str: + marker = f"PYSR_SLURM_OK:slurm:{procs}" + job = self.data_dir / f"pysr_slurm_job_{procs}.sh" + job.write_text( + "\n".join( + [ + "#!/bin/bash", + f"#SBATCH --job-name=pysr-slurm-test-{procs}", + "#SBATCH --partition=normal", + "#SBATCH --nodes=2", + f"#SBATCH --ntasks-per-node={ntasks_per_node}", + "#SBATCH --time=40:00", + "set -euo pipefail", + 'jobid="${SLURM_JOB_ID:-${SLURM_JOBID:-}}"', + 'if [ -z "$jobid" ]; then echo "Missing SLURM_JOB_ID/SLURM_JOBID" >&2; exit 1; fi', + "monitor_steps() {", + " while true; do", + " echo PYSR_SCONTROL_STEP_SAMPLE", + ' scontrol show step "$jobid" -o 2>/dev/null || true', + " sleep 1", + " done", + "}", + "monitor_steps &", + "MONITOR_PID=$!", + "trap 'kill $MONITOR_PID 2>/dev/null || true' EXIT", + "python3 - <<'PY'", + "import os", + "os.environ['JULIA_DEBUG'] = 'SlurmClusterManager'", + "import numpy as np", + "from pysr import PySRRegressor", + f"X = np.random.RandomState({seed}).randn(30, 2)", + "y = X[:, 0] + 1.0", + "model = PySRRegressor(", + " niterations=2,", + " populations=2,", + " progress=False,", + " temp_equation_file=True,", + " parallelism='multiprocessing',", + f" procs={procs},", + " cluster_manager='slurm',", + " verbosity=0,", + ")", + "model.fit(X, y)", + f"print('{marker}')", + "PY", + ] + ) + + "\n" + ) + job.chmod(0o755) + + output = self._run_sbatch(job) + self.assertIn(marker, output) + self.assertEqual( + len(re.findall(rf"^{re.escape(marker)}$", output, flags=re.MULTILINE)), + 1, + msg=f"Expected marker exactly once.\n\n{output}", + ) + self.assertEqual( + len( + re.findall(r"^\[ Info: Starting SLURM job .*", output, re.MULTILINE) + ), + 0, + msg=( + "Expected Slurm backend to use SlurmClusterManager (allocation-based), " + "not ClusterManagers.\n\n" + output + ), + ) + return output + + cases = [ + dict(ntasks_per_node=1, procs=2, seed=0), + dict(ntasks_per_node=3, procs=6, seed=2), + ] + for case in cases: + output = _run_case(**case) + self._assert_scontrol_step_usage( + output, + expected_tasks=case["procs"], + expected_nodes=2, + expected_nodelist={"c1,c2", "c[1-2]"}, + label=f"slurm({case['procs']})", + ) + _assert_worker_distribution( + output, + expected_procs=case["procs"], + expected_nodes=2, + expected_per_node=case["ntasks_per_node"], + ) def runtests(just_tests=False): diff --git a/pysr/test/test_startup.py b/pysr/test/test_startup.py index 4b2a450b..d58e19dc 100644 --- a/pysr/test/test_startup.py +++ b/pysr/test/test_startup.py @@ -151,6 +151,7 @@ def test_notebook(self): "-m", "pytest", "--nbval", + "--nbval-current-env", str(notebook_file), "--nbval-sanitize-with", str(sanitize_file),