From cc288b47e642e990eb2b69619d4c2660d4b57cb3 Mon Sep 17 00:00:00 2001 From: andriyDev Date: Sat, 18 Apr 2026 17:04:27 -0700 Subject: [PATCH 1/2] Add a "kicker" that bevy_task will invoke whenever a future wakes. --- crates/bevy_tasks/src/futures.rs | 67 ++++++++++++++- .../src/single_threaded_task_pool.rs | 82 +++++++++++++++---- crates/bevy_tasks/src/task_pool.rs | 73 ++++++++++++++--- 3 files changed, 195 insertions(+), 27 deletions(-) diff --git a/crates/bevy_tasks/src/futures.rs b/crates/bevy_tasks/src/futures.rs index 3f0c72c890ef5..6a8857e229fd3 100644 --- a/crates/bevy_tasks/src/futures.rs +++ b/crates/bevy_tasks/src/futures.rs @@ -1,7 +1,9 @@ //! Utilities for working with [`Future`]s. +use alloc::task::Wake; +use bevy_platform::sync::Arc; use core::{ future::Future, - pin::pin, + pin::{pin, Pin}, task::{Context, Poll, Waker}, }; @@ -22,3 +24,66 @@ pub fn now_or_never(future: F) -> Option { pub fn check_ready(future: &mut F) -> Option { now_or_never(future) } + +/// Wraps a future such that the Waker given to the future also runs the "kicker". +/// +/// This allows us to trigger an action (the "kicker") in addition to just waking the future. The +/// kicker is also triggered when the future resolves (i.e., returns [`Poll::Ready`]). +pub(crate) struct KickOnWake { + /// The "kicker" that will be invoked when the future wakes up or resolves. + pub(crate) kicker: Option>, + /// The inner future. + pub(crate) f: F, +} + +impl Future for KickOnWake { + type Output = F::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let Some(kicker) = self.kicker.clone() else { + #[expect( + unsafe_code, + reason = "We need to manually pin so we can support wrapping any future." + )] + // SAFETY: We don't move out of `this` inside the closure, and we don't move out of `f` + // in any case - we assume that pinning `self` also means pinning `self.f`. + return unsafe { self.map_unchecked_mut(|this| &mut this.f) }.poll(cx); + }; + let wrapped_waker = Waker::from(Arc::new(KickThenWake { + kicker, + waker: cx.waker().clone(), + })); + let mut cx = Context::from_waker(&wrapped_waker); + #[expect( + unsafe_code, + reason = "We need to manually pin so we can support wrapping any future." + )] + // SAFETY: We don't move out of `this` inside the closure, and we don't move out of `f` + // in any case - we assume that pinning `self` also means pinning `self.f`. + let result = unsafe { self.map_unchecked_mut(|this| &mut this.f) }.poll(&mut cx); + // Also kick if the future resolves. + if result.is_ready() { + wrapped_waker.wake_by_ref(); + } + result + } +} + +/// A waker that wraps another waker, but first executing the "kicker". +struct KickThenWake { + /// The "kicker" that will be invoked when the future wakes up or resolves. + kicker: Arc, + /// The actual waker to invoke after the kicker. + waker: Waker, +} + +impl Wake for KickThenWake { + fn wake(self: Arc) { + self.wake_by_ref(); + } + + fn wake_by_ref(self: &Arc) { + (*self.kicker)(); + self.waker.wake_by_ref(); + } +} diff --git a/crates/bevy_tasks/src/single_threaded_task_pool.rs b/crates/bevy_tasks/src/single_threaded_task_pool.rs index 0321f5a4596fd..7dd675c032056 100644 --- a/crates/bevy_tasks/src/single_threaded_task_pool.rs +++ b/crates/bevy_tasks/src/single_threaded_task_pool.rs @@ -1,9 +1,14 @@ use alloc::{string::String, vec::Vec}; -use bevy_platform::sync::Arc; -use core::{cell::{RefCell, Cell}, future::Future, marker::PhantomData, mem}; +use bevy_platform::sync::{Arc, PoisonError, RwLock}; +use core::{ + cell::{Cell, RefCell}, + future::Future, + marker::PhantomData, + mem, +}; -use crate::executor::LocalExecutor; use crate::{block_on, Task}; +use crate::{executor::LocalExecutor, futures::KickOnWake}; crate::cfg::std! { if { @@ -80,8 +85,16 @@ impl TaskPoolBuilder { /// A thread pool for executing tasks. Tasks are futures that are being automatically driven by /// the pool on threads owned by the pool. In this case - main thread only. -#[derive(Debug, Default, Clone)] -pub struct TaskPool {} +#[derive(Default)] +pub struct TaskPool { + kicker: RwLock>>, +} + +impl core::fmt::Debug for TaskPool { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + f.debug_struct("TaskPool").finish() + } +} impl TaskPool { /// Just create a new `ThreadExecutor` for wasm @@ -95,7 +108,18 @@ impl TaskPool { } fn new_internal() -> Self { - Self {} + Self { + kicker: Default::default(), + } + } + + /// Sets the "kicker" that futures will invoke when waking. + /// + /// This allows event loops to be notified whenever a future resolves. Note changing this at + /// runtime can have **unpredictable results**. Users should set this before spawning any + /// futures to ensure the kicker is invoked. + pub fn set_kicker(&self, kicker: Arc) { + *self.kicker.write().unwrap_or_else(PoisonError::into_inner) = Some(kicker); } /// Return the number of threads owned by the task pool @@ -156,6 +180,11 @@ impl TaskPool { executor_ref, pending_tasks, results_ref, + kicker: self + .kicker + .read() + .unwrap_or_else(PoisonError::into_inner) + .clone(), scope: PhantomData, env: PhantomData, }; @@ -192,20 +221,25 @@ impl TaskPool { where T: 'static + MaybeSend + MaybeSync, { + let kicker = self + .kicker + .read() + .unwrap_or_else(PoisonError::into_inner) + .clone(); crate::cfg::switch! {{ crate::cfg::web => { - web_task::spawn_local(future) + web_task::spawn_local(KickOnWake { kicker, f: future }) } crate::cfg::std => { LOCAL_EXECUTOR.with(|executor| { - let task = executor.spawn(future); + let task = executor.spawn(KickOnWake { kicker, f: future }); // Loop until all tasks are done while executor.try_tick() {} task }) } _ => { - let task = LOCAL_EXECUTOR.spawn(future); + let task = LOCAL_EXECUTOR.spawn(KickOnWake { kicker, f: future }); // Loop until all tasks are done while LOCAL_EXECUTOR.try_tick() {} task @@ -253,19 +287,32 @@ impl TaskPool { /// A `TaskPool` scope for running one or more non-`'static` futures. /// /// For more information, see [`TaskPool::scope`]. -#[derive(Debug)] pub struct Scope<'scope, 'env: 'scope, T> { executor_ref: &'scope LocalExecutor<'scope>, // The number of pending tasks spawned on the scope pending_tasks: &'scope Cell, // Vector to gather results of all futures spawned during scope run results_ref: &'env RefCell>>, + /// The kicker to wake whenever a future wakes. + kicker: Option>, // make `Scope` invariant over 'scope and 'env scope: PhantomData<&'scope mut &'scope ()>, env: PhantomData<&'env mut &'env ()>, } +impl<'scope, 'env: 'scope, T: core::fmt::Debug> core::fmt::Debug for Scope<'scope, 'env, T> { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + f.debug_struct("Scope") + .field("executor_ref", &self.executor_ref) + .field("pending_tasks", &self.pending_tasks) + .field("results_ref", &self.results_ref) + .field("scope", &self.scope) + .field("env", &self.env) + .finish() + } +} + impl<'scope, 'env, T: Send + 'env> Scope<'scope, 'env, T> { /// Spawns a scoped future onto the executor. The scope *must* outlive /// the provided future. The results of the future will be returned as a part of @@ -320,7 +367,12 @@ impl<'scope, 'env, T: Send + 'env> Scope<'scope, 'env, T> { }; // spawn the job itself - self.executor_ref.spawn(f).detach(); + self.executor_ref + .spawn(KickOnWake { + kicker: self.kicker.clone(), + f, + }) + .detach(); } } @@ -342,7 +394,7 @@ crate::cfg::std! { #[cfg(test)] mod test { - use std::{time, thread}; + use std::{thread, time}; use super::*; @@ -355,16 +407,14 @@ mod test { #[test] fn scoped_spawn() { let (sender, receiver) = async_channel::unbounded(); - let task_pool = TaskPool {}; + let task_pool = TaskPool::new(); let thread = thread::spawn(move || { let duration = time::Duration::from_millis(50); thread::sleep(duration); let _ = sender.send(0); }); task_pool.scope(|scope| { - scope.spawn(async { - receiver.recv().await - }); + scope.spawn(async { receiver.recv().await }); }); } } diff --git a/crates/bevy_tasks/src/task_pool.rs b/crates/bevy_tasks/src/task_pool.rs index 58c99b95bf0b7..d45d701240d64 100644 --- a/crates/bevy_tasks/src/task_pool.rs +++ b/crates/bevy_tasks/src/task_pool.rs @@ -5,8 +5,8 @@ use std::{ thread_local, }; -use crate::executor::FallibleTask; -use bevy_platform::sync::Arc; +use crate::{executor::FallibleTask, futures::KickOnWake}; +use bevy_platform::sync::{Arc, PoisonError, RwLock}; use concurrent_queue::ConcurrentQueue; use futures_lite::FutureExt; @@ -131,16 +131,27 @@ impl TaskPoolBuilder { /// /// If the result is not required, one may also use [`Task::detach`] and the pool /// will still execute a task, even if it is dropped. -#[derive(Debug)] pub struct TaskPool { /// The executor for the pool. executor: Arc>, + /// Kicker that is triggered whenever a future is awoken. + kicker: RwLock>>, // The inner state of the pool. threads: Vec>, shutdown_tx: async_channel::Sender<()>, } +impl core::fmt::Debug for TaskPool { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + f.debug_struct("TaskPool") + .field("executor", &self.executor) + .field("threads", &self.threads) + .field("shutdown_tx", &self.shutdown_tx) + .finish() + } +} + impl TaskPool { thread_local! { static LOCAL_EXECUTOR: crate::executor::LocalExecutor<'static> = const { crate::executor::LocalExecutor::new() }; @@ -216,11 +227,21 @@ impl TaskPool { Self { executor, + kicker: RwLock::default(), threads, shutdown_tx, } } + /// Sets the "kicker" that futures will invoke when waking. + /// + /// This allows event loops to be notified whenever a future resolves. Note changing this at + /// runtime can have **unpredictable results**. Users should set this before spawning any + /// futures to ensure the kicker is invoked. + pub fn set_kicker(&self, kicker: Arc) { + *self.kicker.write().unwrap_or_else(PoisonError::into_inner) = Some(kicker); + } + /// Return the number of threads owned by the task pool pub fn thread_num(&self) -> usize { self.threads.len() @@ -397,6 +418,11 @@ impl TaskPool { external_executor, scope_executor, spawned, + kicker: self + .kicker + .read() + .unwrap_or_else(PoisonError::into_inner) + .clone(), scope: PhantomData, env: PhantomData, }; @@ -560,7 +586,12 @@ impl TaskPool { where T: Send + 'static, { - self.executor.spawn(future) + let kicker = self + .kicker + .read() + .unwrap_or_else(PoisonError::into_inner) + .clone(); + self.executor.spawn(KickOnWake { kicker, f: future }) } /// Spawns a static future on the thread-local async executor for the @@ -578,7 +609,12 @@ impl TaskPool { where T: 'static, { - TaskPool::LOCAL_EXECUTOR.with(|executor| executor.spawn(future)) + let kicker = self + .kicker + .read() + .unwrap_or_else(PoisonError::into_inner) + .clone(); + TaskPool::LOCAL_EXECUTOR.with(|executor| executor.spawn(KickOnWake { kicker, f: future })) } /// Runs a function with the local executor. Typically used to tick @@ -623,17 +659,31 @@ impl Drop for TaskPool { /// A [`TaskPool`] scope for running one or more non-`'static` futures. /// /// For more information, see [`TaskPool::scope`]. -#[derive(Debug)] pub struct Scope<'scope, 'env: 'scope, T> { executor: &'scope crate::executor::Executor<'scope>, external_executor: &'scope ThreadExecutor<'scope>, scope_executor: &'scope ThreadExecutor<'scope>, spawned: &'scope ConcurrentQueue>>>, + /// The kicker to wake whenever a future wakes. + kicker: Option>, // make `Scope` invariant over 'scope and 'env scope: PhantomData<&'scope mut &'scope ()>, env: PhantomData<&'env mut &'env ()>, } +impl<'scope, 'env: 'scope, T: core::fmt::Debug> core::fmt::Debug for Scope<'scope, 'env, T> { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + f.debug_struct("Scope") + .field("executor", &self.executor) + .field("external_executor", &self.external_executor) + .field("scope_executor", &self.scope_executor) + .field("spawned", &self.spawned) + .field("scope", &self.scope) + .field("env", &self.env) + .finish() + } +} + impl<'scope, 'env, T: Send + 'scope> Scope<'scope, 'env, T> { /// Spawns a scoped future onto the thread pool. The scope *must* outlive /// the provided future. The results of the future will be returned as a part of @@ -644,9 +694,10 @@ impl<'scope, 'env, T: Send + 'scope> Scope<'scope, 'env, T> { /// /// For more information, see [`TaskPool::scope`]. pub fn spawn + 'scope + Send>(&self, f: Fut) { + let kicker = self.kicker.clone(); let task = self .executor - .spawn(AssertUnwindSafe(f).catch_unwind()) + .spawn(AssertUnwindSafe(KickOnWake { kicker, f }).catch_unwind()) .fallible(); // ConcurrentQueue only errors when closed or full, but we never // close and use an unbounded queue, so it is safe to unwrap @@ -660,9 +711,10 @@ impl<'scope, 'env, T: Send + 'scope> Scope<'scope, 'env, T> { /// /// For more information, see [`TaskPool::scope`]. pub fn spawn_on_scope + 'scope + Send>(&self, f: Fut) { + let kicker = self.kicker.clone(); let task = self .scope_executor - .spawn(AssertUnwindSafe(f).catch_unwind()) + .spawn(AssertUnwindSafe(KickOnWake { kicker, f }).catch_unwind()) .fallible(); // ConcurrentQueue only errors when closed or full, but we never // close and use an unbounded queue, so it is safe to unwrap @@ -677,9 +729,10 @@ impl<'scope, 'env, T: Send + 'scope> Scope<'scope, 'env, T> { /// /// For more information, see [`TaskPool::scope`]. pub fn spawn_on_external + 'scope + Send>(&self, f: Fut) { + let kicker = self.kicker.clone(); let task = self - .external_executor - .spawn(AssertUnwindSafe(f).catch_unwind()) + .scope_executor + .spawn(AssertUnwindSafe(KickOnWake { kicker, f }).catch_unwind()) .fallible(); // ConcurrentQueue only errors when closed or full, but we never // close and use an unbounded queue, so it is safe to unwrap From 09e2bc16fdd51c72b01befff40729ad4962abf4a Mon Sep 17 00:00:00 2001 From: andriyDev Date: Sat, 18 Apr 2026 19:18:31 -0700 Subject: [PATCH 2/2] Make `WinitPlugin` register a kicker that sends the WakeUp winit event. --- crates/bevy_winit/src/lib.rs | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/crates/bevy_winit/src/lib.rs b/crates/bevy_winit/src/lib.rs index faa10523355a0..bcbc71b95ac10 100644 --- a/crates/bevy_winit/src/lib.rs +++ b/crates/bevy_winit/src/lib.rs @@ -14,6 +14,8 @@ extern crate alloc; +use alloc::sync::Arc; + use bevy_derive::Deref; use bevy_reflect::Reflect; use bevy_window::{ExitSystems, RawHandleWrapperHolder, WindowEvent}; @@ -127,10 +129,21 @@ impl Plugin for WinitPlugin { .build() .expect("Failed to build event loop"); + let event_loop_proxy = event_loop.create_proxy(); + { + let event_loop_proxy = event_loop_proxy.clone(); + let kicker = Arc::new(move || { + // Ignore any errors - this is a best-effort wakeup. + let _ = event_loop_proxy.send_event(WinitUserEvent::WakeUp); + }); + bevy_tasks::IoTaskPool::get().set_kicker(kicker.clone()); + bevy_tasks::AsyncComputeTaskPool::get().set_kicker(kicker); + } + app.init_resource::() .init_resource::() .insert_resource(DisplayHandleWrapper(event_loop.owned_display_handle())) - .insert_resource(EventLoopProxyWrapper(event_loop.create_proxy())) + .insert_resource(EventLoopProxyWrapper(event_loop_proxy)) .add_message::() .set_runner(|app| winit_runner(app, event_loop)) .add_systems(