diff --git a/crates/bevy_remote/src/builtin_methods.rs b/crates/bevy_remote/src/builtin_methods.rs index 1095e39446cfb..0fc096c43599c 100644 --- a/crates/bevy_remote/src/builtin_methods.rs +++ b/crates/bevy_remote/src/builtin_methods.rs @@ -1,6 +1,7 @@ //! Built-in verbs for the Bevy Remote Protocol. use core::any::TypeId; +use std::sync::{Arc, Mutex}; use anyhow::{anyhow, Result as AnyhowResult}; use bevy_dev_tools::schedule_data::serde::ScheduleData; @@ -12,6 +13,7 @@ use bevy_ecs::{ message::MessageCursor, query::QueryBuilder, reflect::{AppTypeRegistry, ReflectComponent, ReflectEvent, ReflectMessage, ReflectResource}, + resource::Resource, schedule::Schedules, system::{In, Local}, world::{EntityRef, EntityWorldMut, FilteredEntityRef, Mut, World}, @@ -98,6 +100,9 @@ pub const BRP_REGISTRY_SCHEMA_METHOD: &str = "registry.schema"; /// The method path for a `schedule.list` request. pub const BRP_SCHEDULE_LIST: &str = "schedule.list"; +/// The method path for a `world.observe+watch` request. +pub const BRP_OBSERVE_METHOD: &str = "world.observe+watch"; + /// The method path for a `schedule.graph` request. pub const BRP_SCHEDULE_GRAPH: &str = "schedule.graph"; @@ -340,6 +345,26 @@ pub struct BrpWriteMessageParams { pub value: Option, } +/// `world.observe+watch`: Registers an observer for the given event type and +/// streams event data back to the client each time the event is triggered. +/// +/// If `entity` is provided, observes only events targeting that entity. +/// Otherwise, observes all global triggers of the event. +/// +/// The server responds with serialized event data when events are observed. +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] +pub struct BrpObserveParams { + /// The [full path] of the event type to observe. + /// + /// [full path]: bevy_reflect::TypePath::type_path + pub event: String, + + /// An optional entity to scope the observer to. + /// When set, only events targeting this entity will be observed. + #[serde(default)] + pub entity: Option, +} + /// `schedule.graph`: /// /// The server responds with [`BrpScheduleGraphResponse`] if the schedule is found, @@ -1527,6 +1552,108 @@ pub fn process_remote_write_message_request( }) } +/// Stores observer state for `world.observe+watch` requests. +#[derive(Resource, Default)] +pub struct BrpEventObservers { + /// Map from observer key to buffered serialized events. + /// The key encodes both event type and optional entity scope. + observers: HashMap>>>, +} + +/// Handles a `world.observe+watch` request coming from a client. +/// +/// On the first call for a given event/entity combination, this registers an observer that captures triggered +/// events. On each subsequent poll, it returns any events that have been captured since the last poll. +/// +/// When `entity` is provided, the observer is scoped to that entity. Otherwise a global observer is registered. +pub fn process_remote_observe_watching_request( + In(params): In>, + world: &mut World, +) -> BrpResult> { + let BrpObserveParams { event, entity } = parse_some(params)?; + + let key = match entity { + Some(e) => format!("{event}@{e}"), + None => event.clone(), + }; + + if !world.contains_resource::() { + world.init_resource::(); + } + + let already_registered = world + .resource::() + .observers + .contains_key(&key); + + if !already_registered { + let app_type_registry = world.resource::().clone(); + let reflect_event = { + let type_registry = app_type_registry.read(); + let Some(registration) = type_registry.get_with_type_path(&event) else { + return Err(BrpError::resource_error(format!( + "Unknown event type: `{event}`" + ))); + }; + let Some(reflect_event) = registration.data::() else { + return Err(BrpError::resource_error(format!( + "Event `{event}` is not reflectable" + ))); + }; + reflect_event.clone() + }; + + let buffer: Arc>> = Arc::new(Mutex::new(Vec::new())); + let buffer_clone = buffer.clone(); + let registry_clone = app_type_registry.clone(); + let event_clone = event.clone(); + + let callback = Box::new(move |event_data: &dyn PartialReflect| { + let reg = registry_clone.read(); + let serializer = ReflectSerializer::new(event_data, ®); + match serde_json::to_value(&serializer) { + Ok(value) => { + buffer_clone.lock().unwrap().push(value); + } + Err(err) => { + warn_once!("Failed to serialize observed event `{event_clone}`: {err}"); + // Push a placeholder so the client still gets notified. + buffer_clone + .lock() + .unwrap() + .push(serde_json::json!({ "event": event_clone })); + } + } + }); + + if let Some(target) = entity { + reflect_event.observe_entity(world, target, callback); + } else { + reflect_event.observe(world, callback); + } + + world + .resource_mut::() + .observers + .insert(key.clone(), buffer); + } + + let observers = world.resource::(); + let Some(buffer) = observers.observers.get(&key) else { + return Err(BrpError::internal(anyhow!("Observer state missing"))); + }; + + let mut events = buffer.lock().unwrap(); + if events.is_empty() { + return Ok(None); + } + + let captured: Vec = events.drain(..).collect(); + serde_json::to_value(captured) + .map(Some) + .map_err(BrpError::internal) +} + /// Handles a `registry.schema` request (list all registry types in form of schema) coming from a client. pub fn export_registry_types(In(params): In>, world: &World) -> BrpResult { let filter: BrpJsonSchemaQueryFilter = match params { @@ -1982,6 +2109,62 @@ mod tests { assert!(world.resource::().0); } + #[test] + fn observe_watching_captures_triggered_events() { + #[derive(Event, Reflect)] + #[reflect(Event)] + struct Ping { + value: u32, + } + + let atr = AppTypeRegistry::default(); + { + let mut register = atr.write(); + register.register::(); + } + let mut world = World::new(); + world.insert_resource(atr); + + let observe_params = serde_json::to_value(&BrpObserveParams { + event: "bevy_remote::builtin_methods::tests::Ping".to_owned(), + entity: None, + }) + .expect("FAIL"); + + assert_eq!( + process_remote_observe_watching_request(In(Some(observe_params.clone())), &mut world,), + Ok(None) + ); + assert!(world.contains_resource::()); + + let trigger_params = serde_json::to_value(&BrpTriggerEventParams { + event: "bevy_remote::builtin_methods::tests::Ping".to_owned(), + value: Some(serde_json::json!({ "value": 42 })), + }) + .expect("FAIL"); + assert_eq!( + process_remote_trigger_event_request(In(Some(trigger_params)), &mut world), + Ok(Null) + ); + + let captured = + process_remote_observe_watching_request(In(Some(observe_params.clone())), &mut world) + .expect("poll should succeed") + .expect("events should be returned"); + let events: Vec = + serde_json::from_value(captured).expect("captured events are a JSON array"); + assert_eq!(events.len(), 1); + let payload = events[0] + .get("bevy_remote::builtin_methods::tests::Ping") + .expect("event keyed by its type path"); + assert_eq!(payload.get("value"), Some(&serde_json::json!(42))); + + assert_eq!( + process_remote_observe_watching_request(In(Some(observe_params)), &mut world), + Ok(None) + ); + } + #[test] fn write_reflect_only_message() { #[derive(Message, Reflect)] diff --git a/crates/bevy_remote/src/lib.rs b/crates/bevy_remote/src/lib.rs index 53a743fee6aa9..ec1f9b3b2c57f 100644 --- a/crates/bevy_remote/src/lib.rs +++ b/crates/bevy_remote/src/lib.rs @@ -765,6 +765,11 @@ impl RemotePlugin { builtin_methods::process_remote_write_message_request, to_main, ) + .with_watching_method( + builtin_methods::BRP_OBSERVE_METHOD, + builtin_methods::process_remote_observe_watching_request, + to_main, + ) .with_method( builtin_methods::BRP_REGISTRY_SCHEMA_METHOD, builtin_methods::export_registry_types, @@ -832,6 +837,7 @@ impl Plugin for RemotePlugin { app.insert_resource(remote_methods) .init_resource::() .init_resource::() + .init_resource::() .add_systems(PreStartup, setup_mailbox_channel) .configure_sets( RemoteLast,