Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions horust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ oci-spec = "0.7.1"

[target.'cfg(target_os = "linux")'.dependencies]
libcgroups = { version = "0.5.3", features = ["v1", "v2"], default-features = false, git = "https://github.com/youki-dev/youki.git", rev = "1b840bb0936e61990f9eabbb0e094d08235b2220"}
notify = "7.0.0"

[features]
default = ["http-healthcheck"]
Expand Down
2 changes: 1 addition & 1 deletion horust/src/horust/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ impl ValidationErrors {
fn validation_errors(errors: &[ValidationError]) -> String {
errors
.iter()
.map(|s| format!("* {}", s))
.map(|s| format!("* {s}"))
.collect::<Vec<String>>()
.join("\n")
}
Expand Down
2 changes: 2 additions & 0 deletions horust/src/horust/formats/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use nix::unistd::Pid;
use std::path::PathBuf;

pub use horust_config::HorustConfig;
pub use service::*;
Expand Down Expand Up @@ -26,6 +27,7 @@ pub enum Event {
Run(ServiceName),
ShuttingDownInitiated(ShuttingDown),
HealthCheck(ServiceName, HealthinessStatus),
ReloadConfig(PathBuf),
}

impl Event {
Expand Down
38 changes: 16 additions & 22 deletions horust/src/horust/formats/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ pub struct Service {
pub termination: Termination,
#[serde(default)]
pub resource_limit: ResourceLimit,
pub config_file: Option<PathBuf>,
}

fn default_as_false() -> bool {
Expand Down Expand Up @@ -127,6 +128,7 @@ impl Default for Service {
failure: Default::default(),
termination: Default::default(),
resource_limit: Default::default(),
config_file: None,
}
}
}
Expand Down Expand Up @@ -240,10 +242,11 @@ impl Environment {
/// Create the environment K=V variables, used for exec into the new process.
/// User defined environment variables overwrite the predefined variables.
pub(crate) fn get_environment(&self, user_name: String, user_home: String) -> Vec<String> {
let mut initial: HashMap<String, String> = self
.keep_env
.then(|| std::env::vars().collect())
.unwrap_or_default();
let mut initial: HashMap<String, String> = if self.keep_env {
std::env::vars().collect()
} else {
Default::default()
};

let mut additional = self.additional.clone();

Expand Down Expand Up @@ -294,7 +297,7 @@ impl Environment {
// This is the suitable format for `exec`
additional
.into_iter()
.map(|(k, v)| format!("{}={}", k, v))
.map(|(k, v)| format!("{k}={v}"))
.collect()
}
}
Expand Down Expand Up @@ -357,7 +360,7 @@ impl User {
match &self {
User::Name(name) => {
let user = unistd::User::from_name(name)?
.with_context(|| format!("User `{}` not found", name))?;
.with_context(|| format!("User `{name}` not found"))?;
Ok(user.uid)
}
User::Uid(uid) => Ok(unistd::Uid::from_raw(*uid)),
Expand All @@ -367,7 +370,7 @@ impl User {
fn get_raw_user(&self) -> Result<unistd::User> {
let uid = self.get_uid()?;
let user =
unistd::User::from_uid(uid)?.with_context(|| format!("User `{}` not found", uid))?;
unistd::User::from_uid(uid)?.with_context(|| format!("User `{uid}` not found"))?;
Ok(user)
}

Expand Down Expand Up @@ -627,7 +630,7 @@ impl From<TerminationSignal> for Signal {
}
}

#[derive(Serialize, Clone, Deserialize, Debug, PartialEq)]
#[derive(Serialize, Clone, Deserialize, Debug, Default, PartialEq)]
#[serde(rename_all = "kebab-case", deny_unknown_fields)]
pub struct ResourceLimit {
#[serde(default)]
Expand All @@ -647,16 +650,6 @@ impl ResourceLimit {
}
}

impl Default for ResourceLimit {
fn default() -> Self {
ResourceLimit {
cpu: None,
memory: None,
pids_max: None,
}
}
}

impl Eq for ResourceLimit {}

impl ResourceLimit {
Expand All @@ -679,13 +672,13 @@ impl ResourceLimit {
}

// has to be an absolute path for cgroups v2
let cgroup_path = Path::new(DEFAULT_CGROUP_ROOT).join(format!("horust_{}", name));
let cgroup_path = Path::new(DEFAULT_CGROUP_ROOT).join(format!("horust_{name}"));
let manager = create_cgroup_manager(CgroupConfig {
cgroup_path: cgroup_path.to_path_buf(),
systemd_cgroup: false,
container_name: name.to_string(),
})
.with_context(|| format!("Failed to create cgroup manager for {}", name))?;
.with_context(|| format!("Failed to create cgroup manager for {name}"))?;
let mut resource = LinuxResources::default();
if let Some(cpu) = self.cpu {
let cpu = LinuxCpuBuilder::default()
Expand All @@ -705,15 +698,15 @@ impl ResourceLimit {

manager
.add_task(pid)
.with_context(|| format!("Failed to add task to cgroup {}", name))?;
.with_context(|| format!("Failed to add task to cgroup {name}"))?;
manager
.apply(&ControllerOpt {
resources: &resource,
disable_oom_killer: false,
oom_score_adj: None,
freezer_state: None,
})
.with_context(|| format!("Failed to apply resource limits to cgroup {}", name))?;
.with_context(|| format!("Failed to apply resource limits to cgroup {name}"))?;

Ok(())
}
Expand Down Expand Up @@ -802,6 +795,7 @@ mod test {
let current_user_name: String = super::User::default().get_name().unwrap();
let expected = Service {
name: "".to_string(),
config_file: None,
command: "/bin/bash -c \'echo hello world\'".to_string(),
user: super::User::Name(current_user_name),
environment: Environment {
Expand Down
6 changes: 3 additions & 3 deletions horust/src/horust/healthcheck/checks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,10 @@ pub(crate) struct CommandCheck {}

impl CommandCheck {
fn prepare_cmd(&self, cmd: &str) -> anyhow::Result<()> {
let mut chunks = shlex::split(cmd).context(format!("Failed to split command: {}", cmd))?;
let mut chunks = shlex::split(cmd).context(format!("Failed to split command: {cmd}"))?;
let program = chunks
.first()
.context(format!("Failed to get program from command: {}", cmd))?;
.context(format!("Failed to get program from command: {cmd}"))?;
let path = if program.contains('/') {
program.to_string()
} else {
Expand Down Expand Up @@ -128,7 +128,7 @@ impl Check for CommandCheck {
.as_ref()
.map(|command| {
self.prepare_cmd(command)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))?;
.map_err(|e| std::io::Error::other(e.to_string()))?;
Ok(())
})
.unwrap_or(Ok(()))
Expand Down
2 changes: 1 addition & 1 deletion horust/src/horust/healthcheck/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ mod test {
let socket = SocketAddrV4::new(loopback, 0);
let listener = TcpListener::bind(socket)?;
let port = listener.local_addr()?.port();
let endpoint = format!("http://localhost:{}", port);
let endpoint = format!("http://localhost:{port}");
let healthiness = Healthiness {
file_path: None,
http_endpoint: Some(endpoint),
Expand Down
1 change: 1 addition & 0 deletions horust/src/horust/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ where
let filename = path.file_name().unwrap().to_str().unwrap().to_owned();
service.name = filename;
}
service.config_file = Some(path.clone());
service
})
.map_err(|error| {
Expand Down
2 changes: 1 addition & 1 deletion horust/src/horust/signal_safe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ mod test {
fn test_int_to_string_conversion() {
let test = |i| {
let (res, digits) = i32_to_str_bytes(i);
assert_eq!(&res[digits..], format!("{}", i).as_bytes());
assert_eq!(&res[digits..], format!("{i}").as_bytes());
};

for _ in 0..100 {
Expand Down
18 changes: 16 additions & 2 deletions horust/src/horust/supervisor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ pub(crate) use signal_handling::init;

use crate::horust::bus::BusConnector;
use crate::horust::formats::{Event, ExitStatus, Service, ServiceStatus, ShuttingDown};
use crate::horust::healthcheck;
use crate::horust::{healthcheck, load_service};

mod process_spawner;
mod reaper;
Expand Down Expand Up @@ -81,7 +81,7 @@ impl Supervisor {
&& service_handler.is_early_state(),
);

let new_status = if has_failed
let new_status = if !service_handler.reload_config && has_failed
|| (service_handler.status == ServiceStatus::Running
&& service_handler.has_some_failed_healthchecks())
{
Expand Down Expand Up @@ -111,6 +111,7 @@ impl Supervisor {
}
Event::Run(service_name) if self.repo.get_sh(&service_name).is_initial() => {
let service_handler = self.repo.get_mut_sh(&service_name);
service_handler.reload_config = false;
service_handler.status = ServiceStatus::Starting;
let evs = vec![Event::StatusChanged(service_name, ServiceStatus::Starting)];

Expand Down Expand Up @@ -224,6 +225,19 @@ impl Supervisor {
vec![]
}
}
Event::ReloadConfig(path) => {
info!("Reloading config: {path:?}");
let service_handler = self
.repo
.get_service_by_path(&path)
.map(|service_name| self.repo.get_mut_sh(&service_name))
.zip(load_service(&path).ok());
if let Some((service_handler, service)) = service_handler {
service_handler.service = service;
service_handler.reload_config = true;
}
vec![]
}
ev => {
trace!("ignoring: {:?}", ev);
vec![]
Expand Down
53 changes: 51 additions & 2 deletions horust/src/horust/supervisor/repo.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use std::collections::HashMap;
use std::path::Path;

use nix::unistd::Pid;
use notify::event::ModifyKind;
use notify::{EventKind, Watcher};

use crate::horust::bus::BusConnector;
use crate::horust::formats::{Service, ServiceName};
Expand All @@ -12,20 +15,66 @@ pub(crate) struct Repo {
pub services: HashMap<ServiceName, ServiceHandler>,
pub(crate) bus: BusConnector<Event>,
pub(crate) pid_map: HashMap<Pid, ServiceName>,
_watcher: notify::RecommendedWatcher,
}

struct ConfigWatcher {
bus: BusConnector<Event>,
}

impl notify::EventHandler for ConfigWatcher {
fn handle_event(&mut self, event: notify::Result<notify::Event>) {
if let Ok(notify::Event {
kind: EventKind::Modify(ModifyKind::Data(_)),
paths,
attrs: _,
}) = event
{
paths
.iter()
.for_each(|path| self.bus.send_event(Event::ReloadConfig(path.clone())));
}
}
}

impl Repo {
pub(crate) fn new(bus: BusConnector<Event>, services: Vec<Service>) -> Self {
let config_watcher = ConfigWatcher {
bus: bus.join_bus(),
};
let mut watcher = notify::recommended_watcher(config_watcher).unwrap();
services.iter().for_each(|service| {
if let Some(path) = service.config_file.as_ref() {
_ = watcher.watch(path.as_path(), notify::RecursiveMode::NonRecursive);
}
});

let services = services
.into_iter()
.map(|service| (service.name.clone(), service.into()))
.iter()
.map(|service| (service.name.clone(), service.clone().into()))
.collect();

Self {
bus,
services,
pid_map: HashMap::new(),
_watcher: watcher,
}
}

pub fn get_service_by_path(&self, path: &Path) -> Option<ServiceName> {
self.services
.iter()
.find(|(_, handler)| {
handler
.service()
.config_file
.as_ref()
.is_some_and(|config_file| config_file == path.as_os_str())
})
.map(|(service_name, _)| service_name.to_owned())
}

pub(crate) fn insert_sh_by_name(&mut self, name: ServiceName, sh: ServiceHandler) {
self.services.insert(name, sh);
}
Expand Down
Loading
Loading