Skip to content
51 changes: 51 additions & 0 deletions alembic/versions/b2c52ee8ff12_add_ingestion_status.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
"""Add ingestion status

Revision ID: b2c52ee8ff12
Revises: 9e9a4a7cd639
Create Date: 2026-05-11 16:16:03.768893

"""

from typing import Sequence, Union

import sqlalchemy as sa

from alembic import op

# revision identifiers, used by Alembic.
revision: str = "b2c52ee8ff12"
down_revision: Union[str, Sequence[str], None] = "9e9a4a7cd639"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
"""Upgrade schema."""
with op.batch_alter_table("simulations", schema=None) as batch_op:
batch_op.add_column(
sa.Column(
"ingestion_status",
sa.Enum(
"QUEUED",
"COPYING",
"COPIED",
"VALIDATING",
"VALIDATED",
"COMPLETED",
"COPY_FAILED",
"VALIDATION_FAILED",
name="ingestionstatus",
),
nullable=False,
)
)
batch_op.add_column(
sa.Column("ingestion_version", sa.Integer(), nullable=False)
)


def downgrade() -> None:
"""Downgrade schema."""
with op.batch_alter_table("simulations", schema=None) as batch_op:
batch_op.drop_column("ingestion_version")
batch_op.drop_column("ingestion_status")
77 changes: 77 additions & 0 deletions docs/celery.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
# Celery async task processing

