Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions aptos-move/aptos-vm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ ark-groth16 = { workspace = true }
bcs = { workspace = true }
bytes = { workspace = true }
claims = { workspace = true }
core_affinity = { workspace = true }
crossbeam-channel = { workspace = true }
derive_more = { workspace = true }
fail = { workspace = true }
Expand Down
137 changes: 137 additions & 0 deletions aptos-move/aptos-vm/src/block_executor/affinity.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
// Copyright (c) Aptos Foundation
// Licensed pursuant to the Innovation-Enabling Source Code License, available at https://github.com/aptos-labs/aptos-core/blob/main/LICENSE

//! CPU-affinity helpers for the par_exec thread pool.
//!
//! Detects the physical cores (as opposed to hyperthread siblings) that the
//! current process is allowed to run on, so the block executor can pin one
//! rayon worker per physical core. Pinning is only beneficial when we have at
//! least as many physical cores as worker threads; the caller is expected to
//! skip pinning otherwise.
//!
//! Linux only. On other platforms, or if the topology cannot be determined
//! (e.g. sysfs is not readable), this module returns `None` and the caller
//! should fall back to unpinned scheduling.

/// Returns one logical CPU id per distinct physical core that the current
/// process may run on, sorted ascending. Returns `None` if the topology
/// cannot be determined.
///
/// The returned ids are always a subset of `core_affinity::get_core_ids()`
/// so pinning to any of them is safe with respect to cgroups or an inherited
/// affinity mask (e.g. under `taskset`).
#[cfg(target_os = "linux")]
pub(super) fn allowed_physical_cores() -> Option<Vec<core_affinity::CoreId>> {
    // Start from the CPUs the OS says we may run on, so the result respects
    // any inherited affinity mask.
    let permitted: Vec<usize> = core_affinity::get_core_ids()?
        .iter()
        .map(|core| core.id)
        .collect();
    // Collapse hyperthread siblings down to one representative per core.
    group_by_physical_core(&permitted, read_thread_siblings).map(|representatives| {
        representatives
            .into_iter()
            .map(|id| core_affinity::CoreId { id })
            .collect()
    })
}

/// Non-Linux stub: CPU topology detection relies on Linux sysfs, so on other
/// platforms this always returns `None` and the caller falls back to
/// unpinned scheduling (see the module docs).
#[cfg(not(target_os = "linux"))]
pub(super) fn allowed_physical_cores() -> Option<Vec<core_affinity::CoreId>> {
    None
}

/// Groups `allowed` CPU ids by the sibling list returned by `read_siblings`,
/// keeping the lowest id in each group as the representative for that
/// physical core. Returns `None` if `read_siblings` fails for any CPU.
///
/// Pure function — does not touch sysfs — to make the grouping logic unit
/// testable.
fn group_by_physical_core<F>(allowed: &[usize], mut read_siblings: F) -> Option<Vec<usize>>
where
    F: FnMut(usize) -> Option<String>,
{
    use std::collections::BTreeMap;

    // Map each distinct sibling list to the smallest allowed CPU id in it.
    let mut min_per_core: BTreeMap<String, usize> = BTreeMap::new();
    for &cpu in allowed {
        let key = read_siblings(cpu)?;
        let representative = min_per_core.entry(key).or_insert(cpu);
        *representative = (*representative).min(cpu);
    }

    // The map is keyed by sibling-list text, so its values are not ordered;
    // sort the representatives explicitly.
    let mut representatives: Vec<usize> = min_per_core.into_values().collect();
    representatives.sort_unstable();
    Some(representatives)
}

/// Reads `/sys/devices/system/cpu/cpu<N>/topology/thread_siblings_list` for
/// the given CPU and returns its trimmed contents, or `None` (after logging a
/// warning) when the file cannot be read.
#[cfg(target_os = "linux")]
fn read_thread_siblings(cpu: usize) -> Option<String> {
    use aptos_logger::warn;

    let path = format!(
        "/sys/devices/system/cpu/cpu{}/topology/thread_siblings_list",
        cpu
    );
    std::fs::read_to_string(&path)
        .map(|contents| contents.trim().to_string())
        .map_err(|e| {
            // A missing/unreadable sysfs entry means we cannot trust any
            // topology answer, so the caller gives up on pinning entirely.
            warn!(
                cpu = cpu,
                path = %path,
                error = %e,
                "Failed to read thread_siblings_list; cannot detect physical cores",
            );
        })
        .ok()
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Fake sibling reader for a 4-CPU box with HT enabled: physical cores
    /// are {0,2} and {1,3}.
    fn ht_siblings(cpu: usize) -> Option<String> {
        match cpu {
            0 | 2 => Some("0,2".to_string()),
            1 | 3 => Some("1,3".to_string()),
            _ => None,
        }
    }

    #[test]
    fn groups_ht_siblings_into_one_physical_core() {
        // All 4 logical CPUs allowed: expect one representative per core.
        assert_eq!(
            group_by_physical_core(&[0, 1, 2, 3], ht_siblings),
            Some(vec![0, 1])
        );
    }

    #[test]
    fn no_ht_returns_all_cpus() {
        // HT disabled: every CPU is the sole member of its sibling list.
        assert_eq!(
            group_by_physical_core(&[0, 1, 2, 3], |cpu| Some(cpu.to_string())),
            Some(vec![0, 1, 2, 3])
        );
    }

    #[test]
    fn respects_restricted_affinity_set() {
        // Only CPUs {1,2} are allowed. Their sibling lists still mention
        // CPUs outside the allowed set, but we never consider those, so the
        // representatives come only from the allowed CPUs.
        assert_eq!(
            group_by_physical_core(&[1, 2], ht_siblings),
            Some(vec![1, 2])
        );
    }

    #[test]
    fn returns_none_when_sibling_read_fails() {
        assert_eq!(group_by_physical_core(&[0, 1], |_| None), None);
    }

    #[test]
    fn empty_input_yields_empty_output() {
        assert_eq!(
            group_by_physical_core(&[], |_| Some(String::new())),
            Some(Vec::new())
        );
    }
}
63 changes: 54 additions & 9 deletions aptos-move/aptos-vm/src/block_executor/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) Aptos Foundation
// Licensed pursuant to the Innovation-Enabling Source Code License, available at https://github.com/aptos-labs/aptos-core/blob/main/LICENSE

