diff --git a/compiler/rustc_interface/src/util.rs b/compiler/rustc_interface/src/util.rs index 24b23cc4199e9..c6fbf009edc86 100644 --- a/compiler/rustc_interface/src/util.rs +++ b/compiler/rustc_interface/src/util.rs @@ -25,7 +25,7 @@ use rustc_session::config::{ use rustc_session::{EarlyDiagCtxt, Session, filesearch}; use rustc_span::edition::Edition; use rustc_span::source_map::SourceMapInputs; -use rustc_span::{SessionGlobals, Symbol, sym}; +use rustc_span::{Symbol, sym}; use rustc_target::spec::Target; use tracing::info; @@ -218,56 +218,36 @@ pub(crate) fn run_in_thread_pool_with_globals< .release_thread_handler(move || proxy__.release_thread()) .num_threads(threads) .deadlock_handler(move || { - // On deadlock, creates a new thread and forwards information in thread - // locals to it. The new thread runs the deadlock handler. - - let current_gcx2 = current_gcx2.clone(); - let registry = rustc_thread_pool::Registry::current(); - let session_globals = rustc_span::with_session_globals(|session_globals| { - session_globals as *const SessionGlobals as usize + // On deadlock, one of thread pool's workers runs the deadlock handler. + + // FIXME: consider unwinding now that separate thread is no more + let on_panic = defer(|| { + // Split this long string so that it doesn't cause rustfmt to + // give up on the entire builder expression. + // + const MESSAGE: &str = "\ +internal compiler error: query cycle handler panicked, aborting process"; + eprintln!("{MESSAGE}"); + // We need to abort here as we failed to resolve the deadlock, + // otherwise the compiler could just hang, + process::abort(); }); - thread::Builder::new() - .name("rustc query cycle handler".to_string()) - .spawn(move || { - let on_panic = defer(|| { - // Split this long string so that it doesn't cause rustfmt to - // give up on the entire builder expression. - // - const MESSAGE: &str = "\ -internal compiler error: query cycle handler thread panicked, aborting process"; - eprintln!("{MESSAGE}"); - // We need to abort here as we failed to resolve the deadlock, - // otherwise the compiler could just hang, - process::abort(); - }); - // Get a `GlobalCtxt` reference from `CurrentGcx` as we cannot rely on having a - // `TyCtxt` TLS reference here. - current_gcx2.access(|gcx| { - tls::enter_context(&tls::ImplicitCtxt::new(gcx), || { - tls::with(|tcx| { - // Accessing session globals is sound as they outlive `GlobalCtxt`. - // They are needed to hash query keys containing spans or symbols. - let job_map = rustc_span::set_session_globals_then( - unsafe { &*(session_globals as *const SessionGlobals) }, - || { - // Ensure there were no errors collecting all active jobs. - // We need the complete map to ensure we find a cycle to - // break. - collect_active_query_jobs( - tcx, - CollectActiveJobsKind::FullNoContention, - ) - }, - ); - break_query_cycle(job_map, ®istry); - }) - }) + let parent = tls::with_context_opt(|icx| icx.and_then(|icx| icx.query)); + current_gcx2.access(|gcx| { + let icx = tls::ImplicitCtxt::new(gcx); + tls::enter_context(&icx, || { + // Ensure there were no errors collecting all active jobs. + // We need the complete map to ensure we find a cycle to break. + let job_map = + collect_active_query_jobs(icx.tcx, CollectActiveJobsKind::FullNoContention); + rustc_thread_pool::Registry::with_current(|registry| { + break_query_cycle(job_map, registry, parent) }); - - on_panic.disable(); }) - .unwrap(); + }); + + on_panic.disable(); }) .stack_size(thread_stack_size); diff --git a/compiler/rustc_middle/src/query/job.rs b/compiler/rustc_middle/src/query/job.rs index 8c78bf24287e0..9be3817a41bf9 100644 --- a/compiler/rustc_middle/src/query/job.rs +++ b/compiler/rustc_middle/src/query/job.rs @@ -3,7 +3,7 @@ use std::hash::Hash; use std::num::NonZero; use std::sync::Arc; -use parking_lot::{Condvar, Mutex}; +use parking_lot::{Condvar, Mutex, MutexGuard}; use rustc_span::Span; use crate::query::Cycle; @@ -52,65 +52,89 @@ impl<'tcx> QueryJob<'tcx> { } #[derive(Debug)] -pub struct QueryWaiter<'tcx> { +pub struct QueryWaiter { pub parent: Option, - pub condvar: Condvar, + pub condvar: Arc, pub span: Span, - pub cycle: Mutex>>, } #[derive(Clone, Debug)] pub struct QueryLatch<'tcx> { /// The `Option` is `Some(..)` when the job is active, and `None` once completed. - pub waiters: Arc>>>>>, + pub inner: Arc>>>, +} + +#[derive(Debug)] +pub struct QueryLatchState<'tcx> { + pub waiters: Vec, + pub cycle: Option<(usize, Cycle<'tcx>)>, } impl<'tcx> QueryLatch<'tcx> { fn new() -> Self { - QueryLatch { waiters: Arc::new(Mutex::new(Some(Vec::new()))) } + QueryLatch { + inner: Arc::new(Mutex::new(Some(QueryLatchState { waiters: Vec::new(), cycle: None }))), + } } /// Awaits for the query job to complete. pub fn wait_on( &self, tcx: TyCtxt<'tcx>, - query: Option, + parent: Option, span: Span, ) -> Result<(), Cycle<'tcx>> { - let mut waiters_guard = self.waiters.lock(); - let Some(waiters) = &mut *waiters_guard else { + let mut state_lock = self.inner.lock(); + let Some(state) = &mut *state_lock else { return Ok(()); // already complete }; - let waiter = Arc::new(QueryWaiter { - parent: query, - span, - cycle: Mutex::new(None), - condvar: Condvar::new(), - }); + let condvar = Arc::new(Condvar::new()); + let waiter = QueryWaiter { parent, span, condvar: Arc::clone(&condvar) }; + state.waiters.reserve(state.waiters.len().saturating_sub(tcx.sess.threads())); // We push the waiter on to the `waiters` list. It can be accessed inside // the `wait` call below, by 1) the `set` method or 2) by deadlock detection. // Both of these will remove it from the `waiters` list before resuming // this thread. - waiters.push(Arc::clone(&waiter)); + state.waiters.push(waiter); // Awaits the caller on this latch by blocking the current thread. // If this detects a deadlock and the deadlock handler wants to resume this thread // we have to be in the `wait` call. This is ensured by the deadlock handler // getting the self.info lock. - rustc_thread_pool::mark_blocked(); - tcx.jobserver_proxy.release_thread(); - waiter.condvar.wait(&mut waiters_guard); + let cycle = rustc_thread_pool::Registry::with_current(|registry| { + let handler = rustc_thread_pool::mark_blocked(registry); + tcx.jobserver_proxy.release_thread(); + if let Some(handler) = handler { + MutexGuard::unlocked(&mut state_lock, handler); + } + + let cv_addr = (&*condvar as *const Condvar).addr(); + if handler.is_some() + && let Some((_, cycle)) = state_lock + .as_mut()? + .cycle + .take_if(|(resumed_cv_addr, _)| *resumed_cv_addr == cv_addr) + { + return Some(cycle); + } + + condvar.wait(&mut state_lock); + let (resumed_cv_addr, cycle) = state_lock + .as_mut()? + .cycle + .take() + .expect("resumed waiter for unfinished query without a cycle"); + assert_eq!(resumed_cv_addr, cv_addr); + Some(cycle) + }); + // Release the lock before we potentially block in `acquire_thread` - drop(waiters_guard); + drop(state_lock); tcx.jobserver_proxy.acquire_thread(); - // FIXME: Get rid of this lock. We have ownership of the QueryWaiter - // although another thread may still have a Arc reference so we cannot - // use Arc::get_mut - let mut cycle = waiter.cycle.lock(); - match cycle.take() { + match cycle { None => Ok(()), Some(cycle) => Err(cycle), } @@ -118,21 +142,13 @@ impl<'tcx> QueryLatch<'tcx> { /// Sets the latch and resumes all waiters on it fn set(&self) { - let mut waiters_guard = self.waiters.lock(); - let waiters = waiters_guard.take().unwrap(); // mark the latch as complete - let registry = rustc_thread_pool::Registry::current(); - for waiter in waiters { - rustc_thread_pool::mark_unblocked(®istry); - waiter.condvar.notify_one(); - } - } - - /// Removes a single waiter from the list of waiters. - /// This is used to break query cycles. - pub fn extract_waiter(&self, waiter: usize) -> Arc> { - let mut waiters_guard = self.waiters.lock(); - let waiters = waiters_guard.as_mut().expect("non-empty waiters vec"); - // Remove the waiter from the list of waiters - waiters.remove(waiter) + let mut state_lock = self.inner.lock(); + let waiters = state_lock.take().unwrap().waiters; // mark the latch as complete + rustc_thread_pool::Registry::with_current(|registry| { + for waiter in waiters { + rustc_thread_pool::mark_unblocked(®istry); + waiter.condvar.notify_one(); + } + }); } } diff --git a/compiler/rustc_query_impl/src/job.rs b/compiler/rustc_query_impl/src/job.rs index bf0493b29fd1e..fd3bdf4b689b8 100644 --- a/compiler/rustc_query_impl/src/job.rs +++ b/compiler/rustc_query_impl/src/job.rs @@ -1,6 +1,5 @@ use std::io::Write; use std::ops::ControlFlow; -use std::sync::Arc; use std::{iter, mem}; use rustc_data_structures::fx::{FxHashMap, FxHashSet}; @@ -140,7 +139,9 @@ fn abstracted_waiters_of(job_map: &QueryJobMap<'_>, query: QueryJobId) -> Vec(job_map: &QueryJobMap<'tcx>, stack: Vec<(Span, QueryJobId fn find_and_process_cycle<'tcx>( job_map: &QueryJobMap<'tcx>, query: QueryJobId, -) -> Option>> { +) -> Option { let mut visited = FxHashSet::default(); let mut stack = Vec::new(); if let ControlFlow::Break(resumable) = @@ -321,11 +322,16 @@ fn find_and_process_cycle<'tcx>( // edge which is resumable / waited using a query latch let (waitee_query, waiter_idx) = resumable.unwrap(); - // Extract the waiter we want to resume - let waiter = job_map.latch_of(waitee_query).unwrap().extract_waiter(waiter_idx); + let latch = job_map.latch_of(waitee_query).unwrap(); + let mut latch_state_lock = latch.inner.try_lock().unwrap(); + let latch_state = latch_state_lock.as_mut().expect("non-empty waiters vec"); + + // Remove the waiter from the list of waiters we want to resume + let waiter = latch_state.waiters.remove(waiter_idx); // Set the cycle error so it will be picked up when resumed - *waiter.cycle.lock() = Some(error); + let old = latch_state.cycle.replace((<*const _>::addr(&*waiter.condvar), error)); + assert!(old.is_none(), "expected query cycle to break on a single waiter"); // Put the waiter on the list of things to resume Some(waiter) @@ -341,7 +347,11 @@ fn find_and_process_cycle<'tcx>( /// There may be multiple cycles involved in a deadlock, but this only breaks one at a time so /// there will be multiple rounds through the deadlock handler if multiple cycles are present. #[allow(rustc::potential_query_instability)] -pub fn break_query_cycle<'tcx>(job_map: QueryJobMap<'tcx>, registry: &rustc_thread_pool::Registry) { +pub fn break_query_cycle<'tcx>( + job_map: QueryJobMap<'tcx>, + registry: &rustc_thread_pool::Registry, + parent: Option, +) { // Look for a cycle starting at each query job let waiter = job_map .map @@ -352,7 +362,9 @@ pub fn break_query_cycle<'tcx>(job_map: QueryJobMap<'tcx>, registry: &rustc_thre // Mark the thread we're about to wake up as unblocked. rustc_thread_pool::mark_unblocked(registry); - assert!(waiter.condvar.notify_one(), "unable to wake the waiter"); + if !waiter.condvar.notify_one() { + assert_eq!(parent, waiter.parent, "unable to wake the waiter"); + } } pub fn print_query_stack<'tcx>( diff --git a/compiler/rustc_thread_pool/src/registry.rs b/compiler/rustc_thread_pool/src/registry.rs index 9510c1842f86a..410fccedfb387 100644 --- a/compiler/rustc_thread_pool/src/registry.rs +++ b/compiler/rustc_thread_pool/src/registry.rs @@ -319,7 +319,10 @@ impl Registry { Ok(registry) } - pub fn current() -> Arc { + pub fn with_current(f: F) -> R + where + F: FnOnce(&Arc) -> R, + { unsafe { let worker_thread = WorkerThread::current(); let registry = if worker_thread.is_null() { @@ -327,10 +330,14 @@ impl Registry { } else { &(*worker_thread).registry }; - Arc::clone(registry) + f(registry) } } + pub fn current() -> Arc { + Registry::with_current(Arc::clone) + } + /// Returns the number of threads in the current registry. This /// is better than `Registry::current().num_threads()` because it /// avoids incrementing the `Arc`. @@ -614,16 +621,12 @@ impl Registry { } } -/// Mark a Rayon worker thread as blocked. This triggers the deadlock handler -/// if no other worker thread is active +/// Mark a Rayon worker thread as blocked. +/// +/// Returns the deadlock handler if no other worker thread is active. #[inline] -pub fn mark_blocked() { - let worker_thread = WorkerThread::current(); - assert!(!worker_thread.is_null()); - unsafe { - let registry = &(*worker_thread).registry; - registry.sleep.mark_blocked(®istry.deadlock_handler) - } +pub fn mark_blocked(registry: &Registry) -> Option<&DeadlockHandler> { + registry.sleep.mark_blocked().then(|| registry.deadlock_handler.as_deref().unwrap()) } /// Mark a previously blocked Rayon worker thread as unblocked diff --git a/compiler/rustc_thread_pool/src/sleep/mod.rs b/compiler/rustc_thread_pool/src/sleep/mod.rs index aa6666092147a..cfcbe951e9092 100644 --- a/compiler/rustc_thread_pool/src/sleep/mod.rs +++ b/compiler/rustc_thread_pool/src/sleep/mod.rs @@ -7,7 +7,6 @@ use std::thread; use crossbeam_utils::CachePadded; -use crate::DeadlockHandler; use crate::latch::CoreLatch; use crate::registry::WorkerThread; @@ -31,10 +30,8 @@ struct SleepData { impl SleepData { /// Checks if the conditions for a deadlock holds and if so calls the deadlock handler #[inline] - pub(super) fn deadlock_check(&self, deadlock_handler: &Option>) { - if self.active_threads == 0 && self.blocked_threads > 0 { - (deadlock_handler.as_ref().unwrap())(); - } + pub(super) fn deadlock_check(&self) -> bool { + self.active_threads == 0 && self.blocked_threads > 0 } } @@ -102,7 +99,7 @@ impl Sleep { /// Mark a Rayon worker thread as blocked. This triggers the deadlock handler /// if no other worker thread is active #[inline] - pub(super) fn mark_blocked(&self, deadlock_handler: &Option>) { + pub(super) fn mark_blocked(&self) -> bool { let mut data = self.data.lock().unwrap(); debug_assert!(data.active_threads > 0); debug_assert!(data.blocked_threads < data.worker_count); @@ -110,7 +107,7 @@ impl Sleep { data.active_threads -= 1; data.blocked_threads += 1; - data.deadlock_check(deadlock_handler); + data.deadlock_check() } /// Mark a previously blocked Rayon worker thread as unblocked @@ -228,23 +225,32 @@ impl Sleep { // the one that wakes us.) self.counters.sub_sleeping_thread(); } else { - { + let is_deadlock = { // Decrement the number of active threads and check for a deadlock let mut data = self.data.lock().unwrap(); data.active_threads -= 1; - data.deadlock_check(&thread.registry.deadlock_handler); + data.deadlock_check() + }; + + if is_deadlock { + thread.registry.release_thread(); + *is_blocked = true; + drop(is_blocked); + (thread.registry.deadlock_handler.as_deref().unwrap())(); + is_blocked = sleep_state.is_blocked.lock().unwrap(); + } else { + // If we don't see an injected job (the normal case), then flag + // ourselves as asleep and wait till we are notified. + // + // (Note that `is_blocked` is held under a mutex and the mutex was + // acquired *before* we incremented the "sleepy counter". This means + // that whomever is coming to wake us will have to wait until we + // release the mutex in the call to `wait`, so they will see this + // boolean as true.) + thread.registry.release_thread(); + *is_blocked = true; } - // If we don't see an injected job (the normal case), then flag - // ourselves as asleep and wait till we are notified. - // - // (Note that `is_blocked` is held under a mutex and the mutex was - // acquired *before* we incremented the "sleepy counter". This means - // that whomever is coming to wake us will have to wait until we - // release the mutex in the call to `wait`, so they will see this - // boolean as true.) - thread.registry.release_thread(); - *is_blocked = true; while *is_blocked { is_blocked = sleep_state.condvar.wait(is_blocked).unwrap(); }