diff --git a/Makefile b/Makefile index de274ae5..92509bfc 100644 --- a/Makefile +++ b/Makefile @@ -8,7 +8,7 @@ DOCKERS_DEV = $(addprefix docker_dev_,$(SERVICES)) CGO_ENABLED ?= 0 GOARCH ?= amd64 DOCKER_IMAGE_NAME_PREFIX ?= ghcr.io/absmach -VERSION ?= $(shell git describe --abbrev=0 --tags 2>/dev/null || echo 'unknown') +VERSION ?= $(shell git describe --abbrev=0 --tags 2>/dev/null || echo 'v0.0.0') COMMIT ?= $(shell git rev-parse HEAD) TIME ?= $(shell date +%F_%T) MOCKERY = $(GOBIN)/mockery diff --git a/README.md b/README.md index 3f6fa483..11eb2064 100644 --- a/README.md +++ b/README.md @@ -13,144 +13,50 @@ Magistrala IoT Agent is a communication, execution and software management agent for the [Magistrala][magistrala] IoT platform. It runs on edge devices and bridges local services (Node-RED, terminal) with a Magistrala deployment over MQTT. That Magistrala deployment can be local or cloud-hosted. A built-in web UI is included for local management. -## MQTT and Local Messaging - -The agent uses two messaging paths: - -- **MQTT** is used for the Magistrala-facing control and data plane. The agent connects to the MQTT broker from the rendered bootstrap profile or environment config, subscribes for commands on `m//c//req`, and publishes responses on the commands channel plus data messages on the telemetry channel. This MQTT broker can be a local Magistrala deployment or Magistrala Cloud. -- **FluxMQ over AMQP** is used for local gateway service messaging. The agent subscribes to local heartbeat messages on the FluxMQ-backed message bus so nearby services can report liveness without the agent polling them. 
+## Features + +- **MQTT command & control** — remote shell execution, config management, process reset over [SenML][senml] JSON via Magistrala MQTT +- **Node-RED integration** — deploy, fetch, and manage Node-RED flows over MQTT or HTTP +- **Interactive terminal** — full PTY sessions tunneled over MQTT +- **Periodic telemetry** — uptime, memory, CPU temperature, disk usage, load averages, wireless RSSI +- **Heartbeat & liveness** — self-heartbeat to Magistrala + local service tracking via AMQP +- **Downstream device management** — provision, register, and manage serial/I2C/Modbus devices +- **OTA updates** — remote binary update with SHA-256 verification +- **Health supervisor** — process watchdog with systemd integration +- **Bootstrap provisioning** — profile-based startup from Magistrala Bootstrap service ## Install ```bash git clone https://github.com/absmach/agent cd agent -``` - -Build the binary: - -```bash make all ``` The binary is written to `build/magistrala-agent`. -## Running with Docker - -The recommended way to run agent is with the provided Docker Compose stack, which also starts Node-RED, FluxMQ, and the Agent UI. +## Quick Start with Docker ### 1. Provision Magistrala resources -If you have a running Magistrala instance, provision the required client, channels, bootstrap profile/enrollment, profile bindings, and `save_senml` rule. - -Export the provisioning values first: - ```bash -export MG_AGENT_BOOTSTRAP_EXTERNAL_ID='01:6:0:sb:sa' -export MG_AGENT_BOOTSTRAP_EXTERNAL_KEY='secret' -export MG_DOMAIN_ID= -export MG_PAT= +export MG_AGENT_BOOTSTRAP_EXTERNAL_ID='' +export MG_AGENT_BOOTSTRAP_EXTERNAL_KEY='' +export MG_DOMAIN_ID='' +export MG_PAT='' make run_provision ``` -Use your real PAT in the shell, but do not commit it to files. The provisioning script no longer writes a runtime `config.toml`; it creates a Bootstrap Profile and Enrollment. 
At startup, the agent and Node-RED fetch the rendered bootstrap profile and use that as the runtime config source. - -The PAT used for provisioning must be able to create bootstrap configs, rules, clients, and channels in the target domain. In practice the provisioning flow expects scopes like: - -- `bootstrap:create` -- `rules:create` -- `clients:create` -- `clients:view` -- `clients:connect_to_channel` -- `channels:create` -- `channels:view` -- `channels:connect_client` - -all scoped to the target `domain_id`. - -The provisioning script uses sensible defaults for local Docker: - -- MQTT: `ssl://host.docker.internal:8883` -- Bootstrap API: `http://localhost:9013` - -Override them before provisioning when needed, for example: - -```bash -export MG_AGENT_BOOTSTRAP_EXTERNAL_ID= -export MG_AGENT_BOOTSTRAP_EXTERNAL_KEY= -export MG_DOMAIN_ID= -export MG_PAT= -export MG_AGENT_MQTT_URL=ssl://messaging.magistrala.absmach.eu:8883 -export MG_AGENT_MQTT_SKIP_TLS=false -make run_provision -``` - -Using `MG_API=https://cloud.magistrala.absmach.eu/api` points provisioning at Magistrala Cloud. Setting `MG_AGENT_MQTT_URL=ssl://messaging.magistrala.absmach.eu:8883` points the agent at the cloud MQTT broker instead of the local Docker default. - -**Alternatively**, create a Client, telemetry Channel, commands Channel, Bootstrap Profile, Enrollment, profile bindings, and Rule Engine rule manually via the Magistrala UI or API, then set bootstrap runtime env vars in `docker/.env`. - -For bootstrap mode, the runtime env values are: - -```env -MG_AGENT_BOOTSTRAP_EXTERNAL_ID= -MG_AGENT_BOOTSTRAP_EXTERNAL_KEY= -MG_AGENT_BOOTSTRAP_URL=http://bootstrap:9013/clients/bootstrap -``` - -You can fetch the rendered bootstrap response directly: - -```bash -curl -s 'http://localhost:9013/clients/bootstrap/01:6:0:sb:sa' \ - -H 'accept: */*' \ - -H 'Authorization: Client secret' -``` - -The bootstrap endpoint returns a wrapper object. 
The agent parses the JSON string in `content`: +The PAT must have scopes for `bootstrap:create`, `rules:create`, `clients:create`, `channels:create`, and connect permissions in the target domain. See [docs/bootstrap.md](docs/bootstrap.md) for details and cloud provisioning. -```json -{ - "content": "{\"device_id\":\"\",\"external_id\":\"01:6:0:sb:sa\",\"domain_id\":\"\",\"mqtt\":{\"url\":\"ssl://host.docker.internal:8883\",\"client_id\":\"\",\"secret\":\"\"},\"telemetry\":{\"channel_id\":\"\",\"topic\":\"m//c//msg\"},\"commands\":{\"channel_id\":\"\"}}", - "client_key": "", - "client_cert": "", - "ca_cert": "" -} -``` - -Decoded, the rendered profile content looks like: - -```json -{ - "device_id": "", - "external_id": "01:6:0:sb:sa", - "domain_id": "", - "mqtt": { - "url": "ssl://host.docker.internal:8883", - "client_id": "", - "secret": "" - }, - "telemetry": { - "channel_id": "", - "topic": "m//c//msg" - }, - "commands": { - "channel_id": "" - } -} -``` - -### 2. Build the dev Docker image +### 2. Build and start ```bash make all && make dockers_dev -``` - -### 3. Start the stack - -```bash make run ``` -Starts: Agent (:9999), Node-RED (:1880), Agent UI (:3002). +Starts: Agent + UI (:9999), Node-RED (:1880). ### Stopping @@ -161,25 +67,15 @@ make clean_volumes ## Agent UI -A web-based management UI is included and served at `http://localhost:3002`. 
It provides: +A web-based management UI at `http://localhost:9999` provides: -- **Configuration** — view the effective runtime config (`server`, `channels`, `mqtt`, `nodered`, `log`) -- **Node-RED** — ping, get state, fetch flows, deploy flows (replaces all running flows), and add a single flow tab (non-destructive) from a local JSON file +- **Configuration** — view the effective runtime config +- **Node-RED** — ping, get state, fetch/deploy/add flows from a local JSON file - **Services** — view registered heartbeat services -- **Execute Command** — run shell commands on the edge device and see terminal-style output - -The UI is built with [Elm](https://elm-lang.org/) and served via nginx as a Docker container. - -To build the UI image: - -```bash -make dockers_dev -``` +- **Execute Command** — run shell commands on the edge device ## Running without Docker -Start FluxMQ (or use an existing Magistrala FluxMQ instance), then run the agent with bootstrap env vars: - ```bash MG_AGENT_BOOTSTRAP_EXTERNAL_ID= \ MG_AGENT_BOOTSTRAP_EXTERNAL_KEY= \ @@ -187,181 +83,42 @@ MG_AGENT_BOOTSTRAP_URL=http://localhost:9013/clients/bootstrap \ build/magistrala-agent ``` -### Config - -In the normal runtime flow, configuration is built from environment variables plus the rendered bootstrap profile. Environment variables provide local infrastructure settings, such as HTTP port, FluxMQ URL, Node-RED URL, MQTT TLS options, and bootstrap credentials. The rendered bootstrap profile provides device identity, domain ID, MQTT credentials, and telemetry/commands channel IDs. - -The legacy `config.toml` fallback still exists for local development, but bootstrap mode skips reading the file when `MG_AGENT_BOOTSTRAP_URL`, `MG_AGENT_BOOTSTRAP_EXTERNAL_ID`, and `MG_AGENT_BOOTSTRAP_EXTERNAL_KEY` are all set. 
- -Environment variables: - -| Variable | Description | Default | -| ---------------------------------------- | ------------------------------------------------------ | ------------------------------------ | -| `MG_AGENT_CONFIG_FILE` | Legacy fallback config file, ignored in bootstrap mode | `config.toml` | -| `MG_AGENT_LOG_LEVEL` | Log level | `info` | -| `MG_AGENT_HTTP_PORT` | Agent HTTP port | `9999` | -| `MG_AGENT_PORT` | Alias for agent HTTP port | | -| `MG_AGENT_BROKER_URL` | FluxMQ (AMQP) broker URL | `amqp://guest:guest@localhost:5682/` | -| `MG_AGENT_MQTT_URL` | MQTT broker URL | `localhost:1883` | -| `MG_AGENT_MQTT_SKIP_TLS` | Skip TLS verification for MQTT | `true` | -| `MG_AGENT_MQTT_MTLS` | Use mTLS for MQTT | `false` | -| `MG_AGENT_MQTT_CA` | CA certificate path for mTLS | `ca.crt` | -| `MG_AGENT_MQTT_CLIENT_CERT` | Client certificate path for mTLS | `client.cert` | -| `MG_AGENT_MQTT_CLIENT_KEY` | Client private key path for mTLS | `client.key` | -| `MG_AGENT_MQTT_QOS` | MQTT QoS level | `0` | -| `MG_AGENT_MQTT_RETAIN` | MQTT retain flag | `false` | -| `MG_AGENT_NODERED_URL` | Node-RED API URL | `http://localhost:1880/` | -| `MG_AGENT_HEARTBEAT_INTERVAL` | Expected heartbeat interval | `10s` | -| `MG_AGENT_TERMINAL_SESSION_TIMEOUT` | Terminal session timeout | `60s` | -| `MG_AGENT_BOOTSTRAP_URL` | Bootstrap base URL | | -| `MG_AGENT_BOOTSTRAP_EXTERNAL_ID` | Bootstrap external ID | | -| `MG_AGENT_BOOTSTRAP_EXTERNAL_KEY` | Bootstrap external key | | -| `MG_AGENT_BOOTSTRAP_RETRIES` | Bootstrap fetch retries | `5` | -| `MG_AGENT_BOOTSTRAP_RETRY_DELAY_SECONDS` | Bootstrap retry delay in seconds | `10` | -| `MG_AGENT_BOOTSTRAP_SKIP_TLS` | Skip TLS verification for bootstrap fetch | `false` | - -## MQTT Message Format - -Agent uses MQTT against the configured Magistrala MQTT broker. It subscribes to `m//c//req`. 
- -All messages use [SenML][senml] JSON array format: - -```json -[{ "bn": ":", "n": "", "vs": "[,]" }] -``` +## Configuration -The `n` field selects the subsystem. Supported subsystems: +Configuration comes from environment variables plus the rendered bootstrap profile. Environment variables provide local infrastructure settings (HTTP port, FluxMQ URL, Node-RED URL, MQTT TLS). The bootstrap profile provides device identity, MQTT credentials, and channel IDs. A persistent config store (`MG_AGENT_CONFIG_PATH`) holds runtime overrides applied via MQTT `config set`. -| `n` | Description | -| --------- | ------------------------------------------------- | -| `control` | Node-RED commands | -| `exec` | Execute a shell command | -| `config` | View runtime config or save export service config | -| `term` | Terminal session control | -| `nodered` | Node-RED flow management | +Key variables: -## Sending Commands +| Variable | Description | Default | +| --------------------------------- | ---------------------------------------- | ------------------------------------ | +| `MG_AGENT_HTTP_PORT` | Agent HTTP port | `9999` | +| `MG_AGENT_MQTT_URL` | MQTT broker URL | `localhost:1883` | +| `MG_AGENT_NODERED_URL` | Node-RED API URL | `http://localhost:1880/` | +| `MG_AGENT_BROKER_URL` | FluxMQ (AMQP) broker URL | `amqp://guest:guest@localhost:5682/` | +| `MG_AGENT_HEARTBEAT_INTERVAL` | Heartbeat interval | `10s` | +| `MG_AGENT_TELEMETRY_INTERVAL` | Telemetry interval (`0s` to disable) | `30s` | +| `MG_AGENT_LOG_LEVEL` | Log level | `info` | +| `MG_AGENT_BOOTSTRAP_URL` | Bootstrap base URL | | +| `MG_AGENT_BOOTSTRAP_EXTERNAL_ID` | Bootstrap external ID | | +| `MG_AGENT_BOOTSTRAP_EXTERNAL_KEY` | Bootstrap external key | | -### Execute a shell command +Per-feature env vars are documented in each feature doc below. -Commands are passed as a comma-separated string: `command,arg1,arg2`. 
Commands with no arguments work as-is: +## Documentation -```bash -# No-arg command -mosquitto_pub \ - -h -p 8883 --capath /etc/ssl/certs \ - -u -P --id "cmd-$(date +%s)" \ - -t "m//c//req" \ - -m '[{"bn":"req-1:", "n":"exec", "vs":"pwd"}]' - -# With arguments -mosquitto_pub \ - -h -p 8883 --capath /etc/ssl/certs \ - -u -P --id "cmd-$(date +%s)" \ - -t "m//c//req" \ - -m '[{"bn":"req-1:", "n":"exec", "vs":"ls,-la"}]' -``` +Per-feature documentation with configuration, MQTT topic maps, and copy-paste test recipes: -Commands are executed via `sh -c` so shell builtins and pipelines are supported. Each invocation is stateless; use `&&` to chain commands: `ls,-la,/tmp,&&,cat,/etc/os-release`. - -### View service config - -```bash -mosquitto_pub \ - -h -p 8883 --capath /etc/ssl/certs \ - -u -P --id "cmd-$(date +%s)" \ - -t "m//c//req" \ - -m '[{"bn":"req-1:", "n":"config", "vs":"view"}]' -``` - -Responses are published to `m//c//res`. - -## Node-RED Integration - -Agent can manage Node-RED flows running on the same device. Flows can be deployed either via the Node-RED UI directly, via the agent's HTTP API (local), or from Magistrala over MQTT. - -### Via HTTP (local) - -First, base64-encode the flow JSON: - -```bash -FLOWS=$(cat examples/nodered/speed-flow.json | base64 -w 0) -``` - -Then send it to the agent. 
The agent decodes the flows, patches the MQTT client ID, and forwards them to Node-RED on its behalf: - -```bash -curl -s -X POST http://localhost:9999/nodered \ - -H 'Content-Type: application/json' \ - -d "{\"command\":\"nodered-deploy\",\"flows\":\"$FLOWS\"}" -``` - -Other commands (no `flows` field needed): - -```bash -# Fetch current flows -curl -s -X POST http://localhost:9999/nodered \ - -H 'Content-Type: application/json' \ - -d '{"command":"nodered-flows"}' - -# Ping Node-RED -curl -s -X POST http://localhost:9999/nodered \ - -H 'Content-Type: application/json' \ - -d '{"command":"nodered-ping"}' - -# Get flow state -curl -s -X POST http://localhost:9999/nodered \ - -H 'Content-Type: application/json' \ - -d '{"command":"nodered-state"}' -``` - -### Via MQTT (from Magistrala) - -```bash -FLOWS=$(cat examples/nodered/speed-flow.json | base64 -w 0) - -mosquitto_pub \ - -h -p 8883 --capath /etc/ssl/certs \ - -u -P --id "deploy-$(date +%s)" \ - -t "m//c//req" \ - -m "[{\"bn\":\"req-1:\",\"n\":\"nodered\",\"vs\":\"nodered-deploy,$FLOWS\"}]" -``` - -In both cases `flows` is the flow JSON **base64-encoded**. The agent automatically patches the MQTT `clientid` inside the deployed flows to `-nr` to prevent Node-RED from conflicting with the agent's own MQTT session. - -See [docs/nodered.md](docs/nodered.md) for the full setup guide, Docker Compose stack, and provisioning instructions. - -## Heartbeat Service - -Services running on the same host can publish to `heartbeat..` to register with the agent. - -```bash -go run ./examples/publish/main.go -s amqp://guest:guest@localhost:5682/ heartbeat.myservice.sensor "" -``` - -Check registered services: - -```bash -curl -s http://localhost:9999/services -``` - -## How to Save Export Config via Agent - -Agent can push an export service config file from Magistrala to the gateway via MQTT. Bootstrap mode does not update the agent runtime config this way. 
- -```bash -mosquitto_pub \ - -h -p 8883 --capath /etc/ssl/certs \ - -u -P --id "cfg-$(date +%s)" \ - -t "m//c//req" \ - -m "[{\"bn\":\"req-1:\", \"n\":\"config\", \"vs\":\",\"}]" -``` - -Generate the base64 payload from a JSON export config file: - -```bash -base64 -w 0 export.json -``` +| Document | Description | +| ---------------------------- | ----------------------------------------------------------------------------------------------------------- | +| [control.md](docs/control.md) | Command dispatch, runtime config get/set/reset, token authentication, exec subsystem, test recipes | +| [nodered.md](docs/nodered.md) | Node-RED integration, flow deployment, provisioning, HTTP and MQTT management, test recipes | +| [telemetry.md](docs/telemetry.md) | Periodic uptime telemetry, payload format, runtime configuration, test recipes | +| [heartbeat.md](docs/heartbeat.md) | Self-heartbeat and service liveness tracking, interval configuration, test recipes | +| [terminal.md](docs/terminal.md) | Interactive terminal sessions over MQTT, session lifecycle, PTY management, test recipes | +| [devices.md](docs/devices.md) | Downstream device provisioning, physical interfaces, device CRUD, telemetry scheduler, test recipes | +| [bootstrap.md](docs/bootstrap.md) | Profile-based provisioning flow, environment variables, cache management, test recipes | +| [ota.md](docs/ota.md) | Over-the-air binary updates, trigger payload, download/verify/replace cycle, status reporting, test recipes | +| [health.md](docs/health.md) | Health supervisor, systemd watchdog integration, MQTT connection monitoring, health check endpoints | ## License diff --git a/docker/.env b/docker/.env index 32db698f..c17910f1 100644 --- a/docker/.env +++ b/docker/.env @@ -41,3 +41,8 @@ MG_AGENT_DEVICE_DB_PATH=/var/lib/agent/devices.db ## Config Store MG_AGENT_CONFIG_PATH=/var/lib/agent/agent-config.json + +MG_AGENT_CLIENTS_URL="http://host.docker.internal:9006" 
+MG_AGENT_CHANNELS_URL="http://host.docker.internal:9005" +MG_AGENT_RULES_ENGINE_URL="http://host.docker.internal:9008" +MG_PAT="" diff --git a/docker/Dockerfile b/docker/Dockerfile index d1d3edc5..3fe646da 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -33,7 +33,6 @@ RUN apk update && apk add make && \ -o build/magistrala-$SVC cmd/main.go && \ mv build/magistrala-$SVC /exe -FROM scratch -COPY --from=builder /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ca-certificates.crt +FROM alpine:3.24 COPY --from=builder /exe / ENTRYPOINT ["/exe"] diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 2e84a3e8..ac48db5d 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -76,6 +76,10 @@ services: MG_AGENT_OTA_DOWNLOAD_DIR: ${MG_AGENT_OTA_DOWNLOAD_DIR} MG_AGENT_DEVICE_DB_PATH: ${MG_AGENT_DEVICE_DB_PATH} MG_AGENT_CONFIG_PATH: ${MG_AGENT_CONFIG_PATH} + MG_AGENT_CLIENTS_URL: ${MG_AGENT_CLIENTS_URL} + MG_AGENT_CHANNELS_URL: ${MG_AGENT_CHANNELS_URL} + MG_AGENT_RULES_ENGINE_URL: ${MG_AGENT_RULES_ENGINE_URL} + MG_PAT: ${MG_PAT} volumes: - /etc/ssl/certs/ca-certificates.crt:/etc/ssl/certs/ca-certificates.crt:ro - agent-data:/var/lib/agent diff --git a/docs/README.md b/docs/README.md index abffd959..93f3703a 100644 --- a/docs/README.md +++ b/docs/README.md @@ -1 +1,15 @@ -Agent docs placeholder \ No newline at end of file +# Magistrala Agent Documentation + +Per-feature documentation with configuration, MQTT topic maps, and copy-paste test recipes. 
+ +| Document | Description | +| ---------------------------- | ----------------------------------------------------------------------------------------------------------- | +| [heartbeat.md](heartbeat.md) | Self-heartbeat and service liveness tracking, interval configuration, test recipes | +| [telemetry.md](telemetry.md) | Periodic uptime telemetry, payload format, runtime configuration, test recipes | +| [control.md](control.md) | Command dispatch, runtime config get/set/reset, token authentication, exec subsystem, test recipes | +| [bootstrap.md](bootstrap.md) | Profile-based provisioning flow, environment variables, cache management, test recipes | +| [ota.md](ota.md) | Over-the-air binary updates, trigger payload, download/verify/replace cycle, status reporting, test recipes | +| [terminal.md](terminal.md) | Interactive terminal sessions over MQTT, session lifecycle, PTY management, test recipes | +| [nodered.md](nodered.md) | Node-RED integration, flow deployment, provisioning, HTTP and MQTT management, test recipes | +| [devices.md](devices.md) | Downstream device provisioning, physical interfaces, device CRUD, telemetry scheduler, test recipes | +| [health.md](health.md) | Health supervisor, systemd watchdog integration, MQTT connection monitoring, health check endpoints | diff --git a/docs/bootstrap.md b/docs/bootstrap.md new file mode 100644 index 00000000..228fdcd8 --- /dev/null +++ b/docs/bootstrap.md @@ -0,0 +1,191 @@ +# Bootstrap + +The bootstrap subsystem handles profile-based provisioning. At startup, the agent fetches a rendered bootstrap profile from the Magistrala Bootstrap service, which provides device identity, MQTT credentials, channel IDs, and provision configuration. The profile is cached locally so subsequent starts skip the HTTP fetch. + +## Bootstrap Flow + +1. **Agent starts** with `MG_AGENT_BOOTSTRAP_URL`, `MG_AGENT_BOOTSTRAP_EXTERNAL_ID`, and `MG_AGENT_BOOTSTRAP_EXTERNAL_KEY` set. +2. 
**Check cache** — if bootstrap-derived fields are already in the persistent config store and `bs_valid` is `1`, skip the HTTP fetch. +3. **Fetch profile** — HTTP GET to the bootstrap endpoint with the external ID and key. +4. **Parse content** — the bootstrap response wraps a JSON string in `content`: + ```json + { + "content": "{ \"device_id\": \"...\", \"mqtt\": { ... } }", + "client_key": "", + "client_cert": "", + "ca_cert": "" + } + ``` +5. **Merge with env config** — bootstrap fields override env defaults for `domain_id`, `channels`, and `mqtt`. +6. **Persist** — bootstrap fields are saved to the persistent config store (`agent-config.json`). +7. **Load certificates** — if mTLS is configured, client and CA certs are loaded. +8. **Connect MQTT** — the agent connects using the bootstrap-provided credentials. + +## Rendered Profile Content + +The `content` field from the bootstrap response decodes to: + +```json +{ + "device_id": "", + "external_id": "", + "domain_id": "", + "mqtt": { + "url": "ssl://host.docker.internal:8883", + "client_id": "", + "secret": "" + }, + "telemetry": { + "channel_id": "", + "topic": "m//c//msg" + }, + "commands": { + "channel_id": "" + } +} +``` + +## Configuration + +### Environment Variables + +| Variable | Default | Description | +| ---------------------------------------- | ------------------------------- | --------------------------------------------------------------------------- | +| `MG_AGENT_BOOTSTRAP_URL` | `""` | Bootstrap service base URL (e.g. 
`http://bootstrap:9013/clients/bootstrap`) | +| `MG_AGENT_BOOTSTRAP_EXTERNAL_ID` | `""` | Device external ID used to look up the profile | +| `MG_AGENT_BOOTSTRAP_EXTERNAL_KEY` | `""` | Device external key (sent as `Authorization: Client <external_key>`) | +| `MG_AGENT_BOOTSTRAP_RETRIES` | `5` | Number of retries when fetching bootstrap profile | +| `MG_AGENT_BOOTSTRAP_RETRY_DELAY_SECONDS` | `10` | Delay between retries in seconds | +| `MG_AGENT_BOOTSTRAP_SKIP_TLS` | `false` | Skip TLS verification for bootstrap HTTP fetch | +| `MG_AGENT_BOOTSTRAP_CACHE_PATH` | `/var/lib/agent/bootstrap.json` | Local file path for caching the bootstrap response | + +### When Bootstrap Is Active + +Bootstrap mode activates when **all three** are set: + +- `MG_AGENT_BOOTSTRAP_URL` +- `MG_AGENT_BOOTSTRAP_EXTERNAL_ID` +- `MG_AGENT_BOOTSTRAP_EXTERNAL_KEY` + +When bootstrap is active, the legacy `config.toml` file is ignored. + +### Persistent Config Store + +| Variable | Default | Description | +| ---------------------- | ------------------- | ---------------------------------------------------------- | +| `MG_AGENT_CONFIG_PATH` | `agent-config.json` | Path to the JSON file used for persistent config overrides | + +Bootstrap-derived fields persisted in the store: + +| Key | Source | +| ------------------ | ----------------------------------- | +| `domain_id` | Bootstrap profile | +| `channels_ctrl_id` | Bootstrap profile | +| `channels_data_id` | Bootstrap profile | +| `mqtt_url` | Bootstrap profile | +| `mqtt_username` | Bootstrap profile | +| `mqtt_password` | Bootstrap profile | +| `bs_valid` | Set to `"1"` after successful fetch | + +## Provisioning + +### Automated (recommended) + +```bash +export MG_AGENT_BOOTSTRAP_EXTERNAL_ID="<external_id>" +export MG_AGENT_BOOTSTRAP_EXTERNAL_KEY="<external_key>" +export MG_DOMAIN_ID="<domain_id>" +export MG_PAT="<pat>" +make run_provision +``` + +The provisioning script creates: + +1. A Client (device) with credentials +2. Telemetry and commands Channels +3. 
A Bootstrap Profile and Enrollment with `external_id` and `external_key` +4. Profile bindings to the provisioned client and channels +5. A Rule Engine rule with `save_senml` output for telemetry + +### Cloud provisioning + +```bash +export MG_API="https://cloud.magistrala.absmach.eu/api" +export MG_AGENT_MQTT_URL=ssl://messaging.magistrala.absmach.eu:8883 +export MG_AGENT_MQTT_SKIP_TLS=false +export MG_AGENT_BOOTSTRAP_EXTERNAL_ID="<external_id>" +export MG_AGENT_BOOTSTRAP_EXTERNAL_KEY="<external_key>" +export MG_DOMAIN_ID="<domain_id>" +export MG_PAT="<pat>" +make run_provision +``` + +## Test Recipes + +### Fetch the bootstrap profile manually + +```bash +curl -s 'http://localhost:9013/clients/bootstrap/<external_id>' \ + -H 'accept: */*' \ + -H 'Authorization: Client <external_key>' +``` + +**Expected response:** + +```json +{ + "id": "fa846d56-3100-44aa-8385-3a88cb437a5a", + "content": "{\n \"commands\": {\n \"channel_id\": \"bc9a0af7-6d0f-4806-aa5a-61d68c0a7cf7\"\n },\n \"device_id\": \"fa846d56-3100-44aa-8385-3a88cb437a5a\",\n \"domain_id\": \"e9692c28-b730-4797-8a15-2e25c08f9641\",\n \"external_id\": \"019eb690777d7452ba898a66f5cc9cb8\",\n \"mqtt\": {\n \"client_id\": \"ffec2491-0de1-4051-9e75-ad2e2d241627\",\n \"secret\": \"<client_secret>\",\n \"url\": \"ssl://host.docker.internal:8883\"\n },\n \"provision\": {\n \"channels_url\": \"http://channels:9005\",\n \"clients_url\": \"http://clients:9006\",\n \"rules_engine_url\": \"http://rules:9008\",\n \"token\": \"<provision_token>\"\n },\n \"telemetry\": {\n \"channel_id\": \"b465a688-c1ca-417d-a36f-71f6f1be2409\",\n \"topic\": \"\\u003cno value\\u003e\"\n }\n}" +} +``` + +### Force a bootstrap re-fetch at runtime + +```bash +mosquitto_pub \ + -h <mqtt_host> -p 1883 \ + -u <client_id> -P <client_secret> --id "cfg-$(date +%s)" \ + -t "m/<domain_id>/c/<commands_channel_id>/req" \ + -m '[{"bn":"req-1:", "n":"config", "vs":"set,bs_valid,0"}]' +``` + +This sets `bs_valid` to `0` and deletes 
the cached bootstrap profile. On the **next restart**, the agent will re-fetch the profile from the bootstrap service. + +### Check bootstrap cache status + +```bash +mosquitto_pub \ + -h -p 1883 \ + -u -P --id "cfg-$(date +%s)" \ + -t "m//c//req" \ + -m '[{"bn":"req-1:", "n":"config", "vs":"get,bs_valid"}]' +``` + +**Response:** `1` (valid cache) or `0` (cache invalidated). + +### Run agent without Docker + +```bash +export MG_AGENT_BOOTSTRAP_EXTERNAL_ID="" +export MG_AGENT_BOOTSTRAP_EXTERNAL_KEY="" +export MG_AGENT_BOOTSTRAP_URL="http://localhost:9013/clients/bootstrap" +build/magistrala-agent +``` + +### Verify agent startup logs + +After a successful bootstrap fetch, the agent logs: + +```json +{"level":"INFO","msg":"Client connected","client_name":""} +{"level":"INFO","msg":"Agent service started","port":"9999"} +``` + +If bootstrap data is already cached: + +```json +{ + "level": "INFO", + "msg": "Bootstrap data already present, skipping bootstrap fetch" +} +``` diff --git a/docs/control.md b/docs/control.md new file mode 100644 index 00000000..c7e22b54 --- /dev/null +++ b/docs/control.md @@ -0,0 +1,333 @@ +# Control — Command Dispatch, Config, and Token Auth + +The control subsystem handles inbound MQTT commands from Magistrala, dispatches them to the correct handler, and returns responses on the control response channel. It also provides runtime configuration management (`get`/`set`/`reset`) and optional token-based command authentication. + +## Overview + +All commands are sent as [SenML][senml] JSON arrays to the **commands channel request topic**. The agent decodes the first record's `n` field to determine the subsystem, then routes to the appropriate handler. Responses are published to the **commands channel response topic**. 
+ +### Command Subsystems + +| `n` value | Handler | Description | +| --------- | ------------- | --------------------------------------------------------------- | +| `exec` | Execute | Run an allowlisted shell command | +| `config` | ServiceConfig | View services, get/set/reset runtime config, save export config | +| `service` | ServiceConfig | Alias for `config` — same handler | +| `control` | Control | Node-RED management commands | +| `term` | Terminal | Open/close/write interactive terminal sessions | +| `nodered` | NodeRed | Node-RED flow operations | +| `ping` | Ping | Publish an immediate heartbeat | +| `reset` | Reset | Graceful shutdown and process restart | +| `ota` | OTA | Over-the-air binary update | +| `devices` | DeviceManager | Downstream device CRUD | + +## Message Format + +### Request (cloud → agent) + +**Topic:** `m/<domain_id>/c/<commands_channel_id>/req` + +```json +[{ "bn": "<uuid>:", "n": "<subsystem>", "vs": "<cmd>[,<args>]" }] +``` + +| Field | Description | +| ----- | ----------------------------------------------------------------- | +| `bn` | Request UUID followed by `:` (used to correlate request/response) | +| `n` | Subsystem name (see table above) | +| `vs` | Comma-delimited command and arguments | + +### Response (agent → cloud) + +**Topic:** `m/<domain_id>/c/<commands_channel_id>/res` + +```json +[ + { + "bn": "<uuid>:", + "n": "<command>", + "vs": "<result>", + "t": 1749552000.0 + } +] +``` + +## Token Authentication + +When `MG_AGENT_COMMAND_SECRET` is set to a non-empty string, **all inbound MQTT commands** (except service heartbeats) must include a `token` record in the SenML pack. The agent uses constant-time comparison to prevent timing attacks. 
+ +### Enable token auth + +```bash +export MG_AGENT_COMMAND_SECRET="my-secret-token" +``` + +### Send an authenticated command + +```json +[ + { "bn": "req-1:", "n": "exec", "vs": "pwd" }, + { "n": "token", "vs": "my-secret-token" } +] +``` + +When the secret is configured and the token is missing or does not match, the agent logs: + +``` +Command rejected: invalid or missing token +``` + +and silently drops the message (no response is published). + +### Set the command secret at runtime + +```bash +mosquitto_pub \ + -h -p 1883 \ + -u -P --id "cfg-$(date +%s)" \ + -t "m//c//req" \ + -m '[{"bn":"req-1:", "n":"config", "vs":"set,command_secret,my-secret-token"}]' +``` + +> **Note:** The `get` command returns `REDACTED` for `command_secret` to avoid leaking the secret over MQTT. + +## Runtime Configuration (config subsystem) + +The `config` command provides runtime management of agent parameters without restart. + +### Settable Keys + +| Key | Format | Description | +| -------------------------- | ------------------------------------------ | ----------------------------------- | +| `log_level` | `debug`, `info`, `warn`, `error` | Log verbosity | +| `heartbeat_interval` | Go duration (e.g. `30s`, `1m`) | Self-heartbeat period; minimum `1s` | +| `telemetry_interval` | Go duration (`1s`–`1h`) or `0s` to disable | Telemetry publish period | +| `terminal_session_timeout` | Go duration (e.g. `60s`, `5m`) | Terminal session idle timeout | +| `command_secret` | Any non-empty string | Token for MQTT command auth | +| `bs_valid` | `0` or `1` | Bootstrap cache validity flag | +| `mqtt_password` | Any non-empty string | MQTT broker password (write-only) | +| `provision_token` | Any non-empty string | Provisioning API token (write-only) | + +> **Note:** `mqtt_password` and `provision_token` are credential keys. They can be `set` but `get` returns `"not_allowed"` to avoid leaking secrets over MQTT. 
+ +### Commands + +#### View registered services + +```bash +mosquitto_pub \ + -h -p 1883 \ + -u -P --id "cfg-$(date +%s)" \ + -t "m//c//req" \ + -m '[{"bn":"req-1:", "n":"config", "vs":"view"}]' +``` + +Response will be something like this: + +Channel `m//c//res` + +```json +[ + { + "bn": "req-1", + "n": "view", + "t": 1781191691.7735436, + "vs": "[{\"name\":\"nodered\",\"last_seen\":\"2026-06-11T15:28:08.943017256Z\",\"status\":\"online\",\"type\":\"nodered\",\"terminal\":0}]" + } +] +``` + +#### Get a config value + +```bash +mosquitto_pub \ + -h -p 1883 \ + -u -P --id "cfg-$(date +%s)" \ + -t "m//c//req" \ + -m '[{"bn":"req-1:", "n":"config", "vs":"get,log_level"}]' +``` + +**Response:** + +```json +[{ "bn": "req-1", "n": "get", "t": 1781192457.305233, "vs": "debug" }] +``` + +#### Set a config value + +```bash +mosquitto_pub \ + -h -p 1883 \ + -u -P --id "cfg-$(date +%s)" \ + -t "m//c//req" \ + -m '[{"bn":"req-1:", "n":"config", "vs":"set,log_level,debug"}]' +``` + +**Response:** + +```json +[{ "bn": "req-1", "n": "set", "t": 1781192515.7082806, "vs": "ok" }] +``` + +#### Reset a config value to startup default + +```bash +mosquitto_pub \ + -h -p 1883 \ + -u -P --id "cfg-$(date +%s)" \ + -t "m//c//req" \ + -m '[{"bn":"req-1:", "n":"config", "vs":"reset,log_level"}]' +``` + +#### Invalidate bootstrap cache + +```bash +mosquitto_pub \ + -h -p 1883 \ + -u -P --id "cfg-$(date +%s)" \ + -t "m//c//req" \ + -m '[{"bn":"req-1:", "n":"config", "vs":"set,bs_valid,0"}]' +``` + +Setting `bs_valid` to `0` deletes the cached bootstrap profile. On next restart, the agent re-fetches from the bootstrap server. + +## Execute Commands (exec subsystem) + +The `exec` subsystem runs allowlisted shell commands on the agent host. 
+ +### Allowlisted Commands + +``` +cat cd curl date df echo env false free hostname id +ifconfig ip journalctl ls netstat ping printf ps pwd +ss systemctl true uname uptime who +``` + +### Execute a command + +```bash +# No arguments +mosquitto_pub \ + -h -p 1883 \ + -u -P --id "cmd-$(date +%s)" \ + -t "m//c//req" \ + -m '[{"bn":"req-1:", "n":"exec", "vs":"pwd"}]' + +# With arguments (comma-separated) +mosquitto_pub \ + -h -p 1883 \ + -u -P --id "cmd-$(date +%s)" \ + -t "m//c//req" \ + -m '[{"bn":"req-1:", "n":"exec", "vs":"ls,-la"}]' +``` + +Commands are executed directly (not via a shell), so shell operators like `&&`, `||`, `|`, and `>` are not supported. Each command and its arguments are comma-separated: `ls,-la,/tmp`. + +**Response (on control response topic):** + +```json +[{ "bn": "req-1", "bt": 1781192628.4707563, "n": "pwd", "vs": "/\n" }] +``` + +```json +[ + { + "bn": "req-1", + "bt": 1781192641.2791781, + "n": "ls -la", + "vs": "total 22096\ndrwxr-xr-x 1 root root 6 Jun 11 15:39 .\ndrwxr-xr-x 1 root root 6 Jun 11 15:39 ..\n-rwxr-xr-x 1 root root 0 Jun 11 15:39 .dockerenv\ndrwxr-xr-x 1 root root 858 Dec 17 07:03 bin\ndrwxr-xr-x 5 root root 340 Jun 11 15:39 dev\ndrwxr-xr-x 1 root root 56 Jun 11 15:39 etc\n-rwxr-xr-x 1 root root 22622370 Jun 11 15:39 exe\ndrwxr-xr-x 1 root root 0 Dec 17 07:03 home\ndrwxr-xr-x 1 root root 146 Dec 17 07:03 lib\ndrwxr-xr-x 1 root root 28 Dec 17 07:03 media\ndrwxr-xr-x 1 root root 0 Dec 17 07:03 mnt\ndrwxr-xr-x 1 root root 0 Dec 17 07:03 opt\ndr-xr-xr-x 1073 root root 0 Jun 11 15:39 proc\ndrwx------ 1 root root 0 Dec 17 07:03 root\ndrwxr-xr-x 1 root root 8 Dec 17 07:03 run\ndrwxr-xr-x 1 root root 810 Dec 17 07:03 sbin\ndrwxr-xr-x 1 root root 0 Dec 17 07:03 srv\ndr-xr-xr-x 13 root root 0 Jun 11 15:39 sys\ndrwxrwxrwt 1 root root 0 Dec 17 07:03 tmp\ndrwxr-xr-x 1 root root 40 Dec 17 07:03 usr\ndrwxr-xr-x 1 root root 6 Dec 17 07:03 var\n" + } +] +``` + +> **Note:** `cd` is handled specially — it changes the agent's internal 
working directory, so subsequent `exec` commands use the new directory. + +## Save Export Config + +Push an export service config file to the gateway: + +```bash +# Encode the config file +CONTENT=$(base64 -w 0 export.json) + +mosquitto_pub \ + -h -p 1883 \ + -u -P --id "cfg-$(date +%s)" \ + -t "m//c//req" \ + -m "[{\"bn\":\"req-1:\", \"n\":\"config\", \"vs\":\"save,export,/path/to/config.toml,$CONTENT\"}]" +``` + +## Reset (process restart) + +The `reset` command performs a graceful shutdown and then replaces the running process in-place via `syscall.Exec()`. The agent supports multiple reset modes: + +| Mode | Behavior | +| ----------- | ----------------------------------------------------------------------------------------- | +| `graceful` | Send goodbye heartbeat, stop service tickers, close terminals, disconnect MQTT, then exec | +| `immediate` | Minimal cleanup, quick MQTT disconnect (100ms), then exec | +| `now` | Alias for `immediate` | +| `watchdog` | Save reset reason and delegate to the health supervisor (no exec) | + +If no mode is specified, `graceful` is used by default. + +> **Warning:** `graceful`, `immediate`, and `now` modes restart the agent process immediately. Use with caution in production. 
+ +### Via MQTT + +```bash +mosquitto_pub \ + -h -p 1883 \ + -u -P --id "reset-$(date +%s)" \ + -t "m//c//req" \ + -m '[{"bn":"req-1:","n":"reset","vs":"graceful"}]' +``` + +With token auth (when `command_secret` is set): + +```bash +mosquitto_pub \ + -h -p 1883 \ + -u -P --id "reset-$(date +%s)" \ + -t "m//c//req" \ + -m '[{"bn":"req-1:","n":"reset","vs":"immediate"},{"n":"token","vs":"my-secret-token"}]' +``` + +### Via HTTP + +```bash +curl -s -X POST http://localhost:9999/reset \ + -H 'Content-Type: application/json' \ + -d '{"mode":"graceful"}' +``` + +**Response (HTTP 202 Accepted):** + +```json +{ "service": "agent", "response": "reset", "mode": "graceful" } +``` + +### Goodbye Heartbeat + +On a graceful reset, the agent publishes a goodbye heartbeat before disconnecting: + +```json +[ + { "bn": "agent:", "n": "service_type", "vs": "agent" }, + { "n": "heartbeat", "vb": false } +] +``` + +This allows downstream consumers to detect the agent going offline without waiting for a timeout. + +### Reset Reason + +The reset reason (`graceful`, `immediate`, `watchdog`) is persisted in the config store key `reset_reason` before the process exits. On the next start, the agent (or external tooling) can read this value to determine why the previous instance exited. + +## Topic Map + +| Direction | Topic | QoS | Description | +| ------------- | --------------------------------- | --- | ---------------- | +| Cloud → Agent | `m//c//req` | 1 | Command request | +| Agent → Cloud | `m//c//res` | 1 | Command response | + +[senml]: https://tools.ietf.org/html/rfc8428 diff --git a/docs/devices.md b/docs/devices.md new file mode 100644 index 00000000..9243366c --- /dev/null +++ b/docs/devices.md @@ -0,0 +1,326 @@ +# Device Manager — Downstream Device Provisioning and Management + +The device manager subsystem allows the agent to provision, register, and manage downstream devices connected via physical interfaces (serial, I2C, Modbus RTU/TCP, USB). 
Each device is provisioned as a Magistrala client with its own channel, and data from the device is forwarded to Magistrala over MQTT. + +## Architecture + +The device manager is split into three layers: + +1. **Service layer** (`DeviceManager()` in `service.go`) — handles MQTT command dispatch for 9 subcommands +2. **Manager** (`pkg/devicemgr/manager.go`) — provisioning logic (create client, channel, connect, optional rule) +3. **Store** (`pkg/devicemgr/store.go`) — BoltDB-backed persistent device registry + +## Supported Interface Types + +| Type | Address Format | Description | +| ------------ | ------------------ | ----------------------- | +| `serial` | `/dev/ttyS0` | Serial / RS-232 | +| `usb` | `/dev/ttyACM0` | USB serial | +| `modbus-rtu` | `/dev/ttyS0` | Modbus RTU over serial | +| `modbus-tcp` | `192.168.1.10:502` | Modbus TCP over network | +| `i2c` | `/dev/i2c-1` | Linux I2C bus | +| `ble` | — | Not yet implemented | +| `zigbee` | — | Not yet implemented | + +## Subcommands + +All device commands are sent via the `devices` dispatch name on the commands channel: + +| Subcommand | Format | Description | +| ---------- | ------------------------------------------------------------------------------------------------------- | --------------------------------------------- | +| `list` | `devices,list` | Returns JSON array of all registered devices | +| `add` | `devices,add,{"name":"...","external_id":"...","external_key":"...","iface_type":"...","iface_addr":"..."}` | Provision and register a new device | +| `remove` | `devices,remove,<id>` | Deregister and remove a device | +| `get` | `devices,get,<id>` | Returns JSON for one device | +| `seen` | `devices,seen,<id>` | Mark device as active / update last-seen time | +| `open` | `devices,open,<id>` | Open the physical interface for the device | +| `close` | `devices,close,<id>` | Close the physical interface | +| `read` | `devices,read,<id>,<n>` | Read n bytes from device, reply as hex string | +| `write` | `devices,write,<id>,<hex>` | Write 
hex-encoded bytes to the device | + +## Provisioning Flow + +When `add` is called, the agent: + +1. Creates a Magistrala **Client** via the Clients API +2. Creates a Magistrala **Channel** via the Channels API +3. **Connects** the client to the channel (publish + subscribe) +4. Optionally creates a **save_senml rule** via the Rules Engine API (if `MG_AGENT_RULES_ENGINE_URL` is configured) +5. Saves the device to the local **BoltDB store** + +If any step fails, the agent rolls back all previously created resources (client, channel) before returning the error. + +The device's Magistrala credentials (client ID/key) and channel ID are persisted locally so the agent can reconnect on restart. + +## Device Telemetry Scheduler + +When a device has a valid channel ID, the agent launches a background goroutine that: + +1. Creates a dedicated MQTT connection using the device's credentials (client ID as both MQTT client ID and username, device secret as password) +2. Opens the physical interface +3. Reads data in a loop (4096-byte buffer) +4. Publishes raw data to `m//c//msg` + +Reconnection uses exponential backoff (1s to 30s). TLS settings are inherited from the gateway's MQTT configuration. 
+ +## Configuration + +### Environment Variables + +| Variable | Default | Description | +| --------------------------- | ------- | --------------------------------------------------- | +| `MG_AGENT_DEVICE_DB_PATH` | | Path to the BoltDB database file | +| `MG_AGENT_PROVISION_URL` | | Base URL for Magistrala provisioning API | +| `MG_AGENT_PROVISION_TOKEN` | | Token for provisioning API authentication | +| `MG_AGENT_RULES_ENGINE_URL` | | URL for the Rules Engine (optional, for auto-rules) | + +## Topic Map + +| Direction | Topic | QoS | Description | +| ------------- | --------------------------------- | --- | ---------------------- | +| Cloud → Agent | `m//c//req` | 1 | Device command request | +| Agent → Cloud | `m//c//res` | 1 | Command response | +| Agent → Cloud | `m//c//msg` | 0 | Device telemetry data | + +## MQTT Test Recipes + +Subscribe to command responses before sending commands: + +```bash +mosquitto_sub \ + -h localhost -p 1883 \ + -u -P \ + -t "m//c//res" \ + -v +``` + +### List all devices + +```bash +mosquitto_pub \ + -h localhost -p 1883 \ + -u -P --id "dev-$(date +%s)" \ + -t "m//c//req" \ + -m '[{"bn":"req-1:","n":"devices","vs":"list"}]' +``` + +**Response:** + +```json +[{ "bn": "req-1", "n": "list", "t": 1781259205.3925076, "vs": "null" }] +``` + +### Add a device + +```bash +mosquitto_pub \ + -h localhost -p 1883 \ + -u -P --id "dev-$(date +%s)" \ + -t "m//c//req" \ + -m '[{"bn":"req-1:","n":"devices","vs":"add,{\"name\":\"temp-sensor\",\"external_id\":\"ext-001\",\"external_key\":\"ext-key-001\",\"iface_type\":\"serial\",\"iface_addr\":\"/dev/ttyS0\"}"}]' +``` + +**Response:** + +```json +[ + { + "bn": "req-1", + "n": "add", + "t": 1781259547.2528343, + "vs": "{\"id\":\"63bdb473-02e6-457b-bb9a-773a18ab40a7\",\"key\":\"ext-key-001\",\"channel_id\":\"e90c8f2d-8063-4762-971a-f3628460423f\",\"interface_type\":\"serial\",\"interface_addr\":\"/dev/ttyS0\",\"name\":\"temp-sensor\",\"active\":false,\"last_seen\":\"0001-01-01T00:00:00Z\"}" + } +] 
+``` + +### Get a specific device + +```bash +mosquitto_pub \ + -h localhost -p 1883 \ + -u -P --id "dev-$(date +%s)" \ + -t "m//c//req" \ + -m '[{"bn":"req-1:","n":"devices","vs":"get,"}]' +``` + +### Remove a device + +```bash +mosquitto_pub \ + -h localhost -p 1883 \ + -u -P --id "dev-$(date +%s)" \ + -t "m//c//req" \ + -m '[{"bn":"req-1:","n":"devices","vs":"remove,"}]' +``` + +### Mark a device as seen + +```bash +mosquitto_pub \ + -h localhost -p 1883 \ + -u -P --id "dev-$(date +%s)" \ + -t "m//c//req" \ + -m '[{"bn":"req-1:","n":"devices","vs":"seen,"}]' +``` + +### Open a device interface + +```bash +mosquitto_pub \ + -h localhost -p 1883 \ + -u -P --id "dev-$(date +%s)" \ + -t "m//c//req" \ + -m '[{"bn":"req-1:","n":"devices","vs":"open,"}]' +``` + +### Close a device interface + +```bash +mosquitto_pub \ + -h localhost -p 1883 \ + -u -P --id "dev-$(date +%s)" \ + -t "m//c//req" \ + -m '[{"bn":"req-1:","n":"devices","vs":"close,"}]' +``` + +### Read bytes from a device + +```bash +mosquitto_pub \ + -h localhost -p 1883 \ + -u -P --id "dev-$(date +%s)" \ + -t "m//c//req" \ + -m '[{"bn":"req-1:","n":"devices","vs":"read,,64"}]' +``` + +**Response:** + +```json +[{"bn":"req-1:","n":"read","vs":"48656c6c6f20576f726c64","t":...}] +``` + +### Write bytes to a device + +```bash +mosquitto_pub \ + -h localhost -p 1883 \ + -u -P --id "dev-$(date +%s)" \ + -t "m//c//req" \ + -m '[{"bn":"req-1:","n":"devices","vs":"write,,48656c6c6f"}]' +``` + +**Response:** + +```json +[{"bn":"req-1:","n":"write","vs":"5","t":...}] +``` + +### Subscribe to device telemetry + +```bash +mosquitto_sub \ + -h localhost -p 1883 \ + -u -P \ + -t "m//c//msg" \ + -v +``` + +## Testing with Virtual Serial Ports + +When running the agent in a Docker container (Alpine-based), you can use `socat` to create virtual serial port pairs for testing without real hardware. 
+ +### Set up virtual serial ports + +Inside the agent container: + +```bash +apk add socat + +# Create a linked virtual serial port pair +socat -d -d pty,raw,echo=0,link=/dev/ttyV0 pty,raw,echo=0,link=/dev/ttyV1 & +``` + +The agent connects to `/dev/ttyV0` and you interact with `/dev/ttyV1`. + +### Add a virtual device + +```bash +mosquitto_pub \ + -h localhost -p 1883 \ + -u -P --id "dev-$(date +%s)" \ + -t "m//c//req" \ + -m '[{"bn":"req-1:","n":"devices","vs":"add,{\"name\":\"dummy-sensor\",\"external_id\":\"ext-dummy\",\"external_key\":\"ext-dummy-key\",\"iface_type\":\"serial\",\"iface_addr\":\"/dev/ttyV0\"}"}]' +``` + +### Send data to the device (from the container) + +```bash +echo "HELLO" > /dev/ttyV1 +``` + +### Open a device interface + +```bash +mosquitto_pub \ + -h localhost -p 1883 \ + -u -P --id "dev-$(date +%s)" \ + -t "m//c//req" \ + -m '[{"bn":"req-1:","n":"devices","vs":"open,"}]' +``` + +### Read data from the device (via MQTT) + +```bash +mosquitto_pub \ + -h localhost -p 1883 \ + -u -P --id "dev-$(date +%s)" \ + -t "m//c//req" \ + -m '[{"bn":"req-1:","n":"devices","vs":"read,,64"}]' +``` + +### Write data to the device (via MQTT) + +```bash +# "HELLO" in hex = 48454c4c4f +mosquitto_pub \ + -h localhost -p 1883 \ + -u -P --id "dev-$(date +%s)" \ + -t "m//c//req" \ + -m '[{"bn":"req-1:","n":"devices","vs":"write,,48454c4c4f"}]' +``` + +Verify on the other end: + +```bash +cat /dev/ttyV1 +``` + +### Test management flow without hardware + +The `list`, `add`, `get`, `remove`, and `seen` subcommands work without physical hardware. You can test these via the HTTP API: + +```bash +# List devices +curl -s http://localhost:9999/devices | jq . 
+ +# Add a device +curl -s -X POST http://localhost:9999/devices \ + -H 'Content-Type: application/json' \ + -d '{ + "name": "temp-sensor", + "external_id": "ext-001", + "external_key": "ext-key-001", + "interface_type": "serial", + "interface_address": "/dev/ttyS0" + }' +``` + +## HTTP API + +| Method | Path | Description | +| -------- | -------------------- | ------------------- | +| `GET` | `/devices` | List all devices | +| `GET` | `/devices/{id}` | Get a device | +| `POST` | `/devices` | Add a device | +| `DELETE` | `/devices/{id}` | Remove a device | +| `POST` | `/devices/{id}/seen` | Mark device as seen | diff --git a/docs/health.md b/docs/health.md new file mode 100644 index 00000000..b66cbae9 --- /dev/null +++ b/docs/health.md @@ -0,0 +1,242 @@ +# Health Supervisor and Systemd Watchdog + +The health supervisor periodically checks the agent's subsystem health and triggers a process restart if the agent remains unhealthy for too long. When running under systemd, it also sends `WATCHDOG=1` notifications. + +## Health Checkers + +| Checker | What it checks | Failure condition | +| ------- | ---------------------- | ------------------------------- | +| `mqtt` | MQTT broker connection | `client.IsConnected() == false` | + +Additional checkers can be registered at startup via `supervisor.Register()`. + +## Behavior + +1. The supervisor runs a periodic ticker at the configured interval +2. Each tick, all registered checkers are polled +3. If any checker reports unhealthy, the unhealthy timer starts +4. If the agent stays unhealthy longer than the timeout, the process is restarted via `syscall.Exec()` +5. 
If all checkers recover before the timeout, the unhealthy timer resets + +## Systemd Watchdog Integration + +When the `NOTIFY_SOCKET` environment variable is set (indicating the agent is running under systemd with `WatchdogSec` configured): + +- The agent sends periodic `WATCHDOG=1` notifications via the unix datagram socket +- Notifications are only sent when the agent is healthy +- If the agent becomes unhealthy and stops notifying, systemd will kill and restart the process + +### Example systemd unit + +```ini +[Unit] +Description=Magistrala Agent + +[Service] +Type=notify +ExecStart=/usr/local/bin/agent +WatchdogSec=30 +Restart=always + +[Install] +WantedBy=multi-user.target +``` + +## Configuration + +### Environment Variables + +| Variable | Default | Description | +| ---------------------------- | ------- | ----------------------------------------------- | +| `MG_AGENT_WATCHDOG_INTERVAL` | `0` | Health check interval; `0` disables supervision | +| `MG_AGENT_WATCHDOG_TIMEOUT` | `60s` | How long unhealthy before triggering restart | + +## HTTP Endpoints + +### Health check + +```bash +curl -s http://localhost:9999/health | jq . +``` + +**Response:** + +```json +{ + "status": "pass", + "version": "0.0.0", + "commit": "ffffffff", + "description": "agent service", + "build_time": "1970-01-01_00:00:00", + "instance_id": "" +} +``` + +### Prometheus metrics + +```bash +curl -s http://localhost:9999/metrics +``` + +**Response:** + +```txt +# HELP agent_api_request_count Number of requests received. +# TYPE agent_api_request_count counter +agent_api_request_count{method="command_secret"} 6 +agent_api_request_count{method="config"} 8 +agent_api_request_count{method="terminal"} 6 +agent_api_request_count{method="update_liveness"} 315 +# HELP agent_api_request_latency_microseconds Total duration of requests in microseconds. 
+# TYPE agent_api_request_latency_microseconds summary +agent_api_request_latency_microseconds{method="command_secret",quantile="0.5"} 4.8131e-05 +agent_api_request_latency_microseconds{method="command_secret",quantile="0.9"} 9.6383e-05 +agent_api_request_latency_microseconds{method="command_secret",quantile="0.99"} 9.6383e-05 +agent_api_request_latency_microseconds_sum{method="command_secret"} 0.00033213700000000004 +agent_api_request_latency_microseconds_count{method="command_secret"} 6 +agent_api_request_latency_microseconds{method="config",quantile="0.5"} 9.6171e-05 +agent_api_request_latency_microseconds{method="config",quantile="0.9"} 0.000184515 +agent_api_request_latency_microseconds{method="config",quantile="0.99"} 0.000184515 +agent_api_request_latency_microseconds_sum{method="config"} 0.0008824780000000001 +agent_api_request_latency_microseconds_count{method="config"} 8 +agent_api_request_latency_microseconds{method="terminal",quantile="0.5"} 4.3531e-05 +agent_api_request_latency_microseconds{method="terminal",quantile="0.9"} 0.000945491 +agent_api_request_latency_microseconds{method="terminal",quantile="0.99"} 0.000945491 +agent_api_request_latency_microseconds_sum{method="terminal"} 0.0012051910000000002 +agent_api_request_latency_microseconds_count{method="terminal"} 6 +agent_api_request_latency_microseconds{method="update_liveness",quantile="0.5"} 1.2424e-05 +agent_api_request_latency_microseconds{method="update_liveness",quantile="0.9"} 5.322e-05 +agent_api_request_latency_microseconds{method="update_liveness",quantile="0.99"} 0.000103354 +agent_api_request_latency_microseconds_sum{method="update_liveness"} 0.007146967999999997 +agent_api_request_latency_microseconds_count{method="update_liveness"} 315 +# HELP go_gc_duration_seconds A summary of the wall-time pause (stop-the-world) duration in garbage collection cycles. 
+# TYPE go_gc_duration_seconds summary +go_gc_duration_seconds{quantile="0"} 5.2469e-05 +go_gc_duration_seconds{quantile="0.25"} 5.8792e-05 +go_gc_duration_seconds{quantile="0.5"} 0.000137268 +go_gc_duration_seconds{quantile="0.75"} 0.000150681 +go_gc_duration_seconds{quantile="1"} 0.000182847 +go_gc_duration_seconds_sum 0.000700169 +go_gc_duration_seconds_count 6 +# HELP go_gc_gogc_percent Heap size target percentage configured by the user, otherwise 100. This value is set by the GOGC environment variable, and the runtime/debug.SetGCPercent function. Sourced from /gc/gogc:percent. +# TYPE go_gc_gogc_percent gauge +go_gc_gogc_percent 100 +# HELP go_gc_gomemlimit_bytes Go runtime memory limit configured by the user, otherwise math.MaxInt64. This value is set by the GOMEMLIMIT environment variable, and the runtime/debug.SetMemoryLimit function. Sourced from /gc/gomemlimit:bytes. +# TYPE go_gc_gomemlimit_bytes gauge +go_gc_gomemlimit_bytes 9.223372036854776e+18 +# HELP go_goroutines Number of goroutines that currently exist. +# TYPE go_goroutines gauge +go_goroutines 24 +# HELP go_info Information about the Go environment. +# TYPE go_info gauge +go_info{version="go1.26.4-X:nodwarf5"} 1 +# HELP go_memstats_alloc_bytes Number of bytes allocated in heap and currently in use. Equals to /memory/classes/heap/objects:bytes. +# TYPE go_memstats_alloc_bytes gauge +go_memstats_alloc_bytes 1.961496e+06 +# HELP go_memstats_alloc_bytes_total Total number of bytes allocated in heap until now, even if released already. Equals to /gc/heap/allocs:bytes. +# TYPE go_memstats_alloc_bytes_total counter +go_memstats_alloc_bytes_total 6.99192e+06 +# HELP go_memstats_buck_hash_sys_bytes Number of bytes used by the profiling bucket hash table. Equals to /memory/classes/profiling/buckets:bytes. +# TYPE go_memstats_buck_hash_sys_bytes gauge +go_memstats_buck_hash_sys_bytes 1.452815e+06 +# HELP go_memstats_frees_total Total number of heap objects frees. 
Equals to /gc/heap/frees:objects + /gc/heap/tiny/allocs:objects. +# TYPE go_memstats_frees_total counter +go_memstats_frees_total 50935 +# HELP go_memstats_gc_sys_bytes Number of bytes used for garbage collection system metadata. Equals to /memory/classes/metadata/other:bytes. +# TYPE go_memstats_gc_sys_bytes gauge +go_memstats_gc_sys_bytes 3.395312e+06 +# HELP go_memstats_heap_alloc_bytes Number of heap bytes allocated and currently in use, same as go_memstats_alloc_bytes. Equals to /memory/classes/heap/objects:bytes. +# TYPE go_memstats_heap_alloc_bytes gauge +go_memstats_heap_alloc_bytes 1.961496e+06 +# HELP go_memstats_heap_idle_bytes Number of heap bytes waiting to be used. Equals to /memory/classes/heap/released:bytes + /memory/classes/heap/free:bytes. +# TYPE go_memstats_heap_idle_bytes gauge +go_memstats_heap_idle_bytes 3.424256e+06 +# HELP go_memstats_heap_inuse_bytes Number of heap bytes that are in use. Equals to /memory/classes/heap/objects:bytes + /memory/classes/heap/unused:bytes +# TYPE go_memstats_heap_inuse_bytes gauge +go_memstats_heap_inuse_bytes 3.85024e+06 +# HELP go_memstats_heap_objects Number of currently allocated objects. Equals to /gc/heap/objects:objects. +# TYPE go_memstats_heap_objects gauge +go_memstats_heap_objects 11786 +# HELP go_memstats_heap_released_bytes Number of heap bytes released to OS. Equals to /memory/classes/heap/released:bytes. +# TYPE go_memstats_heap_released_bytes gauge +go_memstats_heap_released_bytes 2.490368e+06 +# HELP go_memstats_heap_sys_bytes Number of heap bytes obtained from system. Equals to /memory/classes/heap/objects:bytes + /memory/classes/heap/unused:bytes + /memory/classes/heap/released:bytes + /memory/classes/heap/free:bytes. +# TYPE go_memstats_heap_sys_bytes gauge +go_memstats_heap_sys_bytes 7.274496e+06 +# HELP go_memstats_last_gc_time_seconds Number of seconds since 1970 of last garbage collection. 
+# TYPE go_memstats_last_gc_time_seconds gauge +go_memstats_last_gc_time_seconds 1.7812584662391562e+09 +# HELP go_memstats_mallocs_total Total number of heap objects allocated, both live and gc-ed. Semantically a counter version for go_memstats_heap_objects gauge. Equals to /gc/heap/allocs:objects + /gc/heap/tiny/allocs:objects. +# TYPE go_memstats_mallocs_total counter +go_memstats_mallocs_total 62721 +# HELP go_memstats_mcache_inuse_bytes Number of bytes in use by mcache structures. Equals to /memory/classes/metadata/mcache/inuse:bytes. +# TYPE go_memstats_mcache_inuse_bytes gauge +go_memstats_mcache_inuse_bytes 36736 +# HELP go_memstats_mcache_sys_bytes Number of bytes used for mcache structures obtained from system. Equals to /memory/classes/metadata/mcache/inuse:bytes + /memory/classes/metadata/mcache/free:bytes. +# TYPE go_memstats_mcache_sys_bytes gauge +go_memstats_mcache_sys_bytes 48216 +# HELP go_memstats_mspan_inuse_bytes Number of bytes in use by mspan structures. Equals to /memory/classes/metadata/mspan/inuse:bytes. +# TYPE go_memstats_mspan_inuse_bytes gauge +go_memstats_mspan_inuse_bytes 167680 +# HELP go_memstats_mspan_sys_bytes Number of bytes used for mspan structures obtained from system. Equals to /memory/classes/metadata/mspan/inuse:bytes + /memory/classes/metadata/mspan/free:bytes. +# TYPE go_memstats_mspan_sys_bytes gauge +go_memstats_mspan_sys_bytes 179520 +# HELP go_memstats_next_gc_bytes Number of heap bytes when next garbage collection will take place. Equals to /gc/heap/goal:bytes. +# TYPE go_memstats_next_gc_bytes gauge +go_memstats_next_gc_bytes 4.248298e+06 +# HELP go_memstats_other_sys_bytes Number of bytes used for other system allocations. Equals to /memory/classes/other:bytes. +# TYPE go_memstats_other_sys_bytes gauge +go_memstats_other_sys_bytes 2.155889e+06 +# HELP go_memstats_stack_inuse_bytes Number of bytes obtained from system for stack allocator in non-CGO environments. Equals to /memory/classes/heap/stacks:bytes. 
+# TYPE go_memstats_stack_inuse_bytes gauge +go_memstats_stack_inuse_bytes 1.114112e+06 +# HELP go_memstats_stack_sys_bytes Number of bytes obtained from system for stack allocator. Equals to /memory/classes/heap/stacks:bytes + /memory/classes/os-stacks:bytes. +# TYPE go_memstats_stack_sys_bytes gauge +go_memstats_stack_sys_bytes 1.114112e+06 +# HELP go_memstats_sys_bytes Number of bytes obtained from system. Equals to /memory/classes/total:byte. +# TYPE go_memstats_sys_bytes gauge +go_memstats_sys_bytes 1.562036e+07 +# HELP go_sched_gomaxprocs_threads The current runtime.GOMAXPROCS setting, or the number of operating system threads that can execute user-level Go code simultaneously. Sourced from /sched/gomaxprocs:threads. +# TYPE go_sched_gomaxprocs_threads gauge +go_sched_gomaxprocs_threads 16 +# HELP go_threads Number of OS threads created. +# TYPE go_threads gauge +go_threads 15 +# HELP process_cpu_seconds_total Total user and system CPU time spent in seconds. +# TYPE process_cpu_seconds_total counter +process_cpu_seconds_total 0.23 +# HELP process_max_fds Maximum number of open file descriptors. +# TYPE process_max_fds gauge +process_max_fds 524287 +# HELP process_network_receive_bytes_total Number of bytes received by the process over the network. +# TYPE process_network_receive_bytes_total counter +process_network_receive_bytes_total 113277 +# HELP process_network_transmit_bytes_total Number of bytes sent by the process over the network. +# TYPE process_network_transmit_bytes_total counter +process_network_transmit_bytes_total 70609 +# HELP process_open_fds Number of open file descriptors. +# TYPE process_open_fds gauge +process_open_fds 11 +# HELP process_resident_memory_bytes Resident memory size in bytes. +# TYPE process_resident_memory_bytes gauge +process_resident_memory_bytes 2.4420352e+07 +# HELP process_start_time_seconds Start time of the process since unix epoch in seconds. 
+# TYPE process_start_time_seconds gauge +process_start_time_seconds 1.78125796441e+09 +# HELP process_virtual_memory_bytes Virtual memory size in bytes. +# TYPE process_virtual_memory_bytes gauge +process_virtual_memory_bytes 1.314848768e+09 +# HELP process_virtual_memory_max_bytes Maximum amount of virtual memory available in bytes. +# TYPE process_virtual_memory_max_bytes gauge +process_virtual_memory_max_bytes 1.8446744073709552e+19 +# HELP promhttp_metric_handler_requests_in_flight Current number of scrapes being served. +# TYPE promhttp_metric_handler_requests_in_flight gauge +promhttp_metric_handler_requests_in_flight 1 +# HELP promhttp_metric_handler_requests_total Total number of scrapes by HTTP status code. +# TYPE promhttp_metric_handler_requests_total counter +promhttp_metric_handler_requests_total{code="200"} 1 +promhttp_metric_handler_requests_total{code="500"} 0 +promhttp_metric_handler_requests_total{code="503"} 0 +``` + +The `/metrics` endpoint exposes Prometheus request counters and latency summaries for all service methods, alongside the standard Go runtime and process metrics. diff --git a/docs/heartbeat.md b/docs/heartbeat.md new file mode 100644 index 00000000..7bcf6d0a --- /dev/null +++ b/docs/heartbeat.md @@ -0,0 +1,167 @@ +# Heartbeat + +The heartbeat subsystem tracks liveness of both the agent itself and services running on the same host. + +## Overview + +Two heartbeat paths exist: + +| Path | Transport | Purpose | +| --------------------- | --------------------- | --------------------------------------------------------------- | +| **Self-heartbeat** | MQTT → Magistrala | Agent publishes its own status periodically | +| **Service heartbeat** | FluxMQ (AMQP) → local | Co-located services register liveness via the local message bus | + +The agent also accepts an MQTT `ping` command that publishes an immediate heartbeat without waiting for the next interval. 
+ +On a graceful reset, the agent publishes a **goodbye heartbeat** with `heartbeat: false` before disconnecting, so downstream consumers can detect the agent going offline without waiting for a timeout. + +## Self-Heartbeat + +The agent publishes a SenML heartbeat to the telemetry channel on startup and at every interval: + +**Topic:** `m/<domain_id>/c/<channel_id>/gateway/heartbeat` + +**Payload schema:** + +```json +[ + { "bn": "agent:", "n": "service_type", "vs": "agent" }, + { "n": "heartbeat", "vb": true }, + { "n": "fw_version", "vs": "0.0.0" }, + { "n": "uptime", "u": "s", "v": 123.4 }, + { "n": "heap_free", "u": "By", "v": 1048576 }, + { "n": "devices", "u": "count", "v": 3 }, + { "n": "connected", "vb": true } +] +``` + +| Field | Type | Unit | Description | +| -------------- | ------ | ------- | ----------------------------------------- | +| `service_type` | string | — | Always `"agent"` | +| `heartbeat` | bool | — | `true`; `false` in the goodbye heartbeat | +| `fw_version` | string | — | Agent binary version (set via `-ldflags`) | +| `uptime` | float | `s` | Seconds since agent started | +| `heap_free` | float | `By` | Go runtime free heap bytes | +| `devices` | float | `count` | Number of registered downstream devices | +| `connected` | bool | — | MQTT connection state | + +## Service Heartbeat + +Co-located services publish a heartbeat message to the local FluxMQ broker. The agent subscribes to `m/<domain_id>/c/<channel_id>/services/#` and extracts the service name and type from the topic path. + +**Topic format:** `heartbeat.<service_name>.<service_type>` + +When a heartbeat is received, the agent: + +1. Creates a tracker entry if the service is new +2. Resets the service's `last_seen` timestamp +3. Marks the service as `online` + +If no heartbeat arrives within the configured interval, the service is marked `offline`. 
+ +**Service info schema:** + +```json +{ + "name": "myservice", + "last_seen": "2026-06-10T12:00:00Z", + "status": "online", + "type": "sensor", + "terminal": 0 +} +``` + +## Configuration + +### Environment Variables + +| Variable | Default | Description | +| ----------------------------- | ------------------------------------ | ------------------------------------------------------------------------------------ | +| `MG_AGENT_HEARTBEAT_INTERVAL` | `10s` | Period between self-heartbeat publishes and the timeout for marking services offline | +| `MG_AGENT_BROKER_URL` | `amqp://guest:guest@localhost:5682/` | FluxMQ (AMQP) broker URL for local service heartbeats | + +### Runtime Config (MQTT set) + +The heartbeat interval can be changed at runtime via the `config` subsystem: + +``` +config set heartbeat_interval <duration> +``` + +See [control.md](control.md) for the full `config set` recipe. + +## Topic Map + +| Direction | Topic | QoS | Description | +| --------------- | ----------------------------------------------- | ------------ | ------------------------------------ | +| Agent → Cloud | `m/<domain_id>/c/<channel_id>/gateway/heartbeat` | Configurable | Periodic self-heartbeat | +| Service → Agent | `heartbeat.<service_name>.<service_type>` (via AMQP) | — | Local service registration | +| Cloud → Agent | `m/<domain_id>/c/<channel_id>/req` | 1 | `ping` command (on-demand heartbeat) | + +## MQTT Test Recipes + +### Subscribe to self-heartbeat + +```bash +mosquitto_sub \ + -h <host> -p 1883 \ + -u <client_id> -P <client_secret> \ + -t "m/<domain_id>/c/<channel_id>/gateway/heartbeat" \ + -v +``` + +**Expected output (repeats every interval):** + +``` +m/e9692c28-b730-4797-8a15-2e25c08f9641/c/b465a688-c1ca-417d-a36f-71f6f1be2409/gateway/heartbeat [{"bn":"agent:","n":"service_type","vs":"agent"},{"n":"heartbeat","vb":true},{"n":"fw_version","vs":"unknown"},{"n":"uptime","u":"s","v":4580.244433353},{"n":"heap_free","u":"By","v":417792},{"n":"devices","u":"count","v":0},{"n":"connected","vb":true}] +``` + +### Trigger an on-demand ping + +```bash +mosquitto_pub \ + -h <host> -p 1883 \ + -u <client_id> -P <client_secret> --id "ping-$(date +%s)" 
\ + -t "m//c//req" \ + -m '[{"bn":"req-1:", "n":"ping", "vs":""}]' +``` + +### Check registered services via HTTP + +```bash +curl -s http://localhost:9999/services | jq . +``` + +**Expected output:** + +```json +[ + { + "name": "nodered", + "last_seen": "2026-06-11T13:53:18.692168817Z", + "status": "online", + "type": "nodered", + "terminal": 0 + } +] +``` + +### Publish a local service heartbeat (Go) + +```bash +go run ./examples/publish/main.go \ + -s amqp://guest:guest@localhost:5682/ \ + heartbeat.myservice.sensor "" +``` + +### Change heartbeat interval at runtime + +```bash +mosquitto_pub \ + -h -p 1883 \ + -u -P --id "cfg-$(date +%s)" \ + -t "m//c//req" \ + -m '[{"bn":"req-1:", "n":"config", "vs":"set,heartbeat_interval,30s"}]' +``` + +The agent responds on the control response topic with `"ok"` and the next heartbeat uses the new interval. diff --git a/docs/nodered.md b/docs/nodered.md index 46981410..55faeb82 100644 --- a/docs/nodered.md +++ b/docs/nodered.md @@ -8,8 +8,6 @@ This guide explains how to run the Magistrala Agent with Node-RED support using Agent + Node-RED Architecture

