Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/simdb/remote/apis/v1_2/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from simdb.remote.core.typing import current_app
from simdb.remote.models import StagingDirectoryResponse

from .simulation_data import api as data_ns
from .simulations import api as sim_ns

api = Api(
Expand All @@ -31,7 +32,7 @@
)

api.add_namespace(sim_ns)
namespaces = [metadata_ns, watcher_ns, file_ns, sim_ns]
namespaces = [metadata_ns, watcher_ns, file_ns, sim_ns, data_ns]


@api.route("/staging_dir", defaults={"sim_hex": None})
Expand Down
189 changes: 189 additions & 0 deletions src/simdb/remote/apis/v1_2/simulation_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
"""IMAS simulation data endpoint: /data.

TODO: Temporary solution to retrieve data (for IBEX backend)
"""

from typing import Annotated, Any, NamedTuple

import numpy as np
from flask_restx import Namespace, Resource
from imas.ids_defs import EMPTY_FLOAT
from imas.ids_primitive import IDSPrimitive

from simdb.cli.manifest import DataObject
from simdb.database import DatabaseError
from simdb.imas.utils import (
ImasError,
open_imas,
)
from simdb.remote.core.auth import User, requires_auth
from simdb.remote.core.pydantic_utils import (
Query,
ResponseException,
ServerException,
pydantic_validate,
)
from simdb.remote.core.typing import current_app
from simdb.remote.models import ImasDataQueryParams, ImasDataResponse, QuantityData
from simdb.uri import URI

api = Namespace("data", path="/")


# Helpers


def _to_python(value: Any) -> Any:
"""Convert a value returned by IDSPrimitive.value to a JSON-serialisable
Python object."""
if isinstance(value, np.ndarray):
flat = value.tolist()

def _clean(v):
if isinstance(v, float) and (
v != v or v == float("inf") or v == float("-inf") or v == EMPTY_FLOAT
):
return None
if isinstance(v, list):
return [_clean(x) for x in v]
return v

return _clean(flat)
return value


def _parse_ids_path(path: str) -> tuple:
"""Parse ``ids_name[:occurrence][/ids_path]`` into a 3-tuple"""
head, _, ids_path = path.partition("/")
if ":" in head:
ids_name, occ_str = head.split(":", 1)
try:
occurrence = int(occ_str)
except ValueError as exc:
raise ValueError(
f"Invalid occurrence in path '{path}': '{occ_str}'"
) from exc
else:
ids_name, occurrence = head, 0
return ids_name, occurrence, ids_path


def _get_coordinates(node: IDSPrimitive, ids_name: str) -> list:
"""Return a :class:`QuantityData` for each coordinate dimension of *node*."""
coords = []
for i in range(node.metadata.ndim):
coord = node.coordinates[i]
if isinstance(coord, IDSPrimitive):
data = (
_to_python(coord.value)
if coord.has_value
else list(range(node.shape[i]))
)
coords.append(
QuantityData(
name=f"{ids_name}/{coord._path}",
units=coord.metadata.units or "",
data=data,
)
)
else:
# Index-based coordinate: coord is already a numpy arange
coords.append(
QuantityData(
name=f"dim_{i + 1}",
units="",
data=coord.tolist(),
)
)
return coords


def _get_ids_node(entry, ids_name: str, occurrence: int, ids_path: str) -> IDSPrimitive:
"""Return the :class:`IDSPrimitive` leaf node at *ids_path* inside *ids_name*."""
ids_obj = entry.get(
ids_name,
occurrence,
lazy=True,
autoconvert=False,
ignore_unknown_dd_version=True,
)
node = ids_obj[ids_path] if ids_path else ids_obj
if not isinstance(node, IDSPrimitive):
raise ValueError(
f"path does not point to a scalar/array leaf "
f"(reached {type(node).__name__}); add more path segments"
)
if not node.has_value:
raise ValueError("field is not populated (no data written)")
return node


class _SimulationImasFile(NamedTuple):
simulation: Any
imas_file: Any


def _get_simulation_and_imas_file(sim_id: str) -> _SimulationImasFile:
try:
simulation = current_app.db.get_simulation(sim_id)
except DatabaseError as exc:
raise ResponseException(str(exc), 404) from exc

