From 4ed4b5bdb227ad579892f6b7a13934e48e3d0d59 Mon Sep 17 00:00:00 2001 From: Olivier Desenfans Date: Sun, 25 Jan 2026 23:33:10 +0100 Subject: [PATCH] fix: make streaming chunk sizes consistent across filesystem and IPFS --- src/aleph/services/ipfs/service.py | 10 +++++++--- src/aleph/storage.py | 31 +++++------------------------- 2 files changed, 12 insertions(+), 29 deletions(-) diff --git a/src/aleph/services/ipfs/service.py b/src/aleph/services/ipfs/service.py index 8f09796ec..9f8c9df98 100644 --- a/src/aleph/services/ipfs/service.py +++ b/src/aleph/services/ipfs/service.py @@ -20,8 +20,8 @@ async def fetch_raw_cid_streamed( aioipfs_client: aioipfs.AsyncIPFS, + chunk_size: int, params: Optional[Dict] = None, - chunk_size: int = 16 * 1024, ) -> AsyncIterable[bytes]: driver = aioipfs_client.core.driver params = params or {} @@ -208,10 +208,14 @@ async def get_ipfs_content( return result async def get_ipfs_content_iterator( - self, cid: str + self, cid: str, chunk_size: int ) -> Optional[AsyncIterable[bytes]]: params = {aioipfs.helpers.ARG_PARAM: cid} - return fetch_raw_cid_streamed(aioipfs_client=self.ipfs_client, params=params) + return fetch_raw_cid_streamed( + aioipfs_client=self.ipfs_client, + chunk_size=chunk_size, + params=params, + ) async def get_json(self, hash, timeout=1, tries=1): result = await self.get_ipfs_content(hash, timeout=timeout, tries=tries) diff --git a/src/aleph/storage.py b/src/aleph/storage.py index 5e3a70f4f..f1f386ec3 100644 --- a/src/aleph/storage.py +++ b/src/aleph/storage.py @@ -36,6 +36,7 @@ U0000_STR: Final = "\\u0000" U0000_BYTES: Final = U0000_STR.encode("utf-8") +STREAM_CHUNK_SIZE: Final = 128 * 1024 def check_for_u0000(item_content: aleph_json.SerializedJsonInput): @@ -248,7 +249,7 @@ async def get_hash_content_iterator( source = None content_iterator = await self.storage_engine.read_iterator( - filename=content_hash + filename=content_hash, chunk_size=STREAM_CHUNK_SIZE ) if content_iterator is not None: source = ContentSource.DB @@ -259,34 +260,12 @@ async def get_hash_content_iterator( ipfs_enabled = config.ipfs.enabled.value if ipfs_enabled: content_iterator = ( - await self.ipfs_service.get_ipfs_content_iterator(content_hash) + await self.ipfs_service.get_ipfs_content_iterator( + content_hash, chunk_size=STREAM_CHUNK_SIZE + ) ) source = ContentSource.IPFS - if content_iterator is None: - # Fallback to non-streaming if only P2P is available or if streaming failed - # This is a bit suboptimal but keeps it working. - # However, for GET /storage/raw/ we really want streaming. - # If we reach here and it's not in DB/IPFS, we might have to load it from P2P and then wrap it. - try: - content = await self.get_hash_content( - content_hash, - engine=engine, - timeout=timeout, - tries=tries, - use_network=use_network, - use_ipfs=use_ipfs, - store_value=True, - ) - source = content.source - - async def _iterator(): - yield content.value - - content_iterator = _iterator() - except ContentCurrentlyUnavailable: - content_iterator = None - if content_iterator is None: raise ContentCurrentlyUnavailable( f"Could not fetch content for '{content_hash}'."