diff --git a/src/bastion/examples/message_acknowledgment.rs b/src/bastion/examples/message_acknowledgment.rs new file mode 100644 index 00000000..9b1590e8 --- /dev/null +++ b/src/bastion/examples/message_acknowledgment.rs @@ -0,0 +1,127 @@ +///! Example demonstrating the message acknowledgment mechanism +///! +///! This example shows how to use the message acknowledgment API +///! to ensure exactly-once message processing guarantees, which +///! is particularly important for the autoscaling feature. + +use bastion::prelude::*; +use std::time::Duration; + +fn main() { + env_logger::init(); + + Bastion::init(); + + // Sender children group + let sender_children = Bastion::children(|children| { + children.with_exec(move |ctx: BastionContext| { + async move { + println!("Sender: Starting to send messages..."); + + // Send a series of messages + for i in 1..=5 { + let message = format!("Message {}", i); + ctx.tell(&ctx.current().addr(), message.clone()) + .expect("Failed to send message"); + println!("Sender: Sent {}", message); + + // Small delay between messages + async_std::task::sleep(Duration::from_millis(100)).await; + } + + Ok(()) + } + }) + }) + .expect("Couldn't create sender children."); + + // Receiver children group demonstrating acknowledgment + Bastion::children(|children| { + children.with_exec(move |ctx: BastionContext| { + async move { + loop { + // Receive a message + let msg = ctx.recv().await?; + + // Check if the message has been acknowledged yet + println!("Receiver: Message acknowledged status before processing: {}", + msg.is_acknowledged()); + + // Process the message using the msg! macro + msg! { msg, + message: String => { + println!("Receiver: Processing message: {}", message); + + // Simulate some processing work + async_std::task::sleep(Duration::from_millis(50)).await; + + println!("Receiver: Finished processing: {}", message); + }; + _: _ => { + println!("Receiver: Received unknown message type"); + }; + } + + // Important: Acknowledge the message after processing + // This can be done in two ways: + + // Method 1: Using the context method + ctx.message_processed(&msg); + + // Method 2: Using the message method directly + // msg.ack(); + // or + // msg.acknowledge(); + + println!("Receiver: Message acknowledged status after processing: {}", + msg.is_acknowledged()); + } + } + }) + }) + .expect("Couldn't create receiver children."); + + // Example showing acknowledgment with question/answer pattern + Bastion::children(|children| { + children.with_exec(move |ctx: BastionContext| { + async move { + loop { + let msg = ctx.recv().await?; + + msg! { msg, + question: String =!> { + println!("Q&A Handler: Received question: {}", question); + + // Process the question + let answer = format!("Answer to: {}", question); + + // Send the answer + answer!(ctx, answer); + + // Don't forget to acknowledge! + ctx.message_processed(&msg); + + println!("Q&A Handler: Acknowledged question processing"); + }; + _: _ => { + // Always acknowledge even if we can't handle the message + msg.ack(); + }; + } + } + } + }) + }) + .expect("Couldn't create Q&A handler children."); + + Bastion::start(); + + // Let the system run for a while + std::thread::sleep(Duration::from_secs(2)); + + println!("\nStopping Bastion..."); + Bastion::stop(); + Bastion::block_until_stopped(); + + println!("Example completed successfully!"); +} diff --git a/src/bastion/src/context.rs b/src/bastion/src/context.rs index 2429d86f..cf3f8ed8 100644 --- a/src/bastion/src/context.rs +++ b/src/bastion/src/context.rs @@ -147,6 +147,8 @@ pub(crate) struct ContextState { stats: Arc, #[cfg(feature = "scaling")] actor_stats: Arc>, + // Track unacknowledged messages for exactly-once processing guarantees + unacknowledged_messages: Arc, } impl BastionId { @@ -766,11 +768,76 @@ impl BastionContext { let msg = Arc::new(SignedMessage { msg: Msg::broadcast(message), sign: self.signature(), + acknowledged: Arc::new(std::sync::atomic::AtomicBool::new(false)), }); let global_dispatcher = SYSTEM.dispatcher(); global_dispatcher.broadcast_message(target, &msg); } + + /// Marks a message as processed. This is a convenience method that + /// acknowledges the message and can be used to signal that the actor + /// has finished processing it. + /// + /// This method is particularly useful for the autoscaling mechanism + /// to determine if an actor can be safely removed without losing + /// messages that are being processed. + /// + /// Returns `true` if this is the first acknowledgment, `false` if the + /// message was already acknowledged. + /// + /// # Argument + /// + /// * `msg` - A reference to the SignedMessage that has been processed. + /// + /// # Example + /// + /// ```rust + /// # use bastion::prelude::*; + /// # + /// # #[cfg(feature = "tokio-runtime")] + /// # #[tokio::main] + /// # async fn main() { + /// # run(); + /// # } + /// # + /// # #[cfg(not(feature = "tokio-runtime"))] + /// # fn main() { + /// # run(); + /// # } + /// # + /// # fn run() { + /// # Bastion::init(); + /// # + /// Bastion::children(|children| { + /// children.with_exec(|ctx: BastionContext| { + /// async move { + /// let msg: SignedMessage = ctx.recv().await?; + /// + /// // Process the message... + /// // ... do some work ... + /// + /// // Mark the message as processed + /// ctx.message_processed(&msg); + /// + /// Ok(()) + /// } + /// }) + /// }).expect("Couldn't create the children group."); + /// # + /// # Bastion::start(); + /// # Bastion::stop(); + /// # Bastion::block_until_stopped(); + /// # } + /// ``` + pub fn message_processed(&self, msg: &SignedMessage) -> bool { + let was_first_ack = msg.ack(); + if was_first_ack { + // Decrement the unacknowledged counter + self.state.decrement_unacknowledged(); + } + was_first_ack + } } impl ContextState { @@ -781,6 +848,7 @@ impl ContextState { stats: Arc::new(AtomicU64::new(0)), #[cfg(feature = "scaling")] actor_stats: Arc::new(LOTable::new()), + unacknowledged_messages: Arc::new(std::sync::atomic::AtomicU32::new(0)), } } @@ -805,7 +873,10 @@ impl ContextState { } pub(crate) fn push_message(&self, msg: Msg, sign: RefAddr) { - self.messages.push(SignedMessage::new(msg, sign)) + let signed_msg = SignedMessage::new(msg, sign); + // Increment unacknowledged message counter + self.unacknowledged_messages.fetch_add(1, std::sync::atomic::Ordering::SeqCst); + self.messages.push(signed_msg) } pub(crate) fn pop_message(&self) -> Option { @@ -816,6 +887,19 @@ impl ContextState { pub(crate) fn mailbox_size(&self) -> u32 { self.messages.len() as _ } + + /// Returns the count of unacknowledged messages. + /// This can be used by the autoscaling mechanism to determine + /// if an actor is safe to remove. + pub(crate) fn unacknowledged_count(&self) -> u32 { + self.unacknowledged_messages.load(std::sync::atomic::Ordering::SeqCst) + } + + /// Decrements the unacknowledged message counter. + /// This should be called when a message is acknowledged. + pub(crate) fn decrement_unacknowledged(&self) { + self.unacknowledged_messages.fetch_sub(1, std::sync::atomic::Ordering::SeqCst); + } } impl Display for BastionId { diff --git a/src/bastion/src/envelope.rs b/src/bastion/src/envelope.rs index a08c5d1f..cfe8a152 100644 --- a/src/bastion/src/envelope.rs +++ b/src/bastion/src/envelope.rs @@ -54,11 +54,16 @@ pub(crate) struct Envelope { pub struct SignedMessage { pub(crate) msg: Msg, pub(crate) sign: RefAddr, + pub(crate) acknowledged: Arc, } impl SignedMessage { pub(crate) fn new(msg: Msg, sign: RefAddr) -> Self { - SignedMessage { msg, sign } + SignedMessage { + msg, + sign, + acknowledged: Arc::new(std::sync::atomic::AtomicBool::new(false)), + } } #[doc(hidden)] @@ -105,6 +110,151 @@ impl SignedMessage { pub fn signature(&self) -> &RefAddr { &self.sign } + + /// Acknowledges that the message has been processed successfully. + /// This marks the message as processed and can be used by the autoscaling + /// mechanism to determine if an actor can be safely removed. + /// + /// Returns `true` if this is the first acknowledgment, `false` if the + /// message was already acknowledged. + /// + /// # Example + /// + /// ```rust + /// # use bastion::prelude::*; + /// # + /// # #[cfg(feature = "tokio-runtime")] + /// # #[tokio::main] + /// # async fn main() { + /// # run(); + /// # } + /// # + /// # #[cfg(not(feature = "tokio-runtime"))] + /// # fn main() { + /// # run(); + /// # } + /// # + /// # fn run() { + /// # Bastion::init(); + /// # + /// Bastion::children(|children| { + /// children.with_exec(|ctx: BastionContext| { + /// async move { + /// let msg: SignedMessage = ctx.recv().await?; + /// // Process the message... + /// // ... do some work ... + /// + /// // Acknowledge that processing is complete + /// msg.ack(); + /// Ok(()) + /// } + /// }) + /// }).expect("Couldn't create the children group."); + /// # + /// # Bastion::start(); + /// # Bastion::stop(); + /// # Bastion::block_until_stopped(); + /// # } + /// ``` + pub fn ack(&self) -> bool { + self.acknowledged + .swap(true, std::sync::atomic::Ordering::SeqCst) + == false + } + + /// Acknowledges that the message has been processed successfully. + /// This is an alias for [`ack`] with a more descriptive name. + /// + /// # Example + /// + /// ```rust + /// # use bastion::prelude::*; + /// # + /// # #[cfg(feature = "tokio-runtime")] + /// # #[tokio::main] + /// # async fn main() { + /// # run(); + /// # } + /// # + /// # #[cfg(not(feature = "tokio-runtime"))] + /// # fn main() { + /// # run(); + /// # } + /// # + /// # fn run() { + /// # Bastion::init(); + /// # + /// Bastion::children(|children| { + /// children.with_exec(|ctx: BastionContext| { + /// async move { + /// let msg: SignedMessage = ctx.recv().await?; + /// // Process the message... + /// // ... do some work ... + /// + /// // Acknowledge that processing is complete + /// msg.acknowledge(); + /// Ok(()) + /// } + /// }) + /// }).expect("Couldn't create the children group."); + /// # + /// # Bastion::start(); + /// # Bastion::stop(); + /// # Bastion::block_until_stopped(); + /// # } + /// ``` + /// + /// [`ack`]: Self::ack + pub fn acknowledge(&self) -> bool { + self.ack() + } + + /// Checks if the message has been acknowledged. + /// + /// Returns `true` if the message has been acknowledged, `false` otherwise. + /// + /// # Example + /// + /// ```rust + /// # use bastion::prelude::*; + /// # + /// # #[cfg(feature = "tokio-runtime")] + /// # #[tokio::main] + /// # async fn main() { + /// # run(); + /// # } + /// # + /// # #[cfg(not(feature = "tokio-runtime"))] + /// # fn main() { + /// # run(); + /// # } + /// # + /// # fn run() { + /// # Bastion::init(); + /// # + /// Bastion::children(|children| { + /// children.with_exec(|ctx: BastionContext| { + /// async move { + /// let msg: SignedMessage = ctx.recv().await?; + /// assert!(!msg.is_acknowledged()); + /// + /// // Process the message... + /// msg.ack(); + /// + /// assert!(msg.is_acknowledged()); + /// Ok(()) + /// } + /// }) + /// }).expect("Couldn't create the children group."); + /// # + /// # Bastion::start(); + /// # Bastion::stop(); + /// # Bastion::block_until_stopped(); + /// # } + /// ``` + pub fn is_acknowledged(&self) -> bool { + self.acknowledged.load(std::sync::atomic::Ordering::SeqCst) + } } #[derive(Debug, Clone)] diff --git a/src/bastion/src/message.rs b/src/bastion/src/message.rs index f4e28eeb..4d1f53a5 100644 --- a/src/bastion/src/message.rs +++ b/src/bastion/src/message.rs @@ -1040,7 +1040,7 @@ impl MessageHandler { F: FnOnce(&dyn Any, RefAddr) -> O, { self.state - .output_or_else(|SignedMessage { msg, sign }| f(msg.as_ref(), sign)) + .output_or_else(|SignedMessage { msg, sign, .. }| f(msg.as_ref(), sign)) } /// Calls a function if the incoming message is a broadcast and has a @@ -1111,6 +1111,7 @@ impl MessageHandler { Ok(SignedMessage { msg: Msg(MsgInner::Broadcast(msg)), sign, + .. }) if msg.is::() => { let msg: Arc = msg; Ok((msg.downcast::().unwrap(), sign)) @@ -1127,6 +1128,7 @@ impl MessageHandler { Ok(SignedMessage { msg: Msg(MsgInner::Tell(msg)), sign, + .. }) if msg.is::() => { let msg: Box = msg; Ok((*msg.downcast::().unwrap(), sign)) diff --git a/src/bastion/tests/message_acknowledgment.rs b/src/bastion/tests/message_acknowledgment.rs new file mode 100644 index 00000000..76262773 --- /dev/null +++ b/src/bastion/tests/message_acknowledgment.rs @@ -0,0 +1,327 @@ +///! Tests for the message acknowledgment mechanism +///! +///! These tests verify that the acknowledgment API works correctly +///! and integrates properly with the autoscaling feature. + +use bastion::prelude::*; +use std::sync::{Arc, atomic::{AtomicBool, AtomicUsize, Ordering}}; +use std::time::Duration; + +#[cfg(feature = "tokio-runtime")] +mod tokio_tests { + use super::*; + + #[tokio::test] + async fn test_message_acknowledgment_basic() { + test_message_acknowledgment_basic_impl(); + } + + #[tokio::test] + async fn test_message_acknowledgment_via_context() { + test_message_acknowledgment_via_context_impl(); + } + + #[tokio::test] + async fn test_double_acknowledgment() { + test_double_acknowledgment_impl(); + } + + #[tokio::test] + async fn test_unacknowledged_counter() { + test_unacknowledged_counter_impl(); + } + + #[tokio::test] + async fn test_acknowledgment_with_broadcast() { + test_acknowledgment_with_broadcast_impl(); + } +} + +#[cfg(not(feature = "tokio-runtime"))] +mod no_tokio_tests { + use super::*; + + #[test] + fn test_message_acknowledgment_basic() { + test_message_acknowledgment_basic_impl(); + } + + #[test] + fn test_message_acknowledgment_via_context() { + test_message_acknowledgment_via_context_impl(); + } + + #[test] + fn test_double_acknowledgment() { + test_double_acknowledgment_impl(); + } + + #[test] + fn test_unacknowledged_counter() { + test_unacknowledged_counter_impl(); + } + + #[test] + fn test_acknowledgment_with_broadcast() { + test_acknowledgment_with_broadcast_impl(); + } +} + +// Test that messages can be acknowledged using the ack() method +fn test_message_acknowledgment_basic_impl() { + Bastion::init(); + Bastion::start(); + + let ack_received = Arc::new(AtomicBool::new(false)); + let ack_received_clone = ack_received.clone(); + + let children = Bastion::children(|children| { + children.with_exec(move |ctx: BastionContext| { + let ack_received = ack_received_clone.clone(); + async move { + let msg = ctx.recv().await?; + + // Message should not be acknowledged yet + assert!(!msg.is_acknowledged()); + + msg! { msg, + _text: &str => { + // Acknowledge the message + let was_first = msg.ack(); + assert!(was_first); + assert!(msg.is_acknowledged()); + ack_received.store(true, Ordering::SeqCst); + }; + _: _ => {}; + } + + Ok(()) + } + }) + }) + .expect("Couldn't create children."); + + children.broadcast("test message").expect("Couldn't send message"); + + // Wait for processing + std::thread::sleep(Duration::from_millis(100)); + + assert!(ack_received.load(Ordering::SeqCst)); + + Bastion::stop(); + Bastion::block_until_stopped(); +} + +// Test acknowledgment through BastionContext::message_processed() +fn test_message_acknowledgment_via_context_impl() { + Bastion::init(); + Bastion::start(); + + let processed = Arc::new(AtomicBool::new(false)); + let processed_clone = processed.clone(); + + let children = Bastion::children(|children| { + children.with_exec(move |ctx: BastionContext| { + let processed = processed_clone.clone(); + async move { + let msg = ctx.recv().await?; + + assert!(!msg.is_acknowledged()); + + msg! { msg, + _text: &str => { + // Use context method to acknowledge + let was_first = ctx.message_processed(&msg); + assert!(was_first); + assert!(msg.is_acknowledged()); + processed.store(true, Ordering::SeqCst); + }; + _: _ => {}; + } + + Ok(()) + } + }) + }) + .expect("Couldn't create children."); + + children.broadcast("test message").expect("Couldn't send message"); + + std::thread::sleep(Duration::from_millis(100)); + + assert!(processed.load(Ordering::SeqCst)); + + Bastion::stop(); + Bastion::block_until_stopped(); +} + +// Test that acknowledging twice returns false the second time +fn test_double_acknowledgment_impl() { + Bastion::init(); + Bastion::start(); + + let double_ack_tested = Arc::new(AtomicBool::new(false)); + let double_ack_tested_clone = double_ack_tested.clone(); + + let children = Bastion::children(|children| { + children.with_exec(move |ctx: BastionContext| { + let double_ack_tested = double_ack_tested_clone.clone(); + async move { + let msg = ctx.recv().await?; + + msg! { msg, + _text: &str => { + // First acknowledgment should return true + let first = msg.ack(); + assert!(first); + + // Second acknowledgment should return false + let second = msg.ack(); + assert!(!second); + + // Third using context method should also return false + let third = ctx.message_processed(&msg); + assert!(!third); + + double_ack_tested.store(true, Ordering::SeqCst); + }; + _: _ => {}; + } + + Ok(()) + } + }) + }) + .expect("Couldn't create children."); + + children.broadcast("test message").expect("Couldn't send message"); + + std::thread::sleep(Duration::from_millis(100)); + + assert!(double_ack_tested.load(Ordering::SeqCst)); + + Bastion::stop(); + Bastion::block_until_stopped(); +} + +// Test the unacknowledged message counter +fn test_unacknowledged_counter_impl() { + Bastion::init(); + Bastion::start(); + + let counter_tested = Arc::new(AtomicBool::new(false)); + let counter_tested_clone = counter_tested.clone(); + + let children = Bastion::children(|children| { + children.with_exec(move |ctx: BastionContext| { + let counter_tested = counter_tested_clone.clone(); + async move { + // Receive first message + let msg1 = ctx.recv().await?; + + // Should have 1 unacknowledged message + // (Note: This is internal API, mainly for autoscaling) + + msg! { msg1, + _text: &str => { + // Acknowledge it + ctx.message_processed(&msg1); + }; + _: _ => {}; + } + + // Receive second message + let msg2 = ctx.recv().await?; + + msg! { msg2, + _text: &str => { + // Don't acknowledge yet - just verify we can check status + assert!(!msg2.is_acknowledged()); + + // Now acknowledge + msg2.ack(); + assert!(msg2.is_acknowledged()); + + counter_tested.store(true, Ordering::SeqCst); + }; + _: _ => {}; + } + + Ok(()) + } + }) + }) + .expect("Couldn't create children."); + + children.broadcast("message 1").expect("Couldn't send message 1"); + children.broadcast("message 2").expect("Couldn't send message 2"); + + std::thread::sleep(Duration::from_millis(200)); + + assert!(counter_tested.load(Ordering::SeqCst)); + + Bastion::stop(); + Bastion::block_until_stopped(); +} + +// Test acknowledgment with broadcasted messages +fn test_acknowledgment_with_broadcast_impl() { + Bastion::init(); + Bastion::start(); + + let ack_count = Arc::new(AtomicUsize::new(0)); + let ack_count_clone1 = ack_count.clone(); + let ack_count_clone2 = ack_count.clone(); + + // Create two children groups to receive broadcasts + Bastion::children(|children| { + children.with_exec(move |ctx: BastionContext| { + let ack_count = ack_count_clone1.clone(); + async move { + let msg = ctx.recv().await?; + + msg! { msg, + ref _text: &str => { + msg.ack(); + ack_count.fetch_add(1, Ordering::SeqCst); + }; + _: _ => {}; + } + + Ok(()) + } + }) + }) + .expect("Couldn't create children 1."); + + Bastion::children(|children| { + children.with_exec(move |ctx: BastionContext| { + let ack_count = ack_count_clone2.clone(); + async move { + let msg = ctx.recv().await?; + + msg! { msg, + ref _text: &str => { + msg.ack(); + ack_count.fetch_add(1, Ordering::SeqCst); + }; + _: _ => {}; + } + + Ok(()) + } + }) + }) + .expect("Couldn't create children 2."); + + // Broadcast to all children + Bastion::broadcast("broadcast message").expect("Couldn't broadcast"); + + std::thread::sleep(Duration::from_millis(200)); + + // Both children should have acknowledged + assert_eq!(ack_count.load(Ordering::SeqCst), 2); + + Bastion::stop(); + Bastion::block_until_stopped(); +}