Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .cargo/config
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[target.'cfg(any(windows, unix))']
rustflags = ["-C", "target-cpu=native"]
3 changes: 2 additions & 1 deletion bastion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ lightproc = { version = "= 0.3.5-alpha.0", path = "../lightproc" }
dashmap = "3.4.0"
futures = { version = "0.3", features = ["async-await"] }
futures-timer = "3.0.0"
fxhash = "0.2"
t1ha = "0.1"
state = "0.4"
lazy_static = "1.4"
log = "0.4"
# TODO: https://github.com/cogciprocate/qutex/pull/5
Expand Down
1 change: 1 addition & 0 deletions bastion/src/bastion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use crate::system::SYSTEM;
use core::future::Future;

use std::fmt::{self, Debug, Formatter};
use lightproc::proc_state::State;

/// A `struct` allowing to access the system's API to initialize it,
/// start, stop and kill it and to create new supervisors and top-level
Expand Down
8 changes: 4 additions & 4 deletions bastion/src/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::supervisor::SupervisorRef;
use crate::system::SYSTEM;
use futures::channel::mpsc::{self, UnboundedReceiver, UnboundedSender};
use futures::prelude::*;
use fxhash::FxHashMap;
use t1ha::T1haHashMap;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
Expand All @@ -21,7 +21,7 @@ pub(crate) struct Broadcast {
recver: Receiver,
path: Arc<BastionPath>, // Arc is needed because we put path to Envelope
parent: Parent,
children: FxHashMap<BastionId, Sender>,
children: T1haHashMap<BastionId, Sender>,
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -51,7 +51,7 @@ impl Parent {
impl Broadcast {
pub(crate) fn new(parent: Parent, element: BastionPathElement) -> Self {
let (sender, recver) = mpsc::unbounded();
let children = FxHashMap::default();
let children = T1haHashMap::default();

let parent_path: BastionPath = match &parent {
Parent::None | Parent::System => BastionPath::root(),
Expand Down Expand Up @@ -79,7 +79,7 @@ impl Broadcast {
assert!(parent.is_none() || parent.is_system());

let (sender, recver) = mpsc::unbounded();
let children = FxHashMap::default();
let children = T1haHashMap::default();
let path = BastionPath::root();
let path = Arc::new(path);

Expand Down
64 changes: 54 additions & 10 deletions bastion/src/children.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,26 @@ use crate::dispatcher::Dispatcher;
use crate::envelope::Envelope;
use crate::message::BastionMessage;
use crate::path::BastionPathElement;
use crate::system::SYSTEM;
use crate::{state::SharedState, system::SYSTEM};
use bastion_executor::pool;
use futures::pending;
use futures::poll;
use futures::prelude::*;
use futures::stream::{FuturesOrdered, FuturesUnordered};
use fxhash::FxHashMap;
use t1ha::T1haHashMap;
use lightproc::prelude::*;
// use crate::state::{State as CState};
use lightproc::prelude::State as LPState;
use qutex::Qutex;
use std::fmt::Debug;
use std::future::Future;
use std::iter::FromIterator;
use std::sync::Arc;
use std::sync::{Mutex, Arc};
use std::task::Poll;
use std::fmt;
use state::Container;

#[derive(Debug)]
// #[derive(Debug)]
/// A children group that will contain a defined number of
/// elements (set with [`with_redundancy`] or `1` by default)
/// all running a future (returned by the closure that is set
Expand Down Expand Up @@ -71,13 +75,15 @@ use std::task::Poll;
/// [`with_redundancy`]: #method.with_redundancy
/// [`with_exec`]: #method.with_exec
/// [`SupervisionStrategy`]: supervisor/enum.SupervisionStrategy.html
pub struct Children {
pub struct Children
{
bcast: Broadcast,
// The currently launched elements of the group.
launched: FxHashMap<BastionId, (Sender, RecoverableHandle<()>)>,
launched: T1haHashMap<BastionId, (Sender, RecoverableHandle<()>)>,
// The closure returning the future that will be used by
// every element of the group.
init: Init,
// Redundancy of the closure for replication
redundancy: usize,
// The callbacks called at the group's different lifecycle
// events.
Expand All @@ -87,20 +93,40 @@ pub struct Children {
// is received.
pre_start_msgs: Vec<Envelope>,
started: bool,
// State shared amongst the children
state: SharedState,
// List of dispatchers attached to each actor in the group.
dispatchers: Vec<Arc<Box<Dispatcher>>>,
}

impl Children {
impl fmt::Debug for Children
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
f.debug_struct("Children")
.field("bcast", &self.bcast)
.field("launched", &self.launched)
.field("init", &self.init)
.field("redundancy", &self.redundancy)
.field("callbacks", &self.callbacks)
.field("pre_start_msgs", &self.pre_start_msgs)
.field("started", &self.started)
.field("dispatchers", &self.dispatchers)
.finish()
}
}

impl Children
{
pub(crate) fn new(bcast: Broadcast) -> Self {
debug!("Children({}): Initializing.", bcast.id());
let launched = FxHashMap::default();
let launched = T1haHashMap::default();
let init = Init::default();
let redundancy = 1;
let callbacks = Callbacks::new();
let pre_start_msgs = Vec::new();
let started = false;
let dispatchers = Vec::new();
let state = SharedState::default();

Children {
bcast,
Expand All @@ -110,6 +136,7 @@ impl Children {
callbacks,
pre_start_msgs,
started,
state,
dispatchers,
}
}
Expand Down Expand Up @@ -306,6 +333,17 @@ impl Children {
self
}

pub fn with_state<S>(self, state: S) -> Self
where
S: LPState
{
if !self.state.set::<S>(state) {
error!("This state type is already managed by this children!");
}

self
}

/// Appends each supervised element to the declared dispatcher.
///
/// By default supervised elements aren't added to any of dispatcher.
Expand Down Expand Up @@ -603,8 +641,14 @@ impl Children {
}
}

pub(crate) fn launch_elems(&mut self) {
pub(crate) fn launch_elems(&mut self)
{
debug!("Children({}): Launching elements.", self.id());
let mut container = Container::new();
let st = self.state.get();
container.set(st);
let shared_state = Arc::new(Mutex::new(SharedState::new(container)));

for _ in 0..self.redundancy {
let parent = Parent::children(self.as_ref());
let bcast = Broadcast::new(parent, BastionPathElement::Child(BastionId::new()));
Expand All @@ -622,7 +666,7 @@ impl Children {
let state = Qutex::new(state);

let ctx =
BastionContext::new(id, child_ref.clone(), children, supervisor, state.clone());
BastionContext::new(id, child_ref.clone(), children, supervisor, state.clone(), shared_state.clone());
let exec = (self.init.0)(ctx);

self.bcast.register(&bcast);
Expand Down
27 changes: 24 additions & 3 deletions bastion/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,21 @@
//! A context allows a child's future to access its received
//! messages, parent and supervisor.

use state::Container;
use crate::child_ref::ChildRef;
use crate::children_ref::ChildrenRef;
use crate::dispatcher::{BroadcastTarget, DispatcherType, NotificationType};
use crate::envelope::{Envelope, RefAddr, SignedMessage};
use crate::message::{Answer, BastionMessage, Message, Msg};
use crate::supervisor::SupervisorRef;
use crate::system::SYSTEM;
use crate::{state::SharedState, system::SYSTEM};
use futures::pending;
use qutex::{Guard, Qutex};
use std::collections::VecDeque;
use std::fmt::{self, Display, Formatter};
use std::sync::Arc;
use std::sync::{Mutex, Arc, MutexGuard};
use uuid::Uuid;
use lightproc::proc_state::State;

/// Identifier for a root supervisor and dead-letters children.
pub const NIL_ID: BastionId = BastionId(Uuid::nil());
Expand Down Expand Up @@ -54,7 +56,7 @@ pub const NIL_ID: BastionId = BastionId(Uuid::nil());
/// ```
pub struct BastionId(Uuid);

#[derive(Debug)]

/// A child's execution context, allowing its [`exec`] future
/// to receive messages and access a [`ChildRef`] referencing
/// it, a [`ChildrenRef`] referencing its children group and
Expand Down Expand Up @@ -106,6 +108,17 @@ pub struct BastionContext {
children: ChildrenRef,
supervisor: Option<SupervisorRef>,
state: Qutex<ContextState>,
shared_state: Arc<Mutex<SharedState>>
}

impl fmt::Debug for BastionContext {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_struct("BastionContext")
.field("id", &self.id)
.field("child", &self.child)
// TODO: write others
.finish()
}
}

#[derive(Debug)]
Expand All @@ -128,6 +141,7 @@ impl BastionContext {
children: ChildrenRef,
supervisor: Option<SupervisorRef>,
state: Qutex<ContextState>,
shared_state: Arc<Mutex<SharedState>>,
) -> Self {
debug!("BastionContext({}): Creating.", id);
BastionContext {
Expand All @@ -136,6 +150,7 @@ impl BastionContext {
children,
supervisor,
state,
shared_state
}
}

Expand Down Expand Up @@ -207,6 +222,12 @@ impl BastionContext {
&self.children
}

pub fn state(&self) -> &MutexGuard<'_, crate::state::SharedState>
{
let ss = self.shared_state.clone();
&ss.lock().unwrap()
}

/// Returns a [`SupervisorRef`] referencing the supervisor
/// that supervises the element that is linked to this
/// `BastionContext` if it isn't the system supervisor
Expand Down
1 change: 1 addition & 0 deletions bastion/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ mod config;
mod macros;
mod system;

pub mod state;
pub mod child_ref;
pub mod children;
pub mod children_ref;
Expand Down
1 change: 1 addition & 0 deletions bastion/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use lightproc::proc_state::State;

/// A trait that any message sent needs to implement (it is
/// already automatically implemented but forces message to
Expand Down
73 changes: 73 additions & 0 deletions bastion/src/state.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
use state::Container;
use core::ops::DerefMut;
use core::ops::Deref;
use lightproc::{proc_state::{EmptyState, EmptyProcState}, prelude::State as LPState};
use crate::context::BastionContext;
use std::marker::PhantomData as marker;


pub struct SharedState(Container);

impl SharedState
{
#[inline(always)]
pub fn new(c: Container) -> SharedState
{
SharedState(c)
}

#[inline(always)]
pub fn get<S>(&self) -> &S
where
S: LPState
{
self.0.get::<S>()
}

#[inline(always)]
pub fn set<S>(&self, s: S) -> bool
where
S: LPState
{
let s = s as dyn LPState;
self.0.set(s)
}

#[inline(always)]
pub fn from_ctx<S>(ctx: &'static BastionContext) -> Option<Self>
where
S: LPState
{
let container = Container::new();
container.set((*ctx.state()).get::<S>());
Some(SharedState(container))
}
}

impl Default for SharedState {
fn default() -> Self {
let mut c = Container::new();
c.set(EmptyState);
Self(c)
}
}

// impl<'s> Deref for SharedState<'s>
// {
// type Target = S;

// #[inline(always)]
// fn deref(&self) -> &S {
// self.container.get::<S>()
// }
// }


// impl<'s, S> DerefMut for SharedState<'s>
// where
// S: LPState + 'static
// {
// fn deref_mut(&mut self) -> &mut Self::Target {
// &mut self.container.get::<S>()
// }
// }
Loading