Skip to content
Open
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
d97bd65
fix: add shutdown methods to executors and fix lithops memory leak
jbusecke Mar 12, 2026
4f4c061
Add tests and constrain fix only to lithops
jbusecke Mar 12, 2026
d925fb7
Clean up claudes horrible tests
jbusecke Mar 12, 2026
46908c1
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Mar 12, 2026
4732cc3
Alternative approach via lithops config
jbusecke Mar 12, 2026
031ffac
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Mar 12, 2026
6414546
toms renaming suggestion
jbusecke Mar 12, 2026
f013d74
Merge branch 'executor-cleaning' of https://github.com/zarr-developer…
jbusecke Mar 12, 2026
216fb09
Merge branch 'main' into executor-cleaning
TomNicholas Mar 12, 2026
6a83bf3
Merge branch 'main' into executor-cleaning
TomNicholas Mar 16, 2026
3f9aa44
Update lithops dependency version in pyproject.toml
jbusecke Mar 16, 2026
af2c83e
Revert lithops dependency version constraint
jbusecke Mar 16, 2026
4481f63
Merge branch 'main' into executor-cleaning
TomNicholas Mar 16, 2026
a612af5
Merge branch 'main' into executor-cleaning
TomNicholas Mar 16, 2026
3ff3bf1
Merge branch 'main' into executor-cleaning
jbusecke Mar 19, 2026
0cb839d
Mark Lithops executor tests as flaky
jbusecke Mar 19, 2026
05316e6
rerun flaky tests
jbusecke Mar 19, 2026
23d2654
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Mar 19, 2026
748f50d
Fix LithopsEagerFunctionExecutor.shutdown() not clearing futures
jbusecke Mar 19, 2026
263656e
add comment
jbusecke Mar 19, 2026
b9ba3ca
Test against lithops fork with job_manager thread join fix
jbusecke Mar 19, 2026
63481f8
Add sleep between memory test iterations to allow background threads …
jbusecke Mar 19, 2026
68a60e9
Move sleep to after the loop to allow background threads to exit befo…
jbusecke Mar 19, 2026
ba61353
Increase sleep to 30s to give background threads more time to exit
jbusecke Mar 19, 2026
71a43b4
Revert lithops dep back to released version
jbusecke Mar 19, 2026
394c019
remove the memory growth test
jbusecke Mar 19, 2026
d1822a3
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Mar 19, 2026
b95baa1
Merge branch 'main' into executor-cleaning
jbusecke Mar 19, 2026
d8c0ab1
Add .shutdown() method to custom executors
jbusecke Mar 19, 2026
887edac
satisfy linter
jbusecke Mar 19, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions virtualizarr/parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ def map(
"""
return map(fn, *iterables)

def shutdown(self, wait: bool = True, *, cancel_futures: bool = False) -> None:
self._futures.clear()


class DaskDelayedExecutor(Executor):
"""
Expand Down Expand Up @@ -230,6 +233,9 @@ def map(
# Compute all tasks
return iter(dask.compute(*delayed_tasks))

def shutdown(self, wait: bool = True, *, cancel_futures: bool = False) -> None:
self._futures.clear()
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we just get rid of self._futures? It's not clear why we even hold the list of futures to begin with?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Um that is actually a good question @chuckwondo. @TomNicholas do you know why this was necessary in the case of dask?

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this the case for any of the executors?



class LithopsEagerFunctionExecutor(Executor):
"""
Expand Down Expand Up @@ -270,6 +276,23 @@ def __call__(self, *args: P.args, **kwargs: P.kwargs) -> T:
def __init__(self, **kwargs) -> None:
import lithops # type: ignore[import-untyped]

# Fix for unbounded memory growth on repeated `open_virtual_mfdataset` calls
# see https://github.com/zarr-developers/VirtualiZarr/issues/926

# Users are encouraged to provide configs for lithops via file
# But just in case that someone imports this and configures it, they have to provide all
# details below explicitly as `config=` argument.
if "config" not in kwargs:
_config_file = lithops.config.load_config()
if _config_file["lithops"].get("backend") == "localhost":
# We currently only want to apply this fix for the localhost executor
kwargs["config"] = {
"lithops": {
"data_cleaner": False, # prevents atexit registration of `.lithops_client.clean` method
"backend": "localhost", # if this is not provided lithops will default to aws lambda
}
}

# Create Lithops client with optional configuration
self.lithops_client = lithops.FunctionExecutor(**kwargs).__enter__()

Expand Down Expand Up @@ -372,4 +395,16 @@ def shutdown(self, wait: bool = True, *, cancel_futures: bool = False) -> None:
wait
Whether to wait for pending futures.
"""
if wait:
# ensure all futures are completed before exiting
self.lithops_client.wait(show_progressbar=False)

self._futures.clear()

# Free output memory and clear lithops internal futures list
for f in self.lithops_client.futures:
f._call_output = None
self.lithops_client.futures.clear()

# Exit context manager entered during __init__
self.lithops_client.__exit__(None, None, None)
61 changes: 59 additions & 2 deletions virtualizarr/tests/test_parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,13 @@

import pytest

from virtualizarr.parallel import LithopsEagerFunctionExecutor, get_executor
from virtualizarr.tests import requires_lithops
from virtualizarr.parallel import (
DaskDelayedExecutor,
LithopsEagerFunctionExecutor,
SerialExecutor,
get_executor,
)
from virtualizarr.tests import requires_dask, requires_lithops


@pytest.mark.flaky
Expand Down Expand Up @@ -43,3 +48,55 @@ def test_get_executor_process_pool_mode():

assert ctx is not None, "Expected executor to have a multiprocessing context"
assert ctx.get_start_method() == "forkserver"


def _make_executor(executor_cls):
"""Create a pytest param for an executor class with appropriate marks."""
marks = {
"DaskDelayedExecutor": [requires_dask],
"LithopsEagerFunctionExecutor": [requires_lithops],
}
return pytest.param(
executor_cls,
id=executor_cls.__name__,
marks=marks.get(executor_cls.__name__, []),
)


ALL_CUSTOM_EXECUTORS = [
_make_executor(SerialExecutor),
_make_executor(DaskDelayedExecutor),
_make_executor(LithopsEagerFunctionExecutor),
]


@pytest.mark.parametrize("executor_cls", ALL_CUSTOM_EXECUTORS)
class TestExecutorShutdown:
def test_shutdown_clears_futures(self, executor_cls):
"""Internal _futures list should be empty after shutdown."""
with executor_cls() as executor:
executor.submit(lambda x: x * 2, 1)
executor.submit(lambda x: x + 1, 2)
assert len(executor._futures) == 2
if executor_cls is LithopsEagerFunctionExecutor:
# grab refs before they get cleared
lithops_futures = list(executor.lithops_client.futures)
assert len(lithops_futures) == 2

assert len(executor._futures) == 0

# Lithops-specific: verify lithops internal futures are also cleared
if executor_cls is LithopsEagerFunctionExecutor:
assert len(executor.lithops_client.futures) == 0
assert all(f._call_output is None for f in lithops_futures)

# Testing idempotency
executor.shutdown()
assert len(executor._futures) == 0


@requires_lithops
def test_lithops_executor_data_cleaner_disabled():
"""data_cleaner must be False to prevent atexit registration of lithops' clean method."""
with LithopsEagerFunctionExecutor() as executor:
assert executor.lithops_client.data_cleaner is False
Loading