diff --git a/deps/message-port/expected.h b/deps/message-port/expected.h index 603deeb4fe..6cb0ac3d2f 100644 --- a/deps/message-port/expected.h +++ b/deps/message-port/expected.h @@ -56,7 +56,9 @@ class Expected { Expected(T&& v) : value_(std::move(v)), has_value_(true) {} Expected(const E& e) : error_(e), has_value_(false) {} Expected(E&& e) : error_(std::move(e)), has_value_(false) {} - ~Expected() { has_value_ ? value_.~T() : error_.~E(); } + ~Expected() { + if (!has_value_) error_.~E(); + } bool valid() const { return has_value_; } T& value() { return value_; } diff --git a/deps/message-port/message-port.cc b/deps/message-port/message-port.cc index 7cda77ac4b..1d8aa4c995 100644 --- a/deps/message-port/message-port.cc +++ b/deps/message-port/message-port.cc @@ -75,14 +75,14 @@ const std::weak_ptr& MessageEvent::target() const { return internal_->target; } -// MessageEventSync::Internal +// MessageEventSync // ----------------------------------------------------------------------------- -struct MessageEventSync::Internal { - Internal() {} - ~Internal() { TRACE(MSGEVENT, "~MessageEventSync::Internal"); } - std::promise promise; -}; +MessageEventSync::HandleData::HandleData() {} + +MessageEventSync::HandleData::~HandleData() { + TRACE(MSGEVENT, "MessageEventSync::HandleData::~HandleData"); +} std::shared_ptr MessageEventSync::New( const std::string& data) { @@ -90,22 +90,12 @@ std::shared_ptr MessageEventSync::New( } MessageEventSync::MessageEventSync(const std::string& data) - : MessageEvent(data), internal_sync_(std::make_unique()) {} + : MessageEvent(data), handle_data_(std::make_shared()) {} MessageEventSync::~MessageEventSync() { TRACE(MSGEVENT, "~MessageEventSync"); } -void MessageEventSync::SetResult(const std::string& result) const { - try { - // If the user sets the result before in onmessage callback, set the - // promise value. Otherwise, The promise will be unresolved forever. - internal_sync_->promise.set_value(result); - } catch (const std::exception& e) { - LWNODE_DEV_LOG("[MessageEventSync::SetResult] promise error:", e.what()); - return; - } -} // Port::Internal // ----------------------------------------------------------------------------- @@ -224,7 +214,7 @@ Port::Result Port::PostMessage(std::shared_ptr event, // cause a deadlock. It should be called from another thread. std::future future; try { - future = event->internal_sync_->promise.get_future(); + future = event->handle_data_->promise.get_future(); } catch (...) { TRACE(MSGPORT, "invalid promise"); return Error::InvalidMessageEvent; diff --git a/deps/message-port/message-port.h b/deps/message-port/message-port.h index c10a0f0def..c3f483f62d 100644 --- a/deps/message-port/message-port.h +++ b/deps/message-port/message-port.h @@ -17,6 +17,7 @@ #pragma once #include +#include #include #include #include @@ -39,7 +40,7 @@ class EXPORT_API MessageEvent { const std::vector> ports() const; const std::weak_ptr& target() const; - virtual bool IsSync() { return false; } + virtual bool IsSync() const { return false; } virtual ~MessageEvent(); @@ -56,15 +57,20 @@ class EXPORT_API MessageEventSync final : public MessageEvent { static std::shared_ptr New(const std::string& data); ~MessageEventSync(); - bool IsSync() override { return true; } + struct HandleData { + HandleData(); + ~HandleData(); + std::promise promise; + }; + + bool IsSync() const override { return true; } - void SetResult(const std::string& result) const; + std::shared_ptr handle_data() const { return handle_data_; } private: MessageEventSync(const std::string& data); - struct Internal; - std::unique_ptr internal_sync_; + std::shared_ptr handle_data_; friend class Port; }; diff --git a/include/lwnode/expected.h b/include/lwnode/expected.h index 603deeb4fe..6cb0ac3d2f 100644 --- a/include/lwnode/expected.h +++ b/include/lwnode/expected.h @@ -56,7 +56,9 @@ class Expected { Expected(T&& v) : value_(std::move(v)), has_value_(true) {} Expected(const E& e) : error_(e), has_value_(false) {} Expected(E&& e) : error_(std::move(e)), has_value_(false) {} - ~Expected() { has_value_ ? value_.~T() : error_.~E(); } + ~Expected() { + if (!has_value_) error_.~E(); + } bool valid() const { return has_value_; } T& value() { return value_; } diff --git a/include/lwnode/message-port.h b/include/lwnode/message-port.h index c10a0f0def..c3f483f62d 100644 --- a/include/lwnode/message-port.h +++ b/include/lwnode/message-port.h @@ -17,6 +17,7 @@ #pragma once #include +#include #include #include #include @@ -39,7 +40,7 @@ class EXPORT_API MessageEvent { const std::vector> ports() const; const std::weak_ptr& target() const; - virtual bool IsSync() { return false; } + virtual bool IsSync() const { return false; } virtual ~MessageEvent(); @@ -56,15 +57,20 @@ class EXPORT_API MessageEventSync final : public MessageEvent { static std::shared_ptr New(const std::string& data); ~MessageEventSync(); - bool IsSync() override { return true; } + struct HandleData { + HandleData(); + ~HandleData(); + std::promise promise; + }; + + bool IsSync() const override { return true; } - void SetResult(const std::string& result) const; + std::shared_ptr handle_data() const { return handle_data_; } private: MessageEventSync(const std::string& data); - struct Internal; - std::unique_ptr internal_sync_; + std::shared_ptr handle_data_; friend class Port; }; diff --git a/src/lwnode/nd-mod-message-port.cc b/src/lwnode/nd-mod-message-port.cc index 557cca209e..f7151b60ce 100644 --- a/src/lwnode/nd-mod-message-port.cc +++ b/src/lwnode/nd-mod-message-port.cc @@ -76,37 +76,49 @@ static ObjectRef* InstantiateMessageEvent(ExecutionStateRef* state, // Create a new MessageEvent auto option = ObjectRef::create(state); - option->setExtraData((void*)(event)); + + struct MessageEventExtraData : public gc_cleanup { + std::shared_ptr handle; + }; + + if (event->IsSync()) { + auto* data = new MessageEventExtraData(); + auto* event_sync = + reinterpret_cast(const_cast(event)); + data->handle = event_sync->handle_data(); + option->setExtraData(data); + } + // TODO: Use atomic string option->set(state, OneByteString("data"), OneByteString(event->data())); option->set(state, OneByteString("origin"), OneByteString(event->origin())); - SetMethod(state, - option, - "setResult", - [](ExecutionStateRef* state, - ValueRef* this_value, - size_t argc, - ValueRef** argv, - bool is_construct) -> ValueRef* { - if (argc < 1 || !argv[0]->isString()) { - state->throwException(TypeErrorObjectRef::create( - state, OneByteString("Invalid argument"))); - } - - auto event = GetExtraData(this_value); - if (!event->IsSync()) { - state->throwException(ErrorObjectRef::create( - state, - ErrorObjectRef::Code::None, - OneByteString("only support MessageEventSync"))); - } - - reinterpret_cast(event)->SetResult( - argv[0]->asString()->toStdUTF8String()); - - return ValueRef::createUndefined(); - }); + SetMethod( + state, + option, + "setResult", + [](ExecutionStateRef* state, + ValueRef* this_value, + size_t argc, + ValueRef** argv, + bool is_construct) -> ValueRef* { + if (argc < 1 || !argv[0]->isString()) { + state->throwException(TypeErrorObjectRef::create( + state, OneByteString("Invalid argument"))); + } + + if (!this_value->asObject()->extraData()) { + state->throwException(ErrorObjectRef::create( + state, + ErrorObjectRef::Code::None, + OneByteString("only support MessageEventSync"))); + } + + auto* data = GetExtraData(this_value); + data->handle->promise.set_value(argv[0]->asString()->toStdUTF8String()); + + return ValueRef::createUndefined(); + }); ValueRef* argv[] = {OneByteString("message"), option}; return klass->construct(state, COUNT_OF(argv), argv)->asObject(); diff --git a/test/embedding/message-port-sync.cc b/test/embedding/message-port-sync.cc index 86b749d513..006e35c965 100644 --- a/test/embedding/message-port-sync.cc +++ b/test/embedding/message-port-sync.cc @@ -10,6 +10,23 @@ #define COUNT_OF(array) (sizeof(array) / sizeof((array)[0])) +void log(const std::string& message) { + std::cout << "\033[33m" << message << "\033[0m" << std::endl; +} + +std::string CallMethod(Port* port, const std::string& function) { + try { + auto result = port->PostMessage(MessageEventSync::New(function), 3000); + assert(result); + std::string message = result.value_or("error"); + return message; + } catch (const std::exception& e) { + std::cerr << e.what() << '\n'; + assert(false); + } + return ""; +} + int main(int argc, char* argv[]) { auto runtime = std::make_shared(); @@ -21,9 +38,6 @@ int main(int argc, char* argv[]) { std::thread worker = std::thread( [&](std::promise&& promise) mutable { - // FIXME: Fix Runtime::Init() call to ensure environment initialization - // before running the loop, Runtime::Run(). This workaround passes a - // promise directly to know when that is. runtime->Start(COUNT_OF(args), args, std::move(promise)); }, std::move(promise)); @@ -35,22 +49,46 @@ int main(int argc, char* argv[]) { std::cout << event->data() << std::endl; }); + // async test on other thread + std::thread worker2 = std::thread([&]() mutable { + for (int i = 0; i < 3; i++) { + std::this_thread::sleep_for(std::chrono::seconds(1)); + std::cout << "post message(async)" << std::endl; + auto result = port2->PostMessage(MessageEvent::New("async")); + assert(result); + } + }); + // sync post message test + log("sync post message test - 1"); { for (int i = 0; i < 5; i++) { - auto result = port2->PostMessage(MessageEventSync::New("sync")); - if (result) - std::cout << "result(" << i << "): " << *result << std::endl; - else - std::cout << "result(" << i << "): error(" << result.error() << ")" - << std::endl; + auto result = CallMethod(port2.get(), "sync"); + std::cout << "result(" << i << "): " << result << std::endl; } + } + // simple case test + log("sync post message test - 2"); + { auto result = port2->PostMessage(MessageEventSync::New("sync")); + assert(result); + if (result) + std::cout << "result: " << *result << std::endl; + else + std::cout << "result: error(" << result.error() << ")" << std::endl; + } + + // simple case test + log("sync post message test - 3"); + { + auto result = port2->PostMessage(MessageEventSync::New("sync")); + assert(result); std::cout << "result(5): " << result.value_or("error") << std::endl; } // use same message event instance test + log("use same message event instance test"); { auto event = MessageEventSync::New("sync"); @@ -61,10 +99,12 @@ int main(int argc, char* argv[]) { result = port2->PostMessage(event); assert(!result); assert(result.error() == Port::Error::InvalidMessageEvent); - std::cout << "result: " << result.value_or("invalid message error") << std::endl; + std::cout << "result: " << result.value_or("invalid message error") + << std::endl; } // timeout test + log("timeout test"); { auto result = port2->PostMessage(MessageEventSync::New("sync-timeout"), 2000); @@ -75,7 +115,21 @@ int main(int argc, char* argv[]) { std::cout << "result: " << result.value_or("timeout error") << std::endl; } - // async post message test + // timeout test (delayed response) + log("timeout test (delayed response)"); + { + auto result = + port2->PostMessage(MessageEventSync::New("delay-timeout"), 1000); + + assert(!result); + assert(result.error() == Port::Error::Timeout); + + std::cout << "result: " << result.value_or("timeout error") << std::endl; + } + + worker2.join(); + + // exit test { port2->PostMessage(MessageEvent::New("exit")); } diff --git a/test/embedding/test-03-message-port-sync.js b/test/embedding/test-03-message-port-sync.js index b42a067078..0b162e7125 100644 --- a/test/embedding/test-03-message-port-sync.js +++ b/test/embedding/test-03-message-port-sync.js @@ -6,11 +6,11 @@ lwnode.ref(); let count = 0; -function test() { +function test(timeout = 1000) { return new Promise((resolve, reject) => { setTimeout(() => { - resolve(`sync test: ${count++}`); - }, 1000); + resolve(`test: ${count++} abcdefghijklmnopqrstuvwxyz0123456789abcdefghijklmnopqrstuvwxyz`); + }, timeout); }); } @@ -19,10 +19,16 @@ port.onmessage = async (event) => { if (event.data == "sync") { let data = await test(); event.setResult(data); + } else if (event.data == "async") { + port.postMessage("async result from js"); + } else if (event.data == "delay-timeout") { + let data = await test(5000); // long delay. + event.setResult(data); + } else if (event.data == "sync-timeout") { + // Do not intentionally set the result value to test the timeout. } else if (event.data == "exit") { port.postMessage("exit javascript"); lwnode.unref(); - } else if (event.data == "sync-timeout") { - // Do not intentionally set the result value to test the timeout. } }; +