Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
172 changes: 123 additions & 49 deletions deps/message-port/async-uv.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,104 +17,178 @@
#include "async-uv.h"
#include <nd-logger.h>
#include <uv.h>
#include <functional>
#include "debug-mem-trace.h"

std::queue<AsyncUV::Task> AsyncUV::queue_;
std::mutex AsyncUV::queue_mutex_;
struct AsyncUV::LoopData {
uv_loop_t* loop{nullptr};
uv_async_t* handle{nullptr};
std::queue<Task> queue;
std::mutex queue_mutex;
};

AsyncUV::AsyncUV(uv_loop_t* loop, Task task) : uv_h_(nullptr), task_(task) {
if (loop && task) {
Init(loop, task);
}
std::map<uv_loop_t*, AsyncUV::LoopData*> AsyncUV::loop_data_;
std::mutex AsyncUV::loop_data_mutex_;
std::queue<AsyncUV::Task> AsyncUV::pending_queue_;
std::mutex AsyncUV::pending_queue_mutex_;

AsyncUV::AsyncUV(uv_loop_t* loop, Task task) : loop_(loop), task_(task) {
TRACE_ADD(ASYNC, this);
}

AsyncUV::~AsyncUV() {
if (!uv_h_) {
TRACE(ASYNC, "~AsyncUV");
TRACE_REMOVE(ASYNC, this);
}

void AsyncUV::OnAsyncCalled(uv_async_t* handle) {
auto* loop_data = static_cast<LoopData*>(handle->data);
if (loop_data == nullptr) {
return;
}

TRACE(ASYNC, "~AsyncUV");
TRACE_REMOVE(ASYNC, this);
std::queue<Task> tasks_to_run;
{
std::lock_guard<std::mutex> lock(loop_data->queue_mutex);
loop_data->queue.swap(tasks_to_run);
}

uv_close(reinterpret_cast<uv_handle_t*>(uv_h_), [](uv_handle_t* handle) {
TRACE(ASYNC, "~uv_close");
TRACE_REMOVE(ASYNC_UV, handle);
delete reinterpret_cast<uv_async_t*>(handle);
});
while (!tasks_to_run.empty()) {
Task& task = tasks_to_run.front();
if (task) {
TRACE(MSGPORT, "run task");
task(handle);
TRACE(MSGPORT, "/run task");
}
tasks_to_run.pop();
}
}

bool AsyncUV::Send(uv_loop_t* loop, Task task) {
AsyncUV::LoopData* AsyncUV::GetLoopData(uv_loop_t* loop,
bool create_if_not_found) {
std::lock_guard<std::mutex> lock(loop_data_mutex_);
auto it = loop_data_.find(loop);
if (it != loop_data_.end()) {
return it->second;
}

if (!create_if_not_found || loop == nullptr) {
return nullptr;
}

auto* loop_data = new LoopData();
loop_data->loop = loop;
loop_data->handle = new uv_async_t();
loop_data->handle->data = loop_data;
uv_async_init(loop, loop_data->handle, OnAsyncCalled);
uv_unref(reinterpret_cast<uv_handle_t*>(loop_data->handle));
TRACE_ADD(ASYNC_UV, loop_data->handle);
loop_data_[loop] = loop_data;
return loop_data;
}

bool AsyncUV::InitPerThread(uv_loop_t* loop) {
if (loop == nullptr) {
return false;
}

return GetLoopData(loop, true) != nullptr;
}

void AsyncUV::CleanupPerThread(uv_loop_t* loop) {
std::lock_guard<std::mutex> lock(loop_data_mutex_);
auto it = loop_data_.find(loop);
if (it == loop_data_.end()) {
return;
}

LoopData* loop_data = it->second;
loop_data_.erase(it);

if (!uv_is_closing(reinterpret_cast<uv_handle_t*>(loop_data->handle))) {
uv_close(reinterpret_cast<uv_handle_t*>(loop_data->handle),
[](uv_handle_t* handle) {
TRACE(ASYNC, "~uv_close");
TRACE_REMOVE(ASYNC_UV, handle);
auto* loop_data = static_cast<LoopData*>(handle->data);
delete loop_data->handle;
delete loop_data;
});
}
}

bool AsyncUV::Send(uv_loop_t* loop, Task task) {
if (loop == nullptr || !task) {
TRACE(MSGPORT, "invalid loop");
return false;
}
return (new AsyncUV(loop, task))->Send();

LoopData* loop_data = GetLoopData(loop, true);
if (loop_data == nullptr) {
return false;
}

{
std::lock_guard<std::mutex> lock(loop_data->queue_mutex);
loop_data->queue.push(std::move(task));
}

uv_async_send(loop_data->handle);
return true;
}

size_t AsyncUV::EnqueueTask(Task task) {
TRACE(MSGPORT, "EnqueueTask");
std::lock_guard<std::mutex> lock(queue_mutex_);
queue_.push(task);
return queue_.size();
std::lock_guard<std::mutex> lock(pending_queue_mutex_);
pending_queue_.push(std::move(task));
return pending_queue_.size();
}

bool AsyncUV::DrainPendingTasks(uv_loop_t* loop) {
TRACE(MSGPORT, "DrainPendingTasks");
std::lock_guard<std::mutex> lock(queue_mutex_);
TRACE(MSGPORT, "drain pending tasks %zu", queue_.size());

if (loop == nullptr) {
TRACE(MSGPORT, "invalid loop");
return false;
}

while (!queue_.empty()) {
AsyncUV::Send(loop, queue_.front());
queue_.pop();
TRACE(MSGPORT, "DrainPendingTasks");
std::queue<Task> pending_tasks;
{
std::lock_guard<std::mutex> lock(pending_queue_mutex_);
TRACE(MSGPORT, "drain pending tasks %zu", pending_queue_.size());
pending_queue_.swap(pending_tasks);
}

while (!pending_tasks.empty()) {
AsyncUV::Send(loop, std::move(pending_tasks.front()));
pending_tasks.pop();
}
TRACE(MSGPORT, "/drain pending tasks");
return true;
}

void AsyncUV::DeletePendingTasks() {
TRACE(MSGPORT, "DeletePendingTasks");
std::lock_guard<std::mutex> lock(queue_mutex_);
TRACE(MSGPORT, "delete pending tasks %zu", queue_.size());
if (!queue_.empty()) {
std::lock_guard<std::mutex> lock(pending_queue_mutex_);
TRACE(MSGPORT, "delete pending tasks %zu", pending_queue_.size());
if (!pending_queue_.empty()) {
std::queue<Task> empty;
std::swap(queue_, empty);
std::swap(pending_queue_, empty);
}
}

bool AsyncUV::IsPendingTasksEmpty() {
TRACE(MSGPORT, "IsPendingTasksEmpty");
std::lock_guard<std::mutex> lock(queue_mutex_);
return queue_.empty();
std::lock_guard<std::mutex> lock(pending_queue_mutex_);
return pending_queue_.empty();
}

void AsyncUV::Init(uv_loop_t* loop, Task task) {
loop_ = loop;
task_ = task;

uv_h_ = new uv_async_t();
uv_h_->data = this;
uv_async_init(loop, uv_h_, [](uv_async_t* handle) {
auto event = static_cast<AsyncUV*>(handle->data);
if (event->task_) {
TRACE(MSGPORT, "run task");
event->task_(handle);
TRACE(MSGPORT, "/run task");
}
delete event;
});
TRACE_ADD(ASYNC_UV, uv_h_);
}

bool AsyncUV::Send() {
if (!uv_h_) {
if (loop_ == nullptr || !task_) {
return false;
}
uv_async_send(uv_h_);
return true;
return Send(loop_, std::move(task_));
}
18 changes: 15 additions & 3 deletions deps/message-port/async-uv.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#pragma once

#include <functional>
#include <map>
#include <mutex>
#include <queue>

Expand All @@ -26,6 +27,7 @@

using uv_loop_t = struct uv_loop_s;
using uv_async_t = struct uv_async_s;
using uv_handle_t = struct uv_handle_s;

class EXPORT_API AsyncUV {
public:
Expand All @@ -39,16 +41,26 @@ class EXPORT_API AsyncUV {
static void DeletePendingTasks();
static bool IsPendingTasksEmpty();

// Register and cleanup one shared async handle per loop.
static bool InitPerThread(uv_loop_t* loop);
static void CleanupPerThread(uv_loop_t* loop);

AsyncUV(uv_loop_t* loop = nullptr, Task task = nullptr);
~AsyncUV();

void Init(uv_loop_t* loop, Task task);
bool Send();

private:
uv_async_t* uv_h_;
struct LoopData;
static LoopData* GetLoopData(uv_loop_t* loop, bool create_if_not_found);
static void OnAsyncCalled(uv_async_t* handle);

uv_loop_t* loop_;
Task task_;

static std::queue<Task> queue_;
static std::mutex queue_mutex_;
static std::map<uv_loop_t*, LoopData*> loop_data_;
static std::mutex loop_data_mutex_;
static std::queue<Task> pending_queue_;
static std::mutex pending_queue_mutex_;
};
7 changes: 7 additions & 0 deletions src/lwnode/nd-vm-main-message-port.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ MainMessagePort::MainMessagePort(std::shared_ptr<Port> port,
MainMessagePort::~MainMessagePort() {
LWNODE_DEV_FATAL_LOG("[MainMessagePort::~MainMessagePort]");
Channel::DeletePendingMessages();
AsyncUV::CleanupPerThread(uv_loop_);
}

void MainMessagePort::SetMessageEventClass(FunctionObjectRef* klass) {
Expand All @@ -53,6 +54,12 @@ void MainMessagePort::Init(ContextRef* context, uv_loop_t* loop) {

context_ = context;
uv_loop_ = loop;
AsyncUV::InitPerThread(uv_loop_);

// Drain once before publishing the loop, then drain again after publication
// to reduce the window where another thread can enqueue into the pending
// queue based on a stale "loop not ready" observation.
Channel::DrainPendingMessages(uv_loop_);

try {
internal_->uv_promise_.set_value(uv_loop_);
Expand Down
Loading