mod affinity;
pub(crate) mod vm_wrapper;

use crate::counters::{BLOCK_EXECUTOR_CONCURRENCY, BLOCK_EXECUTOR_EXECUTE_BLOCK_SECONDS};
Expand All @@ -19,7 +20,7 @@ use aptos_block_executor::{
txn_provider::TxnProvider,
types::InputOutputKey,
};
use aptos_logger::info;
use aptos_logger::{info, warn};
use aptos_types::{
block_executor::{
config::BlockExecutorConfig, transaction_slice_metadata::TransactionSliceMetadata,
Expand Down Expand Up @@ -602,14 +603,7 @@ impl<
let pool = match &mut *RAYON_EXEC_POOL.lock().unwrap() {
Some((pool, n)) if *n == num_threads => Arc::clone(pool),
slot => {
info!(num_threads = num_threads, "Creating par_exec thread pool");
let pool = Arc::new(
rayon::ThreadPoolBuilder::new()
.num_threads(num_threads)
.thread_name(|index| format!("par_exec-{}", index))
.build()
.unwrap(),
);
let pool = Arc::new(build_par_exec_pool(num_threads));
*slot = Some((Arc::clone(&pool), num_threads));
pool
},
Expand All @@ -626,5 +620,56 @@ impl<
}
}

/// Builds the rayon thread pool that backs parallel block execution.
///
/// When the process has at least as many physical cores available as worker
/// threads, each worker is pinned 1:1 to a distinct physical core so two
/// workers never land on HT siblings of the same core (which hurts throughput
/// on CPU-bound workloads). If the topology cannot be detected (non-Linux,
/// sysfs unreadable) or we have more threads than physical cores, the pool
/// is built without pinning and the OS scheduler decides placement.
fn build_par_exec_pool(num_threads: usize) -> rayon::ThreadPool {
    let mut builder = rayon::ThreadPoolBuilder::new()
        .num_threads(num_threads)
        .thread_name(|index| format!("par_exec-{}", index));

    // Match by value so the pinning branch can move `cores` straight into
    // the start handler instead of cloning it.
    match affinity::allowed_physical_cores() {
        // `num_threads > 0` is required: rayon treats 0 as "pick a default
        // thread count", which could exceed `cores.len()` and make
        // `cores[index]` panic in the start handler.
        Some(cores) if num_threads > 0 && cores.len() >= num_threads => {
            info!(
                num_threads = num_threads,
                num_physical_cores = cores.len(),
                "Creating par_exec thread pool with per-worker physical-core pinning",
            );
            builder = builder.start_handler(move |index| {
                let core = cores[index];
                if !core_affinity::set_for_current(core) {
                    // Pinning is best-effort: on failure this worker simply
                    // stays schedulable anywhere, like an unpinned pool.
                    warn!(
                        thread_index = index,
                        core_id = core.id,
                        "Failed to pin par_exec thread to physical core; running unpinned",
                    );
                }
            });
        },
        Some(cores) => {
            info!(
                num_threads = num_threads,
                num_physical_cores = cores.len(),
                "Creating par_exec thread pool without pinning (more threads than physical cores)",
            );
        },
        None => {
            info!(
                num_threads = num_threads,
                "Creating par_exec thread pool without pinning (CPU topology unavailable)",
            );
        },
    }

    builder.build().unwrap()
}

/// Convenience alias: `AptosBlockExecutorWrapper` specialized with
/// `AptosExecutorTask` (the default executor task used by the Aptos VM).
pub type AptosVMBlockExecutorWrapper = AptosBlockExecutorWrapper<AptosExecutorTask>;
Loading