SimDB uses [Celery](https://docs.celeryproject.org/) to run asynchronous background
tasks such as copying simulation files and completing the ingestion pipeline.

## Overview

When simulations are uploaded via the REST API, the server offloads heavy operations
to Celery workers instead of blocking the HTTP request. Tasks are defined in
`src/simdb/workers/tasks.py`:

- `copy_files_task` — copies input/output files from source locations to the server's
upload folder and updates the simulation's ingestion status.
- `complete_ingestion_task` — marks a simulation as fully ingested.
- `validate_imas_task` — runs validation checks on IMAS data (placeholder).
- `send_email_task` — sends email notifications.

Tasks can be chained in the API endpoint:

```python
copy_files = copy_files_task.si(simulation.uuid, ...)
complete = complete_ingestion_task.si(simulation.uuid)
_ = (copy_files | complete).apply_async()
```

## Configuration

Celery is configured via `app.cfg`:

| Section | Option | Required | Description |
|---------|----------------|----------|--------------------------------------------------|
| celery | broker_url | no | Redis URL for the message broker. Defaults to `redis://localhost:6379/0` |
| celery | result_backend | no | Redis URL for results storage. Defaults to `redis://localhost:6379/0` |

Example:

```ini
[celery]
broker_url = redis://localhost:6379/0
result_backend = redis://localhost:6379/0
```

## Running workers

### Standalone worker

Start a Celery worker using the built-in CLI:

```bash
simdb_celery worker
```

### Worker with beat scheduler

For periodic tasks (e.g. cleanup, reports), run both the worker and beat:

```bash
# Terminal 1: worker
simdb_celery worker

# Terminal 2: beat scheduler
simdb_celery beat
```

### Flower monitoring

[Flower](https://flower.readthedocs.io/) provides a web UI for monitoring Celery
workers and tasks:

```bash
celery -A simdb.workers.celery flower --port=5555
```

## Testing with eager mode

In tests, set `task_always_eager = True` to run tasks synchronously without a
broker.
17 changes: 17 additions & 0 deletions docs/developer_guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,23 @@ simdb_server

This will start a server on port 5000. You can test this server is running by opening http://localhost:5000 in a browser.

## Running Celery workers

For development, you typically want to run Celery tasks synchronously. This is
enabled by setting `task_always_eager = True` in tests (see `tests/remote/api/v1.3/test_simulations3.py`).

To run actual background workers during development:

```bash
# Worker
simdb_celery worker

# Beat scheduler (if needed)
simdb_celery beat
```

See the [Celery documentation](celery.md) for full details.

## Linting and formatting

SimDB uses [Ruff](https://docs.astral.sh/ruff/) for both linting and code
Expand Down
6 changes: 6 additions & 0 deletions docs/maintenance_guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,12 @@ service nginx restart

You should now be able to check the simdb server is running by going to the http address defined in your nginx site (localhost:80 in the example above).

## Celery background workers

SimDB uses Celery to run asynchronous background tasks such as copying simulation
files. See the [Celery documentation](celery.md) for details on configuration and
running workers.

#### Nginx Request Entity Size

You may need to increase the size of uploaded files that Nginx will accept. For SimDB this should be at least 100MB.
Expand Down
10 changes: 8 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,19 @@ build-docs = [
postgres = [
"psycopg2-binary>=2.8.0",
]
celery = [
"celery>=5.3.0",
"redis>=5.0.0",
]
all = [
"imas-simdb[server, imas-validator, postgres]"
"imas-simdb[server, imas-validator, postgres, celery]",
]

[project.scripts]
simdb = "simdb.cli.simdb:main"
simdb_server = "simdb.remote.wsgi:run"
simdb_worker = "simdb.workers.cli:worker"
simdb_beat = "simdb.workers.cli:beat"

[project.urls]
Homepage = "https://simdb.iter.org/dashboard/"
Expand Down Expand Up @@ -167,5 +173,5 @@ dev = [
"pytest-cov>=5.0.0",
"ruff~=0.15.0",
"ty==0.0.34",
"imas-simdb[server, imas-validator, postgres, auth]"
"imas-simdb[server, imas-validator, postgres, auth, celery]"
]
17 changes: 17 additions & 0 deletions src/simdb/config/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,23 @@ def get_string_option(
)
return value

def get_int_option(
self, name: str, default: Union[int, None, _NothingSentinel] = NOTHING
) -> int:
"""
Returns the value for the option with the given name from the configuration but
also ensures the resulting value is an integer.

@see get_option
@raise TypeError if the found value was not an integer
"""
value = self.get_option(name, default)
if value is not None and not isinstance(value, int):
raise TypeError(
f"Invalid type of option {name}: expected int, got {type(value)}"
)
return value

def delete_option(self, name: str) -> None:
"""
Delete the option with the given name from the configuration.
Expand Down
35 changes: 19 additions & 16 deletions src/simdb/database/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,12 +106,6 @@ def __init__(self, db_type: DBMS, scopefunc=None, **kwargs) -> None:
self.engine: sqlalchemy.engine.Engine = create_engine(
"sqlite:///{file}".format(**kwargs)
)
with contextlib.closing(self.engine.connect()) as con:
res: sqlalchemy.engine.ResultProxy = con.execute(
"SELECT name FROM sqlite_master WHERE type = 'table' AND name NOT "
"LIKE 'sqlite_%';"
)
new_db = res.rowcount == -1

elif db_type == Database.DBMS.POSTGRESQL:
if "host" not in kwargs:
Expand All @@ -131,12 +125,6 @@ def __init__(self, db_type: DBMS, scopefunc=None, **kwargs) -> None:
pool_pre_ping=True,
pool_recycle=3600,
)
with contextlib.closing(self.engine.connect()) as con:
res: sqlalchemy.engine.ResultProxy = con.execute(
"SELECT * FROM pg_catalog.pg_tables WHERE schemaname = 'public';"
)
new_db = res.rowcount == 0

elif db_type == Database.DBMS.MSSQL:
if "user" not in kwargs:
raise ValueError("Missing user parameter for MSSQL database")
Expand All @@ -147,12 +135,8 @@ def __init__(self, db_type: DBMS, scopefunc=None, **kwargs) -> None:
self.engine: sqlalchemy.engine.Engine = create_engine(
"mssql+pyodbc://{user}:{password}@{dsnname}".format(**kwargs)
)
new_db = False

else:
raise ValueError("Unknown database type: " + db_type.name)
if new_db:
Base.metadata.create_all(self.engine)
Base.metadata.bind = self.engine
if scopefunc is None:

Expand Down Expand Up @@ -745,3 +729,22 @@ def get_local_db(config: Config) -> Database:
db_file.parent.mkdir(parents=True, exist_ok=True)
database = Database(Database.DBMS.SQLITE, file=db_file)
return database


def get_db(config: Config) -> Database:
db_type = config.get_option("database.type")
if db_type == "postgres":
args = config.get_section("database")
return Database(
Database.DBMS.POSTGRESQL,
**args,
)
elif db_type == "sqlite":
db_dir = appdirs.user_data_dir("simdb")
file = Path(config.get_string_option("database.file", default=None)) or Path(
db_dir, "remote.db"
)
file.parent.mkdir(parents=True, exist_ok=True)
return Database(Database.DBMS.SQLITE, file=file)
else:
raise RuntimeError(f"Unknown database type in configuration: {db_type}.")
8 changes: 8 additions & 0 deletions src/simdb/database/models/simulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from pathlib import Path
from typing import Any, Dict, List, Optional, Set, Union

from simdb.enums import IngestionStatus
from simdb.remote.models import (
FileDataList,
MetadataDataList,
Expand Down Expand Up @@ -120,6 +121,13 @@ class Status(Enum):
"Watcher", secondary=simulation_watchers, lazy="dynamic"
)

ingestion_status = Column(
sql_types.Enum(IngestionStatus),
nullable=False,
default=IngestionStatus.COMPLETED,
)
ingestion_version = Column(sql_types.Integer, nullable=False, default=0)

def __init__(
self, manifest: Union[Manifest, None], config: Optional[Config] = None
) -> None:
Expand Down
13 changes: 13 additions & 0 deletions src/simdb/enums.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from enum import Enum


class IngestionStatus(str, Enum):
QUEUED = "queued"
COPYING = "copying"
COPIED = "copied"
VALIDATING = "validating"
VALIDATED = "validated"
COMPLETED = "completed"

COPY_FAILED = "copy_failed"
VALIDATION_FAILED = "validation_failed"
5 changes: 5 additions & 0 deletions src/simdb/imas/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,11 @@ def imas_files(uri: URI) -> List[Path]:
@return: a list of files which contains the IDS data for the backend specified in
the URI
"""

# Early exit for NetCDF files
if uri.scheme == "file" and uri.path and uri.path.suffix == "nc":
return [uri.path]

backend = str(uri.path)
if backend.startswith("/"):
backend = backend[1:]
Expand Down
3 changes: 3 additions & 0 deletions src/simdb/remote/apis/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
from .v1_1 import namespaces as namespaces_v1_1
from .v1_2 import api as api_v1_2
from .v1_2 import namespaces as namespaces_v1_2
from .v1_3 import api as api_v1_3
from .v1_3 import namespaces as namespaces_v1_3


def error(message: str) -> Response:
Expand Down Expand Up @@ -144,3 +146,4 @@ def get(self, user: User):
register(api_v1, "v1", namespaces_v1)
register(api_v1_1, "v1.1", namespaces_v1_1)
register(api_v1_2, "v1.2", namespaces_v1_2)
register(api_v1_3, "v1.3", namespaces_v1_3)
28 changes: 28 additions & 0 deletions src/simdb/remote/apis/v1_3/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from flask_restx import Api

from simdb.remote.apis.files import api as file_ns
from simdb.remote.apis.metadata import api as metadata_ns
from simdb.remote.apis.watchers import api as watcher_ns
from simdb.remote.core.auth import TokenAuthenticator

from .simulations import api as sim_ns

api = Api(
title="SimDB REST API",
version="1.3",
description="SimDB REST API",
authorizations={
"basicAuth": {
"type": "basic",
},
"apiToken": {
"type": "apiKey",
"in": "header",
"name": TokenAuthenticator.TOKEN_HEADER_NAME,
},
},
security=["basicAuth", "apiToken"],
doc="/docs",
)

namespaces = [metadata_ns, watcher_ns, file_ns, sim_ns]
Loading
Loading