Skip to content
Draft
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 18 additions & 9 deletions rs/messaging/src/routing/stream_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -575,15 +575,24 @@ impl StreamBuilderImpl {
}
RequestOrResponse::Response(rep) => {
// A Response: discard it.
error!(
self.log,
"{}: Discarding response, destination not found: {:?}",
CRITICAL_ERROR_RESPONSE_DESTINATION_NOT_FOUND,
rep
);
self.metrics
.critical_error_response_destination_not_found
.inc();
if rep.is_best_effort() {
// Expected when the destination subnet has been deleted.
warn!(
self.log,
"Discarding best-effort response, destination not found: {:?}",
rep
);
Comment thread
mraszyk marked this conversation as resolved.
Outdated
} else {
error!(
self.log,
"{}: Discarding response, destination not found: {:?}",
CRITICAL_ERROR_RESPONSE_DESTINATION_NOT_FOUND,
rep
);
self.metrics
.critical_error_response_destination_not_found
.inc();
}
}
Comment thread
mraszyk marked this conversation as resolved.
}
}
Expand Down
3 changes: 3 additions & 0 deletions rs/messaging/src/state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,9 @@ impl StateMachine for StateMachineImpl {

self.observe_phase_duration(PHASE_INDUCTION, &since);

// Discard streams to subnets no longer present in the network topology.
state_with_messages.discard_streams_for_deleted_subnets();

let execution_round_type = if requires_full_state_hash {
ExecutionRoundType::CheckpointRound
} else {
Expand Down
241 changes: 235 additions & 6 deletions rs/messaging/src/state_machine/tests.rs
Original file line number Diff line number Diff line change
@@ -1,32 +1,37 @@
use super::*;
use crate::message_routing::CRITICAL_ERROR_NON_INCREASING_BATCH_TIME;
use crate::message_routing::{CRITICAL_ERROR_NON_INCREASING_BATCH_TIME, LatencyMetrics};
use crate::routing::demux::MockDemux;
use crate::routing::stream_builder::MockStreamBuilder;
use crate::routing::stream_builder::{MockStreamBuilder, StreamBuilderImpl};
use crate::state_machine::StateMachineImpl;
use ic_config::message_routing::{MAX_STREAM_MESSAGES, TARGET_STREAM_SIZE_BYTES};
use ic_interfaces::execution_environment::Scheduler;
use ic_limits::SYSTEM_SUBNET_STREAM_MSG_LIMIT;
use ic_metrics::MetricsRegistry;
use ic_registry_routing_table::{CANISTER_IDS_PER_SUBNET, CanisterIdRange, RoutingTable};
use ic_registry_subnet_features::SubnetFeatures;
use ic_registry_subnet_type::SubnetType;
use ic_replicated_state::{
ReplicatedState, SubnetTopology, metadata_state::testing::NetworkTopologyTesting,
InputQueueType, ReplicatedState, SubnetTopology,
metadata_state::testing::NetworkTopologyTesting, testing::ReplicatedStateTesting,
};
use ic_test_utilities_execution_environment::test_registry_settings;
use ic_test_utilities_logger::with_test_replica_logger;
use ic_test_utilities_metrics::{fetch_int_counter_vec, nonzero_values};
use ic_test_utilities_state::new_canister_state;
use ic_test_utilities_state::{new_canister_state, register_callback};
use ic_test_utilities_types::batch::BatchBuilder;
use ic_test_utilities_types::ids::{SUBNET_0, SUBNET_1, SUBNET_2};
use ic_test_utilities_types::messages::SignedIngressBuilder;
use ic_test_utilities_types::messages::{RequestBuilder, SignedIngressBuilder};
use ic_types::batch::{BatchMessages, BlockmakerMetrics, ChainKeyData};
use ic_types::messages::SignedIngress;
use ic_types::messages::{CallbackId, CanisterMessage, Payload, Response, SignedIngress};
use ic_types::time::{CoarseTime, UNIX_EPOCH};
use ic_types::{
CanisterId, Height, PrincipalId, Randomness, RegistryVersion, ReplicaVersion, Time,
};
use ic_types_cycles::{CanisterCyclesCostSchedule, Cycles};
use maplit::btreemap;
use mockall::{Sequence, mock, predicate::*};
use std::collections::{BTreeMap, BTreeSet};
use std::sync::{Arc, Mutex};
use std::time::Duration;

mock! {
Expand Down Expand Up @@ -249,6 +254,230 @@ fn test_delivered_batch_interface() {
}
}

/// Runs a single `execute_round` on a state that still holds a stream and queued output
/// messages addressed to a subnet (`SUBNET_2`) that is absent from both the network
/// topology and the routing table passed to the round.
///
/// The demux and scheduler are mocks that hand the state back unchanged; the stream
/// builder is a real `StreamBuilderImpl`. The test asserts that after the round:
/// * the stream to `SUBNET_2` no longer exists;
/// * the canister's and the subnet's output queues are empty;
/// * the local canister's input queue holds exactly two reject responses, one per
///   unroutable output request;
/// * no `critical_errors` counter has a non-zero value.
#[test]
fn state_machine_handles_messages_to_deleted_subnet() {
// An otherwise empty batch: only the batch number and time are set.
let provided_batch = BatchBuilder::new()
.batch_number(Height::new(1))
.time(Time::from_nanos_since_unix_epoch(1))
.build();

// Demux mock: must be invoked exactly once and returns the state it was given.
let mut demux = Box::new(MockDemux::new());
demux
.expect_process_payload()
.times(1)
.returning(|state, _, _| state);

// Scheduler mock: must be invoked exactly once and returns the state it was given.
let mut scheduler = Box::new(MockScheduler::new());
scheduler
.expect_execute_round()
.times(1)
.returning(|state, _, _, _, _, _, _, _| state);

// Initial state with a stream to SUBNET_2, which is not in the network topology.
let mut initial_state = ReplicatedState::new(SUBNET_1, SubnetType::Application);
initial_state.modify_streams(|streams| {
streams.insert(SUBNET_2, Default::default());
});
// Sanity check: the stream exists before the round is executed.
assert!(initial_state.get_stream(&SUBNET_2).is_some());

// Add a canister with a bounded-wait output request, a best-effort output response, and a
// bounded-wait subnet message (callee = the deleted subnet's ID), all destined for the
// deleted subnet. Also add a bounded-wait subnet output response (local subnet →
// remote canister) to the subnet queues.
// All of these messages except the canister input request are built with the same
// `deadline` below.
let local_canister_id = CANISTER_RANGE_A.start;
// Use a canister ID outside the routing table range so it has no route,
// causing the stream builder to generate a reject for the output request.
// (The routing table built further down only maps CANISTER_RANGE_NNS and CANISTER_RANGE_A.)
let remote_canister_id = CANISTER_RANGE_B.start;
// Deadline of `u32::MAX` seconds since the Unix epoch.
let deadline = CoarseTime::from_secs_since_unix_epoch(u32::MAX);
let mut canister_state = new_canister_state(
local_canister_id,
PrincipalId::new_anonymous(),
Cycles::new(1_000_000_000_000),
3600.into(),
);

// Output request: local → remote (on the deleted subnet).
// Requests with no route get a reject response — no critical error.
// A callback is registered first and its ID is used as the request's reply callback.
let callback_id = register_callback(&mut canister_state, remote_canister_id, deadline);
canister_state
.push_output_request(
Arc::new(
RequestBuilder::new()
.sender(local_canister_id)
.receiver(remote_canister_id)
.sender_reply_callback(callback_id)
.deadline(deadline)
.build(),
),
UNIX_EPOCH,
)
.unwrap();

// Output response: local → remote (best-effort to avoid critical error).
// Best-effort responses with no route are dropped without a critical error.
// First push then pop a matching input request to create the output-queue reservation.
// NOTE(review): unlike the subnet input request further down, this input request is
// built without `.deadline(deadline)`, although the response pushed for it carries
// `deadline` — confirm this asymmetry is intended.
let mut subnet_available_memory = i64::MAX / 2;
canister_state
.push_input(
RequestBuilder::new()
.sender(remote_canister_id)
.receiver(local_canister_id)
.build()
.into(),
&mut subnet_available_memory,
SubnetType::Application,
InputQueueType::RemoteSubnet,
)
.unwrap();
canister_state.pop_input().unwrap();
canister_state.push_output_response(Arc::new(Response {
originator: remote_canister_id,
respondent: local_canister_id,
originator_reply_callback: CallbackId::from(0),
refund: Cycles::zero(),
response_payload: Payload::Data(vec![]),
deadline,
}));

// Output subnet message: local → SUBNET_2 (callee = the deleted subnet's ID).
// Subnet messages with no route get a reject response — no critical error.
let subnet_as_canister_id = CanisterId::from(SUBNET_2);
let subnet_callback_id =
register_callback(&mut canister_state, subnet_as_canister_id, deadline);
canister_state
.push_output_request(
Arc::new(
RequestBuilder::new()
.sender(local_canister_id)
.receiver(subnet_as_canister_id)
.sender_reply_callback(subnet_callback_id)
.deadline(deadline)
.build(),
),
UNIX_EPOCH,
)
.unwrap();

// The canister, with its three queued output messages, becomes part of the state.
initial_state.put_canister_state(canister_state);

// Subnet output response: local subnet → remote (bounded-wait).
// Bounded-wait responses with no route are dropped without a critical error.
// First push then pop a matching input request to create the output-queue reservation.
// The request is addressed to the local subnet's own ID (SUBNET_1) as a canister ID.
initial_state
.push_input(
RequestBuilder::new()
.sender(remote_canister_id)
.receiver(CanisterId::from(SUBNET_1))
.deadline(deadline)
.build()
.into(),
&mut subnet_available_memory,
)
.unwrap();
initial_state.pop_subnet_input().unwrap();
initial_state.push_subnet_output_response(Arc::new(Response {
originator: remote_canister_id,
respondent: CanisterId::from(SUBNET_1),
originator_reply_callback: CallbackId::from(0),
refund: Cycles::zero(),
response_payload: Payload::Data(vec![]),
deadline,
}));

// Network topology with only SUBNET_0 (NNS) and SUBNET_1 (local); SUBNET_2 is absent.
let mut subnets = BTreeMap::new();
subnets.insert(
SUBNET_0,
SubnetTopology {
public_key: vec![0, 1, 2, 3],
nodes: BTreeSet::new(),
subnet_type: SubnetType::System,
subnet_features: SubnetFeatures::default(),
chain_keys_held: BTreeSet::new(),
cost_schedule: CanisterCyclesCostSchedule::Normal,
subnet_admins: BTreeSet::new(),
},
);
subnets.insert(SUBNET_1, SubnetTopology::default());
let mut network_topology = NetworkTopology::default();
network_topology.nns_subnet_id = SUBNET_0;
network_topology.set_subnets(subnets);
// The routing table likewise has no entry that maps to SUBNET_2.
network_topology.set_routing_table(
RoutingTable::try_from(btreemap! {
CANISTER_RANGE_NNS => SUBNET_0,
CANISTER_RANGE_A => SUBNET_1,
})
.unwrap(),
);

with_test_replica_logger(|log| {
// One registry is shared by the stream builder and the state machine so that the
// critical-error counters can be read back from it at the end of the test.
let metrics_registry = MetricsRegistry::new();
let message_routing_metrics = MessageRoutingMetrics::new(&metrics_registry);
// A real stream builder (not `MockStreamBuilder`), so its routing of the queued
// output messages is actually exercised.
let stream_builder = Box::new(StreamBuilderImpl::new(
SUBNET_1,
MAX_STREAM_MESSAGES,
TARGET_STREAM_SIZE_BYTES,
SYSTEM_SUBNET_STREAM_MSG_LIMIT,
&metrics_registry,
&message_routing_metrics,
Arc::new(Mutex::new(LatencyMetrics::new_time_in_stream(
&metrics_registry,
))),
log.clone(),
));

let state_machine = Box::new(StateMachineImpl::new(
scheduler,
demux,
stream_builder,
Default::default(),
log,
message_routing_metrics,
));

// Execute one round with the topology that lacks SUBNET_2.
let mut state = state_machine.execute_round(
initial_state,
network_topology,
provided_batch,
Default::default(),
Default::default(),
&test_registry_settings(),
Default::default(),
Default::default(),
);

// Stream to the deleted subnet is gone.
assert!(state.get_stream(&SUBNET_2).is_none());
// Output queues are empty: requests were consumed (reject responses generated),
// best-effort and bounded-wait responses were dropped.
assert!(
!state
.canister_state(&local_canister_id)
.unwrap()
.has_output()
);
assert!(!state.subnet_queues().has_output());
// Reject responses for both unroutable output requests (canister request and subnet
// message) are in the local canister's input queue.
let canister = Arc::make_mut(state.canister_state_mut_arc(&local_canister_id).unwrap());
let msg1 = canister.pop_input().unwrap();
let msg2 = canister.pop_input().unwrap();
// Exactly two inputs: a third pop must yield nothing.
assert!(canister.pop_input().is_none());
// Each input must be a response addressed to the local canister with a reject payload.
for msg in [msg1, msg2] {
assert!(matches!(
msg,
CanisterMessage::Response { response, .. }
if response.originator == local_canister_id
&& matches!(response.response_payload, Payload::Reject(_))
));
}
// No critical error was raised.
assert!(
nonzero_values(fetch_int_counter_vec(&metrics_registry, "critical_errors")).is_empty()
);
});
}

// Role-based aliases for the generic test subnet IDs: `SUBNET_0` acts as the NNS
// subnet, `SUBNET_1` and `SUBNET_2` as subnets A and B.
const NNS_SUBNET_ID: SubnetId = SUBNET_0;
const SUBNET_A: SubnetId = SUBNET_1;
const SUBNET_B: SubnetId = SUBNET_2;
Expand Down
19 changes: 19 additions & 0 deletions rs/replicated_state/src/replicated_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -855,6 +855,25 @@ impl ReplicatedState {
self.metadata.streams.get(destination_subnet_id)
}

/// Discards streams to subnets no longer present in the network topology.
///
/// Called after the induction phase of each round, once the new
/// `NetworkTopology` (reflecting any registry deletions) has been applied.
/// Safe to call because by the time the registry deletion takes effect, all
/// certified stream slices from the deleted subnet have already been inducted
/// in the same round, and no new certified slices can be produced once the
/// subnet's key is removed from the registry.
pub fn discard_streams_for_deleted_subnets(&mut self) {
let mut streams = self.take_streams();
streams.retain(|subnet_id, _| {
self.metadata
.network_topology
.subnets()
.contains_key(subnet_id)
});
self.put_streams(streams);
Comment thread
mraszyk marked this conversation as resolved.
Comment thread
mraszyk marked this conversation as resolved.
}

/// Returns the sum of reserved compute allocations of all currently
/// available canisters.
pub fn total_compute_allocation(&self) -> u64 {
Expand Down
Loading