Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions rs/execution_environment/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -391,9 +391,11 @@ rust_ic_bench(
# Keep sorted.
":execution_environment",
"//rs/config",
"//rs/monitoring/logger",
"//rs/monitoring/metrics",
"//rs/registry/subnet_type",
"//rs/replicated_state",
"//rs/test_utilities/types",
"//rs/types/base_types",
"//rs/types/cycles",
"//rs/types/types",
Expand Down
5 changes: 4 additions & 1 deletion rs/execution_environment/benches/100k_canisters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ const NUM_CANISTERS_PER_CREATOR_CANISTER: usize = 10_000;

lazy_static::lazy_static! {
static ref STATE_MACHINE: Arc<Mutex<StateMachine>> = {
let env = StateMachine::new();
let mut env = StateMachine::new();
// Don't wait for the Replicated State metrics thread every round.
env.flush_replicated_state_metrics = false;

let features = [];
let wasm =
canister_test::Project::cargo_bin_maybe_from_env("canister_creator_canister", &features);
Expand Down
44 changes: 30 additions & 14 deletions rs/execution_environment/benches/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,16 @@ use criterion::Criterion;
use ic_base_types::NumSeconds;
use ic_config::flag_status::FlagStatus;
use ic_execution_environment::{RoundSchedule, SchedulerMetrics};
use ic_logger::new_replica_logger_from_config;
use ic_metrics::MetricsRegistry;
use ic_registry_subnet_type::SubnetType;
use ic_replicated_state::canister_state::canister_snapshots::CanisterSnapshots;
use ic_replicated_state::{CanisterState, ReplicatedState, SchedulerState, SystemState};
use ic_replicated_state::canister_state::system_state::PausedExecutionId;
use ic_replicated_state::{
CanisterState, ExecutionTask, InputQueueType, ReplicatedState, SchedulerState, SystemState,
};
use ic_test_utilities_types::messages::RequestBuilder;
use ic_types::messages::{CanisterMessageOrTask, CanisterTask};
use ic_types::{ExecutionRound, NumBytes, NumInstructions};
use ic_types_cycles::Cycles;
use ic_types_test_utils::ids::{canister_test_id, subnet_test_id, user_test_id};
Expand All @@ -15,8 +21,6 @@ use std::sync::Arc;
fn main() {
// 100k canisters, 5k active, 1k executed every round.
let mut canisters = BTreeMap::new();
let mut ordered_new_execution_canister_ids = Vec::new();
let mut ordered_long_execution_canister_ids = Vec::new();
let mut executed_canisters = BTreeSet::new();
for i in 0..100_000 {
let canister_id = canister_test_id(i);
Expand All @@ -28,15 +32,29 @@ fn main() {
NumSeconds::from(100_000),
);
let canister_snapshots = CanisterSnapshots::default();
let canister_state =
let mut canister_state =
CanisterState::new(system_state, None, scheduler_state, canister_snapshots);
// 5k active canisters.
if i < 5_000 {
// Every 10th canister has a long execution, the rest have new inputs.
if i % 10 == 0 {
ordered_long_execution_canister_ids.push(canister_id);
canister_state
.system_state
.task_queue
.enqueue(ExecutionTask::PausedExecution {
id: PausedExecutionId(0),
input: CanisterMessageOrTask::Task(CanisterTask::Heartbeat),
});
} else {
ordered_new_execution_canister_ids.push(canister_id);
let mut available_memory = i64::MAX;
canister_state
.push_input(
RequestBuilder::new().receiver(canister_id).build().into(),
&mut available_memory,
SubnetType::Application,
InputQueueType::RemoteSubnet,
)
.unwrap();
}
}
// First 1k canisters complete an execution every round.
Expand All @@ -53,44 +71,42 @@ fn main() {
let rate_limiting_of_heap_delta = FlagStatus::Enabled;
let install_code_rate_limit = NumInstructions::from(1_000_000);
let rate_limiting_of_instructions = FlagStatus::Enabled;
let long_execution_cores = 1;
let mut round_schedule = RoundSchedule::new(
scheduler_cores,
heap_delta_rate_limit,
rate_limiting_of_heap_delta,
install_code_rate_limit,
rate_limiting_of_instructions,
long_execution_cores,
ordered_new_execution_canister_ids,
ordered_long_execution_canister_ids,
);
let metrics_registry = MetricsRegistry::new();
let metrics = SchedulerMetrics::new(&metrics_registry);
let (log, _async_guard) = new_replica_logger_from_config(&Default::default());

let mut criterion = Criterion::default();
let mut group = criterion.benchmark_group("RoundSchedule");
let current_round = ExecutionRound::from(13);

group.bench_function("iteration", |bench| {
bench.iter(|| {
round_schedule.start_iteration(&mut state, true);
round_schedule.start_iteration(&mut state, true, &metrics, &log);
round_schedule.end_iteration(
&mut state,
&executed_canisters,
&executed_canisters,
&BTreeSet::new(),
ExecutionRound::from(1),
current_round,
);
});
});

// Populate the subnet schedule, even if the iteration benchmark is not run.
round_schedule.start_iteration(&mut state, true);
round_schedule.start_iteration(&mut state, true, &metrics, &log);
round_schedule.end_iteration(
&mut state,
&executed_canisters,
&executed_canisters,
&BTreeSet::new(),
ExecutionRound::from(1),
current_round,
);

group.bench_function("finish_round", |bench| {
Expand Down
5 changes: 3 additions & 2 deletions rs/execution_environment/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ pub use metrics::IngressFilterMetrics;
pub use query_handler::{DataCertificateWithDelegationMetadata, InternalHttpQueryHandler};
use query_handler::{HttpQueryHandler, QueryScheduler};
use scheduler::SchedulerImpl;
pub use scheduler::{RoundSchedule, SchedulerMetrics, abort_all_paused_executions};
pub use scheduler::{
IterationSchedule, RoundSchedule, SchedulerMetrics, abort_all_paused_executions,
};
use std::{path::Path, sync::Arc};
use tokio::sync::mpsc::Sender;

Expand Down Expand Up @@ -169,7 +171,6 @@ impl ExecutionServices {
let scheduler = Box::new(SchedulerImpl::new(
subnet_config.scheduler_config,
config.embedders_config,
own_subnet_id,
Arc::clone(&ingress_history_writer) as Arc<_>,
Arc::clone(&execution_environment) as Arc<_>,
Arc::clone(&cycles_account_manager),
Expand Down
62 changes: 25 additions & 37 deletions rs/execution_environment/src/scheduler.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
pub use self::round_schedule::RoundSchedule;
use self::round_schedule::*;
pub use self::round_schedule::{IterationSchedule, RoundSchedule};
pub use self::scheduler_metrics::SchedulerMetrics;
use self::scheduler_metrics::*;
use self::threshold_signatures::*;
Expand Down Expand Up @@ -33,7 +33,6 @@ use ic_registry_subnet_type::SubnetType;
use ic_replicated_state::SubnetSchedule;
use ic_replicated_state::canister_state::NextExecution;
use ic_replicated_state::canister_state::execution_state::NextScheduledMethod;
use ic_replicated_state::metrics::ReplicatedStateMetrics;
use ic_replicated_state::page_map::PageAllocatorFileDescriptor;
use ic_replicated_state::{
CanisterState, ExecutionTask, InputQueueType, NetworkTopology, ReplicatedState,
Expand All @@ -43,7 +42,7 @@ use ic_types::ingress::{IngressState, IngressStatus};
use ic_types::messages::{Ingress, MessageId, NO_DEADLINE, Response, SubnetMessage};
use ic_types::{
CanisterId, ComputeAllocation, ExecutionRound, MemoryAllocation, NumBytes, NumInstructions,
NumMessages, NumSlices, Randomness, ReplicaVersion, SubnetId, Time,
NumMessages, NumSlices, Randomness, ReplicaVersion, Time,
};
use ic_types_cycles::{CanisterCyclesCostSchedule, Cycles};
use more_asserts::{debug_assert_ge, debug_assert_le, debug_assert_lt};
Expand Down Expand Up @@ -141,12 +140,10 @@ impl SchedulerRoundLimits {
pub(crate) struct SchedulerImpl {
config: SchedulerConfig,
hypervisor_config: HypervisorConfig,
own_subnet_id: SubnetId,
ingress_history_writer: Arc<dyn IngressHistoryWriter<State = ReplicatedState>>,
exec_env: Arc<ExecutionEnvironment>,
cycles_account_manager: Arc<CyclesAccountManager>,
metrics: Arc<SchedulerMetrics>,
state_metrics: ReplicatedStateMetrics,
log: ReplicaLogger,
thread_pool: RefCell<scoped_threadpool::Pool>,
rate_limiting_of_heap_delta: FlagStatus,
Expand All @@ -159,7 +156,6 @@ impl SchedulerImpl {
pub(crate) fn new(
config: SchedulerConfig,
hypervisor_config: HypervisorConfig,
own_subnet_id: SubnetId,
ingress_history_writer: Arc<dyn IngressHistoryWriter<State = ReplicatedState>>,
exec_env: Arc<ExecutionEnvironment>,
cycles_account_manager: Arc<CyclesAccountManager>,
Expand All @@ -174,12 +170,10 @@ impl SchedulerImpl {
config,
hypervisor_config,
thread_pool: RefCell::new(scoped_threadpool::Pool::new(scheduler_cores)),
own_subnet_id,
ingress_history_writer,
exec_env,
cycles_account_manager,
metrics: Arc::new(SchedulerMetrics::new(metrics_registry)),
state_metrics: ReplicatedStateMetrics::new(metrics_registry),
log,
rate_limiting_of_heap_delta,
rate_limiting_of_instructions,
Expand Down Expand Up @@ -422,6 +416,7 @@ impl SchedulerImpl {
canister_ingress_latencies: &mut CanisterIngressQueueLatencies,
scheduler_round_limits: &mut SchedulerRoundLimits,
root_measurement_scope: &MeasurementScope<'a>,
round_log: &ReplicaLogger,
) -> ReplicatedState {
let cost_schedule = state.get_own_cost_schedule();
let measurement_scope =
Expand Down Expand Up @@ -486,10 +481,12 @@ impl SchedulerImpl {

// Scheduling.
let scheduling_timer = self.metrics.round_inner_iteration_scheduling.start_timer();
round_schedule.charge_idle_canisters(state.canisters_and_schedule_mut().0);

// Obtain the active canisters for this iteration.
let iteration_schedule = round_schedule.start_iteration(&mut state, is_first_iteration);
let iteration_schedule = round_schedule.start_iteration(
&mut state,
is_first_iteration,
&self.metrics,
round_log,
);
if iteration_schedule.is_empty() {
break state;
}
Expand Down Expand Up @@ -593,6 +590,7 @@ impl SchedulerImpl {
.metrics
.round_inner_heartbeat_overhead_duration
.start_timer();

// Remove all remaining `Heartbeat` and `GlobalTimer` tasks
// because they will be added again in the next round.
for canister_id in &heartbeat_and_timer_canisters {
Expand Down Expand Up @@ -723,7 +721,7 @@ impl SchedulerImpl {
max_instructions_executed_per_thread =
max_instructions_executed_per_thread.max(instructions_executed);

let divisor = round_limits_per_thread.instructions.get();
let divisor = self.config.max_instructions_per_slice.get();
debug_assert_ne!(divisor, 0, "prevent divide by zero panic");
if divisor > 0 {
let value = instructions_executed.get() as f64 / divisor as f64;
Expand Down Expand Up @@ -1065,6 +1063,9 @@ impl SchedulerImpl {
///
/// NOTE: This is also called by `checkpoint_round_with_no_execution()`, so it
/// must be safe to call even when no execution has taken place.
//
// TODO(DSM-103): Consider only aborting / checking DTS invariants for actually
// scheduled canisters.
fn finish_round(
&self,
state: &mut ReplicatedState,
Expand All @@ -1089,17 +1090,12 @@ impl SchedulerImpl {
}
}

self.state_metrics.observe(
self.own_subnet_id,
state,
current_round.get().into(),
logger,
);

self.check_invariants(state, current_round_type, current_round, logger);
}

/// Checks the DTS and subnet memory usage invariants at the end of the round.
//
// TODO(DSM-103): Move into ReplicatedStateMetrics.
fn check_invariants(
&self,
state: &ReplicatedState,
Expand Down Expand Up @@ -1433,25 +1429,16 @@ impl Scheduler for SchedulerImpl {
scheduler_round_limits.update_subnet_round_limits(&subnet_round_limits);
}

// Scheduling.
let mut round_schedule = {
let _timer = self.metrics.round_scheduling_duration.start_timer();

RoundSchedule::apply_scheduling_strategy(
&mut state,
self.config.scheduler_cores,
self.config.heap_delta_rate_limit,
self.rate_limiting_of_heap_delta,
self.config.install_code_rate_limit,
self.rate_limiting_of_instructions,
current_round,
self.config.accumulated_priority_reset_interval,
&self.metrics,
&round_log,
)
};
// TODO(DSM-103): Consider routing messages from subnet output queues to local canisters.

// Inner round.
let mut round_schedule = RoundSchedule::new(
self.config.scheduler_cores,
self.config.heap_delta_rate_limit,
self.rate_limiting_of_heap_delta,
self.config.install_code_rate_limit,
self.rate_limiting_of_instructions,
);
let mut state = self.inner_round(
state,
current_round,
Expand All @@ -1463,6 +1450,7 @@ impl Scheduler for SchedulerImpl {
&mut canister_ingress_latencies,
&mut scheduler_round_limits,
&root_measurement_scope,
&round_log,
);

// Update [`SignWithThresholdContext`]s by assigning randomness and matching pre-signatures.
Expand Down
Loading
Loading