Skip to content
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
4cf7b23
[python] Always resolve blob to actual data on read regardless of blo…
XiaoHongbo-Hope May 18, 2026
77bc51b
[python] Add Blob.from_bytes unified API and revert broken read behavior
XiaoHongbo-Hope May 19, 2026
a610d97
[python] Fix Blob.from_bytes type annotation and add tests
XiaoHongbo-Hope May 19, 2026
eb3897d
[python] Align Blob.from_bytes with Java Blob.fromBytes semantics
XiaoHongbo-Hope May 19, 2026
5a13539
[python] Fix flake8 lint errors
XiaoHongbo-Hope May 19, 2026
3a2a85d
[python] Support row-level Blob access aligned with Java getBlob
XiaoHongbo-Hope May 21, 2026
151fa9c
[python] Align BLOB read path with Java getBlob semantics
XiaoHongbo-Hope May 24, 2026
5983598
Revert "[python] Align BLOB read path with Java getBlob semantics"
XiaoHongbo-Hope May 24, 2026
a5161cf
[python] Align BLOB row API shape with Java InternalRow.getBlob
XiaoHongbo-Hope May 24, 2026
27560b6
[python] Remove to_blob_iterator() — to_iterator() suffices
XiaoHongbo-Hope May 24, 2026
a4e9ee0
[python] Make InternalRow.get_blob abstract and add BinaryRow/Project…
XiaoHongbo-Hope May 24, 2026
a848c1f
[python] Remove unused OffsetRow.with_blob_context alias
XiaoHongbo-Hope May 24, 2026
fca89c4
[docs] Add pypaimon Blob storage page
XiaoHongbo-Hope May 24, 2026
79765e3
[python] Tighten blob tests: temp file safety, naming, coverage
XiaoHongbo-Hope May 24, 2026
4fe5f96
[python] Address JingsongLi review on PR #7891
XiaoHongbo-Hope May 24, 2026
8358165
[python] Fix BinaryRow.get_blob + validate column type in OffsetRow.g…
XiaoHongbo-Hope May 24, 2026
e41934d
[python] Drop verbose remap comment in OuterProjectionRecordReader
XiaoHongbo-Hope May 24, 2026
c8498f5
[python] DRY blob_field_indices computation and trim test comments
XiaoHongbo-Hope May 24, 2026
cd5f4f7
[python] Close SSRF gaps: BlobDescriptorConvertReader propagation + f…
XiaoHongbo-Hope May 24, 2026
9d91f5c
[python] Remove DataFileBatchReader.blob_field_indices footgun
XiaoHongbo-Hope May 24, 2026
2c3bc23
[docs] Clarify that lazy blob streaming requires blob-as-descriptor=true
XiaoHongbo-Hope May 24, 2026
c66e433
[python] Add e2e test pinning BlobDescriptorConvertReader propagation
XiaoHongbo-Hope May 24, 2026
0ed1cb6
[python] Fix misleading blob-as-descriptor doc phrasing and lazy test…
XiaoHongbo-Hope May 24, 2026
24152b2
[python] Drop unused file_io plumbing on inner blob readers
XiaoHongbo-Hope May 24, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 3 additions & 21 deletions paimon-python/pypaimon/read/reader/data_file_batch_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from pypaimon.read.reader.format_blob_reader import FormatBlobReader
from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
from pypaimon.schema.data_types import DataField, PyarrowFieldParser
from pypaimon.table.row.blob import Blob, BlobDescriptor
from pypaimon.table.row.blob import Blob
from pypaimon.table.special_fields import SpecialFields


Expand Down Expand Up @@ -178,28 +178,10 @@ def _blob_cell_to_data(self, value):
value = self._normalize_blob_cell(value)
if value is None:
return None

if not isinstance(value, bytes):
return value

descriptor = self._deserialize_descriptor_or_none(value)
if descriptor is None:
return value

try:
uri_reader = self.file_io.uri_reader_factory.create(descriptor.uri)
blob = Blob.from_descriptor(uri_reader, descriptor)
return blob.to_data()
except Exception as e:
raise RuntimeError(
"Failed to read blob bytes from descriptor URI while converting blob value."
) from e

