Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/src/.vitepress/config.mts
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ export default defineConfig({
text: 'Advanced',
items: [
{ text: 'Tuning', link: '/tuning' },
{ text: 'Slurm', link: '/slurm' },
{ text: 'Backend', link: '/backend' },
]
}
Expand Down
51 changes: 51 additions & 0 deletions docs/src/slurm.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# Slurm (Multi-node)

PySR supports running across multiple nodes on Slurm via `cluster_manager="slurm"`.
This backend is **allocation-based**: you request resources with Slurm (`sbatch`/`salloc`), then PySR launches Julia workers inside that allocation (using `SlurmClusterManager.jl`).

Here is a minimal `sbatch` example using 3 workers on each of 2 nodes (6 workers total).

Save this as `pysr_job.sh`:

```bash
#!/bin/bash
#SBATCH --job-name=pysr
#SBATCH --partition=normal
#SBATCH --nodes=2
#SBATCH --ntasks-per-node=3
#SBATCH --time=01:00:00

set -euo pipefail
python pysr_script.py
```

Save this as `pysr_script.py` in the same directory:

```python
import numpy as np
from pysr import PySRRegressor

X = np.random.RandomState(0).randn(1000, 2)
y = X[:, 0] + 2 * X[:, 1]

model = PySRRegressor(
niterations=200,
populations=2,
parallelism="multiprocessing",
cluster_manager="slurm",
procs=6, # must match the Slurm allocation's total task count
)
model.fit(X, y)
print(model)
```

Submit it with:

```bash
sbatch pysr_job.sh
```

## Notes

- `procs` is the number of Julia worker processes. It must match the Slurm allocation's total tasks (e.g., `--ntasks` or `--nodes * --ntasks-per-node`).
- Run the Python script once (as the master) inside the allocation; do not wrap it in `srun`.
2 changes: 1 addition & 1 deletion docs/src/tuning.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ First, my general tips would be to avoid using redundant operators, like how `po

When running PySR, I usually do the following:

I run from IPython (Jupyter Notebooks don't work as well[^1]) on the head node of a slurm cluster. Passing `cluster_manager="slurm"` will make PySR set up a run over the entire allocation. I set `procs` equal to the total number of cores over my entire allocation.
I run from IPython (Jupyter Notebooks don't work as well[^1]) on the head node of a Slurm cluster. Passing `cluster_manager="slurm"` will make PySR set up a run over the entire allocation. I set `procs` equal to the total number of tasks across my entire allocation (see the [Slurm page](slurm.md) for a complete multi-node example).

I use the [tensorboard feature](https://ai.damtp.cam.ac.uk/pysr/examples/#12-using-tensorboard-for-logging) for experiment tracking.

Expand Down
31 changes: 20 additions & 11 deletions pysr/julia_extensions.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,20 @@

from typing import Literal

from .julia_helpers import KNOWN_CLUSTERMANAGER_BACKENDS
from .julia_import import Pkg, jl
from .julia_registry_helpers import try_with_registry_fallback
from .logger_specs import AbstractLoggerSpec, TensorBoardLoggerSpec

# Registry UUIDs of the optional Julia packages PySR can load on demand.
# The UUID pins the exact package identity so installation checks and
# `Pkg.add` calls are unambiguous even if a name collision exists in a registry.
PACKAGE_UUIDS = {
    "LoopVectorization": "bdcacae8-1622-11e9-2a5c-532679323890",
    "Bumper": "8ce10254-0962-460f-a3d8-1f77fea1446e",
    "Zygote": "e88e6eb3-aa80-5325-afca-941959d7151f",
    "SlurmClusterManager": "c82cd089-7bf7-41d7-976b-6b5d413cbe0a",
    "ClusterManagers": "34f1f09b-3a8b-5176-ab39-66d58a4d544e",
    "TensorBoardLogger": "899adc3e-224a-11e9-021f-63837185c80f",
}


def load_required_packages(
*,
Expand All @@ -18,7 +28,7 @@ def load_required_packages(
logger_spec: AbstractLoggerSpec | None = None,
):
if turbo:
load_package("LoopVectorization", "bdcacae8-1622-11e9-2a5c-532679323890")
load_package("LoopVectorization")
if bumper:
load_package("Bumper", "8ce10254-0962-460f-a3d8-1f77fea1446e")
if autodiff_backend == "Zygote":
Expand All @@ -28,20 +38,18 @@ def load_required_packages(
elif autodiff_backend == "Enzyme":
load_package("Enzyme", "7da242da-08ed-463a-9acd-ee780be4f1d9")
if cluster_manager is not None:
load_package("ClusterManagers", "34f1f09b-3a8b-5176-ab39-66d58a4d544e")
if cluster_manager == "slurm":
load_package("SlurmClusterManager")
elif cluster_manager in KNOWN_CLUSTERMANAGER_BACKENDS:
load_package("ClusterManagers")
if isinstance(logger_spec, TensorBoardLoggerSpec):
load_package("TensorBoardLogger", "899adc3e-224a-11e9-021f-63837185c80f")
load_package("TensorBoardLogger")


def load_all_packages():
"""Install and load all Julia extensions available to PySR."""
load_required_packages(
turbo=True,
bumper=True,
autodiff_backend="Zygote",
cluster_manager="slurm",
logger_spec=TensorBoardLoggerSpec(log_dir="logs"),
)
for package_name, uuid_s in PACKAGE_UUIDS.items():
load_package(package_name, uuid_s)


# TODO: Refactor this file so we can install all packages at once using `juliapkg`,
Expand All @@ -52,7 +60,8 @@ def isinstalled(uuid_s: str):
return jl.haskey(Pkg.dependencies(), jl.Base.UUID(uuid_s))


def load_package(package_name: str, uuid_s: str) -> None:
def load_package(package_name: str, uuid_s: str | None = None) -> None:
uuid_s = uuid_s or PACKAGE_UUIDS[package_name]
if not isinstalled(uuid_s):

def _add_package():
Expand Down
43 changes: 40 additions & 3 deletions pysr/julia_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,46 @@ def _escape_filename(filename):
return str_repr


def _load_cluster_manager(cluster_manager: str):
jl.seval(f"using ClusterManagers: addprocs_{cluster_manager}")
return jl.seval(f"addprocs_{cluster_manager}")
# Backends with first-class support. "slurm" is special-cased below to use
# SlurmClusterManager.jl; the rest map to `ClusterManagers.addprocs_<name>`.
KNOWN_CLUSTERMANAGER_BACKENDS = ["slurm", "pbs", "lsf", "sge", "qrsh", "scyld", "htc"]


def load_cluster_manager(cluster_manager: str) -> AnyValue:
    """Resolve `cluster_manager` to a Julia `addprocs`-style function.

    Parameters
    ----------
    cluster_manager : str
        Either one of `KNOWN_CLUSTERMANAGER_BACKENDS`, or an arbitrary Julia
        expression that evaluates to an `addprocs`-like function.

    Returns
    -------
    AnyValue
        A Julia function that, when called with a process count, launches
        workers and returns their process ids.
    """
    if cluster_manager == "slurm":
        # Slurm uses the allocation-based SlurmClusterManager.jl: workers are
        # launched inside an existing Slurm allocation rather than via a new
        # job submission.
        jl.seval("using SlurmClusterManager: SlurmManager")
        jl.seval("using Distributed")
        jl.seval(
            """
            function addprocs_slurm(numprocs::Integer; exeflags=``, lazy=false, kws...)
                procs = Distributed.addprocs(SlurmManager(); exeflags=exeflags, lazy=lazy, kws...)
                # SymbolicRegression may serialize the addprocs function to workers. Defining a
                # stub on the new workers avoids deserialization failures if it gets captured.
                Distributed.@everywhere procs begin
                    function addprocs_slurm(
                        numprocs::Integer;
                        exeflags=``,
                        lazy=false,
                        kws...,
                    )
                        error("addprocs_slurm should only be called on the master process.")
                    end
                end
                if length(procs) != numprocs
                    error(
                        "Requested $numprocs processes, but Slurm allocation has $(length(procs)) tasks. " *
                        "Set Slurm `--ntasks`/`--ntasks-per-node` to match, and set `procs` accordingly."
                    )
                end
                return procs
            end
            """
        )
        return jl.addprocs_slurm
    elif cluster_manager in KNOWN_CLUSTERMANAGER_BACKENDS:
        # Non-Slurm queue systems go through ClusterManagers.jl, which exposes
        # one `addprocs_<backend>` function per supported scheduler.
        jl.seval(f"using ClusterManagers: addprocs_{cluster_manager}")
        return jl.seval(f"addprocs_{cluster_manager}")
    else:
        # Assume it's a function
        return jl.seval(cluster_manager)


def jl_array(x, dtype=None):
Expand Down
23 changes: 10 additions & 13 deletions pysr/sr.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,12 @@
from .julia_extensions import load_required_packages
from .julia_helpers import (
_escape_filename,
_load_cluster_manager,
jl_array,
jl_deserialize,
jl_is_function,
jl_named_tuple,
jl_serialize,
load_cluster_manager,
)
from .julia_import import AnyValue, SymbolicRegression, VectorValue, jl
from .logger_specs import AbstractLoggerSpec
Expand Down Expand Up @@ -605,8 +605,8 @@ class PySRRegressor(MultiOutputMixin, RegressorMixin, BaseEstimator):
Default is `None`.
cluster_manager : str
For distributed computing, this sets the job queue system. Set
to one of "slurm", "pbs", "lsf", "sge", "qrsh", "scyld", or
"htc". If set to one of these, PySR will run in distributed
to one of "slurm", "pbs", "lsf", "sge", "qrsh", "scyld", or "htc".
If set to one of these, PySR will run in distributed
mode, and use `procs` to figure out how many processes to launch.
Default is `None`.
heap_size_hint_in_bytes : int
Expand Down Expand Up @@ -926,13 +926,11 @@ def __init__(
probability_negate_constant: float = 0.00743,
tournament_selection_n: int = 15,
tournament_selection_p: float = 0.982,
parallelism: (
Literal["serial", "multithreading", "multiprocessing"] | None
) = None,
# fmt: off
parallelism: Literal["serial", "multithreading", "multiprocessing"] | None = None,
procs: int | None = None,
cluster_manager: (
Literal["slurm", "pbs", "lsf", "sge", "qrsh", "scyld", "htc"] | None
) = None,
cluster_manager: Literal["slurm", "pbs", "lsf", "sge", "qrsh", "scyld", "htc"] | str | None = None,
# fmt: on
heap_size_hint_in_bytes: int | None = None,
worker_timeout: float | None = None,
worker_imports: list[str] | None = None,
Expand Down Expand Up @@ -2066,13 +2064,12 @@ def _run(
if cluster_manager is not None:
active_project = jl.seval("Base.active_project()")
if isinstance(active_project, str) and len(active_project) > 0:
# `ClusterManagers.addprocs_slurm` launches new Julia workers via `srun`.
# The project (environment) is propagated via `JULIA_PROJECT` rather
# than a `--project=...` flag, so ensure it is set.
# Some distributed worker launchers propagate the project (environment)
# via `JULIA_PROJECT` rather than a `--project=...` flag.
os.environ.setdefault(
"JULIA_PROJECT", str(Path(active_project).resolve().parent)
)
cluster_manager = _load_cluster_manager(cluster_manager)
cluster_manager = load_cluster_manager(cluster_manager)

if self.autodiff_backend is not None:
autodiff_backend = jl.Symbol(self.autodiff_backend)
Expand Down
4 changes: 2 additions & 2 deletions pysr/test/slurm_docker_cluster/config/slurm.conf
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,6 @@ SlurmctldLogFile=/var/log/slurm/slurmctld.log
SlurmdDebug=info
SlurmdLogFile=/var/log/slurm/slurmd.log

NodeName=c1 CPUs=2 RealMemory=1000 State=UNKNOWN
NodeName=c2 CPUs=2 RealMemory=1000 State=UNKNOWN
NodeName=c1 CPUs=4 RealMemory=1000 State=UNKNOWN
NodeName=c2 CPUs=4 RealMemory=1000 State=UNKNOWN
PartitionName=normal Nodes=c1,c2 Default=YES MaxTime=INFINITE State=UP
4 changes: 2 additions & 2 deletions pysr/test/slurm_docker_cluster/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ services:
command: ["slurmd"]
hostname: c1
working_dir: /data
cpus: 2
cpus: 4
privileged: true
volumes:
- etc_munge:/etc/munge
Expand All @@ -32,7 +32,7 @@ services:
command: ["slurmd"]
hostname: c2
working_dir: /data
cpus: 2
cpus: 4
privileged: true
volumes:
- etc_munge:/etc/munge
Expand Down
75 changes: 40 additions & 35 deletions pysr/test/test_dev.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import os
import subprocess
import tempfile
import unittest
from pathlib import Path

Expand All @@ -11,41 +12,45 @@ def test_simple_change_to_backend(self):
PYSR_TEST_PYTHON_VERSION = os.environ.get("PYSR_TEST_PYTHON_VERSION", "3.12")
repo_root = Path(__file__).parent.parent.parent

build_result = subprocess.run(
[
"docker",
"buildx",
"bake",
"-f",
"docker-bake.hcl",
"pysr-dev",
"--set",
f"pysr-dev.args.JLVERSION={PYSR_TEST_JULIA_VERSION}",
"--set",
f"pysr-dev.args.PYVERSION={PYSR_TEST_PYTHON_VERSION}",
],
env=os.environ,
cwd=repo_root,
universal_newlines=True,
)
self.assertEqual(build_result.returncode, 0)
test_result = subprocess.run(
[
"docker",
"run",
"--rm",
"pysr-dev",
"python3",
"-c",
"from pysr import SymbolicRegression as SR; print(SR.__test_function())",
],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=os.environ,
cwd=repo_root,
)
self.assertEqual(test_result.returncode, 0)
self.assertEqual(test_result.stdout.decode("utf-8").strip(), "2.3")
with tempfile.TemporaryDirectory(prefix="pysr-docker-config-") as docker_config:
env = dict(os.environ)
env["DOCKER_CONFIG"] = docker_config

build_result = subprocess.run(
[
"docker",
"buildx",
"bake",
"-f",
"docker-bake.hcl",
"pysr-dev",
"--set",
f"pysr-dev.args.JLVERSION={PYSR_TEST_JULIA_VERSION}",
"--set",
f"pysr-dev.args.PYVERSION={PYSR_TEST_PYTHON_VERSION}",
],
env=env,
cwd=repo_root,
universal_newlines=True,
)
self.assertEqual(build_result.returncode, 0)
test_result = subprocess.run(
[
"docker",
"run",
"--rm",
"pysr-dev",
"python3",
"-c",
"from pysr import SymbolicRegression as SR; print(SR.__test_function())",
],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=env,
cwd=repo_root,
)
self.assertEqual(test_result.returncode, 0)
self.assertEqual(test_result.stdout.decode("utf-8").strip(), "2.3")


def runtests(just_tests=False):
Expand Down
22 changes: 11 additions & 11 deletions pysr/test/test_dev_pysr.dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,18 @@ ADD ./pysr/_cli/*.py /pysr/pysr/_cli/
RUN mkdir /pysr/pysr/test

# Now, we create a custom version of SymbolicRegression.jl
# First, we get the version or rev from juliapkg.json:
RUN python3 -c 'import json; pkg = json.load(open("/pysr/pysr/juliapkg.json", "r"))["packages"]["SymbolicRegression"]; print(pkg.get("version", pkg.get("rev", "")))' > /pysr/sr_version

# Remove any = or ^ or ~ from the version:
RUN cat /pysr/sr_version | sed 's/[\^=~]//g' > /pysr/sr_version_processed

# Now, we check out the version of SymbolicRegression.jl that PySR is using:
# If sr_version starts with 'v', use it as-is; otherwise prepend 'v'
RUN if grep -q '^v' /pysr/sr_version_processed; then \
git clone -b "$(cat /pysr/sr_version_processed)" --single-branch https://github.com/MilesCranmer/SymbolicRegression.jl /srjl; \
# If PySR pins a backend version/rev, use it; otherwise fall back to the default branch.
RUN set -eu; \
sr_version="$(python3 -c 'import json; pkg = json.load(open("/pysr/pysr/juliapkg.json", "r"))["packages"]["SymbolicRegression"]; print(pkg.get("version") or pkg.get("rev") or "")')"; \
sr_version="$(echo "$sr_version" | sed 's/[\\^=~]//g')"; \
if [ -n "$sr_version" ]; then \
if echo "$sr_version" | grep -q '^v'; then \
git clone -b "$sr_version" --single-branch https://github.com/MilesCranmer/SymbolicRegression.jl /srjl; \
else \
git clone -b "v$sr_version" --single-branch https://github.com/MilesCranmer/SymbolicRegression.jl /srjl; \
fi; \
else \
git clone -b "v$(cat /pysr/sr_version_processed)" --single-branch https://github.com/MilesCranmer/SymbolicRegression.jl /srjl; \
git clone --single-branch https://github.com/MilesCranmer/SymbolicRegression.jl /srjl; \
fi

# Edit SymbolicRegression.jl to create a new function.
Expand Down
Loading
Loading