- - ## Quick Start ### 1. Build the Agent Docker image @@ -24,7 +22,7 @@ make all && make dockers_dev make run ``` -This starts: Agent (:9999), Node-RED (:1880), FluxMQ (:5682), Agent UI (:3002). +This starts: Agent + UI (:9999), Node-RED (:1880). ### 3. Verify services @@ -38,7 +36,7 @@ curl -s -X POST http://localhost:9999/nodered \ -d '{"command":"nodered-ping"}' # Open the Agent UI -open http://localhost:3002 +open http://localhost:9999 # Open Node-RED UI open http://localhost:1880 @@ -49,10 +47,10 @@ open http://localhost:1880 If you have a running Magistrala instance, use the provisioning script to automatically create Clients, Channels, Bootstrap Profile resources, and Rule Engine rules: ```bash -export MG_AGENT_BOOTSTRAP_EXTERNAL_ID='01:6:0:sb:sa' -export MG_AGENT_BOOTSTRAP_EXTERNAL_KEY='secret' -export MG_PAT= -export MG_DOMAIN_ID= +export MG_AGENT_BOOTSTRAP_EXTERNAL_ID="" +export MG_AGENT_BOOTSTRAP_EXTERNAL_KEY="" +export MG_PAT="" +export MG_DOMAIN_ID="" make run_provision ``` @@ -72,24 +70,24 @@ The PAT used for provisioning must be able to create bootstrap configs, rules, c Or with a custom API URL: ```bash -MG_API=https://my-instance/api \ -MG_AGENT_BOOTSTRAP_EXTERNAL_ID= \ -MG_AGENT_BOOTSTRAP_EXTERNAL_KEY= \ -MG_DOMAIN_ID= \ -MG_PAT= \ +export MG_API=https://my-instance/api +export MG_AGENT_BOOTSTRAP_EXTERNAL_ID="" +export MG_AGENT_BOOTSTRAP_EXTERNAL_KEY="" +export MG_DOMAIN_ID="" +export MG_PAT="" make run_provision ``` For Magistrala Cloud specifically, use: ```bash -MG_API=https://cloud.magistrala.absmach.eu/api \ -MG_AGENT_MQTT_URL=ssl://messaging.magistrala.absmach.eu:8883 \ -MG_AGENT_MQTT_SKIP_TLS=false \ -MG_AGENT_BOOTSTRAP_EXTERNAL_ID= \ -MG_AGENT_BOOTSTRAP_EXTERNAL_KEY= \ -MG_DOMAIN_ID= \ -MG_PAT= \ +export MG_API=https://cloud.magistrala.absmach.eu/api +export MG_AGENT_MQTT_URL=ssl://messaging.magistrala.absmach.eu:8883 +export MG_AGENT_MQTT_SKIP_TLS=false +export MG_AGENT_BOOTSTRAP_EXTERNAL_ID="" +export MG_AGENT_BOOTSTRAP_EXTERNAL_KEY="" 
+export MG_DOMAIN_ID="" +export MG_PAT="" make run_provision ``` @@ -98,14 +96,15 @@ That combination targets the cloud APIs for provisioning and the cloud MQTT brok Or run the script directly: ```bash -export MG_AGENT_BOOTSTRAP_EXTERNAL_ID= -export MG_AGENT_BOOTSTRAP_EXTERNAL_KEY= -export MG_PAT= -export MG_DOMAIN_ID= +export MG_AGENT_BOOTSTRAP_EXTERNAL_ID="" +export MG_AGENT_BOOTSTRAP_EXTERNAL_KEY="" +export MG_PAT="" +export MG_DOMAIN_ID="" bash scripts/provision.sh ``` This will: + 1. Create a Client (device) with credentials 2. Create telemetry and commands Channels 3. Create a Bootstrap Profile and Enrollment with `external_id` and `external_key` @@ -121,6 +120,7 @@ MG_AGENT_BOOTSTRAP_EXTERNAL_KEY= ``` Then restart the agent: + ```bash docker compose up -d ``` @@ -129,7 +129,7 @@ docker compose up -d ### Via Agent UI -Open `http://localhost:3002` in a browser. The **Node-RED** panel lets you: +Open `http://localhost:9999` in a browser. The **Node-RED** panel lets you: - **Ping** — check that Node-RED is reachable - **State** — get the current runtime state @@ -171,6 +171,7 @@ curl -s -X POST http://localhost:9999/nodered \ Send a SenML array to `m//c//req`: Supported commands: + - `nodered-deploy,` — **Replace all running flows** with the provided flow JSON - `nodered-add-flow,` — **Add a new flow tab** alongside existing running flows - `nodered-flows` — Fetch current flows @@ -180,7 +181,13 @@ Supported commands: ### Via Control command ```json -[{"bn":"uuid:", "n":"control", "vs":"nodered-deploy,"}] +[ + { + "bn": "uuid:", + "n": "control", + "vs": "nodered-deploy," + } +] ``` ## Example Flows @@ -197,12 +204,12 @@ A ready-to-deploy example that publishes `speed` (km/h), `rpm`, and `gear` SenML Simulates polling 4 Modbus TCP holding registers (FC03) every 10 seconds and publishing SenML records to Magistrala: -| Register | Measurement | Unit | -|----------|-------------|------| -| HR0 | Voltage | V | -| HR1 | Current (scaled ×10) | A | -| HR2 | Power | W | 
-| HR3 | Temperature | °C | +| Register | Measurement | Unit | +| -------- | -------------------- | ---- | +| HR0 | Voltage | V | +| HR1 | Current (scaled ×10) | A | +| HR2 | Power | W | +| HR3 | Temperature | °C | The simulation function node can be replaced with a real `modbus-read` node when a physical Modbus TCP slave is available. @@ -223,6 +230,7 @@ mosquitto_pub \ ``` The agent will: + 1. Receive the SenML message over MQTT 2. Base64-decode the flow JSON 3. Patch the MQTT `clientid` in the flow to `-nr` (prevents session conflict with the agent itself) @@ -242,8 +250,14 @@ mosquitto_pub \ ``` The agent logs will show: + ```json -{"level":"INFO","msg":"NodeRed command \"nodered-deploy,...\" completed successfully.","duration":"...","uuid":"req-1"} +{ + "level": "INFO", + "msg": "NodeRed command \"nodered-deploy,...\" completed successfully.", + "duration": "...", + "uuid": "req-1" +} ``` Node-RED will start publishing speed data to the telemetry topic from the rendered profile, normally `m//c//msg`, within 3 seconds of deployment. @@ -256,12 +270,133 @@ The Node-RED URL is configured via environment variable: Device identity, MQTT credentials, domain ID, and telemetry/commands channel IDs come from the rendered bootstrap profile. -## Environment Variables +## Topic Map + +| Direction | Topic | QoS | Description | +| ------------- | --------------------------------- | --- | ------------------------------------------------------ | +| Cloud → Agent | `m//c//req` | 1 | Node-RED commands via `nodered` or `control` subsystem | +| Agent → Cloud | `m//c//res` | 1 | Command response | + +## MQTT Test Recipes + +All recipes use the **commands channel request topic**: `m//c//req`. + +Responses are published to: `m//c//res`. 
+ +### Subscribe to responses + +```bash +mosquitto_sub \ + -h <host> -p 1883 \ + -u <client_id> -P <client_secret> \ + -t "m/<domain_id>/c/<channel_id>/res" \ + -v +``` + +### Ping Node-RED + +```bash +mosquitto_pub \ + -h <host> -p 1883 \ + -u <client_id> -P <client_secret> --id "nr-$(date +%s)" \ + -t "m/<domain_id>/c/<channel_id>/req" \ + -m '[{"bn":"req-1:", "n":"nodered", "vs":"nodered-ping"}]' +``` + +**Expected response:** + +```json +[ + { + "bn": "req-1", + "bt": 1781261269.1195753, + "n": "nodered", + "vs": "{\"httpNodeRoot\":\"/\",\"version\":\"4.1.10\",\"context\":{\"default\":\"memory\",\"stores\":[\"memory\"]},\"libraries\":[{\"id\":\"local\",\"label\":\"editor:library.types.local\",\"user\":false,\"icon\":\"font-awesome/fa-hdd-o\"},{\"id\":\"examples\",\"label\":\"editor:library.types.examples\",\"user\":false,\"icon\":\"font-awesome/fa-life-ring\",\"types\":[\"flows\"],\"readOnly\":true}],\"flowEncryptionType\":\"disabled\",\"diagnostics\":{\"enabled\":true,\"ui\":true},\"telemetryEnabled\":false,\"runtimeState\":{\"enabled\":false,\"ui\":false},\"functionExternalModules\":true,\"functionTimeout\":0,\"tlsConfigDisableLocalFiles\":false,\"editorTheme\":{\"projects\":{\"enabled\":false},\"languages\":[\"de\",\"en-US\",\"es-ES\",\"fr\",\"ja\",\"ko\",\"pt-BR\",\"ru\",\"zh-CN\",\"zh-TW\"]}}" + } +] +``` + +### Get Node-RED runtime state + +```bash +mosquitto_pub \ + -h <host> -p 1883 \ + -u <client_id> -P <client_secret> --id "nr-$(date +%s)" \ + -t "m/<domain_id>/c/<channel_id>/req" \ + -m '[{"bn":"req-1:", "n":"nodered", "vs":"nodered-state"}]' +``` + +### Fetch current flows + +```bash +mosquitto_pub \ + -h <host> -p 1883 \ + -u <client_id> -P <client_secret> --id "nr-$(date +%s)" \ + -t "m/<domain_id>/c/<channel_id>/req" \ + -m '[{"bn":"req-1:", "n":"nodered", "vs":"nodered-flows"}]' +``` + +**Expected response (truncated):** + +```json +[ + { + "bn": "req-1", + "bt": 1781261340.08909, + "n": "nodered", + "vs": "[{\"id\":\"flow-magistrala-agent\",\"type\":\"tab\",\"label\":\"Magistrala Agent 
Flow\",\"disabled\":false,\"info\":\"Publishes SenML sensor data to Magistrala cloud every 30s via MQTT over TLS.\"},{\"id\":\"mqtt-broker-config\",\"type\":\"mqtt-broker\",\"name\":\"Magistrala Cloud MQTT\",\"broker\":\"host.docker.internal\",\"port\":\"8883\",\"clientid\":\"ffec2491-0de1-4051-9e75-ad2e2d241627-nr\",\"autoConnect\":true,\"usetls\":true,\"protocolVersion\":\"4\",\"keepalive\":\"60\",\"cleansession\":true,\"autoUnsubscribe\":true,\"credentials\":{\"user\":\"ffec2491-0de1-4051-9e75-ad2e2d241627\",\"password\":\"30c775d7-3504-42c6-976c-52c02474bf2f\"},\"birthTopic\":\"\",\"closeTopic\":\"\",\"willTopic\":\"\",\"z\":\"\",\"tls\":\"magistrala-agent-tls\"},{\"id\":\"inject-sensor\",\"type\":\"inject\",\"z\":\"flow-magistrala-agent\",\"name\":\"Every 30s\",\"props\":[{\"p\":\"payload\",\"v\":\"\",\"vt\":\"date\"}],\"repeat\":\"30\",\"crontab\":\"\",\"once\":true,\"onceDelay\":5,\"topic\":\"\",\"payload\":\"\",\"payloadType\":\"date\",\"x\":150,\"y\":160,\"wires\":[[\"build-senml\"]]},{\"id\":\"build-senml\",\"type\":\"function\",\"z\":\"flow-magistrala-agent\",\"name\":\"Build SenML payload\",\"func\":\"var now = Date.now() * 1e6;\\nmsg.payload = JSON.stringify([\\n {\\\"bn\\\": \\\"nodered:\\\", \\\"bt\\\": now, \\\"n\\\": \\\"temperature\\\", \\\"u\\\": \\\"Cel\\\", \\\"v\\\": 22.5 + Math.random() * 2},\\n {\\\"n\\\": \\\"humidity\\\", \\\"u\\\": \\\"%\\\", \\\"v\\\": 55.0 + Math.random() * 5}\\n]);\\nmsg.topic = \\\"m/e9692c28-b730-4797-8a15-2e25c08f9641/c/b465a688-c1ca-417d-a36f-71f6f1be2409/msg\\\";\\nreturn msg;\",\"outputs\":1,\"x\":380,\"y\":160,\"wires\":[[\"mqtt-pub-data\",\"debug-output\"]]},{\"id\":\"mqtt-pub-data\",\"type\":\"mqtt out\",\"z\":\"flow-magistrala-agent\",\"name\":\"Publish to 
Magistrala\",\"topic\":\"\",\"qos\":\"0\",\"retain\":\"false\",\"broker\":\"mqtt-broker-config\",\"x\":640,\"y\":140,\"wires\":[]},{\"id\":\"debug-output\",\"type\":\"debug\",\"z\":\"flow-magistrala-agent\",\"name\":\"Debug\",\"active\":true,\"tosidebar\":true,\"console\":false,\"complete\":\"payload\",\"x\":620,\"y\":200,\"wires\":[]},{\"id\":\"magistrala-agent-tls\",\"type\":\"tls-config\",\"name\":\"Magistrala MQTT TLS\",\"cert\":\"\",\"key\":\"\",\"ca\":\"\",\"certname\":\"\",\"keyname\":\"\",\"caname\":\"\",\"servername\":\"\",\"verifyservercert\":false,\"alpnprotocol\":\"\"}]" + } +] +``` + +### Deploy flows (replace all) + +```bash +FLOWS=$(cat examples/nodered/speed-flow.json | base64 -w 0) + +mosquitto_pub \ + -h -p 1883 \ + -u -P --id "deploy-$(date +%s)" \ + -t "m//c//req" \ + -m "[{\"bn\":\"req-1:\",\"n\":\"nodered\",\"vs\":\"nodered-deploy,$FLOWS\"}]" +``` + +> **Warning:** `nodered-deploy` replaces **all** running flows. Use `nodered-add-flow` to add without replacing. + +### Add a flow tab (non-destructive) + +```bash +FLOWS=$(cat examples/nodered/speed-flow.json | base64 -w 0) + +mosquitto_pub \ + -h -p 1883 \ + -u -P --id "addflow-$(date +%s)" \ + -t "m//c//req" \ + -m "[{\"bn\":\"req-1:\",\"n\":\"nodered\",\"vs\":\"nodered-add-flow,$FLOWS\"}]" +``` + +### Deploy via control subsystem + +Node-RED commands can also be sent via the `control` subsystem: + +```bash +FLOWS=$(cat examples/nodered/speed-flow.json | base64 -w 0) + +mosquitto_pub \ + -h -p 1883 \ + -u -P --id "ctrl-$(date +%s)" \ + -t "m//c//req" \ + -m "[{\"bn\":\"req-1:\",\"n\":\"control\",\"vs\":\"nodered-deploy,$FLOWS\"}]" +``` + +## Configuration + +### Environment Variables -| Variable | Default | Description | -|----------|---------|-------------| -| `MG_AGENT_NODERED_URL` | `http://localhost:1880/` | Node-RED REST API base URL | -| `MG_AGENT_BOOTSTRAP_URL` | `http://bootstrap:9013/clients/bootstrap` | Bootstrap fetch URL | -| `MG_AGENT_BOOTSTRAP_EXTERNAL_ID` | | Bootstrap external 
ID | -| `MG_AGENT_BOOTSTRAP_EXTERNAL_KEY` | | Bootstrap external key | -| `MG_UI_PORT` | `3002` | Agent UI port | +| Variable | Default | Description | +| --------------------------------- | ----------------------------------------- | -------------------------- | +| `MG_AGENT_NODERED_URL` | `http://localhost:1880/` | Node-RED REST API base URL | +| `MG_AGENT_BOOTSTRAP_URL` | `http://bootstrap:9013/clients/bootstrap` | Bootstrap fetch URL | +| `MG_AGENT_BOOTSTRAP_EXTERNAL_ID` | | Bootstrap external ID | +| `MG_AGENT_BOOTSTRAP_EXTERNAL_KEY` | | Bootstrap external key | +| `MG_UI_PORT` | `9999` | Agent UI port | diff --git a/docs/ota.md b/docs/ota.md new file mode 100644 index 00000000..cf94843f --- /dev/null +++ b/docs/ota.md @@ -0,0 +1,205 @@ +# Over-the-Air (OTA) Updates + +The OTA subsystem allows remote binary updates of the agent via MQTT. A trigger message causes the agent to download a new binary, verify its integrity, replace the running binary, and restart the process in-place. + +## Overview + +``` +┌──────────────┐ MQTT trigger ┌──────────────┐ HTTP GET ┌──────────────┐ +│ Magistrala │ ──────────────► │ Agent │ ──────────── │ File Server │ +│ (cloud) │ │ OTA Run │ ◄─────────── │ (binary + │ +│ │ ◄── progress ── │ │ download │ .sha256) │ +└──────────────┘ └──────┬───────┘ └──────────────┘ + │ + verify + replace + │ + syscall.Exec() + (process replaced) +``` + +## State Machine + +The OTA update goes through these states: + +| State | Description | +| ------------- | ------------------------------------------------------ | +| `IDLE` | No OTA in progress | +| `TRIGGERED` | Trigger received, download not yet started | +| `DOWNLOADING` | Binary downloading; progress updates published | +| `VERIFYING` | SHA-256 hash verification in progress | +| `READY` | Download and verification complete, about to replace | +| `RESTARTING` | Binary replaced, process restarting via `syscall.Exec` | +| `ABORTED` | OTA cancelled via abort command | + +## Trigger Payload + +OTA 
can be triggered via two MQTT topics. Both use a multi-record SenML pack. + +### Via commands channel (`req`) + +**Topic:** `m/<domain_id>/c/<channel_id>/req` + +The first record dispatches to the `ota` handler; subsequent records carry the trigger fields: + +```json +[ + { "bn": "req-1:", "n": "ota", "vs": "" }, + { "n": "url", "vs": "https://example.com/agent" }, + { "n": "hash", "vs": "" }, + { "n": "size", "v": 8388608 } +] +``` + +### Via OTA config topic + +**Topic:** `m/<domain_id>/c/<channel_id>/ota/cfg` + +No dispatch record is needed; the entire pack is trigger fields: + +```json +[ + { "n": "url", "vs": "https://example.com/agent" }, + { "n": "hash", "vs": "" }, + { "n": "size", "v": 8388608 } +] +``` + +| Field | Required | Description | +| ------ | -------- | ------------------------------------------------------------------------------------------------- | +| `url` | Yes | HTTP/HTTPS URL to the new binary | +| `hash` | No | Hex-encoded SHA-256 digest. If omitted, the agent tries to fetch `<url>.sha256` as a sidecar file | +| `size` | No | Expected byte count. If non-zero, the download is aborted if it exceeds this value | + +## Status Reporting + +During the OTA operation, the agent publishes progress to: + +**Topic:** `m/<domain_id>/c/<channel_id>/ota/status` + +```json +[ + { "bn": "gw:", "bt": 1749552000.0, "n": "ota_state", "vs": "downloading" }, + { "n": "ota_progress", "u": "%", "v": 50.0 } +] +``` + +Progress is reported at 5% increments during download and at state transitions. + +## Verification + +Verification is **mandatory**. The agent will abort the update if: + +1. No `hash` field was provided in the trigger **and** +2. The sidecar file at `<url>.sha256` is not reachable + +In that case, the downloaded file is deleted and the running binary is left untouched. + +If a hash is provided or the sidecar is found, the downloaded file's SHA-256 must match exactly. On mismatch, the download is deleted and the OTA fails. 
+ +## Configuration + +### Environment Variables + +| Variable | Default | Description | +| --------------------------- | ---------------------- | ------------------------------------------------------ | +| `MG_AGENT_OTA_ENABLED` | `false` | Enable or disable OTA functionality | +| `MG_AGENT_OTA_BINARY_PATH` | `/usr/local/bin/agent` | Absolute path to the running binary (will be replaced) | +| `MG_AGENT_OTA_DOWNLOAD_DIR` | `/tmp` | Directory for the temporary download file | + +## Topic Map + +| Direction | Topic | QoS | Description | +| ------------- | ---------------------------------------- | --- | --------------------------------------------------------- | +| Cloud → Agent | `m//c//req` | 1 | Trigger via commands channel (uses `ota` dispatch record) | +| Cloud → Agent | `m//c//ota/cfg` | 0 | Direct OTA config trigger | +| Agent → Cloud | `m//c//ota/status` | QoS | Progress and state updates | + +## MQTT Test Recipes + +### Trigger OTA via commands channel + +```bash +mosquitto_pub \ + -h -p 1883 \ + -u -P --id "ota-$(date +%s)" \ + -t "m//c//req" \ + -m '[{"bn":"req-1:","n":"ota","vs":""},{"n":"url","vs":"https://example.com/agent-v2"},{"n":"hash","vs":"abcdef1234567890..."},{"n":"size","v":8388608}]' +``` + +### Trigger OTA via OTA config topic + +```bash +mosquitto_pub \ + -h -p 1883 \ + -u -P --id "ota-$(date +%s)" \ + -t "m//c//ota/cfg" \ + -m '[{"n":"url","vs":"https://example.com/agent-v2"},{"n":"hash","vs":"abcdef1234567890..."}]' +``` + +### Abort an in-progress OTA + +```bash +mosquitto_pub \ + -h -p 1883 \ + -u -P --id "ota-$(date +%s)" \ + -t "m//c//req" \ + -m '[{"bn":"req-1:","n":"ota","vs":"abort"}]' +``` + +### Trigger OTA with token auth (when command_secret is set) + +```bash +mosquitto_pub \ + -h -p 1883 \ + -u -P --id "ota-$(date +%s)" \ + -t "m//c//ota/cfg" \ + -m '[{"n":"url","vs":"https://example.com/agent-v2"},{"n":"hash","vs":"abcdef1234567890..."},{"n":"token","vs":"my-secret-token"}]' +``` + +### Subscribe to OTA status updates + 
+```bash +mosquitto_sub \ + -h -p 1883 \ + -u -P \ + -t "m//c//ota/status" \ + -v +``` + +**Expected output:** + +``` +m//c//ota/status [{"bn":"gw:","bt":...,"n":"ota_state","vs":"triggered"},{"n":"ota_progress","u":"%","v":0}] +m//c//ota/status [{"bn":"gw:","bt":...,"n":"ota_state","vs":"downloading"},{"n":"ota_progress","u":"%","v":5}] +m//c//ota/status [{"bn":"gw:","bt":...,"n":"ota_state","vs":"downloading"},{"n":"ota_progress","u":"%","v":50}] +m//c//ota/status [{"bn":"gw:","bt":...,"n":"ota_state","vs":"verifying"},{"n":"ota_progress","u":"%","v":100}] +m//c//ota/status [{"bn":"gw:","bt":...,"n":"ota_state","vs":"ready"},{"n":"ota_progress","u":"%","v":100}] +m//c//ota/status [{"bn":"gw:","bt":...,"n":"ota_state","vs":"restarting"},{"n":"ota_progress","u":"%","v":100}] +``` + +### Check OTA status via HTTP + +```bash +curl -s http://localhost:9999/ota/status | jq . +``` + +**Expected response (idle):** + +```json +{ + "busy": false, + "last_error": "" +} +``` + +### Trigger OTA via HTTP + +```bash +curl -s -X POST http://localhost:9999/ota \ + -H 'Content-Type: application/json' \ + -d '{ + "url": "https://example.com/agent-v2", + "sha256": "abcdef1234567890...", + "size": 8388608 + }' +``` diff --git a/docs/telemetry.md b/docs/telemetry.md new file mode 100644 index 00000000..3ed8cc10 --- /dev/null +++ b/docs/telemetry.md @@ -0,0 +1,167 @@ +# Telemetry + +The telemetry subsystem publishes periodic gateway telemetry data to the Magistrala telemetry channel. In addition to uptime, the agent can collect and report CPU temperature, memory usage, load averages, disk usage, and wireless signal strength from the host. This is distinct from the self-heartbeat (see [heartbeat.md](heartbeat.md)), which includes richer device metadata. + +## Overview + +When `MG_AGENT_TELEMETRY_INTERVAL` is set to a non-zero duration, the agent starts a background goroutine that publishes a SenML telemetry record at the configured interval. 
Individual telemetry readers can be enabled or disabled via environment variables or the runtime config. Setting the interval to `0s` disables telemetry entirely. + +## Telemetry Readers + +The following telemetry readers are available. All readers are Linux-specific and are silently skipped when the underlying data source is unavailable. + +| Reader | Record Name | Unit | Source | Default | +| ------------------- | -------------------------------------------- | ----- | ---------------------------------------- | --------- | +| **Uptime** | `uptime` | `s` | Go runtime `time.Since(startTime)` | Always on | +| **Memory** | `heap_free`, `heap_used` | `By` | `/proc/meminfo` (MemTotal, MemAvailable) | Always on | +| **Disk** | `disk_usage_percent` | `%` | `syscall.Statfs("/")` | Always on | +| **CPU Temperature** | `temperature` | `Cel` | `/sys/class/thermal/thermal_zone*/temp` | On | +| **Network RSSI** | `rssi` | `dB` | `/proc/net/wireless` (default interface) | On | +| **Load Average** | `load_avg_1m`, `load_avg_5m`, `load_avg_15m` | — | `/proc/loadavg` | On | + +Uptime, memory, and disk usage are always included. CPU temperature, network RSSI, and load average can be toggled via environment variables or the runtime config. 
+ +## Payload Format + +**Topic:** `m//c//gateway/telemetry` + +**Payload (all readers enabled):** + +```json +[ + { "bn": "gw:", "bt": 1749552000.0, "n": "uptime", "u": "s", "v": 3600.5 }, + { "n": "heap_free", "u": "By", "v": 524288.0 }, + { "n": "heap_used", "u": "By", "v": 1048576.0 }, + { "n": "temperature", "u": "Cel", "v": 52.3 }, + { "n": "rssi", "u": "dB", "v": -65.0 }, + { "n": "load_avg_1m", "v": 0.75 }, + { "n": "load_avg_5m", "v": 0.82 }, + { "n": "load_avg_15m", "v": 0.68 }, + { "n": "disk_usage_percent", "u": "%", "v": 45.2 } +] +``` + +| Field | Type | Description | +| ----- | ------ | -------------------------------------------------- | +| `bn` | string | Base name: always `"gw:"` | +| `bt` | float | Unix timestamp (seconds with nanosecond precision) | +| `n` | string | Measurement name (see readers table above) | +| `u` | string | Unit (SenML standard units) | +| `v` | float | Measured value | + +## Configuration + +### Environment Variables + +| Variable | Default | Description | +| ---------------------------------------- | ------- | ------------------------------------------------------------------------------------------------------------- | +| `MG_AGENT_TELEMETRY_INTERVAL` | `30s` | Telemetry publish interval. Set to a positive duration (e.g. `30s`, `1m`) to enable. `0s` disables telemetry. | +| `MG_AGENT_TELEMETRY_INCLUDE_TEMPERATURE` | `true` | Include CPU temperature reading from thermal zones | +| `MG_AGENT_TELEMETRY_INCLUDE_NETWORK` | `true` | Include wireless RSSI reading from `/proc/net/wireless` | +| `MG_AGENT_TELEMETRY_INCLUDE_LOAD` | `true` | Include 1/5/15-minute load averages from `/proc/loadavg` | + +### Runtime Config (MQTT set) + +Telemetry can be enabled or reconfigured at runtime: + +``` +config set telemetry_interval +``` + +Setting to a positive value starts the ticker if it was not running, or resets the interval. Setting to `0s` or an invalid value stops the telemetry goroutine. + +Allowed range: `1s` – `1h`. 
+ +## Topic Map + +| Direction | Topic | QoS | Description | +| ------------- | ----------------------------------------------- | ------------ | ------------------ | +| Agent → Cloud | `m//c//gateway/telemetry` | Configurable | Periodic telemetry | + +## MQTT Test Recipes + +### Subscribe to telemetry + +```bash +mosquitto_sub \ + -h -p 1883 \ + -u -P \ + -t "m//c//gateway/telemetry" \ + -v +``` + +**Expected output (repeats every interval):** + +``` +m/e9692c28-b730-4797-8a15-2e25c08f9641/c/b465a688-c1ca-417d-a36f-71f6f1be2409/gateway/telemetry [{"bn":"gw:","bt":1781188728.6078596,"n":"uptime","u":"s","v":40.125499445},{"n":"heap_free","u":"By","v":4659642368},{"n":"heap_used","u":"By","v":19801149440},{"n":"temperature","u":"Cel","v":99},{"n":"load_avg_1m","v":3.87},{"n":"load_avg_5m","v":3.79},{"n":"load_avg_15m","v":4.24},{"n":"disk_usage_percent","u":"%","v":96.84929070396741},{"n":"devices_active","v":0}] +``` + +> **Note:** Records for temperature, RSSI, and load average are only present when the corresponding reader is enabled **and** the underlying system file is available. On non-Linux hosts or devices without wireless interfaces, those records are silently omitted. 
+ +### Enable telemetry at runtime + +```bash +mosquitto_pub \ + -h -p 1883 \ + -u -P --id "cfg-$(date +%s)" \ + -t "m//c//req" \ + -m '[{"bn":"req-1:", "n":"config", "vs":"set,telemetry_interval,30s"}]' +``` + +### Change telemetry interval + +```bash +mosquitto_pub \ + -h -p 1883 \ + -u -P --id "cfg-$(date +%s)" \ + -t "m//c//req" \ + -m '[{"bn":"req-1:", "n":"config", "vs":"set,telemetry_interval,1m"}]' +``` + +### Disable telemetry at runtime + +Setting the interval to `0s` stops the periodic publishing: + +```bash +mosquitto_pub \ + -h -p 1883 \ + -u -P --id "cfg-$(date +%s)" \ + -t "m//c//req" \ + -m '[{"bn":"req-1:", "n":"config", "vs":"set,telemetry_interval,0s"}]' +``` + +### Query current telemetry interval + +Subscribe to command response: + +```bash +mosquitto_sub \ + -h -p 1883 \ + -u -P \ + -t "m//c//res" \ + -v +``` + +```bash +mosquitto_pub \ + -h -p 1883 \ + -u -P --id "cfg-$(date +%s)" \ + -t "m//c//req" \ + -m '[{"bn":"req-1:", "n":"config", "vs":"get,telemetry_interval"}]' +``` + +**Response on `m//c//res`:** + +```json +[{ "bn": "req-1", "n": "get", "t": 1781190613.1251912, "vs": "10s" }] +``` + +### Reset telemetry interval to startup default + +```bash +mosquitto_pub \ + -h -p 1883 \ + -u -P --id "cfg-$(date +%s)" \ + -t "m//c//req" \ + -m '[{"bn":"req-1:", "n":"config", "vs":"reset,telemetry_interval"}]' +``` diff --git a/docs/terminal.md b/docs/terminal.md new file mode 100644 index 00000000..58eb70b1 --- /dev/null +++ b/docs/terminal.md @@ -0,0 +1,188 @@ +# Terminal Sessions over MQTT + +The terminal subsystem provides interactive shell sessions tunneled over MQTT. Each session spawns a bash PTY (pseudo-terminal) on the agent host, and bidirectional I/O is carried in SenML messages. This enables remote terminal access from Magistrala. + +## Architecture + +The agent maintains a map of active terminal sessions. Each session: + +1. Spawns a `bash` process attached to a PTY +2. 
Reads PTY output and publishes it to the agent's response topic under `term/` +3. Accepts input bytes via MQTT and writes them to the PTY +4. Has an idle timeout — if no input or output occurs within the timeout, the session closes automatically + +## Session Lifecycle + +``` + open char (write) char (write) timeout / close + ──────► ─────────────────► ─────────────────► ─────────────────► + spawn write to PTY, write to PTY, session removed, + bash read PTY output, read PTY output, PTY closed + publish to MQTT publish to MQTT +``` + +## Message Format + +All terminal commands use the `term` subsystem. The `vs` field is **base64-encoded** and contains a comma-separated payload: `[,]`. + +### Open a session + +**Base64 payload:** `open` + +**Request:** + +```json +[{ "bn": ":", "n": "term", "vs": "b3Blbg==" }] +``` + +(`b3Blbg==` = base64 of `open`) + +### Write to a session + +**Base64 payload:** `char,` + +**Request:** + +```json +[{ "bn": ":", "n": "term", "vs": "Y2hhcixscw==" }] +``` + +(`Y2hhcixscw==` = base64 of `char,ls`) + +### Close a session + +**Base64 payload:** `close` + +**Request:** + +```json +[{ "bn": ":", "n": "term", "vs": "Y2xvc2U=" }] +``` + +(`Y2xvc2U=` = base64 of `close`) + +### Terminal output (agent → cloud) + +Output from the PTY is published as SenML on the control response topic under `term/`: + +```json +[{ "bn": ":", "n": "term", "vs": "", "t": ... }] +``` + +## Configuration + +### Environment Variables + +| Variable | Default | Description | +| ----------------------------------- | ------- | ------------------------------------------------------------------------------------------------------ | +| `MG_AGENT_TERMINAL_SESSION_TIMEOUT` | `60s` | Idle timeout for terminal sessions. After this duration with no I/O, the session closes automatically. 
| + +### Runtime Config (MQTT set) + +```bash +mosquitto_pub \ + -h -p 1883 \ + -u -P --id "cfg-$(date +%s)" \ + -t "m//c//req" \ + -m '[{"bn":"req-1:", "n":"config", "vs":"set,terminal_session_timeout,120s"}]' +``` + +## Topic Map + +| Direction | Topic | QoS | Description | +| ------------- | --------------------------------------------- | --- | ------------------------------------ | +| Cloud → Agent | `m//c//req` | 1 | Terminal commands (`term` subsystem) | +| Agent → Cloud | `m//c//res/term/` | 1 | PTY output for a specific session | + +## MQTT Test Recipes + +### Subscribe to terminal output + +Open a separate terminal to watch session output: + +```bash +mosquitto_sub \ + -h -p 1883 \ + -u -P \ + -t "m//c//res/term/#" \ + -v +``` + +**Expected output:** + +``` +m/e9692c28-b730-4797-8a15-2e25c08f9641/c/bc9a0af7-6d0f-4806-aa5a-61d68c0a7cf7/res/term/term-1781257973 [{"bn":"term-1781257973","n":"term","t":1781257978.7125685,"vs":"/ # \u001b[6n"}] +``` + +### Open a terminal session + +```bash +UUID="term-$(date +%s)" + +# "open" → base64 = b3Blbg== +mosquitto_pub \ + -h -p 1883 \ + -u -P --id "$UUID" \ + -t "m//c//req" \ + -m "[{\"bn\":\"$UUID:\", \"n\":\"term\", \"vs\":\"b3Blbg==\"}]" +``` + +### Send a command (e.g. `ls`) + +```bash +UUID="term-1781257973" + +# "char,ls\n" → base64 = Y2hhcixscwo= +mosquitto_pub \ + -h -p 1883 \ + -u -P --id "$UUID" \ + -t "m//c//req" \ + -m "[{\"bn\":\"$UUID:\", \"n\":\"term\", \"vs\":\"Y2hhcixscwo=\"}]" +``` + +The subscriber terminal will show the `ls` output followed by a new shell prompt. 
+ +### Send multiple commands in sequence + +```bash +UUID="term-1749552000" + +# "char,uname -a\n" +printf "char,uname -a\n" | base64 +# Y2hhcix1bmFtZSAtYQo= + +mosquitto_pub \ + -h <host> -p 1883 \ + -u <client_id> -P <client_secret> --id "$UUID" \ + -t "m/<domain_id>/c/<channel_id>/req" \ + -m "[{\"bn\":\"$UUID:\", \"n\":\"term\", \"vs\":\"Y2hhcix1bmFtZSAtYQo=\"}]" +``` + +### Close a terminal session + +```bash +UUID="term-1749552000" + +# "close" → base64 = Y2xvc2U= +mosquitto_pub \ + -h <host> -p 1883 \ + -u <client_id> -P <client_secret> --id "$UUID" \ + -t "m/<domain_id>/c/<channel_id>/req" \ + -m "[{\"bn\":\"$UUID:\", \"n\":\"term\", \"vs\":\"Y2xvc2U=\"}]" +``` + +### Helper: base64-encode terminal commands + +```bash +# Encode a shell command for the terminal subsystem +echo -n "char,ls -la" | base64 +# Y2hhcixscyAtbGE= + +# Encode with newline (to actually execute the command) +printf "char,ls -la\n" | base64 +# Y2hhcixscyAtbGEK +``` + +### Use the Agent UI terminal + +Open `http://localhost:9999`, navigate to the **Execute** panel, and type shell commands. The UI handles base64 encoding and session management automatically. 
diff --git a/pkg/conn/conn.go b/pkg/conn/conn.go index fc8613a8..177cf0b2 100644 --- a/pkg/conn/conn.go +++ b/pkg/conn/conn.go @@ -136,8 +136,21 @@ func (b *broker) registerBuiltins() { b.RegisterHandler(nred, func(ctx context.Context, pack senml.Pack) error { uuid, cmdStr := extractCmd(pack) log.Info("NodeRed command", slog.String("uuid", uuid), slog.String("command", cmdStr)) - _, err := svc.NodeRed(cmdStr) - return err + resp, err := svc.NodeRed(cmdStr) + if err != nil { + if payload, encErr := encoder.EncodeSenML(uuid, nred, err.Error()); encErr == nil { + if pubErr := svc.Publish("control", string(payload)); pubErr != nil { + log.Warn("Failed to publish NodeRed error response", slog.Any("error", pubErr)) + } + } + return err + } + if payload, encErr := encoder.EncodeSenML(uuid, nred, resp); encErr == nil { + if pubErr := svc.Publish("control", string(payload)); pubErr != nil { + log.Warn("Failed to publish NodeRed response", slog.Any("error", pubErr)) + } + } + return nil }) b.RegisterHandler(ping, func(ctx context.Context, pack senml.Pack) error { diff --git a/pkg/encoder/encoder.go b/pkg/encoder/encoder.go index 1eb026a8..856c9dd1 100644 --- a/pkg/encoder/encoder.go +++ b/pkg/encoder/encoder.go @@ -10,7 +10,7 @@ import ( ) func EncodeSenML(bn, n, sv string) ([]byte, error) { - now := float64(time.Now().UnixNano()) + now := float64(time.Now().UnixNano()) / float64(time.Second) s := senml.Pack{ Records: []senml.Record{ { diff --git a/pkg/encoder/encoder_test.go b/pkg/encoder/encoder_test.go new file mode 100644 index 00000000..dcc41f39 --- /dev/null +++ b/pkg/encoder/encoder_test.go @@ -0,0 +1,86 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +package encoder + +import ( + "encoding/json" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestEncodeSenML(t *testing.T) { + tests := []struct { + name string + bn string + n string + sv string + wantBn string + wantN string + 
wantSv string + }{ + { + name: "basic encoding", + bn: "req-1:", + n: "exec", + sv: "pwd", + wantBn: "req-1:", + wantN: "exec", + wantSv: "pwd", + }, + { + name: "empty base name", + bn: "", + n: "response", + sv: "ok", + wantBn: "", + wantN: "response", + wantSv: "ok", + }, + { + name: "multiline output", + bn: "req-2:", + n: "ls", + sv: "file1\nfile2\nfile3", + wantBn: "req-2:", + wantN: "ls", + wantSv: "file1\nfile2\nfile3", + }, + { + name: "json in value", + bn: "req-3:", + n: "config", + sv: `{"key":"value"}`, + wantBn: "req-3:", + wantN: "config", + wantSv: `{"key":"value"}`, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + payload, err := EncodeSenML(tt.bn, tt.n, tt.sv) + require.NoError(t, err) + + var records []struct { + BaseName string `json:"bn"` + BaseTime float64 `json:"bt"` + Name string `json:"n"` + StringValue *string `json:"vs"` + } + require.NoError(t, json.Unmarshal(payload, &records)) + require.Len(t, records, 1) + + rec := records[0] + assert.Equal(t, tt.wantBn, rec.BaseName) + assert.Equal(t, tt.wantN, rec.Name) + require.NotNil(t, rec.StringValue) + assert.Equal(t, tt.wantSv, *rec.StringValue) + + assert.Greater(t, rec.BaseTime, float64(1700000000), "bt should be in seconds, not nanoseconds") + assert.Less(t, rec.BaseTime, float64(2000000000), "bt should be in seconds, not nanoseconds") + }) + } +} diff --git a/pkg/logstream/stream_test.go b/pkg/logstream/stream_test.go new file mode 100644 index 00000000..86f23beb --- /dev/null +++ b/pkg/logstream/stream_test.go @@ -0,0 +1,236 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +package logstream + +import ( + "context" + "io" + "log/slog" + "net/http" + "net/http/httptest" + "strings" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestNew(t *testing.T) { + s := New() + assert.NotNil(t, s) + assert.Empty(t, s.subs) +} + +func TestPushAndGetBacklog(t 
*testing.T) { + s := New() + + s.push("line 1") + s.push("line 2") + s.push("line 3") + + _, backlog, _ := s.subscribe() + assert.Equal(t, []string{"line 1", "line 2", "line 3"}, backlog) +} + +func TestRingBuffer(t *testing.T) { + s := New() + + for i := 0; i < ringSize+50; i++ { + s.push("line") + } + + _, backlog, _ := s.subscribe() + assert.Equal(t, ringSize, len(backlog)) +} + +func TestSubscribeBroadcast(t *testing.T) { + s := New() + + id, _, ch := s.subscribe() + defer s.unsubscribe(id) + + var received string + var mu sync.Mutex + + go func() { + select { + case line := <-ch: + mu.Lock() + received = line + mu.Unlock() + case <-time.After(time.Second): + } + }() + + time.Sleep(50 * time.Millisecond) + s.push("hello") + + assert.Eventually(t, func() bool { + mu.Lock() + defer mu.Unlock() + return received == "hello" + }, 2*time.Second, 10*time.Millisecond) +} + +func TestUnsubscribe(t *testing.T) { + s := New() + + id, _, ch := s.subscribe() + s.unsubscribe(id) + + s.push("after-unsubscribe") + + select { + case <-ch: + t.Fatal("should not receive after unsubscribe") + default: + } +} + +func TestSSEHandler(t *testing.T) { + s := New() + s.push("backlog-line") + + handler := SSEHandler(s) + + req := httptest.NewRequest(http.MethodGet, "/logs", nil) + ctx, cancel := context.WithCancel(req.Context()) + req = req.WithContext(ctx) + + rec := httptest.NewRecorder() + + var wg sync.WaitGroup + wg.Add(1) + + go func() { + defer wg.Done() + handler.ServeHTTP(rec, req) + }() + + time.Sleep(100 * time.Millisecond) + s.push("live-line") + time.Sleep(100 * time.Millisecond) + + cancel() + wg.Wait() + + body := rec.Body.String() + assert.Contains(t, body, "data: backlog-line") + assert.Contains(t, body, "data: live-line") + assert.Equal(t, "text/event-stream", rec.Header().Get("Content-Type")) +} + +func TestHandler(t *testing.T) { + s := New() + inner := slog.NewTextHandler(io.Discard, nil) + h := NewHandler(inner, s) + + assert.True(t, h.Enabled(context.Background(), 
slog.LevelInfo)) + + record := slog.NewRecord(time.Now(), slog.LevelInfo, "test message", 0) + record.AddAttrs(slog.String("key", "value")) + + err := h.Handle(context.Background(), record) + require.NoError(t, err) + + _, backlog, _ := s.subscribe() + require.Len(t, backlog, 1) + assert.Contains(t, backlog[0], "test message") + assert.Contains(t, backlog[0], "key=value") +} + +func TestHandlerWithAttrs(t *testing.T) { + s := New() + inner := slog.NewTextHandler(io.Discard, nil) + h := NewHandler(inner, s) + + h2 := h.WithAttrs([]slog.Attr{slog.String("persistent", "attr")}) + assert.NotNil(t, h2) +} + +func TestHandlerWithGroup(t *testing.T) { + s := New() + inner := slog.NewTextHandler(io.Discard, nil) + h := NewHandler(inner, s) + + h2 := h.WithGroup("mygroup") + assert.NotNil(t, h2) +} + +func TestMultipleSubscribers(t *testing.T) { + s := New() + + id1, _, ch1 := s.subscribe() + id2, _, ch2 := s.subscribe() + defer s.unsubscribe(id1) + defer s.unsubscribe(id2) + + s.push("broadcast") + + var r1, r2 string + var mu sync.Mutex + + go func() { + select { + case line := <-ch1: + mu.Lock() + r1 = line + mu.Unlock() + case <-time.After(time.Second): + } + }() + + go func() { + select { + case line := <-ch2: + mu.Lock() + r2 = line + mu.Unlock() + case <-time.After(time.Second): + } + }() + + assert.Eventually(t, func() bool { + mu.Lock() + defer mu.Unlock() + return r1 == "broadcast" && r2 == "broadcast" + }, 2*time.Second, 10*time.Millisecond) +} + +func TestSSEFormat(t *testing.T) { + s := New() + s.push("test line") + + handler := SSEHandler(s) + + req := httptest.NewRequest(http.MethodGet, "/logs", nil) + ctx, cancel := context.WithCancel(req.Context()) + req = req.WithContext(ctx) + + rec := httptest.NewRecorder() + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + handler.ServeHTTP(rec, req) + }() + + time.Sleep(100 * time.Millisecond) + cancel() + wg.Wait() + + body := rec.Body.String() + lines := strings.Split(body, "\n") + found := false 
+ for _, line := range lines { + if strings.HasPrefix(line, "data: test line") { + found = true + break + } + } + assert.True(t, found, "expected SSE data line format") +} diff --git a/pkg/terminal/terminal.go b/pkg/terminal/terminal.go index a0742212..d7fc3f8d 100644 --- a/pkg/terminal/terminal.go +++ b/pkg/terminal/terminal.go @@ -5,7 +5,6 @@ package terminal import ( "bytes" - "fmt" "io" "log/slog" "os" @@ -49,11 +48,15 @@ func NewSession(uuid string, timeout time.Duration, publish func(channel, payloa publish: publish, timeout: timeout, resetTimeout: timeout, - topic: fmt.Sprintf("term/%s", uuid), + topic: "term/" + uuid, done: make(chan bool), } - c := exec.Command("bash") + shell := os.Getenv("SHELL") + if shell == "" { + shell = "/bin/sh" + } + c := exec.Command(shell) ptmx, err := pty.Start(c) if err != nil { return t, errors.New(err.Error()) @@ -62,11 +65,21 @@ func NewSession(uuid string, timeout time.Duration, publish func(channel, payloa // Copy output to mqtt go func() { - n, err := io.Copy(t, t.ptmx) - if err != nil { - t.logger.Error(fmt.Sprintf("Error sending data: %s", err)) + buf := make([]byte, 4096) + for { + nr, readErr := t.ptmx.Read(buf) + if nr > 0 { + if _, writeErr := t.Write(buf[:nr]); writeErr != nil { + t.logger.Error("Error sending terminal data", slog.Any("error", writeErr)) + } + } + if readErr != nil { + if readErr != io.EOF { + t.logger.Error("PTY read error", slog.Any("error", readErr)) + } + return + } } - t.logger.Debug(fmt.Sprintf("Data being sent: %d", n)) }() t.timer = time.NewTicker(1 * time.Second) @@ -107,12 +120,14 @@ func (t *term) IsDone() chan bool { func (t *term) Write(p []byte) (int, error) { t.resetCounter(t.resetTimeout) n := len(p) + t.logger.Info("Terminal output", slog.Int("bytes", n), slog.String("uuid", t.uuid)) payload, err := senml.EncodeString(t.uuid, terminal, string(p)) if err != nil { return n, err } if err := t.publish(t.topic, string(payload)); err != nil { + t.logger.Error("Terminal publish failed", 
slog.Any("error", err), slog.String("uuid", t.uuid)) return n, err } return n, nil @@ -121,7 +136,7 @@ func (t *term) Write(p []byte) (int, error) { func (t *term) Send(p []byte) error { in := bytes.NewReader(p) nr, err := io.Copy(t.ptmx, in) - t.logger.Debug(fmt.Sprintf("Written to ptmx: %d", nr)) + t.logger.Info("Terminal input", slog.Int("bytes", int(nr)), slog.String("uuid", t.uuid)) if err != nil { return errors.New(err.Error()) } diff --git a/pkg/terminal/terminal_test.go b/pkg/terminal/terminal_test.go new file mode 100644 index 00000000..073285f9 --- /dev/null +++ b/pkg/terminal/terminal_test.go @@ -0,0 +1,149 @@ +// Copyright (c) Abstract Machines +// SPDX-License-Identifier: Apache-2.0 + +package terminal + +import ( + "encoding/json" + "log/slog" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +type mockPublisher struct { + mu sync.Mutex + messages []struct { + topic string + payload string + } +} + +func (m *mockPublisher) publish(channel, payload string) error { + m.mu.Lock() + defer m.mu.Unlock() + m.messages = append(m.messages, struct { + topic string + payload string + }{channel, payload}) + return nil +} + +func (m *mockPublisher) messagesLen() int { + m.mu.Lock() + defer m.mu.Unlock() + return len(m.messages) +} + +func (m *mockPublisher) lastMessage() (string, string) { + m.mu.Lock() + defer m.mu.Unlock() + if len(m.messages) == 0 { + return "", "" + } + last := m.messages[len(m.messages)-1] + return last.topic, last.payload +} + +func TestNewSession(t *testing.T) { + pub := &mockPublisher{} + logger := slog.Default() + + sess, err := NewSession("test-uuid-1", 60*time.Second, pub.publish, logger) + require.NoError(t, err) + require.NotNil(t, sess) + + done := sess.IsDone() + assert.NotNil(t, done) + + time.Sleep(100 * time.Millisecond) + + err = sess.Send([]byte("echo hello\n")) + require.NoError(t, err) + time.Sleep(200 * time.Millisecond) + + assert.Greater(t, 
pub.messagesLen(), 0, "expected PTY output to be published") + + topic, _ := pub.lastMessage() + assert.Contains(t, topic, "term/test-uuid-1") + + select { + case <-done: + t.Fatal("session should not be done yet") + default: + } +} + +func TestSessionIdleTimeout(t *testing.T) { + pub := &mockPublisher{} + logger := slog.Default() + + sess, err := NewSession("test-uuid-timeout", 2*time.Second, pub.publish, logger) + require.NoError(t, err) + + done := sess.IsDone() + + select { + case <-done: + case <-time.After(5 * time.Second): + t.Fatal("session should have timed out") + } +} + +func TestSessionWritePublishes(t *testing.T) { + pub := &mockPublisher{} + logger := slog.Default() + + sess, err := NewSession("test-uuid-write", 10*time.Second, pub.publish, logger) + require.NoError(t, err) + + time.Sleep(100 * time.Millisecond) + + err = sess.Send([]byte("echo test-output-123\n")) + require.NoError(t, err) + + assert.Eventually(t, func() bool { + return pub.messagesLen() > 0 + }, 2*time.Second, 50*time.Millisecond, "expected output after write") + + for i := 0; i < pub.messagesLen(); i++ { + pub.mu.Lock() + _, payload := pub.messages[i].topic, pub.messages[i].payload + pub.mu.Unlock() + + var pack struct { + Records []struct { + Name string `json:"n"` + StringValue *string `json:"vs"` + } `json:"e"` + } + if err := json.Unmarshal([]byte(payload), &pack); err == nil && len(pack.Records) > 0 && pack.Records[0].StringValue != nil { + if *pack.Records[0].StringValue != "" { + return + } + } + } +} + +func TestSessionTopicFormat(t *testing.T) { + pub := &mockPublisher{} + logger := slog.Default() + uuid := "unique-session-id" + + sess, err := NewSession(uuid, 10*time.Second, pub.publish, logger) + require.NoError(t, err) + + time.Sleep(100 * time.Millisecond) + + _ = sess.Send([]byte("echo hi\n")) + + assert.Eventually(t, func() bool { + return pub.messagesLen() > 0 + }, 2*time.Second, 50*time.Millisecond) + + topic, _ := pub.lastMessage() + assert.Equal(t, "term/"+uuid, 
topic) +} diff --git a/service.go b/service.go index 86a711dc..457041e2 100644 --- a/service.go +++ b/service.go @@ -47,7 +47,7 @@ const ( view = "view" save = "save" - char = "c" + char = "char" open = "open" close = "close" control = "control" @@ -75,6 +75,8 @@ const ( senmlNameUptime = "uptime" notAllowed = "not_allowed" + + provisionTimeout = 30 * time.Second ) var ( @@ -236,6 +238,7 @@ type OTAStatusInfo struct { var _ Service = (*agent)(nil) type agent struct { + ctx context.Context mqttClient paho.Client config *Config noderedClient nodered.Client @@ -252,6 +255,7 @@ type agent struct { store cfgstore.Store heartbeatIntervalCh chan time.Duration telemetryIntervalCh chan time.Duration + telemetryStarted atomic.Bool logLevel *slog.LevelVar cfgMu sync.RWMutex startupConfig Config @@ -265,6 +269,7 @@ type agent struct { // New returns agent service implementation. func New(ctx context.Context, mc paho.Client, cfg *Config, nc nodered.Client, logger *slog.Logger, devices *devicemgr.Manager, store cfgstore.Store, levelVar *slog.LevelVar, bootstrapCachePath string) (Service, error) { ag := &agent{ + ctx: ctx, mqttClient: mc, noderedClient: nc, config: cfg, @@ -296,6 +301,7 @@ func New(ctx context.Context, mc paho.Client, cfg *Config, nc nodered.Client, lo if cfg.Telemetry.Interval > 0 { telemetryTopic := fmt.Sprintf("m/%s/c/%s/gateway/telemetry", cfg.DomainID, cfg.Channels.DataChan()) + ag.telemetryStarted.Store(true) go ag.selfTelemetry(ctx, telemetryTopic, cfg.Telemetry.Interval, cfg.MQTT.QoS) } @@ -441,6 +447,8 @@ func (a *agent) ServiceConfig(ctx context.Context, uuid, cmdStr string) error { resp = notConfigured } else if val, ok := a.store.Get(key); ok { resp = val + } else if fallback := a.configFallback(key); fallback != "" { + resp = fallback } else { resp = notFound } @@ -1131,7 +1139,7 @@ func (a *agent) OTA(ctx context.Context, url, sha256hex string, size uint64) err statusTopic := fmt.Sprintf("m/%s/c/%s/ota/status", domainID, ctrlChan) progressFn := 
func(state ota.State, progress float64) { - now := float64(time.Now().UnixNano()) + now := float64(time.Now().UnixNano()) / float64(time.Second) stateStr := strings.ToLower(state.String()) statusPack := []senml.Record{ {BaseName: "gw:", BaseTime: now, Name: "ota_state", StringValue: &stateStr}, @@ -1365,7 +1373,7 @@ func validateSettableValue(key, val string) error { } case keyTelemetryInterval: d, err := time.ParseDuration(val) - if err != nil || d < time.Second || d > time.Hour { + if err != nil || (d != 0 && (d < time.Second || d > time.Hour)) { return errInvalidCommand } case keyTerminalSessionTimeout: @@ -1490,6 +1498,13 @@ func (a *agent) applyLiveUpdate(key, val string) { } case keyTelemetryInterval: if d, err := time.ParseDuration(val); err == nil { + if d > 0 && !a.telemetryStarted.Load() { + a.telemetryStarted.Store(true) + cfg := a.Config() + telemetryTopic := fmt.Sprintf("m/%s/c/%s/gateway/telemetry", + cfg.DomainID, cfg.Channels.DataChan()) + go a.selfTelemetry(a.ctx, telemetryTopic, 0, cfg.MQTT.QoS) + } select { case a.telemetryIntervalCh <- d: default: @@ -1501,6 +1516,21 @@ func (a *agent) applyLiveUpdate(key, val string) { } } +func (a *agent) configFallback(key string) string { + cfg := a.Config() + switch key { + case keyLogLevel: + return cfg.Log.Level + case keyHeartbeatInterval: + return cfg.Heartbeat.Interval.String() + case keyTelemetryInterval: + return cfg.Telemetry.Interval.String() + case keyTerminalSessionTimeout: + return cfg.Terminal.SessionTimeout.String() + } + return "" +} + func (a *agent) getTopic(topic string) (t string) { cfg := a.Config() domainID := cfg.DomainID @@ -1574,7 +1604,9 @@ func (a *agent) DeviceManager(ctx context.Context, uuid, cmdStr string) error { return errors.Wrap(errDeviceManagerFailed, errInvalidCommand) } ifaceType := iface.ParseInterfaceType(addReq.IfaceType) - d, aerr := a.devices.Add(ctx, addReq.Name, addReq.ExternalID, addReq.ExternalKey, ifaceType, addReq.IfaceAddr) + addCtx, cancel := 
context.WithTimeout(ctx, provisionTimeout) + d, aerr := a.devices.Add(addCtx, addReq.Name, addReq.ExternalID, addReq.ExternalKey, ifaceType, addReq.IfaceAddr) + cancel() if aerr != nil { return errors.Wrap(errDeviceManagerFailed, aerr) } diff --git a/service_test.go b/service_test.go index b21aaea6..28d24e21 100644 --- a/service_test.go +++ b/service_test.go @@ -527,12 +527,13 @@ func TestServiceConfig(t *testing.T) { func TestConfigGetSet(t *testing.T) { cases := []struct { - desc string - cmd string - useStore bool - seed map[string]string - wantResp string - err bool + desc string + cmd string + useStore bool + seed map[string]string + wantResp string + err bool + mockTelemetry bool }{ { desc: "get key without store returns not_configured", @@ -541,10 +542,10 @@ func TestConfigGetSet(t *testing.T) { wantResp: "not_configured", }, { - desc: "get missing key returns not_found", + desc: "get unset key returns running config value", cmd: "get,log_level", useStore: true, - wantResp: "not_found", + wantResp: "debug", }, { desc: "set key without store returns not_configured", @@ -590,10 +591,11 @@ func TestConfigGetSet(t *testing.T) { err: true, }, { - desc: "set telemetry_interval stores valid duration", - cmd: "set,telemetry_interval,30s", - useStore: true, - wantResp: "ok", + desc: "set telemetry_interval stores valid duration", + cmd: "set,telemetry_interval,30s", + useStore: true, + wantResp: "ok", + mockTelemetry: true, }, { desc: "get telemetry_interval after set returns previously set value", @@ -766,6 +768,14 @@ func TestConfigGetSet(t *testing.T) { svc, mqttClient, _, setupErr := newService(t, testConfig(), s) assert.Nil(t, setupErr, fmt.Sprintf("%s: unexpected setup error %v", tc.desc, setupErr)) + if tc.mockTelemetry { + telToken := agentmocks.NewMQTTToken(t) + telToken.On("Wait").Maybe().Return(true) + telToken.On("Error").Maybe().Return(error(nil)) + mqttClient.On("Publish", mqttTopic("data-channel", "gateway/telemetry"), + mock.Anything, mock.Anything, 
mock.Anything).Maybe().Return(telToken) + } + if !tc.err { expectMQTTPublish(t, mqttClient, mqttTopic("ctrl-channel", "res"), byte(1), nil).Run(func(args mock.Arguments) { payload, _ := args.Get(3).(string) @@ -1026,6 +1036,7 @@ func TestTerminal(t *testing.T) { t.Run(tc.desc, func(t *testing.T) { if tc.emptyPath { t.Setenv("PATH", "") + t.Setenv("SHELL", "/no/such/shell") } svc, _, _, err := newService(t, testConfig(), nil) require.NoError(t, err)