Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions crates/core/src/host/host_controller.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::module_host::{EventStatus, ModuleHost, ModuleInfo, NoSuchModule};
use super::scheduler::SchedulerStarter;
use super::v8::V8HeapMetrics;
use super::wasmtime::WasmtimeRuntime;
use super::wasmtime::{WasmMemoryBytesMetric, WasmtimeRuntime};
use super::{Scheduler, UpdateDatabaseResult};
use crate::client::{ClientActorId, ClientName};
use crate::config::{V8Config, WasmConfig};
Expand Down Expand Up @@ -1411,8 +1411,8 @@ where
let _ = DATA_SIZE_METRICS
.data_size_blob_store_bytes_used_by_blobs
.remove_label_values(db);
let _ = WORKER_METRICS.wasm_memory_bytes.remove_label_values(db);

WasmMemoryBytesMetric::remove_all_metric_label_values_for_database(db);
V8HeapMetrics::remove_all_metric_label_values_for_database(db);

let _ = WORKER_METRICS.v8_request_queue_length.remove_label_values(db);
Expand Down
7 changes: 1 addition & 6 deletions crates/core/src/host/module_common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::{
module_host_context::ModuleCreationContext,
replica_context::ReplicaContext,
};
use spacetimedb_lib::{Identity, RawModuleDef};
use spacetimedb_lib::RawModuleDef;
use spacetimedb_schema::{def::ModuleDef, error::ValidationErrors};
use std::sync::Arc;

Expand Down Expand Up @@ -70,11 +70,6 @@ impl ModuleCommon {
self.info.clone()
}

/// Returns the identity of the database.
pub fn database_identity(&self) -> &Identity {
&self.info.database_identity
}

/// Returns the energy monitor.
pub fn energy_monitor(&self) -> Arc<dyn EnergyMonitor> {
self.energy_monitor.clone()
Expand Down
25 changes: 1 addition & 24 deletions crates/core/src/host/v8/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -339,8 +339,6 @@ fn env_on_isolate_unwrap(isolate: &mut Isolate) -> &mut JsInstanceEnv {
struct JsInstanceEnv {
instance_env: InstanceEnv,
module_def: Option<Arc<ModuleDef>>,
/// Last used-heap sample captured by the worker's periodic heap checks.
cached_used_heap_size: usize,

/// The slab of `BufferIters` created for this instance.
iters: RowIters,
Expand All @@ -365,7 +363,6 @@ impl JsInstanceEnv {
Self {
instance_env,
module_def: None,
cached_used_heap_size: 0,
call_times: CallTimes::new(),
iters: <_>::default(),
chunk_pool: <_>::default(),
Expand Down Expand Up @@ -420,16 +417,6 @@ impl JsInstanceEnv {
}
}

/// Refresh the cached heap usage after an explicit V8 heap sample.
fn set_cached_used_heap_size(&mut self, bytes: usize) {
self.cached_used_heap_size = bytes;
}

/// Return the last heap sample without forcing a fresh V8 query.
fn cached_used_heap_size(&self) -> usize {
self.cached_used_heap_size
}

fn set_module_def(&mut self, module_def: Arc<ModuleDef>) {
self.module_def = Some(module_def);
}
Expand Down Expand Up @@ -1117,7 +1104,6 @@ fn sample_heap_stats(scope: &mut PinScope<'_, '_>, metrics: &mut V8HeapMetrics)
// Whenever we sample heap statistics, we cache them on the isolate so that
// the per-call execution stats can avoid querying them on each invocation.
Comment thread
gefjon marked this conversation as resolved.
Outdated
let stats = scope.get_heap_statistics();
env_on_isolate_unwrap(scope).set_cached_used_heap_size(stats.used_heap_size());
metrics.observe(&stats);
stats
}
Expand Down Expand Up @@ -1991,25 +1977,16 @@ where
// Derive energy stats.
let energy = energy_from_elapsed(budget, timings.total_duration);

// Reuse the last periodic heap sample instead of querying V8 on every call.
// We use this statistic for energy tracking, so eventual consistency is fine.
let memory_allocation = env.cached_used_heap_size();

if heap_limit_hit.get() > 1 {
let database_identity = *env.instance_env.database_identity();
tracing::warn!(
%database_identity,
used_heap_size = memory_allocation,
current_heap_limit = scope.get_heap_statistics().heap_size_limit(),
"Module hit heap limit multiple times in single call, even after doubling!",
)
}

let stats = ExecutionStats {
energy,
timings,
memory_allocation,
};
let stats = ExecutionStats { energy, timings };
ExecutionResult { stats, call_result }
})
}
Expand Down
30 changes: 9 additions & 21 deletions crates/core/src/host/wasm_common/module_host_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use anyhow::{anyhow, bail, ensure, Context};
use bytes::{Buf, Bytes};
use core::future::Future;
use core::time::Duration;
use prometheus::{Histogram, IntCounter, IntGauge};
use prometheus::{Histogram, IntCounter};
use spacetimedb_auth::identity::ConnectionAuthCtx;
use spacetimedb_client_api_messages::energy::EnergyQuanta;
use spacetimedb_datastore::db_metrics::DB_METRICS;
Expand Down Expand Up @@ -105,6 +105,12 @@ pub struct EnergyStats {
pub remaining: FunctionBudget,
}

