fix: add shutdown methods to executors #925
base: main
Executor module:

```diff
@@ -1,3 +1,4 @@
+import atexit
 import inspect
 import multiprocessing as mp
 import warnings
@@ -372,4 +373,18 @@ def shutdown(self, wait: bool = True, *, cancel_futures: bool = False) -> None:
         wait
             Whether to wait for pending futures.
         """
+        # Free cached results from lithops ResponseFuture objects before shutdown.
+        # lithops.FunctionExecutor.futures is never cleared internally: each map()
+        # call extends it with new ResponseFutures that cache deserialized results
+        # in _call_output. Without this, memory accumulates across repeated calls.
+        for f in self.lithops_client.futures:
+            f._call_output = None
+        self.lithops_client.futures.clear()
+        self._futures.clear()
+
+        # Lithops registers self.clean as an atexit handler (executors.py __init__),
+        # which prevents the FunctionExecutor from ever being garbage collected.
+        # Unregister it so the executor can be freed after shutdown.
+        atexit.unregister(self.lithops_client.clean)
```
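For illustration, here is a hedged sketch of the accumulation described in the first comment block above. It assumes a locally configured lithops backend (for example the localhost executor); FunctionExecutor, map(), get_result(), and the futures attribute are lithops' public API, while _call_output is the private per-future cache this PR clears.

```python
# Sketch of the leak this change addresses, assuming a locally configured
# lithops backend (e.g. backend="localhost"). FunctionExecutor, map(),
# get_result(), and .futures are public lithops API; _call_output is the
# private per-future result cache named in the comment above.
import lithops

fexec = lithops.FunctionExecutor()

for _ in range(3):
    futures = fexec.map(lambda x: x * 2, [1, 2, 3])
    fexec.get_result(fs=futures)

# .futures was extended by every map() call and never pruned, so all of the
# ResponseFutures (and their cached, deserialized results) are still alive:
print(len(fexec.futures))  # expected: 9, one future per mapped element

# The cleanup pattern added in shutdown():
for f in fexec.futures:
    f._call_output = None
fexec.futures.clear()
```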
Review thread on the atexit.unregister workaround:

Member: This is absolutely wild and deserves raising upstream

Collaborator: Probably so.

Member: see #926
```diff
+
         self.lithops_client.__exit__(None, None, None)
```
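The unregister call matters because atexit keeps a strong reference to every registered callable, and a bound method in turn keeps its instance alive. A minimal, self-contained sketch of that pinning behavior (no lithops involved):

```python
import atexit
import gc
import weakref


class Client:
    def __init__(self):
        # atexit's callback table now holds a strong reference to this
        # bound method, and the bound method holds a reference to self,
        # so the instance can never become garbage.
        atexit.register(self.clean)

    def clean(self):
        pass


client = Client()
ref = weakref.ref(client)

del client
gc.collect()
assert ref() is not None  # still alive: pinned by the atexit registration

# atexit.unregister compares callbacks by equality, so an equal bound
# method removes the registration and drops the pinning reference.
atexit.unregister(ref().clean)
gc.collect()
assert ref() is None  # now collectable
```

This is the same shape as lithops registering self.clean in FunctionExecutor.__init__, and it is why the PR unregisters lithops_client.clean during shutdown.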
Executor tests:
```diff
@@ -1,9 +1,16 @@
+import gc
 import multiprocessing as mp
+import weakref

 import pytest

-from virtualizarr.parallel import LithopsEagerFunctionExecutor, get_executor
-from virtualizarr.tests import requires_lithops
+from virtualizarr.parallel import (
+    DaskDelayedExecutor,
+    LithopsEagerFunctionExecutor,
+    SerialExecutor,
+    get_executor,
+)
+from virtualizarr.tests import requires_dask, requires_lithops


 @requires_lithops
@@ -41,3 +48,99 @@ def test_get_executor_process_pool_mode():

     assert ctx is not None, "Expected executor to have a multiprocessing context"
     assert ctx.get_start_method() == "forkserver"
+
+
+@requires_lithops
+class TestLithopsExecutorShutdown:
+    def test_shutdown_clears_lithops_client_futures(self):
+        executor = LithopsEagerFunctionExecutor()
+        executor.submit(lambda: 42)
+
+        executor.shutdown()
+        assert len(executor.lithops_client.futures) == 0
+
+    def test_shutdown_clears_lithops_cached_results(self):
+        """Verify that shutdown clears _call_output on lithops ResponseFutures."""
+        with LithopsEagerFunctionExecutor() as executor:
+            executor.map(lambda x: x * 2, (1, 2, 3))
+            lithops_futures = list(executor.lithops_client.futures)
+            assert len(lithops_futures) > 0
+
+        # After shutdown, lithops futures list should be cleared
+        assert len(executor.lithops_client.futures) == 0
+
+
+def _make_executor(executor_cls):
+    """Create a pytest param for an executor class with appropriate marks."""
+    marks = {
+        "DaskDelayedExecutor": [requires_dask],
+        "LithopsEagerFunctionExecutor": [requires_lithops],
+    }
+    return pytest.param(
+        executor_cls,
+        id=executor_cls.__name__,
+        marks=marks.get(executor_cls.__name__, []),
+    )
+
+
+ALL_EXECUTORS = [
+    _make_executor(SerialExecutor),
+    _make_executor(DaskDelayedExecutor),
+    _make_executor(LithopsEagerFunctionExecutor),
+]
+
+
+@pytest.mark.parametrize("executor_cls", ALL_EXECUTORS)
+class TestExecutorMemory:
```
Member: I'm not sure if either of these tests will be reliable enough - curious of @chuckwondo's thoughts.
```diff
+    def test_executor_does_not_leak_after_context_manager(self, executor_cls):
+        """Executor and its futures should be GC-collectable after the with block."""
+        with executor_cls() as executor:
+            # Use map() since lithops call_async requires a data argument
+            list(executor.map(lambda x: x * 2, range(5)))
+            ref = weakref.ref(executor)
+
+        # Drop the only local reference to the executor
+        del executor
+        gc.collect()
+
+        assert ref() is None, (
+            f"{executor_cls.__name__} was not garbage collected after shutdown"
+        )
+
+    def test_repeated_executor_use_does_not_grow_memory(self, executor_cls):
+        """Memory should not grow when creating and destroying executors repeatedly."""
+        import tracemalloc
+
+        def _run_once():
+            with executor_cls() as executor:
+                # Use map() to produce non-trivial results
+                return list(executor.map(lambda x: list(range(10_000)), range(5)))
+
+        # Warm up (first run may allocate caches, import modules, etc.)
+        _run_once()
+        gc.collect()
+
+        # Measure baseline: peak memory from a single run
+        tracemalloc.start()
+        _run_once()
+        gc.collect()
+        _, baseline_peak = tracemalloc.get_traced_memory()
+        tracemalloc.stop()
+
+        # Now run many iterations and check peak doesn't grow
+        tracemalloc.start()
+        n_iterations = 10
+        for _ in range(n_iterations):
+            _run_once()
+            gc.collect()
+        _, multi_peak = tracemalloc.get_traced_memory()
+        tracemalloc.stop()
+
+        # If memory leaks, peak will scale with n_iterations.
+        # Allow 1.2x the single-run peak to account for GC timing jitter.
+        assert multi_peak < 1.2 * baseline_peak, (
+            f"{executor_cls.__name__} leaked memory: single run peak "
+            f"{baseline_peak / 1024:.0f} KB, {n_iterations} runs peak "
+            f"{multi_peak / 1024:.0f} KB"
+        )
```
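On the reliability concern raised above: one way to make a tracemalloc-based leak check less sensitive to one-off overhead is to compare peaks at two different iteration counts, so only growth that scales with iterations trips the assertion. This is a sketch, not part of this PR, and the helper names are hypothetical:

```python
# Sketch (not part of this PR) of a less flaky leak check: a true leak of
# k bytes per iteration makes peak memory scale with the iteration count,
# while one-off overhead (imports, allocator caches) does not. The helper
# names here are hypothetical.
import gc
import tracemalloc


def peak_after(n_iterations, run_once):
    """Peak traced allocation while running run_once n_iterations times."""
    tracemalloc.start()
    for _ in range(n_iterations):
        run_once()
        gc.collect()
    _, peak = tracemalloc.get_traced_memory()
    tracemalloc.stop()
    return peak


def assert_no_linear_growth(run_once, n=5):
    small_peak = peak_after(n, run_once)
    large_peak = peak_after(2 * n, run_once)
    # Leak-free code should show comparable peaks; a per-iteration leak
    # would roughly double the peak between n and 2n iterations.
    assert large_peak < 1.5 * small_peak
```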