diff --git a/core/00_infra.js b/core/00_infra.js index c515e2955..7907d8af6 100644 --- a/core/00_infra.js +++ b/core/00_infra.js @@ -40,7 +40,6 @@ let isLeakTracingEnabled = false; let submitLeakTrace; - let eventLoopTick; function __setLeakTracingEnabled(enabled) { isLeakTracingEnabled = enabled; @@ -50,8 +49,7 @@ return isLeakTracingEnabled; } - function __initializeCoreMethods(eventLoopTick_, submitLeakTrace_) { - eventLoopTick = eventLoopTick_; + function __initializeCoreMethods(submitLeakTrace_) { submitLeakTrace = submitLeakTrace_; } @@ -143,9 +141,9 @@ }); const wrappedPromise = PromisePrototypeCatch( promise, - (res) => { - // recreate the stacktrace and strip eventLoopTick() calls from stack trace - ErrorCaptureStackTrace(res, eventLoopTick); + function __opRejectHandler(res) { + // recreate the stacktrace and strip internal event loop frames + ErrorCaptureStackTrace(res, __opRejectHandler); throw res; }, ); diff --git a/core/01_core.js b/core/01_core.js index 3235e4f4e..506bc9f0c 100755 --- a/core/01_core.js +++ b/core/01_core.js @@ -16,7 +16,6 @@ setQueueMicrotask, SafeMap, SafeWeakMap, - Set, StringPrototypeSlice, Symbol, SymbolFor, @@ -55,7 +54,6 @@ op_get_proxy_details, op_get_ext_import_meta_proto, op_has_tick_scheduled, - op_immediate_has_ref_count, op_lazy_load_esm, op_memory_usage, op_op_names, @@ -122,7 +120,6 @@ delete window.__infra; __initializeCoreMethods( - eventLoopTick, submitLeakTrace, ); @@ -143,8 +140,6 @@ let unhandledPromiseRejectionHandler = () => false; let timerDepth = 0; - let timersRunning = false; - const cancelledTimers = new Set(); const macrotaskCallbacks = []; const nextTickCallbacks = []; @@ -162,24 +157,22 @@ ArrayPrototypePush(immediateCallbacks, cb); } - // This function has variable number of arguments. The last argument describes - // if there's a "next tick" scheduled by the Node.js compat layer. Arguments - // before last are alternating integers and any values that describe the - // responses of async ops. 
- function eventLoopTick() { - let didAnyWork = false; - - // First respond to all pending ops. - for (let i = 0; i < arguments.length - 3; i += 3) { - didAnyWork = true; + // Phase 2: Resolve completed async ops. Called from Rust with flat args: + // (promiseId, isOk, res, promiseId, isOk, res, ...) + function __resolveOps() { + for (let i = 0; i < arguments.length; i += 3) { const promiseId = arguments[i]; const isOk = arguments[i + 1]; const res = arguments[i + 2]; - __resolvePromise(promiseId, res, isOk); } + } + + // Phase 5: Drain nextTick queue and macrotask queue. + // Called from Rust. hasTickScheduled indicates if nextTick was scheduled. + function __drainNextTickAndMacrotasks(hasTickScheduled) { // Drain nextTick queue if there's a tick scheduled. - if (arguments[arguments.length - 1]) { + if (hasTickScheduled) { for (let i = 0; i < nextTickCallbacks.length; i++) { nextTickCallbacks[i](); } @@ -187,7 +180,7 @@ op_run_microtasks(); } - // Finally drain macrotask queue. + // Drain macrotask queue. for (let i = 0; i < macrotaskCallbacks.length; i++) { const cb = macrotaskCallbacks[i]; while (true) { @@ -207,61 +200,40 @@ } } } + } - const timers = arguments[arguments.length - 2]; - if (timers) { - didAnyWork = true; - timersRunning = true; - for (let i = 0; i < timers.length; i += 3) { - timerDepth = timers[i]; - const id = timers[i + 1]; - if (cancelledTimers.has(id)) { - continue; - } - try { - const f = timers[i + 2]; - f.call(window); - } catch (e) { - reportExceptionCallback(e); - } - for (let i = 0; i < nextTickCallbacks.length; i++) { - nextTickCallbacks[i](); + // Phase 2: Handle unhandled promise rejections. + // Called from Rust with a flat array: [promise, reason, context, promise, reason, context, ...] 
+ function __handleRejections() { + for (let i = 0; i < arguments.length; i += 3) { + // Restore the async context that was active when the promise was + // rejected, so that AsyncLocalStorage.getStore() works correctly + // inside unhandledrejection handlers (matching Node.js behavior). + const prevContext = getAsyncContext(); + setAsyncContext(arguments[i + 2]); + try { + const handled = unhandledPromiseRejectionHandler( + arguments[i], + arguments[i + 1], + ); + if (!handled) { + const err = arguments[i + 1]; + op_dispatch_exception(err, true); } - op_run_microtasks(); + } finally { + setAsyncContext(prevContext); } - timersRunning = false; - timerDepth = 0; - cancelledTimers.clear(); } + } - // Drain immediates queue. - if (didAnyWork && op_immediate_has_ref_count()) { - runImmediateCallbacks(); - } + // Set timer depth before each timer callback (called from Rust). + function __setTimerDepth(depth) { + timerDepth = depth; + } - // If we have any rejections for this tick, attempt to process them - const rejections = arguments[arguments.length - 3]; - if (rejections) { - for (let i = 0; i < rejections.length; i += 3) { - // Restore the async context that was active when the promise was - // rejected, so that AsyncLocalStorage.getStore() works correctly - // inside unhandledrejection handlers (matching Node.js behavior). - const prevContext = getAsyncContext(); - setAsyncContext(rejections[i + 2]); - try { - const handled = unhandledPromiseRejectionHandler( - rejections[i], - rejections[i + 1], - ); - if (!handled) { - const err = rejections[i + 1]; - op_dispatch_exception(err, true); - } - } finally { - setAsyncContext(prevContext); - } - } - } + // Report an exception (called from Rust for timer callback errors). 
+ function __reportException(e) { + reportExceptionCallback(e); } function runImmediateCallbacks() { @@ -697,7 +669,11 @@ internalRidSymbol: Symbol("Deno.internal.rid"), internalFdSymbol: Symbol("Deno.internal.fd"), resources, - eventLoopTick, + __resolveOps, + __drainNextTickAndMacrotasks, + __handleRejections, + __setTimerDepth, + __reportException, runImmediateCallbacks, BadResource, BadResourcePrototype, @@ -850,9 +826,6 @@ queueSystemTimer: (_associatedOp, repeat, timeout, task) => op_timer_queue_system(repeat, timeout, task), cancelTimer: (id) => { - if (timersRunning) { - cancelledTimers.add(id); - } op_timer_cancel(id); }, refTimer: (id) => op_timer_ref(id), diff --git a/core/Cargo.toml b/core/Cargo.toml index b8e5d96e8..62f80854a 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -14,7 +14,8 @@ description = "A modern JavaScript/TypeScript runtime built with V8, Rust, and T path = "lib.rs" [features] -default = ["include_icu_data", "v8_use_custom_libcxx"] +default = ["include_icu_data", "v8_use_custom_libcxx", "reactor-tokio"] +reactor-tokio = [] include_icu_data = ["deno_core_icudata"] v8_use_custom_libcxx = ["v8/use_custom_libcxx"] v8_enable_pointer_compression = ["v8/v8_enable_pointer_compression"] diff --git a/core/event_loop.rs b/core/event_loop.rs new file mode 100644 index 000000000..6cb02146f --- /dev/null +++ b/core/event_loop.rs @@ -0,0 +1,36 @@ +// Copyright 2018-2025 the Deno authors. MIT license. + +//! Event loop phase state. +//! +//! The actual phase-based event loop is driven by `poll_event_loop_inner` in +//! `jsruntime.rs`. This module provides auxiliary state for phases that need +//! Rust-side callback queues (currently only close callbacks). +//! +//! libuv-style phases (timers, idle, prepare, poll, check) are driven +//! directly through `UvLoopInner` when a uv_loop is registered. + +use std::collections::VecDeque; + +/// Close callback for resource cleanup. 
+pub(crate) struct CloseCallback { + pub callback: Box, +} + +/// Phase-specific state for the event loop. +/// +/// Currently only tracks close callbacks. Other phase hooks (idle, prepare, +/// check) are handled by `UvLoopInner` for the libuv compat path. +#[derive(Default)] +pub(crate) struct EventLoopPhases { + /// Phase 6: Close callbacks. + pub close_callbacks: VecDeque, +} + +impl EventLoopPhases { + /// Drain and run all close callbacks. + pub fn run_close_callbacks(&mut self) { + while let Some(cb) = self.close_callbacks.pop_front() { + (cb.callback)(); + } + } +} diff --git a/core/lib.rs b/core/lib.rs index 59f27f5f3..49ee2923b 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -11,6 +11,7 @@ mod async_cell; pub mod convert; pub mod cppgc; pub mod error; +pub mod event_loop; mod extension_set; mod extensions; mod external; @@ -26,9 +27,18 @@ mod ops_builtin; mod ops_builtin_types; mod ops_builtin_v8; mod ops_metrics; +pub mod reactor; +#[cfg(feature = "reactor-tokio")] +pub mod reactor_tokio; mod runtime; mod source_map; mod tasks; +#[allow( + non_camel_case_types, + non_upper_case_globals, + clippy::missing_safety_doc +)] +pub mod uv_compat; mod web_timeout; pub mod webidl; @@ -240,8 +250,6 @@ mod tests { use std::process::Command; use std::process::Stdio; - use super::*; - #[test] fn located_script_name() { // Note that this test will fail if this file is moved. We don't diff --git a/core/reactor.rs b/core/reactor.rs new file mode 100644 index 000000000..1ccd351c0 --- /dev/null +++ b/core/reactor.rs @@ -0,0 +1,68 @@ +// Copyright 2018-2025 the Deno authors. MIT license. + +//! Reactor abstraction for timer and I/O primitives. +//! +//! Currently used by [`WebTimers`](crate::web_timeout::WebTimers) to abstract +//! over the timer backend. The default implementation (`reactor-tokio` feature) +//! delegates to tokio. +//! +//! Note: `uv_compat` does **not** use this trait -- it talks to tokio directly +//! 
because it needs lower-level control (poll_accept, try_read, try_write). + +use std::future::Future; +use std::ops::Add; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; +use std::time::Duration; + +/// Abstraction over the async I/O reactor (tokio, mio, io_uring, custom). +/// This is the only seam between deno_core and the underlying async runtime. +pub trait Reactor: 'static { + type Timer: ReactorTimer; + type Instant: ReactorInstant; + + /// Create a new one-shot timer that fires at the given instant. + fn timer(&self, deadline: Self::Instant) -> Self::Timer; + + /// Get the current instant. + fn now(&self) -> Self::Instant; + + /// Poll the reactor for I/O readiness. This is called during the "poll" phase. + /// Drives the underlying event source (epoll/kqueue/iocp). + /// `timeout` = None means block indefinitely, Some(Duration::ZERO) means non-blocking. + fn poll(&self, cx: &mut Context, timeout: Option) -> Poll<()>; + + /// Spawn a future onto the reactor's executor (if it has one). + /// Returns a handle that can be polled for the result. + fn spawn( + &self, + fut: Pin + 'static>>, + ) -> Pin>>; +} + +/// A timer future that can be reset to fire at a different deadline. +pub trait ReactorTimer: Future + Unpin { + fn reset(&mut self, deadline: impl Into) + where + Self: Sized; + + type Instant: ReactorInstant; + + /// The deadline this timer is set to fire at. + fn deadline(&self) -> Self::Instant; +} + +/// An instant in time, used for timer deadlines. +pub trait ReactorInstant: + Copy + Ord + Add + Send + Sync + 'static +{ + fn now() -> Self; + fn elapsed(&self) -> Duration; + fn checked_add(&self, duration: Duration) -> Option; +} + +/// The default reactor type, selected by feature flags. +/// When `reactor-tokio` is enabled, this is `TokioReactor`. 
+#[cfg(feature = "reactor-tokio")] +pub type DefaultReactor = crate::reactor_tokio::TokioReactor; diff --git a/core/reactor_tokio.rs b/core/reactor_tokio.rs new file mode 100644 index 000000000..299b3b392 --- /dev/null +++ b/core/reactor_tokio.rs @@ -0,0 +1,87 @@ +// Copyright 2018-2025 the Deno authors. MIT license. + +use crate::reactor::Reactor; +use crate::reactor::ReactorInstant; +use crate::reactor::ReactorTimer; +use std::future::Future; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; +use std::time::Duration; +use tokio::time::Instant; +use tokio::time::Sleep; + +/// Default reactor implementation backed by tokio. +#[derive(Default)] +pub struct TokioReactor; + +impl Reactor for TokioReactor { + type Timer = TokioTimer; + type Instant = Instant; + + fn timer(&self, deadline: Self::Instant) -> Self::Timer { + TokioTimer { + sleep: Box::pin(tokio::time::sleep_until(deadline)), + } + } + + fn now(&self) -> Self::Instant { + Instant::now() + } + + fn poll(&self, cx: &mut Context, _timeout: Option) -> Poll<()> { + // Tokio's reactor is driven implicitly by the runtime, + // so we just yield back. + cx.waker().wake_by_ref(); + Poll::Pending + } + + fn spawn( + &self, + fut: Pin + 'static>>, + ) -> Pin>> { + let handle = deno_unsync::spawn(fut); + Box::pin(async move { + let _ = handle.await; + }) + } +} + +/// A timer backed by tokio's [`Sleep`]. 
+pub struct TokioTimer { + sleep: Pin>, +} + +impl Future for TokioTimer { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { + self.sleep.as_mut().poll(cx) + } +} + +impl ReactorTimer for TokioTimer { + type Instant = Instant; + + fn reset(&mut self, deadline: impl Into) { + self.sleep.as_mut().reset(deadline.into()); + } + + fn deadline(&self) -> Instant { + self.sleep.deadline() + } +} + +impl ReactorInstant for Instant { + fn now() -> Self { + Instant::now() + } + + fn elapsed(&self) -> Duration { + Instant::elapsed(self) + } + + fn checked_add(&self, duration: Duration) -> Option { + Instant::checked_add(self, duration) + } +} diff --git a/core/runtime/jsrealm.rs b/core/runtime/jsrealm.rs index fdfca802d..85b91f53d 100644 --- a/core/runtime/jsrealm.rs +++ b/core/runtime/jsrealm.rs @@ -12,6 +12,7 @@ use crate::error::CreateCodeCacheError; use crate::error::JsError; use crate::error::exception_to_err; use crate::error::exception_to_err_result; +use crate::event_loop::EventLoopPhases; use crate::module_specifier::ModuleSpecifier; use crate::modules::IntoModuleCodeString; use crate::modules::IntoModuleName; @@ -22,8 +23,10 @@ use crate::modules::ModuleName; use crate::modules::script_origin; use crate::ops::ExternalOpsTracker; use crate::ops::OpCtx; +use crate::reactor::DefaultReactor; use crate::stats::RuntimeActivityTraces; use crate::tasks::V8TaskSpawnerFactory; +use crate::uv_compat::UvLoopInner; use crate::web_timeout::WebTimers; use futures::stream::StreamExt; use std::cell::Cell; @@ -71,8 +74,14 @@ pub(crate) struct ImmediateInfo { pub struct ContextState { pub(crate) task_spawner_factory: Arc, - pub(crate) timers: WebTimers<(v8::Global, u32)>, - pub(crate) js_event_loop_tick_cb: RefCell>>, + pub(crate) timers: WebTimers<(v8::Global, u32), DefaultReactor>, + // Per-phase JS callbacks (replacing monolithic eventLoopTick) + pub(crate) js_resolve_ops_cb: RefCell>>, + pub(crate) js_drain_next_tick_and_macrotasks_cb: + 
RefCell>>, + pub(crate) js_handle_rejections_cb: RefCell>>, + pub(crate) js_set_timer_depth_cb: RefCell>>, + pub(crate) js_report_exception_cb: RefCell>>, pub(crate) run_immediate_callbacks_cb: RefCell>>, pub(crate) js_wasm_streaming_cb: RefCell>>, @@ -91,6 +100,26 @@ pub struct ContextState { pub(crate) immediate_info: RefCell, pub(crate) external_ops_tracker: ExternalOpsTracker, pub(crate) ext_import_meta_proto: RefCell>>, + /// Phase-specific state for the libuv-style event loop. + pub(crate) event_loop_phases: RefCell, + /// Pointer to the `UvLoopInner` for the libuv compat layer. + /// Set via [`JsRuntime::register_uv_loop`] when a `uv_loop_t` is + /// associated with this context. + /// + /// # Safety + /// The pointee is heap-allocated by `uv_loop_init` (boxed) and lives until + /// `uv_loop_close` destroys it. The caller of `register_uv_loop` must + /// guarantee the `uv_loop_t` outlives this `ContextState`. Both + /// `UvLoopInner` and `ContextState` are `!Send` -- all access is on the + /// event loop thread. + pub(crate) uv_loop_inner: Cell>, + /// Raw pointer to the `uv_loop_t` handle, used to set `loop_.data` + /// to the current `v8::Context` at the start of each event loop tick + /// so that libuv-style C callbacks can retrieve the context. + /// + /// # Safety + /// Same lifetime requirements as `uv_loop_inner` above. 
+ pub(crate) uv_loop_ptr: Cell>, } impl ContextState { @@ -108,7 +137,11 @@ impl ContextState { exception_state: Default::default(), has_next_tick_scheduled: Default::default(), immediate_info: Default::default(), - js_event_loop_tick_cb: Default::default(), + js_resolve_ops_cb: Default::default(), + js_drain_next_tick_and_macrotasks_cb: Default::default(), + js_handle_rejections_cb: Default::default(), + js_set_timer_depth_cb: Default::default(), + js_report_exception_cb: Default::default(), run_immediate_callbacks_cb: Default::default(), js_wasm_streaming_cb: Default::default(), wasm_instance_fn: Default::default(), @@ -122,6 +155,9 @@ impl ContextState { unrefed_ops, external_ops_tracker, ext_import_meta_proto: Default::default(), + event_loop_phases: Default::default(), + uv_loop_inner: Cell::new(None), + uv_loop_ptr: Cell::new(None), } } } @@ -209,7 +245,13 @@ impl JsRealmInner { v8::scope!(let scope, &mut isolate); // These globals will prevent snapshots from completing, take them state.exception_state.prepare_to_destroy(); - std::mem::take(&mut *state.js_event_loop_tick_cb.borrow_mut()); + std::mem::take(&mut *state.js_resolve_ops_cb.borrow_mut()); + std::mem::take( + &mut *state.js_drain_next_tick_and_macrotasks_cb.borrow_mut(), + ); + std::mem::take(&mut *state.js_handle_rejections_cb.borrow_mut()); + std::mem::take(&mut *state.js_set_timer_depth_cb.borrow_mut()); + std::mem::take(&mut *state.js_report_exception_cb.borrow_mut()); std::mem::take(&mut *state.run_immediate_callbacks_cb.borrow_mut()); std::mem::take(&mut *state.js_wasm_streaming_cb.borrow_mut()); diff --git a/core/runtime/jsruntime.rs b/core/runtime/jsruntime.rs index e52919580..d1c1fd843 100644 --- a/core/runtime/jsruntime.rs +++ b/core/runtime/jsruntime.rs @@ -1409,7 +1409,11 @@ impl JsRuntime { /// JsRuntime to operate properly. 
fn store_js_callbacks(&mut self, realm: &JsRealm, will_snapshot: bool) { let ( - event_loop_tick_cb, + resolve_ops_cb, + drain_next_tick_and_macrotasks_cb, + handle_rejections_cb, + set_timer_depth_cb, + report_exception_cb, build_custom_error_cb, run_immediate_callbacks_cb, wasm_instance_fn, @@ -1418,18 +1422,37 @@ impl JsRuntime { let context = realm.context(); let context_local = v8::Local::new(scope, context); let global = context_local.global(scope); - // TODO(bartlomieju): these probably could be captured from main realm so we don't have to - // look up them again? let deno_obj: v8::Local = bindings::get(scope, global, DENO, "Deno"); let core_obj: v8::Local = bindings::get(scope, deno_obj, CORE, "Deno.core"); - let event_loop_tick_cb: v8::Local = bindings::get( + let resolve_ops_cb: v8::Local = + bindings::get(scope, core_obj, RESOLVE_OPS, "Deno.core.__resolveOps"); + let drain_next_tick_and_macrotasks_cb: v8::Local = + bindings::get( + scope, + core_obj, + DRAIN_NEXT_TICK_AND_MACROTASKS, + "Deno.core.__drainNextTickAndMacrotasks", + ); + let handle_rejections_cb: v8::Local = bindings::get( + scope, + core_obj, + HANDLE_REJECTIONS, + "Deno.core.__handleRejections", + ); + let set_timer_depth_cb: v8::Local = bindings::get( scope, core_obj, - EVENT_LOOP_TICK, - "Deno.core.eventLoopTick", + SET_TIMER_DEPTH, + "Deno.core.__setTimerDepth", + ); + let report_exception_cb: v8::Local = bindings::get( + scope, + core_obj, + REPORT_EXCEPTION, + "Deno.core.__reportException", ); let build_custom_error_cb: v8::Local = bindings::get( scope, @@ -1448,8 +1471,6 @@ impl JsRuntime { if !will_snapshot { let key = WEBASSEMBLY.v8_string(scope).unwrap(); if let Some(web_assembly_obj_value) = global.get(scope, key.into()) { - // NOTE(bartlomieju): This is still fallible, because in some V8 modes - // WebAssembly namespace is not available (eg. `--jitless`). 
let maybe_web_assembly_object = TryInto::>::try_into(web_assembly_obj_value); if let Ok(web_assembly_object) = maybe_web_assembly_object { @@ -1464,7 +1485,11 @@ impl JsRuntime { } ( - v8::Global::new(scope, event_loop_tick_cb), + v8::Global::new(scope, resolve_ops_cb), + v8::Global::new(scope, drain_next_tick_and_macrotasks_cb), + v8::Global::new(scope, handle_rejections_cb), + v8::Global::new(scope, set_timer_depth_cb), + v8::Global::new(scope, report_exception_cb), v8::Global::new(scope, build_custom_error_cb), v8::Global::new(scope, run_immediate_callbacks_cb), wasm_instance_fn.map(|f| v8::Global::new(scope, f)), @@ -1474,9 +1499,25 @@ impl JsRuntime { // Put global handles in the realm's ContextState let state_rc = realm.0.state(); state_rc - .js_event_loop_tick_cb + .js_resolve_ops_cb + .borrow_mut() + .replace(resolve_ops_cb); + state_rc + .js_drain_next_tick_and_macrotasks_cb .borrow_mut() - .replace(event_loop_tick_cb); + .replace(drain_next_tick_and_macrotasks_cb); + state_rc + .js_handle_rejections_cb + .borrow_mut() + .replace(handle_rejections_cb); + state_rc + .js_set_timer_depth_cb + .borrow_mut() + .replace(set_timer_depth_cb); + state_rc + .js_report_exception_cb + .borrow_mut() + .replace(report_exception_cb); state_rc .exception_state .js_build_custom_error_cb @@ -1500,6 +1541,28 @@ impl JsRuntime { self.inner.state.op_state.clone() } + /// Register a `uv_loop_t` with the runtime so that its event loop phases + /// (timers, I/O, idle, prepare, check, close) are driven by + /// `poll_event_loop`. + /// + /// The v8::Context pointer is stored in `loop_.data` at the start of each + /// event loop tick so that libuv-style callbacks can retrieve it. + /// + /// # Safety + /// `loop_ptr` must be a valid, initialized `uv_loop_t` pointer that + /// outlives the runtime. 
+ pub unsafe fn register_uv_loop( + &mut self, + loop_ptr: *mut crate::uv_compat::uv_loop_t, + ) { + let context_state = &self.inner.main_realm.0.context_state; + let inner_ptr = + unsafe { crate::uv_compat::uv_loop_get_inner_ptr(loop_ptr) }; + let uv_inner = inner_ptr as *const crate::uv_compat::UvLoopInner; + context_state.uv_loop_inner.set(Some(uv_inner)); + context_state.uv_loop_ptr.set(Some(loop_ptr)); + } + /// Returns the runtime's op names, ordered by OpId. pub fn op_names(&self) -> Vec<&'static str> { let state = &self.inner.main_realm.0.context_state; @@ -1915,6 +1978,17 @@ impl JsRuntime { self.poll_event_loop_inner(cx, &mut scope, poll_options) } + /// Phase-based event loop tick, loosely following libuv's architecture: + /// + /// 1. Timers -- fire expired libuv C timers + JS WebTimers + /// 2. Pending work -- module progress, task spawner, async ops, + /// nextTick/macrotask drain, immediates, rejections + /// 3. I/O -- drive TCP read/write/accept via UvLoopInner + /// 4. Idle / Prepare -- libuv idle + prepare callbacks + /// 5. Check -- libuv check callbacks + /// 6. Close -- close callbacks (Rust + libuv) + /// + /// Microtask checkpoints run between phases. fn poll_event_loop_inner( &self, cx: &mut Context, @@ -1924,11 +1998,10 @@ impl JsRuntime { let has_inspector = self.inner.state.has_inspector.get(); self.inner.state.waker.register(cx.waker()); + // Pre-phase: Inspector + V8 message loop pump if has_inspector { - // We poll the inspector first. self.inspector().poll_sessions_from_event_loop(cx); } - if poll_options.pump_v8_message_loop { self.pump_v8_message_loop(scope)?; } @@ -1936,23 +2009,126 @@ impl JsRuntime { let realm = &self.inner.main_realm; let modules = &realm.0.module_map; let context_state = &realm.0.context_state; + + // Set the v8::Context pointer in the uv_loop so libuv-style callbacks + // can retrieve it via context_from_loop(). 
+ if let Some(loop_ptr) = context_state.uv_loop_ptr.get() { + let context = scope.get_current_context(); + // SAFETY: `v8::Local` is a thin pointer (one pointer + // wide). We store it as `*mut c_void` in `loop_.data` for the + // duration of this event loop tick. Callbacks reconstruct it via + // `std::mem::transmute` in `context_from_loop()`. The context is + // alive for the entire tick because `scope` holds it. + const _: () = assert!( + std::mem::size_of::>() + == std::mem::size_of::<*mut std::ffi::c_void>() + ); + unsafe { + let ctx_ptr: *mut std::ffi::c_void = std::mem::transmute(context); + (*loop_ptr).data = ctx_ptr; + } + } let exception_state = &context_state.exception_state; + // Tight I/O loop: when run_io does work, re-run I/O phases immediately + // without returning to tokio. This avoids kqueue/kevent round-trip + // latency between batches. + let mut dispatched_ops = false; + let mut did_work = false; + let mut uv_did_io = false; + // ===== Phase 1: Timers ===== + // 1a. Fire expired libuv C timers + if let Some(uv_inner_ptr) = context_state.uv_loop_inner.get() { + unsafe { (*uv_inner_ptr).run_timers() }; + } + // 1b. Fire expired JS timers (direct v8::Function::call per timer) + did_work |= Self::dispatch_timers(cx, scope, context_state); + scope.perform_microtask_checkpoint(); + + // ===== Phase 2: Pending work ===== + // Module progress polling (before ops, matching original ordering) modules.poll_progress(cx, scope)?; - // Resolve async ops, run all next tick callbacks and macrotasks callbacks - // and only then check for any promise exceptions (`unhandledrejection` - // handlers are run in macrotasks callbacks so we need to let them run - // first). - let (dispatched_ops, did_work) = Self::do_js_event_loop_tick_realm( - cx, - scope, - context_state, - exception_state, - )?; + // 2a. V8 task spawner tasks + dispatched_ops |= Self::dispatch_task_spawner(cx, scope, context_state); + + // 2b. 
Poll and resolve completed async ops + // NOTE: No microtask checkpoint between ops and nextTick/macrotask! + // This matches the old eventLoopTick behavior where ops resolve, + // nextTick drains, and macrotasks run all within the same JS call + // before any microtask checkpoint. Promise continuations (like await + // resumption) run only after all three have completed. + dispatched_ops |= Self::dispatch_pending_ops(cx, scope, context_state)?; + + // 2c. nextTick drain + macrotask drain (before microtask checkpoint) + // Only drain if there's actual work (ops dispatched, tick scheduled, or timers fired). + // This prevents macrotask callbacks from running on empty iterations. + let has_tick_scheduled = context_state.has_next_tick_scheduled.get(); + dispatched_ops |= has_tick_scheduled; + if dispatched_ops || did_work || has_tick_scheduled { + Self::drain_next_tick_and_macrotasks(scope, context_state)?; + } + + // 2d. Immediates (if ops or timers did work) + if (did_work || dispatched_ops) + && context_state.immediate_info.borrow().ref_count > 0 + { + Self::do_js_run_immediate_callbacks(scope, context_state)?; + } + + // 2e. Handle promise rejections (after nextTick/macrotask, since + // unhandledrejection handlers are run in macrotask callbacks) + Self::dispatch_rejections(scope, context_state, exception_state)?; + scope.perform_microtask_checkpoint(); + + // ===== Phase 3: I/O ===== + // Tight I/O loop: when run_io reads data and fires callbacks, the + // resulting JS work (nextTick/macrotasks from HTTP2 frame processing) + // may produce write calls. Drain those immediately and re-poll for + // more data, avoiding kqueue/kevent round-trip latency between batches. 
+ if let Some(uv_inner_ptr) = context_state.uv_loop_inner.get() { + unsafe { + (*uv_inner_ptr).set_waker(cx.waker()); + } + for _io_spin in 0..8 { + let did_io = unsafe { (*uv_inner_ptr).run_io() }; + if !did_io { + break; + } + uv_did_io = true; + scope.perform_microtask_checkpoint(); + Self::drain_next_tick_and_macrotasks(scope, context_state)?; + scope.perform_microtask_checkpoint(); + } + } + + // ===== Phase 4: Idle / Prepare ===== + if let Some(uv_inner_ptr) = context_state.uv_loop_inner.get() { + unsafe { + (*uv_inner_ptr).run_idle(); + (*uv_inner_ptr).run_prepare(); + }; + } + scope.perform_microtask_checkpoint(); + + // ===== Phase 5: Check ===== + if let Some(uv_inner_ptr) = context_state.uv_loop_inner.get() { + unsafe { (*uv_inner_ptr).run_check() }; + } + scope.perform_microtask_checkpoint(); + + // ===== Phase 6: Close ===== exception_state.check_exception_condition(scope)?; + { + let mut phases = context_state.event_loop_phases.borrow_mut(); + phases.run_close_callbacks(); + } + if let Some(uv_inner_ptr) = context_state.uv_loop_inner.get() { + unsafe { (*uv_inner_ptr).run_close() }; + } + scope.perform_microtask_checkpoint(); - // Get the pending state from the main realm, or all realms + // Evaluate pending state let pending_state = EventLoopPendingState::new(scope, context_state, modules); @@ -1978,25 +2154,13 @@ impl JsRuntime { return Poll::Ready(Ok(())); } - // eprintln!( - // "did work {}, dispatched_ops {}, has timers {}, has_refed_immediates {}, has_outstanding_immediates {}", - // did_work, - // dispatched_ops, - // context_state.timers.has_pending(), - // pending_state.has_refed_immediates, - // pending_state.has_outstanding_immediates - // ); - if !did_work && pending_state.has_refed_immediates > 0 { + // Run immediates if not already run above and there are refed immediates pending + if !did_work && !dispatched_ops && pending_state.has_refed_immediates > 0 { Self::do_js_run_immediate_callbacks(scope, context_state)?; + 
scope.perform_microtask_checkpoint(); } - // Check if more async ops have been dispatched - // during this turn of event loop. - // If there are any pending background tasks, we also wake the runtime to - // make sure we don't miss them. - // TODO(andreubotella) The event loop will spin as long as there are pending - // background tasks. We should look into having V8 notify us when a - // background task is done. + // Re-wake logic for next iteration #[allow(clippy::suspicious_else_formatting, clippy::if_same_then_else)] { if pending_state.has_pending_background_tasks @@ -2004,6 +2168,7 @@ impl JsRuntime { || pending_state.has_outstanding_immediates || pending_state.has_refed_immediates > 0 || pending_state.has_pending_promise_events + || uv_did_io { self.inner.state.waker.wake(); } else @@ -2053,9 +2218,6 @@ impl JsRuntime { .into_box(), )); } else { - // Delay the above error by one spin of the event loop. A dynamic import - // evaluation may complete during this, in which case the counter will - // reset. 
realm.increment_modules_idle(); self.inner.state.waker.wake(); } @@ -2265,6 +2427,7 @@ pub(crate) struct EventLoopPendingState { has_pending_external_ops: bool, has_outstanding_immediates: bool, has_refed_immediates: u32, + has_uv_alive_handles: bool, } impl EventLoopPendingState { @@ -2300,6 +2463,12 @@ impl EventLoopPendingState { let info = state.immediate_info.borrow(); (info.has_outstanding, info.ref_count) }; + let has_uv_alive_handles = + if let Some(uv_inner_ptr) = state.uv_loop_inner.get() { + unsafe { (*uv_inner_ptr).has_alive_handles() } + } else { + false + }; EventLoopPendingState { has_pending_ops: has_pending_refed_ops || has_pending_timers @@ -2314,6 +2483,7 @@ impl EventLoopPendingState { has_pending_external_ops: state.external_ops_tracker.has_pending_ops(), has_outstanding_immediates, has_refed_immediates, + has_uv_alive_handles, } } @@ -2334,6 +2504,7 @@ impl EventLoopPendingState { || self.has_refed_immediates > 0 || self.has_pending_promise_events || self.has_pending_external_ops + || self.has_uv_alive_handles } } @@ -2561,55 +2732,139 @@ impl JsRuntime { Ok(()) } - fn do_js_event_loop_tick_realm<'s, 'i>( + /// Phase 1 (Timers): Poll JS WebTimers, dispatch each callback directly + /// via v8::Function::call. Microtask checkpoint + nextTick drain between + /// each timer callback. + fn dispatch_timers<'s, 'i>( cx: &mut Context, scope: &mut v8::PinScope<'s, 'i>, context_state: &ContextState, - exception_state: &ExceptionState, - ) -> Result<(bool, bool), Box> { - let mut dispatched_ops = false; - let mut did_work = false; + ) -> bool { + let expired = match context_state.timers.poll_timers(cx) { + Poll::Ready(expired) => expired, + _ => return false, + }; + + if expired.is_empty() { + return false; + } - // Poll any pending task spawner tasks. Note that we need to poll separately because otherwise - // Rust will extend the lifetime of the borrow longer than we expect. 
+ let traces_enabled = context_state.activity_traces.is_enabled(); + let undefined: v8::Local = v8::undefined(scope).into(); + let global_this = scope.get_current_context().global(scope).into(); + + for (timer_id, timer_type) in &expired { + // Extract the timer data; if it was cancelled during this dispatch + // loop (e.g. clearTimeout called from an earlier callback), skip it. + let Some((callback, depth)) = + context_state.timers.take_fired_timer(*timer_id, timer_type) + else { + continue; + }; + + if traces_enabled { + context_state + .activity_traces + .complete(RuntimeActivityType::Timer, *timer_id as _); + } + + // Set timer depth via JS setter + { + let set_timer_depth_cb = context_state.js_set_timer_depth_cb.borrow(); + let set_timer_depth_fn = + set_timer_depth_cb.as_ref().unwrap().open(scope); + let depth_val = v8::Integer::new(scope, depth as i32); + set_timer_depth_fn.call(scope, undefined, &[depth_val.into()]); + } + + // Call the timer callback directly + { + v8::tc_scope!(let tc_scope, scope); + let cb = callback.open(tc_scope); + cb.call(tc_scope, global_this, &[]); + + if let Some(exception) = tc_scope.exception() { + // Report exception but don't abort the timer loop. + // Globalize the exception value, then get report fn and call it. 
+ let exc_global = v8::Global::new(tc_scope, exception); + { + let report_exception_cb = + context_state.js_report_exception_cb.borrow(); + if let Some(report_fn_global) = report_exception_cb.as_ref() { + let report_fn = report_fn_global.open(tc_scope); + let exc_local = v8::Local::new(tc_scope, &exc_global); + report_fn.call(tc_scope, undefined, &[exc_local]); + } + } + } + } + + // Microtask checkpoint between each timer + scope.perform_microtask_checkpoint(); + + // Drain nextTick between each timer callback + { + let has_tick = context_state.has_next_tick_scheduled.get(); + let drain_cb = + context_state.js_drain_next_tick_and_macrotasks_cb.borrow(); + let drain_fn = drain_cb.as_ref().unwrap().open(scope); + let has_tick_val = v8::Boolean::new(scope, has_tick); + drain_fn.call(scope, undefined, &[has_tick_val.into()]); + } + + scope.perform_microtask_checkpoint(); + } + + // Reset timer depth to 0 after all timers + { + let set_timer_depth_cb = context_state.js_set_timer_depth_cb.borrow(); + let set_timer_depth_fn = set_timer_depth_cb.as_ref().unwrap().open(scope); + let zero = v8::Integer::new(scope, 0); + set_timer_depth_fn.call(scope, undefined, &[zero.into()]); + } + + true + } + + /// Phase 2a: Poll and dispatch V8 task spawner tasks. + fn dispatch_task_spawner( + cx: &mut Context, + scope: &mut v8::PinScope, + context_state: &ContextState, + ) -> bool { + let mut dispatched = false; let mut retries = 3; while let Poll::Ready(tasks) = context_state.task_spawner_factory.poll_inner(cx) { - // TODO(mmastrac): we are using this flag - dispatched_ops = true; + dispatched = true; for task in tasks { task(scope); } - // We may need to perform a microtask checkpoint here scope.perform_microtask_checkpoint(); - // We don't want tasks that spawn other tasks to starve the event loop, so break - // after three times around and allow the remainder of the event loop to spin. 
retries -= 1; if retries == 0 { cx.waker().wake_by_ref(); break; } } + dispatched + } - // We return async responses to JS in bounded batches. Note that because - // we're passing these to JS as arguments, it is possible to overflow the - // JS stack by just passing too many. + /// Phase 2b: Poll completed async ops and batch-resolve via JS __resolveOps. + fn dispatch_pending_ops<'s, 'i>( + cx: &mut Context, + scope: &mut v8::PinScope<'s, 'i>, + context_state: &ContextState, + ) -> Result> { const MAX_VEC_SIZE_FOR_OPS: usize = 1024; - // each batch is a flat vector of tuples: - // `[promise_id1, op_result1, promise_id2, op_result2, ...]` - // promise_id is a simple integer, op_result is an ops::OpResult - // which contains a value OR an error, encoded as a tuple. - // This batch is received in JS via the special `arguments` variable - // and then each tuple is used to resolve or reject promises let mut args: SmallVec<[v8::Local; 32]> = SmallVec::with_capacity(32); loop { if args.len() >= MAX_VEC_SIZE_FOR_OPS { - // We have too many, bail for now but re-wake the waker cx.waker().wake_by_ref(); break; } @@ -2637,17 +2892,42 @@ impl JsRuntime { context_state .activity_traces .complete(RuntimeActivityType::AsyncOp, promise_id as _); - dispatched_ops |= true; - did_work |= true; args.push(v8::Integer::new(scope, promise_id).into()); args.push(v8::Boolean::new(scope, res.is_ok()).into()); args.push(res.unwrap_or_else(std::convert::identity)); } + if args.is_empty() { + return Ok(false); + } + + let undefined: v8::Local = v8::undefined(scope).into(); + + v8::tc_scope!(let tc_scope, scope); + + let resolve_ops_cb = context_state.js_resolve_ops_cb.borrow(); + let resolve_ops_fn = resolve_ops_cb.as_ref().unwrap().open(tc_scope); + resolve_ops_fn.call(tc_scope, undefined, args.as_slice()); + + if let Some(exception) = tc_scope.exception() { + return exception_to_err_result(tc_scope, exception, false, true); + } + if tc_scope.has_terminated() || 
tc_scope.is_execution_terminating() { + return Ok(false); + } + + Ok(true) + } + + /// Phase 2c: Handle promise rejections. + fn dispatch_rejections<'s, 'i>( + scope: &mut v8::PinScope<'s, 'i>, + context_state: &ContextState, + exception_state: &ExceptionState, + ) -> Result<(), Box> { let undefined: v8::Local = v8::undefined(scope).into(); - let has_tick_scheduled = context_state.has_next_tick_scheduled.get(); - dispatched_ops |= has_tick_scheduled; + // First handle "handled" rejections while let Some((promise, result)) = exception_state .pending_handled_promise_rejections .borrow_mut() @@ -2659,7 +2939,6 @@ impl JsRuntime { .as_ref() { let function = handler.open(scope); - let args = [ v8::Local::new(scope, promise).into(), v8::Local::new(scope, result), @@ -2668,87 +2947,63 @@ impl JsRuntime { } } - let rejections = if !exception_state + // Then handle unhandled rejections + if exception_state .pending_promise_rejections - .borrow_mut() + .borrow() .is_empty() { - // Avoid holding the pending rejection lock longer than necessary - let mut pending_rejections = - exception_state.pending_promise_rejections.borrow_mut(); - let mut rejections = VecDeque::default(); - std::mem::swap(&mut *pending_rejections, &mut rejections); - drop(pending_rejections); - - let arr = v8::Array::new(scope, (rejections.len() * 3) as i32); - let mut index = 0; - for rejection in rejections.into_iter() { - let value = v8::Local::new(scope, rejection.0); - arr.set_index(scope, index, value.into()); - index += 1; - let value = v8::Local::new(scope, rejection.1); - arr.set_index(scope, index, value); - index += 1; - let value = v8::Local::new(scope, rejection.2); - arr.set_index(scope, index, value); - index += 1; - } - arr.into() - } else { - undefined - }; - - args.push(rejections); - - // TODO(mmastrac): timer dispatch should be done via direct function call, but we will have to start - // storing the exception-reporting callback. 
- let timers = match context_state.timers.poll_timers(cx) { - Poll::Ready(timers) => { - did_work |= true; - let traces_enabled = context_state.activity_traces.is_enabled(); - let arr = v8::Array::new(scope, (timers.len() * 3) as _); - #[allow(clippy::needless_range_loop)] - for i in 0..timers.len() { - if traces_enabled { - // Timer and interval traces both use RuntimeActivityType::Timer - context_state - .activity_traces - .complete(RuntimeActivityType::Timer, timers[i].0 as _); - } - // depth, id, function - let value = v8::Integer::new(scope, timers[i].1.1 as _); - arr.set_index(scope, (i * 3) as _, value.into()); - let value = v8::Number::new(scope, timers[i].0 as _); - arr.set_index(scope, (i * 3 + 1) as _, value.into()); - let value = v8::Local::new(scope, timers[i].1.0.clone()); - arr.set_index(scope, (i * 3 + 2) as _, value.into()); - } - arr.into() - } - _ => undefined, - }; - args.push(timers); + return Ok(()); + } - let has_tick_scheduled = v8::Boolean::new(scope, has_tick_scheduled); - args.push(has_tick_scheduled.into()); + let mut pending_rejections = + exception_state.pending_promise_rejections.borrow_mut(); + let mut rejections = VecDeque::default(); + std::mem::swap(&mut *pending_rejections, &mut rejections); + drop(pending_rejections); + + let mut args: SmallVec<[v8::Local; 16]> = + SmallVec::with_capacity(rejections.len() * 3); + for rejection in rejections.into_iter() { + args.push(v8::Local::new(scope, rejection.0).into()); + args.push(v8::Local::new(scope, rejection.1)); + args.push(v8::Local::new(scope, rejection.2)); + } v8::tc_scope!(let tc_scope, scope); - let js_event_loop_tick_cb = context_state.js_event_loop_tick_cb.borrow(); - let js_event_loop_tick_cb = - js_event_loop_tick_cb.as_ref().unwrap().open(tc_scope); - - js_event_loop_tick_cb.call(tc_scope, undefined, args.as_slice()); + let handle_rejections_cb = context_state.js_handle_rejections_cb.borrow(); + let handle_rejections_fn = + 
handle_rejections_cb.as_ref().unwrap().open(tc_scope); + handle_rejections_fn.call(tc_scope, undefined, args.as_slice()); if let Some(exception) = tc_scope.exception() { return exception_to_err_result(tc_scope, exception, false, true); } - if tc_scope.has_terminated() || tc_scope.is_execution_terminating() { - return Ok((false, false)); + Ok(()) + } + + /// Phase 5a: Drain nextTick queue and macrotask queue. + fn drain_next_tick_and_macrotasks<'s, 'i>( + scope: &mut v8::PinScope<'s, 'i>, + context_state: &ContextState, + ) -> Result<(), Box> { + let undefined: v8::Local = v8::undefined(scope).into(); + let has_tick_scheduled = context_state.has_next_tick_scheduled.get(); + + v8::tc_scope!(let tc_scope, scope); + + let drain_cb = context_state.js_drain_next_tick_and_macrotasks_cb.borrow(); + let drain_fn = drain_cb.as_ref().unwrap().open(tc_scope); + let has_tick_val = v8::Boolean::new(tc_scope, has_tick_scheduled); + drain_fn.call(tc_scope, undefined, &[has_tick_val.into()]); + + if let Some(exception) = tc_scope.exception() { + return exception_to_err_result(tc_scope, exception, false, true); } - Ok((dispatched_ops, did_work)) + Ok(()) } } diff --git a/core/runtime/v8_static_strings.rs b/core/runtime/v8_static_strings.rs index 78e9216bf..269f7fa3c 100644 --- a/core/runtime/v8_static_strings.rs +++ b/core/runtime/v8_static_strings.rs @@ -25,7 +25,11 @@ v8_static_strings!( DIRNAME = "dirname", ERR_MODULE_NOT_FOUND = "ERR_MODULE_NOT_FOUND", ERRORS = "errors", - EVENT_LOOP_TICK = "eventLoopTick", + RESOLVE_OPS = "__resolveOps", + DRAIN_NEXT_TICK_AND_MACROTASKS = "__drainNextTickAndMacrotasks", + HANDLE_REJECTIONS = "__handleRejections", + SET_TIMER_DEPTH = "__setTimerDepth", + REPORT_EXCEPTION = "__reportException", RUN_IMMEDIATE_CALLBACKS = "runImmediateCallbacks", FILENAME = "filename", INSTANCE = "Instance", diff --git a/core/uv_compat.rs b/core/uv_compat.rs new file mode 100644 index 000000000..6b0415dc6 --- /dev/null +++ b/core/uv_compat.rs @@ -0,0 +1,1961 @@ +// 
Copyright 2018-2025 the Deno authors. MIT license. + +// Drop-in replacement for libuv integrated with deno_core's event loop. + +use std::cell::Cell; +use std::cell::RefCell; +use std::collections::BTreeSet; +use std::collections::HashMap; +use std::collections::VecDeque; +use std::ffi::c_char; +use std::ffi::c_int; +use std::ffi::c_uint; +use std::ffi::c_void; +use std::future::Future; +use std::net::SocketAddr; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; +use std::task::Waker; +use std::time::Instant; + +#[cfg(unix)] +use libc::AF_INET; +#[cfg(unix)] +use libc::AF_INET6; +#[cfg(unix)] +use libc::sockaddr_in; +#[cfg(unix)] +use libc::sockaddr_in6; +#[cfg(unix)] +type sa_family_t = libc::sa_family_t; +#[cfg(windows)] +use win_sock::AF_INET; +#[cfg(windows)] +use win_sock::AF_INET6; +#[cfg(windows)] +use win_sock::sockaddr_in; +#[cfg(windows)] +use win_sock::sockaddr_in6; +#[cfg(windows)] +type sa_family_t = win_sock::sa_family_t; + +// libc doesn't export socket structs on Windows. 
+#[cfg(windows)] +mod win_sock { + #[repr(C)] + pub struct in_addr { + pub s_addr: u32, + } + #[repr(C)] + pub struct sockaddr_in { + pub sin_family: u16, + pub sin_port: u16, + pub sin_addr: in_addr, + pub sin_zero: [u8; 8], + } + #[repr(C)] + pub struct in6_addr { + pub s6_addr: [u8; 16], + } + #[repr(C)] + pub struct sockaddr_in6 { + pub sin6_family: u16, + pub sin6_port: u16, + pub sin6_flowinfo: u32, + pub sin6_addr: in6_addr, + pub sin6_scope_id: u32, + } + pub const AF_INET: i32 = 2; + pub const AF_INET6: i32 = 23; + pub type sa_family_t = u16; + pub const SD_SEND: i32 = 1; + unsafe extern "system" { + pub fn shutdown(socket: usize, how: i32) -> i32; + } +} + +#[repr(C)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum uv_handle_type { + UV_UNKNOWN_HANDLE = 0, + UV_TIMER = 1, + UV_IDLE = 2, + UV_PREPARE = 3, + UV_CHECK = 4, + UV_TCP = 12, +} + +const UV_HANDLE_ACTIVE: u32 = 1 << 0; +const UV_HANDLE_REF: u32 = 1 << 1; +const UV_HANDLE_CLOSING: u32 = 1 << 2; + +// libuv-compatible error codes (negative errno values on unix, +// which vary depending on platform, fixed values on windows). +macro_rules! 
uv_errno { + ($name:ident, $unix:expr, $win:expr) => { + #[cfg(unix)] + pub const $name: i32 = -($unix); + #[cfg(windows)] + pub const $name: i32 = $win; + }; +} + +uv_errno!(UV_EAGAIN, libc::EAGAIN, -4088); +uv_errno!(UV_EBADF, libc::EBADF, -4083); +uv_errno!(UV_EADDRINUSE, libc::EADDRINUSE, -4091); +uv_errno!(UV_ECONNREFUSED, libc::ECONNREFUSED, -4078); +uv_errno!(UV_EINVAL, libc::EINVAL, -4071); +uv_errno!(UV_ENOTCONN, libc::ENOTCONN, -4053); +uv_errno!(UV_ECANCELED, libc::ECANCELED, -4081); +uv_errno!(UV_EPIPE, libc::EPIPE, -4047); +pub const UV_EOF: i32 = -4095; + +#[repr(C)] +pub struct uv_loop_t { + internal: *mut c_void, + pub data: *mut c_void, + stop_flag: Cell, +} + +#[repr(C)] +pub struct uv_handle_t { + pub r#type: uv_handle_type, + pub loop_: *mut uv_loop_t, + pub data: *mut c_void, + pub flags: u32, +} + +#[repr(C)] +pub struct uv_timer_t { + pub r#type: uv_handle_type, + pub loop_: *mut uv_loop_t, + pub data: *mut c_void, + pub flags: u32, + internal_id: u64, + internal_deadline: u64, + cb: Option, + timeout: u64, + repeat: u64, +} + +#[repr(C)] +pub struct uv_idle_t { + pub r#type: uv_handle_type, + pub loop_: *mut uv_loop_t, + pub data: *mut c_void, + pub flags: u32, + cb: Option, +} + +#[repr(C)] +pub struct uv_prepare_t { + pub r#type: uv_handle_type, + pub loop_: *mut uv_loop_t, + pub data: *mut c_void, + pub flags: u32, + cb: Option, +} + +#[repr(C)] +pub struct uv_check_t { + pub r#type: uv_handle_type, + pub loop_: *mut uv_loop_t, + pub data: *mut c_void, + pub flags: u32, + cb: Option, +} + +#[repr(C)] +pub struct uv_stream_t { + pub r#type: uv_handle_type, + pub loop_: *mut uv_loop_t, + pub data: *mut c_void, + pub flags: u32, +} + +#[repr(C)] +pub struct uv_tcp_t { + pub r#type: uv_handle_type, + pub loop_: *mut uv_loop_t, + pub data: *mut c_void, + pub flags: u32, + #[cfg(unix)] + internal_fd: Option, + #[cfg(windows)] + internal_fd: Option, + internal_bind_addr: Option, + internal_stream: Option, + internal_listener: Option, + 
internal_listener_addr: Option, + internal_nodelay: bool, + internal_alloc_cb: Option, + internal_read_cb: Option, + internal_reading: bool, + internal_connect: Option, + internal_write_queue: VecDeque, + internal_connection_cb: Option, + internal_backlog: VecDeque, +} + +/// In-flight TCP connect operation. +/// +/// # Safety +/// `req` is a raw pointer to a caller-owned `uv_connect_t`. The caller must +/// ensure it remains valid until the connect callback fires (at which point +/// `ConnectPending` is consumed). This struct is `!Send` -- it lives on the +/// event loop thread alongside `UvLoopInner`. +struct ConnectPending { + future: Pin>>>, + req: *mut uv_connect_t, + cb: Option, +} + +/// Queued write operation waiting for the socket to become writable. +/// +/// # Safety +/// `req` is a raw pointer to a caller-owned `uv_write_t`. The caller must +/// ensure it remains valid until the write callback fires (at which point +/// `WritePending` is consumed). This struct is `!Send`. +struct WritePending { + req: *mut uv_write_t, + data: Vec, + offset: usize, + cb: Option, +} + +#[repr(C)] +pub struct uv_write_t { + pub r#type: i32, // UV_REQ_TYPE fields + pub data: *mut c_void, + pub handle: *mut uv_stream_t, +} + +#[repr(C)] +pub struct uv_connect_t { + pub r#type: i32, + pub data: *mut c_void, + pub handle: *mut uv_stream_t, +} + +#[repr(C)] +pub struct uv_shutdown_t { + pub r#type: i32, + pub data: *mut c_void, + pub handle: *mut uv_stream_t, +} + +/// I/O buffer descriptor matching libuv's `uv_buf_t`. +/// +/// Field order is `{base, len}` which matches the macOS/Windows layout. +/// On Linux, real libuv uses `{len, base}` (matching `struct iovec`). +/// This is fine as long as the struct is only constructed/consumed in Rust; +/// if it ever needs to cross an FFI boundary to real C code on Linux, +/// the field order must be made platform-conditional. 
+#[repr(C)] +pub struct uv_buf_t { + pub base: *mut c_char, + pub len: usize, +} + +pub type uv_timer_cb = unsafe extern "C" fn(*mut uv_timer_t); +pub type uv_idle_cb = unsafe extern "C" fn(*mut uv_idle_t); +pub type uv_prepare_cb = unsafe extern "C" fn(*mut uv_prepare_t); +pub type uv_check_cb = unsafe extern "C" fn(*mut uv_check_t); +pub type uv_close_cb = unsafe extern "C" fn(*mut uv_handle_t); +pub type uv_write_cb = unsafe extern "C" fn(*mut uv_write_t, i32); +pub type uv_alloc_cb = + unsafe extern "C" fn(*mut uv_handle_t, usize, *mut uv_buf_t); +pub type uv_read_cb = + unsafe extern "C" fn(*mut uv_stream_t, isize, *const uv_buf_t); +pub type uv_connection_cb = unsafe extern "C" fn(*mut uv_stream_t, i32); +pub type uv_connect_cb = unsafe extern "C" fn(*mut uv_connect_t, i32); +pub type uv_shutdown_cb = unsafe extern "C" fn(*mut uv_shutdown_t, i32); + +pub type UvHandle = uv_handle_t; +pub type UvLoop = uv_loop_t; +pub type UvStream = uv_stream_t; +pub type UvTcp = uv_tcp_t; +pub type UvWrite = uv_write_t; +pub type UvBuf = uv_buf_t; +pub type UvConnect = uv_connect_t; +pub type UvShutdown = uv_shutdown_t; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] +struct TimerKey { + deadline_ms: u64, + id: u64, +} + +pub(crate) struct UvLoopInner { + timers: RefCell>, + next_timer_id: Cell, + timer_handles: RefCell>, + idle_handles: RefCell>, + prepare_handles: RefCell>, + check_handles: RefCell>, + tcp_handles: RefCell>, + waker: RefCell>, + closing_handles: RefCell)>>, + time_origin: Instant, +} + +impl UvLoopInner { + fn new() -> Self { + Self { + timers: RefCell::new(BTreeSet::new()), + next_timer_id: Cell::new(1), + timer_handles: RefCell::new(HashMap::with_capacity(16)), + idle_handles: RefCell::new(Vec::with_capacity(8)), + prepare_handles: RefCell::new(Vec::with_capacity(8)), + check_handles: RefCell::new(Vec::with_capacity(8)), + tcp_handles: RefCell::new(Vec::with_capacity(8)), + waker: RefCell::new(None), + closing_handles: 
RefCell::new(VecDeque::with_capacity(16)), + time_origin: Instant::now(), + } + } + + pub(crate) fn set_waker(&self, waker: &Waker) { + let mut slot = self.waker.borrow_mut(); + match slot.as_ref() { + Some(existing) if existing.will_wake(waker) => {} + _ => *slot = Some(waker.clone()), + } + } + + #[inline] + fn alloc_timer_id(&self) -> u64 { + let id = self.next_timer_id.get(); + self.next_timer_id.set(id + 1); + id + } + + #[inline] + fn now_ms(&self) -> u64 { + Instant::now().duration_since(self.time_origin).as_millis() as u64 + } + + pub(crate) fn has_alive_handles(&self) -> bool { + for (_, handle_ptr) in self.timer_handles.borrow().iter() { + // SAFETY: Handle pointers in timer_handles are kept valid by the C caller until uv_close. + let handle = unsafe { &**handle_ptr }; + if handle.flags & UV_HANDLE_ACTIVE != 0 + && handle.flags & UV_HANDLE_REF != 0 + { + return true; + } + } + for handle_ptr in self.idle_handles.borrow().iter() { + // SAFETY: Handle pointers in idle_handles are kept valid by the C caller until uv_close. + let handle = unsafe { &**handle_ptr }; + if handle.flags & UV_HANDLE_ACTIVE != 0 + && handle.flags & UV_HANDLE_REF != 0 + { + return true; + } + } + for handle_ptr in self.prepare_handles.borrow().iter() { + // SAFETY: Handle pointers in prepare_handles are kept valid by the C caller until uv_close. + let handle = unsafe { &**handle_ptr }; + if handle.flags & UV_HANDLE_ACTIVE != 0 + && handle.flags & UV_HANDLE_REF != 0 + { + return true; + } + } + for handle_ptr in self.check_handles.borrow().iter() { + // SAFETY: Handle pointers in check_handles are kept valid by the C caller until uv_close. + let handle = unsafe { &**handle_ptr }; + if handle.flags & UV_HANDLE_ACTIVE != 0 + && handle.flags & UV_HANDLE_REF != 0 + { + return true; + } + } + for handle_ptr in self.tcp_handles.borrow().iter() { + // SAFETY: Handle pointers in tcp_handles are kept valid by the C caller until uv_close. 
+ let handle = unsafe { &**handle_ptr }; + if handle.flags & UV_HANDLE_ACTIVE != 0 + && handle.flags & UV_HANDLE_REF != 0 + { + return true; + } + } + if !self.closing_handles.borrow().is_empty() { + return true; + } + false + } + + /// ### Safety + /// All timer handle pointers stored in `timer_handles` must be valid. + pub(crate) unsafe fn run_timers(&self) { + let now = self.now_ms(); + let mut expired = Vec::new(); + { + let timers = self.timers.borrow(); + for key in timers.iter() { + if key.deadline_ms > now { + break; + } + expired.push(*key); + } + } + + for key in expired { + self.timers.borrow_mut().remove(&key); + let handle_ptr = match self.timer_handles.borrow().get(&key.id).copied() { + Some(h) => h, + None => continue, + }; + // SAFETY: handle_ptr comes from timer_handles; caller guarantees validity. + let handle = unsafe { &mut *handle_ptr }; + if handle.flags & UV_HANDLE_ACTIVE == 0 { + self.timer_handles.borrow_mut().remove(&key.id); + continue; + } + let cb = handle.cb; + let repeat = handle.repeat; + + if repeat > 0 { + let new_deadline = now + repeat; + let new_key = TimerKey { + deadline_ms: new_deadline, + id: key.id, + }; + handle.internal_deadline = new_deadline; + self.timers.borrow_mut().insert(new_key); + } else { + handle.flags &= !UV_HANDLE_ACTIVE; + self.timer_handles.borrow_mut().remove(&key.id); + } + + if let Some(cb) = cb { + // SAFETY: handle_ptr is valid; cb was set by the C caller via uv_timer_start. + unsafe { cb(handle_ptr) }; + } + } + } + + /// ### Safety + /// All idle handle pointers stored in `idle_handles` must be valid. + pub(crate) unsafe fn run_idle(&self) { + let mut i = 0; + loop { + let handle_ptr = { + let handles = self.idle_handles.borrow(); + if i >= handles.len() { + break; + } + handles[i] + }; + i += 1; + // SAFETY: handle_ptr comes from idle_handles; caller guarantees validity. 
+ let handle = unsafe { &*handle_ptr }; + if handle.flags & UV_HANDLE_ACTIVE != 0 + && let Some(cb) = handle.cb + { + // SAFETY: Callback set by C caller via uv_idle_start; handle_ptr is valid. + unsafe { cb(handle_ptr) }; + } + } + } + + /// ### Safety + /// All prepare handle pointers stored in `prepare_handles` must be valid. + pub(crate) unsafe fn run_prepare(&self) { + let mut i = 0; + loop { + let handle_ptr = { + let handles = self.prepare_handles.borrow(); + if i >= handles.len() { + break; + } + handles[i] + }; + i += 1; + // SAFETY: handle_ptr comes from prepare_handles; caller guarantees validity. + let handle = unsafe { &*handle_ptr }; + if handle.flags & UV_HANDLE_ACTIVE != 0 + && let Some(cb) = handle.cb + { + // SAFETY: Callback set by C caller via uv_prepare_start; handle_ptr is valid. + unsafe { cb(handle_ptr) }; + } + } + } + + /// ### Safety + /// All check handle pointers stored in `check_handles` must be valid. + pub(crate) unsafe fn run_check(&self) { + let mut i = 0; + loop { + let handle_ptr = { + let handles = self.check_handles.borrow(); + if i >= handles.len() { + break; + } + handles[i] + }; + i += 1; + // SAFETY: handle_ptr comes from check_handles; caller guarantees validity. + let handle = unsafe { &*handle_ptr }; + if handle.flags & UV_HANDLE_ACTIVE != 0 + && let Some(cb) = handle.cb + { + // SAFETY: Callback set by C caller via uv_check_start; handle_ptr is valid. + unsafe { cb(handle_ptr) }; + } + } + } + + /// ### Safety + /// All handle pointers in `closing_handles` must be valid. + pub(crate) unsafe fn run_close(&self) { + let mut closing = self.closing_handles.borrow_mut(); + let snapshot: Vec<_> = closing.drain(..).collect(); + drop(closing); + for (handle_ptr, cb) in snapshot { + if let Some(cb) = cb { + // SAFETY: handle_ptr is valid; cb was registered by C caller via uv_close. + unsafe { cb(handle_ptr) }; + } + } + } + + /// Poll all TCP handles for I/O readiness and fire callbacks. 
+ /// + /// Uses direct polling via tokio's `poll_accept`/`try_read`/`try_write`. + /// No spawned tasks, no channels -- zero allocation in the hot path. + /// + /// Multiple passes: after callbacks fire they may produce new data + /// (e.g. HTTP2 frame processing triggers writes which complete + /// immediately). Re-poll up to 16 times to batch I/O within a + /// single event loop tick. + /// + /// # Safety + /// All TCP handle pointers in `tcp_handles` must be valid. + pub(crate) unsafe fn run_io(&self) -> bool { + let noop = Waker::noop(); + let waker_ref = self.waker.borrow(); + let waker = waker_ref.as_ref().unwrap_or(noop); + let mut cx = Context::from_waker(waker); + + let mut did_any_work = false; + + for _pass in 0..16 { + let mut any_work = false; + + let mut i = 0; + loop { + let tcp_ptr = { + let handles = self.tcp_handles.borrow(); + if i >= handles.len() { + break; + } + handles[i] + }; + i += 1; + // SAFETY: tcp_ptr comes from tcp_handles; caller guarantees validity. + let tcp = unsafe { &mut *tcp_ptr }; + if tcp.flags & UV_HANDLE_ACTIVE == 0 { + continue; + } + + // 1. Poll pending connect + if let Some(ref mut pending) = tcp.internal_connect + && let Poll::Ready(result) = pending.future.as_mut().poll(&mut cx) + { + let req = pending.req; + let cb = pending.cb; + let status = match result { + Ok(stream) => { + if tcp.internal_nodelay { + stream.set_nodelay(true).ok(); + } + tcp.internal_stream = Some(stream); + 0 + } + Err(_) => UV_ECONNREFUSED, + }; + tcp.internal_connect = None; + // SAFETY: req pointer was provided by the C caller and remains valid until callback. + unsafe { + (*req).handle = tcp_ptr as *mut uv_stream_t; + } + if let Some(cb) = cb { + // SAFETY: Callback and req pointer validated above; set by C caller via uv_tcp_connect. + unsafe { cb(req, status) }; + } + } + + // 2. 
Poll listener for new connections + if let Some(ref listener) = tcp.internal_listener + && tcp.internal_connection_cb.is_some() + { + while let Poll::Ready(Ok((stream, _))) = listener.poll_accept(&mut cx) + { + tcp.internal_backlog.push_back(stream); + any_work = true; + } + while !tcp.internal_backlog.is_empty() { + if let Some(cb) = tcp.internal_connection_cb { + // SAFETY: tcp_ptr is valid; cb set by C caller via uv_listen. + unsafe { cb(tcp_ptr as *mut uv_stream_t, 0) }; + } + // If uv_accept wasn't called in the callback, stop + // to avoid an infinite loop. + if !tcp.internal_backlog.is_empty() { + break; + } + } + } + + // 3. Poll readable stream + if tcp.internal_reading && tcp.internal_stream.is_some() { + let alloc_cb = tcp.internal_alloc_cb; + let read_cb = tcp.internal_read_cb; + if let (Some(alloc_cb), Some(read_cb)) = (alloc_cb, read_cb) { + // Register interest so tokio's reactor wakes us. + let _ = tcp + .internal_stream + .as_ref() + .unwrap() + .poll_read_ready(&mut cx); + + loop { + // Re-check after each callback: the callback may have + // called uv_close or uv_read_stop. + if !tcp.internal_reading || tcp.internal_stream.is_none() { + break; + } + let mut buf = uv_buf_t { + base: std::ptr::null_mut(), + len: 0, + }; + // SAFETY: alloc_cb set by C caller via uv_read_start; tcp_ptr is valid. + unsafe { + alloc_cb(tcp_ptr as *mut uv_handle_t, 65536, &mut buf); + } + if buf.base.is_null() || buf.len == 0 { + break; + } + // SAFETY: alloc_cb guarantees buf.base is valid for buf.len bytes. + let slice = unsafe { + std::slice::from_raw_parts_mut(buf.base.cast::(), buf.len) + }; + match tcp.internal_stream.as_ref().unwrap().try_read(slice) { + Ok(0) => { + // SAFETY: read_cb set by C caller via uv_read_start; tcp_ptr and buf are valid. 
+ unsafe { + read_cb(tcp_ptr as *mut uv_stream_t, UV_EOF as isize, &buf) + }; + tcp.internal_reading = false; + break; + } + Ok(n) => { + any_work = true; + // SAFETY: read_cb set by C caller via uv_read_start; tcp_ptr and buf are valid. + unsafe { + read_cb(tcp_ptr as *mut uv_stream_t, n as isize, &buf) + }; + } + Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { + break; + } + Err(_) => { + // SAFETY: read_cb set by C caller via uv_read_start; tcp_ptr and buf are valid. + unsafe { + read_cb(tcp_ptr as *mut uv_stream_t, UV_EOF as isize, &buf) + }; + tcp.internal_reading = false; + break; + } + } + } + } + } + + // 4. Drain write queue in order + if !tcp.internal_write_queue.is_empty() && tcp.internal_stream.is_some() + { + let stream = tcp.internal_stream.as_ref().unwrap(); + let _ = stream.poll_write_ready(&mut cx); + + while let Some(pw) = tcp.internal_write_queue.front_mut() { + let mut done = false; + let mut error = false; + loop { + if pw.offset >= pw.data.len() { + done = true; + break; + } + match stream.try_write(&pw.data[pw.offset..]) { + Ok(n) => pw.offset += n, + Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { + break; + } + Err(_) => { + error = true; + break; + } + } + } + if done { + let pw = tcp.internal_write_queue.pop_front().unwrap(); + if let Some(cb) = pw.cb { + // SAFETY: Write cb and req set by C caller via uv_write; req is valid until callback. + unsafe { cb(pw.req, 0) }; + } + } else if error { + let pw = tcp.internal_write_queue.pop_front().unwrap(); + if let Some(cb) = pw.cb { + // SAFETY: Write cb and req set by C caller via uv_write; req is valid until callback. + unsafe { cb(pw.req, UV_EPIPE) }; + } + } else { + break; // WouldBlock -- retry next tick + } + } + } + } // end per-handle loop + + if !any_work { + break; + } + did_any_work = true; + } // end multi-pass loop + + did_any_work + } + + /// ### Safety + /// `handle` must be a valid pointer to an initialized `uv_timer_t`. 
+ unsafe fn stop_timer(&self, handle: *mut uv_timer_t) { + // SAFETY: Caller guarantees handle is valid and initialized. + let handle_ref = unsafe { &mut *handle }; + let id = handle_ref.internal_id; + if id != 0 { + let key = TimerKey { + deadline_ms: handle_ref.internal_deadline, + id, + }; + self.timers.borrow_mut().remove(&key); + self.timer_handles.borrow_mut().remove(&id); + } + handle_ref.flags &= !UV_HANDLE_ACTIVE; + } + + fn stop_idle(&self, handle: *mut uv_idle_t) { + self + .idle_handles + .borrow_mut() + .retain(|&h| !std::ptr::eq(h, handle)); + // SAFETY: Caller guarantees handle is valid and initialized. + unsafe { + (*handle).flags &= !UV_HANDLE_ACTIVE; + } + } + + fn stop_prepare(&self, handle: *mut uv_prepare_t) { + self + .prepare_handles + .borrow_mut() + .retain(|&h| !std::ptr::eq(h, handle)); + // SAFETY: Caller guarantees handle is valid and initialized. + unsafe { + (*handle).flags &= !UV_HANDLE_ACTIVE; + } + } + + fn stop_check(&self, handle: *mut uv_check_t) { + self + .check_handles + .borrow_mut() + .retain(|&h| !std::ptr::eq(h, handle)); + // SAFETY: Caller guarantees handle is valid and initialized. + unsafe { + (*handle).flags &= !UV_HANDLE_ACTIVE; + } + } + + fn stop_tcp(&self, handle: *mut uv_tcp_t) { + self + .tcp_handles + .borrow_mut() + .retain(|&h| !std::ptr::eq(h, handle)); + // SAFETY: Caller guarantees handle is valid and initialized. + unsafe { + let tcp = &mut *handle; + tcp.internal_reading = false; + tcp.internal_alloc_cb = None; + tcp.internal_read_cb = None; + tcp.internal_connection_cb = None; + tcp.internal_connect = None; + tcp.internal_write_queue.clear(); + tcp.internal_stream = None; + tcp.internal_listener = None; + tcp.internal_backlog.clear(); + tcp.flags &= !UV_HANDLE_ACTIVE; + } + } +} + +/// ### Safety +/// `loop_` must be a valid pointer to a `uv_loop_t` previously initialized by `uv_loop_init`. 
+#[inline] +unsafe fn get_inner(loop_: *mut uv_loop_t) -> &'static UvLoopInner { + // SAFETY: Caller guarantees loop_ is valid and was initialized by uv_loop_init. + unsafe { &*((*loop_).internal as *const UvLoopInner) } +} + +/// ### Safety +/// `loop_` must be a valid pointer to a `uv_loop_t` previously initialized by `uv_loop_init`. +pub unsafe fn uv_loop_get_inner_ptr( + loop_: *const uv_loop_t, +) -> *const std::ffi::c_void { + // SAFETY: Caller guarantees loop_ is valid and was initialized by uv_loop_init. + unsafe { (*loop_).internal as *const std::ffi::c_void } +} + +/// ### Safety +/// `loop_` must be a valid, non-null pointer to an uninitialized `uv_loop_t`. +#[unsafe(no_mangle)] +pub unsafe extern "C" fn uv_loop_init(loop_: *mut uv_loop_t) -> c_int { + let inner = Box::new(UvLoopInner::new()); + // SAFETY: Caller guarantees loop_ is a valid, writable pointer. + unsafe { + (*loop_).internal = Box::into_raw(inner) as *mut c_void; + (*loop_).data = std::ptr::null_mut(); + (*loop_).stop_flag = Cell::new(false); + } + 0 +} + +/// ### Safety +/// `loop_` must be a valid pointer to a `uv_loop_t` initialized by `uv_loop_init`. +#[unsafe(no_mangle)] +pub unsafe extern "C" fn uv_loop_close(loop_: *mut uv_loop_t) -> c_int { + // SAFETY: Caller guarantees loop_ was initialized by uv_loop_init. + unsafe { + let internal = (*loop_).internal; + if !internal.is_null() { + drop(Box::from_raw(internal as *mut UvLoopInner)); + (*loop_).internal = std::ptr::null_mut(); + } + } + 0 +} + +/// ### Safety +/// `loop_` must be a valid pointer to a `uv_loop_t` initialized by `uv_loop_init`. +#[unsafe(no_mangle)] +pub unsafe extern "C" fn uv_now(loop_: *mut uv_loop_t) -> u64 { + // SAFETY: Caller guarantees loop_ was initialized by uv_loop_init. + let inner = unsafe { get_inner(loop_) }; + inner.now_ms() +} + +/// ### Safety +/// `_loop_` must be a valid pointer to a `uv_loop_t` initialized by `uv_loop_init`. 
+#[unsafe(no_mangle)] +pub unsafe extern "C" fn uv_update_time(_loop_: *mut uv_loop_t) {} + +/// ### Safety +/// `loop_` must be initialized by `uv_loop_init`. `handle` must be a valid, writable pointer. +#[unsafe(no_mangle)] +pub unsafe extern "C" fn uv_timer_init( + loop_: *mut uv_loop_t, + handle: *mut uv_timer_t, +) -> c_int { + // SAFETY: Caller guarantees both pointers are valid. + unsafe { + (*handle).r#type = uv_handle_type::UV_TIMER; + (*handle).loop_ = loop_; + (*handle).data = std::ptr::null_mut(); + (*handle).flags = UV_HANDLE_REF; + (*handle).internal_id = 0; + (*handle).internal_deadline = 0; + (*handle).cb = None; + (*handle).timeout = 0; + (*handle).repeat = 0; + } + 0 +} + +/// ### Safety +/// `handle` must be a valid pointer to a `uv_timer_t` initialized by `uv_timer_init`. +#[unsafe(no_mangle)] +pub unsafe extern "C" fn uv_timer_start( + handle: *mut uv_timer_t, + cb: uv_timer_cb, + timeout: u64, + repeat: u64, +) -> c_int { + // SAFETY: Caller guarantees handle was initialized by uv_timer_init. + unsafe { + let loop_ = (*handle).loop_; + let inner = get_inner(loop_); + + if (*handle).flags & UV_HANDLE_ACTIVE != 0 { + inner.stop_timer(handle); + } + + let id = inner.alloc_timer_id(); + let deadline = inner.now_ms() + timeout; + + (*handle).cb = Some(cb); + (*handle).timeout = timeout; + (*handle).repeat = repeat; + (*handle).internal_id = id; + (*handle).internal_deadline = deadline; + (*handle).flags |= UV_HANDLE_ACTIVE; + + let key = TimerKey { + deadline_ms: deadline, + id, + }; + inner.timers.borrow_mut().insert(key); + inner.timer_handles.borrow_mut().insert(id, handle); + } + 0 +} + +/// ### Safety +/// `handle` must be a valid pointer to a `uv_timer_t` initialized by `uv_timer_init`. +#[unsafe(no_mangle)] +pub unsafe extern "C" fn uv_timer_stop(handle: *mut uv_timer_t) -> c_int { + // SAFETY: Caller guarantees handle was initialized by uv_timer_init. 
+ unsafe { + let loop_ = (*handle).loop_; + if loop_.is_null() || (*loop_).internal.is_null() { + (*handle).flags &= !UV_HANDLE_ACTIVE; + return 0; + } + let inner = get_inner(loop_); + inner.stop_timer(handle); + } + 0 +} + +/// ### Safety +/// `handle` must be a valid pointer to a `uv_timer_t` initialized by `uv_timer_init`. +#[unsafe(no_mangle)] +pub unsafe extern "C" fn uv_timer_again(handle: *mut uv_timer_t) -> c_int { + // SAFETY: Caller guarantees handle was initialized by uv_timer_init. + unsafe { + let repeat = (*handle).repeat; + if repeat == 0 { + return UV_EINVAL; + } + let loop_ = (*handle).loop_; + let inner = get_inner(loop_); + + inner.stop_timer(handle); + + let id = inner.alloc_timer_id(); + let deadline = inner.now_ms() + repeat; + + (*handle).internal_id = id; + (*handle).internal_deadline = deadline; + (*handle).flags |= UV_HANDLE_ACTIVE; + + let key = TimerKey { + deadline_ms: deadline, + id, + }; + inner.timers.borrow_mut().insert(key); + inner.timer_handles.borrow_mut().insert(id, handle); + } + 0 +} + +/// ### Safety +/// `handle` must be a valid pointer to a `uv_timer_t` initialized by `uv_timer_init`. +#[unsafe(no_mangle)] +pub unsafe extern "C" fn uv_timer_get_repeat(handle: *const uv_timer_t) -> u64 { + // SAFETY: Caller guarantees handle is valid and initialized. + unsafe { (*handle).repeat } +} + +/// ### Safety +/// `handle` must be a valid pointer to a `uv_timer_t` initialized by `uv_timer_init`. +#[unsafe(no_mangle)] +pub unsafe extern "C" fn uv_timer_set_repeat( + handle: *mut uv_timer_t, + repeat: u64, +) { + // SAFETY: Caller guarantees handle is valid and initialized. + unsafe { + (*handle).repeat = repeat; + } +} + +/// ### Safety +/// `loop_` must be initialized by `uv_loop_init`. `handle` must be a valid, writable pointer. +#[unsafe(no_mangle)] +pub unsafe extern "C" fn uv_idle_init( + loop_: *mut uv_loop_t, + handle: *mut uv_idle_t, +) -> c_int { + // SAFETY: Caller guarantees both pointers are valid. 
+ unsafe { + (*handle).r#type = uv_handle_type::UV_IDLE; + (*handle).loop_ = loop_; + (*handle).data = std::ptr::null_mut(); + (*handle).flags = UV_HANDLE_REF; + (*handle).cb = None; + } + 0 +} + +/// ### Safety +/// `handle` must be a valid pointer to a `uv_idle_t` initialized by `uv_idle_init`. +#[unsafe(no_mangle)] +pub unsafe extern "C" fn uv_idle_start( + handle: *mut uv_idle_t, + cb: uv_idle_cb, +) -> c_int { + // SAFETY: Caller guarantees handle was initialized by uv_idle_init. + unsafe { + if (*handle).flags & UV_HANDLE_ACTIVE != 0 { + (*handle).cb = Some(cb); + return 0; + } + (*handle).cb = Some(cb); + (*handle).flags |= UV_HANDLE_ACTIVE; + + let loop_ = (*handle).loop_; + let inner = get_inner(loop_); + inner.idle_handles.borrow_mut().push(handle); + } + 0 +} + +/// ### Safety +/// `handle` must be a valid pointer to a `uv_idle_t` initialized by `uv_idle_init`. +#[unsafe(no_mangle)] +pub unsafe extern "C" fn uv_idle_stop(handle: *mut uv_idle_t) -> c_int { + // SAFETY: Caller guarantees handle was initialized by uv_idle_init. + unsafe { + if (*handle).flags & UV_HANDLE_ACTIVE == 0 { + return 0; + } + let loop_ = (*handle).loop_; + let inner = get_inner(loop_); + inner.stop_idle(handle); + (*handle).cb = None; + } + 0 +} + +/// ### Safety +/// `loop_` must be initialized by `uv_loop_init`. `handle` must be a valid, writable pointer. +#[unsafe(no_mangle)] +pub unsafe extern "C" fn uv_prepare_init( + loop_: *mut uv_loop_t, + handle: *mut uv_prepare_t, +) -> c_int { + // SAFETY: Caller guarantees both pointers are valid. + unsafe { + (*handle).r#type = uv_handle_type::UV_PREPARE; + (*handle).loop_ = loop_; + (*handle).data = std::ptr::null_mut(); + (*handle).flags = UV_HANDLE_REF; + (*handle).cb = None; + } + 0 +} + +/// ### Safety +/// `handle` must be a valid pointer to a `uv_prepare_t` initialized by `uv_prepare_init`. 
+#[unsafe(no_mangle)] +pub unsafe extern "C" fn uv_prepare_start( + handle: *mut uv_prepare_t, + cb: uv_prepare_cb, +) -> c_int { + // SAFETY: Caller guarantees handle was initialized by uv_prepare_init. + unsafe { + if (*handle).flags & UV_HANDLE_ACTIVE != 0 { + (*handle).cb = Some(cb); + return 0; + } + (*handle).cb = Some(cb); + (*handle).flags |= UV_HANDLE_ACTIVE; + + let loop_ = (*handle).loop_; + let inner = get_inner(loop_); + inner.prepare_handles.borrow_mut().push(handle); + } + 0 +} + +/// ### Safety +/// `handle` must be a valid pointer to a `uv_prepare_t` initialized by `uv_prepare_init`. +#[unsafe(no_mangle)] +pub unsafe extern "C" fn uv_prepare_stop(handle: *mut uv_prepare_t) -> c_int { + // SAFETY: Caller guarantees handle was initialized by uv_prepare_init. + unsafe { + if (*handle).flags & UV_HANDLE_ACTIVE == 0 { + return 0; + } + let loop_ = (*handle).loop_; + let inner = get_inner(loop_); + inner.stop_prepare(handle); + (*handle).cb = None; + } + 0 +} + +/// ### Safety +/// `loop_` must be initialized by `uv_loop_init`. `handle` must be a valid, writable pointer. +#[unsafe(no_mangle)] +pub unsafe extern "C" fn uv_check_init( + loop_: *mut uv_loop_t, + handle: *mut uv_check_t, +) -> c_int { + // SAFETY: Caller guarantees both pointers are valid. + unsafe { + (*handle).r#type = uv_handle_type::UV_CHECK; + (*handle).loop_ = loop_; + (*handle).data = std::ptr::null_mut(); + (*handle).flags = UV_HANDLE_REF; + (*handle).cb = None; + } + 0 +} + +/// ### Safety +/// `handle` must be a valid pointer to a `uv_check_t` initialized by `uv_check_init`. +#[unsafe(no_mangle)] +pub unsafe extern "C" fn uv_check_start( + handle: *mut uv_check_t, + cb: uv_check_cb, +) -> c_int { + // SAFETY: Caller guarantees handle was initialized by uv_check_init. 
+  unsafe {
+    if (*handle).flags & UV_HANDLE_ACTIVE != 0 {
+      (*handle).cb = Some(cb);
+      return 0;
+    }
+    (*handle).cb = Some(cb);
+    (*handle).flags |= UV_HANDLE_ACTIVE;
+
+    let loop_ = (*handle).loop_;
+    let inner = get_inner(loop_);
+    inner.check_handles.borrow_mut().push(handle);
+  }
+  0
+}
+
+/// ### Safety
+/// `handle` must be a valid pointer to a `uv_check_t` initialized by `uv_check_init`.
+#[unsafe(no_mangle)]
+pub unsafe extern "C" fn uv_check_stop(handle: *mut uv_check_t) -> c_int {
+  // SAFETY: Caller guarantees handle was initialized by uv_check_init.
+  unsafe {
+    if (*handle).flags & UV_HANDLE_ACTIVE == 0 {
+      return 0;
+    }
+    let loop_ = (*handle).loop_;
+    let inner = get_inner(loop_);
+    inner.stop_check(handle);
+    (*handle).cb = None;
+  }
+  0
+}
+
+/// ### Safety
+/// `handle` must be a valid pointer to any uv handle type (timer, idle, tcp, etc.) initialized
+/// by the corresponding `uv_*_init` function. Must not be called twice on the same handle.
+#[unsafe(no_mangle)]
+pub unsafe extern "C" fn uv_close(
+  handle: *mut uv_handle_t,
+  close_cb: Option<uv_close_cb>,
+) {
+  // SAFETY: Caller guarantees handle is valid and initialized.
+  unsafe {
+    (*handle).flags |= UV_HANDLE_CLOSING;
+    (*handle).flags &= !UV_HANDLE_ACTIVE;
+
+    let loop_ = (*handle).loop_;
+    let inner = get_inner(loop_);
+
+    match (*handle).r#type {
+      uv_handle_type::UV_TIMER => {
+        inner.stop_timer(handle as *mut uv_timer_t);
+      }
+      uv_handle_type::UV_IDLE => {
+        inner.stop_idle(handle as *mut uv_idle_t);
+      }
+      uv_handle_type::UV_PREPARE => {
+        inner.stop_prepare(handle as *mut uv_prepare_t);
+      }
+      uv_handle_type::UV_CHECK => {
+        inner.stop_check(handle as *mut uv_check_t);
+      }
+      uv_handle_type::UV_TCP => {
+        inner.stop_tcp(handle as *mut uv_tcp_t);
+      }
+      _ => {}
+    }
+
+    inner
+      .closing_handles
+      .borrow_mut()
+      .push_back((handle, close_cb));
+  }
+}
+
+/// ### Safety
+/// `handle` must be a valid pointer to an initialized uv handle.
+#[unsafe(no_mangle)]
+pub unsafe extern "C" fn uv_ref(handle: *mut uv_handle_t) {
+  // SAFETY: Caller guarantees handle is valid and initialized.
+  unsafe {
+    (*handle).flags |= UV_HANDLE_REF;
+  }
+}
+
+/// ### Safety
+/// `handle` must be a valid pointer to an initialized uv handle.
+#[unsafe(no_mangle)]
+pub unsafe extern "C" fn uv_unref(handle: *mut uv_handle_t) {
+  // SAFETY: Caller guarantees handle is valid and initialized.
+  unsafe {
+    (*handle).flags &= !UV_HANDLE_REF;
+  }
+}
+
+/// ### Safety
+/// `handle` must be a valid pointer to an initialized uv handle.
+#[unsafe(no_mangle)]
+pub unsafe extern "C" fn uv_is_active(handle: *const uv_handle_t) -> c_int {
+  // SAFETY: Caller guarantees handle is valid and initialized.
+  unsafe {
+    if (*handle).flags & UV_HANDLE_ACTIVE != 0 {
+      1
+    } else {
+      0
+    }
+  }
+}
+
+/// ### Safety
+/// `handle` must be a valid pointer to an initialized uv handle.
+#[unsafe(no_mangle)]
+pub unsafe extern "C" fn uv_is_closing(handle: *const uv_handle_t) -> c_int {
+  // SAFETY: Caller guarantees handle is valid and initialized.
+  unsafe {
+    if (*handle).flags & UV_HANDLE_CLOSING != 0 {
+      1
+    } else {
+      0
+    }
+  }
+}
+
+/// ### Safety
+/// `addr` must point to a valid `sockaddr_in` or `sockaddr_in6` with correct `sa_family`.
+unsafe fn sockaddr_to_std(addr: *const c_void) -> Option<SocketAddr> {
+  let sa = addr as *const libc::sockaddr;
+  // SAFETY: Caller guarantees addr points to a valid sockaddr.
+  let family = unsafe { (*sa).sa_family as i32 };
+  if family == AF_INET {
+    // SAFETY: Family is AF_INET so addr is a valid sockaddr_in.
+    let sin = unsafe { &*(addr as *const sockaddr_in) };
+    let ip = std::net::Ipv4Addr::from(u32::from_be(sin.sin_addr.s_addr));
+    let port = u16::from_be(sin.sin_port);
+    Some(SocketAddr::from((ip, port)))
+  } else if family == AF_INET6 {
+    // SAFETY: Family is AF_INET6 so addr is a valid sockaddr_in6.
+    let sin6 = unsafe { &*(addr as *const sockaddr_in6) };
+    let ip = std::net::Ipv6Addr::from(sin6.sin6_addr.s6_addr);
+    let port = u16::from_be(sin6.sin6_port);
+    Some(SocketAddr::from((ip, port)))
+  } else {
+    None
+  }
+}
+
+/// ### Safety
+/// `out` must be writable and large enough for `sockaddr_in` or `sockaddr_in6`.
+/// `len` must be a valid, writable pointer.
+unsafe fn std_to_sockaddr(addr: SocketAddr, out: *mut c_void, len: *mut c_int) {
+  match addr {
+    SocketAddr::V4(v4) => {
+      let sin = out as *mut sockaddr_in;
+      // SAFETY: Caller guarantees out is large enough for sockaddr_in.
+      unsafe {
+        std::ptr::write_bytes(sin, 0, 1);
+        #[cfg(any(target_os = "macos", target_os = "freebsd"))]
+        {
+          (*sin).sin_len = std::mem::size_of::<sockaddr_in>() as u8;
+        }
+        (*sin).sin_family = AF_INET as sa_family_t;
+        (*sin).sin_port = v4.port().to_be();
+        (*sin).sin_addr.s_addr = u32::from(*v4.ip()).to_be();
+        *len = std::mem::size_of::<sockaddr_in>() as c_int;
+      }
+    }
+    SocketAddr::V6(v6) => {
+      let sin6 = out as *mut sockaddr_in6;
+      // SAFETY: Caller guarantees out is large enough for sockaddr_in6.
+      unsafe {
+        std::ptr::write_bytes(sin6, 0, 1);
+        #[cfg(any(target_os = "macos", target_os = "freebsd"))]
+        {
+          (*sin6).sin6_len = std::mem::size_of::<sockaddr_in6>() as u8;
+        }
+        (*sin6).sin6_family = AF_INET6 as sa_family_t;
+        (*sin6).sin6_port = v6.port().to_be();
+        (*sin6).sin6_addr.s6_addr = v6.ip().octets();
+        (*sin6).sin6_scope_id = v6.scope_id();
+        *len = std::mem::size_of::<sockaddr_in6>() as c_int;
+      }
+    }
+  }
+}
+
+/// ### Safety
+/// `loop_` must be initialized by `uv_loop_init`. `tcp` must be a valid, writable pointer.
+pub unsafe fn uv_tcp_init(loop_: *mut uv_loop_t, tcp: *mut uv_tcp_t) -> c_int {
+  // SAFETY: Caller guarantees both pointers are valid.
+ unsafe { + use std::ptr::{addr_of_mut, write}; + write(addr_of_mut!((*tcp).r#type), uv_handle_type::UV_TCP); + write(addr_of_mut!((*tcp).loop_), loop_); + write(addr_of_mut!((*tcp).data), std::ptr::null_mut()); + write(addr_of_mut!((*tcp).flags), UV_HANDLE_REF); + write(addr_of_mut!((*tcp).internal_fd), None); + write(addr_of_mut!((*tcp).internal_bind_addr), None); + write(addr_of_mut!((*tcp).internal_stream), None); + write(addr_of_mut!((*tcp).internal_listener), None); + write(addr_of_mut!((*tcp).internal_listener_addr), None); + write(addr_of_mut!((*tcp).internal_nodelay), false); + write(addr_of_mut!((*tcp).internal_alloc_cb), None); + write(addr_of_mut!((*tcp).internal_read_cb), None); + write(addr_of_mut!((*tcp).internal_reading), false); + write(addr_of_mut!((*tcp).internal_connect), None); + write(addr_of_mut!((*tcp).internal_write_queue), VecDeque::new()); + write(addr_of_mut!((*tcp).internal_connection_cb), None); + write(addr_of_mut!((*tcp).internal_backlog), VecDeque::new()); + } + 0 +} + +/// ### Safety +/// `tcp` must be a valid pointer to a `uv_tcp_t` initialized by `uv_tcp_init`. +/// `fd` must be a valid, open file descriptor / socket. +pub unsafe fn uv_tcp_open(tcp: *mut uv_tcp_t, fd: c_int) -> c_int { + // SAFETY: Caller guarantees tcp is initialized and fd is valid. 
+  unsafe {
+    #[cfg(unix)]
+    let std_stream = {
+      use std::os::unix::io::FromRawFd;
+      let s = std::net::TcpStream::from_raw_fd(fd);
+      (*tcp).internal_fd = Some(fd);
+      s
+    };
+    #[cfg(windows)]
+    let std_stream = {
+      use std::os::windows::io::FromRawSocket;
+      let sock = fd as std::os::windows::io::RawSocket;
+      let s = std::net::TcpStream::from_raw_socket(sock);
+      (*tcp).internal_fd = Some(sock);
+      s
+    };
+    std_stream.set_nonblocking(true).ok();
+    match tokio::net::TcpStream::from_std(std_stream) {
+      Ok(stream) => {
+        if (*tcp).internal_nodelay {
+          stream.set_nodelay(true).ok();
+        }
+        (*tcp).internal_stream = Some(stream);
+        0
+      }
+      Err(_) => UV_EINVAL,
+    }
+  }
+}
+
+/// ### Safety
+/// `tcp` must be initialized by `uv_tcp_init`. `addr` must point to a valid sockaddr.
+pub unsafe fn uv_tcp_bind(
+  tcp: *mut uv_tcp_t,
+  addr: *const c_void,
+  _addrlen: u32,
+  _flags: u32,
+) -> c_int {
+  // SAFETY: Caller guarantees addr points to a valid sockaddr.
+  let sock_addr = unsafe { sockaddr_to_std(addr) };
+  match sock_addr {
+    Some(sa) => {
+      // SAFETY: Caller guarantees tcp is valid and initialized.
+      unsafe { (*tcp).internal_bind_addr = Some(sa) };
+      0
+    }
+    None => UV_EINVAL,
+  }
+}
+
+/// ### Safety
+/// `req` must be a valid, writable pointer. `tcp` must be initialized by `uv_tcp_init`.
+/// `addr` must point to a valid sockaddr. `req` must remain valid until the connect callback fires.
+pub unsafe fn uv_tcp_connect(
+  req: *mut uv_connect_t,
+  tcp: *mut uv_tcp_t,
+  addr: *const c_void,
+  cb: Option<uv_connect_cb>,
+) -> c_int {
+  // SAFETY: Caller guarantees addr points to a valid sockaddr.
+  let sock_addr = unsafe { sockaddr_to_std(addr) };
+  let sock_addr = match sock_addr {
+    Some(sa) => sa,
+    None => return UV_EINVAL,
+  };
+
+  // SAFETY: Caller guarantees req and tcp are valid.
+  unsafe {
+    (*req).handle = tcp as *mut uv_stream_t;
+  }
+
+  // SAFETY: tcp was initialized by uv_tcp_init which set loop_.
+ let inner = unsafe { get_inner((*tcp).loop_) }; + + // SAFETY: Caller guarantees tcp is valid and initialized. + unsafe { + (*tcp).flags |= UV_HANDLE_ACTIVE; + let mut handles = inner.tcp_handles.borrow_mut(); + if !handles.iter().any(|&h| std::ptr::eq(h, tcp)) { + handles.push(tcp); + } + + (*tcp).internal_connect = Some(ConnectPending { + future: Box::pin(tokio::net::TcpStream::connect(sock_addr)), + req, + cb, + }); + } + + 0 +} + +/// ### Safety +/// `tcp` must be a valid pointer to a `uv_tcp_t` initialized by `uv_tcp_init`. +pub unsafe fn uv_tcp_nodelay(tcp: *mut uv_tcp_t, enable: c_int) -> c_int { + // SAFETY: Caller guarantees tcp is valid and initialized. + unsafe { + let enabled = enable != 0; + (*tcp).internal_nodelay = enabled; + if let Some(ref stream) = (*tcp).internal_stream + && stream.set_nodelay(enabled).is_err() + { + return UV_EINVAL; + } + } + 0 +} + +/// ### Safety +/// `tcp` must be initialized by `uv_tcp_init`. `name` must be writable and large enough +/// for a sockaddr. `namelen` must be a valid, writable pointer. +pub unsafe fn uv_tcp_getpeername( + tcp: *const uv_tcp_t, + name: *mut c_void, + namelen: *mut c_int, +) -> c_int { + // SAFETY: Caller guarantees all pointers are valid. + unsafe { + if let Some(ref stream) = (*tcp).internal_stream { + match stream.peer_addr() { + Ok(addr) => { + std_to_sockaddr(addr, name, namelen); + 0 + } + Err(_) => UV_ENOTCONN, + } + } else { + UV_ENOTCONN + } + } +} + +/// ### Safety +/// `tcp` must be initialized by `uv_tcp_init`. `name` must be writable and large enough +/// for a sockaddr. `namelen` must be a valid, writable pointer. +pub unsafe fn uv_tcp_getsockname( + tcp: *const uv_tcp_t, + name: *mut c_void, + namelen: *mut c_int, +) -> c_int { + // SAFETY: Caller guarantees all pointers are valid. 
+ unsafe { + if let Some(ref stream) = (*tcp).internal_stream { + match stream.local_addr() { + Ok(addr) => { + std_to_sockaddr(addr, name, namelen); + return 0; + } + Err(_) => return UV_EINVAL, + } + } + if let Some(addr) = (*tcp).internal_listener_addr { + std_to_sockaddr(addr, name, namelen); + return 0; + } + if let Some(addr) = (*tcp).internal_bind_addr { + std_to_sockaddr(addr, name, namelen); + return 0; + } + UV_EINVAL + } +} + +/// ### Safety +/// `_tcp` must be a valid pointer to a `uv_tcp_t` initialized by `uv_tcp_init`. +#[unsafe(no_mangle)] +pub unsafe extern "C" fn uv_tcp_keepalive( + _tcp: *mut uv_tcp_t, + _enable: c_int, + _delay: c_uint, +) -> c_int { + // Keepalive is a no-op: tokio's TcpStream doesn't expose SO_KEEPALIVE + // configuration in a cross-platform way, and nghttp2 only uses this + // as a best-effort hint. + 0 +} + +/// ### Safety +/// `_tcp` must be a valid pointer to a `uv_tcp_t` initialized by `uv_tcp_init`. +#[unsafe(no_mangle)] +pub unsafe extern "C" fn uv_tcp_simultaneous_accepts( + _tcp: *mut uv_tcp_t, + _enable: c_int, +) -> c_int { + 0 // no-op +} + +/// ### Safety +/// `ip` must be a valid, null-terminated C string. `addr` must be a valid, writable pointer. +#[unsafe(no_mangle)] +pub unsafe extern "C" fn uv_ip4_addr( + ip: *const c_char, + port: c_int, + addr: *mut sockaddr_in, +) -> c_int { + // SAFETY: Caller guarantees ip is a valid C string and addr is writable. 
+  unsafe {
+    let c_str = std::ffi::CStr::from_ptr(ip);
+    let Ok(s) = c_str.to_str() else {
+      return UV_EINVAL;
+    };
+    let Ok(ip_addr) = s.parse::<std::net::Ipv4Addr>() else {
+      return UV_EINVAL;
+    };
+    std::ptr::write_bytes(addr, 0, 1);
+    #[cfg(any(target_os = "macos", target_os = "freebsd"))]
+    {
+      (*addr).sin_len = std::mem::size_of::<sockaddr_in>() as u8;
+    }
+    (*addr).sin_family = AF_INET as sa_family_t;
+    (*addr).sin_port = (port as u16).to_be();
+    (*addr).sin_addr.s_addr = u32::from(ip_addr).to_be();
+    0
+  }
+}
+
+/// ### Safety
+/// `stream` must be a valid pointer to a `uv_tcp_t` (cast as `uv_stream_t`) initialized
+/// by `uv_tcp_init`, with a bind address set via `uv_tcp_bind`.
+pub unsafe fn uv_listen(
+  stream: *mut uv_stream_t,
+  _backlog: c_int,
+  cb: Option<uv_connection_cb>,
+) -> c_int {
+  // SAFETY: Caller guarantees stream is a valid, initialized uv_tcp_t.
+  unsafe {
+    let tcp = stream as *mut uv_tcp_t;
+    let tcp_ref = &mut *tcp;
+
+    let bind_addr = tcp_ref
+      .internal_bind_addr
+      .unwrap_or_else(|| "0.0.0.0:0".parse().unwrap());
+
+    let std_listener = match std::net::TcpListener::bind(bind_addr) {
+      Ok(l) => l,
+      Err(_) => return UV_EADDRINUSE,
+    };
+    std_listener.set_nonblocking(true).ok();
+    let listener_addr = std_listener.local_addr().ok();
+    let tokio_listener = match tokio::net::TcpListener::from_std(std_listener) {
+      Ok(l) => l,
+      Err(_) => return UV_EINVAL,
+    };
+
+    tcp_ref.internal_listener = Some(tokio_listener);
+    tcp_ref.internal_listener_addr = listener_addr;
+    tcp_ref.internal_connection_cb = cb;
+    tcp_ref.flags |= UV_HANDLE_ACTIVE;
+
+    let inner = get_inner(tcp_ref.loop_);
+    let mut handles = inner.tcp_handles.borrow_mut();
+    if !handles.iter().any(|&h| std::ptr::eq(h, tcp)) {
+      handles.push(tcp);
+    }
+  }
+  0
+}
+
+/// ### Safety
+/// `server` must be a listening `uv_tcp_t`. `client` must be initialized by `uv_tcp_init`.
+pub unsafe fn uv_accept(
+  server: *mut uv_stream_t,
+  client: *mut uv_stream_t,
+) -> c_int {
+  // SAFETY: Caller guarantees both pointers are valid, initialized uv_tcp_t handles.
+  unsafe {
+    let server_tcp = &mut *(server as *mut uv_tcp_t);
+    let client_tcp = &mut *(client as *mut uv_tcp_t);
+
+    match server_tcp.internal_backlog.pop_front() {
+      Some(stream) => {
+        if client_tcp.internal_nodelay {
+          stream.set_nodelay(true).ok();
+        }
+        client_tcp.internal_stream = Some(stream);
+        0
+      }
+      None => UV_EAGAIN,
+    }
+  }
+}
+
+/// ### Safety
+/// `stream` must be a valid pointer to an initialized `uv_tcp_t` (cast as `uv_stream_t`).
+pub unsafe fn uv_read_start(
+  stream: *mut uv_stream_t,
+  alloc_cb: Option<uv_alloc_cb>,
+  read_cb: Option<uv_read_cb>,
+) -> c_int {
+  // SAFETY: Caller guarantees stream is a valid, initialized uv_tcp_t.
+  unsafe {
+    let tcp = stream as *mut uv_tcp_t;
+    let tcp_ref = &mut *tcp;
+    tcp_ref.internal_alloc_cb = alloc_cb;
+    tcp_ref.internal_read_cb = read_cb;
+    tcp_ref.internal_reading = true;
+    tcp_ref.flags |= UV_HANDLE_ACTIVE;
+
+    let inner = get_inner(tcp_ref.loop_);
+    let mut handles = inner.tcp_handles.borrow_mut();
+    if !handles.iter().any(|&h| std::ptr::eq(h, tcp)) {
+      handles.push(tcp);
+    }
+  }
+  0
+}
+
+/// ### Safety
+/// `stream` must be a valid pointer to an initialized `uv_tcp_t` (cast as `uv_stream_t`).
+pub unsafe fn uv_read_stop(stream: *mut uv_stream_t) -> c_int {
+  // SAFETY: Caller guarantees stream is a valid, initialized uv_tcp_t.
+  unsafe {
+    let tcp = stream as *mut uv_tcp_t;
+    let tcp_ref = &mut *tcp;
+    tcp_ref.internal_reading = false;
+    tcp_ref.internal_alloc_cb = None;
+    tcp_ref.internal_read_cb = None;
+    if tcp_ref.internal_connection_cb.is_none()
+      && tcp_ref.internal_connect.is_none()
+      && tcp_ref.internal_write_queue.is_empty()
+    {
+      tcp_ref.flags &= !UV_HANDLE_ACTIVE;
+    }
+  }
+  0
+}
+
+/// ### Safety
+/// `handle` must be a valid pointer to an initialized `uv_tcp_t` (cast as `uv_stream_t`).
+pub unsafe fn uv_try_write(handle: *mut uv_stream_t, data: &[u8]) -> i32 {
+  // SAFETY: Caller guarantees handle is a valid, initialized uv_tcp_t.
+  let tcp_ref = unsafe { &mut *(handle as *mut uv_tcp_t) };
+
+  if !tcp_ref.internal_write_queue.is_empty() {
+    return UV_EAGAIN;
+  }
+
+  let stream = match tcp_ref.internal_stream.as_ref() {
+    Some(s) => s,
+    None => return UV_EBADF,
+  };
+
+  match stream.try_write(data) {
+    Ok(n) => n as i32,
+    Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => UV_EAGAIN,
+    Err(_) => UV_EPIPE,
+  }
+}
+
+/// ### Safety
+/// `req` must be valid and remain so until the write callback fires. `handle` must be an
+/// initialized `uv_tcp_t`. `bufs` must point to `nbufs` valid `uv_buf_t` entries.
+pub unsafe fn uv_write(
+  req: *mut uv_write_t,
+  handle: *mut uv_stream_t,
+  bufs: *const uv_buf_t,
+  nbufs: u32,
+  cb: Option<uv_write_cb>,
+) -> c_int {
+  // SAFETY: Caller guarantees all pointers are valid.
+  unsafe {
+    let tcp = handle as *mut uv_tcp_t;
+    let tcp_ref = &mut *tcp;
+    (*req).handle = handle;
+
+    let stream = match tcp_ref.internal_stream.as_ref() {
+      Some(s) => s,
+      None => {
+        if let Some(cb) = cb {
+          cb(req, UV_ENOTCONN);
+        }
+        return 0;
+      }
+    };
+
+    if !tcp_ref.internal_write_queue.is_empty() {
+      let write_data = collect_bufs(bufs, nbufs);
+      tcp_ref.internal_write_queue.push_back(WritePending {
+        req,
+        data: write_data,
+        offset: 0,
+        cb,
+      });
+      return 0;
+    }
+
+    if nbufs == 1 {
+      let buf = &*bufs;
+      if !buf.base.is_null() && buf.len > 0 {
+        let data = std::slice::from_raw_parts(buf.base as *const u8, buf.len);
+        let mut offset = 0;
+        loop {
+          match stream.try_write(&data[offset..]) {
+            Ok(n) => {
+              offset += n;
+              if offset >= data.len() {
+                if let Some(cb) = cb {
+                  cb(req, 0);
+                }
+                return 0;
+              }
+            }
+            Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
+              tcp_ref.internal_write_queue.push_back(WritePending {
+                req,
+                data: data[offset..].to_vec(),
+                offset: 0,
+                cb,
+              });
+              return 0;
+            }
+            Err(_) => {
+              if let 
Some(cb) = cb {
+                cb(req, UV_EPIPE);
+              }
+              return 0;
+            }
+          }
+        }
+      }
+      if let Some(cb) = cb {
+        cb(req, 0);
+      }
+      return 0;
+    }
+
+    let iovecs: smallvec::SmallVec<[std::io::IoSlice<'_>; 8]> = (0..nbufs
+      as usize)
+      .filter_map(|i| {
+        let buf = &*bufs.add(i);
+        if buf.base.is_null() || buf.len == 0 {
+          None
+        } else {
+          Some(std::io::IoSlice::new(std::slice::from_raw_parts(
+            buf.base as *const u8,
+            buf.len,
+          )))
+        }
+      })
+      .collect();
+
+    let total_len: usize = iovecs.iter().map(|s| s.len()).sum();
+    if total_len == 0 {
+      if let Some(cb) = cb {
+        cb(req, 0);
+      }
+      return 0;
+    }
+
+    match stream.try_write_vectored(&iovecs) {
+      Ok(n) if n >= total_len => {
+        if let Some(cb) = cb {
+          cb(req, 0);
+        }
+        return 0;
+      }
+      Ok(n) => {
+        let mut write_data = Vec::with_capacity(total_len - n);
+        let mut skip = n;
+        for iov in &iovecs {
+          if skip >= iov.len() {
+            skip -= iov.len();
+          } else {
+            write_data.extend_from_slice(&iov[skip..]);
+            skip = 0;
+          }
+        }
+        tcp_ref.internal_write_queue.push_back(WritePending {
+          req,
+          data: write_data,
+          offset: 0,
+          cb,
+        });
+      }
+      Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
+        let write_data = collect_bufs(bufs, nbufs);
+        tcp_ref.internal_write_queue.push_back(WritePending {
+          req,
+          data: write_data,
+          offset: 0,
+          cb,
+        });
+      }
+      Err(_) => {
+        if let Some(cb) = cb {
+          cb(req, UV_EPIPE);
+        }
+      }
+    }
+  }
+  0
+}
+
+/// ### Safety
+/// `bufs` must point to `nbufs` valid `uv_buf_t` entries with valid `base` pointers.
+unsafe fn collect_bufs(bufs: *const uv_buf_t, nbufs: u32) -> Vec<u8> {
+  // SAFETY: Caller guarantees bufs points to nbufs valid entries.
+  unsafe {
+    let mut total = 0usize;
+    for i in 0..nbufs as usize {
+      let buf = &*bufs.add(i);
+      if !buf.base.is_null() {
+        total += buf.len;
+      }
+    }
+    let mut data = Vec::with_capacity(total);
+    for i in 0..nbufs as usize {
+      let buf = &*bufs.add(i);
+      if !buf.base.is_null() && buf.len > 0 {
+        data.extend_from_slice(std::slice::from_raw_parts(
+          buf.base as *const u8,
+          buf.len,
+        ));
+      }
+    }
+    data
+  }
+}
+
+/// ### Safety
+/// `req` must be a valid, writable pointer. `stream` must be an initialized `uv_tcp_t`.
+/// `req` must remain valid until the shutdown callback fires.
+pub unsafe fn uv_shutdown(
+  req: *mut uv_shutdown_t,
+  stream: *mut uv_stream_t,
+  cb: Option<uv_shutdown_cb>,
+) -> c_int {
+  // SAFETY: Caller guarantees all pointers are valid.
+  unsafe {
+    let tcp = stream as *mut uv_tcp_t;
+    (*req).handle = stream;
+
+    let status = if let Some(ref stream) = (*tcp).internal_stream {
+      #[cfg(unix)]
+      {
+        use std::os::unix::io::AsRawFd;
+        let fd = stream.as_raw_fd();
+        if libc::shutdown(fd, libc::SHUT_WR) == 0 {
+          0
+        } else {
+          UV_ENOTCONN
+        }
+      }
+      #[cfg(windows)]
+      {
+        use std::os::windows::io::AsRawSocket;
+        let sock = stream.as_raw_socket();
+        if win_sock::shutdown(sock as usize, win_sock::SD_SEND) == 0 {
+          0
+        } else {
+          UV_ENOTCONN
+        }
+      }
+    } else {
+      UV_ENOTCONN
+    };
+
+    if let Some(cb) = cb {
+      cb(req, status);
+    }
+  }
+  0
+}
+
+pub fn new_tcp() -> UvTcp {
+  uv_tcp_t {
+    r#type: uv_handle_type::UV_TCP,
+    loop_: std::ptr::null_mut(),
+    data: std::ptr::null_mut(),
+    flags: 0,
+    internal_fd: None,
+    internal_bind_addr: None,
+    internal_stream: None,
+    internal_listener: None,
+    internal_listener_addr: None,
+    internal_nodelay: false,
+    internal_alloc_cb: None,
+    internal_read_cb: None,
+    internal_reading: false,
+    internal_connect: None,
+    internal_write_queue: VecDeque::new(),
+    internal_connection_cb: None,
+    internal_backlog: VecDeque::new(),
+  }
+}
+
+pub fn new_write() -> UvWrite {
+  uv_write_t {
+    r#type: 0,
+    data: std::ptr::null_mut(),
+    handle: 
std::ptr::null_mut(), + } +} + +pub fn new_connect() -> UvConnect { + uv_connect_t { + r#type: 0, + data: std::ptr::null_mut(), + handle: std::ptr::null_mut(), + } +} + +pub fn new_shutdown() -> UvShutdown { + uv_shutdown_t { + r#type: 0, + data: std::ptr::null_mut(), + handle: std::ptr::null_mut(), + } +} diff --git a/core/web_timeout.rs b/core/web_timeout.rs index 3d7faf85e..95db4be18 100644 --- a/core/web_timeout.rs +++ b/core/web_timeout.rs @@ -1,5 +1,8 @@ // Copyright 2018-2025 the Deno authors. MIT license. +use crate::reactor::Reactor; +use crate::reactor::ReactorInstant; +use crate::reactor::ReactorTimer; use cooked_waker::IntoWaker; use cooked_waker::ViaRawPointer; use cooked_waker::Wake; @@ -11,31 +14,29 @@ use std::cell::UnsafeCell; use std::collections::BTreeMap; use std::collections::BTreeSet; use std::collections::btree_set; -use std::future::Future; use std::mem::MaybeUninit; use std::num::NonZeroU64; +use std::ops::Deref; use std::pin::Pin; use std::task::Context; use std::task::Poll; use std::task::Waker; use std::task::ready; use std::time::Duration; -use tokio::time::Instant; -use tokio::time::Sleep; pub(crate) type WebTimerId = u64; /// The minimum number of tombstones required to trigger compaction const COMPACTION_MINIMUM: usize = 16; -#[derive(PartialEq, Eq, PartialOrd, Ord)] -enum TimerType { +#[derive(Clone, PartialEq, Eq, PartialOrd, Ord)] +pub(crate) enum TimerType { Repeat(NonZeroU64), Once, } #[derive(PartialEq, Eq, PartialOrd, Ord)] -struct TimerKey(Instant, u64, TimerType, bool); +struct TimerKey(I, u64, TimerType, bool); struct TimerData { data: T, @@ -76,23 +77,31 @@ struct TimerData { /// https://github.com/denoland/deno/pull/12862 -- Refactor timers to use one async op per timer /// /// https://github.com/denoland/deno/issues/11398 -- Spurious assertion error when the callback to setInterval lasts longer than the interval -pub(crate) struct WebTimers { +pub(crate) struct WebTimers { + reactor: R, next_id: Cell, - timers: RefCell>, 
+ timers: RefCell>>, /// We choose a `BTreeMap` over `HashMap` because of memory performance. data_map: RefCell>>, /// How many unref'd timers exist? unrefd_count: Cell, - /// A boxed MutableSleep that will allow us to change the Tokio sleep timeout as needed. - /// Because this is boxed, we can "safely" unsafely poll it. - sleep: Box, + /// A heap-allocated MutableSleep. Stored as a raw pointer (not Box) to avoid + /// Box's Unique retag conflicting with the self-referential waker pointer. + sleep: OwnedPtr>, /// The high-res timer lock. No-op on platforms other than Windows. high_res_timer_lock: HighResTimerLock, } -impl Default for WebTimers { +impl Default for WebTimers { fn default() -> Self { + Self::new(R::default()) + } +} + +impl WebTimers { + pub fn new(reactor: R) -> Self { Self { + reactor, next_id: Default::default(), timers: Default::default(), data_map: Default::default(), @@ -101,22 +110,22 @@ impl Default for WebTimers { high_res_timer_lock: Default::default(), } } -} -impl WebTimers { #[allow(unused)] pub fn has_pending(&self) -> bool { !self.timers.borrow().is_empty() } } -pub(crate) struct WebTimersIterator<'a, T> { +pub(crate) struct WebTimersIterator<'a, T, I: ReactorInstant> { data: Ref<'a, BTreeMap>>, - timers: Ref<'a, BTreeSet>, + timers: Ref<'a, BTreeSet>>, } -impl<'a, T> IntoIterator for &'a WebTimersIterator<'a, T> { - type IntoIter = WebTimersIteratorImpl<'a, T>; +impl<'a, T, I: ReactorInstant> IntoIterator + for &'a WebTimersIterator<'a, T, I> +{ + type IntoIter = WebTimersIteratorImpl<'a, T, I>; type Item = (u64, bool, bool); fn into_iter(self) -> Self::IntoIter { @@ -127,12 +136,12 @@ impl<'a, T> IntoIterator for &'a WebTimersIterator<'a, T> { } } -pub(crate) struct WebTimersIteratorImpl<'a, T> { +pub(crate) struct WebTimersIteratorImpl<'a, T, I: ReactorInstant> { data: &'a BTreeMap>, - timers: btree_set::Iter<'a, TimerKey>, + timers: btree_set::Iter<'a, TimerKey>, } -impl Iterator for WebTimersIteratorImpl<'_, T> { +impl Iterator 
for WebTimersIteratorImpl<'_, T, I> { type Item = (u64, bool, bool); fn next(&mut self) -> Option { loop { @@ -144,31 +153,65 @@ impl Iterator for WebTimersIteratorImpl<'_, T> { } } -struct MutableSleep { - sleep: UnsafeCell>, +/// Like a Box but without Uniqueness semantics, which +/// cause issues with self-referential waker pointers under Stacked Borrows. +#[repr(transparent)] +struct OwnedPtr { + ptr: *mut T, +} + +impl OwnedPtr { + fn from_box(b: Box) -> Self { + Self { + ptr: Box::into_raw(b), + } + } +} + +impl Deref for OwnedPtr { + type Target = T; + fn deref(&self) -> &T { + unsafe { &*self.ptr } + } +} + +impl std::ops::DerefMut for OwnedPtr { + fn deref_mut(&mut self) -> &mut T { + unsafe { &mut *self.ptr } + } +} + +impl Drop for OwnedPtr { + fn drop(&mut self) { + unsafe { + let _ = Box::from_raw(self.ptr); + } + } +} + +struct MutableSleep { + sleep: UnsafeCell>, ready: Cell, external_waker: UnsafeCell>, internal_waker: Waker, } -#[allow(clippy::borrowed_box)] -impl MutableSleep { - fn new() -> Box { - let mut new = Box::new(MaybeUninit::::uninit()); - - new.write(MutableSleep { - sleep: Default::default(), - ready: Cell::default(), - external_waker: UnsafeCell::default(), - internal_waker: MutableSleepWaker { - inner: new.as_ptr(), - } - .into_waker(), - }); - unsafe { std::mem::transmute(new) } +impl MutableSleep { + fn new() -> OwnedPtr { + unsafe { + let mut ptr = OwnedPtr::from_box(Box::new(MaybeUninit::::uninit())); + let raw = ptr.as_ptr(); + ptr.write(MutableSleep { + sleep: Default::default(), + ready: Default::default(), + external_waker: Default::default(), + internal_waker: MutableSleepWaker:: { inner: raw }.into_waker(), + }); + std::mem::transmute(ptr) + } } - fn poll_ready(self: &Box, cx: &mut Context) -> Poll<()> { + fn poll_ready(&self, cx: &mut Context) -> Poll<()> { if self.ready.take() { Poll::Ready(()) } else { @@ -181,14 +224,14 @@ impl MutableSleep { external.clone_from(waker); } - // We do a manual deadline check here. 
Tokio's timer wheel may not immediately check the deadline if the + // We do a manual deadline check here. The timer wheel may not immediately check the deadline if the // executor was blocked. // Skip this check under Miri as it interferes with time simulation. #[cfg(not(miri))] { let sleep = unsafe { self.sleep.get().as_mut().unwrap_unchecked() }; if let Some(sleep) = sleep - && Instant::now() >= sleep.deadline() + && Tmr::Instant::now() >= sleep.deadline() { return Poll::Ready(()); } @@ -201,17 +244,17 @@ impl MutableSleep { } } - fn clear(self: &Box) { + fn clear(&self) { unsafe { *self.sleep.get() = None; } self.ready.set(false); } - fn change(self: &Box, instant: Instant) { + fn change(&self, timer: Tmr) { let pin = unsafe { - // First replace the current sleep - *self.sleep.get() = Some(tokio::time::sleep_until(instant)); + // First replace the current timer + *self.sleep.get() = Some(timer); // Then get ourselves a Pin to this Pin::new_unchecked( @@ -235,15 +278,20 @@ impl MutableSleep { } #[repr(transparent)] -#[derive(Clone)] -struct MutableSleepWaker { - inner: *const MutableSleep, +struct MutableSleepWaker { + inner: *const MutableSleep, } -unsafe impl Send for MutableSleepWaker {} -unsafe impl Sync for MutableSleepWaker {} +impl Clone for MutableSleepWaker { + fn clone(&self) -> Self { + MutableSleepWaker { inner: self.inner } + } +} + +unsafe impl Send for MutableSleepWaker {} +unsafe impl Sync for MutableSleepWaker {} -impl WakeRef for MutableSleepWaker { +impl WakeRef for MutableSleepWaker { fn wake_by_ref(&self) { unsafe { let this = self.inner.as_ref().unwrap_unchecked(); @@ -256,17 +304,17 @@ impl WakeRef for MutableSleepWaker { } } -impl Wake for MutableSleepWaker { +impl Wake for MutableSleepWaker { fn wake(self) { self.wake_by_ref() } } -impl Drop for MutableSleepWaker { +impl Drop for MutableSleepWaker { fn drop(&mut self) {} } -unsafe impl ViaRawPointer for MutableSleepWaker { +unsafe impl ViaRawPointer for MutableSleepWaker { type Target = 
(); fn into_raw(self) -> *mut () { @@ -278,10 +326,10 @@ unsafe impl ViaRawPointer for MutableSleepWaker { } } -impl WebTimers { +impl WebTimers { /// Returns an internal iterator that locks the internal data structures for the period /// of iteration. Calling other methods on this collection will cause a panic. - pub(crate) fn iter(&self) -> WebTimersIterator<'_, T> { + pub(crate) fn iter(&self) -> WebTimersIterator<'_, T, R::Instant> { WebTimersIterator { data: self.data_map.borrow(), timers: self.timers.borrow(), @@ -341,17 +389,19 @@ impl WebTimers { self.next_id.set(id); let mut timers = self.timers.borrow_mut(); - let deadline = Instant::now() + let deadline = self + .reactor + .now() .checked_add(Duration::from_millis(timeout_ms)) .unwrap(); match timers.first() { Some(TimerKey(k, ..)) => { if &deadline < k { - self.sleep.change(deadline); + self.sleep.change(self.reactor.timer(deadline)); } } _ => { - self.sleep.change(deadline); + self.sleep.change(self.reactor.timer(deadline)); } } @@ -409,40 +459,39 @@ impl WebTimers { } /// Poll for any timers that have completed. - pub fn poll_timers(&self, cx: &mut Context) -> Poll> { + /// + /// Returns the IDs and [`TimerType`]s of expired timers. The associated + /// data must be retrieved per-timer via + /// [`take_fired_timer`](Self::take_fired_timer), which allows + /// `cancel_timer` to prevent dispatch of timers that expired in the + /// same batch. 
+ pub fn poll_timers(&self, cx: &mut Context) -> Poll> { ready!(self.sleep.poll_ready(cx)); - let now = Instant::now(); + let now = R::Instant::now(); let mut timers = self.timers.borrow_mut(); - let mut data = self.data_map.borrow_mut(); + let data = self.data_map.borrow(); let mut output = vec![]; + let mut fired_once_count: usize = 0; let mut split = timers.split_off(&TimerKey(now, 0, TimerType::Once, false)); std::mem::swap(&mut split, &mut timers); for TimerKey(_, id, timer_type, is_system_timer) in split { - if let TimerType::Repeat(interval) = timer_type { - if let Some(TimerData { data, .. }) = data.get(&id) { - output.push((id, data.clone())); - timers.insert(TimerKey( - now - .checked_add(Duration::from_millis(interval.into())) - .unwrap(), - id, - timer_type, - is_system_timer, - )); - } - } else if let Some(TimerData { - data, - unrefd, - high_res, - }) = data.remove(&id) - { - self.high_res_timer_lock.maybe_unlock(high_res); - if unrefd { - self.unrefd_count.set(self.unrefd_count.get() - 1); - } - output.push((id, data)); + if !data.contains_key(&id) { + continue; // tombstone + } + if let TimerType::Repeat(interval) = &timer_type { + timers.insert(TimerKey( + now + .checked_add(Duration::from_millis((*interval).into())) + .unwrap(), + id, + timer_type.clone(), + is_system_timer, + )); + } else { + fired_once_count += 1; } + output.push((id, timer_type)); } // In-effective poll, run a front-compaction and try again later @@ -458,31 +507,66 @@ impl WebTimers { } } if let Some(TimerKey(k, ..)) = timers.first() { - self.sleep.change(*k); + self.sleep.change(self.reactor.timer(*k)); } return Poll::Pending; } - if data.is_empty() { - // When the # of running timers hits zero, clear the timer tree. + // Adjust for fired-once timers whose data is still in data_map + // (it will be removed by take_fired_timer). + let pending_data_count = data.len() - fired_once_count; + + if pending_data_count == 0 { + // No more pending timers; clear the tree and sleep. 
if !timers.is_empty() { timers.clear(); } self.sleep.clear(); } else { // Run compaction when there are enough tombstones to justify cleanup. - let tombstone_count = timers.len() - data.len(); + let tombstone_count = timers.len() - pending_data_count; if tombstone_count > COMPACTION_MINIMUM { timers.retain(|k| data.contains_key(&k.1)); } if let Some(TimerKey(k, ..)) = timers.first() { - self.sleep.change(*k); + self.sleep.change(self.reactor.timer(*k)); } } Poll::Ready(output) } + /// Extracts the data for a previously-fired timer. Returns `None` if + /// the timer was cancelled between [`poll_timers`](Self::poll_timers) + /// and this call. + pub fn take_fired_timer(&self, id: u64, timer_type: &TimerType) -> Option { + match timer_type { + TimerType::Repeat(_) => { + self.data_map.borrow().get(&id).map(|td| td.data.clone()) + } + TimerType::Once => { + let mut data = self.data_map.borrow_mut(); + let TimerData { + data: d, + unrefd, + high_res, + } = data.remove(&id)?; + if data.is_empty() { + self.high_res_timer_lock.clear(); + self.unrefd_count.set(0); + self.timers.borrow_mut().clear(); + self.sleep.clear(); + } else { + self.high_res_timer_lock.maybe_unlock(high_res); + if unrefd { + self.unrefd_count.set(self.unrefd_count.get() - 1); + } + } + Some(d) + } + } + } + /// Is this set of timers empty? pub fn is_empty(&self) -> bool { self.data_map.borrow().is_empty() @@ -613,9 +697,13 @@ impl HighResTimerLock { #[cfg(test)] mod tests { use super::*; + use crate::reactor_tokio::TokioReactor; use rstest::rstest; + use std::future::Future; use std::future::poll_fn; + type TestTimers = WebTimers<(), TokioReactor>; + /// Miri is way too slow here on some of the larger tests. 
const TEN_THOUSAND: u64 = if cfg!(miri) { 100 } else { 10_000 }; @@ -628,17 +716,20 @@ mod tests { runtime.block_on(f) } - async fn poll_all(timers: &WebTimers<()>) -> Vec<(u64, ())> { + async fn poll_all(timers: &TestTimers) -> Vec { timers.assert_consistent(); let len = timers.len(); let mut v = vec![]; while !timers.is_empty() { - let mut batch = poll_fn(|cx| { + let batch = poll_fn(|cx| { timers.assert_consistent(); timers.poll_timers(cx) }) .await; - v.append(&mut batch); + for (id, timer_type) in &batch { + timers.take_fired_timer(*id, timer_type); + } + v.extend(batch.into_iter().map(|(id, _)| id)); #[allow(clippy::print_stderr)] { eprintln!( @@ -667,7 +758,7 @@ mod tests { const TOMBSTONES: usize = 30; // > COMPACTION_MINIMUM but < ACTIVE_TIMERS const CLEANUP_THRESHOLD: usize = 5; // Threshold to determine if compaction happened async_test(async { - let timers = WebTimers::<()>::default(); + let timers = TestTimers::default(); // Create mostly long-lived timers, with a few immediate ones // The immediate timers ensure poll_timers returns non-empty output @@ -699,7 +790,10 @@ mod tests { ); // Poll timers to trigger potential compaction - let _ = poll_fn(|cx| timers.poll_timers(cx)).await; + let fired = poll_fn(|cx| timers.poll_timers(cx)).await; + for (id, timer_type) in &fired { + timers.take_fired_timer(*id, timer_type); + } let remaining_tombstones = count_tombstones(); @@ -717,7 +811,7 @@ mod tests { #[test] fn test_timer() { async_test(async { - let timers = WebTimers::<()>::default(); + let timers = TestTimers::default(); let _a = timers.queue_timer(1, ()); let v = poll_all(&timers).await; @@ -728,7 +822,7 @@ mod tests { #[test] fn test_high_res_lock() { async_test(async { - let timers = WebTimers::<()>::default(); + let timers = TestTimers::default(); assert!(!timers.high_res_timer_lock.is_locked()); let _a = timers.queue_timer(1, ()); assert!(timers.high_res_timer_lock.is_locked()); @@ -743,7 +837,7 @@ mod tests { #[test] fn 
test_timer_cancel_1(#[values(0, 1, 2, 3)] which: u64) { async_test(async { - let timers = WebTimers::<()>::default(); + let timers = TestTimers::default(); for i in 0..4 { let id = timers.queue_timer(i * 25, ()); if i == which { @@ -761,7 +855,7 @@ mod tests { #[test] fn test_timer_cancel_2(#[values(0, 1, 2)] which: u64) { async_test(async { - let timers = WebTimers::<()>::default(); + let timers = TestTimers::default(); for i in 0..4 { let id = timers.queue_timer(i * 25, ()); if i == which || i == which + 1 { @@ -778,7 +872,7 @@ mod tests { #[test] fn test_timers_10_random() { async_test(async { - let timers = WebTimers::<()>::default(); + let timers = TestTimers::default(); for i in 0..10 { timers.queue_timer((i % 3) * 10, ()); } @@ -791,7 +885,7 @@ mod tests { #[test] fn test_timers_10_random_cancel() { async_test(async { - let timers = WebTimers::<()>::default(); + let timers = TestTimers::default(); for i in 0..10 { let id = timers.queue_timer((i % 3) * 10, ()); timers.cancel_timer(id); @@ -806,7 +900,7 @@ mod tests { #[test] fn test_timers_10_random_cancel_after(#[values(true, false)] reverse: bool) { async_test(async { - let timers = WebTimers::<()>::default(); + let timers = TestTimers::default(); let mut ids = vec![]; for i in 0..2 { ids.push(timers.queue_timer((i % 3) * 10, ())); @@ -827,7 +921,7 @@ mod tests { #[test] fn test_timers_10() { async_test(async { - let timers = WebTimers::<()>::default(); + let timers = TestTimers::default(); for _i in 0..10 { timers.queue_timer(1, ()); } @@ -840,7 +934,7 @@ mod tests { #[test] fn test_timers_10_000_random() { async_test(async { - let timers = WebTimers::<()>::default(); + let timers = TestTimers::default(); for i in 0..TEN_THOUSAND { timers.queue_timer(i % 10, ()); } @@ -855,7 +949,7 @@ mod tests { #[test] fn test_timers_cancel_first() { async_test(async { - let timers = WebTimers::<()>::default(); + let timers = TestTimers::default(); let mut ids = vec![]; for _ in 0..TEN_THOUSAND { 
ids.push(timers.queue_timer(1, ())); @@ -874,7 +968,7 @@ mod tests { #[test] fn test_timers_10_000_cancel_most() { async_test(async { - let timers = WebTimers::<()>::default(); + let timers = TestTimers::default(); let mut ids = vec![]; for i in 0..TEN_THOUSAND { ids.push(timers.queue_timer(i % 100, ())); @@ -896,7 +990,7 @@ mod tests { #[test] fn test_chaos(#[values(42, 99, 1000)] seed: u64) { async_test(async { - let timers = WebTimers::<()>::default(); + let timers = TestTimers::default(); fastrand::seed(seed); let mut count = 0;