statement-store: new API implementation #11989
Conversation
/cmd fmt

/cmd fmt

/cmd prdoc --audience node_dev --bump patch
```rust
filter: OptimizedTopicFilter,
) -> Result<AddFilterOutcome, Error> {
    let handle = state.lock().handle.clone();
    match handle.add_filter(filter) {
```
Alright, I thought a bit about what the approach should be. I don't like at all having 4 locks for this subscription, so I would go about it the following way:
- Make this lock actually protect the SubscriptionHandle, which makes sure the operation is allowed and makes sense.
- Maybe consider using an async mutex here, as the critical section might be long.
- After validating, while holding the lock, send an "ADD_FILTER" message to the subscription worker.
- Then the loop in SubscriptionsHandle should collect the needed statements into pendingReplys and park any statements that arrive while we send the pendingReplys.
- Then run_subscription_task should need just the channel from the matcher and will do just the serving of the subscription.

I think this will simplify the implementation; let me know if you find any roadblocks.
I reworked this so add_filter no longer waits for replay collection. The handle only validates capacity, allocates the filterId under its local lock, queues an AddFilter control message, and returns the id.
The matcher worker now owns the replay path: once it receives AddFilter, it collects the snapshot and registers the filter in the same critical section, then wakes any pending stream request. LiveEventStream::poll_next no longer does replay/pending-live coordination itself, it only asks the matcher for the next ready event.
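The reworked handoff described above can be sketched with std-only stand-ins. All names here (`Control`, `HandleState`, `worker_step`, the string-based "store") are illustrative, not the actual `substrate/client/statement-store` types, and the real code uses async channels rather than `std::sync::mpsc`:

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Mutex;

// Illustrative control message; the real worker handles more variants.
#[derive(Debug)]
enum Control {
    AddFilter { filter_id: u64, topic: String },
}

const MAX_FILTERS: usize = 128;

struct HandleState {
    next_filter_id: u64,
    active_filters: usize,
}

struct SubscriptionHandle {
    state: Mutex<HandleState>,
    to_worker: Sender<Control>,
}

impl SubscriptionHandle {
    /// Validate capacity, allocate an id, queue the control message,
    /// and return immediately -- no replay collection on this path.
    fn add_filter(&self, topic: String) -> Result<u64, &'static str> {
        let mut state = self.state.lock().unwrap();
        if state.active_filters >= MAX_FILTERS {
            return Err("too many filters");
        }
        let filter_id = state.next_filter_id;
        state.next_filter_id += 1;
        state.active_filters += 1;
        // Send while still holding the local lock so messages reach the
        // worker in allocation order.
        self.to_worker
            .send(Control::AddFilter { filter_id, topic })
            .map_err(|_| "worker gone")?;
        Ok(filter_id)
    }
}

/// Worker side: the replay snapshot and the (elided here) live-filter
/// registration happen in one critical section over the store, so no live
/// statement can slip in between snapshot and registration.
fn worker_step(
    rx: &Receiver<Control>,
    store: &Mutex<Vec<String>>,
) -> Option<(u64, Vec<String>)> {
    match rx.recv().ok()? {
        Control::AddFilter { filter_id, topic } => {
            let store = store.lock().unwrap();
            let replay: Vec<String> = store
                .iter()
                .filter(|s| s.contains(&topic))
                .cloned()
                .collect();
            Some((filter_id, replay))
        },
    }
}
```

The point of the split is that the caller's lock only guards id allocation and capacity, while the expensive snapshot work is serialized inside the worker.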
```rust
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "serde", serde(tag = "reason", rename_all = "camelCase"))]
pub enum RejectionReason {
```
You can use `#[serde(rename_all_fields = "camelCase")]` to rename the fields across all enum variants.
```rust
SubmitResult::Rejected(reason) => Ok(SubmitOutcome::Rejected(reason)),
SubmitResult::Invalid(reason) => Ok(SubmitOutcome::Invalid(reason)),
SubmitResult::KnownExpired => {
    Err(Error::InternalError("store returned KnownExpired for local submit".into()))
```
Shouldn't this also be SubmitOutcome::Invalid(reason)? The spec expects only new, known, rejected, and invalid.
```rust
{
    let state = self.state.lock();
    if state.active_filter_ids.len() >= MAX_FILTERS_PER_SUBSCRIPTION ||
        state.pending_replays.len() >= PENDING_REPLAYS_HARD_CAP
```
Although the filter count is capped, the size of the response is not: 128 matchAny requests can be open per connection, across 16 connections, so that is 2048 matchAny filters that basically want to return the entire statement store. And while one filter is streaming to the client, everything else sits in RAM, because as soon as a filter request is received we SCALE-decode the matching statements and keep them in store.pending_replays inside register_filter_with_snapshot. Even with a 100 MB store state that is roughly 200 GB.
There should be a way to take a "lazy" approach.
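One shape such a "lazy" approach could take: at registration time capture only the (cheap) ordered keys of matching statements, and decode a bounded batch per poll, so undelivered statements stay in the store instead of in per-subscription buffers. This is a sketch under assumed names (`LazyReplay`, the `fetch` callback), not the actual statement-store API:

```rust
/// Lazy replay cursor: holds keys, not decoded statements.
struct LazyReplay {
    keys: Vec<u64>, // ordered keys captured when the filter was registered
    pos: usize,
    batch: usize,   // max statements materialized per poll
}

impl LazyReplay {
    fn new(keys: Vec<u64>, batch: usize) -> Self {
        Self { keys, pos: 0, batch }
    }

    /// Decode at most `batch` statements; anything not yet sent stays in
    /// the store rather than pinned in RAM per subscription.
    fn next_batch(&mut self, fetch: impl Fn(u64) -> Option<Vec<u8>>) -> Vec<Vec<u8>> {
        let end = (self.pos + self.batch).min(self.keys.len());
        let out: Vec<Vec<u8>> = self.keys[self.pos..end]
            .iter()
            .filter_map(|k| fetch(*k)) // a statement may expire mid-replay
            .collect();
        self.pos = end;
        out
    }

    fn done(&self) -> bool {
        self.pos >= self.keys.len()
    }
}
```

The trade-off is that statements can expire between snapshot and delivery, so the replay is "keys as of registration, bodies as of delivery" rather than a fully consistent snapshot.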
Statement Store RPC Bench Report
Aggregate Result

Lower latency and send time are better. On the main latency metric, v2 is slower in this run.

Per-Run Data

v1:
| Run | Send avg, s | Receive avg, s | Latency avg, s | Latency max, s | Elapsed, s |
|---|---|---|---|---|---|
| 1 | 35.729 | 0.000 | 35.729 | 58.866 | 218 |
v2 (unstable_bench):
| Run | Send avg, s | Receive avg, s | Latency avg, s | Latency max, s | Elapsed, s |
|---|---|---|---|---|---|
| 1 | 39.170 | 0.925 | 40.095 | 52.248 | 256 |
Tail Behavior
This run has only one sample per bench, so median, mean max, and worst max are
all single-run values. v2 has worse average latency, but better max latency in
this sample.
| Metric | v1 bench | v2 unstable_bench | v2 vs v1 |
|---|---|---|---|
| Median latency avg, s | 35.729 | 40.095 | +12.22% |
| Mean of latency max, s | 58.866 | 52.248 | -11.24% |
| Worst latency max, s | 58.866 | 52.248 | -11.24% |
Conclusion
v2 is worse on the primary latency path in this run:
| Comparison | Result |
|---|---|
| Average latency | v2 is 12.22% slower |
| Average send time | v2 is 9.63% slower |
| Median per-run latency | v2 is 12.22% slower |
| Mean max latency | v2 is 11.24% better |
| Worst observed max latency | v2 is 11.24% better |
| Average elapsed time | v2 is 17.43% slower |
```rust
.and_then(|connection| connection.get(sub_id).cloned())
}

pub async fn get_with_timeout(
```
Added get_with_timeout because accept().await can return the subscription id to the client before we finish storing it in StatementSubscriptions.
A fast client can then call add_filter immediately, and a plain get may falsely return InvalidSubscription.
So this is a short bounded wait for the subscribe/register handoff, not a general retry mechanism.
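The bounded wait described above can be sketched like this. The real implementation is async over jsonrpsee types; this std-only version (with an illustrative `HashMap` registry and polling loop instead of an async notify/timeout race) just shows the shape of the logic:

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

/// Poll the registry until the subscription shows up or a short deadline
/// passes. Returning None only after the deadline means a genuinely
/// unknown subscription, not just a not-yet-registered one.
fn get_with_timeout(
    subs: &Arc<Mutex<HashMap<u64, String>>>,
    sub_id: u64,
    timeout: Duration,
) -> Option<String> {
    let deadline = Instant::now() + timeout;
    loop {
        if let Some(entry) = subs.lock().unwrap().get(&sub_id).cloned() {
            return Some(entry); // already registered: the common fast path
        }
        if Instant::now() >= deadline {
            return None; // treat as InvalidSubscription
        }
        // Std-only stand-in for awaiting a registration notification.
        std::thread::sleep(Duration::from_millis(1));
    }
}
```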
Did you ever hit this problem, or is this just defensive programming?
No, I identified it during the bench tests. This seems to be the only viable option I see to fix it without changing the protocol contract
> No, I identified it during the bench tests.

You mean you actually hit this problem, or just thought it might happen?

> This seems to be the only viable option I see to fix it without changing the protocol contract

The core issue is that we call accept before we call register. We should be able to call subscriptions.register before accept; all we need is a getter on PendingSubscriptionSink for the sub_id.
Can you see how much friction you would get if you go this route?
Yes, I hit this problem while running benchmark tests.
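The register-before-accept ordering suggested in this thread could look roughly like the following. `PendingSink` is a stand-in for jsonrpsee's PendingSubscriptionSink, and the `sub_id()` getter is the hypothetical accessor the reviewer proposes, not an existing API:

```rust
use std::collections::HashSet;

/// Stand-in for PendingSubscriptionSink with the proposed sub_id getter.
struct PendingSink {
    sub_id: u64,
}

impl PendingSink {
    fn sub_id(&self) -> u64 {
        self.sub_id // hypothetical getter, available before accept()
    }

    fn accept(self) -> u64 {
        self.sub_id // only now does the client learn its subscription id
    }
}

/// Register first: by the time accept() reveals the id to the client, a
/// fast add_filter call can already find the subscription, so no bounded
/// wait is needed on the lookup path.
fn subscribe(registry: &mut HashSet<u64>, pending: PendingSink) -> u64 {
    registry.insert(pending.sub_id());
    pending.accept()
}
```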
```rust
let fut = async move {
    run_subscription_task(sink, live_stream).await;
    drop(entry);
```
This keeps the SubscriptionEntry alive inside the async block until the subscription task finishes.
Implements #10997
Summary
In this PR, we add the unstable statement-store JSON-RPC surface and wire it into the parachain node RPC stack. This lets clients submit SCALE-encoded statements over RPC, open one long-lived statement subscription, and then attach or remove topic filters on that subscription without opening a new stream for each filter.
The subscription flow is split into `statement_unstable_subscribe` and `statement_unstable_add_filter`. A subscription starts empty, each added filter gets its own `filterId`, and live notifications carry the ids of the filters that matched the statement. That lets a client track several statement topics over a single RPC subscription while still knowing which filters produced each replay or live event.

RPC Shape
We add `statement_unstable_submit`, `statement_unstable_subscribe`, `statement_unstable_add_filter`, and `statement_unstable_remove_filter` under `sc-rpc-spec-v2::statement`. `statement_unstable_submit` decodes submitted statement bytes and maps store results into RPC-level outcomes: `new`, `known`, `rejected`, or `invalid`. Subscription state is scoped to the jsonrpsee connection that created it, so a filter can only be added to or removed from a subscription owned by the same connection.

For filters, the unstable RPC accepts `any` and `matchAll`. `matchAny` is rejected at the RPC boundary for now, which keeps the external API aligned with the current unstable contract while the store internals can still use the optimized filter representation.

Subscription Semantics
Multi-filter subscriptions are handled by the existing statement subscription matcher workers.
`add_filter` validates capacity, allocates a `filterId`, queues an `AddFilter` message for the matcher, and returns without waiting for replay snapshot collection. The matcher then collects the replay snapshot and registers the filter in the same critical section, so live statements cannot slip between the snapshot and filter registration.

For each added filter the subscription emits:
- `replayStatements` batches for already-admitted matching statements
- `replayDone` once that filter's replay is drained
- `newStatements` for live statements, including all matching `filterId`s
- `stop` if local subscription resource caps are hit

Live statements that arrive while a replay is still in progress are kept in matcher-owned pending state, then released once replay ordering allows it. Statements already delivered by replay are kept out of the live path for that filter, avoiding duplicate delivery for the common "submit, then subscribe" case.
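The per-filter sequence above can be modeled as a small enum. These are illustrative type names, not the actual `sc-rpc-spec-v2` wire types:

```rust
/// One notification on a statement subscription stream.
#[derive(Debug, PartialEq)]
enum Notification {
    /// Batch of already-admitted statements matching a newly added filter.
    ReplayStatements { filter_id: u64, statements: Vec<Vec<u8>> },
    /// This filter's replay is drained; only live events follow for it.
    ReplayDone { filter_id: u64 },
    /// A live statement, tagged with every filter id that matched it.
    NewStatements { filter_ids: Vec<u64>, statement: Vec<u8> },
    /// Local subscription resource caps were hit; the stream ends.
    Stop,
}

/// A well-formed stream for one filter: replay batches, then replayDone,
/// then live events that carry the matching filter ids.
fn example_stream(filter_id: u64) -> Vec<Notification> {
    vec![
        Notification::ReplayStatements { filter_id, statements: vec![vec![1, 2, 3]] },
        Notification::ReplayDone { filter_id },
        Notification::NewStatements { filter_ids: vec![filter_id], statement: vec![4, 5] },
    ]
}
```

The invariant worth noting is per-filter ordering: a client never sees `newStatements` for a filter's replayed statements, and `replayDone` cleanly separates the two phases.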
Each subscription is capped at 128 active filters. Filter removal is idempotent, and dropping the RPC subscription cleans up matcher state.