@staticmethod
def _deserialize_descriptor_or_none(raw: bytes):
    """Decode ``raw`` as a BlobDescriptor, or return None when it is not one.

    ``BlobDescriptor.is_blob_descriptor`` decides whether the bytes are a
    serialized descriptor; only bytes that pass that check are handed to
    ``BlobDescriptor.deserialize``.
    """
    if BlobDescriptor.is_blob_descriptor(raw):
        return BlobDescriptor.deserialize(raw)
    return None
blob = Blob.from_bytes(value, self.file_io)
return blob.to_data() if blob is not None else None

def _assign_row_tracking(self, record_batch: RecordBatch) -> RecordBatch:
"""Assign row tracking meta fields (_ROW_ID and _SEQUENCE_NUMBER)."""
Expand Down
10 changes: 7 additions & 3 deletions paimon-python/pypaimon/read/reader/iface/record_batch_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,17 +57,21 @@ def tuple_iterator(self) -> Optional[Iterator[tuple]]:
return None
return df.iter_rows()

def read_batch(self) -> Optional[RecordIterator[InternalRow]]:
def read_batch(self, file_io=None, blob_field_indices=None) -> Optional[RecordIterator[InternalRow]]:
df = self.read_next_df()
if df is None:
return None
return InternalRowWrapperIterator(df.iter_rows(), df.width)
return InternalRowWrapperIterator(
df.iter_rows(), df.width, file_io, blob_field_indices)


class InternalRowWrapperIterator(RecordIterator[InternalRow]):
def __init__(self, iterator: Iterator[tuple], width: int):
def __init__(self, iterator: Iterator[tuple], width: int,
file_io=None, blob_field_indices=None):
self._iterator = iterator
self._reused_row = OffsetRow(None, 0, width)
if file_io is not None and blob_field_indices:
self._reused_row.with_blob_context(file_io, blob_field_indices)

def next(self) -> Optional[InternalRow]:
row_tuple = next(self._iterator, None)
Expand Down
49 changes: 49 additions & 0 deletions paimon-python/pypaimon/read/table_read.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,55 @@ def _record_generator():

return _record_generator()

def to_blob_iterator(self, splits: List[Split]) -> Iterator:
    """Iterator where blob fields are accessible via row.get_blob(pos).

    Unlike to_iterator() which eagerly resolves blobs to bytes, the rows
    produced here are handed the table's file_io and the positions of the
    BLOB columns, so blobs can be opened lazily and streamed.

    While the returned generator is running, the table option
    ``blob-as-descriptor`` is forced to true so descriptors are preserved.
    The previous value is restored when the generator is exhausted, closed,
    or raises.

    Args:
        splits: The splits to read, in order.

    Returns:
        A generator of rows. At most ``self.limit`` rows are produced when a
        limit is set.
    """
    from pypaimon.common.options.core_options import CoreOptions

    blob_field_indices = {
        i for i, field in enumerate(self.read_type)
        if hasattr(field.type, 'type') and field.type.type == 'BLOB'
    }
    file_io = self.table.file_io
    limit = self.limit

    def _blob_record_generator():
        # The override lives INSIDE the generator so that it is always paired
        # with the restoring ``finally`` below. A generator's ``finally`` only
        # runs once the generator has been started; overriding the option
        # before that point would leave it stuck at True for callers that
        # create the iterator but never consume it.
        # NOTE(review): the option is set on the shared table options object,
        # so other reads of the same table running while this generator is
        # active presumably observe the override too - confirm with callers.
        original_value = self.table.options.blob_as_descriptor()
        self.table.options.set(CoreOptions.BLOB_AS_DESCRIPTOR, True)
        try:
            count = 0
            for split in splits:
                if limit is not None and count >= limit:
                    return
                reader = self._create_split_read(split).create_reader()
                try:
                    # read_batch() returns None once the reader is drained.
                    for batch in iter(
                        lambda: reader.read_batch(file_io, blob_field_indices),
                        None
                    ):
                        for row in iter(batch.next, None):
                            yield row
                            count += 1
                            if limit is not None and count >= limit:
                                return
                finally:
                    reader.close()
        finally:
            # Restore original option
            if original_value is not None:
                self.table.options.set(
                    CoreOptions.BLOB_AS_DESCRIPTOR, original_value)
            else:
                self.table.options.options.data.pop(
                    CoreOptions.BLOB_AS_DESCRIPTOR.key(), None)

    return _blob_record_generator()

