Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 16 additions & 33 deletions payjoin/src/core/persist.rs
Original file line number Diff line number Diff line change
Expand Up @@ -809,25 +809,21 @@ pub trait AsyncSessionPersister: Send + Sync {
}

/// In-memory session persister for replaying sessions and introspecting events.
Copy link
Copy Markdown
Collaborator

@chavic chavic May 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cACk. I haven't found any functional issues; I was gonna recommend clearer documentation on intent, but I've seen the previous thread now already touched on this

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you feel it's still not clear for follow up? I addressed prior thread to remove the single-owner part. What would you like to see?

#[derive(Clone)]
pub struct InMemoryPersister<V> {
pub(crate) inner: std::sync::Arc<std::sync::RwLock<InnerStorage<V>>>,
pub(crate) inner: std::sync::Mutex<InnerStorage<V>>,
}

impl<V> Default for InMemoryPersister<V> {
fn default() -> Self {
Self { inner: std::sync::Arc::new(std::sync::RwLock::new(InnerStorage::default())) }
}
fn default() -> Self { Self { inner: std::sync::Mutex::new(InnerStorage::default()) } }
}

#[derive(Clone)]
pub(crate) struct InnerStorage<V> {
pub(crate) events: std::sync::Arc<Vec<V>>,
pub(crate) events: Vec<V>,
pub(crate) is_closed: bool,
}

impl<V> Default for InnerStorage<V> {
fn default() -> Self { Self { events: std::sync::Arc::new(vec![]), is_closed: false } }
fn default() -> Self { Self { events: vec![], is_closed: false } }
}

impl<V> SessionPersister for InMemoryPersister<V>
Expand All @@ -838,40 +834,32 @@ where
type SessionEvent = V;

fn save_event(&self, event: Self::SessionEvent) -> Result<(), Self::InternalStorageError> {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
fn save_event(&self, event: Self::SessionEvent) -> Result<(), Self::InternalStorageError> {
fn save_event(&mut self, event: Self::SessionEvent) -> Result<(), Self::InternalStorageError> {

maybe it makes sense to change the trait definition... @arminsabouri ? thoughts?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tested fn save(&mut self, …) on JsonReceiverSessionPersister with clankers. It fails to compile:
E0596: cannot borrow data in an Arc as mutable, both in the uniffi::export(with_foreign) macro
expansion and CallbackPersisterAdapter::save_event. Arc<dyn Trait> only implements
Deref, not DerefMut β€” not uniffi-specific.

Workarounds (split native vs FFI traits , or Arc<Mutex<dyn Trait>> callbacks) just
relocate the lock rather than remove it. Curious what @arminsabouri thinks β€” for this
PR I'd like to land the footgun fix and treat trait shape as its own discussion.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIRC we decided to go with &self bc it simplified stuff at the FFI level. Otherwise you would have to wrap the callbacks with a mutex.

struct CallbackPersisterAdapter {
    callback_persister: Arc<dyn JsonReceiverSessionPersister>,
}

Regardless I agree with Dan and we should ticket this up and revisit this.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we even need to ticket it? As both of our comments suggest, this was a deliberate decision already compared to known alternatives, not tech-debt. Please ticket if I'm missing something.

let mut inner = self.inner.write().expect("Lock should not be poisoned");
std::sync::Arc::make_mut(&mut inner.events).push(event);
self.inner.lock().expect("Lock should not be poisoned").events.push(event);
Ok(())
}

fn load(
&self,
) -> Result<Box<dyn Iterator<Item = Self::SessionEvent>>, Self::InternalStorageError> {
let inner = self.inner.read().expect("Lock should not be poisoned");
let events = std::sync::Arc::clone(&inner.events);
Ok(Box::new(
std::sync::Arc::try_unwrap(events).unwrap_or_else(|arc| (*arc).clone()).into_iter(),
))
let events = self.inner.lock().expect("Lock should not be poisoned").events.clone();
Ok(Box::new(events.into_iter()))
}

fn close(&self) -> Result<(), Self::InternalStorageError> {
let mut inner = self.inner.write().expect("Lock should not be poisoned");
inner.is_closed = true;
self.inner.lock().expect("Lock should not be poisoned").is_closed = true;
Ok(())
}
}

#[cfg(test)]
#[derive(Clone)]
/// Async in-memory session persister for replaying async sessions and introspecting events.
pub struct InMemoryAsyncPersister<V> {
pub(crate) inner: std::sync::Arc<tokio::sync::RwLock<InnerStorage<V>>>,
pub(crate) inner: tokio::sync::Mutex<InnerStorage<V>>,
}

#[cfg(test)]
impl<V> Default for InMemoryAsyncPersister<V> {
fn default() -> Self {
Self { inner: std::sync::Arc::new(tokio::sync::RwLock::new(InnerStorage::default())) }
}
fn default() -> Self { Self { inner: tokio::sync::Mutex::new(InnerStorage::default()) } }
}

#[cfg(test)]
Expand All @@ -886,25 +874,20 @@ where
&self,
event: Self::SessionEvent,
) -> Result<(), Self::InternalStorageError> {
let mut inner = self.inner.write().await;
std::sync::Arc::make_mut(&mut inner.events).push(event);
self.inner.lock().await.events.push(event);
Ok(())
}

