diff --git a/.cargo/config b/.cargo/config new file mode 100644 index 00000000..b6376048 --- /dev/null +++ b/.cargo/config @@ -0,0 +1,2 @@ +[target.'cfg(any(windows, unix))'] +rustflags = ["-C", "target-cpu=native"] diff --git a/bastion/Cargo.toml b/bastion/Cargo.toml index 0d280420..f6bb6ff2 100644 --- a/bastion/Cargo.toml +++ b/bastion/Cargo.toml @@ -46,7 +46,8 @@ lightproc = { version = "= 0.3.5-alpha.0", path = "../lightproc" } dashmap = "3.4.0" futures = { version = "0.3", features = ["async-await"] } futures-timer = "3.0.0" -fxhash = "0.2" +t1ha = "0.1" +state = "0.4" lazy_static = "1.4" log = "0.4" # TODO: https://github.com/cogciprocate/qutex/pull/5 diff --git a/bastion/src/bastion.rs b/bastion/src/bastion.rs index 67d6c82c..de37cd81 100644 --- a/bastion/src/bastion.rs +++ b/bastion/src/bastion.rs @@ -12,6 +12,7 @@ use crate::system::SYSTEM; use core::future::Future; use std::fmt::{self, Debug, Formatter}; +use lightproc::proc_state::State; /// A `struct` allowing to access the system's API to initialize it, /// start, stop and kill it and to create new supervisors and top-level diff --git a/bastion/src/broadcast.rs b/bastion/src/broadcast.rs index ae97e1d6..8f70f9fa 100644 --- a/bastion/src/broadcast.rs +++ b/bastion/src/broadcast.rs @@ -7,7 +7,7 @@ use crate::supervisor::SupervisorRef; use crate::system::SYSTEM; use futures::channel::mpsc::{self, UnboundedReceiver, UnboundedSender}; use futures::prelude::*; -use fxhash::FxHashMap; +use t1ha::T1haHashMap; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; @@ -21,7 +21,7 @@ pub(crate) struct Broadcast { recver: Receiver, path: Arc, // Arc is needed because we put path to Envelope parent: Parent, - children: FxHashMap, + children: T1haHashMap, } #[derive(Debug, Clone)] @@ -51,7 +51,7 @@ impl Parent { impl Broadcast { pub(crate) fn new(parent: Parent, element: BastionPathElement) -> Self { let (sender, recver) = mpsc::unbounded(); - let children = FxHashMap::default(); + let children = T1haHashMap::default(); let parent_path: BastionPath = match &parent { Parent::None | Parent::System => BastionPath::root(), @@ -79,7 +79,7 @@ impl Broadcast { assert!(parent.is_none() || parent.is_system()); let (sender, recver) = mpsc::unbounded(); - let children = FxHashMap::default(); + let children = T1haHashMap::default(); let path = BastionPath::root(); let path = Arc::new(path); diff --git a/bastion/src/children.rs b/bastion/src/children.rs index b9ab5193..8db53486 100644 --- a/bastion/src/children.rs +++ b/bastion/src/children.rs @@ -10,22 +10,26 @@ use crate::dispatcher::Dispatcher; use crate::envelope::Envelope; use crate::message::BastionMessage; use crate::path::BastionPathElement; -use crate::system::SYSTEM; +use crate::{state::SharedState, system::SYSTEM}; use bastion_executor::pool; use futures::pending; use futures::poll; use futures::prelude::*; use futures::stream::{FuturesOrdered, FuturesUnordered}; -use fxhash::FxHashMap; +use t1ha::T1haHashMap; use lightproc::prelude::*; +// use crate::state::{State as CState}; +use lightproc::prelude::State as LPState; use qutex::Qutex; use std::fmt::Debug; use std::future::Future; use std::iter::FromIterator; -use std::sync::Arc; +use std::sync::{Mutex, Arc}; use std::task::Poll; +use std::fmt; +use state::Container; -#[derive(Debug)] +// #[derive(Debug)] /// A children group that will contain a defined number of /// elements (set with [`with_redundancy`] or `1` by default) /// all running a future (returned by the closure that is set @@ -71,13 +75,15 @@ use std::task::Poll; /// [`with_redundancy`]: #method.with_redundancy /// [`with_exec`]: #method.with_exec /// [`SupervisionStrategy`]: supervisor/enum.SupervisionStrategy.html -pub struct Children { +pub struct Children +{ bcast: Broadcast, // The currently launched elements of the group. - launched: FxHashMap)>, + launched: T1haHashMap)>, // The closure returning the future that will be used by // every element of the group. init: Init, + // Redundancy of the closure for replication redundancy: usize, // The callbacks called at the group's different lifecycle // events. @@ -87,20 +93,40 @@ pub struct Children { // is received. pre_start_msgs: Vec, started: bool, + // State shared amongst the children + state: SharedState, // List of dispatchers attached to each actor in the group. dispatchers: Vec>>, } -impl Children { +impl fmt::Debug for Children +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { + f.debug_struct("Children") + .field("bcast", &self.bcast) + .field("launched", &self.launched) + .field("init", &self.init) + .field("redundancy", &self.redundancy) + .field("callbacks", &self.callbacks) + .field("pre_start_msgs", &self.pre_start_msgs) + .field("started", &self.started) + .field("dispatchers", &self.dispatchers) + .finish() + } +} + +impl Children +{ pub(crate) fn new(bcast: Broadcast) -> Self { debug!("Children({}): Initializing.", bcast.id()); - let launched = FxHashMap::default(); + let launched = T1haHashMap::default(); let init = Init::default(); let redundancy = 1; let callbacks = Callbacks::new(); let pre_start_msgs = Vec::new(); let started = false; let dispatchers = Vec::new(); + let state = SharedState::default(); Children { bcast, @@ -110,6 +136,7 @@ impl Children { callbacks, pre_start_msgs, started, + state, dispatchers, } } @@ -306,6 +333,17 @@ impl Children { self } + pub fn with_state(self, state: S) -> Self + where + S: LPState + { + if !self.state.set::(state) { + error!("This state type is already managed by this children!"); + } + + self + } + /// Appends each supervised element to the declared dispatcher. /// /// By default supervised elements aren't added to any of dispatcher. @@ -603,8 +641,14 @@ impl Children { } } - pub(crate) fn launch_elems(&mut self) { + pub(crate) fn launch_elems(&mut self) + { debug!("Children({}): Launching elements.", self.id()); + let mut container = Container::new(); + let st = self.state.get(); + container.set(st); + let shared_state = Arc::new(Mutex::new(SharedState::new(container))); + for _ in 0..self.redundancy { let parent = Parent::children(self.as_ref()); let bcast = Broadcast::new(parent, BastionPathElement::Child(BastionId::new())); @@ -622,7 +666,7 @@ impl Children { let state = Qutex::new(state); let ctx = - BastionContext::new(id, child_ref.clone(), children, supervisor, state.clone()); + BastionContext::new(id, child_ref.clone(), children, supervisor, state.clone(), shared_state.clone()); let exec = (self.init.0)(ctx); self.bcast.register(&bcast); diff --git a/bastion/src/context.rs b/bastion/src/context.rs index 56e6013c..efbd821e 100644 --- a/bastion/src/context.rs +++ b/bastion/src/context.rs @@ -2,19 +2,21 @@ //! A context allows a child's future to access its received //! messages, parent and supervisor. +use state::Container; use crate::child_ref::ChildRef; use crate::children_ref::ChildrenRef; use crate::dispatcher::{BroadcastTarget, DispatcherType, NotificationType}; use crate::envelope::{Envelope, RefAddr, SignedMessage}; use crate::message::{Answer, BastionMessage, Message, Msg}; use crate::supervisor::SupervisorRef; -use crate::system::SYSTEM; +use crate::{state::SharedState, system::SYSTEM}; use futures::pending; use qutex::{Guard, Qutex}; use std::collections::VecDeque; use std::fmt::{self, Display, Formatter}; -use std::sync::Arc; +use std::sync::{Mutex, Arc, MutexGuard}; use uuid::Uuid; +use lightproc::proc_state::State; /// Identifier for a root supervisor and dead-letters children. pub const NIL_ID: BastionId = BastionId(Uuid::nil()); @@ -54,7 +56,7 @@ pub const NIL_ID: BastionId = BastionId(Uuid::nil()); /// ``` pub struct BastionId(Uuid); -#[derive(Debug)] + /// A child's execution context, allowing its [`exec`] future /// to receive messages and access a [`ChildRef`] referencing /// it, a [`ChildrenRef`] referencing its children group and @@ -106,6 +108,17 @@ pub struct BastionContext { children: ChildrenRef, supervisor: Option, state: Qutex, + shared_state: Arc> +} + +impl fmt::Debug for BastionContext { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + f.debug_struct("BastionContext") + .field("id", &self.id) + .field("child", &self.child) + // TODO: write others + .finish() + } } #[derive(Debug)] @@ -128,6 +141,7 @@ impl BastionContext { children: ChildrenRef, supervisor: Option, state: Qutex, + shared_state: Arc>, ) -> Self { debug!("BastionContext({}): Creating.", id); BastionContext { @@ -136,6 +150,7 @@ impl BastionContext { children, supervisor, state, + shared_state } } @@ -207,6 +222,12 @@ impl BastionContext { &self.children } + pub fn state(&self) -> &MutexGuard<'_, crate::state::SharedState> + { + let ss = self.shared_state.clone(); + &ss.lock().unwrap() + } + /// Returns a [`SupervisorRef`] referencing the supervisor /// that supervises the element that is linked to this /// `BastionContext` if it isn't the system supervisor diff --git a/bastion/src/lib.rs b/bastion/src/lib.rs index 8896d9c3..287ed686 100644 --- a/bastion/src/lib.rs +++ b/bastion/src/lib.rs @@ -73,6 +73,7 @@ mod config; mod macros; mod system; +pub mod state; pub mod child_ref; pub mod children; pub mod children_ref; diff --git a/bastion/src/message.rs b/bastion/src/message.rs index de2bfa9b..e45551d2 100644 --- a/bastion/src/message.rs +++ b/bastion/src/message.rs @@ -17,6 +17,7 @@ use std::future::Future; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; +use lightproc::proc_state::State; /// A trait that any message sent needs to implement (it is /// already automatically implemented but forces message to diff --git a/bastion/src/state.rs b/bastion/src/state.rs new file mode 100644 index 00000000..238c8a2a --- /dev/null +++ b/bastion/src/state.rs @@ -0,0 +1,73 @@ +use state::Container; +use core::ops::DerefMut; +use core::ops::Deref; +use lightproc::{proc_state::{EmptyState, EmptyProcState}, prelude::State as LPState}; +use crate::context::BastionContext; +use std::marker::PhantomData as marker; + + +pub struct SharedState(Container); + +impl SharedState +{ + #[inline(always)] + pub fn new(c: Container) -> SharedState + { + SharedState(c) + } + + #[inline(always)] + pub fn get(&self) -> &S + where + S: LPState + { + self.0.get::() + } + + #[inline(always)] + pub fn set(&self, s: S) -> bool + where + S: LPState + { + let s = s as dyn LPState; + self.0.set(s) + } + + #[inline(always)] + pub fn from_ctx(ctx: &'static BastionContext) -> Option + where + S: LPState + { + let container = Container::new(); + container.set((*ctx.state()).get::()); + Some(SharedState(container)) + } +} + +impl Default for SharedState { + fn default() -> Self { + let mut c = Container::new(); + c.set(EmptyState); + Self(c) + } +} + +// impl<'s> Deref for SharedState<'s> +// { +// type Target = S; + +// #[inline(always)] +// fn deref(&self) -> &S { +// self.container.get::() +// } +// } + + +// impl<'s, S> DerefMut for SharedState<'s> +// where +// S: LPState + 'static +// { +// fn deref_mut(&mut self) -> &mut Self::Target { +// &mut self.container.get::() +// } +// } diff --git a/bastion/src/supervisor.rs b/bastion/src/supervisor.rs index 45dfedb5..ee7d2e19 100644 --- a/bastion/src/supervisor.rs +++ b/bastion/src/supervisor.rs @@ -14,7 +14,7 @@ use futures::prelude::*; use futures::stream::FuturesOrdered; use futures::{pending, poll}; use futures_timer::Delay; -use fxhash::FxHashMap; +use t1ha::T1haHashMap; use lightproc::prelude::*; use log::Level; use std::cmp::{Eq, PartialEq}; @@ -69,14 +69,14 @@ pub struct Supervisor { order: Vec, // The currently launched supervised children and supervisors. // The last value is the amount of times a given actor has restarted. - launched: FxHashMap, usize)>, + launched: T1haHashMap, usize)>, // Supervised children and supervisors that are stopped. // This is used when resetting or recovering when the // supervision strategy is not "one-for-one". - stopped: FxHashMap, + stopped: T1haHashMap, // Supervised children and supervisors that were killed. // This is used when resetting only. - killed: FxHashMap, + killed: T1haHashMap, strategy: SupervisionStrategy, restart_strategy: RestartStrategy, // The callbacks called at the supervisor's different @@ -197,9 +197,9 @@ impl Supervisor { pub(crate) fn new(bcast: Broadcast) -> Self { debug!("Supervisor({}): Initializing.", bcast.id()); let order = Vec::new(); - let launched = FxHashMap::default(); - let stopped = FxHashMap::default(); - let killed = FxHashMap::default(); + let launched = T1haHashMap::default(); + let stopped = T1haHashMap::default(); + let killed = T1haHashMap::default(); let strategy = SupervisionStrategy::default(); let restart_strategy = RestartStrategy::default(); let callbacks = Callbacks::new(); diff --git a/bastion/src/system.rs b/bastion/src/system.rs index 28d29682..f496b757 100644 --- a/bastion/src/system.rs +++ b/bastion/src/system.rs @@ -1,3 +1,4 @@ +use t1ha::{T1haHashSet, T1haHashMap}; use crate::broadcast::{Broadcast, Parent, Sender}; use crate::children_ref::ChildrenRef; use crate::context::{BastionContext, BastionId, NIL_ID}; @@ -10,7 +11,6 @@ use bastion_executor::pool; use futures::prelude::*; use futures::stream::FuturesUnordered; use futures::{pending, poll}; -use fxhash::{FxHashMap, FxHashSet}; use lazy_static::lazy_static; use lightproc::prelude::*; use qutex::Qutex; @@ -35,9 +35,9 @@ pub(crate) struct GlobalSystem { #[derive(Debug)] struct System { bcast: Broadcast, - launched: FxHashMap>, + launched: T1haHashMap>, // TODO: set limit - restart: FxHashSet, + restart: T1haHashSet, waiting: FuturesUnordered>, pre_start_msgs: Vec, started: bool, @@ -113,8 +113,8 @@ impl System { info!("System: Initializing."); let parent = Parent::none(); let bcast = Broadcast::new_root(parent); - let launched = FxHashMap::default(); - let restart = FxHashSet::default(); + let launched = T1haHashMap::default(); + let restart = T1haHashSet::default(); let waiting = FuturesUnordered::new(); let pre_start_msgs = Vec::new(); let started = false; @@ -168,6 +168,7 @@ impl System { debug!("Received dead letter: {:?}", smsg); } }) + .with_state(EmptyState) }) }