Skip to content
Open
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions cumulus/polkadot-omni-node/lib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ sc-network-statement = { workspace = true, default-features = true }
sc-network-sync = { workspace = true, default-features = true }
sc-offchain = { workspace = true, default-features = true }
sc-rpc = { workspace = true, default-features = true }
sc-rpc-spec-v2 = { workspace = true, default-features = true }
sc-runtime-utilities = { workspace = true, default-features = true }
sc-service = { workspace = true, default-features = false }
sc-statement-store = { workspace = true, default-features = true }
Expand Down
6 changes: 5 additions & 1 deletion cumulus/polkadot-omni-node/lib/src/common/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use sc_rpc::{
dev::{Dev, DevApiServer},
statement::{StatementApiServer, StatementStore},
};
use sc_rpc_spec_v2::statement::{StatementSpec, StatementSpecApiServer};
use sp_runtime::traits::Block as BlockT;
use std::{marker::PhantomData, sync::Arc};
use substrate_frame_rpc_system::{System, SystemApiServer};
Expand Down Expand Up @@ -76,7 +77,10 @@ where
module.merge(TransactionPayment::new(client.clone()).into_rpc())?;
module.merge(StateMigration::new(client.clone(), backend).into_rpc())?;
if let Some(statement_store) = statement_store {
module.merge(StatementStore::new(statement_store, spawn_handle).into_rpc())?;
module.merge(
StatementStore::new(statement_store.clone(), spawn_handle.clone()).into_rpc(),
)?;
module.merge(StatementSpec::new(statement_store, spawn_handle).into_rpc())?;
}
module.merge(Dev::new(client).into_rpc())?;

