Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
183 changes: 183 additions & 0 deletions crates/bevy_remote/src/builtin_methods.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! Built-in verbs for the Bevy Remote Protocol.

use core::any::TypeId;
use std::sync::{Arc, Mutex};

use anyhow::{anyhow, Result as AnyhowResult};
use bevy_dev_tools::schedule_data::serde::ScheduleData;
Expand All @@ -12,6 +13,7 @@ use bevy_ecs::{
message::MessageCursor,
query::QueryBuilder,
reflect::{AppTypeRegistry, ReflectComponent, ReflectEvent, ReflectMessage, ReflectResource},
resource::Resource,
schedule::Schedules,
system::{In, Local},
world::{EntityRef, EntityWorldMut, FilteredEntityRef, Mut, World},
Expand Down Expand Up @@ -98,6 +100,9 @@ pub const BRP_REGISTRY_SCHEMA_METHOD: &str = "registry.schema";
/// The method path for a `schedule.list` request.
pub const BRP_SCHEDULE_LIST: &str = "schedule.list";

/// The method path for a `world.observe+watch` request.
pub const BRP_OBSERVE_METHOD: &str = "world.observe+watch";

/// The method path for a `schedule.graph` request.
pub const BRP_SCHEDULE_GRAPH: &str = "schedule.graph";

Expand Down Expand Up @@ -340,6 +345,26 @@ pub struct BrpWriteMessageParams {
pub value: Option<Value>,
}

/// `world.observe+watch`: Registers an observer for the given event type and
/// streams event data back to the client each time the event is triggered.
///
/// If `entity` is provided, observes only events targeting that entity.
/// Otherwise, observes all global triggers of the event.
///
/// The server responds with serialized event data when events are observed.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
pub struct BrpObserveParams {
/// The [full path] of the event type to observe.
///
/// [full path]: bevy_reflect::TypePath::type_path
pub event: String,

/// An optional entity to scope the observer to.
/// When set, only events targeting this entity will be observed.
#[serde(default)]
pub entity: Option<Entity>,
}

/// `schedule.graph`:
///
/// The server responds with [`BrpScheduleGraphResponse`] if the schedule is found,
Expand Down Expand Up @@ -1527,6 +1552,108 @@ pub fn process_remote_write_message_request(
})
}

/// Stores observer state for `world.observe+watch` requests.
#[derive(Resource, Default)]
pub struct BrpEventObservers {
/// Map from observer key to buffered serialized events.
/// The key encodes both event type and optional entity scope.
observers: HashMap<String, Arc<Mutex<Vec<Value>>>>,
}

/// Handles a `world.observe+watch` request coming from a client.
///
/// On the first call for a given event/entity combination, this registers an observer that captures triggered
/// events. On each subsequent poll, it returns any events that have been captured since the last poll.
///
/// When `entity` is provided, the observer is scoped to that entity. Otherwise a global observer is registered.
pub fn process_remote_observe_watching_request(
In(params): In<Option<Value>>,
world: &mut World,
) -> BrpResult<Option<Value>> {
let BrpObserveParams { event, entity } = parse_some(params)?;

let key = match entity {
Some(e) => format!("{event}@{e}"),
None => event.clone(),
};

if !world.contains_resource::<BrpEventObservers>() {
world.init_resource::<BrpEventObservers>();
}

let already_registered = world
.resource::<BrpEventObservers>()
.observers
.contains_key(&key);

if !already_registered {
let app_type_registry = world.resource::<AppTypeRegistry>().clone();
let reflect_event = {
let type_registry = app_type_registry.read();
let Some(registration) = type_registry.get_with_type_path(&event) else {
return Err(BrpError::resource_error(format!(
"Unknown event type: `{event}`"
)));
};
let Some(reflect_event) = registration.data::<ReflectEvent>() else {
return Err(BrpError::resource_error(format!(
"Event `{event}` is not reflectable"
)));
};
reflect_event.clone()
};

let buffer: Arc<Mutex<Vec<Value>>> = Arc::new(Mutex::new(Vec::new()));
let buffer_clone = buffer.clone();
let registry_clone = app_type_registry.clone();
let event_clone = event.clone();

let callback = Box::new(move |event_data: &dyn PartialReflect| {
let reg = registry_clone.read();
let serializer = ReflectSerializer::new(event_data, &reg);
match serde_json::to_value(&serializer) {
Ok(value) => {
buffer_clone.lock().unwrap().push(value);
}
Err(err) => {
warn_once!("Failed to serialize observed event `{event_clone}`: {err}");
// Push a placeholder so the client still gets notified.
buffer_clone
.lock()
.unwrap()
.push(serde_json::json!({ "event": event_clone }));
}
}
});

if let Some(target) = entity {
reflect_event.observe_entity(world, target, callback);
} else {
reflect_event.observe(world, callback);
}

world
.resource_mut::<BrpEventObservers>()
.observers
.insert(key.clone(), buffer);
}

let observers = world.resource::<BrpEventObservers>();
let Some(buffer) = observers.observers.get(&key) else {
return Err(BrpError::internal(anyhow!("Observer state missing")));
};

let mut events = buffer.lock().unwrap();
if events.is_empty() {
return Ok(None);
}

let captured: Vec<Value> = events.drain(..).collect();
serde_json::to_value(captured)
.map(Some)
.map_err(BrpError::internal)
}