def to_arrow_batch_reader(self, splits: List[Split]) -> pyarrow.ipc.RecordBatchReader:
schema = PyarrowFieldParser.from_paimon_schema(self.read_type)
if self.include_row_kind:
Expand Down
15 changes: 15 additions & 0 deletions paimon-python/pypaimon/table/row/blob.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,21 @@ def from_file(file_io, file_path: str, offset: int, length: int) -> 'Blob':
def from_descriptor(uri_reader: UriReader, descriptor: BlobDescriptor) -> 'Blob':
return BlobRef(uri_reader, descriptor)

@staticmethod
def from_bytes(data: Optional[bytes], file_io=None, allow_blob_data: bool = True) -> Optional['Blob']:
if data is None:
return None
if not isinstance(data, (bytes, bytearray)):
raise TypeError(f"Blob.from_bytes expects bytes, got {type(data)}")
data = bytes(data)
if BlobDescriptor.is_blob_descriptor(data) or not allow_blob_data:
if file_io is None:
raise ValueError("file_io is required to resolve BlobDescriptor bytes")
descriptor = BlobDescriptor.deserialize(data)
uri_reader = file_io.uri_reader_factory.create(descriptor.uri)
return BlobRef(uri_reader, descriptor)
return BlobData(data)


class BlobData(Blob):

Expand Down
9 changes: 9 additions & 0 deletions paimon-python/pypaimon/table/row/generic_row.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,15 @@ def get_field(self, pos: int) -> Any:
raise IndexError(f"Position {pos} is out of bounds for row arity {len(self.values)}")
return self.values[pos]

def get_blob(self, pos: int):
    """Return the Blob stored at ``pos``, or None when the field is null.

    No conversion is attempted: the stored value must already be a Blob
    instance.

    Raises:
        TypeError: If the field holds a non-null value that is not a Blob.
    """
    from pypaimon.table.row.blob import Blob

    candidate = self.get_field(pos)
    if candidate is None or isinstance(candidate, Blob):
        return candidate
    raise TypeError(f"Cannot get Blob from {type(candidate)} at position {pos}")

def get_row_kind(self) -> RowKind:
    """Return the RowKind held in ``self.row_kind``."""
    return self.row_kind

Expand Down
11 changes: 10 additions & 1 deletion paimon-python/pypaimon/table/row/internal_row.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
# under the License.

from abc import ABC, abstractmethod
from typing import Any
from typing import Any, Optional

from pypaimon.table.row.row_kind import RowKind

