diff --git a/Cargo.lock b/Cargo.lock
index 966aac44be7..a8cfc34e97a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5056,6 +5056,7 @@ dependencies = [
  "bcs 0.1.4",
  "bytes",
  "claims",
+ "core_affinity",
  "crossbeam-channel",
  "derive_more 0.99.17",
  "fail",
diff --git a/aptos-move/aptos-vm/Cargo.toml b/aptos-move/aptos-vm/Cargo.toml
index 4137846a7b5..1523e2a36ff 100644
--- a/aptos-move/aptos-vm/Cargo.toml
+++ b/aptos-move/aptos-vm/Cargo.toml
@@ -39,6 +39,7 @@ ark-groth16 = { workspace = true }
 bcs = { workspace = true }
 bytes = { workspace = true }
 claims = { workspace = true }
+core_affinity = { workspace = true }
 crossbeam-channel = { workspace = true }
 derive_more = { workspace = true }
 fail = { workspace = true }
diff --git a/aptos-move/aptos-vm/src/block_executor/affinity.rs b/aptos-move/aptos-vm/src/block_executor/affinity.rs
new file mode 100644
index 00000000000..2c5ae991c54
--- /dev/null
+++ b/aptos-move/aptos-vm/src/block_executor/affinity.rs
@@ -0,0 +1,137 @@
+// Copyright (c) Aptos Foundation
+// Licensed pursuant to the Innovation-Enabling Source Code License, available at https://github.com/aptos-labs/aptos-core/blob/main/LICENSE
+
+//! CPU-affinity helpers for the par_exec thread pool.
+//!
+//! Detects the physical cores (as opposed to hyperthread siblings) that the
+//! current process is allowed to run on, so the block executor can pin one
+//! rayon worker per physical core. Pinning is only beneficial when we have at
+//! least as many physical cores as worker threads; the caller is expected to
+//! skip pinning otherwise.
+//!
+//! Linux only. On other platforms, or if the topology cannot be determined
+//! (e.g. sysfs is not readable), this module returns `None` and the caller
+//! should fall back to unpinned scheduling.
+
+/// Returns one logical CPU id per distinct physical core that the current
+/// process may run on, sorted ascending. Returns `None` if the topology
+/// cannot be determined.
+///
+/// The returned ids are always a subset of `core_affinity::get_core_ids()`
+/// so pinning to any of them is safe with respect to cgroups or an inherited
+/// affinity mask (e.g. under `taskset`).
+#[cfg(target_os = "linux")]
+pub(super) fn allowed_physical_cores() -> Option<Vec<core_affinity::CoreId>> {
+    let allowed = core_affinity::get_core_ids()?;
+    let allowed_ids: Vec<usize> = allowed.iter().map(|c| c.id).collect();
+    let ids = group_by_physical_core(&allowed_ids, read_thread_siblings)?;
+    Some(
+        ids.into_iter()
+            .map(|id| core_affinity::CoreId { id })
+            .collect(),
+    )
+}
+
+#[cfg(not(target_os = "linux"))]
+pub(super) fn allowed_physical_cores() -> Option<Vec<core_affinity::CoreId>> {
+    None
+}
+
+/// Groups `allowed` CPU ids by the sibling list returned by `read_siblings`,
+/// keeping the lowest id in each group as the representative for that
+/// physical core. Returns `None` if `read_siblings` fails for any CPU.
+///
+/// Pure function — does not touch sysfs — to make the grouping logic unit
+/// testable.
+fn group_by_physical_core<F>(allowed: &[usize], mut read_siblings: F) -> Option<Vec<usize>>
+where
+    F: FnMut(usize) -> Option<String>,
+{
+    use std::collections::BTreeMap;
+
+    let mut representatives: BTreeMap<String, usize> = BTreeMap::new();
+    for &cpu in allowed {
+        let siblings = read_siblings(cpu)?;
+        representatives
+            .entry(siblings)
+            .and_modify(|cur| {
+                if cpu < *cur {
+                    *cur = cpu;
+                }
+            })
+            .or_insert(cpu);
+    }
+    let mut ids: Vec<usize> = representatives.into_values().collect();
+    ids.sort_unstable();
+    Some(ids)
+}
+
+#[cfg(target_os = "linux")]
+fn read_thread_siblings(cpu: usize) -> Option<String> {
+    use aptos_logger::warn;
+
+    let path = format!(
+        "/sys/devices/system/cpu/cpu{}/topology/thread_siblings_list",
+        cpu
+    );
+    match std::fs::read_to_string(&path) {
+        Ok(s) => Some(s.trim().to_string()),
+        Err(e) => {
+            warn!(
+                cpu = cpu,
+                path = %path,
+                error = %e,
+                "Failed to read thread_siblings_list; cannot detect physical cores",
+            );
+            None
+        },
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn groups_ht_siblings_into_one_physical_core() {
+        // 4 logical CPUs, 2 physical cores with HT enabled: {0,2} and {1,3}.
+        let result = group_by_physical_core(&[0, 1, 2, 3], |cpu| match cpu {
+            0 | 2 => Some("0,2".to_string()),
+            1 | 3 => Some("1,3".to_string()),
+            _ => None,
+        });
+        assert_eq!(result, Some(vec![0, 1]));
+    }
+
+    #[test]
+    fn no_ht_returns_all_cpus() {
+        // 4 logical CPUs, HT disabled: each CPU is its own sibling.
+        let result = group_by_physical_core(&[0, 1, 2, 3], |cpu| Some(cpu.to_string()));
+        assert_eq!(result, Some(vec![0, 1, 2, 3]));
+    }
+
+    #[test]
+    fn respects_restricted_affinity_set() {
+        // The process may only run on CPUs {1,2}. Their sibling lists still
+        // reference CPUs outside the allowed set, but we never consider those,
+        // so the representatives come only from the allowed CPUs.
+        let result = group_by_physical_core(&[1, 2], |cpu| match cpu {
+            0 | 2 => Some("0,2".to_string()),
+            1 | 3 => Some("1,3".to_string()),
+            _ => None,
+        });
+        assert_eq!(result, Some(vec![1, 2]));
+    }
+
+    #[test]
+    fn returns_none_when_sibling_read_fails() {
+        let result = group_by_physical_core(&[0, 1], |_| None);
+        assert_eq!(result, None);
+    }
+
+    #[test]
+    fn empty_input_yields_empty_output() {
+        let result = group_by_physical_core(&[], |_| Some(String::new()));
+        assert_eq!(result, Some(Vec::new()));
+    }
+}
diff --git a/aptos-move/aptos-vm/src/block_executor/mod.rs b/aptos-move/aptos-vm/src/block_executor/mod.rs
index 696ac4187c2..6ccbaa408aa 100644
--- a/aptos-move/aptos-vm/src/block_executor/mod.rs
+++ b/aptos-move/aptos-vm/src/block_executor/mod.rs
@@ -1,6 +1,7 @@
 // Copyright (c) Aptos Foundation
 // Licensed pursuant to the Innovation-Enabling Source Code License, available at https://github.com/aptos-labs/aptos-core/blob/main/LICENSE
 
+mod affinity;
 pub(crate) mod vm_wrapper;
 
 use crate::counters::{BLOCK_EXECUTOR_CONCURRENCY, BLOCK_EXECUTOR_EXECUTE_BLOCK_SECONDS};
@@ -19,7 +20,7 @@ use aptos_block_executor::{
     txn_provider::TxnProvider,
     types::InputOutputKey,
 };
-use aptos_logger::info;
+use aptos_logger::{info, warn};
 use aptos_types::{
     block_executor::{
         config::BlockExecutorConfig, transaction_slice_metadata::TransactionSliceMetadata,
@@ -602,14 +603,7 @@ impl<
         let pool = match &mut *RAYON_EXEC_POOL.lock().unwrap() {
             Some((pool, n)) if *n == num_threads => Arc::clone(pool),
             slot => {
-                info!(num_threads = num_threads, "Creating par_exec thread pool");
-                let pool = Arc::new(
-                    rayon::ThreadPoolBuilder::new()
-                        .num_threads(num_threads)
-                        .thread_name(|index| format!("par_exec-{}", index))
-                        .build()
-                        .unwrap(),
-                );
+                let pool = Arc::new(build_par_exec_pool(num_threads));
                 *slot = Some((Arc::clone(&pool), num_threads));
                 pool
             },
@@ -626,5 +620,56 @@ impl<
     }
 }
 
+/// Builds the rayon thread pool that backs parallel block execution.
+///
+/// When the process has at least as many physical cores available as worker
+/// threads, each worker is pinned 1:1 to a distinct physical core so two
+/// workers never land on HT siblings of the same core (which hurts throughput
+/// on CPU-bound workloads). If the topology cannot be detected (non-Linux,
+/// sysfs unreadable) or we have more threads than physical cores, the pool
+/// is built without pinning and the OS scheduler decides placement.
+fn build_par_exec_pool(num_threads: usize) -> rayon::ThreadPool {
+    let mut builder = rayon::ThreadPoolBuilder::new()
+        .num_threads(num_threads)
+        .thread_name(|index| format!("par_exec-{}", index));
+
+    let physical_cores = affinity::allowed_physical_cores();
+    match &physical_cores {
+        Some(cores) if num_threads > 0 && cores.len() >= num_threads => {
+            info!(
+                num_threads = num_threads,
+                num_physical_cores = cores.len(),
+                "Creating par_exec thread pool with per-worker physical-core pinning",
+            );
+            let cores = cores.clone();
+            builder = builder.start_handler(move |index| {
+                let core = cores[index];
+                if !core_affinity::set_for_current(core) {
+                    warn!(
+                        thread_index = index,
+                        core_id = core.id,
+                        "Failed to pin par_exec thread to physical core; running unpinned",
+                    );
+                }
+            });
+        },
+        Some(cores) => {
+            info!(
+                num_threads = num_threads,
+                num_physical_cores = cores.len(),
+                "Creating par_exec thread pool without pinning (more threads than physical cores)",
+            );
+        },
+        None => {
+            info!(
+                num_threads = num_threads,
+                "Creating par_exec thread pool without pinning (CPU topology unavailable)",
+            );
+        },
+    }
+
+    builder.build().unwrap()
+}
+
 // Same as AptosBlockExecutorWrapper with AptosExecutorTask
 pub type AptosVMBlockExecutorWrapper = AptosBlockExecutorWrapper<AptosExecutorTask>;