Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions src/aleph/services/ipfs/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@

async def fetch_raw_cid_streamed(
aioipfs_client: aioipfs.AsyncIPFS,
chunk_size: int,
params: Optional[Dict] = None,
chunk_size: int = 16 * 1024,
) -> AsyncIterable[bytes]:
driver = aioipfs_client.core.driver
params = params or {}
Expand Down Expand Up @@ -208,10 +208,14 @@ async def get_ipfs_content(
return result

async def get_ipfs_content_iterator(
self, cid: str
self, cid: str, chunk_size: int
) -> Optional[AsyncIterable[bytes]]:
params = {aioipfs.helpers.ARG_PARAM: cid}
return fetch_raw_cid_streamed(aioipfs_client=self.ipfs_client, params=params)
return fetch_raw_cid_streamed(
aioipfs_client=self.ipfs_client,
chunk_size=chunk_size,
params=params,
)

async def get_json(self, hash, timeout=1, tries=1):
result = await self.get_ipfs_content(hash, timeout=timeout, tries=tries)
Expand Down
31 changes: 5 additions & 26 deletions src/aleph/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@

U0000_STR: Final = "\\u0000"
U0000_BYTES: Final = U0000_STR.encode("utf-8")
STREAM_CHUNK_SIZE: Final = 128 * 1024


def check_for_u0000(item_content: aleph_json.SerializedJsonInput):
Expand Down Expand Up @@ -248,7 +249,7 @@ async def get_hash_content_iterator(
source = None

content_iterator = await self.storage_engine.read_iterator(
filename=content_hash
filename=content_hash, chunk_size=STREAM_CHUNK_SIZE
)
if content_iterator is not None:
source = ContentSource.DB
Expand All @@ -259,34 +260,12 @@ async def get_hash_content_iterator(
ipfs_enabled = config.ipfs.enabled.value
if ipfs_enabled:
content_iterator = (
await self.ipfs_service.get_ipfs_content_iterator(content_hash)
await self.ipfs_service.get_ipfs_content_iterator(
content_hash, chunk_size=STREAM_CHUNK_SIZE
)
)
source = ContentSource.IPFS

if content_iterator is None:
# Fallback to non-streaming if only P2P is available or if streaming failed
# This is a bit suboptimal but keeps it working.
# However, for GET /storage/raw/ we really want streaming.
# If we reach here and it's not in DB/IPFS, we might have to load it from P2P and then wrap it.
try:
content = await self.get_hash_content(
content_hash,
engine=engine,
timeout=timeout,
tries=tries,
use_network=use_network,
use_ipfs=use_ipfs,
store_value=True,
)
source = content.source

async def _iterator():
yield content.value

content_iterator = _iterator()
except ContentCurrentlyUnavailable:
content_iterator = None

if content_iterator is None:
raise ContentCurrentlyUnavailable(
f"Could not fetch content for '{content_hash}'."
Expand Down
Loading