async fn load(
&self,
) -> Result<Box<dyn Iterator<Item = Self::SessionEvent> + Send>, Self::InternalStorageError>
{
let inner = self.inner.read().await;
let events = std::sync::Arc::clone(&inner.events);
Ok(Box::new(
std::sync::Arc::try_unwrap(events).unwrap_or_else(|arc| (*arc).clone()).into_iter(),
))
let events = self.inner.lock().await.events.clone();
Ok(Box::new(events.into_iter()))
}

async fn close(&self) -> Result<(), Self::InternalStorageError> {
let mut inner = self.inner.write().await;
inner.is_closed = true;
self.inner.lock().await.is_closed = true;
Ok(())
}
}
Expand Down Expand Up @@ -960,7 +943,7 @@ mod tests {
}

assert_eq!(
persister.inner.read().expect("Lock should not be poisoned").is_closed,
persister.inner.lock().expect("Lock should not be poisoned").is_closed,
expected_result.is_closed
);

Expand Down Expand Up @@ -991,7 +974,7 @@ mod tests {
assert_eq!(event.0, expected_event.0);
}

assert_eq!(persister.inner.read().await.is_closed, expected_result.is_closed);
assert_eq!(persister.inner.lock().await.is_closed, expected_result.is_closed);

match (&result, &expected_result.error) {
(Ok(actual), None) => {
Expand Down
22 changes: 11 additions & 11 deletions payjoin/src/core/receive/v2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1513,8 +1513,8 @@ pub mod test {
.save(&persister)
.expect("InMemoryPersister shouldn't fail");
assert!(matches!(res, OptionalTransitionOutcome::Stasis(_)));
assert!(!persister.inner.read().expect("Shouldn't be poisoned").is_closed);
assert_eq!(persister.inner.read().expect("Shouldn't be poisoned").events.len(), 0);
assert!(!persister.inner.lock().expect("Shouldn't be poisoned").is_closed);
assert_eq!(persister.inner.lock().expect("Shouldn't be poisoned").events.len(), 0);

// Payjoin was broadcasted, should progress to success
let persister = InMemoryPersister::default();
Expand All @@ -1524,10 +1524,10 @@ pub mod test {
.expect("InMemoryPersister shouldn't fail");

assert!(matches!(res, OptionalTransitionOutcome::Progress(_)));
assert!(persister.inner.read().expect("Shouldn't be poisoned").is_closed);
assert_eq!(persister.inner.read().expect("Shouldn't be poisoned").events.len(), 1);
assert!(persister.inner.lock().expect("Shouldn't be poisoned").is_closed);
assert_eq!(persister.inner.lock().expect("Shouldn't be poisoned").events.len(), 1);
assert_eq!(
persister.inner.read().expect("Shouldn't be poisoned").events.last(),
persister.inner.lock().expect("Shouldn't be poisoned").events.last(),
Some(&SessionEvent::Closed(SessionOutcome::Success(vec![(
ScriptBuf::default(),
Witness::default()
Expand All @@ -1549,10 +1549,10 @@ pub mod test {
.expect("InMemoryPersister shouldn't fail");

assert!(matches!(res, OptionalTransitionOutcome::Progress(_)));
assert!(persister.inner.read().expect("Shouldn't be poisoned").is_closed);
assert_eq!(persister.inner.read().expect("Shouldn't be poisoned").events.len(), 1);
assert!(persister.inner.lock().expect("Shouldn't be poisoned").is_closed);
assert_eq!(persister.inner.lock().expect("Shouldn't be poisoned").events.len(), 1);
assert_eq!(
persister.inner.read().expect("Shouldn't be poisoned").events.last(),
persister.inner.lock().expect("Shouldn't be poisoned").events.last(),
Some(&SessionEvent::Closed(SessionOutcome::FallbackBroadcasted))
);

Expand All @@ -1579,10 +1579,10 @@ pub mod test {
.expect("InMemoryPersister shouldn't fail");

assert!(matches!(res, OptionalTransitionOutcome::Progress(_)));
assert!(persister.inner.read().expect("Shouldn't be poisoned").is_closed);
assert_eq!(persister.inner.read().expect("Shouldn't be poisoned").events.len(), 1);
assert!(persister.inner.lock().expect("Shouldn't be poisoned").is_closed);
assert_eq!(persister.inner.lock().expect("Shouldn't be poisoned").events.len(), 1);
assert_eq!(
persister.inner.read().expect("Shouldn't be poisoned").events.last(),
persister.inner.lock().expect("Shouldn't be poisoned").events.last(),
Some(&SessionEvent::Closed(SessionOutcome::PayjoinProposalSent))
);

Expand Down
8 changes: 4 additions & 4 deletions payjoin/src/core/receive/v2/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ mod tests {
persister
.save_event(SessionEvent::CheckedBroadcastSuitability())
.expect("in memory persister save should not fail");
assert!(!persister.inner.read().expect("session read should succeed").is_closed);
assert!(!persister.inner.lock().expect("session read should succeed").is_closed);
let err = replay_event_log(&persister).expect_err("session replay should be fail");
let expected_err: ReplayError<ReceiveSession, SessionEvent> =
InternalReplayError::InvalidEvent(
Expand All @@ -414,14 +414,14 @@ mod tests {
)
.into();
assert_eq!(err.to_string(), expected_err.to_string());
assert!(persister.inner.read().expect("lock should not be poisoned").is_closed);
assert!(persister.inner.lock().expect("lock should not be poisoned").is_closed);

let persister = InMemoryAsyncPersister::<SessionEvent>::default();
persister
.save_event(SessionEvent::CheckedBroadcastSuitability())
.await
.expect("in memory async persister save should not fail");
assert!(!persister.inner.read().await.is_closed);
assert!(!persister.inner.lock().await.is_closed);
let err =
replay_event_log_async(&persister).await.expect_err("session replay should be fail");
let expected_err: ReplayError<ReceiveSession, SessionEvent> =
Expand All @@ -431,7 +431,7 @@ mod tests {
)
.into();
assert_eq!(err.to_string(), expected_err.to_string());
assert!(persister.inner.read().await.is_closed);
assert!(persister.inner.lock().await.is_closed);
}

#[tokio::test]
Expand Down
8 changes: 4 additions & 4 deletions payjoin/src/core/send/v2/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -425,26 +425,26 @@ mod tests {
persister
.save_event(SessionEvent::PostedOriginalPsbt())
.expect("in memory persister save should not fail");
assert!(!persister.inner.read().expect("session read should succeed").is_closed);
assert!(!persister.inner.lock().expect("session read should succeed").is_closed);
let err = replay_event_log(&persister).expect_err("session replay should be fail");
let expected_err: ReplayError<SendSession, SessionEvent> =
InternalReplayError::InvalidEvent(Box::new(SessionEvent::PostedOriginalPsbt()), None)
.into();
assert_eq!(err.to_string(), expected_err.to_string());
assert!(persister.inner.read().expect("lock should not be poisoned").is_closed);
assert!(persister.inner.lock().expect("lock should not be poisoned").is_closed);

let persister = InMemoryAsyncPersister::<SessionEvent>::default();
persister
.save_event(SessionEvent::PostedOriginalPsbt())
.await
.expect("in memory async persister save should not fail");
assert!(!persister.inner.read().await.is_closed);
assert!(!persister.inner.lock().await.is_closed);
let err =
replay_event_log_async(&persister).await.expect_err("session replay should be fail");
let expected_err: ReplayError<SendSession, SessionEvent> =
InternalReplayError::InvalidEvent(Box::new(SessionEvent::PostedOriginalPsbt()), None)
.into();
assert_eq!(err.to_string(), expected_err.to_string());
assert!(persister.inner.read().await.is_closed);
assert!(persister.inner.lock().await.is_closed);
}
}
4 changes: 2 additions & 2 deletions payjoin/tests/integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -791,8 +791,8 @@ mod integration {
sender_final_action: SenderFinalAction,
) -> Result<(Transaction, Receiver<Monitor>), BoxError>
where
R: SessionPersister<SessionEvent = payjoin::receive::v2::SessionEvent> + Clone,
S: SessionPersister<SessionEvent = payjoin::send::v2::SessionEvent> + Clone,
R: SessionPersister<SessionEvent = payjoin::receive::v2::SessionEvent>,
S: SessionPersister<SessionEvent = payjoin::send::v2::SessionEvent>,
{
let agent = services.http_agent();
services.wait_for_services_ready().await?;
Expand Down
Loading