diff --git a/Cargo.lock b/Cargo.lock index c6b26aaa8fe0..4b6ae6f6a9d3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1324,7 +1324,7 @@ dependencies = [ name = "example-component-wasm" version = "0.0.0" dependencies = [ - "wit-bindgen 0.55.0", + "wit-bindgen 0.56.0", ] [[package]] @@ -1335,7 +1335,7 @@ version = "0.0.0" name = "example-resource-component-wasm" version = "0.1.0" dependencies = [ - "wit-bindgen 0.55.0", + "wit-bindgen 0.56.0", ] [[package]] @@ -3794,7 +3794,7 @@ dependencies = [ "wasi-nn", "wasip1", "wasip2", - "wit-bindgen 0.55.0", + "wit-bindgen 0.56.0", ] [[package]] @@ -4381,7 +4381,7 @@ dependencies = [ "object 0.39.0", "wasip1", "wasm-encoder 0.246.2", - "wit-bindgen-rust-macro 0.55.0", + "wit-bindgen-rust-macro 0.56.0", ] [[package]] @@ -5115,7 +5115,7 @@ dependencies = [ "gdbstub_arch", "log", "wasip2", - "wit-bindgen 0.55.0", + "wit-bindgen 0.56.0", "wstd", ] @@ -5817,13 +5817,13 @@ dependencies = [ [[package]] name = "wit-bindgen" -version = "0.55.0" +version = "0.56.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6870386de1813a61406d88749d5897484e2f6fe90a39408a6a94e160d8c72378" +checksum = "7607d30e7e5e8fd5a0695f7cb8b2128829e0bf9dca7a1fe8c4d6ed3ca1058fce" dependencies = [ "bitflags 2.9.4", "futures", - "wit-bindgen-rust-macro 0.55.0", + "wit-bindgen-rust-macro 0.56.0", ] [[package]] @@ -5839,9 +5839,9 @@ dependencies = [ [[package]] name = "wit-bindgen-core" -version = "0.55.0" +version = "0.56.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4779c97d3b9dda56600c3404355d404f8c6567fae0c4d8dfeb92f6e9b2c4c8c3" +checksum = "fda3a4ce47c08d27f575d451a60102bab5251776abd0a7a323d1f038eb6339ab" dependencies = [ "anyhow", "heck 0.5.0", @@ -5875,9 +5875,9 @@ dependencies = [ [[package]] name = "wit-bindgen-rust" -version = "0.55.0" +version = "0.56.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a89a98e0efe034f47f5cf86fa8aeb5d6d7175bade32bbba476aeba29541fed9" +checksum = "920a1c8c0f89397431db4900a7bf7c511b78e1b7068289fe812dc76e993f1491" dependencies = [ "anyhow", "heck 0.5.0", @@ -5885,7 +5885,7 @@ dependencies = [ "prettyplease", "syn 2.0.106", "wasm-metadata 0.246.2", - "wit-bindgen-core 0.55.0", + "wit-bindgen-core 0.56.0", "wit-component 0.246.2", ] @@ -5906,9 +5906,9 @@ dependencies = [ [[package]] name = "wit-bindgen-rust-macro" -version = "0.55.0" +version = "0.56.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b81978b3d68d12116ae8e5ef3d2125c4cb619ea30002ed20cb7549383f6fca9" +checksum = "857a143d2373abfcd31ad946393efe775ed8c90a2a365ce73c61bf38f36a1000" dependencies = [ "anyhow", "macro-string", @@ -5916,8 +5916,8 @@ dependencies = [ "proc-macro2", "quote", "syn 2.0.106", - "wit-bindgen-core 0.55.0", - "wit-bindgen-rust 0.55.0", + "wit-bindgen-core 0.56.0", + "wit-bindgen-rust 0.56.0", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index ce86164800a7..2e8581e61d33 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -347,8 +347,8 @@ io-lifetimes = { version = "2.0.3", default-features = false } io-extras = "0.18.4" rustix = "1.0.8" # wit-bindgen: -wit-bindgen = { version = "0.55.0", default-features = false } -wit-bindgen-rust-macro = { version = "0.55.0", default-features = false } +wit-bindgen = { version = "0.56.0", default-features = false } +wit-bindgen-rust-macro = { version = "0.56.0", default-features = false } # wasm-tools family: wasmparser = { version = "0.246.2", default-features = false, features = ['simd'] } diff --git a/crates/c-api/include/wasmtime/trap.h b/crates/c-api/include/wasmtime/trap.h index f709ff1380e1..a1f5880aa063 100644 --- a/crates/c-api/include/wasmtime/trap.h +++ b/crates/c-api/include/wasmtime/trap.h @@ -136,6 +136,8 @@ enum wasmtime_trap_code_enum { WASMTIME_TRAP_CODE_CONCURRENT_FUTURE_STREAM_OP = 45, /// A reference count (for e.g. an `error-context`) overflowed. WASMTIME_TRAP_CODE_REFERENCE_COUNT_OVERFLOW = 46, + /// A read/write on a stream must be <2**28 items. + WASMTIME_TRAP_CODE_STREAM_OP_TOO_BIG = 47, }; /** diff --git a/crates/c-api/src/trap.rs b/crates/c-api/src/trap.rs index 70654c0d62d8..75b15740ba85 100644 --- a/crates/c-api/src/trap.rs +++ b/crates/c-api/src/trap.rs @@ -53,6 +53,7 @@ const _: () = { assert!(Trap::CannotResumeThread as u8 == 44); assert!(Trap::ConcurrentFutureStreamOp as u8 == 45); assert!(Trap::ReferenceCountOverflow as u8 == 46); + assert!(Trap::StreamOpTooBig as u8 == 47); }; #[repr(C)] diff --git a/crates/environ/src/trap_encoding.rs b/crates/environ/src/trap_encoding.rs index e7a8743d5d2f..4a8d89a8ce6e 100644 --- a/crates/environ/src/trap_encoding.rs +++ b/crates/environ/src/trap_encoding.rs @@ -223,6 +223,9 @@ generate_trap_type! { /// A reference count (for e.g. an `error-context`) overflowed. ReferenceCountOverflow = "reference count overflow", + /// A read/write on a stream must be <2**28 items. + StreamOpTooBig = "stream read/write count too large", + // if adding a variant here be sure to update `trap.rs` and `trap.h` as // mentioned above } diff --git a/crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs b/crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs index 608b3b23bfd6..981b18850042 100644 --- a/crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs +++ b/crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs @@ -56,9 +56,9 @@ pub enum TransmitKind { #[derive(Copy, Clone, Debug, PartialEq)] pub enum ReturnCode { Blocked, - Completed(u32), - Dropped(u32), - Cancelled(u32), + Completed(ItemCount), + Dropped(ItemCount), + Cancelled(ItemCount), } impl ReturnCode { @@ -73,32 +73,124 @@ impl ReturnCode { const CANCELLED: u32 = 0x2; match self { ReturnCode::Blocked => BLOCKED, - ReturnCode::Completed(n) => { - debug_assert!(*n < (1 << 28)); - (n << 4) | COMPLETED - } - ReturnCode::Dropped(n) => { - debug_assert!(*n < (1 << 28)); - (n << 4) | DROPPED - } - ReturnCode::Cancelled(n) => { - debug_assert!(*n < (1 << 28)); - (n << 4) | CANCELLED - } + ReturnCode::Completed(n) => (n.as_u32() << 4) | COMPLETED, + ReturnCode::Dropped(n) => (n.as_u32() << 4) | DROPPED, + ReturnCode::Cancelled(n) => (n.as_u32() << 4) | CANCELLED, } } /// Returns `Self::Completed` with the specified count (or zero if /// `matches!(kind, TransmitKind::Future)`) - fn completed(kind: TransmitKind, count: u32) -> Self { + fn completed(kind: TransmitKind, count: ItemCount) -> Self { Self::Completed(if let TransmitKind::Future = kind { - 0 + ItemCount::ZERO } else { count }) } } +/// Representation of how many items are being operated on in a stream read or +/// write. +/// +/// The component model requires that stream operations are limited to `1<<28` +/// items in one go. This type is a newtype wrapper around `u32` with the +/// invariant that the internal value is limited by this amount. +#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord)] +#[repr(transparent)] +pub struct ItemCount { + raw: u32, +} + +impl ItemCount { + const MAX: u32 = 1 << 28; + const ZERO: ItemCount = ItemCount { raw: 0 }; + + /// Creates a new `ItemCount` with the specified count, or a trap if it's + /// too large. + fn new(count: u32) -> Result { + if count < Self::MAX { + Ok(Self { raw: count }) + } else { + Err(Trap::StreamOpTooBig) + } + } + + /// Same as `Self::new` but takes a `usize`. + fn new_usize(count: usize) -> Result { + let count = u32::try_from(count).map_err(|_| Trap::StreamOpTooBig)?; + Self::new(count) + } + + fn as_u32(&self) -> u32 { + self.raw + } + + fn as_usize(&self) -> usize { + usize::try_from(self.raw).unwrap() + } + + /// Increments `self` by `amt`, returning a trap if the amount would exceed + /// the maximum item count. + fn inc(&mut self, amt: usize) -> Result<(), Trap> { + let amt = u32::try_from(amt).map_err(|_| Trap::StreamOpTooBig)?; + let new_raw = self.raw.checked_add(amt).ok_or(Trap::StreamOpTooBig)?; + if new_raw < Self::MAX { + self.raw = new_raw; + Ok(()) + } else { + Err(Trap::StreamOpTooBig) + } + } + + /// Helper to add two `ItemCount`s together, fallibly. + /// + /// It's considered a bug if this overflows, so this is only suitable in + /// situations where overflow and/or exceeding the total item count is known + /// that it may be possible. + fn add(&self, other: ItemCount) -> Result { + match self.raw.checked_add(other.raw) { + Some(raw) => Ok(ItemCount::new(raw)?), + None => bail_bug!("overflow in `ItemCount::add`"), + } + } + + /// Same as `add`, but for subtraction. + /// + /// Like with `add` this is only suitable for situations where the result is + /// known to not underflow. + fn sub(&self, other: ItemCount) -> Result { + match self.raw.checked_sub(other.raw) { + Some(raw) => Ok(ItemCount { raw }), + None => bail_bug!("underflow in `ItemCount::sub`"), + } + } +} + +impl fmt::Display for ItemCount { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.raw.fmt(f) + } +} + +impl fmt::Debug for ItemCount { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.raw.fmt(f) + } +} + +impl PartialEq for ItemCount { + fn eq(&self, other: &u32) -> bool { + self.raw == *other + } +} + +impl PartialOrd for ItemCount { + fn partial_cmp(&self, other: &u32) -> Option { + self.raw.partial_cmp(other) + } +} + /// Represents a stream or future type index. /// /// This is useful as a parameter type for functions which operate on either a @@ -320,7 +412,7 @@ impl<'a, T, B> Destination<'a, T, B> { bail_bug!("expected WriteState::HostReady") }; - Ok(Some(count - guest_offset)) + Ok(Some(count.as_usize() - guest_offset.as_usize())) } else { Ok(None) } @@ -417,8 +509,8 @@ impl DirectDestination<'_, D> { let memory = instance .options_memory_mut(self.store.0, options) - .get_mut((address + guest_offset)..) - .and_then(|b| b.get_mut(..(count - guest_offset))); + .get_mut((address + guest_offset.as_usize())..) + .and_then(|b| b.get_mut(..(count.as_usize() - guest_offset.as_usize()))); match memory { Some(memory) => Ok(memory), None => bail_bug!("guest buffer unexpectedly out of bounds"), @@ -467,14 +559,14 @@ impl DirectDestination<'_, D> { bail_bug!("expected WriteState::HostReady"); }; - if *guest_offset + count > *read_count { + if guest_offset.as_usize() + count > read_count.as_usize() { // Note that this `panic` is a documented panic condition of // `mark_written`. panic!( "write count ({count}) must be less than or equal to read count ({read_count})" ) } else { - *guest_offset += count; + guest_offset.inc(count)?; } } Ok(()) @@ -817,8 +909,8 @@ impl<'a, T> Source<'a, T> { cx, ty.copied(), buffer, - address + (T::SIZE32 * guest_offset), - count - guest_offset, + address + (T::SIZE32 * guest_offset.as_usize()), + count.as_usize() - guest_offset.as_usize(), )?; let transmit = store.0.concurrent_state_mut().get_mut(self.id)?; @@ -827,7 +919,7 @@ impl<'a, T> Source<'a, T> { bail_bug!("expected ReadState::HostReady"); }; - *guest_offset += old_remaining - buffer.remaining_capacity(); + guest_offset.inc(old_remaining - buffer.remaining_capacity())?; } Ok(()) @@ -856,7 +948,7 @@ impl<'a, T> Source<'a, T> { bail_bug!("expected ReadState::HostReady") }; - Ok(count - guest_offset) + Ok(count.as_usize() - guest_offset.as_usize()) } else if let Some(host_buffer) = &self.host_buffer { Ok(host_buffer.remaining().len()) } else { @@ -931,8 +1023,8 @@ impl DirectSource<'_, D> { let memory = instance .options_memory(self.store.0, options) - .get((address + guest_offset)..) - .and_then(|b| b.get(..(count - guest_offset))); + .get((address + guest_offset.as_usize())..) + .and_then(|b| b.get(..(count.as_usize() - guest_offset.as_usize()))); match memory { Some(memory) => Ok(memory), None => bail_bug!("guest buffer unexpectedly out of bounds"), @@ -976,11 +1068,11 @@ impl DirectSource<'_, D> { bail_bug!("expected ReadState::HostReady"); }; - if *guest_offset + count > *write_count { + if guest_offset.as_usize() + count > write_count.as_usize() { // Note that this is a documented panic condition of `mark_read`. panic!("read count ({count}) must be less than or equal to write count ({write_count})") } else { - *guest_offset += count; + guest_offset.inc(count)?; } Ok(()) } @@ -2162,14 +2254,14 @@ enum WriteState { flat_abi: Option, options: OptionsIndex, address: usize, - count: usize, + count: ItemCount, handle: u32, }, /// The write end is owned by the host, which is ready to produce items. HostReady { produce: PollStream, try_into: TryInto, - guest_offset: usize, + guest_offset: ItemCount, cancel: bool, cancel_waker: Option, }, @@ -2201,13 +2293,13 @@ enum ReadState { instance: Instance, options: OptionsIndex, address: usize, - count: usize, + count: ItemCount, handle: u32, }, /// The read end is owned by a host task, and it is ready to consume items. HostReady { consume: PollStream, - guest_offset: usize, + guest_offset: ItemCount, cancel: bool, cancel_waker: Option, }, @@ -2240,8 +2332,7 @@ impl fmt::Debug for ReadState { } } -fn return_code(kind: TransmitKind, state: StreamResult, guest_offset: usize) -> Result { - let count = guest_offset.try_into()?; +fn return_code(kind: TransmitKind, state: StreamResult, count: ItemCount) -> Result { Ok(match state { StreamResult::Dropped => ReturnCode::Dropped(count), StreamResult::Completed => ReturnCode::completed(kind, count), @@ -2274,7 +2365,7 @@ impl StoreOpaque { StreamResult::Dropped => ReadState::Dropped, StreamResult::Completed | StreamResult::Cancelled => ReadState::HostReady { consume, - guest_offset: 0, + guest_offset: ItemCount::ZERO, cancel: false, cancel_waker: None, }, @@ -2318,7 +2409,7 @@ impl StoreOpaque { StreamResult::Completed | StreamResult::Cancelled => WriteState::HostReady { produce, try_into, - guest_offset: 0, + guest_offset: ItemCount::ZERO, cancel: false, cancel_waker: None, }, @@ -2369,11 +2460,11 @@ impl StoreOpaque { write_handle.rep(), match ty { TransmitIndex::Future(ty) => Event::FutureWrite { - code: ReturnCode::Dropped(0), + code: ReturnCode::Dropped(ItemCount::ZERO), pending: Some((ty, handle)), }, TransmitIndex::Stream(ty) => Event::StreamWrite { - code: ReturnCode::Dropped(0), + code: ReturnCode::Dropped(ItemCount::ZERO), pending: Some((ty, handle)), }, }, @@ -2387,11 +2478,11 @@ impl StoreOpaque { write_handle.rep(), match kind { TransmitKind::Future => Event::FutureWrite { - code: ReturnCode::Dropped(0), + code: ReturnCode::Dropped(ItemCount::ZERO), pending: None, }, TransmitKind::Stream => Event::StreamWrite { - code: ReturnCode::Dropped(0), + code: ReturnCode::Dropped(ItemCount::ZERO), pending: None, }, }, @@ -2468,11 +2559,11 @@ impl StoreOpaque { read_handle.rep(), match ty { TransmitIndex::Future(ty) => Event::FutureRead { - code: ReturnCode::Dropped(0), + code: ReturnCode::Dropped(ItemCount::ZERO), pending: Some((ty, handle)), }, TransmitIndex::Stream(ty) => Event::StreamRead { - code: ReturnCode::Dropped(0), + code: ReturnCode::Dropped(ItemCount::ZERO), pending: Some((ty, handle)), }, }, @@ -2487,11 +2578,11 @@ impl StoreOpaque { read_handle.rep(), match on_drop_open { Some(_) => Event::FutureRead { - code: ReturnCode::Dropped(0), + code: ReturnCode::Dropped(ItemCount::ZERO), pending: None, }, None => Event::StreamRead { - code: ReturnCode::Dropped(0), + code: ReturnCode::Dropped(ItemCount::ZERO), pending: None, }, }, @@ -2622,7 +2713,7 @@ impl StoreContextMut<'_, T> { let (guest_offset, host_offset, count) = tls::get(|store| { let transmit = store.concurrent_state_mut().get_mut(id)?; let (count, host_offset) = match &transmit.read { - &ReadState::GuestReady { count, .. } => (count, 0), + &ReadState::GuestReady { count, .. } => (count.as_u32(), 0), &ReadState::HostToHost { limit, .. } => (1, limit), _ => bail_bug!("invalid read state"), }; @@ -2693,7 +2784,7 @@ impl StoreContextMut<'_, T> { state.get_mut(id)?.write = WriteState::HostReady { produce, try_into, - guest_offset: 0, + guest_offset: ItemCount::ZERO, cancel: false, cancel_waker: None, }; @@ -2761,11 +2852,11 @@ impl StoreContextMut<'_, T> { Ok(( match &transmit.read { &ReadState::HostReady { guest_offset, .. } => guest_offset, - ReadState::Open => 0, + ReadState::Open => ItemCount::ZERO, _ => bail_bug!("invalid read state"), }, match &transmit.write { - &WriteState::GuestReady { count, .. } => count, + WriteState::GuestReady { count, .. } => count.as_usize(), WriteState::HostReady { .. } => match host_buffer_remaining_before { Some(n) => n, None => bail_bug!("host_buffer_remaining_before should be set"), @@ -2823,7 +2914,7 @@ impl StoreContextMut<'_, T> { WriteState::Open => { transmit.read = ReadState::HostReady { consume, - guest_offset: 0, + guest_offset: ItemCount::ZERO, cancel: false, cancel_waker: None, }; @@ -2832,7 +2923,7 @@ impl StoreContextMut<'_, T> { let future = consume(); transmit.read = ReadState::HostReady { consume, - guest_offset: 0, + guest_offset: ItemCount::ZERO, cancel: false, cancel_waker: None, }; @@ -2846,7 +2937,7 @@ impl StoreContextMut<'_, T> { Box::pin(async { bail_bug!("unexpected invocation of `produce`") }) }), try_into: Box::new(|_| None), - guest_offset: 0, + guest_offset: ItemCount::ZERO, cancel: false, cancel_waker: None, }, @@ -2956,8 +3047,8 @@ async fn write, consume: PollStream, - guest_offset: usize, + guest_offset: ItemCount, cancel: bool, ) -> Result { let mut future = consume(); @@ -3093,7 +3184,7 @@ impl Instance { let ReadState::HostReady { guest_offset, .. } = &mut transmit.read else { bail_bug!("expected ReadState::HostReady") }; - let code = return_code(kind, state?, mem::replace(guest_offset, 0))?; + let code = return_code(kind, state?, mem::replace(guest_offset, ItemCount::ZERO))?; transmit.write = WriteState::Open; code } @@ -3113,7 +3204,7 @@ impl Instance { transmit_id: TableId, produce: PollStream, try_into: TryInto, - guest_offset: usize, + guest_offset: ItemCount, cancel: bool, ) -> Result { let mut future = produce(); @@ -3136,7 +3227,7 @@ impl Instance { let WriteState::HostReady { guest_offset, .. } = &mut transmit.write else { bail_bug!("expected WriteState::HostReady") }; - let code = return_code(kind, state?, mem::replace(guest_offset, 0))?; + let code = return_code(kind, state?, mem::replace(guest_offset, ItemCount::ZERO))?; transmit.read = ReadState::Open; code } @@ -3190,11 +3281,12 @@ impl Instance { read_ty: TransmitIndex, read_options: OptionsIndex, read_address: usize, - count: usize, + count: ItemCount, rep: u32, ) -> Result<()> { let (component, mut store) = self.component_and_store_mut(store.0); let types = component.types(); + let count = count.as_usize(); // Validate `write_ty` w.r.t. `write_address` to ensure it's properly // aligned and in-bounds. @@ -3403,6 +3495,8 @@ impl Instance { address: u32, count: u32, ) -> Result { + let count = ItemCount::new(count)?; + if !self.options(store.0, options).async_ { // The caller may only sync call `{stream,future}.write` from an // async task (i.e. a task created via a call to an async export). @@ -3411,8 +3505,7 @@ impl Instance { } let address = usize::try_from(address)?; - let count = usize::try_from(count)?; - self.check_bounds(store.0, options, ty, address, count)?; + self.check_bounds(store.0, options, ty, address, count.as_usize())?; let (rep, state) = self.id().get_mut(store.0).get_mut_by_index(ty, handle)?; let TransmitLocalState::Write { done } = *state else { bail!(Trap::ConcurrentFutureStreamOp); @@ -3533,13 +3626,12 @@ impl Instance { }; let concurrent_state = store.0.concurrent_state_mut(); if read_complete { - let count = u32::try_from(count)?; let total = if let Some(Event::StreamRead { code: ReturnCode::Completed(old_total), .. }) = concurrent_state.take_event(read_handle_rep)? { - count + old_total + count.add(old_total)? } else { count }; @@ -3555,8 +3647,8 @@ impl Instance { ty: read_ty, flat_abi: read_flat_abi, options: read_options, - address: read_address + (count * item_size), - count: read_count - count, + address: read_address + (count.as_usize() * item_size), + count: read_count.sub(count)?, handle: read_handle, instance: read_instance, caller_instance: read_caller_instance, @@ -3565,7 +3657,7 @@ impl Instance { } if write_complete { - ReturnCode::completed(ty.kind(), count.try_into()?) + ReturnCode::completed(ty.kind(), count) } else { set_guest_ready(concurrent_state)?; ReturnCode::Blocked @@ -3593,7 +3685,14 @@ impl Instance { } set_guest_ready(concurrent_state)?; - self.consume(store.0, ty.kind(), transmit_id, consume, 0, false)? + self.consume( + store.0, + ty.kind(), + transmit_id, + consume, + ItemCount::ZERO, + false, + )? } ReadState::HostToHost { .. } => bail_bug!("unexpected HostToHost"), @@ -3608,7 +3707,7 @@ impl Instance { transmit.done = true; } - ReturnCode::Dropped(0) + ReturnCode::Dropped(ItemCount::ZERO) } }; @@ -3645,6 +3744,8 @@ impl Instance { address: u32, count: u32, ) -> Result { + let count = ItemCount::new(count)?; + if !self.options(store.0, options).async_ { // The caller may only sync call `{stream,future}.read` from an // async task (i.e. a task created via a call to an async export). @@ -3653,8 +3754,7 @@ impl Instance { } let address = usize::try_from(address)?; - let count = usize::try_from(count)?; - self.check_bounds(store.0, options, ty, address, count)?; + self.check_bounds(store.0, options, ty, address, count.as_usize())?; let (rep, state) = self.id().get_mut(store.0).get_mut_by_index(ty, handle)?; let TransmitLocalState::Read { done } = *state else { bail!(Trap::ConcurrentFutureStreamOp); @@ -3760,13 +3860,12 @@ impl Instance { let concurrent_state = store.0.concurrent_state_mut(); if write_complete { - let count = u32::try_from(count)?; let total = if let Some(Event::StreamWrite { code: ReturnCode::Completed(old_total), .. }) = concurrent_state.take_event(write_handle_rep)? { - count + old_total + count.add(old_total)? } else { count }; @@ -3789,14 +3888,14 @@ impl Instance { ty: write_ty, flat_abi: write_flat_abi, options: write_options, - address: write_address + (count * item_size), - count: write_count - count, + address: write_address + (count.as_usize() * item_size), + count: write_count.sub(count)?, handle: write_handle, }; } if read_complete { - ReturnCode::completed(ty.kind(), count.try_into()?) + ReturnCode::completed(ty.kind(), count) } else { set_guest_ready(concurrent_state)?; ReturnCode::Blocked @@ -3822,8 +3921,15 @@ impl Instance { set_guest_ready(concurrent_state)?; - let code = - self.produce(store.0, ty.kind(), transmit_id, produce, try_into, 0, false)?; + let code = self.produce( + store.0, + ty.kind(), + transmit_id, + produce, + try_into, + ItemCount::ZERO, + false, + )?; if let (TransmitIndex::Future(_), ReturnCode::Completed(_)) = (ty, code) { store.0.concurrent_state_mut().get_mut(transmit_id)?.done = true; @@ -3837,7 +3943,7 @@ impl Instance { ReturnCode::Blocked } - WriteState::Dropped => ReturnCode::Dropped(0), + WriteState::Dropped => ReturnCode::Dropped(ItemCount::ZERO), }; if result == ReturnCode::Blocked && !self.options(store.0, options).async_ { @@ -3928,7 +4034,7 @@ impl Instance { self.wait_for_write(store, handle)? } } else { - ReturnCode::Cancelled(0) + ReturnCode::Cancelled(ItemCount::ZERO) }; if !matches!(code, ReturnCode::Blocked) { @@ -4015,7 +4121,7 @@ impl Instance { self.wait_for_read(store, handle)? } } else { - ReturnCode::Cancelled(0) + ReturnCode::Cancelled(ItemCount::ZERO) }; if !matches!(code, ReturnCode::Blocked) { @@ -4587,8 +4693,8 @@ impl ConcurrentState { }; Ok(match new { - ReturnCode::Dropped(0) => ReturnCode::Dropped(count), - ReturnCode::Cancelled(0) => ReturnCode::Cancelled(count), + ReturnCode::Dropped(ItemCount::ZERO) => ReturnCode::Dropped(count), + ReturnCode::Cancelled(ItemCount::ZERO) => ReturnCode::Cancelled(count), _ => bail_bug!("unexpected new return code"), }) } diff --git a/supply-chain/imports.lock b/supply-chain/imports.lock index 7b93d9418491..19a70370e02a 100644 --- a/supply-chain/imports.lock +++ b/supply-chain/imports.lock @@ -2406,8 +2406,8 @@ when = "2026-01-12" trusted-publisher = "github:bytecodealliance/wit-bindgen" [[publisher.wit-bindgen]] -version = "0.55.0" -when = "2026-04-03" +version = "0.56.0" +when = "2026-04-14" trusted-publisher = "github:bytecodealliance/wit-bindgen" [[publisher.wit-bindgen-core]] @@ -2416,8 +2416,8 @@ when = "2026-01-12" trusted-publisher = "github:bytecodealliance/wit-bindgen" [[publisher.wit-bindgen-core]] -version = "0.55.0" -when = "2026-04-03" +version = "0.56.0" +when = "2026-04-14" trusted-publisher = "github:bytecodealliance/wit-bindgen" [[publisher.wit-bindgen-rt]] @@ -2432,8 +2432,8 @@ when = "2026-01-12" trusted-publisher = "github:bytecodealliance/wit-bindgen" [[publisher.wit-bindgen-rust]] -version = "0.55.0" -when = "2026-04-03" +version = "0.56.0" +when = "2026-04-14" trusted-publisher = "github:bytecodealliance/wit-bindgen" [[publisher.wit-bindgen-rust-macro]] @@ -2442,8 +2442,8 @@ when = "2026-01-12" trusted-publisher = "github:bytecodealliance/wit-bindgen" [[publisher.wit-bindgen-rust-macro]] -version = "0.55.0" -when = "2026-04-03" +version = "0.56.0" +when = "2026-04-14" trusted-publisher = "github:bytecodealliance/wit-bindgen" [[publisher.wit-component]] diff --git a/tests/misc_testsuite/component-model/async/stream-big-read-and-writes.wast b/tests/misc_testsuite/component-model/async/stream-big-read-and-writes.wast new file mode 100644 index 000000000000..feaf32220e7f --- /dev/null +++ b/tests/misc_testsuite/component-model/async/stream-big-read-and-writes.wast @@ -0,0 +1,43 @@ +;;! component_model_async = true +;;! reference_types = true + +(component + (core module $M + (import "" "stream.new" (func $stream.new (result i64))) + (import "" "stream.read" (func $stream.read (param i32 i32 i32) (result i32))) + (import "" "stream.write" (func $stream.write (param i32 i32 i32) (result i32))) + + (func (export "run") (param $len i32) (result i32) + (local $s i64) (local $rx i32) (local $tx i32) (local $ret i32) + + (local.set $s (call $stream.new)) + (local.set $rx (i32.wrap_i64 (local.get $s))) + (local.set $tx (i32.wrap_i64 (i64.shr_u (local.get $s) (i64.const 32)))) + + (local.set $ret (call $stream.read + (local.get $rx) (i32.const 0) (local.get $len))) + (if (i32.ne (i32.const -1 (; BLOCKED ;)) (local.get $ret)) + (then unreachable)) + + (local.set $ret (call $stream.write + (local.get $tx) (i32.const 0) (local.get $len))) + (if (i32.ne (i32.shl (local.get $len) (i32.const 4)) (local.get $ret)) + (then unreachable)) + + (i32.const 42) + ) + ) + (type $ST (stream)) + (canon stream.new $ST (core func $stream.new)) + (canon stream.read $ST async (core func $stream.read)) + (canon stream.write $ST async (core func $stream.write)) + (core instance $m (instantiate $M (with "" (instance + (export "stream.new" (func $stream.new)) + (export "stream.read" (func $stream.read)) + (export "stream.write" (func $stream.write)) + )))) + (func (export "run") (param "len" u32) (result u32) + (canon lift (core func $m "run"))) +) +(assert_return (invoke "run" (u32.const 0x0fffffff)) (u32.const 42)) +(assert_trap (invoke "run" (u32.const 0x10000000)) "stream read/write count too large")