Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/actions.yml
Original file line number Diff line number Diff line change
Expand Up @@ -159,3 +159,6 @@ jobs:
- name: Run embedtest
run: |
out/linux/Release/embedtest.x
- name: Run message-port sync test
run: |
out/linux/Release/message-port-sync.x
98 changes: 98 additions & 0 deletions deps/message-port/expected.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Copyright (c) 2025-present Samsung Electronics Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <utility> // for std::move

/*
Expected<T,E>

Usage:

enum class Error { NotFound, PermissionDenied };

Expected<int, Error> DoSomething(bool success) {
if (success) return 7;
return Error::NotFound;
}

auto result = DoSomething(false);
if (result) {
std::cout << "Success: " << *result << std::endl;
} else {
std::cout << "Error: " << (result.error() == Error::NotFound) << std::endl;
}

Why needed:

Expected<T, E> provides a structured way to handle both success and failure,
unlike std::optional<T> or v8::Maybe<T> which only convey whether a value
exists.

- std::optional: For values with an absence.
- std::expected: For values with potential success or known error types.
- v8::Maybe: For handling potentially missing results within the JS context.
*/

// `T != void`
template <typename T, typename E>
class Expected {
public:
Expected(const T& v) : value_(v), has_value_(true) {}
Expected(T&& v) : value_(std::move(v)), has_value_(true) {}
Expected(const E& e) : error_(e), has_value_(false) {}
Expected(E&& e) : error_(std::move(e)), has_value_(false) {}
~Expected() { has_value_ ? value_.~T() : error_.~E(); }

bool valid() const { return has_value_; }
T& value() { return value_; }
const T& value() const { return value_; }
E& error() { return error_; }
const E& error() const { return error_; }

operator bool() const { return valid(); }
T& operator*() { return value(); }
const T& operator*() const { return value(); }

T value_or(const T& default_value) {
return has_value_ ? value_ : default_value;
}

private:
T value_;
E error_;
bool has_value_;
};

// `T = void`
template <typename E>
class Expected<void, E> {
public:
Expected() : has_value_(true) {}
Expected(const E& e) : error_(e), has_value_(false) {}
Expected(E&& e) : error_(std::move(e)), has_value_(false) {}

bool valid() const { return has_value_; }
E& error() { return error_; }
const E& error() const { return error_; }

operator bool() const { return valid(); }

private:
E error_;
bool has_value_;
};
136 changes: 120 additions & 16 deletions deps/message-port/message-port.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,18 @@
#include <uv.h>
#include <chrono>
#include <future>
#include <mutex>

#include "async-uv.h"
#include "channel.h"

#if __cplusplus >= 201703L
#include <optional>
#define Optional std::optional
#else
#include "utils/optional.h"
#endif

// MessageEvent::Internal
// -----------------------------------------------------------------------------

Expand Down Expand Up @@ -64,6 +73,38 @@ const std::weak_ptr<Port>& MessageEvent::target() const {
return internal_->target;
}

// MessageEventSync::Internal
// -----------------------------------------------------------------------------
struct MessageEventSync::Internal {
Internal() {}
~Internal() { TRACE(MSGEVENT, "~MessageEventSync::Internal"); }

Optional<std::string> result;
std::promise<std::string> promise;
};

std::shared_ptr<MessageEventSync> MessageEventSync::New(
const std::string& data) {
return std::shared_ptr<MessageEventSync>(new MessageEventSync(data));
}

MessageEventSync::MessageEventSync(const std::string& data)
: MessageEvent(data), internal_sync_(std::make_unique<Internal>()) {}

MessageEventSync::~MessageEventSync() {
TRACE(MSGEVENT, "~MessageEventSync");
}

void MessageEventSync::SetResult(const std::string& result) const {
internal_sync_->result = result;
}

const std::string MessageEventSync::result() const {
auto result = internal_sync_->result;

return result.has_value() ? result.value() : std::string();
}

// Port::Internal
// -----------------------------------------------------------------------------

Expand All @@ -83,29 +124,34 @@ struct Port::Internal {
std::weak_ptr<Port> sink;
std::shared_ptr<Port> sink_holder;
OnMessageCallback callback;
std::mutex callback_mutex;
};

Port::Port() : internal_(std::make_unique<Internal>()) {}

Port::~Port() {}

void Port::OnMessage(const OnMessageCallback& callback) {
std::unique_lock<std::mutex> lock(internal_->callback_mutex);
internal_->callback = std::move(callback);
}

