From 9f2a5b6729516e019b5f3800db735042433497d2 Mon Sep 17 00:00:00 2001 From: steckes Date: Wed, 18 Mar 2026 14:04:39 +0100 Subject: [PATCH 01/27] add async processor --- Cargo.lock | 95 +++++++++++++++++++++++ Cargo.toml | 8 ++ README.md | 2 + examples/parallel_async.rs | 134 +++++++++++++++++++++++++++++++++ src/lib.rs | 4 + src/processor_async.rs | 149 +++++++++++++++++++++++++++++++++++++ 6 files changed, 392 insertions(+) create mode 100644 examples/parallel_async.rs create mode 100644 src/processor_async.rs diff --git a/Cargo.lock b/Cargo.lock index 17f2bc6..b43c40d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -34,6 +34,7 @@ version = "0.15.1" dependencies = [ "aic-sdk-sys", "approx", + "futures", "hound", "serde", "serde_json", @@ -312,6 +313,94 @@ dependencies = [ "zlib-rs", ] +[[package]] +name = "futures" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b147ee9d1f6d097cef9ce628cd2ee62288d963e16fb287bd9286455b241382d" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-channel" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07bbe89c50d7a535e539b8c17bc0b49bdb77747034daa8087407d655f3f7cc1d" +dependencies = [ + "futures-core", + "futures-sink", +] + +[[package]] +name = "futures-core" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e3450815272ef58cec6d564423f6e755e25379b217b0bc688e295ba24df6b1d" + +[[package]] +name = "futures-executor" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf29c38818342a3b26b5b923639e7b1f4a61fc5e76102d4b1981c6dc7a7579d" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cecba35d7ad927e23624b22ad55235f2239cfa44fd10428eecbeba6d6a717718" + +[[package]] +name = "futures-macro" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e835b70203e41293343137df5c0664546da5745f82ec9b84d40be8336958447b" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "futures-sink" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c39754e157331b013978ec91992bde1ac089843443c49cbc7f46150b0fad0893" + +[[package]] +name = "futures-task" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "037711b3d59c33004d3856fbdc83b99d4ff37a24768fa1be9ce3538a1cde4393" + +[[package]] +name = "futures-util" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "389ca41296e6190b48053de0321d02a77f32f8a5d2461dd38762c0593805c6d6" +dependencies = [ + "futures-channel", + "futures-core", + "futures-io", + "futures-macro", + "futures-sink", + "futures-task", + "memchr", + "pin-project-lite", + "slab", +] + [[package]] name = "generic-array" version = "0.14.7" @@ -822,6 +911,12 @@ version = "0.3.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e320a6c5ad31d271ad523dcf3ad13e2767ad8b1cb8f047f75a8aeaf8da139da2" +[[package]] +name = "slab" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c790de23124f9ab44544d7ac05d60440adc586479ce501c1d6d7da3cd8c9cf5" + [[package]] name = "subtle" version = "2.6.1" diff --git a/Cargo.toml b/Cargo.toml index 30fef73..983d245 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,15 +36,18 @@ serde = { workspace = true, optional = true, features = ["derive"] } serde_json = { workspace = true, optional = true } sha2 = { workspace = true, optional = true } thiserror = { workspace = true } +tokio = { version = "1.49", optional = true, features = ["rt-multi-thread", "sync"] } ureq = { workspace = true, optional = true, features = ["rustls"] } [dev-dependencies] approx = "0.5" +futures = "0.3" hound = "3.5" serde_json = { workspace = true } tokio = { version = "1.49", features = ["macros", "rt-multi-thread", "sync", "time"] } [features] +async = ["dep:tokio"] download-lib = ["aic-sdk-sys/download-lib"] download-model = ["dep:serde", "dep:serde_json", "dep:sha2", "dep:ureq"] @@ -55,3 +58,8 @@ path = "examples/basic_usage.rs" [[example]] name = "benchmark" path = "examples/benchmark.rs" + +[[example]] +name = "parallel_async" +path = "examples/parallel_async.rs" +required-features = ["async", "download-model"] diff --git a/README.md b/README.md index 49e7cd2..71ee313 100644 --- a/README.md +++ b/README.md @@ -211,11 +211,13 @@ See the example files for complete working examples: - [`examples/basic_usage.rs`](examples/basic_usage.rs) - Basic usage example - [`examples/build-time-download`](examples/build-time-download) - Download and embed models at compile-time - [`examples/benchmark.rs`](examples/benchmark.rs) - Run multiple processor instances concurrently until the real-time requirements are not met +- [`examples/parallel_async.rs`](examples/parallel_async.rs) - Demonstrates parallel processing with `ProcessorAsync`: runs N processors concurrently via `tokio::join_all` and prints each processor's individual time alongside a sequential baseline to confirm the speedup Run examples with: ```bash export AIC_SDK_LICENSE="your_license_key_here" cargo run --example basic_usage --features download-lib,download-model +cargo run --example parallel_async --features download-lib,download-model,async ``` ## Documentation diff --git a/examples/parallel_async.rs b/examples/parallel_async.rs new file mode 100644 index 0000000..3d6cc3b --- /dev/null +++ b/examples/parallel_async.rs @@ -0,0 +1,134 @@ +#![cfg_attr( + not(all(feature = "download-model", feature = "async")), + allow(dead_code, unused_imports) +)] + +#[cfg(not(all(feature = "download-model", feature = "async")))] +fn main() -> Result<(), Box> { + Err("Enable the `download-model` and `async` features to run this example.".into()) +} + +/// Demonstrates that multiple [`ProcessorAsync`] instances genuinely run in +/// parallel when awaited concurrently. +/// +/// Each processor records its own wall-clock processing time. If they ran +/// sequentially, the total elapsed time would be roughly `N × per-processor +/// time`. When running in parallel the total time is close to the slowest +/// single processor, which is what we verify and print. +#[cfg(all(feature = "download-model", feature = "async"))] +#[tokio::main(flavor = "multi_thread")] +async fn main() -> Result<(), Box> { + use aic_sdk::{Model, ProcessorAsync, ProcessorConfig}; + use std::{env, sync::Arc, time::Instant}; + + const MODEL: &str = "quail-vf-2.0-l-16khz"; + const NUM_PROCESSORS: usize = 4; + // Number of process calls per processor – enough to make timing visible. + const ITERATIONS: usize = 50; + + println!("ai-coustics SDK version: {}", aic_sdk::get_sdk_version()); + + let license = env::var("AIC_SDK_LICENSE").expect("AIC_SDK_LICENSE not set"); + + let model_path = Model::download(MODEL, "target")?; + let model = Arc::new(Model::from_file(&model_path)?); + println!("Model loaded from {}", model_path.display()); + + let config = ProcessorConfig::optimal(&model); + println!( + "Config: {} Hz, {} frames/buffer, {} channel(s)\n", + config.sample_rate, config.num_frames, config.num_channels + ); + + // ------------------------------------------------------------------------- + // Build all processors upfront so initialization is not part of the timed + // section. + // ------------------------------------------------------------------------- + let mut processors: Vec = Vec::with_capacity(NUM_PROCESSORS); + for _ in 0..NUM_PROCESSORS { + let p = ProcessorAsync::with_config(&model, &license, &config).await?; + processors.push(p); + } + + println!( + "Running {} processors × {} iterations each", + NUM_PROCESSORS, ITERATIONS + ); + + // ------------------------------------------------------------------------- + // Sequential baseline – process each processor one after the other. + // ------------------------------------------------------------------------- + let buf_len = config.num_channels as usize * config.num_frames; + + let sequential_start = Instant::now(); + for p in &processors { + let mut audio = vec![0.0f32; buf_len]; + for _ in 0..ITERATIONS { + p.process_interleaved(&mut audio).await?; + } + } + let sequential_elapsed = sequential_start.elapsed(); + + println!( + "Sequential total: {:>8.1} ms", + sequential_elapsed.as_secs_f64() * 1000.0 + ); + + // ------------------------------------------------------------------------- + // Parallel run – drive all processors concurrently with tokio::join_all. + // Each task times itself and returns its own elapsed duration. + // ------------------------------------------------------------------------- + let parallel_start = Instant::now(); + + let tasks: Vec<_> = processors + .iter() + .enumerate() + .map(|(id, p)| { + let config = config.clone(); + async move { + let mut audio = vec![0.0f32; config.num_channels as usize * config.num_frames]; + let t0 = Instant::now(); + for _ in 0..ITERATIONS { + p.process_interleaved(&mut audio).await?; + } + let elapsed = t0.elapsed(); + println!( + " Processor {:>2} finished in {:>8.1} ms", + id + 1, + elapsed.as_secs_f64() * 1000.0, + ); + Ok::<_, aic_sdk::AicError>(elapsed) + } + }) + .collect(); + + let results = futures::future::try_join_all(tasks).await?; + let parallel_elapsed = parallel_start.elapsed(); + + let max_individual = results.iter().max().copied().unwrap_or_default(); + + println!( + "\nParallel wall-clock: {:>8.1} ms", + parallel_elapsed.as_secs_f64() * 1000.0 + ); + println!( + "Slowest processor: {:>8.1} ms", + max_individual.as_secs_f64() * 1000.0, + ); + + let speedup = sequential_elapsed.as_secs_f64() / parallel_elapsed.as_secs_f64(); + println!( + "\nSpeedup vs sequential: {:.2}x (ideal ≈ {}x)", + speedup, NUM_PROCESSORS + ); + println!( + "{}", + if speedup > 1.5 { + "Parallel execution confirmed." + } else { + "Warning: low speedup – are you running with a multi-thread runtime?" + } + ); + + Ok(()) +} diff --git a/src/lib.rs b/src/lib.rs index a2d035d..d617003 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,6 +2,8 @@ use aic_sdk_sys::{aic_get_compatible_model_version, aic_get_sdk_version, aic_set_sdk_wrapper_id}; use std::ffi::CStr; +#[cfg(feature = "async")] +mod processor_async; #[cfg(feature = "download-model")] mod download; mod error; @@ -9,6 +11,8 @@ mod model; mod processor; mod vad; +#[cfg(feature = "async")] +pub use processor_async::*; pub use error::*; pub use model::*; pub use processor::*; diff --git a/src/processor_async.rs b/src/processor_async.rs new file mode 100644 index 0000000..b255031 --- /dev/null +++ b/src/processor_async.rs @@ -0,0 +1,149 @@ +use crate::{AicError, Model, Processor, ProcessorConfig, ProcessorContext, VadContext}; +use std::sync::Arc; +use tokio::sync::Mutex; + +/// An async wrapper around [`Processor`] for use in async/await contexts. +/// +/// This requires a **multi-threaded** runtime (e.g. `#[tokio::main]`). +/// +/// # Example +/// +/// ```rust,no_run +/// use aic_sdk::{Model, ProcessorAsync, ProcessorConfig}; +/// #[tokio::main] +/// async fn main() -> Result<(), aic_sdk::AicError> { +/// let license_key = std::env::var("AIC_SDK_LICENSE").unwrap(); +/// let model = Model::from_file("/path/to/model.aicmodel")?; +/// let config = ProcessorConfig::optimal(&model).with_num_channels(2); +/// +/// let processor = ProcessorAsync::new(&model, &license_key)?; +/// processor.initialize(&config).await?; +/// +/// let mut audio = vec![0.0f32; config.num_channels as usize * config.num_frames]; +/// processor.process_interleaved(&mut audio).await?; +/// Ok(()) +/// } +/// ``` +pub struct ProcessorAsync { + inner: Arc>>, +} + +impl ProcessorAsync { + /// Creates a new async audio enhancement processor instance. + /// + /// See [`Processor::new`] for details. + pub fn new(model: &Model<'_>, license_key: &str) -> Result { + let processor = Processor::new(model, license_key)?; + // SAFETY: The C library internally reference-counts the model weights, + // so the Processor remains valid even after the Rust Model is dropped. + let processor: Processor<'static> = unsafe { std::mem::transmute(processor) }; + Ok(Self { + inner: Arc::new(Mutex::new(processor)), + }) + } + + /// Creates a new async processor and initializes it with the given configuration. + /// + /// This is a convenience method combining [`ProcessorAsync::new`] and + /// [`ProcessorAsync::initialize`]. + pub async fn with_config( + model: &Model<'_>, + license_key: &str, + config: &ProcessorConfig, + ) -> Result { + let this = Self::new(model, license_key)?; + this.initialize(config).await?; + Ok(this) + } + + /// Initializes the processor with the given configuration. + /// + /// See [`Processor::initialize`] for details. + /// + /// # Warning + /// This allocates memory internally. Do not call from latency-sensitive paths. + pub async fn initialize(&self, config: &ProcessorConfig) -> Result<(), AicError> { + let inner = Arc::clone(&self.inner); + let config = config.clone(); + tokio::task::spawn_blocking(move || { + let mut processor = inner.blocking_lock(); + processor.initialize(&config) + }) + .await + .expect("spawn_blocking task panicked") + } + + /// Processes audio with interleaved channel data. + /// + /// See [`Processor::process_interleaved`] for details on the memory layout. + pub async fn process_interleaved(&self, audio: &mut [f32]) -> Result<(), AicError> { + let inner = Arc::clone(&self.inner); + let audio_addr = audio.as_mut_ptr() as usize; + let audio_len = audio.len(); + + // SAFETY: We `.await` the JoinHandle immediately, so `audio` outlives + // the blocking task. The Mutex guarantees exclusive Processor access. + tokio::task::spawn_blocking(move || { + let mut processor = inner.blocking_lock(); + let audio = + unsafe { std::slice::from_raw_parts_mut(audio_addr as *mut f32, audio_len) }; + processor.process_interleaved(audio) + }) + .await + .expect("spawn_blocking task panicked") + } + + /// Processes audio with separate buffers for each channel (planar layout). + /// + /// See [`Processor::process_planar`] for details on the memory layout. + pub async fn process_planar>(&self, audio: &mut [V]) -> Result<(), AicError> { + let inner = Arc::clone(&self.inner); + let audio_addr = audio.as_mut_ptr() as usize; + let audio_len = audio.len(); + + // SAFETY: Same as process_interleaved. + tokio::task::spawn_blocking(move || { + let mut processor = inner.blocking_lock(); + let audio: &mut [V] = + unsafe { std::slice::from_raw_parts_mut(audio_addr as *mut V, audio_len) }; + processor.process_planar(audio) + }) + .await + .expect("spawn_blocking task panicked") + } + + /// Processes audio with sequential channel data. + /// + /// See [`Processor::process_sequential`] for details on the memory layout. + pub async fn process_sequential(&self, audio: &mut [f32]) -> Result<(), AicError> { + let inner = Arc::clone(&self.inner); + let audio_addr = audio.as_mut_ptr() as usize; + let audio_len = audio.len(); + + // SAFETY: Same as process_interleaved. + tokio::task::spawn_blocking(move || { + let mut processor = inner.blocking_lock(); + let audio = + unsafe { std::slice::from_raw_parts_mut(audio_addr as *mut f32, audio_len) }; + processor.process_sequential(audio) + }) + .await + .expect("spawn_blocking task panicked") + } + + /// Creates a [`ProcessorContext`] for real-time parameter control. + /// + /// See [`Processor::processor_context`] for details. + pub fn processor_context(&self) -> ProcessorContext { + let processor = self.inner.blocking_lock(); + processor.processor_context() + } + + /// Creates a [`VadContext`] for voice activity detection. + /// + /// See [`Processor::vad_context`] for details. + pub fn vad_context(&self) -> VadContext { + let processor = self.inner.blocking_lock(); + processor.vad_context() + } +} From 9c9ba0c9393210a8eb5f5257ffc9dc65faac3531 Mon Sep 17 00:00:00 2001 From: steckes Date: Wed, 18 Mar 2026 14:04:43 +0100 Subject: [PATCH 02/27] Update parallel_async.rs --- examples/parallel_async.rs | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/examples/parallel_async.rs b/examples/parallel_async.rs index 3d6cc3b..9c0b6cb 100644 --- a/examples/parallel_async.rs +++ b/examples/parallel_async.rs @@ -1,13 +1,3 @@ -#![cfg_attr( - not(all(feature = "download-model", feature = "async")), - allow(dead_code, unused_imports) -)] - -#[cfg(not(all(feature = "download-model", feature = "async")))] -fn main() -> Result<(), Box> { - Err("Enable the `download-model` and `async` features to run this example.".into()) -} - /// Demonstrates that multiple [`ProcessorAsync`] instances genuinely run in /// parallel when awaited concurrently. /// @@ -15,7 +5,6 @@ fn main() -> Result<(), Box> { /// sequentially, the total elapsed time would be roughly `N × per-processor /// time`. When running in parallel the total time is close to the slowest /// single processor, which is what we verify and print. -#[cfg(all(feature = "download-model", feature = "async"))] #[tokio::main(flavor = "multi_thread")] async fn main() -> Result<(), Box> { use aic_sdk::{Model, ProcessorAsync, ProcessorConfig}; From a9213171385bf92fe199851a750a151869891c3a Mon Sep 17 00:00:00 2001 From: steckes Date: Thu, 5 Mar 2026 17:11:17 +0100 Subject: [PATCH 03/27] fmt --- src/lib.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index d617003..35d72ff 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,20 +2,20 @@ use aic_sdk_sys::{aic_get_compatible_model_version, aic_get_sdk_version, aic_set_sdk_wrapper_id}; use std::ffi::CStr; -#[cfg(feature = "async")] -mod processor_async; #[cfg(feature = "download-model")] mod download; mod error; mod model; mod processor; +#[cfg(feature = "async")] +mod processor_async; mod vad; -#[cfg(feature = "async")] -pub use processor_async::*; pub use error::*; pub use model::*; pub use processor::*; +#[cfg(feature = "async")] +pub use processor_async::*; pub use vad::*; /// Returns the version of the ai-coustics SDK library. From 317ce9737f5d554ae420f4217ca85bb9b6661602 Mon Sep 17 00:00:00 2001 From: steckes Date: Fri, 6 Mar 2026 15:36:48 +0100 Subject: [PATCH 04/27] make model static --- src/processor_async.rs | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/src/processor_async.rs b/src/processor_async.rs index b255031..e456184 100644 --- a/src/processor_async.rs +++ b/src/processor_async.rs @@ -4,8 +4,6 @@ use tokio::sync::Mutex; /// An async wrapper around [`Processor`] for use in async/await contexts. /// -/// This requires a **multi-threaded** runtime (e.g. `#[tokio::main]`). -/// /// # Example /// /// ```rust,no_run @@ -32,11 +30,8 @@ impl ProcessorAsync { /// Creates a new async audio enhancement processor instance. /// /// See [`Processor::new`] for details. - pub fn new(model: &Model<'_>, license_key: &str) -> Result { + pub fn new(model: &Model<'static>, license_key: &str) -> Result { let processor = Processor::new(model, license_key)?; - // SAFETY: The C library internally reference-counts the model weights, - // so the Processor remains valid even after the Rust Model is dropped. - let processor: Processor<'static> = unsafe { std::mem::transmute(processor) }; Ok(Self { inner: Arc::new(Mutex::new(processor)), }) @@ -47,7 +42,7 @@ impl ProcessorAsync { /// This is a convenience method combining [`ProcessorAsync::new`] and /// [`ProcessorAsync::initialize`]. pub async fn with_config( - model: &Model<'_>, + model: &Model<'static>, license_key: &str, config: &ProcessorConfig, ) -> Result { From a1aeb24c813f8dc53b61c429f969bd04518a5b2b Mon Sep 17 00:00:00 2001 From: steckes Date: Wed, 18 Mar 2026 14:59:40 +0100 Subject: [PATCH 05/27] remove unused feature guards in example --- Cargo.toml | 2 ++ examples/basic_usage.rs | 9 --------- 2 files changed, 2 insertions(+), 9 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 983d245..f582734 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -54,10 +54,12 @@ download-model = ["dep:serde", "dep:serde_json", "dep:sha2", "dep:ureq"] [[example]] name = "basic_usage" path = "examples/basic_usage.rs" +required-features = ["download-model"] [[example]] name = "benchmark" path = "examples/benchmark.rs" +required-features = ["download-model"] [[example]] name = "parallel_async" diff --git a/examples/basic_usage.rs b/examples/basic_usage.rs index 8c7e6ca..3cc2850 100644 --- a/examples/basic_usage.rs +++ b/examples/basic_usage.rs @@ -1,15 +1,6 @@ -#![cfg_attr(not(feature = "download-model"), allow(dead_code, unused_imports))] - -#[cfg(feature = "download-model")] use aic_sdk::{Model, Processor, ProcessorConfig, ProcessorParameter, VadParameter}; use std::env; -#[cfg(not(feature = "download-model"))] -fn main() -> Result<(), Box> { - Err("Enable the `download-model` feature to run this example.".into()) -} - -#[cfg(feature = "download-model")] fn main() -> Result<(), Box> { // Display library version println!("ai-coustics SDK version: {}", aic_sdk::get_sdk_version()); From 37ba99bf2e87760f3c0354608d88992651b12c0b Mon Sep 17 00:00:00 2001 From: steckes Date: Wed, 18 Mar 2026 15:36:02 +0100 Subject: [PATCH 06/27] make benchmark use async processor --- examples/benchmark.rs | 245 +++++++++++++++++++++--------------------- 1 file changed, 120 insertions(+), 125 deletions(-) diff --git a/examples/benchmark.rs b/examples/benchmark.rs index c953425..c619a75 100644 --- a/examples/benchmark.rs +++ b/examples/benchmark.rs @@ -1,17 +1,23 @@ -use aic_sdk::{Model, Processor, ProcessorConfig}; +use aic_sdk::{Model, ProcessorAsync, ProcessorConfig}; use std::{ env, io::Write, - sync::Arc, + sync::{ + Arc, + atomic::{AtomicBool, Ordering}, + }, time::{Duration, Instant}, }; -use tokio::sync::{mpsc, watch}; +use tokio::{ + task::JoinSet, + time::{self, MissedTickBehavior}, +}; // Specify the model to benchmark const MODEL: &str = "quail-vf-2.0-l-16khz"; -// Interval between spawning new processing threads -const THREAD_SPAWN_INTERVAL: Duration = Duration::from_secs(3); +// Interval between spawning new processor sessions +const SESSION_SPAWN_INTERVAL: Duration = Duration::from_secs(3); // Safety margin to account for system variability // e.g. 0.3 means 30% of the period is reserved as a safety margin, @@ -37,8 +43,7 @@ async fn main() -> Result<(), Box> { let config = ProcessorConfig::optimal(&model); - let period = config.num_frames as f64 / config.sample_rate as f64; - let period = Duration::from_secs_f64(period); + let period = Duration::from_secs_f64(config.num_frames as f64 / config.sample_rate as f64); let safety_margin = Duration::from_secs_f64(period.as_secs_f64() * SAFETY_MARGIN); println!("Model: {}", model.id()); @@ -48,66 +53,60 @@ async fn main() -> Result<(), Box> { println!("Safety margin: {} ms\n", safety_margin.as_millis()); println!( - "Starting benchmark: spawning a processing thread every {} seconds until a deadline is missed...\n", - THREAD_SPAWN_INTERVAL.as_secs() + "Starting benchmark: spawning a processor session every {} seconds until a deadline is missed...\n", + SESSION_SPAWN_INTERVAL.as_secs() ); - let (stop_tx, stop_rx) = watch::channel(false); - let (report_tx, mut report_rx) = mpsc::unbounded_channel::(); - let mut active_threads = 0usize; - - let mut handles = Vec::new(); - let mut thread_id = 1usize; + let stop = Arc::new(AtomicBool::new(false)); + let mut sessions = JoinSet::new(); + let mut reports = Vec::new(); + let mut spawned_sessions = 0usize; + let mut next_session_id = 1usize; - handles.push(spawn_session( - thread_id, + spawn_session( + &mut sessions, + next_session_id, Arc::clone(&model), license.clone(), config.clone(), period, safety_margin, - stop_rx.clone(), - report_tx.clone(), - )); + Arc::clone(&stop), + ); + spawned_sessions += 1; print!("*"); std::io::stdout().flush().unwrap(); - active_threads += 1; - - let spawn_interval = THREAD_SPAWN_INTERVAL; - let mut next_spawn = tokio::time::Instant::now() + spawn_interval; + let mut spawn_ticks = time::interval(SESSION_SPAWN_INTERVAL); + spawn_ticks.set_missed_tick_behavior(MissedTickBehavior::Skip); + spawn_ticks.tick().await; - let mut reports = Vec::new(); - let first_session_report = loop { + let first_failed_report = loop { tokio::select! { - // Spawn a new session at regular intervals - _ = tokio::time::sleep_until(next_spawn) => { - thread_id += 1; - handles.push(spawn_session( - thread_id, + _ = spawn_ticks.tick() => { + next_session_id += 1; + spawn_session( + &mut sessions, + next_session_id, Arc::clone(&model), license.clone(), config.clone(), period, safety_margin, - stop_rx.clone(), - report_tx.clone(), - )); - active_threads += 1; + Arc::clone(&stop), + ); + spawned_sessions += 1; print!("*"); - if active_threads.is_multiple_of(50) { + if spawned_sessions.is_multiple_of(50) { print!("\n"); } - std::io::stdout().flush().unwrap(); - next_spawn += spawn_interval; } - // Check for deadline misses and break the loop if one occurs - Some(report) = report_rx.recv() => { - // Print line breaks for readability - if active_threads.is_multiple_of(50) { + Some(result) = sessions.join_next() => { + let report = result?; + if spawned_sessions.is_multiple_of(50) { println!(); } else { println!("\n"); @@ -116,6 +115,7 @@ async fn main() -> Result<(), Box> { let is_miss = report.error.is_some(); reports.push(report); if is_miss { + stop.store(true, Ordering::Relaxed); break reports.last().cloned(); } } @@ -124,32 +124,25 @@ async fn main() -> Result<(), Box> { println!("Benchmark complete\n"); - let _ = stop_tx.send(true); - drop(report_tx); - for handle in handles { - let _ = handle.await; - } - - while let Some(report) = report_rx.recv().await { - reports.push(report); + while let Some(result) = sessions.join_next().await { + reports.push(result?); } reports.sort_by_key(|report| report.session_id); - let mut number_of_missed_deadlines = 0; + let mut number_of_missed_deadlines = 0usize; println!(" ID | Max Exec Time | RTF | Notes"); println!("----+---------------+---------+------"); for report in &reports { let max_ms = report.max_execution_time.as_secs_f64() * 1000.0; let period_ms = period.as_secs_f64() * 1000.0; - let rtf = if period_ms > 0.0 { max_ms / period_ms } else { 0.0 }; - let miss_note = match report.error.as_deref() { + let note = match report.error.as_deref() { Some(reason) => { number_of_missed_deadlines += 1; format!("deadline missed: {}", reason) @@ -159,115 +152,117 @@ async fn main() -> Result<(), Box> { println!( "{:>3} | {:>9.3} ms | {:>7.3} | {}", - report.session_id, max_ms, rtf, miss_note + report.session_id, max_ms, rtf, note ); } println!(); - let max_ok = active_threads.saturating_sub(1); - + let max_ok = spawned_sessions.saturating_sub(1); println!( "System can run {} instances of this model/config concurrently while meeting real-time requirements", max_ok ); - if let Some(first_session_report) = &first_session_report { + if let Some(first_failed_report) = &first_failed_report { println!( - "After spawning the {}{} thread, thread #{} missed its deadline ({})", - active_threads, - number_suffix(active_threads), - first_session_report.session_id, - first_session_report.error.as_deref().unwrap_or("unknown") + "After spawning the {}{} session, session #{} missed its deadline ({})", + spawned_sessions, + number_suffix(spawned_sessions), + first_failed_report.session_id, + first_failed_report.error.as_deref().unwrap_or("unknown") ); if number_of_missed_deadlines > 1 { println!( - "Other threads also missed deadlines after thread #{}", - first_session_report.session_id + "Other sessions also missed deadlines after session #{}", + first_failed_report.session_id ); } } else { - println!("Missed deadline in thread unknown (no report)"); + println!("Missed deadline in session unknown (no report)"); } Ok(()) } fn spawn_session( + sessions: &mut JoinSet, session_id: usize, model: Arc>, license: String, config: ProcessorConfig, period: Duration, safety_margin: Duration, - stop_rx: watch::Receiver, - report_tx: mpsc::UnboundedSender, -) -> tokio::task::JoinHandle<()> { - tokio::task::spawn_blocking(move || { - let mut processor = - match Processor::new(&model, &license).and_then(|p| p.with_config(&config)) { - Ok(processor) => processor, - Err(err) => { - let reason = format!("processor init failed: {}", err); - let _ = report_tx.send(SessionReport { - session_id, - max_execution_time: Duration::from_secs(0), - error: Some(reason), - }); - return; - } - }; - - let mut buffer = vec![0.0f32; config.num_channels as usize * config.num_frames]; - - let mut max_execution_time = Duration::from_secs(0); - let mut error = None; - - let deadline = period - safety_margin; + stop: Arc, +) { + sessions.spawn(run_session( + session_id, + model, + license, + config, + period, + safety_margin, + stop, + )); +} - loop { - // Check if we should stop (another session missed a deadline) - if *stop_rx.borrow() { - break; - } +async fn run_session( + session_id: usize, + model: Arc>, + license: String, + config: ProcessorConfig, + period: Duration, + safety_margin: Duration, + stop: Arc, +) -> SessionReport { + let processor = match ProcessorAsync::with_config(&model, &license, &config).await { + Ok(processor) => processor, + Err(err) => { + return SessionReport { + session_id, + max_execution_time: Duration::ZERO, + error: Some(format!("processor init failed: {}", err)), + }; + } + }; - // Process the audio buffer - let process_start = Instant::now(); - if let Err(err) = processor.process_interleaved(&mut buffer) { - error = Some(format!("process error: {}", err)); - break; - } - let execution_time = process_start.elapsed(); + let mut buffer = vec![0.0f32; config.num_channels as usize * config.num_frames]; + let mut max_execution_time = Duration::ZERO; + let deadline = period.saturating_sub(safety_margin); + + while !stop.load(Ordering::Relaxed) { + let process_start = Instant::now(); + if let Err(err) = processor.process_interleaved(&mut buffer).await { + return SessionReport { + session_id, + max_execution_time, + error: Some(format!("process error: {}", err)), + }; + } - // Keep track of the maximum execution time - if execution_time > max_execution_time { - max_execution_time = execution_time; - } + let execution_time = process_start.elapsed(); + max_execution_time = max_execution_time.max(execution_time); - // Check if we missed the deadline - if execution_time > deadline { - let late_by = execution_time - deadline; - let reason = format!("late by {:?}", late_by); - error = Some(reason); - break; - } + if execution_time > deadline { + return SessionReport { + session_id, + max_execution_time, + error: Some(format!("late by {:?}", execution_time - deadline)), + }; + } - // Sleep until the next deadline - let next_deadline = process_start + period; - let sleep_for = next_deadline.saturating_duration_since(Instant::now()); - if sleep_for > Duration::from_secs(0) { - std::thread::sleep(sleep_for); - } + let sleep_for = (process_start + period).saturating_duration_since(Instant::now()); + if !sleep_for.is_zero() { + time::sleep(sleep_for).await; } + } - // Send the session report - let _ = report_tx.send(SessionReport { - session_id, - max_execution_time, - error, - }); - }) + SessionReport { + session_id, + max_execution_time, + error: None, + } } fn number_suffix(n: usize) -> &'static str { From 4713659512c5f4b8ed0e12e5d3e59d6e4225a957 Mon Sep 17 00:00:00 2001 From: steckes Date: Wed, 18 Mar 2026 15:36:20 +0100 Subject: [PATCH 07/27] add default features --- Cargo.toml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index f582734..2363b41 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -47,6 +47,8 @@ serde_json = { workspace = true } tokio = { version = "1.49", features = ["macros", "rt-multi-thread", "sync", "time"] } [features] +default = ["async", "download-lib", "download-model"] + async = ["dep:tokio"] download-lib = ["aic-sdk-sys/download-lib"] download-model = ["dep:serde", "dep:serde_json", "dep:sha2", "dep:ureq"] @@ -59,7 +61,7 @@ required-features = ["download-model"] [[example]] name = "benchmark" path = "examples/benchmark.rs" -required-features = ["download-model"] +required-features = ["async", "download-model"] [[example]] name = "parallel_async" From 1178b2c2cfe4efbf9e628fed03e541e0335a6ed0 Mon Sep 17 00:00:00 2001 From: steckes Date: Wed, 18 Mar 2026 17:06:09 +0100 Subject: [PATCH 08/27] update comments --- examples/parallel_async.rs | 4 ++-- src/processor.rs | 12 ++++++++---- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/examples/parallel_async.rs b/examples/parallel_async.rs index 9c0b6cb..68ca67b 100644 --- a/examples/parallel_async.rs +++ b/examples/parallel_async.rs @@ -2,8 +2,8 @@ /// parallel when awaited concurrently. /// /// Each processor records its own wall-clock processing time. If they ran -/// sequentially, the total elapsed time would be roughly `N × per-processor -/// time`. When running in parallel the total time is close to the slowest +/// sequentially, the total elapsed time would be roughly `N × per-processor time`. +/// When running in parallel the total time is close to the slowest /// single processor, which is what we verify and print. #[tokio::main(flavor = "multi_thread")] async fn main() -> Result<(), Box> { diff --git a/src/processor.rs b/src/processor.rs index 87455dd..aafb4b3 100644 --- a/src/processor.rs +++ b/src/processor.rs @@ -101,13 +101,17 @@ pub enum ProcessorParameter { /// /// **Default:** 0.0 Bypass, - /// Controls the intensity of speech enhancement processing. + /// A tunable parameter to optimize for specific STT engines, deployment environments, + /// and user experience requirements. + /// + /// The exact behavior depends on the active model: + /// - **Quail Models:** Controls how aggressively the model suppresses noise. When used + /// with Quail Voice Focus, it also suppresses background and competing speech. + /// - **Sparrow Models:** Controls the mixback and therefore the intensity of the + /// enhancement. /// /// **Range:** 0.0 to 1.0 - /// - **0.0:** Bypass mode - original signal passes through unchanged - /// - **1.0:** Full enhancement - maximum noise reduction but also more audible artifacts /// - /// **Default:** 1.0 EnhancementLevel, } From b8110a7662f72be4cb716bb5fb0204aeabce3479 Mon Sep 17 00:00:00 2001 From: steckes Date: Wed, 18 Mar 2026 17:08:23 +0100 Subject: [PATCH 09/27] make all functions async --- src/processor_async.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/processor_async.rs b/src/processor_async.rs index e456184..3733f58 100644 --- a/src/processor_async.rs +++ b/src/processor_async.rs @@ -129,16 +129,16 @@ impl ProcessorAsync { /// Creates a [`ProcessorContext`] for real-time parameter control. /// /// See [`Processor::processor_context`] for details. - pub fn processor_context(&self) -> ProcessorContext { - let processor = self.inner.blocking_lock(); + pub async fn processor_context(&self) -> ProcessorContext { + let processor = self.inner.lock().await; processor.processor_context() } /// Creates a [`VadContext`] for voice activity detection. /// /// See [`Processor::vad_context`] for details. - pub fn vad_context(&self) -> VadContext { - let processor = self.inner.blocking_lock(); + pub async fn vad_context(&self) -> VadContext { + let processor = self.inner.lock().await; processor.vad_context() } } From 8235af49aff4f8f68ab71840e165f723c8079922 Mon Sep 17 00:00:00 2001 From: steckes Date: Wed, 18 Mar 2026 17:08:30 +0100 Subject: [PATCH 10/27] remove default features --- Cargo.toml | 2 -- 1 file changed, 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 2363b41..c599465 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -47,8 +47,6 @@ serde_json = { workspace = true } tokio = { version = "1.49", features = ["macros", "rt-multi-thread", "sync", "time"] } [features] -default = ["async", "download-lib", "download-model"] - async = ["dep:tokio"] download-lib = ["aic-sdk-sys/download-lib"] download-model = ["dep:serde", "dep:serde_json", "dep:sha2", "dep:ureq"] From 070bf802d89c0b71ca90a6b6dfc9eaf709abfc80 Mon Sep 17 00:00:00 2001 From: steckes Date: Wed, 18 Mar 2026 17:11:34 +0100 Subject: [PATCH 11/27] update version and changelog --- CHANGELOG.md | 6 ++++++ Cargo.lock | 4 ++-- Cargo.toml | 2 +- 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a52c5f3..56eb854 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,11 @@ # Changelog +## 0.15.2 - 2026-03-xx + +### New features + +- Added `ProcessorAsync`, an async wrapper around `Processor` for use in `async`/`await` contexts (requires the `async` feature flag). It offloads blocking audio processing to a thread pool via `tokio::task::spawn_blocking`, keeping the async runtime responsive. All methods — `initialize`, `process_interleaved`, `process_planar`, `process_sequential`, `processor_context`, and `vad_context` — mirror the synchronous `Processor` API. A `ProcessorAsync::with_config` convenience constructor is also provided. + ## 0.15.1 - 2026-03-17 ## Improvements diff --git a/Cargo.lock b/Cargo.lock index b43c40d..7e72919 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -30,7 +30,7 @@ dependencies = [ [[package]] name = "aic-sdk" -version = "0.15.1" +version = "0.15.2" dependencies = [ "aic-sdk-sys", "approx", @@ -114,7 +114,7 @@ dependencies = [ [[package]] name = "build-time-download" -version = "0.15.1" +version = "0.15.2" dependencies = [ "aic-sdk", ] diff --git a/Cargo.toml b/Cargo.toml index c599465..31fd2cb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,7 +6,7 @@ edition = "2024" homepage = "https://ai-coustics.com/sdk/" license = "Apache-2.0" repository = "https://github.com/ai-coustics/aic-sdk-rs" -version = "0.15.1" +version = "0.15.2" [workspace.dependencies] aic-sdk-sys = { version = "0.15.1", path = "aic-sdk-sys" } From ba16c4df36acaa59e2fc078a43ba9183187a01a2 Mon Sep 17 00:00:00 2001 From: steckes Date: Wed, 18 Mar 2026 17:12:50 +0100 Subject: [PATCH 12/27] Update CHANGELOG.md --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 56eb854..fd945ad 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,6 @@ # Changelog -## 0.15.2 - 2026-03-xx +## 0.15.2 - 2026-03-18 ### New features From 18cac84a95d42a1badfae15b135ece15e81479e1 Mon Sep 17 00:00:00 2001 From: Stephan Eckes Date: Wed, 18 Mar 2026 17:59:00 +0100 Subject: [PATCH 13/27] Update CHANGELOG.md Co-authored-by: Andres O. Vela --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index fd945ad..15f0051 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,7 +4,7 @@ ### New features -- Added `ProcessorAsync`, an async wrapper around `Processor` for use in `async`/`await` contexts (requires the `async` feature flag). It offloads blocking audio processing to a thread pool via `tokio::task::spawn_blocking`, keeping the async runtime responsive. All methods — `initialize`, `process_interleaved`, `process_planar`, `process_sequential`, `processor_context`, and `vad_context` — mirror the synchronous `Processor` API. A `ProcessorAsync::with_config` convenience constructor is also provided. +- Added `ProcessorAsync`, an async wrapper around `Processor` for use in `async`/`await` contexts (requires the `async` feature flag). It offloads blocking audio processing to a thread pool via `tokio::task::spawn_blocking`, keeping the async runtime responsive. All methods mirror the synchronous `Processor` API. A `ProcessorAsync::with_config` convenience constructor is also provided. ## 0.15.1 - 2026-03-17 From 3d127ff4751a105d7bf535cf4136eee18bcc2dbe Mon Sep 17 00:00:00 2001 From: Stephan Eckes Date: Wed, 18 Mar 2026 18:00:26 +0100 Subject: [PATCH 14/27] Update src/processor.rs Co-authored-by: Andres O. Vela --- src/processor.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/processor.rs b/src/processor.rs index aafb4b3..20d46e3 100644 --- a/src/processor.rs +++ b/src/processor.rs @@ -107,7 +107,7 @@ pub enum ProcessorParameter { /// The exact behavior depends on the active model: /// - **Quail Models:** Controls how aggressively the model suppresses noise. When used /// with Quail Voice Focus, it also suppresses background and competing speech. - /// - **Sparrow Models:** Controls the mixback and therefore the intensity of the + /// - **Rook Models:** Controls the mixback and therefore the intensity of the /// enhancement. /// /// **Range:** 0.0 to 1.0 From de4479df0962e8561a1b1cb65e6cfe453807d29d Mon Sep 17 00:00:00 2001 From: steckes Date: Mon, 23 Mar 2026 09:57:05 +0100 Subject: [PATCH 15/27] fix comment --- examples/benchmark.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/benchmark.rs b/examples/benchmark.rs index c619a75..29a2b4e 100644 --- a/examples/benchmark.rs +++ b/examples/benchmark.rs @@ -53,7 +53,7 @@ async fn main() -> Result<(), Box> { println!("Safety margin: {} ms\n", safety_margin.as_millis()); println!( - "Starting benchmark: spawning a processor session every {} seconds until a deadline is missed...\n", + "Starting benchmark: spawning a processor session every {} seconds until a deadline is missed or a process error occurs...\n", SESSION_SPAWN_INTERVAL.as_secs() ); @@ -109,7 +109,7 @@ async fn main() -> Result<(), Box> { if spawned_sessions.is_multiple_of(50) { println!(); } else { - println!("\n"); + println!(); } let is_miss = report.error.is_some(); From 2f728397337c369aaa0f5f2b6787027608d8ac52 Mon Sep 17 00:00:00 2001 From: steckes Date: Mon, 23 Mar 2026 09:57:12 +0100 Subject: [PATCH 16/27] fix comment --- examples/parallel_async.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/parallel_async.rs b/examples/parallel_async.rs index 68ca67b..003ba94 100644 --- a/examples/parallel_async.rs +++ b/examples/parallel_async.rs @@ -64,7 +64,7 @@ async fn main() -> Result<(), Box> { ); // ------------------------------------------------------------------------- - // Parallel run – drive all processors concurrently with tokio::join_all. + // Parallel run – drive all processors concurrently with futures::future::try_join_all. // Each task times itself and returns its own elapsed duration. // ------------------------------------------------------------------------- let parallel_start = Instant::now(); @@ -97,7 +97,7 @@ async fn main() -> Result<(), Box> { let max_individual = results.iter().max().copied().unwrap_or_default(); println!( - "\nParallel wall-clock: {:>8.1} ms", + "\nParallel total: {:>8.1} ms", parallel_elapsed.as_secs_f64() * 1000.0 ); println!( From 3a18b5787fbcd38fb889e034ace0fbb01e76215d Mon Sep 17 00:00:00 2001 From: steckes Date: Mon, 23 Mar 2026 11:28:29 +0100 Subject: [PATCH 17/27] remove unused features --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 31fd2cb..05d859b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,7 +36,7 @@ serde = { workspace = true, optional = true, features = ["derive"] } serde_json = { workspace = true, optional = true } sha2 = { workspace = true, optional = true } thiserror = { workspace = true } -tokio = { version = "1.49", optional = true, features = ["rt-multi-thread", "sync"] } +tokio = { version = "1.49", optional = true, features = ["rt", "sync"] } ureq = { workspace = true, optional = true, features = ["rustls"] } [dev-dependencies] From 62c5ab0da36789f29077a35ad5abe9853089b029 Mon Sep 17 00:00:00 2001 From: steckes Date: Mon, 23 Mar 2026 11:28:37 +0100 Subject: [PATCH 18/27] change spawn duration to 1s --- examples/benchmark.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/benchmark.rs b/examples/benchmark.rs index 29a2b4e..fe4225e 100644 --- a/examples/benchmark.rs +++ b/examples/benchmark.rs @@ -17,7 +17,7 @@ use tokio::{ const MODEL: &str = "quail-vf-2.0-l-16khz"; // Interval between spawning new processor sessions -const SESSION_SPAWN_INTERVAL: Duration = Duration::from_secs(3); +const SESSION_SPAWN_INTERVAL: Duration = Duration::from_secs(1); // Safety margin to account for system variability // e.g. 0.3 means 30% of the period is reserved as a safety margin, From 2280c5b82b84c22b23e0a3f2ada61c230de187ec Mon Sep 17 00:00:00 2001 From: steckes Date: Mon, 23 Mar 2026 11:28:53 +0100 Subject: [PATCH 19/27] do not use multithreaded runtime for parallel_async example --- examples/parallel_async.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/parallel_async.rs b/examples/parallel_async.rs index 003ba94..0cb66e4 100644 --- a/examples/parallel_async.rs +++ b/examples/parallel_async.rs @@ -5,7 +5,7 @@ /// sequentially, the total elapsed time would be roughly `N × per-processor time`. /// When running in parallel the total time is close to the slowest /// single processor, which is what we verify and print. -#[tokio::main(flavor = "multi_thread")] +#[tokio::main] async fn main() -> Result<(), Box> { use aic_sdk::{Model, ProcessorAsync, ProcessorConfig}; use std::{env, sync::Arc, time::Instant}; @@ -115,7 +115,7 @@ async fn main() -> Result<(), Box> { if speedup > 1.5 { "Parallel execution confirmed." } else { - "Warning: low speedup – are you running with a multi-thread runtime?" + "Warning: low speedup – your system may not have enough CPU cores for parallel blocking tasks." } ); From 8fff347a3d2e5e1b2f76a391c5aa0363eb6715d9 Mon Sep 17 00:00:00 2001 From: steckes Date: Mon, 23 Mar 2026 11:37:17 +0100 Subject: [PATCH 20/27] fix comment --- examples/benchmark.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/benchmark.rs b/examples/benchmark.rs index fe4225e..21247e6 100644 --- a/examples/benchmark.rs +++ b/examples/benchmark.rs @@ -53,7 +53,7 @@ async fn main() -> Result<(), Box> { println!("Safety margin: {} ms\n", safety_margin.as_millis()); println!( - "Starting benchmark: spawning a processor session every {} seconds until a deadline is missed or a process error occurs...\n", + "Starting benchmark: spawning a processor session every {} second(s) until a deadline is missed or a process error occurs...\n", SESSION_SPAWN_INTERVAL.as_secs() ); From 4a9f7c127feea74416c542147e4f9655d7ad6f51 Mon Sep 17 00:00:00 2001 From: steckes Date: Mon, 23 Mar 2026 13:16:47 +0100 Subject: [PATCH 21/27] remove type annotation --- examples/parallel_async.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/parallel_async.rs b/examples/parallel_async.rs index 0cb66e4..af18ae6 100644 --- a/examples/parallel_async.rs +++ b/examples/parallel_async.rs @@ -33,7 +33,7 @@ async fn main() -> Result<(), Box> { // Build all processors upfront so initialization is not part of the timed // section. // ------------------------------------------------------------------------- - let mut processors: Vec = Vec::with_capacity(NUM_PROCESSORS); + let mut processors = Vec::with_capacity(NUM_PROCESSORS); for _ in 0..NUM_PROCESSORS { let p = ProcessorAsync::with_config(&model, &license, &config).await?; processors.push(p); From 3c7e2cc94645488ea19bc860ec31c9f2c8346631 Mon Sep 17 00:00:00 2001 From: steckes Date: Mon, 23 Mar 2026 13:17:28 +0100 Subject: [PATCH 22/27] remove space from comment --- src/processor_async.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/processor_async.rs b/src/processor_async.rs index 3733f58..4513907 100644 --- a/src/processor_async.rs +++ b/src/processor_async.rs @@ -77,7 +77,7 @@ impl ProcessorAsync { let audio_len = audio.len(); // SAFETY: We `.await` the JoinHandle immediately, so `audio` outlives - // the blocking task. The Mutex guarantees exclusive Processor access. + // the blocking task. The Mutex guarantees exclusive Processor access. tokio::task::spawn_blocking(move || { let mut processor = inner.blocking_lock(); let audio = From 0fd8fbd8c77af4c00d41d97ec961e54e5aa392e0 Mon Sep 17 00:00:00 2001 From: steckes Date: Mon, 23 Mar 2026 14:01:32 +0100 Subject: [PATCH 23/27] Clone vectors in async processor to guarantee soundness (pointers can not be moved into another thread) --- src/processor_async.rs | 43 ++++++++++++++++++++---------------------- 1 file changed, 20 insertions(+), 23 deletions(-) diff --git a/src/processor_async.rs b/src/processor_async.rs index 4513907..e88b1b5 100644 --- a/src/processor_async.rs +++ b/src/processor_async.rs @@ -73,38 +73,38 @@ impl ProcessorAsync { /// See [`Processor::process_interleaved`] for details on the memory layout. pub async fn process_interleaved(&self, audio: &mut [f32]) -> Result<(), AicError> { let inner = Arc::clone(&self.inner); - let audio_addr = audio.as_mut_ptr() as usize; - let audio_len = audio.len(); - - // SAFETY: We `.await` the JoinHandle immediately, so `audio` outlives - // the blocking task. The Mutex guarantees exclusive Processor access. + let mut buf = audio.to_vec(); tokio::task::spawn_blocking(move || { let mut processor = inner.blocking_lock(); - let audio = - unsafe { std::slice::from_raw_parts_mut(audio_addr as *mut f32, audio_len) }; - processor.process_interleaved(audio) + processor.process_interleaved(&mut buf)?; + Ok(buf) }) .await .expect("spawn_blocking task panicked") + .map(|buf| audio.copy_from_slice(&buf)) } /// Processes audio with separate buffers for each channel (planar layout). /// /// See [`Processor::process_planar`] for details on the memory layout. - pub async fn process_planar>(&self, audio: &mut [V]) -> Result<(), AicError> { + pub async fn process_planar + AsRef<[f32]>>( + &self, + audio: &mut [V], + ) -> Result<(), AicError> { let inner = Arc::clone(&self.inner); - let audio_addr = audio.as_mut_ptr() as usize; - let audio_len = audio.len(); - - // SAFETY: Same as process_interleaved. + let mut buf: Vec> = audio.iter().map(|ch| ch.as_ref().to_vec()).collect(); tokio::task::spawn_blocking(move || { let mut processor = inner.blocking_lock(); - let audio: &mut [V] = - unsafe { std::slice::from_raw_parts_mut(audio_addr as *mut V, audio_len) }; - processor.process_planar(audio) + processor.process_planar(&mut buf)?; + Ok(buf) }) .await .expect("spawn_blocking task panicked") + .map(|buf| { + for (dst, src) in audio.iter_mut().zip(buf.iter()) { + dst.as_mut().copy_from_slice(src); + } + }) } /// Processes audio with sequential channel data. @@ -112,18 +112,15 @@ impl ProcessorAsync { /// See [`Processor::process_sequential`] for details on the memory layout. pub async fn process_sequential(&self, audio: &mut [f32]) -> Result<(), AicError> { let inner = Arc::clone(&self.inner); - let audio_addr = audio.as_mut_ptr() as usize; - let audio_len = audio.len(); - - // SAFETY: Same as process_interleaved. + let mut buf = audio.to_vec(); tokio::task::spawn_blocking(move || { let mut processor = inner.blocking_lock(); - let audio = - unsafe { std::slice::from_raw_parts_mut(audio_addr as *mut f32, audio_len) }; - processor.process_sequential(audio) + processor.process_sequential(&mut buf)?; + Ok(buf) }) .await .expect("spawn_blocking task panicked") + .map(|buf| audio.copy_from_slice(&buf)) } /// Creates a [`ProcessorContext`] for real-time parameter control. From f2035c2b8c355aa1f3c48c953a88245ac6a3a2e2 Mon Sep 17 00:00:00 2001 From: steckes Date: Wed, 25 Mar 2026 10:00:56 +0100 Subject: [PATCH 24/27] cleanup --- examples/benchmark.rs | 94 ++++++++++++------------------------------- 1 file changed, 26 insertions(+), 68 deletions(-) diff --git a/examples/benchmark.rs b/examples/benchmark.rs index 21247e6..da94ac3 100644 --- a/examples/benchmark.rs +++ b/examples/benchmark.rs @@ -61,19 +61,17 @@ async fn main() -> Result<(), Box> { let mut sessions = JoinSet::new(); let mut reports = Vec::new(); let mut spawned_sessions = 0usize; - let mut next_session_id = 1usize; - spawn_session( - &mut sessions, - next_session_id, + spawned_sessions += 1; + sessions.spawn(run_session( + spawned_sessions, Arc::clone(&model), license.clone(), config.clone(), period, safety_margin, Arc::clone(&stop), - ); - spawned_sessions += 1; + )); print!("*"); std::io::stdout().flush().unwrap(); @@ -85,44 +83,33 @@ async fn main() -> Result<(), Box> { let first_failed_report = loop { tokio::select! { _ = spawn_ticks.tick() => { - next_session_id += 1; - spawn_session( - &mut sessions, - next_session_id, + spawned_sessions += 1; + sessions.spawn(run_session( + spawned_sessions, Arc::clone(&model), license.clone(), config.clone(), period, safety_margin, Arc::clone(&stop), - ); - spawned_sessions += 1; + )); print!("*"); - if spawned_sessions.is_multiple_of(50) { - print!("\n"); - } std::io::stdout().flush().unwrap(); } Some(result) = sessions.join_next() => { let report = result?; - if spawned_sessions.is_multiple_of(50) { - println!(); - } else { - println!(); - } - - let is_miss = report.error.is_some(); - reports.push(report); - if is_miss { + if report.error.is_some() { stop.store(true, Ordering::Relaxed); - break reports.last().cloned(); + reports.push(report.clone()); + break report; } + reports.push(report); } } }; - println!("Benchmark complete\n"); + println!("\nBenchmark complete\n"); while let Some(result) = sessions.join_next().await { reports.push(result?); @@ -130,17 +117,13 @@ async fn main() -> Result<(), Box> { reports.sort_by_key(|report| report.session_id); let mut number_of_missed_deadlines = 0usize; + let period_ms = period.as_secs_f64() * 1000.0; println!(" ID | Max Exec Time | RTF | Notes"); println!("----+---------------+---------+------"); for report in &reports { let max_ms = report.max_execution_time.as_secs_f64() * 1000.0; - let period_ms = period.as_secs_f64() * 1000.0; - let rtf = if period_ms > 0.0 { - max_ms / period_ms - } else { - 0.0 - }; + let rtf = max_ms / period_ms; let note = match report.error.as_deref() { Some(reason) => { @@ -164,49 +147,24 @@ async fn main() -> Result<(), Box> { max_ok ); - if let Some(first_failed_report) = &first_failed_report { + println!( + "After spawning the {}{} session, session #{} missed its deadline ({})", + spawned_sessions, + number_suffix(spawned_sessions), + first_failed_report.session_id, + first_failed_report.error.as_deref().unwrap_or("unknown") + ); + + if number_of_missed_deadlines > 1 { println!( - "After spawning the {}{} session, session #{} missed its deadline ({})", - spawned_sessions, - number_suffix(spawned_sessions), - first_failed_report.session_id, - first_failed_report.error.as_deref().unwrap_or("unknown") + "Other sessions also missed deadlines after session #{}", + first_failed_report.session_id ); - - if number_of_missed_deadlines > 1 { - println!( - "Other sessions also missed deadlines after session #{}", - first_failed_report.session_id - ); - } - } else { - println!("Missed deadline in session unknown (no report)"); } Ok(()) } -fn spawn_session( - sessions: &mut JoinSet, - session_id: usize, - model: Arc>, - license: String, - config: ProcessorConfig, - period: Duration, - safety_margin: Duration, - stop: Arc, -) { - sessions.spawn(run_session( - session_id, - model, - license, - config, - period, - safety_margin, - stop, - )); -} - async fn run_session( session_id: usize, model: Arc>, From d9a4d58616c509065141e14b98d955665612318a Mon Sep 17 00:00:00 2001 From: Stephan Eckes Date: Wed, 25 Mar 2026 10:54:23 +0100 Subject: [PATCH 25/27] Update examples/parallel_async.rs Co-authored-by: Matthias Geier --- examples/parallel_async.rs | 35 ++++++++++++++++++----------------- 1 file changed, 18 insertions(+), 17 deletions(-) diff --git a/examples/parallel_async.rs b/examples/parallel_async.rs index af18ae6..ef3ea24 100644 --- a/examples/parallel_async.rs +++ b/examples/parallel_async.rs @@ -1,26 +1,27 @@ -/// Demonstrates that multiple [`ProcessorAsync`] instances genuinely run in -/// parallel when awaited concurrently. -/// -/// Each processor records its own wall-clock processing time. If they ran -/// sequentially, the total elapsed time would be roughly `N × per-processor time`. -/// When running in parallel the total time is close to the slowest -/// single processor, which is what we verify and print. +// Demonstrates that multiple `ProcessorAsync` instances genuinely run in +// parallel when awaited concurrently. +// +// Each processor records its own wall-clock processing time. If they ran +// sequentially, the total elapsed time would be roughly `N × per-processor time`. +// When running in parallel the total time is close to the slowest +// single processor, which is what we verify and print. + +use aic_sdk::{Model, ProcessorAsync, ProcessorConfig}; +use std::time::Instant; + +const MODEL: &str = "quail-vf-2.0-l-16khz"; +const NUM_PROCESSORS: usize = 4; +// Number of process calls per processor – enough to make timing visible. +const ITERATIONS: usize = 50; + #[tokio::main] async fn main() -> Result<(), Box> { - use aic_sdk::{Model, ProcessorAsync, ProcessorConfig}; - use std::{env, sync::Arc, time::Instant}; - - const MODEL: &str = "quail-vf-2.0-l-16khz"; - const NUM_PROCESSORS: usize = 4; - // Number of process calls per processor – enough to make timing visible. - const ITERATIONS: usize = 50; - println!("ai-coustics SDK version: {}", aic_sdk::get_sdk_version()); - let license = env::var("AIC_SDK_LICENSE").expect("AIC_SDK_LICENSE not set"); + let license = std::env::var("AIC_SDK_LICENSE").expect("AIC_SDK_LICENSE not set"); let model_path = Model::download(MODEL, "target")?; - let model = Arc::new(Model::from_file(&model_path)?); + let model = Model::from_file(&model_path)?; println!("Model loaded from {}", model_path.display()); let config = ProcessorConfig::optimal(&model); From e8b6c6e9004c5fdf5c5fadc65cfcc2585b0997a5 Mon Sep 17 00:00:00 2001 From: Stephan Eckes Date: Wed, 25 Mar 2026 11:55:11 +0100 Subject: [PATCH 26/27] Update examples/parallel_async.rs Co-authored-by: Matthias Geier --- examples/parallel_async.rs | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/examples/parallel_async.rs b/examples/parallel_async.rs index ef3ea24..49af8b8 100644 --- a/examples/parallel_async.rs +++ b/examples/parallel_async.rs @@ -34,11 +34,10 @@ async fn main() -> Result<(), Box> { // Build all processors upfront so initialization is not part of the timed // section. // ------------------------------------------------------------------------- - let mut processors = Vec::with_capacity(NUM_PROCESSORS); - for _ in 0..NUM_PROCESSORS { - let p = ProcessorAsync::with_config(&model, &license, &config).await?; - processors.push(p); - } + let processors = futures::future::try_join_all( + (0..NUM_PROCESSORS).map(|_| ProcessorAsync::with_config(&model, &license, &config)), + ) + .await?; println!( "Running {} processors × {} iterations each", From 1c156742e95b8a956d87f67785914b65efec685c Mon Sep 17 00:00:00 2001 From: Matthias Geier Date: Wed, 25 Mar 2026 12:16:19 +0100 Subject: [PATCH 27/27] move println out of timing measurement (#50) --- examples/parallel_async.rs | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/examples/parallel_async.rs b/examples/parallel_async.rs index 49af8b8..05604e2 100644 --- a/examples/parallel_async.rs +++ b/examples/parallel_async.rs @@ -71,8 +71,7 @@ async fn main() -> Result<(), Box> { let tasks: Vec<_> = processors .iter() - .enumerate() - .map(|(id, p)| { + .map(|p| { let config = config.clone(); async move { let mut audio = vec![0.0f32; config.num_channels as usize * config.num_frames]; @@ -81,11 +80,6 @@ async fn main() -> Result<(), Box> { p.process_interleaved(&mut audio).await?; } let elapsed = t0.elapsed(); - println!( - " Processor {:>2} finished in {:>8.1} ms", - id + 1, - elapsed.as_secs_f64() * 1000.0, - ); Ok::<_, aic_sdk::AicError>(elapsed) } }) @@ -94,6 +88,14 @@ async fn main() -> Result<(), Box> { let results = futures::future::try_join_all(tasks).await?; let parallel_elapsed = parallel_start.elapsed(); + for (id, elapsed) in results.iter().enumerate() { + println!( + " Processor {:>2} finished in {:>8.1} ms", + id + 1, + elapsed.as_secs_f64() * 1000.0, + ); + } + let max_individual = results.iter().max().copied().unwrap_or_default(); println!(