diff --git a/.github/workflows/actions.yml b/.github/workflows/actions.yml index 201ed8d82e..82362706da 100644 --- a/.github/workflows/actions.yml +++ b/.github/workflows/actions.yml @@ -159,3 +159,6 @@ jobs: - name: Run embedtest run: | out/linux/Release/embedtest.x + - name: Run message-port sync test + run: | + out/linux/Release/message-port-sync.x diff --git a/deps/message-port/expected.h b/deps/message-port/expected.h new file mode 100644 index 0000000000..603deeb4fe --- /dev/null +++ b/deps/message-port/expected.h @@ -0,0 +1,98 @@ +/* + * Copyright (c) 2025-present Samsung Electronics Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include // for std::move + +/* +Expected + +Usage: + +enum class Error { NotFound, PermissionDenied }; + +Expected DoSomething(bool success) { + if (success) return 7; + return Error::NotFound; +} + +auto result = DoSomething(false); +if (result) { + std::cout << "Success: " << *result << std::endl; +} else { + std::cout << "Error: " << (result.error() == Error::NotFound) << std::endl; +} + +Why needed: + +Expected provides a structured way to handle both success and failure, +unlike std::optional or v8::Maybe which only convey whether a value +exists. + +- std::optional: For values with an absence. +- std::expected: For values with potential success or known error types. +- v8::Maybe: For handling potentially missing results within the JS context. +*/ + +// `T != void` +template +class Expected { + public: + Expected(const T& v) : value_(v), has_value_(true) {} + Expected(T&& v) : value_(std::move(v)), has_value_(true) {} + Expected(const E& e) : error_(e), has_value_(false) {} + Expected(E&& e) : error_(std::move(e)), has_value_(false) {} + ~Expected() { has_value_ ? value_.~T() : error_.~E(); } + + bool valid() const { return has_value_; } + T& value() { return value_; } + const T& value() const { return value_; } + E& error() { return error_; } + const E& error() const { return error_; } + + operator bool() const { return valid(); } + T& operator*() { return value(); } + const T& operator*() const { return value(); } + + T value_or(const T& default_value) { + return has_value_ ? value_ : default_value; + } + + private: + T value_; + E error_; + bool has_value_; +}; + +// `T = void` +template +class Expected { + public: + Expected() : has_value_(true) {} + Expected(const E& e) : error_(e), has_value_(false) {} + Expected(E&& e) : error_(std::move(e)), has_value_(false) {} + + bool valid() const { return has_value_; } + E& error() { return error_; } + const E& error() const { return error_; } + + operator bool() const { return valid(); } + + private: + E error_; + bool has_value_; +}; diff --git a/deps/message-port/message-port.cc b/deps/message-port/message-port.cc index 3587bc57b3..73a2446bb0 100644 --- a/deps/message-port/message-port.cc +++ b/deps/message-port/message-port.cc @@ -19,9 +19,18 @@ #include #include #include +#include + #include "async-uv.h" #include "channel.h" +#if __cplusplus >= 201703L +#include +#define Optional std::optional +#else +#include "utils/optional.h" +#endif + // MessageEvent::Internal // ----------------------------------------------------------------------------- @@ -64,6 +73,38 @@ const std::weak_ptr& MessageEvent::target() const { return internal_->target; } +// MessageEventSync::Internal +// ----------------------------------------------------------------------------- +struct MessageEventSync::Internal { + Internal() {} + ~Internal() { TRACE(MSGEVENT, "~MessageEventSync::Internal"); } + + Optional result; + std::promise promise; +}; + +std::shared_ptr MessageEventSync::New( + const std::string& data) { + return std::shared_ptr(new MessageEventSync(data)); +} + +MessageEventSync::MessageEventSync(const std::string& data) + : MessageEvent(data), internal_sync_(std::make_unique()) {} + +MessageEventSync::~MessageEventSync() { + TRACE(MSGEVENT, "~MessageEventSync"); +} + +void MessageEventSync::SetResult(const std::string& result) const { + internal_sync_->result = result; +} + +const std::string MessageEventSync::result() const { + auto result = internal_sync_->result; + + return result.has_value() ? result.value() : std::string(); +} + // Port::Internal // ----------------------------------------------------------------------------- @@ -83,6 +124,7 @@ struct Port::Internal { std::weak_ptr sink; std::shared_ptr sink_holder; OnMessageCallback callback; + std::mutex callback_mutex; }; Port::Port() : internal_(std::make_unique()) {} @@ -90,22 +132,26 @@ Port::Port() : internal_(std::make_unique()) {} Port::~Port() {} void Port::OnMessage(const OnMessageCallback& callback) { + std::unique_lock lock(internal_->callback_mutex); internal_->callback = std::move(callback); } -Port::Result Port::PostMessage(std::shared_ptr event) { +Port::Result Port::PostMessageAsync(std::shared_ptr event) { // Check if the sink is valid. std::shared_ptr sink = internal_->sink.lock(); if (sink == nullptr) { TRACE(MSGPORT, "sink port released."); - return Result::NoSink; + return Error::NoSink; } // Check if the sink has a receiver. - if (internal_->loop != nullptr && sink->internal_->callback == nullptr) { - // TODO: Enqueue events if the number of the events in queues is acceptable. - TRACE(MSGPORT, "sink has no callback."); - return Result::NoOnMessage; + if (internal_->loop) { + std::unique_lock lock(sink->internal_->callback_mutex); + if (sink->internal_->callback == nullptr) { + // TODO: Consider enqueuing events in this case. + TRACE(MSGPORT, "sink has no callback."); + return Error::NoOnMessage; + } } // Set target and origin field of the given event. @@ -116,44 +162,102 @@ Port::Result Port::PostMessage(std::shared_ptr event) { // It's not allowed to use MessageEvents to be sent to different sinks. if (event->internal_->target.lock() != internal_->sink.lock()) { - return Result::InvalidMessageEvent; + return Error::InvalidMessageEvent; } // Get a valid loop handle if invalid. if (internal_->loop == nullptr) { if (!internal_->future.valid()) { - return Result::InvalidPortLoop; + return Error::InvalidPortLoop; } if (internal_->future.wait_for(std::chrono::milliseconds(1)) == std::future_status::ready) { auto loop = internal_->future.get(); if (loop == nullptr) { - return Result::InvalidPortLoop; + return Error::InvalidPortLoop; } internal_->loop = loop; AsyncUV::DrainPendingTasks(internal_->loop); } } - auto task = [event = event, sink_weak = internal_->sink](uv_async_t*) { + auto task = [event = event, + self = internal_.get(), + sink_weak = internal_->sink](uv_async_t*) { // event: shared_ptr, sink_weak: weak_ptr auto sink = sink_weak.lock(); - if (sink && sink->internal_->callback) { - // Since sink is locked, event->target() is always valid - // inside the callback. - sink->internal_->callback(event.get()); + if (sink) { + std::unique_lock lock(sink->internal_->callback_mutex); + if (sink->internal_->callback) { + // Since sink is locked, event->target() is always valid + // inside the callback. + try { + sink->internal_->callback(event.get()); + } catch (...) { + TRACE(MSGPORT, "user callback error"); + return; + } + } } else { TRACE(MSGPORT, "sink port released, or no callback"); } + + if (event->IsSync()) { + auto sync_event = static_cast(event.get()); + + try { + // If the user sets the result before in onmessage callback, set the + // promise value. Otherwise, The promise will be unresolved forever. + if (sync_event->internal_sync_->result.has_value()) { + sync_event->internal_sync_->promise.set_value(sync_event->result()); + } + } catch (const std::exception& e) { + TRACE(MSGPORT, "promise error:", e.what()); + return; + } + } }; if (internal_->loop == nullptr) { AsyncUV::EnqueueTask(std::move(task)); - return Result::MessageEventQueued; + return Error::MessageEventQueued; } AsyncUV::Send(internal_->loop, std::move(task)); - return Result::NoError; + return Error::NoError; +} + +Port::Result Port::PostMessage(std::shared_ptr event) { + return PostMessageAsync(event); +} + +Port::Result Port::PostMessage(std::shared_ptr event, + int timeout_ms) { + // TODO: If this function is called from the same thread as lwnode, it can + // cause a deadlock. It should be called from another thread. + std::future future; + try { + future = event->internal_sync_->promise.get_future(); + } catch (...) { + TRACE(MSGPORT, "invalid promise"); + return Error::InvalidMessageEvent; + } + + auto result = PostMessageAsync(event); + if (result) { + return result.error(); + } + + if (timeout_ms > 0) { + auto status = future.wait_for(std::chrono::milliseconds(timeout_ms)); + if (status == std::future_status::timeout) { + TRACE(MSGPORT, "PostMessageSync timeout"); + return Error::Timeout; + } + } else { + future.wait(); + } + return future.get(); } void Port::Unref() { diff --git a/deps/message-port/message-port.h b/deps/message-port/message-port.h index 7cd48042ad..ae6873a525 100644 --- a/deps/message-port/message-port.h +++ b/deps/message-port/message-port.h @@ -21,6 +21,8 @@ #include #include +#include "expected.h" + #ifndef EXPORT_API #define EXPORT_API __attribute__((visibility("default"))) #endif @@ -37,9 +39,11 @@ class EXPORT_API MessageEvent { const std::vector> ports() const; const std::weak_ptr& target() const; - ~MessageEvent(); + virtual bool IsSync() { return false; } - private: + virtual ~MessageEvent(); + + protected: MessageEvent(const std::string& data); struct Internal; @@ -47,19 +51,42 @@ class EXPORT_API MessageEvent { friend class Port; }; +class EXPORT_API MessageEventSync final : public MessageEvent { + public: + static std::shared_ptr New(const std::string& data); + ~MessageEventSync(); + + bool IsSync() override { return true; } + + const std::string result() const; + void SetResult(const std::string& result) const; + + private: + MessageEventSync(const std::string& data); + + struct Internal; + std::unique_ptr internal_sync_; + friend class Port; +}; + class EXPORT_API Port { public: using OnMessageCallback = std::function; - enum Result { + enum Error { NoError = 0, MessageEventQueued, NoSink, NoOnMessage, InvalidMessageEvent, InvalidPortLoop, + Timeout, }; + using Result = Expected; Result PostMessage(std::shared_ptr event); + Result PostMessage(std::shared_ptr event, + int timeout_ms = -1); + void OnMessage(const OnMessageCallback& callback); void Unref(); @@ -68,6 +95,8 @@ class EXPORT_API Port { private: Port(); + Result PostMessageAsync(std::shared_ptr event); + struct Internal; std::unique_ptr internal_; friend struct Channel; diff --git a/deps/node/lib/internal/lwnode/message-port.js b/deps/node/lib/internal/lwnode/message-port.js index d21e19628d..9f44e668bc 100644 --- a/deps/node/lib/internal/lwnode/message-port.js +++ b/deps/node/lib/internal/lwnode/message-port.js @@ -50,6 +50,9 @@ class MessageEvent extends Event { get ports() { return this.#options.ports; } + setResult(data) { + return this.#options.setResult(data); + } } function setupMessagePort(target, binding) { diff --git a/include/lwnode/expected.h b/include/lwnode/expected.h new file mode 100644 index 0000000000..603deeb4fe --- /dev/null +++ b/include/lwnode/expected.h @@ -0,0 +1,98 @@ +/* + * Copyright (c) 2025-present Samsung Electronics Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include // for std::move + +/* +Expected + +Usage: + +enum class Error { NotFound, PermissionDenied }; + +Expected DoSomething(bool success) { + if (success) return 7; + return Error::NotFound; +} + +auto result = DoSomething(false); +if (result) { + std::cout << "Success: " << *result << std::endl; +} else { + std::cout << "Error: " << (result.error() == Error::NotFound) << std::endl; +} + +Why needed: + +Expected provides a structured way to handle both success and failure, +unlike std::optional or v8::Maybe which only convey whether a value +exists. + +- std::optional: For values with an absence. +- std::expected: For values with potential success or known error types. +- v8::Maybe: For handling potentially missing results within the JS context. +*/ + +// `T != void` +template +class Expected { + public: + Expected(const T& v) : value_(v), has_value_(true) {} + Expected(T&& v) : value_(std::move(v)), has_value_(true) {} + Expected(const E& e) : error_(e), has_value_(false) {} + Expected(E&& e) : error_(std::move(e)), has_value_(false) {} + ~Expected() { has_value_ ? value_.~T() : error_.~E(); } + + bool valid() const { return has_value_; } + T& value() { return value_; } + const T& value() const { return value_; } + E& error() { return error_; } + const E& error() const { return error_; } + + operator bool() const { return valid(); } + T& operator*() { return value(); } + const T& operator*() const { return value(); } + + T value_or(const T& default_value) { + return has_value_ ? value_ : default_value; + } + + private: + T value_; + E error_; + bool has_value_; +}; + +// `T = void` +template +class Expected { + public: + Expected() : has_value_(true) {} + Expected(const E& e) : error_(e), has_value_(false) {} + Expected(E&& e) : error_(std::move(e)), has_value_(false) {} + + bool valid() const { return has_value_; } + E& error() { return error_; } + const E& error() const { return error_; } + + operator bool() const { return valid(); } + + private: + E error_; + bool has_value_; +}; diff --git a/include/lwnode/message-port.h b/include/lwnode/message-port.h index 7cd48042ad..ae6873a525 100644 --- a/include/lwnode/message-port.h +++ b/include/lwnode/message-port.h @@ -21,6 +21,8 @@ #include #include +#include "expected.h" + #ifndef EXPORT_API #define EXPORT_API __attribute__((visibility("default"))) #endif @@ -37,9 +39,11 @@ class EXPORT_API MessageEvent { const std::vector> ports() const; const std::weak_ptr& target() const; - ~MessageEvent(); + virtual bool IsSync() { return false; } - private: + virtual ~MessageEvent(); + + protected: MessageEvent(const std::string& data); struct Internal; @@ -47,19 +51,42 @@ class EXPORT_API MessageEvent { friend class Port; }; +class EXPORT_API MessageEventSync final : public MessageEvent { + public: + static std::shared_ptr New(const std::string& data); + ~MessageEventSync(); + + bool IsSync() override { return true; } + + const std::string result() const; + void SetResult(const std::string& result) const; + + private: + MessageEventSync(const std::string& data); + + struct Internal; + std::unique_ptr internal_sync_; + friend class Port; +}; + class EXPORT_API Port { public: using OnMessageCallback = std::function; - enum Result { + enum Error { NoError = 0, MessageEventQueued, NoSink, NoOnMessage, InvalidMessageEvent, InvalidPortLoop, + Timeout, }; + using Result = Expected; Result PostMessage(std::shared_ptr event); + Result PostMessage(std::shared_ptr event, + int timeout_ms = -1); + void OnMessage(const OnMessageCallback& callback); void Unref(); @@ -68,6 +95,8 @@ class EXPORT_API Port { private: Port(); + Result PostMessageAsync(std::shared_ptr event); + struct Internal; std::unique_ptr internal_; friend struct Channel; diff --git a/src/lwnode/nd-mod-message-port.cc b/src/lwnode/nd-mod-message-port.cc index 58ee2466db..557cca209e 100644 --- a/src/lwnode/nd-mod-message-port.cc +++ b/src/lwnode/nd-mod-message-port.cc @@ -76,10 +76,38 @@ static ObjectRef* InstantiateMessageEvent(ExecutionStateRef* state, // Create a new MessageEvent auto option = ObjectRef::create(state); + option->setExtraData((void*)(event)); // TODO: Use atomic string option->set(state, OneByteString("data"), OneByteString(event->data())); option->set(state, OneByteString("origin"), OneByteString(event->origin())); + SetMethod(state, + option, + "setResult", + [](ExecutionStateRef* state, + ValueRef* this_value, + size_t argc, + ValueRef** argv, + bool is_construct) -> ValueRef* { + if (argc < 1 || !argv[0]->isString()) { + state->throwException(TypeErrorObjectRef::create( + state, OneByteString("Invalid argument"))); + } + + auto event = GetExtraData(this_value); + if (!event->IsSync()) { + state->throwException(ErrorObjectRef::create( + state, + ErrorObjectRef::Code::None, + OneByteString("only support MessageEventSync"))); + } + + reinterpret_cast(event)->SetResult( + argv[0]->asString()->toStdUTF8String()); + + return ValueRef::createUndefined(); + }); + ValueRef* argv[] = {OneByteString("message"), option}; return klass->construct(state, COUNT_OF(argv), argv)->asObject(); } diff --git a/test/embedding/CMakeLists.txt b/test/embedding/CMakeLists.txt index ef473a7c6b..d22b28a0d5 100644 --- a/test/embedding/CMakeLists.txt +++ b/test/embedding/CMakeLists.txt @@ -21,5 +21,6 @@ add_compile_options(-std=c++17) add_executable(embedtest.x embedtest.cc) add_executable(example.x example.cc) +add_executable(message-port-sync.x message-port-sync.cc) add_executable(send-message-sync.x send-message-sync.cc) add_executable(runtime-stop.x runtime-stop.cc) diff --git a/test/embedding/message-port-sync.cc b/test/embedding/message-port-sync.cc new file mode 100644 index 0000000000..86b749d513 --- /dev/null +++ b/test/embedding/message-port-sync.cc @@ -0,0 +1,85 @@ +#include +#include +#include +#include +#include +#include + +#include +#include + +#define COUNT_OF(array) (sizeof(array) / sizeof((array)[0])) + +int main(int argc, char* argv[]) { + auto runtime = std::make_shared(); + + std::promise promise; + std::future init_future = promise.get_future(); + const char* script = "test/embedding/test-03-message-port-sync.js"; + std::string path = (std::filesystem::current_path() / script).string(); + char* args[] = {const_cast(""), const_cast(path.c_str())}; + + std::thread worker = std::thread( + [&](std::promise&& promise) mutable { + // FIXME: Fix Runtime::Init() call to ensure environment initialization + // before running the loop, Runtime::Run(). This workaround passes a + // promise directly to know when that is. + runtime->Start(COUNT_OF(args), args, std::move(promise)); + }, + std::move(promise)); + + init_future.wait(); + + auto port2 = runtime->GetPort(); + port2->OnMessage([&](const MessageEvent* event) { + std::cout << event->data() << std::endl; + }); + + // sync post message test + { + for (int i = 0; i < 5; i++) { + auto result = port2->PostMessage(MessageEventSync::New("sync")); + if (result) + std::cout << "result(" << i << "): " << *result << std::endl; + else + std::cout << "result(" << i << "): error(" << result.error() << ")" + << std::endl; + } + + auto result = port2->PostMessage(MessageEventSync::New("sync")); + std::cout << "result(5): " << result.value_or("error") << std::endl; + } + + // use same message event instance test + { + auto event = MessageEventSync::New("sync"); + + auto result = port2->PostMessage(event); + assert(result); + std::cout << "result: " << result.value_or("error") << std::endl; + + result = port2->PostMessage(event); + assert(!result); + assert(result.error() == Port::Error::InvalidMessageEvent); + std::cout << "result: " << result.value_or("invalid message error") << std::endl; + } + + // timeout test + { + auto result = + port2->PostMessage(MessageEventSync::New("sync-timeout"), 2000); + + assert(!result); + assert(result.error() == Port::Error::Timeout); + + std::cout << "result: " << result.value_or("timeout error") << std::endl; + } + + // async post message test + { + port2->PostMessage(MessageEvent::New("exit")); + } + + worker.join(); + return 0; +} diff --git a/test/embedding/test-03-message-port-sync.js b/test/embedding/test-03-message-port-sync.js new file mode 100644 index 0000000000..be8ac1f751 --- /dev/null +++ b/test/embedding/test-03-message-port-sync.js @@ -0,0 +1,18 @@ +const lwnode = process.lwnode; +const port = process.lwnode.port; + +lwnode.ref(); + +let count = 0; + +port.onmessage = (event) => { + console.log(`js: ${event.data}`); + if (event.data == "sync") { + event.setResult(`sync test: ${count++}`); + } else if (event.data == "exit") { + port.postMessage("exit javascript"); + lwnode.unref(); + } else if (event.data == "sync-timeout") { + // Do not intentionally set the result value to test the timeout. + } +};