Port::Result Port::PostMessage(std::shared_ptr<MessageEvent> event) {
Port::Result Port::PostMessageAsync(std::shared_ptr<MessageEvent> event) {
// Check if the sink is valid.
std::shared_ptr<Port> sink = internal_->sink.lock();
if (sink == nullptr) {
TRACE(MSGPORT, "sink port released.");
return Result::NoSink;
return Error::NoSink;
}

// Check if the sink has a receiver.
if (internal_->loop != nullptr && sink->internal_->callback == nullptr) {
// TODO: Enqueue events if the number of the events in queues is acceptable.
TRACE(MSGPORT, "sink has no callback.");
return Result::NoOnMessage;
if (internal_->loop) {
std::unique_lock<std::mutex> lock(sink->internal_->callback_mutex);
if (sink->internal_->callback == nullptr) {
// TODO: Consider enqueuing events in this case.
TRACE(MSGPORT, "sink has no callback.");
return Error::NoOnMessage;
}
}

// Set target and origin field of the given event.
Expand All @@ -116,44 +162,102 @@ Port::Result Port::PostMessage(std::shared_ptr<MessageEvent> event) {

// It's not allowed to use MessageEvents to be sent to different sinks.
if (event->internal_->target.lock() != internal_->sink.lock()) {
return Result::InvalidMessageEvent;
return Error::InvalidMessageEvent;
}

// Get a valid loop handle if invalid.
if (internal_->loop == nullptr) {
if (!internal_->future.valid()) {
return Result::InvalidPortLoop;
return Error::InvalidPortLoop;
}
if (internal_->future.wait_for(std::chrono::milliseconds(1)) ==
std::future_status::ready) {
auto loop = internal_->future.get();
if (loop == nullptr) {
return Result::InvalidPortLoop;
return Error::InvalidPortLoop;
}
internal_->loop = loop;
AsyncUV::DrainPendingTasks(internal_->loop);
}
}

auto task = [event = event, sink_weak = internal_->sink](uv_async_t*) {
auto task = [event = event,
self = internal_.get(),
sink_weak = internal_->sink](uv_async_t*) {
// event: shared_ptr, sink_weak: weak_ptr
auto sink = sink_weak.lock();
if (sink && sink->internal_->callback) {
// Since sink is locked, event->target() is always valid
// inside the callback.
sink->internal_->callback(event.get());
if (sink) {
std::unique_lock<std::mutex> lock(sink->internal_->callback_mutex);
if (sink->internal_->callback) {
// Since sink is locked, event->target() is always valid
// inside the callback.
try {
sink->internal_->callback(event.get());
} catch (...) {
TRACE(MSGPORT, "user callback error");
return;
}
}
} else {
TRACE(MSGPORT, "sink port released, or no callback");
}

if (event->IsSync()) {
auto sync_event = static_cast<MessageEventSync*>(event.get());

try {
// If the user sets the result before in onmessage callback, set the
// promise value. Otherwise, The promise will be unresolved forever.
if (sync_event->internal_sync_->result.has_value()) {
sync_event->internal_sync_->promise.set_value(sync_event->result());
}
} catch (const std::exception& e) {
TRACE(MSGPORT, "promise error:", e.what());
return;
}
}
};

if (internal_->loop == nullptr) {
AsyncUV::EnqueueTask(std::move(task));
return Result::MessageEventQueued;
return Error::MessageEventQueued;
}

AsyncUV::Send(internal_->loop, std::move(task));
return Result::NoError;
return Error::NoError;
}

Port::Result Port::PostMessage(std::shared_ptr<MessageEvent> event) {
return PostMessageAsync(event);
}

Port::Result Port::PostMessage(std::shared_ptr<MessageEventSync> event,
int timeout_ms) {
// TODO: If this function is called from the same thread as lwnode, it can
// cause a deadlock. It should be called from another thread.
std::future<std::string> future;
try {
future = event->internal_sync_->promise.get_future();
} catch (...) {
TRACE(MSGPORT, "invalid promise");
return Error::InvalidMessageEvent;
}

auto result = PostMessageAsync(event);
if (result) {
return result.error();
}

if (timeout_ms > 0) {
auto status = future.wait_for(std::chrono::milliseconds(timeout_ms));
if (status == std::future_status::timeout) {
TRACE(MSGPORT, "PostMessageSync timeout");
return Error::Timeout;
}
} else {
future.wait();
}
return future.get();
}

void Port::Unref() {
Expand Down
35 changes: 32 additions & 3 deletions deps/message-port/message-port.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
#include <string>
#include <vector>

#include "expected.h"

#ifndef EXPORT_API
#define EXPORT_API __attribute__((visibility("default")))
#endif
Expand All @@ -37,29 +39,54 @@ class EXPORT_API MessageEvent {
const std::vector<std::weak_ptr<Port>> ports() const;
const std::weak_ptr<Port>& target() const;

~MessageEvent();
virtual bool IsSync() { return false; }

private:
virtual ~MessageEvent();

protected:
MessageEvent(const std::string& data);

struct Internal;
std::unique_ptr<Internal> internal_;
friend class Port;
};

class EXPORT_API MessageEventSync final : public MessageEvent {
public:
static std::shared_ptr<MessageEventSync> New(const std::string& data);
~MessageEventSync();

bool IsSync() override { return true; }

const std::string result() const;
void SetResult(const std::string& result) const;

private:
MessageEventSync(const std::string& data);

struct Internal;
std::unique_ptr<Internal> internal_sync_;
friend class Port;
};

class EXPORT_API Port {
public:
using OnMessageCallback = std::function<void(const MessageEvent*)>;
enum Result {
enum Error {
NoError = 0,
MessageEventQueued,
NoSink,
NoOnMessage,
InvalidMessageEvent,
InvalidPortLoop,
Timeout,
};
using Result = Expected<std::string, Error>;

Result PostMessage(std::shared_ptr<MessageEvent> event);
Result PostMessage(std::shared_ptr<MessageEventSync> event,
int timeout_ms = -1);

void OnMessage(const OnMessageCallback& callback);
void Unref();

Expand All @@ -68,6 +95,8 @@ class EXPORT_API Port {
private:
Port();

Result PostMessageAsync(std::shared_ptr<MessageEvent> event);

struct Internal;
std::unique_ptr<Internal> internal_;
friend struct Channel;
Expand Down
Loading