impl Default for EnergyStats {
fn default() -> Self {
Self::ZERO
}
}

impl EnergyStats {
pub const ZERO: Self = Self {
budget: FunctionBudget::ZERO,
Expand Down Expand Up @@ -207,6 +213,7 @@ pub(crate) fn run_query_for_view(
Ok(rows)
}

#[derive(Default)]
pub struct ExecutionTimings {
pub total_duration: Duration,
pub wasm_instance_env_call_times: CallTimes,
Expand All @@ -226,10 +233,10 @@ impl ExecutionTimings {
/// The result that `__call_reducer__` produces during normal non-trap execution.
pub type ReducerResult = Result<Option<Bytes>, Box<str>>;

#[derive(Default)]
pub struct ExecutionStats {
pub energy: EnergyStats,
pub timings: ExecutionTimings,
pub memory_allocation: usize,
}

impl ExecutionStats {
Expand Down Expand Up @@ -562,8 +569,6 @@ impl<T: WasmInstance> WasmModuleInstance<T> {
pub struct InstanceCommon {
info: Arc<ModuleInfo>,
energy_monitor: Arc<dyn EnergyMonitor>,
allocated_memory: usize,
metric_wasm_memory_bytes: IntGauge,
vm_metrics: AllVmMetrics,
}

Expand All @@ -576,11 +581,6 @@ impl InstanceCommon {
info: module.info(),
vm_metrics,
energy_monitor: module.energy_monitor(),
// Will be updated on the first reducer call.
allocated_memory: 0,
metric_wasm_memory_bytes: WORKER_METRICS
.wasm_memory_bytes
.with_label_values(module.database_identity()),
}
}

Expand Down Expand Up @@ -762,19 +762,12 @@ impl InstanceCommon {
let ProcedureExecuteResult {
stats:
ExecutionStats {
memory_allocation,
// TODO(procedure-energy): Do something with timing and energy.
..
},
call_result,
} = result;

// TODO(shub): deduplicate with reducer and view logic.
if self.allocated_memory != memory_allocation {
self.metric_wasm_memory_bytes.set(memory_allocation as i64);
self.allocated_memory = memory_allocation;
}

let trapped = call_result.is_err();

let result = match call_result {
Expand Down Expand Up @@ -1022,14 +1015,9 @@ impl InstanceCommon {
let energy_used = stats.energy.used();
let energy_quanta_used = energy_used.into();
let timings = &stats.timings;
let memory_allocation = stats.memory_allocation;

self.energy_monitor
.record_reducer(&energy_fingerprint, energy_quanta_used, timings.total_duration);
if self.allocated_memory != memory_allocation {
self.metric_wasm_memory_bytes.set(memory_allocation as i64);
self.allocated_memory = memory_allocation;
}

maybe_log_long_running_function(function_name, timings.total_duration);

Expand Down
1 change: 1 addition & 0 deletions crates/core/src/host/wasmtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use anyhow::Context;
use spacetimedb_paths::server::ServerDataDir;
use std::borrow::Cow;
use std::time::Duration;
pub(in crate::host) use wasm_instance_env::WasmMemoryBytesMetric;
use wasmtime::{self, Engine, Linker, StoreContext, StoreContextMut};
pub use wasmtime_module::{WasmtimeInstance, WasmtimeModule};

Expand Down
58 changes: 57 additions & 1 deletion crates/core/src/host/wasmtime/wasm_instance_env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@ use crate::host::wasm_common::module_host_actor::{
use crate::host::wasm_common::{err_to_errno_and_log, RowIterIdx, RowIters, TimingSpan, TimingSpanIdx, TimingSpanSet};
use crate::host::AbiCall;
use crate::subscription::module_subscription_manager::TransactionOffset;
use crate::worker_metrics::WORKER_METRICS;
use anyhow::{anyhow, Context as _};
use prometheus::IntGauge;
use spacetimedb_data_structures::map::IntMap;
use spacetimedb_datastore::locking_tx_datastore::{FuncCallType, MutTxId, ViewCallInfo};
use spacetimedb_lib::{bsatn, ConnectionId, Timestamp};
use spacetimedb_lib::{bsatn, ConnectionId, Identity, Timestamp};
use spacetimedb_primitives::errno::HOST_CALL_FAILURE;
use spacetimedb_primitives::{errno, ColId};
use spacetimedb_schema::def::ModuleDef;
Expand Down Expand Up @@ -133,6 +135,53 @@ pub(super) struct WasmInstanceEnv {
/// A pool of unused allocated chunks that can be reused.
// TODO(Centril): consider using this pool for `console_timer_start` and `bytes_sink_write`.
chunk_pool: ChunkPool,

linear_memory_size_metric: WasmMemoryBytesMetric,
}

pub(in crate::host) struct WasmMemoryBytesMetric {
wasm_memory_bytes: IntGauge,

/// Previous value observed by this intance.
///
/// In [`Self::observe`], we use this to compute a delta against the instance's new memory usage,
/// then increment/decrement the metric value by that delta.
/// We do this rather than `set`ting the metric value as multiple instances may coexist
/// and share the same metric label value.
/// This happens when a database has procedures and reducers running concurrently,
/// and may also happen during a module update, as there may be a period when
/// the new version has already been instantiated but the old version has not yet shut down.
last_observed: i64,
}

impl WasmMemoryBytesMetric {
fn new(database_identity: Identity) -> Self {
Self {
wasm_memory_bytes: WORKER_METRICS.wasm_memory_bytes.with_label_values(&database_identity),
last_observed: 0,
}
}

fn observe(&mut self, memory_usage: usize) {
let delta = memory_usage as i64 - self.last_observed;

if delta > 0 {
self.wasm_memory_bytes.add(delta);
} else {
self.wasm_memory_bytes.sub(-delta);
}
Comment thread
gefjon marked this conversation as resolved.
Outdated
}

pub(in crate::host) fn remove_all_metric_label_values_for_database(database_identity: &Identity) {
let _ = WORKER_METRICS.wasm_memory_bytes.remove_label_values(database_identity);
}
}

impl Drop for WasmMemoryBytesMetric {
fn drop(&mut self) {
// Clean up this instance's metric value by subtracting its part of the usage.
self.wasm_memory_bytes.sub(self.last_observed);
}
}

const STANDARD_BYTES_SINK: u32 = 1;
Expand All @@ -145,6 +194,7 @@ type RtResult<T> = anyhow::Result<T>;
impl WasmInstanceEnv {
/// Create a new `WasmEnstanceEnv` from the given `InstanceEnv`.
pub fn new(instance_env: InstanceEnv) -> Self {
let database_identity = *instance_env.database_identity();
Self {
instance_env,
module_def: None,
Expand All @@ -158,6 +208,7 @@ impl WasmInstanceEnv {
timing_spans: Default::default(),
call_times: CallTimes::new(),
chunk_pool: <_>::default(),
linear_memory_size_metric: WasmMemoryBytesMetric::new(database_identity),
}
}

Expand Down Expand Up @@ -216,6 +267,11 @@ impl WasmInstanceEnv {
self.call_view_anon = call_view_anon;
}

/// Record an observation in [`Self::linear_memory_size_metric`].
pub fn record_memory_size(&mut self, memory_size: usize) {
self.linear_memory_size_metric.observe(memory_size);
}

/// Returns a reference to the memory, assumed to be initialized.
pub fn get_mem(&self) -> Mem {
self.mem.expect("Initialized memory")
Expand Down
19 changes: 7 additions & 12 deletions crates/core/src/host/wasmtime/wasmtime_module.rs
Original file line number Diff line number Diff line change
Expand Up @@ -584,7 +584,7 @@ impl module_host_actor::WasmInstance for WasmtimeInstance {

let Some(call_procedure) = self.call_procedure.as_ref() else {
let res = module_host_actor::ProcedureExecuteResult {
stats: zero_execution_stats(store),
stats: ExecutionStats::default(),
call_result: Err(anyhow::anyhow!(
"Module defines procedure {} but does not export `{}`",
op.name,
Expand Down Expand Up @@ -696,20 +696,15 @@ fn finish_opcall(store: &mut Store<WasmInstanceEnv>, initial_budget: FunctionBud
remaining,
};

let stats = ExecutionStats {
energy,
timings,
memory_allocation: get_memory_size(store),
};
record_memory_size(store);

let stats = ExecutionStats { energy, timings };
(stats, ret_bytes)
}

fn zero_execution_stats(store: &Store<WasmInstanceEnv>) -> ExecutionStats {
ExecutionStats {
energy: module_host_actor::EnergyStats::ZERO,
timings: module_host_actor::ExecutionTimings::zero(),
memory_allocation: get_memory_size(store),
}
fn record_memory_size(store: &mut Store<WasmInstanceEnv>) {
let memory_usage = get_memory_size(store);
store.data_mut().record_memory_size(memory_usage);
}

fn get_memory_size(store: &Store<WasmInstanceEnv>) -> usize {
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/worker_metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ metrics_group!(
pub sender_errors: IntCounterVec,

#[name = spacetime_worker_wasm_memory_bytes]
#[help = "The number of bytes of linear memory allocated by the database's WASM module instance"]
#[help = "The total number of bytes of linear memory allocated by all of the database's WASM module instances"]
#[labels(database_identity: Identity)]
pub wasm_memory_bytes: IntGaugeVec,

Expand Down
Loading