Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions python/pyarrow/includes/libarrow.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -1677,6 +1677,9 @@ cdef extern from "arrow/io/api.h" namespace "arrow::io" nogil:
@staticmethod
CResult[shared_ptr[COutputStream]] Open(const c_string& path)

# Overload that takes an integer file descriptor instead of a path.
# NOTE(review): presumably the returned stream takes ownership of `fd` and
# closes it when the stream is closed — confirm against the C++ header.
@staticmethod
CResult[shared_ptr[COutputStream]] Open(int fd)

@staticmethod
CResult[shared_ptr[COutputStream]] OpenWithAppend" Open"(
const c_string& path, c_bool append)
Expand All @@ -1687,6 +1690,12 @@ cdef extern from "arrow/io/api.h" namespace "arrow::io" nogil:
@staticmethod
CResult[shared_ptr[ReadableFile]] Open(const c_string& path)

# Overloads that take an integer file descriptor instead of a path; the
# second form additionally accepts the memory pool to use.
# NOTE(review): presumably the returned file takes ownership of `fd` and
# closes it when the file is closed — confirm against the C++ header.
@staticmethod
CResult[shared_ptr[ReadableFile]] Open(int fd)

@staticmethod
CResult[shared_ptr[ReadableFile]] Open(int fd, CMemoryPool* memory_pool)