imas_outputs = [f for f in simulation.outputs if f.type == DataObject.Type.IMAS]
if not imas_outputs:
raise ResponseException(f"Simulation {sim_id} has no IMAS output files", 404)

return _SimulationImasFile(simulation, imas_outputs[0])


# Endpoints


@api.route("/simulation/<path:sim_id>/data")
class SimulationImasData(Resource):
@requires_auth()
@pydantic_validate(api)
def get(
self,
sim_id: str,
user: User,
params: Annotated[ImasDataQueryParams, Query()],
) -> ImasDataResponse:
"""Return the value at a given IDS path for a simulation's IMAS output."""
result = _get_simulation_and_imas_file(sim_id)

try:
ids_name, occurrence, ids_path = _parse_ids_path(params.path)
except ValueError as exc:
raise ResponseException(str(exc)) from exc

try:
imas_uri = URI(str(result.imas_file.uri))
if imas_uri.authority.host and "cache_mode" not in imas_uri.query:
imas_uri.query.set("cache_mode", "none")
entry = open_imas(imas_uri)
with entry:
node = _get_ids_node(entry, ids_name, occurrence, ids_path)
coordinates = _get_coordinates(node, ids_name)
field = QuantityData(
name=f"{ids_name}/{node._path}",
units=node.metadata.units or "",
data=_to_python(node.value),
)
except (ValueError, AttributeError, IndexError, KeyError) as exc:
raise ResponseException(f"Invalid IDS path '{params.path}': {exc}") from exc
except ImasError as exc:
raise ServerException(f"Failed to open IMAS data: {exc}") from exc
except Exception as exc:
msg = str(exc)
if "is empty" in msg or "not found" in msg.lower():
raise ResponseException(msg, 404) from exc
raise ServerException(msg) from exc

return ImasDataResponse(
simulation=str(result.simulation.uuid),
path=params.path,
occurrence=occurrence,
field=field,
coordinates=coordinates,
)
48 changes: 47 additions & 1 deletion src/simdb/remote/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
Field,
InstanceOf,
PlainSerializer,
field_validator,
model_validator,
)
from pydantic import (
Expand Down Expand Up @@ -133,7 +134,10 @@ def _deserialize_numpy(v: Any) -> Any:
return v
if isinstance(v, dict) and v.get("_type") == "numpy.ndarray":
np_bytes = base64.b64decode(v["bytes"].encode())
return np.frombuffer(np_bytes, dtype=v["dtype"]).reshape(v["shape"])
arr = np.frombuffer(np_bytes, dtype=v["dtype"])
if "shape" in v:
arr = arr.reshape(v["shape"])
return arr
raise ValueError(f"Cannot deserialize {v} to np.ndarray")


Expand Down Expand Up @@ -553,6 +557,48 @@ class StagingDirectoryResponse(BaseModel):
"""Path to the staging dir."""


class ImasDataQueryParams(BaseModel):
"""Query parameters for the IMAS field-data endpoint."""

path: str
"""Full IDS path including IDS name and optional occurrence."""

@field_validator("path", mode="before")
@classmethod
def _strip_path(cls, v: Any) -> str:
v = str(v).strip()
if not v:
raise ValueError("must not be empty")
return v


class QuantityData(BaseModel):
"""A named, unit-bearing data quantity (field value or coordinate)."""

name: str
"""IDS path of this quantity relative to the IDS root"""
units: str
"""Physical units of the quantity"""
data: Any
"""Data value: a Python scalar for 0-D quantities, or a nested list for
arrays. """


class ImasDataResponse(BaseModel):
"""Response from the IMAS field-data endpoint."""

simulation: str
"""UUID of the simulation."""
path: str
"""Requested IDS path."""
occurrence: int
"""IDS occurrence index."""
field: QuantityData
"""The requested quantity"""
coordinates: List[QuantityData]
"""Coordinates for each dimension of *field*, in dimension order."""


class ErrorResponse(BaseModel):
"""Response model for server errors."""

Expand Down
Loading