Expand Down
1 change: 1 addition & 0 deletions cumulus/zombienet/zombienet-sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ cumulus-zombienet-sdk-helpers = { workspace = true }
sp-rpc = { workspace = true, default-features = true }
sp-statement-store = { workspace = true, default-features = true, features = ["serde"] }
sc-network-statement = { workspace = true, default-features = true }
sc-rpc-spec-v2 = { workspace = true, default-features = true }
sc-statement-store = { workspace = true, default-features = true, features = ["test-helpers"] }
sp-keyring = { workspace = true, default-features = true }
sp-core = { workspace = true, default-features = true }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,14 @@ use anyhow::anyhow;
use codec::Encode;
use log::info;
use sc_statement_store::test_utils::get_keypair;
use serde::Deserialize;
use sp_core::{hexdisplay::HexDisplay, Bytes, Pair};
use sp_statement_store::{
statement_allowance_key, StatementAllowance, StatementEvent, SubmitResult, Topic, TopicFilter,
};
use std::{
path::{Path, PathBuf},
time::Duration,
time::{Duration, Instant},
};
use zombienet_sdk::{
subxt::{
Expand All @@ -33,6 +34,17 @@ pub(super) const COLLATOR_INFO_LOG_FILTER: &str = "info,statement-store=info,sta
pub(super) const COLLATOR_TRACE_LOG_FILTER: &str =
"info,statement-store=trace,statement-gossip=trace";

pub(super) use sc_rpc_spec_v2::statement::{
event::NewStatementEntry as UnstableNewStatement, SubscribeEvent as UnstableStatementEvent,
};

#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
#[serde(untagged)]
pub(super) enum UnstableAddFilterResponse {
Ok(String),
LimitReached { result: String },
}

pub(super) async fn submit_statement(
rpc: &RpcClient,
statement: &sp_statement_store::Statement,
Expand All @@ -42,6 +54,59 @@ pub(super) async fn submit_statement(
Ok(result)
}

pub(super) async fn submit_statement_unstable(
rpc: &RpcClient,
statement: &sp_statement_store::Statement,
) -> Result<SubmitResult, anyhow::Error> {
let encoded: Bytes = statement.encode().into();
let result: SubmitResult =
rpc.request("statement_unstable_submit", rpc_params![encoded]).await?;
Ok(result)
}

pub(super) async fn subscribe_unstable(
rpc: &RpcClient,
) -> Result<RpcSubscription<UnstableStatementEvent>, anyhow::Error> {
let subscription = rpc
.subscribe::<UnstableStatementEvent>(
"statement_unstable_subscribe",
rpc_params![],
"statement_unstable_unsubscribe",
)
.await?;
Ok(subscription)
}

pub(super) fn unstable_subscription_id(
subscription: &RpcSubscription<UnstableStatementEvent>,
) -> Result<String, anyhow::Error> {
subscription
.subscription_id()
.map(ToOwned::to_owned)
.ok_or_else(|| anyhow!("Subscription was accepted without an id"))
}

pub(super) async fn add_filter_unstable(
rpc: &RpcClient,
subscription_id: &str,
filter: TopicFilter,
) -> Result<UnstableAddFilterResponse, anyhow::Error> {
let response = rpc
.request("statement_unstable_add_filter", rpc_params![subscription_id, filter])
.await?;
Ok(response)
}

pub(super) async fn remove_filter_unstable(
rpc: &RpcClient,
subscription_id: &str,
filter_id: &str,
) -> Result<(), anyhow::Error> {
rpc.request::<()>("statement_unstable_remove_filter", rpc_params![subscription_id, filter_id])
.await?;
Ok(())
}

pub(super) async fn expect_one_statement(
subscription: &mut RpcSubscription<StatementEvent>,
timeout_secs: u64,
Expand Down Expand Up @@ -168,6 +233,13 @@ pub(super) fn create_chain_spec_with_allowances(
participant_count: u32,
base_dir: &Path,
) -> Result<PathBuf, anyhow::Error> {
let started_at = Instant::now();
info!(
"Creating statement allowance chain spec: participants={}, base_dir={}",
participant_count,
base_dir.display(),
);

let chain_spec_template = include_str!("people-westend-local-spec.json");
let mut chain_spec: serde_json::Value = serde_json::from_str(chain_spec_template)
.map_err(|e| anyhow!("Failed to parse chain spec JSON: {}", e))?;
Expand All @@ -182,6 +254,15 @@ pub(super) fn create_chain_spec_with_allowances(
let allowance_hex = format!("0x{}", HexDisplay::from(&allowance.encode()));
info!("Injecting statement allowance: {:}", allowance_hex);
for idx in 0..participant_count {
if idx > 0 && idx % 5_000 == 0 {
info!(
"Injected statement allowances: {}/{} elapsed_s={}",
idx,
participant_count,
started_at.elapsed().as_secs(),
);
}

let keypair = get_keypair(idx);
let account_id = keypair.public();

Expand All @@ -190,13 +271,25 @@ pub(super) fn create_chain_spec_with_allowances(

genesis.insert(storage_key_hex, serde_json::Value::String(allowance_hex.clone()));
}
info!(
"Finished injecting statement allowances: {}/{} elapsed_s={}",
participant_count,
participant_count,
started_at.elapsed().as_secs(),
);

let chain_spec_path = base_dir.join("people-westend-custom.json");
info!("Serializing chain spec: {}", chain_spec_path.display());
let chain_spec_json = serde_json::to_string_pretty(&chain_spec)
.map_err(|e| anyhow!("Failed to serialize chain spec: {}", e))?;

std::fs::write(&chain_spec_path, chain_spec_json)
.map_err(|e| anyhow!("Failed to write chain spec to file: {}", e))?;
info!(
"Wrote chain spec: {} elapsed_s={}",
chain_spec_path.display(),
started_at.elapsed().as_secs(),
);

Ok(chain_spec_path)
}
Expand Down Expand Up @@ -239,6 +332,12 @@ async fn launch_network(
) -> Result<Network<LocalFileSystem>, anyhow::Error> {
let images = zombienet_sdk::environment::get_images_from_env();
let base_dir = base_dir()?;
info!(
"Preparing zombienet network config: collators={}, chain_spec={}, base_dir={}",
collators.len(),
chain_spec_path.display(),
base_dir.display(),
);

let config = NetworkConfigBuilder::new()
.with_relaychain(|r| {
Expand Down Expand Up @@ -270,8 +369,12 @@ async fn launch_network(
.build()
.map_err(format_build_errors)?;

info!("Initialising zombienet network");
let network = crate::utils::initialize_network(config).await?;
assert!(network.wait_until_is_up(60).await.is_ok());
info!("Waiting for zombienet network to be up");
let wait_result = network.wait_until_is_up(60).await;
info!("Zombienet network wait finished: ok={}", wait_result.is_ok());
assert!(wait_result.is_ok());
Ok(network)
}

Expand All @@ -296,6 +399,11 @@ pub(super) async fn spawn_network_with_injected_allowances(
) -> Result<Network<LocalFileSystem>, anyhow::Error> {
assert!(!collators.is_empty());
let base_dir = base_dir()?;
info!(
"Spawning injected allowance network: collators={}, participants={}",
collators.len(),
participant_count,
);
let chain_spec_path = create_chain_spec_with_allowances(participant_count, &base_dir)?;
let args = collator_args(participant_count, COLLATOR_TRACE_LOG_FILTER);
launch_network(collators, &chain_spec_path, args).await
Expand Down
Loading
Loading