diff --git a/Cargo.lock b/Cargo.lock index 7ea51573..176aad26 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -106,7 +106,7 @@ checksum = "30c5ef0ede93efbf733c1a727f3b6b5a1060bbedd5600183e66f6e4be4af0ec5" dependencies = [ "proc-macro2", "quote", - "syn 2.0.60", + "syn 2.0.98", ] [[package]] @@ -128,7 +128,7 @@ checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" dependencies = [ "proc-macro2", "quote", - "syn 2.0.60", + "syn 2.0.98", ] [[package]] @@ -139,7 +139,7 @@ checksum = "6e0c28dcc82d7c8ead5cb13beb15405b57b8546e93215673ff8ca0349a028107" dependencies = [ "proc-macro2", "quote", - "syn 2.0.60", + "syn 2.0.98", ] [[package]] @@ -337,7 +337,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.60", + "syn 2.0.98", ] [[package]] @@ -569,7 +569,7 @@ checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" dependencies = [ "proc-macro2", "quote", - "syn 2.0.60", + "syn 2.0.98", ] [[package]] @@ -1214,7 +1214,7 @@ dependencies = [ "chrono", "opslang-ast", "peg", - "thiserror", + "thiserror 1.0.58", ] [[package]] @@ -1339,7 +1339,7 @@ checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965" dependencies = [ "proc-macro2", "quote", - "syn 2.0.60", + "syn 2.0.98", ] [[package]] @@ -1373,14 +1373,14 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5ac2cf0f2e4f42b49f5ffd07dae8d746508ef7526c13940e5f524012ae6c6550" dependencies = [ "proc-macro2", - "syn 2.0.60", + "syn 2.0.98", ] [[package]] name = "proc-macro2" -version = "1.0.81" +version = "1.0.93" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d1597b0c024618f09a9c3b8655b7e430397a36d23fdafec26d6965e9eec3eba" +checksum = "60946a68e5f9d28b0dc1c21bb8a97ee7d018a8b322fa57838ba31cc878e22d99" dependencies = [ "unicode-ident", ] @@ -1412,7 +1412,7 @@ dependencies = [ "prost", "prost-types", "regex", - "syn 2.0.60", + "syn 2.0.98", "tempfile", ] @@ -1426,7 +1426,7 @@ dependencies = [ "itertools", "proc-macro2", "quote", - "syn 2.0.60", + "syn 2.0.98", ] [[package]] @@ -1500,7 +1500,7 @@ checksum = "bd283d9651eeda4b2a83a43c1c91b266c40fd76ecd39a50a8c630ae69dc72891" dependencies = [ "getrandom", "libredox", - "thiserror", + "thiserror 1.0.58", ] [[package]] @@ -1625,7 +1625,7 @@ dependencies = [ "quote", "rust-embed-utils", "shellexpand", - "syn 2.0.60", + "syn 2.0.98", "walkdir", ] @@ -1838,7 +1838,7 @@ dependencies = [ "rand", "serde", "serde_json", - "thiserror", + "thiserror 1.0.58", "time", "url", "uuid", @@ -1861,7 +1861,7 @@ checksum = "500cbc0ebeb6f46627f50f3f5811ccf6bf00643be300b4c3eabc0ef55dc5b5ba" dependencies = [ "proc-macro2", "quote", - "syn 2.0.60", + "syn 2.0.98", ] [[package]] @@ -2009,9 +2009,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.60" +version = "2.0.98" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "909518bc7b1c9b779f1bbf07f2929d35af9f0f37e47c6e9ef7f9dddc1e1821f3" +checksum = "36147f1a48ae0ec2b5b3bc5b537d267457555a10dc06f3dbc8cb11ba3006d3b1" dependencies = [ "proc-macro2", "quote", @@ -2048,7 +2048,16 @@ version = "1.0.58" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "03468839009160513471e86a034bb2c5c0e4baae3b43f79ffc55c4a5427b3297" dependencies = [ - "thiserror-impl", + "thiserror-impl 1.0.58", +] + +[[package]] +name = "thiserror" +version = "2.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d452f284b73e6d76dd36758a0c8684b1d5be31f92b89d07fd5822175732206fc" +dependencies = [ + "thiserror-impl 2.0.11", ] [[package]] @@ -2059,7 +2068,18 @@ checksum = "c61f3ba182994efc43764a46c018c347bc492c79f024e705f46567b418f6d4f7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.60", + "syn 2.0.98", +] + +[[package]] +name = "thiserror-impl" +version = "2.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26afc1baea8a989337eeb52b6e72a039780ce45c3edfcc9c5b9d112feeb173c2" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.98", ] [[package]] @@ -2155,8 +2175,10 @@ dependencies = [ "serde", "serde_json", "structpack", + "thiserror 2.0.11", "tlmcmddb", "tokio", + "tokio-stream", "tokio-tungstenite", "tonic", "tonic-build", @@ -2206,7 +2228,7 @@ checksum = "5f5ae998a069d4b5aba8ee9dad856af7d520c3699e6159b185c2acd48155d39a" dependencies = [ "proc-macro2", "quote", - "syn 2.0.60", + "syn 2.0.98", ] [[package]] @@ -2222,9 +2244,9 @@ dependencies = [ [[package]] name = "tokio-stream" -version = "0.1.15" +version = "0.1.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "267ac89e0bec6e691e5813911606935d77c476ff49024f98abcea3e7b15e37af" +checksum = "4f4e6ce100d0eb49a2734f8c0812bcd324cf357d21810932c5df6b96ef2b86f1" dependencies = [ "futures-core", "pin-project-lite", @@ -2299,7 +2321,7 @@ dependencies = [ "proc-macro2", "prost-build", "quote", - "syn 2.0.60", + "syn 2.0.98", ] [[package]] @@ -2419,7 +2441,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.60", + "syn 2.0.98", ] [[package]] @@ -2481,7 +2503,7 @@ dependencies = [ "log", "rand", "sha1", - "thiserror", + "thiserror 1.0.58", "url", "utf-8", ] @@ -2644,7 +2666,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.60", + "syn 2.0.98", "wasm-bindgen-shared", ] @@ -2678,7 +2700,7 @@ checksum = "e94f17b526d0a461a191c78ea52bbce64071ed5c04c9ffe424dcb38f74171bb7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.60", + "syn 2.0.98", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -2711,7 +2733,7 @@ checksum = "b7f89739351a2e03cb94beb799d47fb2cac01759b40ec441f7de39b00cbf7ef0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.60", + "syn 2.0.98", ] [[package]] @@ -2959,7 +2981,7 @@ checksum = "125139de3f6b9d625c39e2efdd73d41bdac468ccd556556440e322be0e1bbd91" dependencies = [ "proc-macro2", "quote", - "syn 2.0.60", + "syn 2.0.98", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index bcabb101..998b9bc6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,7 +25,7 @@ license = "MPL-2.0" [workspace.dependencies] structpack = "1.2" -gaia-stub = "1.2" -gaia-ccsds-c2a = "1.2" -gaia-tmtc = "1.2" +gaia-stub = { path = "gaia-stub" } +gaia-ccsds-c2a = { path = "gaia-ccsds-c2a" } +gaia-tmtc = { path = "gaia-tmtc" } c2a-devtools-frontend = "1.2" diff --git a/gaia-stub/proto/broker.proto b/gaia-stub/proto/broker.proto index 6991bd04..69424fc2 100644 --- a/gaia-stub/proto/broker.proto +++ b/gaia-stub/proto/broker.proto @@ -3,6 +3,7 @@ syntax = "proto3"; package broker; import "tco_tmiv.proto"; +import "google/protobuf/empty.proto"; service Broker { rpc PostCommand(PostCommandRequest) returns (PostCommandResponse); @@ -11,6 +12,13 @@ service Broker { rpc OpenCommandStream(stream CommandStreamRequest) returns (stream CommandStreamResponse); rpc PostTelemetry(PostTelemetryRequest) returns (PostTelemetryResponse); + + rpc PostSetVR(PostSetVRRequest) returns (PostSetVRResponse); + rpc PostUnlock(PostUnlockRequest) returns (PostUnlockResponse); + rpc PostADCommand(PostADCommandRequest) returns (PostADCommandResponse); + rpc ClearAD(ClearADRequest) returns (ClearADResponse); + rpc SubscribeFopFrameEvents(SubscribeFopFrameEventsRequest) returns (stream FopFrameEvent); + rpc GetFopStatus(GetFopStatusRequest) returns (GetFopStatusResponse); } message PostCommandRequest { @@ -52,3 +60,68 @@ message GetLastReceivedTelemetryRequest { message GetLastReceivedTelemetryResponse { tco_tmiv.Tmiv tmiv = 1; } + +message PostSetVRRequest { + uint32 vr = 1; +} + +message PostSetVRResponse { +} + +message PostUnlockRequest { } + +message PostUnlockResponse { } + +message PostADCommandRequest { + tco_tmiv.Tco tco = 1; +} + +message PostADCommandResponse { + bool success = 1; + uint64 frame_id = 2; +} + + +message SubscribeFopFrameEventsRequest { +} + +message FopFrameEvent { + uint64 frame_id = 1; + enum EventType { + TRANSMIT = 0; + ACKNOWLEDGED = 1; + RETRANSMIT = 2; + CANCEL = 3; + }; + EventType event_type = 2; +} + +message ClearADRequest { } + +message ClearADResponse { } + +message GetFopStatusRequest {} + + +message FopState { + message RetransmitState { + uint64 retransmit_count = 1; + } + oneof state { + google.protobuf.Empty initial = 1; + google.protobuf.Empty active = 2; + RetransmitState retransmit = 3; + } +} + +message GetFopStatusResponse { + bool received_clcw = 1; + bool lockout_flag = 2; + bool wait_flag = 3; + bool retransmit_flag = 4; + uint32 next_expected_sequence_number = 5; + + FopState fop_state = 10; + bool has_next_sequence_number = 11; + uint32 next_sequence_number = 12; +} diff --git a/gaia-tmtc/src/broker.rs b/gaia-tmtc/src/broker.rs index d9f429bc..f9ddf017 100644 --- a/gaia-tmtc/src/broker.rs +++ b/gaia-tmtc/src/broker.rs @@ -3,6 +3,7 @@ use std::{fmt::Debug, sync::Arc}; use anyhow::Result; use futures::prelude::*; use gaia_stub::tco_tmiv::Tco; +use std::pin::Pin; use tokio::sync::Mutex; use tokio_stream::wrappers::BroadcastStream; use tonic::{Request, Response, Status, Streaming}; @@ -11,36 +12,87 @@ use super::telemetry::{self, LastTmivStore}; pub use gaia_stub::broker::*; -pub struct BrokerService { +pub struct BrokerService { cmd_handler: Mutex, + fop_command_service: Mutex, tlm_bus: telemetry::Bus, last_tmiv_store: Arc, } -impl BrokerService { +impl BrokerService { pub fn new( cmd_service: C, + fop_command_service: F, tlm_bus: telemetry::Bus, last_tmiv_store: Arc, ) -> Self { Self { cmd_handler: Mutex::new(cmd_service), + fop_command_service: Mutex::new(fop_command_service), tlm_bus, last_tmiv_store, } } } +use async_trait::async_trait; +pub enum FopFrameEvent { + Transmit(u64), + Acknowledged(u64), + Retransmit(u64), + Cancel(u64), +} + +#[derive(Default)] +pub struct ClcwInfo { + pub lockout: bool, + pub wait: bool, + pub retransmit: bool, + pub next_expected_fsn: u8, +} + +pub enum StateSummary { + Initial, + Active, + Retransmit { retransmit_count: u64 }, +} + +pub struct FopStatus { + pub last_clcw: Option, + pub state_summary: StateSummary, + pub next_fsn: Option, +} + +#[async_trait] +pub trait FopCommandService { + async fn send_set_vr(&self, value: u8); + + async fn clear(&self); + + async fn send_unlock(&self); + + async fn send_ad_command(&self, tco: Tco) -> Result; + + async fn subscribe_frame_events( + &self, + ) -> Result + Send + Sync>>>; + + async fn get_fop_status(&self) -> Result; +} + #[tonic::async_trait] -impl broker_server::Broker for BrokerService +impl broker_server::Broker for BrokerService where C: super::Handle> + Send + Sync + 'static, C::Response: Send + 'static, + F: FopCommandService + Send + Sync + 'static, { type OpenCommandStreamStream = stream::BoxStream<'static, Result>; type OpenTelemetryStreamStream = stream::BoxStream<'static, Result>; + type SubscribeFopFrameEventsStream = + stream::BoxStream<'static, Result>; #[tracing::instrument(skip(self))] async fn post_command( @@ -66,6 +118,68 @@ where Ok(Response::new(PostCommandResponse {})) } + #[tracing::instrument(skip(self))] + async fn post_set_vr( + &self, + request: Request, + ) -> Result, tonic::Status> { + let message = request.into_inner(); + let value = message.vr; + self.fop_command_service + .lock() + .await + .send_set_vr(value as _) + .await; + Ok(Response::new(PostSetVrResponse {})) + } + + #[tracing::instrument(skip(self))] + async fn post_unlock( + &self, + _request: Request, + ) -> Result, tonic::Status> { + self.fop_command_service.lock().await.send_unlock().await; + Ok(Response::new(PostUnlockResponse {})) + } + + #[tracing::instrument(skip(self))] + async fn post_ad_command( + &self, + request: Request, + ) -> Result, tonic::Status> { + let message = request.into_inner(); + + let tco = message + .tco + .ok_or_else(|| Status::invalid_argument("tco is required"))?; + + fn internal_error(e: E) -> Status { + Status::internal(format!("{:?}", e)) + } + let id = self + .fop_command_service + .lock() + .await + .send_ad_command(tco) + .await + .map_err(internal_error)?; + + tracing::info!("AD command sent"); + Ok(Response::new(PostAdCommandResponse { + success: true, + frame_id: id, + })) + } + + #[tracing::instrument(skip(self))] + async fn clear_ad( + &self, + _request: tonic::Request, + ) -> Result, tonic::Status> { + self.fop_command_service.lock().await.clear().await; + Ok(Response::new(ClearAdResponse {})) + } + #[tracing::instrument(skip(self))] async fn open_telemetry_stream( &self, @@ -115,4 +229,77 @@ where Err(Status::not_found("not received yet")) } } + + #[tracing::instrument(skip(self))] + async fn subscribe_fop_frame_events( + &self, + _request: tonic::Request, + ) -> Result, tonic::Status> { + use futures::StreamExt; + let stream = self + .fop_command_service + .lock() + .await + .subscribe_frame_events() + .await + .map_err(|_| Status::internal("failed to subscribe frame events"))?; + use gaia_stub::broker::fop_frame_event::EventType; + let stream = stream.map(|e| { + let (frame_id, event_type) = match e { + FopFrameEvent::Transmit(id) => (id, EventType::Transmit), + FopFrameEvent::Acknowledged(id) => (id, EventType::Acknowledged), + FopFrameEvent::Retransmit(id) => (id, EventType::Retransmit), + FopFrameEvent::Cancel(id) => (id, EventType::Cancel), + }; + Ok(gaia_stub::broker::FopFrameEvent { + frame_id, + event_type: event_type.into(), + }) + }); + Ok(Response::new(stream.boxed())) + } + + #[tracing::instrument(skip(self))] + async fn get_fop_status( + &self, + _request: tonic::Request, + ) -> Result, tonic::Status> { + let state = self + .fop_command_service + .lock() + .await + .get_fop_status() + .await + .map_err(|_| Status::internal("failed to get fop state"))?; + let received_clcw = state.last_clcw.is_some(); + let clcw = state.last_clcw.unwrap_or_default(); + + use gaia_stub::broker::{ + fop_state::{RetransmitState, State}, + FopState, + }; + let fop_state = match state.state_summary { + StateSummary::Initial => State::Initial(()), + StateSummary::Active => State::Active(()), + StateSummary::Retransmit { retransmit_count } => { + State::Retransmit(RetransmitState { retransmit_count }) + } + }; + let fop_state = FopState { + state: Some(fop_state), + }; + + let resp = GetFopStatusResponse { + received_clcw, + lockout_flag: clcw.lockout, + wait_flag: clcw.wait, + retransmit_flag: clcw.retransmit, + next_expected_sequence_number: clcw.next_expected_fsn as _, + has_next_sequence_number: state.next_fsn.is_some(), + next_sequence_number: state.next_fsn.unwrap_or_default() as _, + fop_state: Some(fop_state), + }; + + Ok(Response::new(resp)) + } } diff --git a/tmtc-c2a/Cargo.toml b/tmtc-c2a/Cargo.toml index fb3385f3..efbfd0c0 100644 --- a/tmtc-c2a/Cargo.toml +++ b/tmtc-c2a/Cargo.toml @@ -43,6 +43,8 @@ tokio-tungstenite = "0.20.1" itertools = "0.12.1" notalawyer = "0.1.0" notalawyer-clap = "0.1.0" +tokio-stream = { version = "0.1.16", features = ["sync"] } +thiserror = "2.0.11" [build-dependencies] tonic-build = "0.11" diff --git a/tmtc-c2a/src/fop1.rs b/tmtc-c2a/src/fop1.rs new file mode 100644 index 00000000..231444c6 --- /dev/null +++ b/tmtc-c2a/src/fop1.rs @@ -0,0 +1,408 @@ +use anyhow::Result; +use gaia_ccsds_c2a::ccsds::tc::{self, clcw::CLCW}; +use std::collections::VecDeque; +use std::sync::Arc; +use tokio::sync::broadcast; + +fn wrapping_le(a: u8, b: u8) -> bool { + let diff = b.wrapping_sub(a); + diff < 128 +} + +fn wrapping_lt(a: u8, b: u8) -> bool { + a != b && wrapping_le(a, b) +} + +fn remove_acknowledged_frames( + queue: &mut VecDeque, + acknowledged_fsn: u8, + on_acknowledge: impl Fn(u64), +) -> usize { + let mut ack_count = 0; + while !queue.is_empty() { + let front = queue.front().unwrap(); + if wrapping_lt(front.sequence_number, acknowledged_fsn) { + ack_count += 1; + let frame = queue.pop_front().unwrap().frame; + on_acknowledge(frame.id); + } else { + break; + } + } + ack_count +} + +#[derive(Clone, Copy)] +pub(crate) struct FarmState { + pub(crate) next_expected_fsn: u8, + pub(crate) lockout: bool, + pub(crate) wait: bool, + pub(crate) retransmit: bool, +} + +enum FopState { + Active(ActiveState), + Retransmit(RetransmitState), + Initial { expected_nr: Option }, +} + +pub enum FopStateSummary { + Active, + Retransmit { retransmit_count: u64 }, + Initial, +} + +struct SentFrame { + frame: Arc, + sent_at: std::time::Instant, + sequence_number: u8, +} + +struct ActiveState { + next_fsn: u8, + sent_queue: VecDeque, +} + +struct RetransmitState { + next_fsn: u8, + retransmit_count: usize, + retransmit_sent_queue: VecDeque, + retransmit_wait_queue: VecDeque, +} + +impl ActiveState { + fn acknowledge(&mut self, acknowledged_fsn: u8, on_acknowledge: impl Fn(u64)) { + remove_acknowledged_frames(&mut self.sent_queue, acknowledged_fsn, on_acknowledge); + } + + fn send( + &mut self, + next_frame_id: &mut u64, + data_field: Vec, + on_transmit: impl Fn(u64), + ) -> Option> { + let fsn = self.next_fsn; + self.next_fsn = self.next_fsn.wrapping_add(1); + let frame = Frame { + id: *next_frame_id, + frame_type: tc::sync_and_channel_coding::FrameType::TypeAD, + sequence_number: fsn, + data_field, + }; + *next_frame_id += 1; + let frame = Arc::new(frame); + on_transmit(frame.id); + self.sent_queue.push_back(SentFrame { + frame: frame.clone(), + sent_at: std::time::Instant::now(), + sequence_number: fsn, + }); + + Some(frame) + } + + fn timeout(&self) -> bool { + const TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5); + if let Some(head) = self.sent_queue.front() { + if head.sent_at.elapsed() > TIMEOUT { + return true; + } + } + false + } +} + +impl RetransmitState { + fn acknowledge( + &mut self, + acknowledged_fsn: u8, + retransmit: bool, + on_acknowledge: impl Fn(u64), + ) -> bool { + let ack_count = remove_acknowledged_frames( + &mut self.retransmit_wait_queue, + acknowledged_fsn, + &on_acknowledge, + ) + remove_acknowledged_frames( + &mut self.retransmit_sent_queue, + acknowledged_fsn, + &on_acknowledge, + ); + if ack_count > 0 { + self.retransmit_count = 0; + } + + if !retransmit { + return self.retransmit_wait_queue.is_empty() && self.retransmit_sent_queue.is_empty(); + } + + if ack_count > 0 { + self.redo_retransmit(); + } + false + } + + fn redo_retransmit(&mut self) { + self.retransmit_count += 1; + // prepend sent_queue to wait_queue + // but the library doesn't provide "prepend" method... + self.retransmit_sent_queue + .append(&mut self.retransmit_wait_queue); + std::mem::swap( + &mut self.retransmit_sent_queue, + &mut self.retransmit_wait_queue, + ); + } + + fn update(&mut self) -> Option> { + const TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5); + if let Some(head) = self.retransmit_sent_queue.front() { + if head.sent_at.elapsed() > TIMEOUT { + self.redo_retransmit(); + } + } + + let mut next_retransmit = self.retransmit_wait_queue.pop_front()?; + let frame = next_retransmit.frame.clone(); + next_retransmit.sent_at = std::time::Instant::now(); + self.retransmit_sent_queue.push_back(next_retransmit); + Some(frame) + } +} + +pub(crate) struct Fop { + next_frame_id: u64, + state: FopState, + last_received_farm_state: Option, + event_sender: broadcast::Sender, +} + +impl Fop { + pub(crate) fn new() -> Self { + let (event_sender, _) = broadcast::channel(16); + Self { + next_frame_id: 0, + state: FopState::Initial { expected_nr: None }, + last_received_farm_state: None, + event_sender, + } + } + + pub(crate) fn last_received_farm_state(&self) -> Option<&FarmState> { + self.last_received_farm_state.as_ref() + } + + pub(crate) fn next_fsn(&self) -> Option { + match &self.state { + FopState::Initial { expected_nr } => *expected_nr, + FopState::Active(state) => Some(state.next_fsn), + FopState::Retransmit(state) => Some(state.next_fsn), + } + } + + pub(crate) fn state_summary(&self) -> FopStateSummary { + match &self.state { + FopState::Active(_) => FopStateSummary::Active, + FopState::Retransmit(s) => FopStateSummary::Retransmit { + retransmit_count: s.retransmit_count as u64, + }, + FopState::Initial { .. } => FopStateSummary::Initial, + } + } + + pub(crate) fn subscribe_frame_events(&self) -> broadcast::Receiver { + self.event_sender.subscribe() + } + + pub(crate) async fn handle_clcw(&mut self, clcw: CLCW) -> Result<()> { + tracing::debug!("Received CLCW: {:?}", clcw); + let farm_state = FarmState { + next_expected_fsn: clcw.report_value(), + lockout: clcw.lockout() != 0, + wait: clcw.wait() != 0, + retransmit: clcw.retransmit() != 0, + }; + self.last_received_farm_state = Some(farm_state); + + let on_acknowledge = |frame_id| { + self.event_sender + .send(FrameEvent::Acknowledged(frame_id)) + .ok(); + }; + + match &mut self.state { + FopState::Initial { expected_nr } => { + if Some(farm_state.next_expected_fsn) == *expected_nr && !farm_state.lockout { + tracing::info!("FOP initialized"); + self.state = FopState::Active(ActiveState { + next_fsn: farm_state.next_expected_fsn, + sent_queue: VecDeque::new(), + }); + } + } + FopState::Active(state) => { + state.acknowledge(farm_state.next_expected_fsn, on_acknowledge); + if farm_state.retransmit { + self.state = FopState::Retransmit(RetransmitState { + next_fsn: state.next_fsn, + retransmit_count: 1, + retransmit_sent_queue: VecDeque::new(), + retransmit_wait_queue: std::mem::take(&mut state.sent_queue), + }); + } + } + FopState::Retransmit(state) => { + let completed = state.acknowledge( + farm_state.next_expected_fsn, + farm_state.retransmit, + on_acknowledge, + ); + if completed { + self.state = FopState::Active(ActiveState { + next_fsn: state.next_fsn, + sent_queue: VecDeque::new(), + }); + } + } + } + + if !farm_state.lockout { + return Ok(()); + } + + //lockout + let mut canceled_frames = VecDeque::new(); + match &mut self.state { + FopState::Initial { .. } => { + // do nothing + } + FopState::Active(state) => { + canceled_frames.append(&mut state.sent_queue); + self.state = FopState::Initial { + expected_nr: Some(state.next_fsn), + }; + } + FopState::Retransmit(state) => { + canceled_frames.append(&mut state.retransmit_sent_queue); + canceled_frames.append(&mut state.retransmit_wait_queue); + self.state = FopState::Initial { + expected_nr: Some(state.next_fsn), + }; + } + } + + for frame in canceled_frames { + self.event_sender + .send(FrameEvent::Cancel(frame.frame.id)) + .ok(); + } + + Ok(()) + } + + pub(crate) fn clear(&mut self) { + let mut canceled_frames = VecDeque::new(); + match &mut self.state { + FopState::Initial { .. } => { + // forget the previous setvr command + // do nothing + } + FopState::Active(state) => { + canceled_frames.append(&mut state.sent_queue); + } + FopState::Retransmit(state) => { + canceled_frames.append(&mut state.retransmit_sent_queue); + canceled_frames.append(&mut state.retransmit_wait_queue); + } + } + + for frame in canceled_frames { + self.event_sender + .send(FrameEvent::Cancel(frame.frame.id)) + .ok(); + } + + self.state = FopState::Initial { expected_nr: None }; + } + + pub(crate) fn set_vr(&mut self, vr: u8) -> Option { + tracing::info!("Setting VR to {}", vr); + self.clear(); + + self.state = FopState::Initial { + expected_nr: Some(vr), + }; + let frame = Frame { + //TODO: manage BC retransmission and frame id for setvr command + //id: self.next_frame_id, + id: 0, + frame_type: tc::sync_and_channel_coding::FrameType::TypeBC, + // TODO: frame number of setvr command??? + sequence_number: 0, + data_field: vec![0x82, 0x00, vr], + }; + Some(frame) + } + + pub(crate) fn unlock(&mut self) -> Option { + let frame = Frame { + //TODO: manage BC retransmission and frame id for setvr command + //id: self.next_frame_id, + id: 0, + frame_type: tc::sync_and_channel_coding::FrameType::TypeBC, + // TODO: frame number of setvr command??? + sequence_number: 0, + data_field: vec![0x00], + }; + Some(frame) + } + + pub(crate) fn send_ad(&mut self, data_field: Vec) -> Option> { + let state = match &mut self.state { + FopState::Active(state) => state, + _ => return None, + }; + + state.send(&mut self.next_frame_id, data_field, |frame_id| { + self.event_sender.send(FrameEvent::Transmit(frame_id)).ok(); + }) + } + + pub(crate) fn update(&mut self) -> Option> { + if let FopState::Active(state) = &mut self.state { + if state.timeout() { + self.state = FopState::Retransmit(RetransmitState { + next_fsn: state.next_fsn, + retransmit_count: 1, + retransmit_sent_queue: VecDeque::new(), + retransmit_wait_queue: std::mem::take(&mut state.sent_queue), + }); + } + } + + let frame = match &mut self.state { + FopState::Retransmit(state) => state.update(), + _ => None, + }; + let frame = frame?; + self.event_sender + .send(FrameEvent::Retransmit(frame.id)) + .ok(); + Some(frame) + } +} + +pub struct Frame { + pub id: u64, + pub frame_type: tc::sync_and_channel_coding::FrameType, + pub sequence_number: u8, + pub data_field: Vec, +} + +#[derive(Debug, Clone)] +pub enum FrameEvent { + Transmit(u64), + Acknowledged(u64), + Retransmit(u64), + Cancel(u64), +} diff --git a/tmtc-c2a/src/lib.rs b/tmtc-c2a/src/lib.rs index 0093a2c3..d68aca11 100644 --- a/tmtc-c2a/src/lib.rs +++ b/tmtc-c2a/src/lib.rs @@ -1,5 +1,6 @@ mod satconfig; pub use satconfig::Satconfig; +mod fop1; pub mod kble_gs; pub mod proto; pub mod registry; diff --git a/tmtc-c2a/src/main.rs b/tmtc-c2a/src/main.rs index 51638393..e66c66e6 100644 --- a/tmtc-c2a/src/main.rs +++ b/tmtc-c2a/src/main.rs @@ -128,7 +128,7 @@ async fn main() -> Result<()> { let (link, socket) = kble_gs::new(); let kble_socket_fut = socket.serve((args.kble_addr, args.kble_port)); - let (satellite_svc, sat_tlm_reporter) = satellite::new( + let (satellite_svc, fop_cmd_service, sat_tlm_reporter) = satellite::new( satconfig.aos_scid, satconfig.tc_scid, tlm_registry, @@ -144,7 +144,8 @@ async fn main() -> Result<()> { // Constructing gRPC services let server_task = { - let broker_service = BrokerService::new(cmd_handler, tlm_bus, last_tmiv_store); + let broker_service = + BrokerService::new(cmd_handler, fop_cmd_service, tlm_bus, last_tmiv_store); let broker_server = BrokerServer::new(broker_service); let tmtc_generic_c2a_server = TmtcGenericC2aServer::new(tmtc_generic_c2a_service); diff --git a/tmtc-c2a/src/satellite.rs b/tmtc-c2a/src/satellite.rs index c8864986..59393d88 100644 --- a/tmtc-c2a/src/satellite.rs +++ b/tmtc-c2a/src/satellite.rs @@ -1,4 +1,5 @@ -use std::{sync::Arc, time}; +use std::{pin::Pin, sync::Arc, time}; +use tokio::sync::Mutex; use crate::{ registry::{CommandRegistry, FatCommandSchema, TelemetryRegistry}, @@ -120,7 +121,7 @@ impl<'a> CommandContext<'a> { #[derive(Clone)] pub struct Service { - sync_and_channel_coding: T, + sync_and_channel_coding: Arc>, registry: Arc, tc_scid: u16, } @@ -138,7 +139,8 @@ where fat_schema, tco, }; - ctx.transmit_to(&mut self.sync_and_channel_coding).await?; + ctx.transmit_to(&mut *self.sync_and_channel_coding.lock().await) + .await?; Ok(true) } } @@ -151,21 +153,29 @@ pub fn new( cmd_registry: impl Into>, receiver: R, transmitter: T, -) -> (Service, TelemetryReporter) +) -> (Service, FopCommandService, TelemetryReporter) where - T: tc::SyncAndChannelCoding, + T: tc::SyncAndChannelCoding + Send + 'static, R: aos::SyncAndChannelCoding, { + let registry = cmd_registry.into(); + let transmitter = Arc::new(Mutex::new(transmitter)); + let fop = crate::fop1::Fop::new(); + let fop = Arc::new(Mutex::new(fop)); + let fop_command_service = + FopCommandService::start(fop.clone(), tc_scid, transmitter.clone(), registry.clone()); ( Service { tc_scid, sync_and_channel_coding: transmitter, - registry: cmd_registry.into(), + registry, }, + fop_command_service, TelemetryReporter { aos_scid, receiver, tmiv_builder: TmivBuilder { tlm_registry }, + fop, }, ) } @@ -187,6 +197,7 @@ pub struct TelemetryReporter { aos_scid: u16, tmiv_builder: TmivBuilder, receiver: R, + fop: Arc>, } impl TelemetryReporter @@ -211,12 +222,23 @@ where ); continue; }; + let incoming_scid = tf.primary_header.scid(); if incoming_scid != self.aos_scid { warn!("unknown SCID: {incoming_scid}"); continue; } let vcid = tf.primary_header.vcid(); + + //TODO: filter by vcid?? + { + let clcw = tf.trailer.into_ref().clone(); + let mut fop = self.fop.lock().await; + if let Err(e) = fop.handle_clcw(clcw).await { + error!("failed to handle CLCW: {:?}", e); + } + } + let channel = demuxer.demux(vcid); let frame_count = tf.primary_header.frame_count(); if let Err(expected) = channel.synchronizer.next(frame_count) { @@ -233,6 +255,7 @@ where channel.defragmenter.reset(); continue; } + while let Some((space_packet_bytes, space_packet)) = channel.defragmenter.read_as_bytes_and_packet() { @@ -265,3 +288,248 @@ where } } } + +pub struct FopCommandService { + transmitter: Arc>, + tc_scid: u16, + fop: Arc>, + registry: Arc, +} + +impl FopCommandService { + pub(crate) fn start( + fop: Arc>, + tc_scid: u16, + transmitter: Arc>, + registry: Arc, + ) -> Self { + let service = Self { + transmitter, + tc_scid, + fop, + registry, + }; + + tokio::spawn(Self::run_update( + tc_scid, + service.transmitter.clone(), + service.fop.clone(), + )); + service + } + + async fn run_update( + tc_scid: u16, + transmitter: Arc>, + fop: Arc>, + ) { + loop { + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + //tracing::debug!("FopCommandService: update"); + while let Some(frame) = fop.lock().await.update() { + tracing::debug!( + "FopCommandService: retransmitting {}", + frame.sequence_number + ); + let mut transmitter = transmitter.lock().await; + let vcid = 0; + let _ = transmitter + .transmit( + tc_scid, + vcid, + frame.frame_type, + frame.sequence_number, + &frame.data_field, + ) + .await; + } + } + } +} + +#[derive(thiserror::Error, Debug)] +pub enum SendAdCommandError { + #[error("FOP is not ready")] + FopNotReady, + #[error("unknown command: {0}")] + UnknownCommand(String), + #[error("other error: {0}")] + Internal(anyhow::Error), +} + +impl FopCommandService { + pub async fn send_set_vr(&self, vr: u8) { + let frame = { + let mut fop = self.fop.lock().await; + let frame = fop.set_vr(vr); + match frame { + Some(frame) => frame, + None => { + //TODO: return error? + return; + } + } + }; + + let vcid = 0; + let mut transmitter = self.transmitter.lock().await; + let _ = transmitter + .transmit( + self.tc_scid, + vcid, + frame.frame_type, + frame.sequence_number, + &frame.data_field, + ) + .await; + } + + pub async fn send_unlock(&self) { + let frame = { + let mut fop = self.fop.lock().await; + let frame = fop.unlock(); + match frame { + Some(frame) => frame, + None => { + //TODO: return error? + return; + } + } + }; + + let vcid = 0; + let mut transmitter = self.transmitter.lock().await; + let _ = transmitter + .transmit( + self.tc_scid, + vcid, + frame.frame_type, + frame.sequence_number, + &frame.data_field, + ) + .await; + //transmitter. + } + + pub async fn send_ad_command(&self, tco: Tco) -> Result { + let Some(fat_schema) = self.registry.lookup(&tco.name) else { + return Err(SendAdCommandError::UnknownCommand(tco.name.clone())); + }; + let ctx = CommandContext { + tc_scid: 0, // dummy + fat_schema, + tco: &tco, + }; + let mut buf = vec![0u8; 1017]; // FIXME: hard-coded max size + let len = ctx + .build_tc_segment(&mut buf) + .map_err(SendAdCommandError::Internal)?; + buf.truncate(len); + + let mut fop = self.fop.lock().await; + let frame = match fop.send_ad(buf) { + None => { + return Err(SendAdCommandError::FopNotReady); + } + Some(frame) => frame, + }; + + let vcid = 0; + let mut transmitter = self.transmitter.lock().await; + let _ = transmitter + .transmit( + self.tc_scid, + vcid, + frame.frame_type, + frame.sequence_number, + &frame.data_field, + ) + .await; + + Ok(frame.id) + } + + pub async fn clear(&self) { + self.fop.lock().await.clear(); + } + + pub async fn subscribe_frame_events( + &self, + ) -> Result + Send + Sync>>> + { + use tokio_stream::StreamExt; + let rx = self.fop.lock().await.subscribe_frame_events(); + let stream = tokio_stream::wrappers::BroadcastStream::new(rx).filter_map(|e| { + use crate::fop1::FrameEvent; + use gaia_tmtc::broker::FopFrameEvent; + let e = e.ok()?; + let e = match e { + FrameEvent::Transmit(id) => FopFrameEvent::Transmit(id), + FrameEvent::Acknowledged(id) => FopFrameEvent::Acknowledged(id), + FrameEvent::Retransmit(id) => FopFrameEvent::Retransmit(id), + FrameEvent::Cancel(id) => FopFrameEvent::Cancel(id), + }; + Some(e) + }); + Ok(Box::pin(stream)) + } + + pub async fn get_fop_status(&self) -> Result { + let fop = self.fop.lock().await; + let last_clcw = fop + .last_received_farm_state() + .map(|s| gaia_tmtc::broker::ClcwInfo { + lockout: s.lockout, + wait: s.wait, + retransmit: s.retransmit, + next_expected_fsn: s.next_expected_fsn as _, + }); + use crate::fop1::FopStateSummary; + use gaia_tmtc::broker::StateSummary; + let state_summary = match fop.state_summary() { + FopStateSummary::Initial => StateSummary::Initial, + FopStateSummary::Active => StateSummary::Active, + FopStateSummary::Retransmit { retransmit_count } => { + StateSummary::Retransmit { retransmit_count } + } + }; + let next_fsn = fop.next_fsn(); + Ok(gaia_tmtc::broker::FopStatus { + last_clcw, + state_summary, + next_fsn, + }) + } +} + +#[async_trait] +impl gaia_tmtc::broker::FopCommandService + for FopCommandService +{ + async fn send_set_vr(&self, vr: u8) { + self.send_set_vr(vr).await + } + + async fn send_unlock(&self) { + self.send_unlock().await + } + + async fn send_ad_command(&self, tco: Tco) -> Result { + self.send_ad_command(tco).await.map_err(Into::into) + } + + async fn clear(&self) { + self.clear().await + } + + async fn subscribe_frame_events( + &self, + ) -> Result + Send + Sync>>> + { + self.subscribe_frame_events().await + } + + async fn get_fop_status(&self) -> Result { + self.get_fop_status().await + } +}