Skip to content
Open
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions cumulus/polkadot-omni-node/lib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ sc-network-statement = { workspace = true, default-features = true }
sc-network-sync = { workspace = true, default-features = true }
sc-offchain = { workspace = true, default-features = true }
sc-rpc = { workspace = true, default-features = true }
sc-rpc-spec-v2 = { workspace = true, default-features = true }
sc-runtime-utilities = { workspace = true, default-features = true }
sc-service = { workspace = true, default-features = false }
sc-statement-store = { workspace = true, default-features = true }
Expand Down
6 changes: 5 additions & 1 deletion cumulus/polkadot-omni-node/lib/src/common/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use sc_rpc::{
dev::{Dev, DevApiServer},
statement::{StatementApiServer, StatementStore},
};
use sc_rpc_spec_v2::statement::{StatementSpec, StatementSpecApiServer};
use sp_runtime::traits::Block as BlockT;
use std::{marker::PhantomData, sync::Arc};
use substrate_frame_rpc_system::{System, SystemApiServer};
Expand Down Expand Up @@ -76,7 +77,10 @@ where
module.merge(TransactionPayment::new(client.clone()).into_rpc())?;
module.merge(StateMigration::new(client.clone(), backend).into_rpc())?;
if let Some(statement_store) = statement_store {
module.merge(StatementStore::new(statement_store, spawn_handle).into_rpc())?;
module.merge(
StatementStore::new(statement_store.clone(), spawn_handle.clone()).into_rpc(),
)?;
module.merge(StatementSpec::new(statement_store, spawn_handle).into_rpc())?;
}
module.merge(Dev::new(client).into_rpc())?;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use anyhow::anyhow;
use codec::Encode;
use log::info;
use sc_statement_store::test_utils::get_keypair;
use serde::Deserialize;
use sp_core::{hexdisplay::HexDisplay, Bytes, Pair};
use sp_statement_store::{
statement_allowance_key, StatementAllowance, StatementEvent, SubmitResult, Topic, TopicFilter,
Expand All @@ -28,6 +29,38 @@ use zombienet_sdk::{

pub(super) const RPC_POOL_SIZE: usize = 10000;

#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
Comment thread
DenzelPenzel marked this conversation as resolved.
Outdated
#[serde(tag = "event", rename_all = "camelCase")]
pub(super) enum UnstableStatementEvent {
ReplayStatements {
#[serde(rename = "filterId")]
filter_id: String,
statements: Vec<Bytes>,
},
ReplayDone {
#[serde(rename = "filterId")]
filter_id: String,
},
NewStatements {
statements: Vec<UnstableNewStatement>,
},
Stop,
}

#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
pub(super) struct UnstableNewStatement {
pub statement: Bytes,
#[serde(rename = "filterIds")]
pub filter_ids: Vec<String>,
}

#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
#[serde(untagged)]
pub(super) enum UnstableAddFilterResponse {
Ok(String),
LimitReached { result: String },
}

pub(super) async fn submit_statement(
rpc: &RpcClient,
statement: &sp_statement_store::Statement,
Expand All @@ -37,6 +70,59 @@ pub(super) async fn submit_statement(
Ok(result)
}

pub(super) async fn submit_statement_unstable(
rpc: &RpcClient,
statement: &sp_statement_store::Statement,
) -> Result<SubmitResult, anyhow::Error> {
let encoded: Bytes = statement.encode().into();
let result: SubmitResult =
rpc.request("statement_unstable_submit", rpc_params![encoded]).await?;
Ok(result)
}

pub(super) async fn subscribe_unstable(
rpc: &RpcClient,
) -> Result<RpcSubscription<UnstableStatementEvent>, anyhow::Error> {
let subscription = rpc
.subscribe::<UnstableStatementEvent>(
"statement_unstable_subscribe",
rpc_params![],
"statement_unstable_unsubscribe",
)
.await?;
Ok(subscription)
}

pub(super) fn unstable_subscription_id(
subscription: &RpcSubscription<UnstableStatementEvent>,
) -> Result<String, anyhow::Error> {
subscription
.subscription_id()
.map(ToOwned::to_owned)
.ok_or_else(|| anyhow!("Subscription was accepted without an id"))
}

pub(super) async fn add_filter_unstable(
rpc: &RpcClient,
subscription_id: &str,
filter: TopicFilter,
) -> Result<UnstableAddFilterResponse, anyhow::Error> {
let response = rpc
.request("statement_unstable_add_filter", rpc_params![subscription_id, filter])
.await?;
Ok(response)
}

pub(super) async fn remove_filter_unstable(
rpc: &RpcClient,
subscription_id: &str,
filter_id: &str,
) -> Result<(), anyhow::Error> {
rpc.request::<()>("statement_unstable_remove_filter", rpc_params![subscription_id, filter_id])
.await?;
Ok(())
}

pub(super) async fn expect_one_statement(
subscription: &mut RpcSubscription<StatementEvent>,
timeout_secs: u64,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,20 @@
// SPDX-License-Identifier: Apache-2.0

use super::common::{
assert_no_more_statements, assert_statements_match, base_dir, collator_default_args,
create_chain_spec_with_allowances, expect_one_statement, expect_statements_unordered,
spawn_network_sudo, spawn_network_with_injected_allowances, submit_statement, subscribe_topic,
subscribe_topic_filter,
add_filter_unstable, assert_no_more_statements, assert_statements_match, base_dir,
collator_default_args, create_chain_spec_with_allowances, expect_one_statement,
expect_statements_unordered, remove_filter_unstable, spawn_network_sudo,
spawn_network_with_injected_allowances, submit_statement, submit_statement_unstable,
subscribe_topic, subscribe_topic_filter, subscribe_unstable, unstable_subscription_id,
UnstableAddFilterResponse, UnstableStatementEvent,
};
use codec::Encode;
use futures::future::join_all;
use log::{debug, info};
use sc_network_statement::config::STATEMENTS_BURST_COEFFICIENT;
use sc_statement_store::test_utils::{create_allowance_items, create_test_statement, get_keypair};
use sp_core::Bytes;
use sp_runtime::BoundedVec;
use sp_statement_store::{
RejectionReason, Statement, StatementAllowance, SubmitResult, Topic, TopicFilter,
};
Expand All @@ -24,6 +27,65 @@ use std::{
};
use zombienet_sdk::{LocalFileSystem, Network, NetworkConfigBuilder};

async fn expect_unstable_event(
subscription: &mut zombienet_sdk::subxt::ext::subxt_rpcs::client::RpcSubscription<
UnstableStatementEvent,
>,
timeout_secs: u64,
) -> Result<UnstableStatementEvent, anyhow::Error> {
tokio::time::timeout(Duration::from_secs(timeout_secs), subscription.next())
.await
.map_err(|_| anyhow::anyhow!("Timeout waiting for unstable statement event"))?
.ok_or_else(|| anyhow::anyhow!("Unstable statement subscription ended"))?
.map_err(|e| anyhow::anyhow!("Unstable statement subscription error: {}", e))
}

async fn collect_unstable_replay(
subscription: &mut zombienet_sdk::subxt::ext::subxt_rpcs::client::RpcSubscription<
UnstableStatementEvent,
>,
filter_id: &str,
) -> Result<Vec<Bytes>, anyhow::Error> {
let mut statements = Vec::new();
loop {
match expect_unstable_event(subscription, 20).await? {
UnstableStatementEvent::ReplayStatements { filter_id: id, statements: chunk }
if id == filter_id =>
{
statements.extend(chunk)
},
UnstableStatementEvent::ReplayDone { filter_id: id } if id == filter_id => {
return Ok(statements)
},
event => anyhow::bail!("Unexpected unstable event before replayDone: {:?}", event),
}
}
}

async fn assert_no_unstable_event(
subscription: &mut zombienet_sdk::subxt::ext::subxt_rpcs::client::RpcSubscription<
UnstableStatementEvent,
>,
timeout_secs: u64,
) -> Result<(), anyhow::Error> {
let result = tokio::time::timeout(Duration::from_secs(timeout_secs), subscription.next()).await;
assert!(result.is_err(), "Expected no unstable statement event but received one");
Ok(())
}

fn match_all_filter(topic: Topic) -> TopicFilter {
TopicFilter::MatchAll(BoundedVec::truncate_from(vec![topic]))
}

fn filter_id(response: UnstableAddFilterResponse) -> String {
match response {
UnstableAddFilterResponse::Ok(id) => id,
UnstableAddFilterResponse::LimitReached { result } => {
panic!("Expected filter id, got {result}")
},
}
}

/// Verifies basic statement propagation and data integrity across two nodes
///
/// Tests uses the genesis-injection approach for setting allowances
Expand Down Expand Up @@ -57,6 +119,95 @@ async fn statement_store_basic_propagation() -> Result<(), anyhow::Error> {
Ok(())
}

#[tokio::test(flavor = "multi_thread")]
async fn statement_store_unstable_rpc_multi_filter_flow() -> Result<(), anyhow::Error> {
let _ = env_logger::try_init_from_env(
env_logger::Env::default().filter_or(env_logger::DEFAULT_FILTER_ENV, "info"),
);

let network = spawn_network_with_injected_allowances(&["alice"], 6).await?;
let alice = network.get_node("alice")?;
let alice_rpc = alice.rpc().await?;

let topic_a: Topic = [0xA1; 32].into();
let topic_b: Topic = [0xB2; 32].into();
let topic_c: Topic = [0xC3; 32].into();

let pre_existing =
create_test_statement(&get_keypair(0), &[topic_a], None, vec![1], u32::MAX, 0);
assert_eq!(submit_statement_unstable(&alice_rpc, &pre_existing).await?, SubmitResult::New);
assert_eq!(submit_statement_unstable(&alice_rpc, &pre_existing).await?, SubmitResult::Known);

let mut subscription = subscribe_unstable(&alice_rpc).await?;
let subscription_id = unstable_subscription_id(&subscription)?;

let filter_a = filter_id(
add_filter_unstable(&alice_rpc, &subscription_id, match_all_filter(topic_a)).await?,
);
let replayed = collect_unstable_replay(&mut subscription, &filter_a).await?;
assert_eq!(replayed, vec![Bytes::from(pre_existing.encode())]);

let filter_b = filter_id(
add_filter_unstable(&alice_rpc, &subscription_id, match_all_filter(topic_b)).await?,
);
let replayed = collect_unstable_replay(&mut subscription, &filter_b).await?;
assert!(replayed.is_empty(), "Filter B should not replay topic A statements");

let live_ab =
create_test_statement(&get_keypair(1), &[topic_a, topic_b], None, vec![2], u32::MAX, 0);

assert_eq!(submit_statement_unstable(&alice_rpc, &live_ab).await?, SubmitResult::New);

match expect_unstable_event(&mut subscription, 20).await? {
UnstableStatementEvent::NewStatements { statements } => {
assert_eq!(statements.len(), 1);
assert_eq!(statements[0].statement, Bytes::from(live_ab.encode()));
let filter_ids: HashSet<_> = statements[0].filter_ids.iter().cloned().collect();
assert_eq!(filter_ids, HashSet::from([filter_a.clone(), filter_b.clone()]));
},
event => anyhow::bail!("Expected newStatements event, got {:?}", event),
}

remove_filter_unstable(&alice_rpc, &subscription_id, &filter_a).await?;
let live_b_after_remove =
create_test_statement(&get_keypair(2), &[topic_a, topic_b], None, vec![3], u32::MAX, 0);

assert_eq!(
submit_statement_unstable(&alice_rpc, &live_b_after_remove).await?,
SubmitResult::New
);

match expect_unstable_event(&mut subscription, 20).await? {
UnstableStatementEvent::NewStatements { statements } => {
assert_eq!(statements.len(), 1);
assert_eq!(statements[0].statement, Bytes::from(live_b_after_remove.encode()));
assert_eq!(statements[0].filter_ids, vec![filter_b.clone()]);
},
event => anyhow::bail!("Expected newStatements event after remove, got {:?}", event),
}

remove_filter_unstable(&alice_rpc, &subscription_id, &filter_b).await?;
remove_filter_unstable(&alice_rpc, &subscription_id, "999").await?;

let ignored_after_remove =
create_test_statement(&get_keypair(3), &[topic_b], None, vec![4], u32::MAX, 0);

assert_eq!(
submit_statement_unstable(&alice_rpc, &ignored_after_remove).await?,
SubmitResult::New
);
assert_no_unstable_event(&mut subscription, 3).await?;

let unsupported_filter = TopicFilter::MatchAny(BoundedVec::truncate_from(vec![topic_c]));
let err = add_filter_unstable(&alice_rpc, &subscription_id, unsupported_filter)
.await
.expect_err("matchAny is not supported by the unstable add_filter RPC");

assert!(err.to_string().contains("matchAny"));

Ok(())
}

/// Verifies concurrent propagation, quota enforcement, and priority eviction
///
/// Spawns a single 4-node network with mixed allowances:
Expand Down
22 changes: 22 additions & 0 deletions prdoc/pr_11989.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
title: 'statement-store: new api implementation'
doc:
- audience: Node Dev
description: |-
## Summary

Adds the `statement_unstable_*` JSON-RPC API implementation for statement-store and wires it into the parachain node RPC stack.

## Changes

- Add `statement_unstable_submit`, `statement_unstable_subscribe`, `statement_unstable_add_filter`, and `statement_unstable_remove_filter`
- Add multi-filter subscription support with replay and live statement delivery
- Add zombienet test for the unstable RPC multi-filter flow
crates:
- name: polkadot-omni-node-lib
bump: patch
- name: sc-rpc-spec-v2
bump: patch
- name: sc-statement-store
bump: patch
- name: sp-statement-store
bump: patch
Loading
Loading