/// Handles a `registry.schema` request (list all registry types in form of schema) coming from a client.
pub fn export_registry_types(In(params): In<Option<Value>>, world: &World) -> BrpResult {
let filter: BrpJsonSchemaQueryFilter = match params {
Expand Down Expand Up @@ -1982,6 +2109,62 @@ mod tests {
assert!(world.resource::<TestResult>().0);
}

#[test]
fn observe_watching_captures_triggered_events() {
#[derive(Event, Reflect)]
#[reflect(Event)]
struct Ping {
value: u32,
}

let atr = AppTypeRegistry::default();
{
let mut register = atr.write();
register.register::<Ping>();
}
let mut world = World::new();
world.insert_resource(atr);

let observe_params = serde_json::to_value(&BrpObserveParams {
event: "bevy_remote::builtin_methods::tests::Ping".to_owned(),
entity: None,
})
.expect("FAIL");

assert_eq!(
process_remote_observe_watching_request(In(Some(observe_params.clone())), &mut world,),
Ok(None)
);
assert!(world.contains_resource::<BrpEventObservers>());

let trigger_params = serde_json::to_value(&BrpTriggerEventParams {
event: "bevy_remote::builtin_methods::tests::Ping".to_owned(),
value: Some(serde_json::json!({ "value": 42 })),
})
.expect("FAIL");
assert_eq!(
process_remote_trigger_event_request(In(Some(trigger_params)), &mut world),
Ok(Null)
);

let captured =
process_remote_observe_watching_request(In(Some(observe_params.clone())), &mut world)
.expect("poll should succeed")
.expect("events should be returned");
let events: Vec<Value> =
serde_json::from_value(captured).expect("captured events are a JSON array");
assert_eq!(events.len(), 1);
let payload = events[0]
.get("bevy_remote::builtin_methods::tests::Ping")
.expect("event keyed by its type path");
assert_eq!(payload.get("value"), Some(&serde_json::json!(42)));

assert_eq!(
process_remote_observe_watching_request(In(Some(observe_params)), &mut world),
Ok(None)
);
}

#[test]
fn write_reflect_only_message() {
#[derive(Message, Reflect)]
Expand Down
6 changes: 6 additions & 0 deletions crates/bevy_remote/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -765,6 +765,11 @@ impl RemotePlugin {
builtin_methods::process_remote_write_message_request,
to_main,
)
.with_watching_method(
builtin_methods::BRP_OBSERVE_METHOD,
builtin_methods::process_remote_observe_watching_request,
to_main,
)
.with_method(
builtin_methods::BRP_REGISTRY_SCHEMA_METHOD,
builtin_methods::export_registry_types,
Expand Down Expand Up @@ -832,6 +837,7 @@ impl Plugin for RemotePlugin {
app.insert_resource(remote_methods)
.init_resource::<schemas::SchemaTypesMetadata>()
.init_resource::<RemoteWatchingRequests>()
.init_resource::<builtin_methods::BrpEventObservers>()
.add_systems(PreStartup, setup_mailbox_channel)
.configure_sets(
RemoteLast,
Expand Down
Loading