@staticmethod
CResult[shared_ptr[ReadableFile]] Open(const c_string& path,
CMemoryPool* memory_pool)
Expand Down
64 changes: 53 additions & 11 deletions python/pyarrow/io.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -1183,6 +1183,11 @@ cdef class OSFile(NativeFile):
"""
A stream backed by a regular file descriptor.

Parameters
----------
path : str or int
A file path or an open file descriptor.
Comment thread
alippai marked this conversation as resolved.

Examples
--------
Create a new file to write to:
Expand Down Expand Up @@ -1228,22 +1233,33 @@ cdef class OSFile(NativeFile):
object path

def __cinit__(self, path, mode='r', MemoryPool memory_pool=None):
# Open the file named by `path`, or wrap `path` directly when it is an int
# (treated as a file descriptor).
#
# Modes: 'r'/'rb' open for reading, 'w'/'wb' for writing, 'a'/'ab' for
# writing with append=True; any other mode raises ValueError.
# `memory_pool` is only forwarded to the readable open calls.
#
# NOTE(review): `_check_is_file(path)` and the path-based mode chain appear
# twice in this text (once before the isinstance check and once inside its
# `else:` branch). This looks like pre-change and post-change lines of a diff
# flattened together — confirm which copy is live before editing.
_check_is_file(path)
self.path = path

cdef:
FileMode c_mode
shared_ptr[Readable] handle
c_string c_path = encode_file_path(path)

if mode in ('r', 'rb'):
self._open_readable(c_path, maybe_unbox_memory_pool(memory_pool))
elif mode in ('w', 'wb'):
self._open_writable(c_path)
elif mode in ('a', 'ab'):
self._open_writable(c_path, append=True)
c_string c_path
int fd

# NOTE(review): bool is a subclass of int, so True/False would pass this
# check and be used as descriptors 1/0 — confirm whether that is acceptable.
if isinstance(path, int):
fd = path
if mode in ('r', 'rb'):
self._open_readable_fd(fd, maybe_unbox_memory_pool(memory_pool))
elif mode in ('w', 'wb', 'a', 'ab'):
# Write and append modes share one call; only the append flag differs.
self._open_writable_fd(fd, append=(mode in ('a', 'ab')))
else:
raise ValueError(f'Invalid file mode: {mode}')
else:
raise ValueError(f'Invalid file mode: {mode}')
# Path branch: the path is validated and encoded only when it is not an int.
_check_is_file(path)
c_path = encode_file_path(path)
if mode in ('r', 'rb'):
self._open_readable(c_path, maybe_unbox_memory_pool(memory_pool))
elif mode in ('w', 'wb'):
self._open_writable(c_path)
elif mode in ('a', 'ab'):
self._open_writable(c_path, append=True)
else:
raise ValueError(f'Invalid file mode: {mode}')

cdef _open_readable(self, c_string path, CMemoryPool* pool):
cdef shared_ptr[ReadableFile] handle
Expand All @@ -1262,9 +1278,35 @@ cdef class OSFile(NativeFile):
self.is_writable = True
self._is_appending = append

cdef _open_readable_fd(self, int fd, CMemoryPool* pool):
# Open a ReadableFile on the descriptor `fd`, using `pool` as its memory
# pool, and install it as this object's random-access file.
cdef shared_ptr[ReadableFile] handle

# The open call runs with the GIL released.
# NOTE(review): presumably GetResultValue raises a Python exception when the
# CResult holds an error — confirm.
with nogil:
handle = GetResultValue(ReadableFile.Open(fd, pool))

self.is_readable = True
# Cast to the CRandomAccessFile pointer type expected by the setter.
self.set_random_access_file(<shared_ptr[CRandomAccessFile]> handle)

cdef _open_writable_fd(self, int fd, c_bool append=False):
# Open a FileOutputStream on the descriptor `fd` and mark this object
# writable.
#
# `append` is only recorded on `self._is_appending`; it is not passed to
# FileOutputStream.Open(fd).
# NOTE(review): presumably the actual append behaviour then depends on how
# the descriptor itself was opened (e.g. O_APPEND) — confirm.
with nogil:
self.output_stream = GetResultValue(FileOutputStream.Open(fd))
self.is_writable = True
self._is_appending = append

def fileno(self):
# Return the file descriptor of the underlying file; `_assert_open()` is
# called first.
self._assert_open()
# NOTE(review): as the text stands, this return precedes the typed-handle
# branches below, which would make them unreachable. It looks like the
# superseded line of the change — confirm and drop it.
return self.handle.file_descriptor()
cdef:
shared_ptr[ReadableFile] readable_handle
shared_ptr[FileOutputStream] writable_handle

# The descriptor is read from the concrete file type, so the generic handle
# is cast to ReadableFile or FileOutputStream depending on is_readable.
if self.is_readable:
readable_handle = static_pointer_cast[ReadableFile, CRandomAccessFile](
self.get_random_access_file())
return readable_handle.get().file_descriptor()
else:
writable_handle = static_pointer_cast[FileOutputStream, COutputStream](
self.get_output_stream())
return writable_handle.get().file_descriptor()


cdef class FixedSizeBufferWriter(NativeFile):
Expand Down
17 changes: 17 additions & 0 deletions python/pyarrow/tests/parquet/test_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,23 @@ def test_memory_map(tempdir):
assert table_read.equals(table)


def test_parquet_read_write_table_raw_fd(tempdir):
    """Round-trip a table through Parquet via OSFile objects built on raw fds."""
    # O_BINARY is not defined on every platform; fall back to no extra flag.
    o_binary = getattr(os, "O_BINARY", 0)
    target = str(tempdir / 'raw-fd.parquet')
    expected = pa.table({'a': [1, 2, 3]})

    # Write side: create/truncate the file and hand the descriptor to OSFile.
    write_flags = os.O_CREAT | os.O_WRONLY | os.O_TRUNC | o_binary
    write_fd = os.open(target, write_flags, 0o666)
    with pa.OSFile(write_fd, mode='wb') as sink:
        pq.write_table(expected, sink)

    # Read side: reopen read-only and read back through a second OSFile.
    read_fd = os.open(target, os.O_RDONLY | o_binary)
    with pa.OSFile(read_fd, mode='rb') as source:
        actual = pq.read_table(source)

    assert actual.equals(expected)


@pytest.mark.pandas
def test_enable_buffered_stream(tempdir):
df = alltypes_sample(size=10)
Expand Down
25 changes: 25 additions & 0 deletions python/pyarrow/tests/test_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import bz2
from contextlib import contextmanager
import errno
from io import (BytesIO, StringIO, TextIOWrapper, BufferedIOBase, IOBase)
import itertools
import gc
Expand Down Expand Up @@ -1280,6 +1281,30 @@ def test_os_file_writer(tmpdir):
assert f5.size() == 6 # foo + bar


def test_os_file_raw_fd(tmpdir):
    """Check that OSFile wraps a raw fd, reports it, and closes it on exit."""

    def assert_fd_closed(descriptor):
        # fstat on a closed descriptor must fail with EBADF.
        with pytest.raises(OSError) as exc:
            os.fstat(descriptor)
        assert exc.value.errno == errno.EBADF

    target = os.path.join(str(tmpdir), guid())
    # O_BINARY is not defined on every platform; fall back to no extra flag.
    o_binary = getattr(os, "O_BINARY", 0)

    # Writing through a descriptor opened for create/truncate.
    write_flags = os.O_CREAT | os.O_WRONLY | os.O_TRUNC | o_binary
    write_fd = os.open(target, write_flags, 0o666)
    with pa.OSFile(write_fd, mode='wb') as writer:
        assert writer.fileno() == write_fd
        writer.write(b'foo')
    assert_fd_closed(write_fd)

    # Reading the same bytes back through a read-only descriptor.
    read_fd = os.open(target, os.O_RDONLY | o_binary)
    with pa.OSFile(read_fd, mode='rb') as reader:
        assert reader.fileno() == read_fd
        assert reader.read() == b'foo'
    assert_fd_closed(read_fd)


def test_native_file_write_reject_unicode():
# ARROW-3227
nf = pa.BufferOutputStream()
Expand Down
Loading