Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion deps/message-port/expected.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,9 @@ class Expected {
Expected(T&& v) : value_(std::move(v)), has_value_(true) {}
Expected(const E& e) : error_(e), has_value_(false) {}
Expected(E&& e) : error_(std::move(e)), has_value_(false) {}
~Expected() { has_value_ ? value_.~T() : error_.~E(); }
~Expected() {
if (!has_value_) error_.~E();
}

bool valid() const { return has_value_; }
T& value() { return value_; }
Expand Down
26 changes: 8 additions & 18 deletions deps/message-port/message-port.cc
Original file line number Diff line number Diff line change
Expand Up @@ -75,37 +75,27 @@ const std::weak_ptr<Port>& MessageEvent::target() const {
return internal_->target;
}

// MessageEventSync::Internal
// MessageEventSync
// -----------------------------------------------------------------------------
struct MessageEventSync::Internal {
Internal() {}
~Internal() { TRACE(MSGEVENT, "~MessageEventSync::Internal"); }

std::promise<std::string> promise;
};
MessageEventSync::HandleData::HandleData() {}

MessageEventSync::HandleData::~HandleData() {
TRACE(MSGEVENT, "MessageEventSync::HandleData::~HandleData");
}

std::shared_ptr<MessageEventSync> MessageEventSync::New(
const std::string& data) {
return std::shared_ptr<MessageEventSync>(new MessageEventSync(data));
}

MessageEventSync::MessageEventSync(const std::string& data)
: MessageEvent(data), internal_sync_(std::make_unique<Internal>()) {}
: MessageEvent(data), handle_data_(std::make_shared<HandleData>()) {}

MessageEventSync::~MessageEventSync() {
TRACE(MSGEVENT, "~MessageEventSync");
}

void MessageEventSync::SetResult(const std::string& result) const {
try {
// If the user sets the result before in onmessage callback, set the
// promise value. Otherwise, The promise will be unresolved forever.
internal_sync_->promise.set_value(result);
} catch (const std::exception& e) {
LWNODE_DEV_LOG("[MessageEventSync::SetResult] promise error:", e.what());
return;
}
}

// Port::Internal
// -----------------------------------------------------------------------------
Expand Down Expand Up @@ -224,7 +214,7 @@ Port::Result Port::PostMessage(std::shared_ptr<MessageEventSync> event,
// cause a deadlock. It should be called from another thread.
std::future<std::string> future;
try {
future = event->internal_sync_->promise.get_future();
future = event->handle_data_->promise.get_future();
} catch (...) {
TRACE(MSGPORT, "invalid promise");
return Error::InvalidMessageEvent;
Expand Down
16 changes: 11 additions & 5 deletions deps/message-port/message-port.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#pragma once

#include <functional>
#include <future>
#include <memory>
#include <string>
#include <vector>
Expand All @@ -39,7 +40,7 @@ class EXPORT_API MessageEvent {
const std::vector<std::weak_ptr<Port>> ports() const;
const std::weak_ptr<Port>& target() const;

virtual bool IsSync() { return false; }
virtual bool IsSync() const { return false; }

virtual ~MessageEvent();

Expand All @@ -56,15 +57,20 @@ class EXPORT_API MessageEventSync final : public MessageEvent {
static std::shared_ptr<MessageEventSync> New(const std::string& data);
~MessageEventSync();

bool IsSync() override { return true; }
struct HandleData {
HandleData();
~HandleData();
std::promise<std::string> promise;
};

bool IsSync() const override { return true; }

void SetResult(const std::string& result) const;
std::shared_ptr<HandleData> handle_data() const { return handle_data_; }

private:
MessageEventSync(const std::string& data);

struct Internal;
std::unique_ptr<Internal> internal_sync_;
std::shared_ptr<HandleData> handle_data_;
friend class Port;
};

Expand Down
4 changes: 3 additions & 1 deletion include/lwnode/expected.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,9 @@ class Expected {
Expected(T&& v) : value_(std::move(v)), has_value_(true) {}
Expected(const E& e) : error_(e), has_value_(false) {}
Expected(E&& e) : error_(std::move(e)), has_value_(false) {}
~Expected() { has_value_ ? value_.~T() : error_.~E(); }
~Expected() {
if (!has_value_) error_.~E();
}

bool valid() const { return has_value_; }
T& value() { return value_; }
Expand Down
16 changes: 11 additions & 5 deletions include/lwnode/message-port.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#pragma once

#include <functional>
#include <future>
#include <memory>
#include <string>
#include <vector>
Expand All @@ -39,7 +40,7 @@ class EXPORT_API MessageEvent {
const std::vector<std::weak_ptr<Port>> ports() const;
const std::weak_ptr<Port>& target() const;

virtual bool IsSync() { return false; }
virtual bool IsSync() const { return false; }

virtual ~MessageEvent();

Expand All @@ -56,15 +57,20 @@ class EXPORT_API MessageEventSync final : public MessageEvent {
static std::shared_ptr<MessageEventSync> New(const std::string& data);
~MessageEventSync();

bool IsSync() override { return true; }
struct HandleData {
HandleData();
~HandleData();
std::promise<std::string> promise;
};

bool IsSync() const override { return true; }

void SetResult(const std::string& result) const;
std::shared_ptr<HandleData> handle_data() const { return handle_data_; }

private:
MessageEventSync(const std::string& data);

struct Internal;
std::unique_ptr<Internal> internal_sync_;
std::shared_ptr<HandleData> handle_data_;
friend class Port;
};

Expand Down
66 changes: 39 additions & 27 deletions src/lwnode/nd-mod-message-port.cc
Original file line number Diff line number Diff line change
Expand Up @@ -76,37 +76,49 @@ static ObjectRef* InstantiateMessageEvent(ExecutionStateRef* state,

// Create a new MessageEvent
auto option = ObjectRef::create(state);
option->setExtraData((void*)(event));

struct MessageEventExtraData : public gc_cleanup {
std::shared_ptr<MessageEventSync::HandleData> handle;
};

if (event->IsSync()) {
auto* data = new MessageEventExtraData();
auto* event_sync =
reinterpret_cast<MessageEventSync*>(const_cast<MessageEvent*>(event));
data->handle = event_sync->handle_data();
option->setExtraData(data);
}

// TODO: Use atomic string
option->set(state, OneByteString("data"), OneByteString(event->data()));
option->set(state, OneByteString("origin"), OneByteString(event->origin()));

SetMethod(state,
option,
"setResult",
[](ExecutionStateRef* state,
ValueRef* this_value,
size_t argc,
ValueRef** argv,
bool is_construct) -> ValueRef* {
if (argc < 1 || !argv[0]->isString()) {
state->throwException(TypeErrorObjectRef::create(
state, OneByteString("Invalid argument")));
}

auto event = GetExtraData<MessageEvent>(this_value);
if (!event->IsSync()) {
state->throwException(ErrorObjectRef::create(
state,
ErrorObjectRef::Code::None,
OneByteString("only support MessageEventSync")));
}

reinterpret_cast<MessageEventSync*>(event)->SetResult(
argv[0]->asString()->toStdUTF8String());

return ValueRef::createUndefined();
});
SetMethod(
state,
option,
"setResult",
[](ExecutionStateRef* state,
ValueRef* this_value,
size_t argc,
ValueRef** argv,
bool is_construct) -> ValueRef* {
if (argc < 1 || !argv[0]->isString()) {
state->throwException(TypeErrorObjectRef::create(
state, OneByteString("Invalid argument")));
}

if (!this_value->asObject()->extraData()) {
state->throwException(ErrorObjectRef::create(
state,
ErrorObjectRef::Code::None,
OneByteString("only support MessageEventSync")));
}

auto* data = GetExtraData<MessageEventExtraData>(this_value);
data->handle->promise.set_value(argv[0]->asString()->toStdUTF8String());

return ValueRef::createUndefined();
});

ValueRef* argv[] = {OneByteString("message"), option};
return klass->construct(state, COUNT_OF(argv), argv)->asObject();
Expand Down
76 changes: 65 additions & 11 deletions test/embedding/message-port-sync.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,23 @@

#define COUNT_OF(array) (sizeof(array) / sizeof((array)[0]))

void log(const std::string& message) {
std::cout << "\033[33m" << message << "\033[0m" << std::endl;
}

std::string CallMethod(Port* port, const std::string& function) {
try {
auto result = port->PostMessage(MessageEventSync::New(function), 3000);
assert(result);
std::string message = result.value_or("error");
return message;
} catch (const std::exception& e) {
std::cerr << e.what() << '\n';
assert(false);
}
return "";
}

int main(int argc, char* argv[]) {
auto runtime = std::make_shared<lwnode::Runtime>();

Expand All @@ -21,9 +38,6 @@ int main(int argc, char* argv[]) {

std::thread worker = std::thread(
[&](std::promise<void>&& promise) mutable {
// FIXME: Fix Runtime::Init() call to ensure environment initialization
// before running the loop, Runtime::Run(). This workaround passes a
// promise directly to know when that is.
runtime->Start(COUNT_OF(args), args, std::move(promise));
},
std::move(promise));
Expand All @@ -35,22 +49,46 @@ int main(int argc, char* argv[]) {
std::cout << event->data() << std::endl;
});

// async test on other thread
std::thread worker2 = std::thread([&]() mutable {
for (int i = 0; i < 3; i++) {
std::this_thread::sleep_for(std::chrono::seconds(1));
std::cout << "post message(async)" << std::endl;
auto result = port2->PostMessage(MessageEvent::New("async"));
assert(result);
}
});

// sync post message test
log("sync post message test - 1");
{
for (int i = 0; i < 5; i++) {
auto result = port2->PostMessage(MessageEventSync::New("sync"));
if (result)
std::cout << "result(" << i << "): " << *result << std::endl;
else
std::cout << "result(" << i << "): error(" << result.error() << ")"
<< std::endl;
auto result = CallMethod(port2.get(), "sync");
std::cout << "result(" << i << "): " << result << std::endl;
}
}

// simple case test
log("sync post message test - 2");
{
auto result = port2->PostMessage(MessageEventSync::New("sync"));
assert(result);
if (result)
std::cout << "result: " << *result << std::endl;
else
std::cout << "result: error(" << result.error() << ")" << std::endl;
}

// simple case test
log("sync post message test - 3");
{
auto result = port2->PostMessage(MessageEventSync::New("sync"));
assert(result);
std::cout << "result(5): " << result.value_or("error") << std::endl;
}

// use same message event instance test
log("use same message event instance test");
{
auto event = MessageEventSync::New("sync");

Expand All @@ -61,10 +99,12 @@ int main(int argc, char* argv[]) {
result = port2->PostMessage(event);
assert(!result);
assert(result.error() == Port::Error::InvalidMessageEvent);
std::cout << "result: " << result.value_or("invalid message error") << std::endl;
std::cout << "result: " << result.value_or("invalid message error")
<< std::endl;
}

// timeout test
log("timeout test");
{
auto result =
port2->PostMessage(MessageEventSync::New("sync-timeout"), 2000);
Expand All @@ -75,7 +115,21 @@ int main(int argc, char* argv[]) {
std::cout << "result: " << result.value_or("timeout error") << std::endl;
}

// async post message test
// timeout test (delayed response)
log("timeout test (delayed response)");
{
auto result =
port2->PostMessage(MessageEventSync::New("delay-timeout"), 1000);

assert(!result);
assert(result.error() == Port::Error::Timeout);

std::cout << "result: " << result.value_or("timeout error") << std::endl;
}

worker2.join();

// exit test
{
port2->PostMessage(MessageEvent::New("exit"));
}
Expand Down
16 changes: 11 additions & 5 deletions test/embedding/test-03-message-port-sync.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ lwnode.ref();

let count = 0;

function test() {
function test(timeout = 1000) {
return new Promise((resolve, reject) => {
setTimeout(() => {
resolve(`sync test: ${count++}`);
}, 1000);
resolve(`test: ${count++} abcdefghijklmnopqrstuvwxyz0123456789abcdefghijklmnopqrstuvwxyz`);
}, timeout);
});
}

Expand All @@ -19,10 +19,16 @@ port.onmessage = async (event) => {
if (event.data == "sync") {
let data = await test();
event.setResult(data);
} else if (event.data == "async") {
port.postMessage("async result from js");
} else if (event.data == "delay-timeout") {
let data = await test(5000); // long delay.
event.setResult(data);
} else if (event.data == "sync-timeout") {
// Do not intentionally set the result value to test the timeout.
} else if (event.data == "exit") {
port.postMessage("exit javascript");
lwnode.unref();
} else if (event.data == "sync-timeout") {
// Do not intentionally set the result value to test the timeout.
}
};