Skip to content
This repository was archived by the owner on Apr 2, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 4 additions & 6 deletions core/00_infra.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@

let isLeakTracingEnabled = false;
let submitLeakTrace;
let eventLoopTick;

function __setLeakTracingEnabled(enabled) {
isLeakTracingEnabled = enabled;
Expand All @@ -50,8 +49,7 @@
return isLeakTracingEnabled;
}

function __initializeCoreMethods(eventLoopTick_, submitLeakTrace_) {
eventLoopTick = eventLoopTick_;
function __initializeCoreMethods(submitLeakTrace_) {
submitLeakTrace = submitLeakTrace_;
}

Expand Down Expand Up @@ -143,9 +141,9 @@
});
const wrappedPromise = PromisePrototypeCatch(
promise,
(res) => {
// recreate the stacktrace and strip eventLoopTick() calls from stack trace
ErrorCaptureStackTrace(res, eventLoopTick);
function __opRejectHandler(res) {
// recreate the stacktrace and strip internal event loop frames
ErrorCaptureStackTrace(res, __opRejectHandler);
throw res;
},
);
Expand Down
115 changes: 44 additions & 71 deletions core/01_core.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
setQueueMicrotask,
SafeMap,
SafeWeakMap,
Set,
StringPrototypeSlice,
Symbol,
SymbolFor,
Expand Down Expand Up @@ -55,7 +54,6 @@
op_get_proxy_details,
op_get_ext_import_meta_proto,
op_has_tick_scheduled,
op_immediate_has_ref_count,
op_lazy_load_esm,
op_memory_usage,
op_op_names,
Expand Down Expand Up @@ -122,7 +120,6 @@
delete window.__infra;

__initializeCoreMethods(
eventLoopTick,
submitLeakTrace,
);

Expand All @@ -143,8 +140,6 @@

let unhandledPromiseRejectionHandler = () => false;
let timerDepth = 0;
let timersRunning = false;
const cancelledTimers = new Set();

const macrotaskCallbacks = [];
const nextTickCallbacks = [];
Expand All @@ -162,32 +157,30 @@
ArrayPrototypePush(immediateCallbacks, cb);
}

