Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -420,17 +420,42 @@ impl PlatformPaymentAddressProvider {
Ok(())
}

/// Update incremental sync state from a completed sync result.
/// Merge an incremental sync result into the provider's
/// watermarks, taking the per-field maximum so concurrent or
/// out-of-order completions can never roll the watermark
/// backwards. Each field tracks "latest height/time/block we are
/// confident we have scanned through", so taking the max is the
/// safe monotonic combine even if results arrive in a different
/// order than they finished on the network.
pub(crate) fn update_sync_state(
&mut self,
result: &AddressSyncResult<PlatformAddressTag, PlatformP2PKHAddress>,
) {
self.sync_height = result.new_sync_height;
self.sync_timestamp = result.new_sync_timestamp;
self.last_known_recent_block = result.last_known_recent_block;
self.sync_height = self.sync_height.max(result.new_sync_height);
self.sync_timestamp = self.sync_timestamp.max(result.new_sync_timestamp);
self.last_known_recent_block = self
.last_known_recent_block
.max(result.last_known_recent_block);
}

/// Restore incremental-sync watermark from persisted state.
/// Current `last_known_recent_block` watermark.
///
/// Read-only mirror of the field used by the trait
/// implementation; exposed `pub` so wallet-level helpers
/// (notably [`super::wallet::PlatformAddressWallet::sync_watermark`])
/// can return the value to callers without going through the
/// `AddressProvider` trait. Monotonic non-decreasing across
/// `sync_finished` calls.
pub fn last_known_recent_block(&self) -> u64 {
self.last_known_recent_block
}

/// Restore the incremental-sync watermark from persisted state.
/// Unlike [`Self::update_sync_state`], this is an unconditional
/// overwrite — callers use it during initialization to seed the
/// watermark from on-disk state before any incremental result
/// arrives. The monotonic invariant is maintained at update-time,
/// not load-time.
pub(crate) fn set_stored_sync_state(
&mut self,
height: u64,
Expand Down Expand Up @@ -696,3 +721,97 @@ impl AddressProvider for PlatformPaymentAddressProvider {
self.last_known_recent_block
}
}

#[cfg(test)]
mod tests {
use super::*;
use key_wallet::Network;
use key_wallet_manager::WalletManager;

/// Build a minimal provider with empty wallet/account state for
/// exercising the watermark merge logic. The address state itself
/// is irrelevant — `update_sync_state` only touches the three
/// watermark fields.
fn empty_provider() -> PlatformPaymentAddressProvider {
PlatformPaymentAddressProvider {
wallet_manager: Arc::new(RwLock::new(WalletManager::new(Network::Testnet))),
per_wallet: BTreeMap::new(),
per_wallet_in_sync: BTreeMap::new(),
pending: BiBTreeMap::new(),
sync_height: 0,
sync_timestamp: 0,
last_known_recent_block: 0,
}
}

fn sync_result(
height: u64,
timestamp: u64,
last_known_recent_block: u64,
) -> AddressSyncResult<PlatformAddressTag, PlatformP2PKHAddress> {
let mut r = AddressSyncResult::default();
r.new_sync_height = height;
r.new_sync_timestamp = timestamp;
r.last_known_recent_block = last_known_recent_block;
r
}

/// QA-002: forward updates lift the watermarks to the new values
/// across all three fields (the trivial monotonic case).
#[test]
fn update_sync_state_advances_watermarks() {
let mut p = empty_provider();
p.update_sync_state(&sync_result(100, 1_700_000_000, 99));
assert_eq!(p.sync_height, 100);
assert_eq!(p.sync_timestamp, 1_700_000_000);
assert_eq!(p.last_known_recent_block, 99);
}

/// QA-002: a backwards result (every field lower than the
/// current watermark) must NOT roll the watermarks back. Out-of-
/// order completion of a stale incremental scan is the canonical
/// trigger for this branch.
#[test]
fn update_sync_state_rejects_backwards_full_result() {
let mut p = empty_provider();
p.update_sync_state(&sync_result(200, 1_800_000_000, 199));
p.update_sync_state(&sync_result(100, 1_700_000_000, 99));
assert_eq!(p.sync_height, 200);
assert_eq!(p.sync_timestamp, 1_800_000_000);
assert_eq!(p.last_known_recent_block, 199);
}

/// QA-002: the merge is per-field — a result that advances some
/// fields and regresses others lifts only the advancing ones.
/// Each watermark is its own monotonic counter; tying them
/// together would either lose progress (reject the whole result)
/// or roll some fields back (accept the whole result).
#[test]
fn update_sync_state_merges_per_field() {
let mut p = empty_provider();
p.update_sync_state(&sync_result(200, 1_800_000_000, 199));
// height advances, timestamp regresses, recent_block ties.
p.update_sync_state(&sync_result(300, 1_700_000_000, 199));
assert_eq!(p.sync_height, 300, "advanced");
assert_eq!(p.sync_timestamp, 1_800_000_000, "regression rejected");
assert_eq!(p.last_known_recent_block, 199, "tie kept");
}

/// `set_stored_sync_state` is an unconditional overwrite — it's
/// the load-from-persistence entry point, used before any
/// incremental result has merged. The monotonic merge is
/// `update_sync_state`'s job, not the loader's.
#[test]
fn set_stored_sync_state_overwrites_unconditionally() {
let mut p = empty_provider();
p.update_sync_state(&sync_result(500, 1_900_000_000, 499));
// Load smaller persisted values: an unconditional overwrite
// is the documented semantic. (Production callers sequence
// load → updates, so the regression seen here cannot occur
// in flight.)
p.set_stored_sync_state(100, 1_700_000_000, 99);
assert_eq!(p.sync_height, 100);
assert_eq!(p.sync_timestamp, 1_700_000_000);
assert_eq!(p.last_known_recent_block, 99);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,7 @@ impl PlatformAddressWallet {
.platform_payment_managed_account_at_index_mut(*account_index)
{
for (p2pkh, funds) in account_state.found() {
account.set_address_credit_balance(
*p2pkh,
funds.balance,
None,
);
account.set_address_credit_balance(*p2pkh, funds.balance, None);
}
}
}
Expand Down Expand Up @@ -291,6 +287,28 @@ impl PlatformAddressWallet {
.unwrap_or_default()
}

/// Read the current incremental-sync watermark from the unified
/// platform-address provider.
///
/// Returns `None` when the provider hasn't been initialised yet
/// (no [`Self::initialize`] call) or when the provider has no stored
/// watermark (whether restored via [`Self::apply_sync_state`] or
/// produced by a previous sync). The value is monotonic non-decreasing
/// across [`Self::sync_balances`](super::sync) calls against the
/// same chain — a later sync can only advance the watermark, never
/// roll it back. A zero-valued watermark is reported as `None` to
/// match the "no stored watermark" convention used elsewhere in
/// the wallet (see [`Self::apply_sync_state`]).
pub async fn sync_watermark(&self) -> Option<u64> {
let guard = self.provider.read().await;
let raw = guard.as_ref().map(|p| p.last_known_recent_block())?;
if raw == 0 {
None
} else {
Some(raw)
}
}

/// Get total platform credits across all addresses.
///
/// Returns the sum of all cached balances.
Expand Down
Loading