
Add rechunk_by_size functionality#284

Merged
sjperkins merged 14 commits into master from rechunk_by_size on Sep 19, 2023

Conversation

@JSKenyon
Collaborator

@JSKenyon JSKenyon commented Aug 17, 2023

  • Tests added / passed

    $ py.test -v -s daskms/tests

    If the pep8 tests fail, the quickest way to correct
    this is to run autopep8 and then flake8 and
    pycodestyle to fix the remaining issues.

    $ pip install -U autopep8 flake8 pycodestyle
    $ autopep8 -r -i daskms
    $ flake8 daskms
    $ pycodestyle daskms
    
  • Fully documented, including HISTORY.rst for all changes
    and one of the docs/*-api.rst files for new API

    To build the docs locally:

    pip install -r requirements.readthedocs.txt
    cd docs
    READTHEDOCS=True make html
    

This PR aims to address #283 by adding a utility function which can be invoked to rechunk an xarray.Dataset such that all of its dask arrays have chunk sizes less than or equal to some specified number of bytes. This is handy when we want to read from one format but write to another which may have different limitations. The specific motivating case is reading from a Measurement Set and writing to zarr, an operation which may fail when chunks exceed 2GB.

I believe this first effort works, but some additional functionality may be required, including:

  • Subchunk only: Given a large chunk, ensure that the new chunks do not cross existing chunk boundaries. This is complicated by zarr's expectation of a uniform chunk size in each dimension, with the exception of the last chunk. The v3 spec may make this easier (and it may already be possible in v2; see the POC implementation of ZEP003 in zarr-developers/zarr-python#1483).
  • Only if required: We may only want to rechunk when necessary to ensure chunks do not exceed a certain size, but retain existing chunking otherwise.
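The core idea can be sketched as follows. This is a minimal illustration of choosing a chunk shape under a byte budget, not the PR's actual implementation; the function name `chunks_for_size` and the halve-the-largest-dimension strategy are assumptions made for the sake of the example.

```python
import math


def chunks_for_size(shape, itemsize, max_bytes):
    """Return a chunk shape whose size in bytes does not exceed
    max_bytes, found by repeatedly halving the largest dimension."""
    chunk = list(shape)
    while math.prod(chunk) * itemsize > max_bytes:
        i = chunk.index(max(chunk))  # shrink the largest dimension first
        if chunk[i] == 1:
            raise ValueError("max_bytes is smaller than a single element")
        chunk[i] = math.ceil(chunk[i] / 2)
    return tuple(chunk)


# A (row, chan, corr) visibility array of 8-byte elements,
# constrained to chunks of at most 1 MiB.
chunk = chunks_for_size((10000, 64, 4), itemsize=8, max_bytes=2**20)
print(chunk)  # every chunk now fits within the 1 MiB budget
```

With `chunk` in hand, something like `ds.chunk(dict(zip(ds.dims, chunk)))` would apply it to an xarray.Dataset, and the rechunk could be skipped entirely when every existing chunk is already under the limit (the "only if required" case above).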

@JSKenyon JSKenyon marked this pull request as draft August 17, 2023 10:57
@JSKenyon
Collaborator Author

JSKenyon commented Aug 22, 2023

The currently failing test appears to be minio-related. Traceback as follows:

_____________________ ERROR at setup of test_minio_server ______________________

minio_client = PosixPath('/home/runner/.local/bin/mc')
minio_alias = 'testcloud', minio_user_key = 'abcdef1234567890'

    @pytest.fixture
    def minio_admin(minio_client, minio_alias, minio_user_key):
        minio = pytest.importorskip("minio")
        minio_admin = minio.MinioAdmin(minio_alias, binary_path=str(minio_client))
        # Add a user and give it readwrite access
        minio_admin.user_add(minio_user_key, minio_user_key)
>       minio_admin.policy_set("readwrite", user=minio_user_key)

daskms/conftest.py:343: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
../../../.cache/pypoetry/virtualenvs/dask-ms-v9AZFZiS-py3.9/lib/python3.9/site-packages/minio/minioadmin.py:148: in policy_set
    return self._run(
../../../.cache/pypoetry/virtualenvs/dask-ms-v9AZFZiS-py3.9/lib/python3.9/site-packages/minio/minioadmin.py:47: in _run
    proc = subprocess.run(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
input = None, capture_output = True, timeout = None, check = True
popenargs = (['/home/runner/.local/bin/mc', '--json', 'admin', 'policy', 'attach', 'testcloud', ...],)
kwargs = {'env': None, 'stderr': -1, 'stdout': -1, 'text': True}
process = <Popen: returncode: 1 args: ['/home/runner/.local/bin/mc', '--json', 'admin'...>
stdout = '{"status":"error","error":{"message":"`attach` is not a recognized command. Get help using `--help` flag.","cause":{"message":"","error":{}},"type":"fatal"}}\n'
stderr = '', retcode = 1

    def run(*popenargs,
            input=None, capture_output=False, timeout=None, check=False, **kwargs):
        """Run command with arguments and return a CompletedProcess instance.
    
        The returned instance will have attributes args, returncode, stdout and
        stderr. By default, stdout and stderr are not captured, and those attributes
        will be None. Pass stdout=PIPE and/or stderr=PIPE in order to capture them.
    
        If check is True and the exit code was non-zero, it raises a
        CalledProcessError. The CalledProcessError object will have the return code
        in the returncode attribute, and output & stderr attributes if those streams
        were captured.
    
        If timeout is given, and the process takes too long, a TimeoutExpired
        exception will be raised.
    
        There is an optional argument "input", allowing you to
        pass bytes or a string to the subprocess's stdin.  If you use this argument
        you may not also use the Popen constructor's "stdin" argument, as
        it will be used internally.
    
        By default, all communication is in bytes, and therefore any "input" should
        be bytes, and the stdout and stderr will be bytes. If in text mode, any
        "input" should be a string, and stdout and stderr will be strings decoded
        according to locale encoding, or by "encoding" if set. Text mode is
        triggered by setting any of text, encoding, errors or universal_newlines.
    
        The other arguments are the same as for the Popen constructor.
        """
        if input is not None:
            if kwargs.get('stdin') is not None:
                raise ValueError('stdin and input arguments may not both be used.')
            kwargs['stdin'] = PIPE
    
        if capture_output:
            if kwargs.get('stdout') is not None or kwargs.get('stderr') is not None:
                raise ValueError('stdout and stderr arguments may not be used '
                                 'with capture_output.')
            kwargs['stdout'] = PIPE
            kwargs['stderr'] = PIPE
    
        with Popen(*popenargs, **kwargs) as process:
            try:
                stdout, stderr = process.communicate(input, timeout=timeout)
            except TimeoutExpired as exc:
                process.kill()
                if _mswindows:
                    # Windows accumulates the output in a single blocking
                    # read() call run on child threads, with the timeout
                    # being done in a join() on those threads.  communicate()
                    # _after_ kill() is required to collect that and add it
                    # to the exception.
                    exc.stdout, exc.stderr = process.communicate()
                else:
                    # POSIX _communicate already populated the output so
                    # far into the TimeoutExpired exception.
                    process.wait()
                raise
            except:  # Including KeyboardInterrupt, communicate handled that.
                process.kill()
                # We don't call process.wait() as .__exit__ does that for us.
                raise
            retcode = process.poll()
            if check and retcode:
>               raise CalledProcessError(retcode, process.args,
                                         output=stdout, stderr=stderr)
E               subprocess.CalledProcessError: Command '['/home/runner/.local/bin/mc', '--json', 'admin', 'policy', 'attach', 'testcloud', 'readwrite', '--user', 'abcdef1234567890']' returned non-zero exit status 1.
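For context, the error message above suggests the failure boils down to a CLI change in the mc binary: the newer minio Python package drives `mc admin policy attach`, while the mc binary installed in CI only understands the older `set` subcommand. The `attach` invocation below is taken verbatim from the traceback; the `set` form is an assumption based on older mc syntax, not something present in this log.

```shell
# Newer minio Python releases invoke (verbatim from the traceback above):
mc --json admin policy attach testcloud readwrite --user abcdef1234567890

# Older mc binaries instead expect the 'set' form (assumed older syntax):
mc admin policy set testcloud readwrite user=abcdef1234567890
```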

@JSKenyon
Collaborator Author

OK, I can confirm that something in minio==17.1.16 breaks the test suite. Reverting for now.

@sjperkins
Member

sjperkins commented Sep 14, 2023

> OK, I can confirm that something in minio==17.1.16 breaks the test suite. Reverting for now.

This has been fixed on master. Could you merge or rebase the PR?

@JSKenyon
Collaborator Author

I have merged in master.

@sjperkins
Member

Could you please add a HISTORY.rst entry?

@JSKenyon JSKenyon marked this pull request as ready for review September 19, 2023 13:32
@sjperkins sjperkins merged commit 4e27d98 into master Sep 19, 2023
@sjperkins sjperkins deleted the rechunk_by_size branch September 19, 2023 13:48