diff --git a/deps/message-port/async-uv.cc b/deps/message-port/async-uv.cc index 9a1b462e52..e12cf9f2c5 100644 --- a/deps/message-port/async-uv.cc +++ b/deps/message-port/async-uv.cc @@ -17,62 +17,149 @@ #include "async-uv.h" #include #include -#include #include "debug-mem-trace.h" -std::queue AsyncUV::queue_; -std::mutex AsyncUV::queue_mutex_; +struct AsyncUV::LoopData { + uv_loop_t* loop{nullptr}; + uv_async_t* handle{nullptr}; + std::queue queue; + std::mutex queue_mutex; +}; -AsyncUV::AsyncUV(uv_loop_t* loop, Task task) : uv_h_(nullptr), task_(task) { - if (loop && task) { - Init(loop, task); - } +std::map AsyncUV::loop_data_; +std::mutex AsyncUV::loop_data_mutex_; +std::queue AsyncUV::pending_queue_; +std::mutex AsyncUV::pending_queue_mutex_; + +AsyncUV::AsyncUV(uv_loop_t* loop, Task task) : loop_(loop), task_(task) { TRACE_ADD(ASYNC, this); } AsyncUV::~AsyncUV() { - if (!uv_h_) { + TRACE(ASYNC, "~AsyncUV"); + TRACE_REMOVE(ASYNC, this); +} + +void AsyncUV::OnAsyncCalled(uv_async_t* handle) { + auto* loop_data = static_cast(handle->data); + if (loop_data == nullptr) { return; } - TRACE(ASYNC, "~AsyncUV"); - TRACE_REMOVE(ASYNC, this); + std::queue tasks_to_run; + { + std::lock_guard lock(loop_data->queue_mutex); + loop_data->queue.swap(tasks_to_run); + } - uv_close(reinterpret_cast(uv_h_), [](uv_handle_t* handle) { - TRACE(ASYNC, "~uv_close"); - TRACE_REMOVE(ASYNC_UV, handle); - delete reinterpret_cast(handle); - }); + while (!tasks_to_run.empty()) { + Task& task = tasks_to_run.front(); + if (task) { + TRACE(MSGPORT, "run task"); + task(handle); + TRACE(MSGPORT, "/run task"); + } + tasks_to_run.pop(); + } } -bool AsyncUV::Send(uv_loop_t* loop, Task task) { +AsyncUV::LoopData* AsyncUV::GetLoopData(uv_loop_t* loop, + bool create_if_not_found) { + std::lock_guard lock(loop_data_mutex_); + auto it = loop_data_.find(loop); + if (it != loop_data_.end()) { + return it->second; + } + + if (!create_if_not_found || loop == nullptr) { + return nullptr; + } + + auto* loop_data = new LoopData(); + loop_data->loop = loop; + loop_data->handle = new uv_async_t(); + loop_data->handle->data = loop_data; + uv_async_init(loop, loop_data->handle, OnAsyncCalled); + uv_unref(reinterpret_cast(loop_data->handle)); + TRACE_ADD(ASYNC_UV, loop_data->handle); + loop_data_[loop] = loop_data; + return loop_data; +} + +bool AsyncUV::InitPerThread(uv_loop_t* loop) { if (loop == nullptr) { + return false; + } + + return GetLoopData(loop, true) != nullptr; +} + +void AsyncUV::CleanupPerThread(uv_loop_t* loop) { + std::lock_guard lock(loop_data_mutex_); + auto it = loop_data_.find(loop); + if (it == loop_data_.end()) { + return; + } + + LoopData* loop_data = it->second; + loop_data_.erase(it); + + if (!uv_is_closing(reinterpret_cast(loop_data->handle))) { + uv_close(reinterpret_cast(loop_data->handle), + [](uv_handle_t* handle) { + TRACE(ASYNC, "~uv_close"); + TRACE_REMOVE(ASYNC_UV, handle); + auto* loop_data = static_cast(handle->data); + delete loop_data->handle; + delete loop_data; + }); + } +} + +bool AsyncUV::Send(uv_loop_t* loop, Task task) { + if (loop == nullptr || !task) { TRACE(MSGPORT, "invalid loop"); return false; } - return (new AsyncUV(loop, task))->Send(); + + LoopData* loop_data = GetLoopData(loop, true); + if (loop_data == nullptr) { + return false; + } + + { + std::lock_guard lock(loop_data->queue_mutex); + loop_data->queue.push(std::move(task)); + } + + uv_async_send(loop_data->handle); + return true; } size_t AsyncUV::EnqueueTask(Task task) { TRACE(MSGPORT, "EnqueueTask"); - std::lock_guard lock(queue_mutex_); - queue_.push(task); - return queue_.size(); + std::lock_guard lock(pending_queue_mutex_); + pending_queue_.push(std::move(task)); + return pending_queue_.size(); } bool AsyncUV::DrainPendingTasks(uv_loop_t* loop) { - TRACE(MSGPORT, "DrainPendingTasks"); - std::lock_guard lock(queue_mutex_); - TRACE(MSGPORT, "drain pending tasks %zu", queue_.size()); - if (loop == nullptr) { TRACE(MSGPORT, "invalid loop"); return false; } - while (!queue_.empty()) { - AsyncUV::Send(loop, queue_.front()); - queue_.pop(); + TRACE(MSGPORT, "DrainPendingTasks"); + std::queue pending_tasks; + { + std::lock_guard lock(pending_queue_mutex_); + TRACE(MSGPORT, "drain pending tasks %zu", pending_queue_.size()); + pending_queue_.swap(pending_tasks); + } + + while (!pending_tasks.empty()) { + AsyncUV::Send(loop, std::move(pending_tasks.front())); + pending_tasks.pop(); } TRACE(MSGPORT, "/drain pending tasks"); return true; @@ -80,41 +167,28 @@ bool AsyncUV::DrainPendingTasks(uv_loop_t* loop) { void AsyncUV::DeletePendingTasks() { TRACE(MSGPORT, "DeletePendingTasks"); - std::lock_guard lock(queue_mutex_); - TRACE(MSGPORT, "delete pending tasks %zu", queue_.size()); - if (!queue_.empty()) { + std::lock_guard lock(pending_queue_mutex_); + TRACE(MSGPORT, "delete pending tasks %zu", pending_queue_.size()); + if (!pending_queue_.empty()) { std::queue empty; - std::swap(queue_, empty); + std::swap(pending_queue_, empty); } } bool AsyncUV::IsPendingTasksEmpty() { TRACE(MSGPORT, "IsPendingTasksEmpty"); - std::lock_guard lock(queue_mutex_); - return queue_.empty(); + std::lock_guard lock(pending_queue_mutex_); + return pending_queue_.empty(); } void AsyncUV::Init(uv_loop_t* loop, Task task) { + loop_ = loop; task_ = task; - - uv_h_ = new uv_async_t(); - uv_h_->data = this; - uv_async_init(loop, uv_h_, [](uv_async_t* handle) { - auto event = static_cast(handle->data); - if (event->task_) { - TRACE(MSGPORT, "run task"); - event->task_(handle); - TRACE(MSGPORT, "/run task"); - } - delete event; - }); - TRACE_ADD(ASYNC_UV, uv_h_); } bool AsyncUV::Send() { - if (!uv_h_) { + if (loop_ == nullptr || !task_) { return false; } - uv_async_send(uv_h_); - return true; + return Send(loop_, std::move(task_)); } diff --git a/deps/message-port/async-uv.h b/deps/message-port/async-uv.h index 1444b3189c..31858398c3 100644 --- a/deps/message-port/async-uv.h +++ b/deps/message-port/async-uv.h @@ -17,6 +17,7 @@ #pragma once #include +#include #include #include @@ -26,6 +27,7 @@ using uv_loop_t = struct uv_loop_s; using uv_async_t = struct uv_async_s; +using uv_handle_t = struct uv_handle_s; class EXPORT_API AsyncUV { public: @@ -39,6 +41,10 @@ class EXPORT_API AsyncUV { static void DeletePendingTasks(); static bool IsPendingTasksEmpty(); + // Register and cleanup one shared async handle per loop. + static bool InitPerThread(uv_loop_t* loop); + static void CleanupPerThread(uv_loop_t* loop); + AsyncUV(uv_loop_t* loop = nullptr, Task task = nullptr); ~AsyncUV(); @@ -46,9 +52,15 @@ class EXPORT_API AsyncUV { bool Send(); private: - uv_async_t* uv_h_; + struct LoopData; + static LoopData* GetLoopData(uv_loop_t* loop, bool create_if_not_found); + static void OnAsyncCalled(uv_async_t* handle); + + uv_loop_t* loop_; Task task_; - static std::queue queue_; - static std::mutex queue_mutex_; + static std::map loop_data_; + static std::mutex loop_data_mutex_; + static std::queue pending_queue_; + static std::mutex pending_queue_mutex_; }; diff --git a/src/lwnode/nd-vm-main-message-port.cc b/src/lwnode/nd-vm-main-message-port.cc index 1893d4c3d7..c52b3a2c62 100644 --- a/src/lwnode/nd-vm-main-message-port.cc +++ b/src/lwnode/nd-vm-main-message-port.cc @@ -38,6 +38,7 @@ MainMessagePort::MainMessagePort(std::shared_ptr port, MainMessagePort::~MainMessagePort() { LWNODE_DEV_FATAL_LOG("[MainMessagePort::~MainMessagePort]"); Channel::DeletePendingMessages(); + AsyncUV::CleanupPerThread(uv_loop_); } void MainMessagePort::SetMessageEventClass(FunctionObjectRef* klass) { @@ -53,6 +54,12 @@ void MainMessagePort::Init(ContextRef* context, uv_loop_t* loop) { context_ = context; uv_loop_ = loop; + AsyncUV::InitPerThread(uv_loop_); + + // Drain once before publishing the loop, then drain again after publication + // to reduce the window where another thread can enqueue into the pending + // queue based on a stale "loop not ready" observation. + Channel::DrainPendingMessages(uv_loop_); try { internal_->uv_promise_.set_value(uv_loop_);