Skip to content

Commit e48e449

Browse files
committed
fix: use shared uv_async queue for message-port
1 parent 295a0d7 commit e48e449

File tree

3 files changed

+145
-52
lines changed

3 files changed

+145
-52
lines changed

deps/message-port/async-uv.cc

Lines changed: 123 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -17,104 +17,178 @@
1717
#include "async-uv.h"
1818
#include <nd-logger.h>
1919
#include <uv.h>
20-
#include <functional>
2120
#include "debug-mem-trace.h"
2221

23-
std::queue<AsyncUV::Task> AsyncUV::queue_;
24-
std::mutex AsyncUV::queue_mutex_;
22+
struct AsyncUV::LoopData {
23+
uv_loop_t* loop{nullptr};
24+
uv_async_t* handle{nullptr};
25+
std::queue<Task> queue;
26+
std::mutex queue_mutex;
27+
};
2528

26-
AsyncUV::AsyncUV(uv_loop_t* loop, Task task) : uv_h_(nullptr), task_(task) {
27-
if (loop && task) {
28-
Init(loop, task);
29-
}
29+
std::map<uv_loop_t*, AsyncUV::LoopData*> AsyncUV::loop_data_;
30+
std::mutex AsyncUV::loop_data_mutex_;
31+
std::queue<AsyncUV::Task> AsyncUV::pending_queue_;
32+
std::mutex AsyncUV::pending_queue_mutex_;
33+
34+
AsyncUV::AsyncUV(uv_loop_t* loop, Task task) : loop_(loop), task_(task) {
3035
TRACE_ADD(ASYNC, this);
3136
}
3237

3338
AsyncUV::~AsyncUV() {
34-
if (!uv_h_) {
39+
TRACE(ASYNC, "~AsyncUV");
40+
TRACE_REMOVE(ASYNC, this);
41+
}
42+
43+
void AsyncUV::OnAsyncCalled(uv_async_t* handle) {
44+
auto* loop_data = static_cast<LoopData*>(handle->data);
45+
if (loop_data == nullptr) {
3546
return;
3647
}
3748

38-
TRACE(ASYNC, "~AsyncUV");
39-
TRACE_REMOVE(ASYNC, this);
49+
std::queue<Task> tasks_to_run;
50+
{
51+
std::lock_guard<std::mutex> lock(loop_data->queue_mutex);
52+
loop_data->queue.swap(tasks_to_run);
53+
}
4054

41-
uv_close(reinterpret_cast<uv_handle_t*>(uv_h_), [](uv_handle_t* handle) {
42-
TRACE(ASYNC, "~uv_close");
43-
TRACE_REMOVE(ASYNC_UV, handle);
44-
delete reinterpret_cast<uv_async_t*>(handle);
45-
});
55+
while (!tasks_to_run.empty()) {
56+
Task& task = tasks_to_run.front();
57+
if (task) {
58+
TRACE(MSGPORT, "run task");
59+
task(handle);
60+
TRACE(MSGPORT, "/run task");
61+
}
62+
tasks_to_run.pop();
63+
}
4664
}
4765

48-
bool AsyncUV::Send(uv_loop_t* loop, Task task) {
66+
AsyncUV::LoopData* AsyncUV::GetLoopData(uv_loop_t* loop,
67+
bool create_if_not_found) {
68+
std::lock_guard<std::mutex> lock(loop_data_mutex_);
69+
auto it = loop_data_.find(loop);
70+
if (it != loop_data_.end()) {
71+
return it->second;
72+
}
73+
74+
if (!create_if_not_found || loop == nullptr) {
75+
return nullptr;
76+
}
77+
78+
auto* loop_data = new LoopData();
79+
loop_data->loop = loop;
80+
loop_data->handle = new uv_async_t();
81+
loop_data->handle->data = loop_data;
82+
uv_async_init(loop, loop_data->handle, OnAsyncCalled);
83+
uv_unref(reinterpret_cast<uv_handle_t*>(loop_data->handle));
84+
TRACE_ADD(ASYNC_UV, loop_data->handle);
85+
loop_data_[loop] = loop_data;
86+
return loop_data;
87+
}
88+
89+
bool AsyncUV::InitPerThread(uv_loop_t* loop) {
4990
if (loop == nullptr) {
91+
return false;
92+
}
93+
94+
return GetLoopData(loop, true) != nullptr;
95+
}
96+
97+
void AsyncUV::CleanupPerThread(uv_loop_t* loop) {
98+
std::lock_guard<std::mutex> lock(loop_data_mutex_);
99+
auto it = loop_data_.find(loop);
100+
if (it == loop_data_.end()) {
101+
return;
102+
}
103+
104+
LoopData* loop_data = it->second;
105+
loop_data_.erase(it);
106+
107+
if (!uv_is_closing(reinterpret_cast<uv_handle_t*>(loop_data->handle))) {
108+
uv_close(reinterpret_cast<uv_handle_t*>(loop_data->handle),
109+
[](uv_handle_t* handle) {
110+
TRACE(ASYNC, "~uv_close");
111+
TRACE_REMOVE(ASYNC_UV, handle);
112+
auto* loop_data = static_cast<LoopData*>(handle->data);
113+
delete loop_data->handle;
114+
delete loop_data;
115+
});
116+
}
117+
}
118+
119+
bool AsyncUV::Send(uv_loop_t* loop, Task task) {
120+
if (loop == nullptr || !task) {
50121
TRACE(MSGPORT, "invalid loop");
51122
return false;
52123
}
53-
return (new AsyncUV(loop, task))->Send();
124+
125+
LoopData* loop_data = GetLoopData(loop, true);
126+
if (loop_data == nullptr) {
127+
return false;
128+
}
129+
130+
{
131+
std::lock_guard<std::mutex> lock(loop_data->queue_mutex);
132+
loop_data->queue.push(std::move(task));
133+
}
134+
135+
uv_async_send(loop_data->handle);
136+
return true;
54137
}
55138

56139
size_t AsyncUV::EnqueueTask(Task task) {
57140
TRACE(MSGPORT, "EnqueueTask");
58-
std::lock_guard<std::mutex> lock(queue_mutex_);
59-
queue_.push(task);
60-
return queue_.size();
141+
std::lock_guard<std::mutex> lock(pending_queue_mutex_);
142+
pending_queue_.push(std::move(task));
143+
return pending_queue_.size();
61144
}
62145

63146
bool AsyncUV::DrainPendingTasks(uv_loop_t* loop) {
64-
TRACE(MSGPORT, "DrainPendingTasks");
65-
std::lock_guard<std::mutex> lock(queue_mutex_);
66-
TRACE(MSGPORT, "drain pending tasks %zu", queue_.size());
67-
68147
if (loop == nullptr) {
69148
TRACE(MSGPORT, "invalid loop");
70149
return false;
71150
}
72151

73-
while (!queue_.empty()) {
74-
AsyncUV::Send(loop, queue_.front());
75-
queue_.pop();
152+
TRACE(MSGPORT, "DrainPendingTasks");
153+
std::queue<Task> pending_tasks;
154+
{
155+
std::lock_guard<std::mutex> lock(pending_queue_mutex_);
156+
TRACE(MSGPORT, "drain pending tasks %zu", pending_queue_.size());
157+
pending_queue_.swap(pending_tasks);
158+
}
159+
160+
while (!pending_tasks.empty()) {
161+
AsyncUV::Send(loop, std::move(pending_tasks.front()));
162+
pending_tasks.pop();
76163
}
77164
TRACE(MSGPORT, "/drain pending tasks");
78165
return true;
79166
}
80167

81168
void AsyncUV::DeletePendingTasks() {
82169
TRACE(MSGPORT, "DeletePendingTasks");
83-
std::lock_guard<std::mutex> lock(queue_mutex_);
84-
TRACE(MSGPORT, "delete pending tasks %zu", queue_.size());
85-
if (!queue_.empty()) {
170+
std::lock_guard<std::mutex> lock(pending_queue_mutex_);
171+
TRACE(MSGPORT, "delete pending tasks %zu", pending_queue_.size());
172+
if (!pending_queue_.empty()) {
86173
std::queue<Task> empty;
87-
std::swap(queue_, empty);
174+
std::swap(pending_queue_, empty);
88175
}
89176
}
90177

91178
bool AsyncUV::IsPendingTasksEmpty() {
92179
TRACE(MSGPORT, "IsPendingTasksEmpty");
93-
std::lock_guard<std::mutex> lock(queue_mutex_);
94-
return queue_.empty();
180+
std::lock_guard<std::mutex> lock(pending_queue_mutex_);
181+
return pending_queue_.empty();
95182
}
96183

97184
void AsyncUV::Init(uv_loop_t* loop, Task task) {
185+
loop_ = loop;
98186
task_ = task;
99-
100-
uv_h_ = new uv_async_t();
101-
uv_h_->data = this;
102-
uv_async_init(loop, uv_h_, [](uv_async_t* handle) {
103-
auto event = static_cast<AsyncUV*>(handle->data);
104-
if (event->task_) {
105-
TRACE(MSGPORT, "run task");
106-
event->task_(handle);
107-
TRACE(MSGPORT, "/run task");
108-
}
109-
delete event;
110-
});
111-
TRACE_ADD(ASYNC_UV, uv_h_);
112187
}
113188

114189
bool AsyncUV::Send() {
115-
if (!uv_h_) {
190+
if (loop_ == nullptr || !task_) {
116191
return false;
117192
}
118-
uv_async_send(uv_h_);
119-
return true;
193+
return Send(loop_, std::move(task_));
120194
}

deps/message-port/async-uv.h

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
#pragma once
1818

1919
#include <functional>
20+
#include <map>
2021
#include <mutex>
2122
#include <queue>
2223

@@ -26,6 +27,7 @@
2627

2728
using uv_loop_t = struct uv_loop_s;
2829
using uv_async_t = struct uv_async_s;
30+
using uv_handle_t = struct uv_handle_s;
2931

3032
class EXPORT_API AsyncUV {
3133
public:
@@ -39,16 +41,26 @@ class EXPORT_API AsyncUV {
3941
static void DeletePendingTasks();
4042
static bool IsPendingTasksEmpty();
4143

44+
// Register and cleanup one shared async handle per loop.
45+
static bool InitPerThread(uv_loop_t* loop);
46+
static void CleanupPerThread(uv_loop_t* loop);
47+
4248
AsyncUV(uv_loop_t* loop = nullptr, Task task = nullptr);
4349
~AsyncUV();
4450

4551
void Init(uv_loop_t* loop, Task task);
4652
bool Send();
4753

4854
private:
49-
uv_async_t* uv_h_;
55+
struct LoopData;
56+
static LoopData* GetLoopData(uv_loop_t* loop, bool create_if_not_found);
57+
static void OnAsyncCalled(uv_async_t* handle);
58+
59+
uv_loop_t* loop_;
5060
Task task_;
5161

52-
static std::queue<Task> queue_;
53-
static std::mutex queue_mutex_;
62+
static std::map<uv_loop_t*, LoopData*> loop_data_;
63+
static std::mutex loop_data_mutex_;
64+
static std::queue<Task> pending_queue_;
65+
static std::mutex pending_queue_mutex_;
5466
};

src/lwnode/nd-vm-main-message-port.cc

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ MainMessagePort::MainMessagePort(std::shared_ptr<Port> port,
3838
MainMessagePort::~MainMessagePort() {
3939
LWNODE_DEV_FATAL_LOG("[MainMessagePort::~MainMessagePort]");
4040
Channel::DeletePendingMessages();
41+
AsyncUV::CleanupPerThread(uv_loop_);
4142
}
4243

4344
void MainMessagePort::SetMessageEventClass(FunctionObjectRef* klass) {
@@ -53,6 +54,12 @@ void MainMessagePort::Init(ContextRef* context, uv_loop_t* loop) {
5354

5455
context_ = context;
5556
uv_loop_ = loop;
57+
AsyncUV::InitPerThread(uv_loop_);
58+
59+
// Drain once before publishing the loop, then drain again after publication
60+
// to reduce the window where another thread can enqueue into the pending
61+
// queue based on a stale "loop not ready" observation.
62+
Channel::DrainPendingMessages(uv_loop_);
5663

5764
try {
5865
internal_->uv_promise_.set_value(uv_loop_);

0 commit comments

Comments
 (0)