Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 127 additions & 0 deletions src/bastion/examples/message_acknowledgment.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
///! Example demonstrating the message acknowledgment mechanism
///!
///! This example shows how to use the message acknowledgment API
///! to ensure exactly-once message processing guarantees, which
///! is particularly important for the autoscaling feature.

use bastion::prelude::*;
use std::time::Duration;

fn main() {
env_logger::init();

Bastion::init();

// Sender children group
let sender_children = Bastion::children(|children| {
children.with_exec(move |ctx: BastionContext| {
async move {
println!("Sender: Starting to send messages...");

// Send a series of messages
for i in 1..=5 {
let message = format!("Message {}", i);
ctx.tell(&ctx.current().addr(), message.clone())
.expect("Failed to send message");
println!("Sender: Sent {}", message);

// Small delay between messages
async_std::task::sleep(Duration::from_millis(100)).await;
}

Ok(())
}
})
})
.expect("Couldn't create sender children.");

// Receiver children group demonstrating acknowledgment
Bastion::children(|children| {
children.with_exec(move |ctx: BastionContext| {
async move {
loop {
// Receive a message
let msg = ctx.recv().await?;

// Check if the message has been acknowledged yet
println!("Receiver: Message acknowledged status before processing: {}",
msg.is_acknowledged());

// Process the message using the msg! macro
msg! { msg,
message: String => {
println!("Receiver: Processing message: {}", message);

// Simulate some processing work
async_std::task::sleep(Duration::from_millis(50)).await;

println!("Receiver: Finished processing: {}", message);
};
_: _ => {
println!("Receiver: Received unknown message type");
};
}

// Important: Acknowledge the message after processing
// This can be done in two ways:

// Method 1: Using the context method
ctx.message_processed(&msg);

// Method 2: Using the message method directly
// msg.ack();
// or
// msg.acknowledge();

println!("Receiver: Message acknowledged status after processing: {}",
msg.is_acknowledged());
}
}
})
})
.expect("Couldn't create receiver children.");

// Example showing acknowledgment with question/answer pattern
Bastion::children(|children| {
children.with_exec(move |ctx: BastionContext| {
async move {
loop {
let msg = ctx.recv().await?;

msg! { msg,
question: String =!> {
println!("Q&A Handler: Received question: {}", question);

// Process the question
let answer = format!("Answer to: {}", question);

// Send the answer
answer!(ctx, answer);

// Don't forget to acknowledge!
ctx.message_processed(&msg);

println!("Q&A Handler: Acknowledged question processing");
};
_: _ => {
// Always acknowledge even if we can't handle the message
msg.ack();
};
}
}
}
})
})
.expect("Couldn't create Q&A handler children.");

Bastion::start();

// Let the system run for a while
std::thread::sleep(Duration::from_secs(2));

println!("\nStopping Bastion...");
Bastion::stop();
Bastion::block_until_stopped();

println!("Example completed successfully!");
}
86 changes: 85 additions & 1 deletion src/bastion/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ pub(crate) struct ContextState {
stats: Arc<AtomicU64>,
#[cfg(feature = "scaling")]
actor_stats: Arc<LOTable<BastionId, u32>>,
// Track unacknowledged messages for exactly-once processing guarantees
unacknowledged_messages: Arc<std::sync::atomic::AtomicU32>,
}

impl BastionId {
Expand Down Expand Up @@ -766,11 +768,76 @@ impl BastionContext {
let msg = Arc::new(SignedMessage {
msg: Msg::broadcast(message),
sign: self.signature(),
acknowledged: Arc::new(std::sync::atomic::AtomicBool::new(false)),
});

let global_dispatcher = SYSTEM.dispatcher();
global_dispatcher.broadcast_message(target, &msg);
}

/// Marks a message as processed. This is a convenience method that
/// acknowledges the message and can be used to signal that the actor
/// has finished processing it.
///
/// This method is particularly useful for the autoscaling mechanism
/// to determine if an actor can be safely removed without losing
/// messages that are being processed.
///
/// Returns `true` if this is the first acknowledgment, `false` if the
/// message was already acknowledged.
///
/// # Argument
///
/// * `msg` - A reference to the SignedMessage that has been processed.
///
/// # Example
///
/// ```rust
/// # use bastion::prelude::*;
/// #
/// # #[cfg(feature = "tokio-runtime")]
/// # #[tokio::main]
/// # async fn main() {
/// # run();
/// # }
/// #
/// # #[cfg(not(feature = "tokio-runtime"))]
/// # fn main() {
/// # run();
/// # }
/// #
/// # fn run() {
/// # Bastion::init();
/// #
/// Bastion::children(|children| {
/// children.with_exec(|ctx: BastionContext| {
/// async move {
/// let msg: SignedMessage = ctx.recv().await?;
///
/// // Process the message...
/// // ... do some work ...
///
/// // Mark the message as processed
/// ctx.message_processed(&msg);
///
/// Ok(())
/// }
/// })
/// }).expect("Couldn't create the children group.");
/// #
/// # Bastion::start();
/// # Bastion::stop();
/// # Bastion::block_until_stopped();
/// # }
/// ```
pub fn message_processed(&self, msg: &SignedMessage) -> bool {
let was_first_ack = msg.ack();
if was_first_ack {
// Decrement the unacknowledged counter
self.state.decrement_unacknowledged();
}
was_first_ack
}
}

impl ContextState {
Expand All @@ -781,6 +848,7 @@ impl ContextState {
stats: Arc::new(AtomicU64::new(0)),
#[cfg(feature = "scaling")]
actor_stats: Arc::new(LOTable::new()),
unacknowledged_messages: Arc::new(std::sync::atomic::AtomicU32::new(0)),
}
}

Expand All @@ -805,7 +873,10 @@ impl ContextState {
}

pub(crate) fn push_message(&self, msg: Msg, sign: RefAddr) {
self.messages.push(SignedMessage::new(msg, sign))
let signed_msg = SignedMessage::new(msg, sign);
// Increment unacknowledged message counter
self.unacknowledged_messages.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
self.messages.push(signed_msg)
}

pub(crate) fn pop_message(&self) -> Option<SignedMessage> {
Expand All @@ -816,6 +887,19 @@ impl ContextState {
pub(crate) fn mailbox_size(&self) -> u32 {
self.messages.len() as _
}

/// Returns the count of unacknowledged messages.
/// This can be used by the autoscaling mechanism to determine
/// if an actor is safe to remove.
pub(crate) fn unacknowledged_count(&self) -> u32 {
self.unacknowledged_messages.load(std::sync::atomic::Ordering::SeqCst)
}

/// Decrements the unacknowledged message counter.
/// This should be called when a message is acknowledged.
pub(crate) fn decrement_unacknowledged(&self) {
self.unacknowledged_messages.fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
}
}

impl Display for BastionId {
Expand Down
Loading