Expand Down Expand Up @@ -45,6 +45,15 @@ def __len__(self) -> int:
The number does not include RowKind. It is kept separately.
"""

def get_blob(self, pos: int) -> Optional[Any]:
    """Return the Blob at position ``pos``; None stands for a null field.

    This base implementation unconditionally raises, because resolving a
    blob needs a blob-aware row context. Rows obtained from
    TableRead.to_blob_iterator() provide one.

    Raises:
        NotImplementedError: Always, in this base implementation.
    """
    message = "get_blob() requires a blob-aware row. Use TableRead.to_blob_iterator()."
    raise NotImplementedError(message)

def __str__(self) -> str:
fields = []
for pos in range(self.__len__()):
Expand Down
27 changes: 26 additions & 1 deletion paimon-python/pypaimon/table/row/offset_row.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# specific language governing permissions and limitations
# under the License.

from typing import Optional
from typing import Optional, Set

from pypaimon.table.row.internal_row import InternalRow, RowKind

Expand All @@ -28,6 +28,13 @@ def __init__(self, row_tuple: Optional[tuple], offset: int, arity: int):
self.offset = offset
self.arity = arity
self.row_kind_byte: int = 1
self._file_io = None
self._blob_field_indices: Optional[Set[int]] = None

def with_blob_context(self, file_io, blob_field_indices: Set[int]) -> 'OffsetRow':
    """Attach what get_blob() needs to resolve blobs, then return this row.

    Args:
        file_io: Stored as ``_file_io``; get_blob() uses its
            ``uri_reader_factory`` to open descriptor URIs.
        blob_field_indices: Positions that get_blob() accepts as BLOB fields.

    Returns:
        This same row, so the call can be chained.
    """
    self._file_io, self._blob_field_indices = file_io, blob_field_indices
    return self

def replace(self, row_tuple: tuple) -> 'OffsetRow':
self.row_tuple = row_tuple
Expand All @@ -46,6 +53,24 @@ def get_field(self, pos: int):
raise IndexError(f"Position {pos} is out of bounds for row arity {self.arity}")
return self.row_tuple[self.offset + pos]

def get_blob(self, pos: int):
    """Return the field at ``pos`` as a Blob, or None when the field is null.

    Bytes that are a serialized BlobDescriptor become a Blob backed by a
    reader for the descriptor's URI; any other bytes are wrapped as
    in-memory blob data.

    Args:
        pos: Field position relative to this row's offset.

    Raises:
        TypeError: If blob field positions were registered through
            with_blob_context() and ``pos`` is not one of them, or if the
            stored value is not bytes-like.
        ValueError: If the value is a descriptor but no file_io has been
            attached through with_blob_context().
        IndexError: Propagated from get_field() for an out-of-range ``pos``.
    """
    if self._blob_field_indices is not None and pos not in self._blob_field_indices:
        raise TypeError(f"Field at position {pos} is not a BLOB field")
    value = self.get_field(pos)
    if value is None:
        return None
    if not isinstance(value, (bytes, bytearray)):
        raise TypeError(f"Cannot convert {type(value)} to Blob")

    # Imported only once a Blob actually has to be built.
    from pypaimon.table.row.blob import Blob, BlobDescriptor

    value = bytes(value)
    if not BlobDescriptor.is_blob_descriptor(value):
        return Blob.from_data(value)
    if self._file_io is None:
        # Without this guard the failure is an opaque AttributeError on
        # ``None.uri_reader_factory``.
        raise ValueError(
            "file_io is required to resolve BlobDescriptor bytes; "
            "attach it with with_blob_context()"
        )
    descriptor = BlobDescriptor.deserialize(value)
    uri_reader = self._file_io.uri_reader_factory.create(descriptor.uri)
    return Blob.from_descriptor(uri_reader, descriptor)

def get_row_kind(self) -> RowKind:
    """Return a RowKind built from the stored ``row_kind_byte``."""
    return RowKind(self.row_kind_byte)

Expand Down
91 changes: 91 additions & 0 deletions paimon-python/pypaimon/tests/blob_table_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -3184,5 +3184,96 @@ def test_rename_blob_column_should_fail(self):
self.assertIn('Cannot rename BLOB column', str(ctx.exception))


class GetBlobTest(unittest.TestCase):
    """End-to-end checks for row-level blob access via TableRead.to_blob_iterator()."""

    @classmethod
    def setUpClass(cls):
        cls.temp_dir = tempfile.mkdtemp()
        try:
            cls._create_and_populate_table()
        except Exception:
            # unittest does not call tearDownClass when setUpClass raises,
            # so the temp directory has to be removed here.
            shutil.rmtree(cls.temp_dir, ignore_errors=True)
            raise

    @classmethod
    def _create_and_populate_table(cls):
        """Create test_db.get_blob_test and write three rows with blob content."""
        cls.warehouse = os.path.join(cls.temp_dir, 'warehouse')
        cls.catalog = CatalogFactory.create({'warehouse': cls.warehouse})
        cls.catalog.create_database('test_db', False)

        pa_schema = pa.schema([
            ('id', pa.int32()),
            ('name', pa.string()),
            ('picture', pa.large_binary()),
        ])
        schema = Schema.from_pyarrow_schema(pa_schema, options={
            'row-tracking.enabled': 'true',
            'data-evolution.enabled': 'true',
        })
        cls.catalog.create_table('test_db.get_blob_test', schema, False)
        cls.table = cls.catalog.get_table('test_db.get_blob_test')

        data = pa.Table.from_pydict({
            'id': [1, 2, 3],
            'name': ['a', 'b', 'c'],
            'picture': [b'img_data_1', b'img_data_2', b'img_data_3'],
        }, schema=pa_schema)

        write_builder = cls.table.new_batch_write_builder()
        writer = write_builder.new_write()
        try:
            writer.write_arrow(data)
            commit_messages = writer.prepare_commit()
            commit = write_builder.new_commit()
            commit.commit(commit_messages)
        finally:
            # Close the writer even when the write or commit fails.
            writer.close()

    @classmethod
    def tearDownClass(cls):
        shutil.rmtree(cls.temp_dir, ignore_errors=True)

    def _plan_read(self):
        """Return a (read, splits) pair for a full scan of the test table."""
        read_builder = self.table.new_read_builder()
        splits = read_builder.new_scan().plan().splits()
        return read_builder.new_read(), splits

    def _blob_rows(self):
        """Return a fresh to_blob_iterator() generator over the whole table."""
        read, splits = self._plan_read()
        return read.to_blob_iterator(splits)

    def test_get_blob_lazy_access(self):
        results = []
        for row in self._blob_rows():
            blob = row.get_blob(2)
            self.assertIsNotNone(blob)
            results.append((row.get_field(0), blob.to_data()))

        self.assertEqual(len(results), 3)
        results.sort(key=lambda x: x[0])
        self.assertEqual(results[0], (1, b'img_data_1'))
        self.assertEqual(results[1], (2, b'img_data_2'))
        self.assertEqual(results[2], (3, b'img_data_3'))

    def test_get_blob_streaming(self):
        rows = self._blob_rows()
        try:
            row = next(rows, None)
            # Fail loudly instead of passing vacuously on an empty result.
            self.assertIsNotNone(row, "to_blob_iterator() returned no rows")
            blob = row.get_blob(2)
            self.assertIsNotNone(blob)
            with blob.new_input_stream() as stream:
                data = stream.read()
            self.assertTrue(data.startswith(b'img_data_'))
        finally:
            # Closing the generator runs its cleanup (reader close and
            # option restore) now rather than at garbage collection.
            rows.close()

    def test_get_blob_non_blob_field_raises(self):
        rows = self._blob_rows()
        try:
            row = next(rows, None)
            self.assertIsNotNone(row, "to_blob_iterator() returned no rows")
            with self.assertRaises(TypeError):
                row.get_blob(0)
        finally:
            rows.close()

    def test_to_iterator_unchanged(self):
        read, splits = self._plan_read()

        count = 0
        for row in read.to_iterator(splits):
            self.assertIsNotNone(row.get_field(0))
            self.assertIsNotNone(row.get_field(1))
            count += 1
        self.assertEqual(count, 3)


if __name__ == '__main__':
unittest.main()
37 changes: 37 additions & 0 deletions paimon-python/pypaimon/tests/blob_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,43 @@ def test_from_http(self):
self.assertEqual(descriptor.offset, 0)
self.assertEqual(descriptor.length, -1)

def test_from_bytes_with_raw_data(self):
    """Plain content bytes come back as a BlobData holding the same bytes."""
    payload = b"hello blob"
    result = Blob.from_bytes(payload)
    self.assertIsInstance(result, BlobData)
    self.assertEqual(result.to_data(), payload)

def test_from_bytes_with_none(self):
    """None input is passed through as None."""
    result = Blob.from_bytes(None)
    self.assertIsNone(result)

def test_from_bytes_with_descriptor(self):
    """Serialized descriptor bytes resolve to a BlobRef that reads the file."""
    import os
    import tempfile
    from pypaimon.common.file_io import FileIO

    data = b"actual blob content"
    # delete=False keeps the file on disk after the handle is closed so it
    # can be re-opened by path through file_io.
    with tempfile.NamedTemporaryFile(delete=False) as tmp:
        tmp.write(data)

    try:
        descriptor = BlobDescriptor(tmp.name, 0, len(data))
        serialized = descriptor.serialize()

        file_io = FileIO.get(f"file://{os.path.dirname(tmp.name)}", {})
        blob = Blob.from_bytes(serialized, file_io)
        self.assertIsInstance(blob, BlobRef)
        self.assertEqual(blob.to_data(), data)
    finally:
        # Remove the temp file even when an assertion above fails.
        os.unlink(tmp.name)

def test_from_bytes_descriptor_without_file_io_raises(self):
    """Descriptor bytes cannot be resolved without a file_io: ValueError."""
    serialized = BlobDescriptor("/tmp/fake", 0, 10).serialize()
    self.assertRaises(ValueError, Blob.from_bytes, serialized)

def test_from_bytes_invalid_type_raises(self):
    """A non-bytes argument is rejected with TypeError."""
    self.assertRaises(TypeError, Blob.from_bytes, 12345)

def test_blob_data_interface_compliance(self):
"""Test that BlobData properly implements Blob interface."""
test_data = b"interface test data"
Expand Down
Loading