Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 27 additions & 47 deletions compiler/rustc_interface/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use rustc_session::config::{
use rustc_session::{EarlyDiagCtxt, Session, filesearch};
use rustc_span::edition::Edition;
use rustc_span::source_map::SourceMapInputs;
use rustc_span::{SessionGlobals, Symbol, sym};
use rustc_span::{Symbol, sym};
use rustc_target::spec::Target;
use tracing::info;

Expand Down Expand Up @@ -218,56 +218,36 @@ pub(crate) fn run_in_thread_pool_with_globals<
.release_thread_handler(move || proxy__.release_thread())
.num_threads(threads)
.deadlock_handler(move || {
// On deadlock, creates a new thread and forwards information in thread
// locals to it. The new thread runs the deadlock handler.

let current_gcx2 = current_gcx2.clone();
let registry = rustc_thread_pool::Registry::current();
let session_globals = rustc_span::with_session_globals(|session_globals| {
session_globals as *const SessionGlobals as usize
// On deadlock, one of thread pool's workers runs the deadlock handler.

// FIXME: consider unwinding now that separate thread is no more
let on_panic = defer(|| {
// Split this long string so that it doesn't cause rustfmt to
// give up on the entire builder expression.
// <https://github.com/rust-lang/rustfmt/issues/3863>
const MESSAGE: &str = "\
internal compiler error: query cycle handler panicked, aborting process";
eprintln!("{MESSAGE}");
// We need to abort here as we failed to resolve the deadlock,
// otherwise the compiler could just hang,
process::abort();
});
thread::Builder::new()
.name("rustc query cycle handler".to_string())
.spawn(move || {
let on_panic = defer(|| {
// Split this long string so that it doesn't cause rustfmt to
// give up on the entire builder expression.
// <https://github.com/rust-lang/rustfmt/issues/3863>
const MESSAGE: &str = "\
internal compiler error: query cycle handler thread panicked, aborting process";
eprintln!("{MESSAGE}");
// We need to abort here as we failed to resolve the deadlock,
// otherwise the compiler could just hang,
process::abort();
});

// Get a `GlobalCtxt` reference from `CurrentGcx` as we cannot rely on having a
// `TyCtxt` TLS reference here.
current_gcx2.access(|gcx| {
tls::enter_context(&tls::ImplicitCtxt::new(gcx), || {
tls::with(|tcx| {
// Accessing session globals is sound as they outlive `GlobalCtxt`.
// They are needed to hash query keys containing spans or symbols.
let job_map = rustc_span::set_session_globals_then(
unsafe { &*(session_globals as *const SessionGlobals) },
|| {
// Ensure there were no errors collecting all active jobs.
// We need the complete map to ensure we find a cycle to
// break.
collect_active_query_jobs(
tcx,
CollectActiveJobsKind::FullNoContention,
)
},
);
break_query_cycle(job_map, &registry);
})
})
let parent = tls::with_context_opt(|icx| icx.and_then(|icx| icx.query));
current_gcx2.access(|gcx| {
let icx = tls::ImplicitCtxt::new(gcx);
tls::enter_context(&icx, || {
// Ensure there were no errors collecting all active jobs.
// We need the complete map to ensure we find a cycle to break.
let job_map =
collect_active_query_jobs(icx.tcx, CollectActiveJobsKind::FullNoContention);
rustc_thread_pool::Registry::with_current(|registry| {
break_query_cycle(job_map, registry, parent)
});

on_panic.disable();
})
.unwrap();
});

on_panic.disable();
})
.stack_size(thread_stack_size);

Expand Down
98 changes: 57 additions & 41 deletions compiler/rustc_middle/src/query/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::hash::Hash;
use std::num::NonZero;
use std::sync::Arc;

use parking_lot::{Condvar, Mutex};
use parking_lot::{Condvar, Mutex, MutexGuard};
use rustc_span::Span;

use crate::query::Cycle;
Expand Down Expand Up @@ -52,87 +52,103 @@ impl<'tcx> QueryJob<'tcx> {
}

#[derive(Debug)]
pub struct QueryWaiter<'tcx> {
pub struct QueryWaiter {
pub parent: Option<QueryJobId>,
pub condvar: Condvar,
pub condvar: Arc<Condvar>,
pub span: Span,
pub cycle: Mutex<Option<Cycle<'tcx>>>,
}

#[derive(Clone, Debug)]
pub struct QueryLatch<'tcx> {
/// The `Option` is `Some(..)` when the job is active, and `None` once completed.
pub waiters: Arc<Mutex<Option<Vec<Arc<QueryWaiter<'tcx>>>>>>,
pub inner: Arc<Mutex<Option<QueryLatchState<'tcx>>>>,
}

#[derive(Debug)]
pub struct QueryLatchState<'tcx> {
pub waiters: Vec<QueryWaiter>,
pub cycle: Option<(usize, Cycle<'tcx>)>,
}

impl<'tcx> QueryLatch<'tcx> {
fn new() -> Self {
QueryLatch { waiters: Arc::new(Mutex::new(Some(Vec::new()))) }
QueryLatch {
inner: Arc::new(Mutex::new(Some(QueryLatchState { waiters: Vec::new(), cycle: None }))),
}
}

/// Awaits for the query job to complete.
pub fn wait_on(
&self,
tcx: TyCtxt<'tcx>,
query: Option<QueryJobId>,
parent: Option<QueryJobId>,
span: Span,
) -> Result<(), Cycle<'tcx>> {
let mut waiters_guard = self.waiters.lock();
let Some(waiters) = &mut *waiters_guard else {
let mut state_lock = self.inner.lock();
let Some(state) = &mut *state_lock else {
return Ok(()); // already complete
};

let waiter = Arc::new(QueryWaiter {
parent: query,
span,
cycle: Mutex::new(None),
condvar: Condvar::new(),
});
let condvar = Arc::new(Condvar::new());
let waiter = QueryWaiter { parent, span, condvar: Arc::clone(&condvar) };

state.waiters.reserve(state.waiters.len().saturating_sub(tcx.sess.threads()));
// We push the waiter on to the `waiters` list. It can be accessed inside
// the `wait` call below, by 1) the `set` method or 2) by deadlock detection.
// Both of these will remove it from the `waiters` list before resuming
// this thread.
waiters.push(Arc::clone(&waiter));
state.waiters.push(waiter);

// Awaits the caller on this latch by blocking the current thread.
// If this detects a deadlock and the deadlock handler wants to resume this thread
// we have to be in the `wait` call. This is ensured by the deadlock handler
// getting the self.info lock.
rustc_thread_pool::mark_blocked();
tcx.jobserver_proxy.release_thread();
waiter.condvar.wait(&mut waiters_guard);
let cycle = rustc_thread_pool::Registry::with_current(|registry| {
let handler = rustc_thread_pool::mark_blocked(registry);
tcx.jobserver_proxy.release_thread();
if let Some(handler) = handler {
MutexGuard::unlocked(&mut state_lock, handler);
}

let cv_addr = (&*condvar as *const Condvar).addr();
if handler.is_some()
&& let Some((_, cycle)) = state_lock
.as_mut()?
.cycle
.take_if(|(resumed_cv_addr, _)| *resumed_cv_addr == cv_addr)
{
return Some(cycle);
}

condvar.wait(&mut state_lock);
let (resumed_cv_addr, cycle) = state_lock
.as_mut()?
.cycle
.take()
.expect("resumed waiter for unfinished query without a cycle");
assert_eq!(resumed_cv_addr, cv_addr);
Some(cycle)
});

// Release the lock before we potentially block in `acquire_thread`
drop(waiters_guard);
drop(state_lock);
tcx.jobserver_proxy.acquire_thread();

// FIXME: Get rid of this lock. We have ownership of the QueryWaiter
// although another thread may still have a Arc reference so we cannot
// use Arc::get_mut
let mut cycle = waiter.cycle.lock();
match cycle.take() {
match cycle {
None => Ok(()),
Some(cycle) => Err(cycle),
}
}

/// Sets the latch and resumes all waiters on it
fn set(&self) {
let mut waiters_guard = self.waiters.lock();
let waiters = waiters_guard.take().unwrap(); // mark the latch as complete
let registry = rustc_thread_pool::Registry::current();
for waiter in waiters {
rustc_thread_pool::mark_unblocked(&registry);
waiter.condvar.notify_one();
}
}

/// Removes a single waiter from the list of waiters.
/// This is used to break query cycles.
pub fn extract_waiter(&self, waiter: usize) -> Arc<QueryWaiter<'tcx>> {
let mut waiters_guard = self.waiters.lock();
let waiters = waiters_guard.as_mut().expect("non-empty waiters vec");
// Remove the waiter from the list of waiters
waiters.remove(waiter)
let mut state_lock = self.inner.lock();
let waiters = state_lock.take().unwrap().waiters; // mark the latch as complete
rustc_thread_pool::Registry::with_current(|registry| {
for waiter in waiters {
rustc_thread_pool::mark_unblocked(&registry);
waiter.condvar.notify_one();
}
});
}
}
28 changes: 20 additions & 8 deletions compiler/rustc_query_impl/src/job.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::io::Write;
use std::ops::ControlFlow;
use std::sync::Arc;
use std::{iter, mem};

use rustc_data_structures::fx::{FxHashMap, FxHashSet};
Expand Down Expand Up @@ -140,7 +139,9 @@ fn abstracted_waiters_of(job_map: &QueryJobMap<'_>, query: QueryJobId) -> Vec<Ab

// Add the explicit waiters which use condvars and are resumable
if let Some(latch) = job_map.latch_of(query) {
for (i, waiter) in latch.waiters.lock().as_ref().unwrap().iter().enumerate() {
for (i, waiter) in
latch.inner.try_lock().unwrap().as_ref().unwrap().waiters.iter().enumerate()
{
result.push(AbstractedWaiter {
span: waiter.span,
parent: waiter.parent,
Expand Down Expand Up @@ -308,7 +309,7 @@ fn process_cycle<'tcx>(job_map: &QueryJobMap<'tcx>, stack: Vec<(Span, QueryJobId
fn find_and_process_cycle<'tcx>(
job_map: &QueryJobMap<'tcx>,
query: QueryJobId,
) -> Option<Arc<QueryWaiter<'tcx>>> {
) -> Option<QueryWaiter> {
let mut visited = FxHashSet::default();
let mut stack = Vec::new();
if let ControlFlow::Break(resumable) =
Expand All @@ -321,11 +322,16 @@ fn find_and_process_cycle<'tcx>(
// edge which is resumable / waited using a query latch
let (waitee_query, waiter_idx) = resumable.unwrap();

// Extract the waiter we want to resume
let waiter = job_map.latch_of(waitee_query).unwrap().extract_waiter(waiter_idx);
let latch = job_map.latch_of(waitee_query).unwrap();
let mut latch_state_lock = latch.inner.try_lock().unwrap();
let latch_state = latch_state_lock.as_mut().expect("non-empty waiters vec");

// Remove the waiter from the list of waiters we want to resume
let waiter = latch_state.waiters.remove(waiter_idx);

// Set the cycle error so it will be picked up when resumed
*waiter.cycle.lock() = Some(error);
let old = latch_state.cycle.replace((<*const _>::addr(&*waiter.condvar), error));
assert!(old.is_none(), "expected query cycle to break on a single waiter");

// Put the waiter on the list of things to resume
Some(waiter)
Expand All @@ -341,7 +347,11 @@ fn find_and_process_cycle<'tcx>(
/// There may be multiple cycles involved in a deadlock, but this only breaks one at a time so
/// there will be multiple rounds through the deadlock handler if multiple cycles are present.
#[allow(rustc::potential_query_instability)]
pub fn break_query_cycle<'tcx>(job_map: QueryJobMap<'tcx>, registry: &rustc_thread_pool::Registry) {
pub fn break_query_cycle<'tcx>(
job_map: QueryJobMap<'tcx>,
registry: &rustc_thread_pool::Registry,
parent: Option<QueryJobId>,
) {
// Look for a cycle starting at each query job
let waiter = job_map
.map
Expand All @@ -352,7 +362,9 @@ pub fn break_query_cycle<'tcx>(job_map: QueryJobMap<'tcx>, registry: &rustc_thre
// Mark the thread we're about to wake up as unblocked.
rustc_thread_pool::mark_unblocked(registry);

assert!(waiter.condvar.notify_one(), "unable to wake the waiter");
if !waiter.condvar.notify_one() {
assert_eq!(parent, waiter.parent, "unable to wake the waiter");
}
}

pub fn print_query_stack<'tcx>(
Expand Down
25 changes: 14 additions & 11 deletions compiler/rustc_thread_pool/src/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,18 +319,25 @@ impl Registry {
Ok(registry)
}

pub fn current() -> Arc<Registry> {
pub fn with_current<F, R>(f: F) -> R
where
F: FnOnce(&Arc<Registry>) -> R,
{
unsafe {
let worker_thread = WorkerThread::current();
let registry = if worker_thread.is_null() {
global_registry()
} else {
&(*worker_thread).registry
};
Arc::clone(registry)
f(registry)
}
}

pub fn current() -> Arc<Registry> {
Registry::with_current(Arc::clone)
}

/// Returns the number of threads in the current registry. This
/// is better than `Registry::current().num_threads()` because it
/// avoids incrementing the `Arc`.
Expand Down Expand Up @@ -614,16 +621,12 @@ impl Registry {
}
}

/// Mark a Rayon worker thread as blocked. This triggers the deadlock handler
/// if no other worker thread is active
/// Mark a Rayon worker thread as blocked.
///
/// Returns the deadlock handler if no other worker thread is active.
#[inline]
pub fn mark_blocked() {
let worker_thread = WorkerThread::current();
assert!(!worker_thread.is_null());
unsafe {
let registry = &(*worker_thread).registry;
registry.sleep.mark_blocked(&registry.deadlock_handler)
}
pub fn mark_blocked(registry: &Registry) -> Option<&DeadlockHandler> {
registry.sleep.mark_blocked().then(|| registry.deadlock_handler.as_deref().unwrap())
}

/// Mark a previously blocked Rayon worker thread as unblocked
Expand Down
Loading
Loading