From af8a0d95cd85cf712ae8845880dbbdb197d0f4a0 Mon Sep 17 00:00:00 2001 From: wqfish <1171005+wqfish@users.noreply.github.com> Date: Fri, 17 Apr 2026 07:32:45 -0700 Subject: [PATCH] [vm] Pin par_exec rayon workers to physical cores When the block executor builds its parallel execution pool and the process has at least as many physical cores available as worker threads, pin each worker 1:1 to a distinct physical core. This prevents two workers from landing on HT siblings of the same core, which degrades throughput on CPU-bound Block-STM workloads. ## Approach - Detect the physical cores the process may run on by grouping the CPUs from `sched_getaffinity` (via `core_affinity::get_core_ids()`) by their `/sys/.../topology/thread_siblings_list`, keeping one representative per group. Linux only; other platforms return `None`. - Pin via a `rayon` `start_handler` that calls `core_affinity::set_for_current` using the worker's thread index. - Skip pinning (fall back to the OS scheduler) when topology cannot be detected (non-Linux, sysfs unreadable, any read fails), the process has fewer physical cores than `num_threads`, or `num_threads == 0`. - Per-thread pin failures are logged at `warn!` and the thread continues unpinned; the pool still comes up. ## Robustness - `core_affinity::get_core_ids` respects the process's affinity mask, so this composes correctly with cgroups, containers, and `taskset`. - Unit tests cover the sibling-grouping logic (HT on/off, restricted affinity set, read-failure path). Co-Authored-By: Claude Opus 4.7 (1M context) --- Cargo.lock | 1 + aptos-move/aptos-vm/Cargo.toml | 1 + .../aptos-vm/src/block_executor/affinity.rs | 137 ++++++++++++++++++ aptos-move/aptos-vm/src/block_executor/mod.rs | 63 ++++++-- 4 files changed, 193 insertions(+), 9 deletions(-) create mode 100644 aptos-move/aptos-vm/src/block_executor/affinity.rs diff --git a/Cargo.lock b/Cargo.lock index 966aac44be7..a8cfc34e97a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5056,6 +5056,7 @@ dependencies = [ "bcs 0.1.4", "bytes", "claims", + "core_affinity", "crossbeam-channel", "derive_more 0.99.17", "fail", diff --git a/aptos-move/aptos-vm/Cargo.toml b/aptos-move/aptos-vm/Cargo.toml index 4137846a7b5..1523e2a36ff 100644 --- a/aptos-move/aptos-vm/Cargo.toml +++ b/aptos-move/aptos-vm/Cargo.toml @@ -39,6 +39,7 @@ ark-groth16 = { workspace = true } bcs = { workspace = true } bytes = { workspace = true } claims = { workspace = true } +core_affinity = { workspace = true } crossbeam-channel = { workspace = true } derive_more = { workspace = true } fail = { workspace = true } diff --git a/aptos-move/aptos-vm/src/block_executor/affinity.rs b/aptos-move/aptos-vm/src/block_executor/affinity.rs new file mode 100644 index 00000000000..2c5ae991c54 --- /dev/null +++ b/aptos-move/aptos-vm/src/block_executor/affinity.rs @@ -0,0 +1,137 @@ +// Copyright (c) Aptos Foundation +// Licensed pursuant to the Innovation-Enabling Source Code License, available at https://github.com/aptos-labs/aptos-core/blob/main/LICENSE + +//! CPU-affinity helpers for the par_exec thread pool. +//! +//! Detects the physical cores (as opposed to hyperthread siblings) that the +//! current process is allowed to run on, so the block executor can pin one +//! rayon worker per physical core. Pinning is only beneficial when we have at +//! least as many physical cores as worker threads; the caller is expected to +//! skip pinning otherwise. +//! +//! Linux only. On other platforms, or if the topology cannot be determined +//! (e.g. sysfs is not readable), this module returns `None` and the caller +//! should fall back to unpinned scheduling. + +/// Returns one logical CPU id per distinct physical core that the current +/// process may run on, sorted ascending. Returns `None` if the topology +/// cannot be determined. +/// +/// The returned ids are always a subset of `core_affinity::get_core_ids()` +/// so pinning to any of them is safe with respect to cgroups or an inherited +/// affinity mask (e.g. under `taskset`). +#[cfg(target_os = "linux")] +pub(super) fn allowed_physical_cores() -> Option> { + let allowed = core_affinity::get_core_ids()?; + let allowed_ids: Vec = allowed.iter().map(|c| c.id).collect(); + let ids = group_by_physical_core(&allowed_ids, read_thread_siblings)?; + Some( + ids.into_iter() + .map(|id| core_affinity::CoreId { id }) + .collect(), + ) +} + +#[cfg(not(target_os = "linux"))] +pub(super) fn allowed_physical_cores() -> Option> { + None +} + +/// Groups `allowed` CPU ids by the sibling list returned by `read_siblings`, +/// keeping the lowest id in each group as the representative for that +/// physical core. Returns `None` if `read_siblings` fails for any CPU. +/// +/// Pure function — does not touch sysfs — to make the grouping logic unit +/// testable. +fn group_by_physical_core(allowed: &[usize], mut read_siblings: F) -> Option> +where + F: FnMut(usize) -> Option, +{ + use std::collections::BTreeMap; + + let mut representatives: BTreeMap = BTreeMap::new(); + for &cpu in allowed { + let siblings = read_siblings(cpu)?; + representatives + .entry(siblings) + .and_modify(|cur| { + if cpu < *cur { + *cur = cpu; + } + }) + .or_insert(cpu); + } + let mut ids: Vec = representatives.into_values().collect(); + ids.sort_unstable(); + Some(ids) +} + +#[cfg(target_os = "linux")] +fn read_thread_siblings(cpu: usize) -> Option { + use aptos_logger::warn; + + let path = format!( + "/sys/devices/system/cpu/cpu{}/topology/thread_siblings_list", + cpu + ); + match std::fs::read_to_string(&path) { + Ok(s) => Some(s.trim().to_string()), + Err(e) => { + warn!( + cpu = cpu, + path = %path, + error = %e, + "Failed to read thread_siblings_list; cannot detect physical cores", + ); + None + }, + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn groups_ht_siblings_into_one_physical_core() { + // 4 logical CPUs, 2 physical cores with HT enabled: {0,2} and {1,3}. + let result = group_by_physical_core(&[0, 1, 2, 3], |cpu| match cpu { + 0 | 2 => Some("0,2".to_string()), + 1 | 3 => Some("1,3".to_string()), + _ => None, + }); + assert_eq!(result, Some(vec![0, 1])); + } + + #[test] + fn no_ht_returns_all_cpus() { + // 4 logical CPUs, HT disabled: each CPU is its own sibling. + let result = group_by_physical_core(&[0, 1, 2, 3], |cpu| Some(cpu.to_string())); + assert_eq!(result, Some(vec![0, 1, 2, 3])); + } + + #[test] + fn respects_restricted_affinity_set() { + // The process may only run on CPUs {1,2}. Their sibling lists still + // reference CPUs outside the allowed set, but we never consider those, + // so the representatives come only from the allowed CPUs. + let result = group_by_physical_core(&[1, 2], |cpu| match cpu { + 0 | 2 => Some("0,2".to_string()), + 1 | 3 => Some("1,3".to_string()), + _ => None, + }); + assert_eq!(result, Some(vec![1, 2])); + } + + #[test] + fn returns_none_when_sibling_read_fails() { + let result = group_by_physical_core(&[0, 1], |_| None); + assert_eq!(result, None); + } + + #[test] + fn empty_input_yields_empty_output() { + let result = group_by_physical_core(&[], |_| Some(String::new())); + assert_eq!(result, Some(Vec::new())); + } +} diff --git a/aptos-move/aptos-vm/src/block_executor/mod.rs b/aptos-move/aptos-vm/src/block_executor/mod.rs index 696ac4187c2..6ccbaa408aa 100644 --- a/aptos-move/aptos-vm/src/block_executor/mod.rs +++ b/aptos-move/aptos-vm/src/block_executor/mod.rs @@ -1,6 +1,7 @@ // Copyright (c) Aptos Foundation // Licensed pursuant to the Innovation-Enabling Source Code License, available at https://github.com/aptos-labs/aptos-core/blob/main/LICENSE +mod affinity; pub(crate) mod vm_wrapper; use crate::counters::{BLOCK_EXECUTOR_CONCURRENCY, BLOCK_EXECUTOR_EXECUTE_BLOCK_SECONDS}; @@ -19,7 +20,7 @@ use aptos_block_executor::{ txn_provider::TxnProvider, types::InputOutputKey, }; -use aptos_logger::info; +use aptos_logger::{info, warn}; use aptos_types::{ block_executor::{ config::BlockExecutorConfig, transaction_slice_metadata::TransactionSliceMetadata, @@ -602,14 +603,7 @@ impl< let pool = match &mut *RAYON_EXEC_POOL.lock().unwrap() { Some((pool, n)) if *n == num_threads => Arc::clone(pool), slot => { - info!(num_threads = num_threads, "Creating par_exec thread pool"); - let pool = Arc::new( - rayon::ThreadPoolBuilder::new() - .num_threads(num_threads) - .thread_name(|index| format!("par_exec-{}", index)) - .build() - .unwrap(), - ); + let pool = Arc::new(build_par_exec_pool(num_threads)); *slot = Some((Arc::clone(&pool), num_threads)); pool }, @@ -626,5 +620,56 @@ impl< } } +/// Builds the rayon thread pool that backs parallel block execution. +/// +/// When the process has at least as many physical cores available as worker +/// threads, each worker is pinned 1:1 to a distinct physical core so two +/// workers never land on HT siblings of the same core (which hurts throughput +/// on CPU-bound workloads). If the topology cannot be detected (non-Linux, +/// sysfs unreadable) or we have more threads than physical cores, the pool +/// is built without pinning and the OS scheduler decides placement. +fn build_par_exec_pool(num_threads: usize) -> rayon::ThreadPool { + let mut builder = rayon::ThreadPoolBuilder::new() + .num_threads(num_threads) + .thread_name(|index| format!("par_exec-{}", index)); + + let physical_cores = affinity::allowed_physical_cores(); + match &physical_cores { + Some(cores) if num_threads > 0 && cores.len() >= num_threads => { + info!( + num_threads = num_threads, + num_physical_cores = cores.len(), + "Creating par_exec thread pool with per-worker physical-core pinning", + ); + let cores = cores.clone(); + builder = builder.start_handler(move |index| { + let core = cores[index]; + if !core_affinity::set_for_current(core) { + warn!( + thread_index = index, + core_id = core.id, + "Failed to pin par_exec thread to physical core; running unpinned", + ); + } + }); + }, + Some(cores) => { + info!( + num_threads = num_threads, + num_physical_cores = cores.len(), + "Creating par_exec thread pool without pinning (more threads than physical cores)", + ); + }, + None => { + info!( + num_threads = num_threads, + "Creating par_exec thread pool without pinning (CPU topology unavailable)", + ); + }, + } + + builder.build().unwrap() +} + // Same as AptosBlockExecutorWrapper with AptosExecutorTask pub type AptosVMBlockExecutorWrapper = AptosBlockExecutorWrapper;