// This function has variable number of arguments. The last argument describes
// if there's a "next tick" scheduled by the Node.js compat layer. Arguments
// before last are alternating integers and any values that describe the
// responses of async ops.
function eventLoopTick() {
let didAnyWork = false;

// First respond to all pending ops.
for (let i = 0; i < arguments.length - 3; i += 3) {
didAnyWork = true;
// Phase 2: Resolve completed async ops. Called from Rust with flat args:
// (promiseId, isOk, res, promiseId, isOk, res, ...)
function __resolveOps() {
Comment thread
nathanwhit marked this conversation as resolved.
for (let i = 0; i < arguments.length; i += 3) {
const promiseId = arguments[i];
const isOk = arguments[i + 1];
const res = arguments[i + 2];

__resolvePromise(promiseId, res, isOk);
}
}

// Phase 5: Drain nextTick queue and macrotask queue.
// Called from Rust. hasTickScheduled indicates if nextTick was scheduled.
function __drainNextTickAndMacrotasks(hasTickScheduled) {
// Drain nextTick queue if there's a tick scheduled.
if (arguments[arguments.length - 1]) {
if (hasTickScheduled) {
for (let i = 0; i < nextTickCallbacks.length; i++) {
nextTickCallbacks[i]();
}
} else {
op_run_microtasks();
}

// Finally drain macrotask queue.
// Drain macrotask queue.
for (let i = 0; i < macrotaskCallbacks.length; i++) {
const cb = macrotaskCallbacks[i];
while (true) {
Expand All @@ -207,61 +200,40 @@
}
}
}
}

const timers = arguments[arguments.length - 2];
if (timers) {
didAnyWork = true;
timersRunning = true;
for (let i = 0; i < timers.length; i += 3) {
timerDepth = timers[i];
const id = timers[i + 1];
if (cancelledTimers.has(id)) {
continue;
}
try {
const f = timers[i + 2];
f.call(window);
} catch (e) {
reportExceptionCallback(e);
}
for (let i = 0; i < nextTickCallbacks.length; i++) {
nextTickCallbacks[i]();
// Phase 2: Handle unhandled promise rejections.
// Called from Rust with a flat array: [promise, reason, context, promise, reason, context, ...]
function __handleRejections() {
for (let i = 0; i < arguments.length; i += 3) {
// Restore the async context that was active when the promise was
// rejected, so that AsyncLocalStorage.getStore() works correctly
// inside unhandledrejection handlers (matching Node.js behavior).
const prevContext = getAsyncContext();
setAsyncContext(arguments[i + 2]);
try {
const handled = unhandledPromiseRejectionHandler(
arguments[i],
arguments[i + 1],
);
if (!handled) {
const err = arguments[i + 1];
op_dispatch_exception(err, true);
}
op_run_microtasks();
} finally {
setAsyncContext(prevContext);
}
timersRunning = false;
timerDepth = 0;
cancelledTimers.clear();
}
}

// Drain immediates queue.
if (didAnyWork && op_immediate_has_ref_count()) {
runImmediateCallbacks();
}
// Set timer depth before each timer callback (called from Rust).
Comment thread
nathanwhit marked this conversation as resolved.
function __setTimerDepth(depth) {
timerDepth = depth;
}

// If we have any rejections for this tick, attempt to process them
const rejections = arguments[arguments.length - 3];
if (rejections) {
for (let i = 0; i < rejections.length; i += 3) {
// Restore the async context that was active when the promise was
// rejected, so that AsyncLocalStorage.getStore() works correctly
// inside unhandledrejection handlers (matching Node.js behavior).
const prevContext = getAsyncContext();
setAsyncContext(rejections[i + 2]);
try {
const handled = unhandledPromiseRejectionHandler(
rejections[i],
rejections[i + 1],
);
if (!handled) {
const err = rejections[i + 1];
op_dispatch_exception(err, true);
}
} finally {
setAsyncContext(prevContext);
}
}
}
// Report an exception (called from Rust for timer callback errors).
function __reportException(e) {
reportExceptionCallback(e);
}

function runImmediateCallbacks() {
Expand Down Expand Up @@ -697,7 +669,11 @@
internalRidSymbol: Symbol("Deno.internal.rid"),
internalFdSymbol: Symbol("Deno.internal.fd"),
resources,
eventLoopTick,
__resolveOps,
__drainNextTickAndMacrotasks,
__handleRejections,
__setTimerDepth,
__reportException,
runImmediateCallbacks,
BadResource,
BadResourcePrototype,
Expand Down Expand Up @@ -850,9 +826,6 @@
queueSystemTimer: (_associatedOp, repeat, timeout, task) =>
op_timer_queue_system(repeat, timeout, task),
cancelTimer: (id) => {
if (timersRunning) {
cancelledTimers.add(id);
}
op_timer_cancel(id);
},
refTimer: (id) => op_timer_ref(id),
Expand Down
3 changes: 2 additions & 1 deletion core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ description = "A modern JavaScript/TypeScript runtime built with V8, Rust, and T
path = "lib.rs"

[features]
default = ["include_icu_data", "v8_use_custom_libcxx"]
default = ["include_icu_data", "v8_use_custom_libcxx", "reactor-tokio"]
reactor-tokio = []
include_icu_data = ["deno_core_icudata"]
v8_use_custom_libcxx = ["v8/use_custom_libcxx"]
v8_enable_pointer_compression = ["v8/v8_enable_pointer_compression"]
Expand Down
36 changes: 36 additions & 0 deletions core/event_loop.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright 2018-2025 the Deno authors. MIT license.

//! Event loop phase state.
//!
//! The actual phase-based event loop is driven by `poll_event_loop_inner` in
//! `jsruntime.rs`. This module provides auxiliary state for phases that need
//! Rust-side callback queues (currently only close callbacks).
//!
//! libuv-style phases (timers, idle, prepare, poll, check) are driven
//! directly through `UvLoopInner` when a uv_loop is registered.

use std::collections::VecDeque;

/// Close callback for resource cleanup.
pub(crate) struct CloseCallback {
pub callback: Box<dyn FnOnce()>,
}

/// Phase-specific state for the event loop.
///
/// Currently only tracks close callbacks. Other phase hooks (idle, prepare,
/// check) are handled by `UvLoopInner` for the libuv compat path.
#[derive(Default)]
pub(crate) struct EventLoopPhases {
/// Phase 6: Close callbacks.
pub close_callbacks: VecDeque<CloseCallback>,
}

impl EventLoopPhases {
/// Drain and run all close callbacks.
pub fn run_close_callbacks(&mut self) {
while let Some(cb) = self.close_callbacks.pop_front() {
(cb.callback)();
}
}
}
12 changes: 10 additions & 2 deletions core/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ mod async_cell;
pub mod convert;
pub mod cppgc;
pub mod error;
pub mod event_loop;
mod extension_set;
mod extensions;
mod external;
Expand All @@ -26,9 +27,18 @@ mod ops_builtin;
mod ops_builtin_types;
mod ops_builtin_v8;
mod ops_metrics;
pub mod reactor;
#[cfg(feature = "reactor-tokio")]
pub mod reactor_tokio;
mod runtime;
mod source_map;
mod tasks;
#[allow(
non_camel_case_types,
non_upper_case_globals,
clippy::missing_safety_doc
)]
pub mod uv_compat;
mod web_timeout;
pub mod webidl;

Expand Down Expand Up @@ -240,8 +250,6 @@ mod tests {
use std::process::Command;
use std::process::Stdio;

use super::*;

#[test]
fn located_script_name() {
// Note that this test will fail if this file is moved. We don't
Expand Down
68 changes: 68 additions & 0 deletions core/reactor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright 2018-2025 the Deno authors. MIT license.

//! Reactor abstraction for timer and I/O primitives.
Comment thread
nathanwhit marked this conversation as resolved.
//!
//! Currently used by [`WebTimers`](crate::web_timeout::WebTimers) to abstract
//! over the timer backend. The default implementation (`reactor-tokio` feature)
//! delegates to tokio.
//!
//! Note: `uv_compat` does **not** use this trait -- it talks to tokio directly
//! because it needs lower-level control (poll_accept, try_read, try_write).

use std::future::Future;
use std::ops::Add;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
use std::time::Duration;

/// Abstraction over the async I/O reactor (tokio, mio, io_uring, custom).
/// This is the only seam between deno_core and the underlying async runtime.
pub trait Reactor: 'static {
type Timer: ReactorTimer;
type Instant: ReactorInstant;

/// Create a new one-shot timer that fires at the given instant.
fn timer(&self, deadline: Self::Instant) -> Self::Timer;

/// Get the current instant.
fn now(&self) -> Self::Instant;

/// Poll the reactor for I/O readiness. This is called during the "poll" phase.
/// Drives the underlying event source (epoll/kqueue/iocp).
/// `timeout` = None means block indefinitely, Some(Duration::ZERO) means non-blocking.
fn poll(&self, cx: &mut Context, timeout: Option<Duration>) -> Poll<()>;

/// Spawn a future onto the reactor's executor (if it has one).
/// Returns a handle that can be polled for the result.
fn spawn(
&self,
fut: Pin<Box<dyn Future<Output = ()> + 'static>>,
) -> Pin<Box<dyn Future<Output = ()>>>;
}

/// A timer future that can be reset to fire at a different deadline.
pub trait ReactorTimer: Future<Output = ()> + Unpin {
fn reset(&mut self, deadline: impl Into<Self::Instant>)
where
Self: Sized;

type Instant: ReactorInstant;

/// The deadline this timer is set to fire at.
fn deadline(&self) -> Self::Instant;
}

/// An instant in time, used for timer deadlines.
pub trait ReactorInstant:
Copy + Ord + Add<Duration, Output = Self> + Send + Sync + 'static
{
fn now() -> Self;
fn elapsed(&self) -> Duration;
fn checked_add(&self, duration: Duration) -> Option<Self>;
}

/// The default reactor type, selected by feature flags.
/// When `reactor-tokio` is enabled, this is `TokioReactor`.
#[cfg(feature = "reactor-tokio")]
pub type DefaultReactor = crate::reactor_tokio::TokioReactor;
Loading
Loading