diff --git a/.openapi-generator-ignore b/.openapi-generator-ignore index 3db7940..5d3c21b 100644 --- a/.openapi-generator-ignore +++ b/.openapi-generator-ignore @@ -12,6 +12,7 @@ src/resources.rs src/field.rs src/status.rs src/query.rs +src/uploads.rs CHANGELOG.md RELEASING.md examples/ diff --git a/CHANGELOG.md b/CHANGELOG.md index 34b46cd..aae7b76 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,28 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- Ergonomic presigned (direct-to-storage) file uploads: `Client::upload_file` + (and `client.uploads().upload_file`) open an upload session, `PUT` the bytes + straight to object storage — a single `PUT` for small files, bounded- + concurrency multipart `PUT`s sliced by the server's `part_size` for large + ones — then finalize, returning the `FinalizeUploadResponse`. Configurable via + `UploadOptions` (content type/encoding, filename, part-size hint, and an + `UploadProgress` callback). Never falls back to the legacy `POST /v1/files` + proxy; storage `PUT`s carry no SDK auth/scope headers. Multipart concurrency + is tunable via `UploadOptions::max_concurrency` (default 10), bounded by a + 256 MiB peak-memory budget derived from the server's actual part size; when no + `part_size` is given, the SDK auto-scales the hint (8 MiB for normal files, + larger only past ~72 GiB to keep the part count under S3's 10,000-part limit). + Finalize is exactly-once (sent with retries disabled so an ambiguous failure + can't be retried into a spurious "already finalized" error); part `PUT`s stay + retryable. Storage `PUT`s use a dedicated header-bare reqwest client, so a host + app's default headers on the SDK's main client never leak to object storage. + The multipart session shape is validated (`part_urls` count must match the + file's part count) and pathological sizes (`> i64::MAX`) are rejected rather + than silently wrapped. 
+ ### Changed - feat(uploads): add file upload endpoints diff --git a/src/client.rs b/src/client.rs index 8680cdf..314f08e 100644 --- a/src/client.rs +++ b/src/client.rs @@ -513,6 +513,37 @@ impl Client { } } + /// Upload a local file directly to object storage and finalize it — the + /// primary, presigned upload entry point. + /// + /// Opens an upload session (`POST /v1/uploads`), `PUT`s the bytes **directly + /// to storage** (a single `PUT` for a small file, or bounded-concurrency + /// multipart `PUT`s split by the server's `part_size` for a large one), then + /// finalizes (`POST /v1/uploads/{id}/finalize`) and returns the + /// [`models::FinalizeUploadResponse`] — read `upload_id` from it to load the + /// upload into a managed table. + /// + /// This path NEVER falls back to the legacy `POST /v1/files` proxy: a server + /// that cannot presign (`501 PRESIGN_UNSUPPORTED`) is a hard + /// [`UploadError::CreateSession`] error. + /// + /// Pass [`UploadOptions`] to record advisory metadata (`content_type`, + /// `content_encoding`, `filename`), hint a `part_size` (the server clamps + /// it), or attach a `progress` callback invoked with + /// `(bytes_done_total, total)` as bytes flow. + /// + /// Large uploads legitimately take minutes; storage `PUT`s reuse the + /// configured reqwest client, so supply one with no request timeout (via + /// [`ClientBuilder::reqwest_client`](crate::ClientBuilder::reqwest_client)) + /// when uploading large files. + pub async fn upload_file( + &self, + path: impl AsRef, + opts: crate::uploads::UploadOptions, + ) -> Result { + crate::uploads::upload_file(&self.configuration, path.as_ref(), opts).await + } + /// Stream an arbitrary byte source to `POST /v1/files`, the raw-body upload /// endpoint. 
/// @@ -624,8 +655,7 @@ impl Client { .and_then(|v| v.to_str().ok()) .unwrap_or("application/octet-stream") .to_owned(); - let is_json = - content_type.starts_with("application") && content_type.contains("json"); + let is_json = content_type.starts_with("application") && content_type.contains("json"); if !status.is_client_error() && !status.is_server_error() { let content = resp.text().await?; @@ -1270,10 +1300,7 @@ mod tests { // The mock only matches when all three headers are present, so a // successful Submitted outcome proves they reached the wire. let outcome = client - .submit_query( - models::QueryRequest::new("select 1".into()), - Some("db_123"), - ) + .submit_query(models::QueryRequest::new("select 1".into()), Some("db_123")) .await .expect("submit_query should succeed with scoped headers"); diff --git a/src/lib.rs b/src/lib.rs index e14a1d8..2109c2f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -27,6 +27,7 @@ pub mod models; pub mod query; pub mod resources; pub mod status; +pub mod uploads; #[cfg(all(test, unix))] mod test_support; @@ -49,11 +50,16 @@ pub use query::{ DEFAULT_MAX_AUTO_BYTES, DEFAULT_MAX_AUTO_ROWS, OVERLOADED_ERROR_CODE, }; pub use resources::{ - ConnectionTypesApi, ConnectionsApi, DatabaseContextApi, DatabasesApi, - EmbeddingProvidersApi, IndexesApi, InformationSchemaApi, JobsApi, QueryApi, QueryRunsApi, - RefreshApi, ResultsApi, SavedQueriesApi, SecretsApi, UploadsApi, WorkspacesApi, + ConnectionTypesApi, ConnectionsApi, DatabaseContextApi, DatabasesApi, EmbeddingProvidersApi, + IndexesApi, InformationSchemaApi, JobsApi, QueryApi, QueryRunsApi, RefreshApi, ResultsApi, + SavedQueriesApi, SecretsApi, UploadsApi, WorkspacesApi, }; pub use status::{QueryRunStatus, QueryRunStatusExt, ResultStatus, ResultStatusExt}; +pub use uploads::{ + auto_part_size_hint, effective_in_flight, UploadError, UploadOptions, UploadProgress, + DEFAULT_MAX_CONCURRENCY, DEFAULT_PART_SIZE, MAX_PART_SIZE, MIN_PART_SIZE, TARGET_MAX_PARTS, + UPLOAD_MEMORY_BUDGET, +}; 
/// Process-wide lock serializing every test that mutates `std::env`. Env is a /// process-global resource, so per-module locks would race; all env-mutating @@ -73,4 +79,5 @@ pub mod prelude { }; pub use crate::resources::*; pub use crate::status::{QueryRunStatus, QueryRunStatusExt, ResultStatus, ResultStatusExt}; + pub use crate::uploads::{UploadError, UploadOptions, UploadProgress}; } diff --git a/src/resources.rs b/src/resources.rs index 7675a62..a1f5011 100644 --- a/src/resources.rs +++ b/src/resources.rs @@ -703,7 +703,20 @@ impl<'a> UploadsApi<'a> { Self { config } } - /// Upload a file from a local path. + /// Upload a local file directly to object storage (presigned) and finalize + /// it. The primary upload path; returns the finalized upload (read + /// `upload_id` from it). See [`Client::upload_file`](crate::Client::upload_file) + /// for the full contract and [`UploadOptions`](crate::uploads::UploadOptions). + pub async fn upload_file( + &self, + path: impl AsRef, + opts: crate::uploads::UploadOptions, + ) -> Result { + crate::uploads::upload_file(self.config, path.as_ref(), opts).await + } + + /// Stream a file to the legacy raw-body `POST /v1/files` proxy. Prefer + /// [`upload_file`](Self::upload_file), the presigned direct-to-storage path. pub async fn upload( &self, body: std::path::PathBuf, diff --git a/src/uploads.rs b/src/uploads.rs new file mode 100644 index 0000000..5eb5073 --- /dev/null +++ b/src/uploads.rs @@ -0,0 +1,869 @@ +//! Ergonomic, hand-written direct-to-storage (presigned) file uploads. +//! +//! This module is regeneration-immune: it is protected by `.openapi-generator-ignore` +//! and is never emitted by the OpenAPI generator. It orchestrates the +//! presigned-upload flow that the generated [`apis::uploads_api`](crate::apis::uploads_api) +//! ops expose as raw building blocks: +//! +//! 1. `POST /v1/uploads` ([`create_upload_session_handler`]) opens a session and +//! 
returns either a single `url` (`mode == "single"`) or a set of `part_urls` +//! plus a `part_size` (`mode == "multipart"`), along with a one-time +//! `finalize_token`. +//! 2. The client `PUT`s the bytes **directly to object storage** — never back +//! through the API. Single uploads stream the whole file to `url`; multipart +//! uploads slice the file into `part_size`-byte chunks and `PUT` each chunk to +//! its `part_urls[i - 1]`, collecting the storage `ETag` per part. +//! 3. `POST /v1/uploads/{upload_id}/finalize` ([`finalize_upload_handler`]) +//! confirms the upload with the finalize token in the `X-Upload-Finalize-Token` +//! header (empty body for single; the ascending `{part_number, e_tag}` list +//! for multipart) and returns a [`models::FinalizeUploadResponse`]. +//! +//! # Storage PUT header isolation +//! +//! A presigned storage URL already carries its authorization in the query string +//! (or in the server-provided `headers` map). Object stores (S3 and compatible) +//! reject a `PUT` with `403 SignatureDoesNotMatch` if it carries extra +//! signed-ish headers, so [`put_to_storage`] sends a *bare* request: NONE of the +//! SDK's bearer / workspace / session headers, only an explicit `Content-Length` +//! and whatever the server placed in `headers` (currently always empty). It also +//! refuses to let reqwest auto-append a charset to a `Content-Type` — a type is +//! sent only when the server's `headers` map asks for one. +//! +//! No S3/AWS SDK is involved: storage `PUT`s are plain `reqwest`. + +use std::collections::HashMap; +use std::path::Path; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::Arc; + +use crate::apis::configuration::Configuration; +use crate::apis::{self, Error}; +use crate::models; + +/// One mebibyte, the unit the storage part-size range is expressed in. +const MIB: u64 = 1024 * 1024; + +/// Default cap on concurrent part `PUT`s when the caller doesn't set +/// [`UploadOptions::max_concurrency`]. 
Matches the boto3 / AWS CLI default of 10. +/// The effective in-flight count is the MIN of this and a memory budget (see +/// [`effective_in_flight`]). +pub const DEFAULT_MAX_CONCURRENCY: usize = 10; + +/// Default part-size hint, in bytes (8 MiB), sent when the caller doesn't set +/// [`UploadOptions::part_size`]. The server clamps the hint to its own range and +/// returns the actual size. See [`auto_part_size_hint`]. +pub const DEFAULT_PART_SIZE: u64 = 8 * MIB; + +/// Target ceiling on part count when auto-scaling the part-size hint for very +/// large files, with headroom under S3's hard 10,000-part limit. See +/// [`auto_part_size_hint`]. +pub const TARGET_MAX_PARTS: u64 = 9000; + +/// Minimum part size storage accepts (5 MiB). The hint is clamped to at least +/// this; the server enforces it too. +pub const MIN_PART_SIZE: u64 = 5 * MIB; + +/// Maximum part size storage accepts (5 GiB). The hint is clamped to at most +/// this. +pub const MAX_PART_SIZE: u64 = 5 * 1024 * MIB; + +/// Target peak-memory budget for in-flight part buffers (256 MiB). Each +/// in-flight part buffers up to `part_size` bytes, so [`effective_in_flight`] +/// derives the in-flight count as `budget / part_size`. +/// +/// This is a TARGET, not a hard ceiling: it holds while `part_size` is small +/// relative to the budget (the normal case — 8 MiB parts stay well under it). It +/// cannot bound memory below one in-flight part, so when the server returns a +/// very large `part_size` (e.g. a 5 GiB part on a huge file), a single in-flight +/// part already exceeds this budget and peak memory is `1 * part_size`. In other +/// words the budget caps *concurrency*, not the size of one part. +pub const UPLOAD_MEMORY_BUDGET: u64 = 256 * MIB; + +/// Compute the part-size HINT to send to the server in +/// `CreateUploadRequest.part_size` when the caller did not specify one. 
+/// +/// Starts from [`DEFAULT_PART_SIZE`] (8 MiB) and grows only for files large +/// enough that 8 MiB parts would exceed [`TARGET_MAX_PARTS`] — so the common +/// case is unchanged and only very large files (beyond ~72 GiB) get a larger +/// hint to keep the part count bounded. The result is rounded UP to a whole MiB +/// and clamped to `[MIN_PART_SIZE, MAX_PART_SIZE]`. The server still has the +/// final say and clamps to its own range. +/// +/// Pure and total: `declared_size == 0` yields [`DEFAULT_PART_SIZE`]. +pub fn auto_part_size_hint(declared_size: u64) -> u64 { + // Smallest part size that keeps the count at or under the target. + let by_count = declared_size.div_ceil(TARGET_MAX_PARTS); + let raw = DEFAULT_PART_SIZE.max(by_count); + // Round up to a whole MiB so the hint is a clean multiple. + let rounded = raw.div_ceil(MIB) * MIB; + rounded.clamp(MIN_PART_SIZE, MAX_PART_SIZE) +} + +/// Compute how many part `PUT`s to keep in flight, given the caller's +/// `max_concurrency` (already defaulted to [`DEFAULT_MAX_CONCURRENCY`]) and the +/// SERVER's actual returned `part_size`. +/// +/// Peak buffered memory is `in_flight * part_size`, so we cap in-flight at +/// `UPLOAD_MEMORY_BUDGET / part_size`, then at `max_concurrency`. Normal 8 MiB +/// parts give `256/8 = 32`, capped to `max_concurrency`; a 64 MiB part gives `4`. +/// +/// `max_concurrency` is honored as an explicit floor: a caller asking for `1` +/// (or `0`) gets serial uploads (`1`), so the budget never *raises* concurrency +/// above what was requested. The budget-derived count itself has a floor of 1 +/// (you must keep at least one part in flight to make progress), so the overall +/// result is always `>= 1`. +/// +/// Pure and total: a zero `part_size` is treated as 1 to avoid division by zero. +pub fn effective_in_flight(max_concurrency: usize, part_size: u64) -> usize { + // Honor an explicit low request down to serial (1); never below 1. 
+ let cap = max_concurrency.max(1); + let by_budget = (UPLOAD_MEMORY_BUDGET / part_size.max(1)).max(1) as usize; + by_budget.min(cap) +} + +/// Progress callback: invoked as bytes flow with `(bytes_done_total, total)`, +/// where `total` is the full declared file size. `bytes_done_total` is +/// monotonically non-decreasing and reaches exactly `total` when the transfer +/// completes. Shared (`Arc`) so it can be cloned across concurrent part tasks; +/// it must therefore be `Send + Sync`. +pub type UploadProgress = Arc; + +/// Options for [`Client::upload_file`](crate::Client::upload_file). +/// +/// All fields are optional. `content_type` / `content_encoding` / `filename` +/// are recorded with the upload (advisory metadata; they do not change where the +/// bytes are stored). `part_size` is a hint the server clamps to its allowed +/// range and ignores for single-`PUT` uploads. `progress`, when set, is invoked +/// as bytes flow. +#[derive(Default, Clone)] +pub struct UploadOptions { + /// Content type to record for the uploaded file (e.g. a Parquet/CSV/JSON + /// MIME type). Advisory. + pub content_type: Option, + /// Content encoding to record for the uploaded file (e.g. `gzip`). Advisory. + pub content_encoding: Option, + /// Original file name, recorded for bookkeeping. Advisory. Defaults to the + /// source path's file name when not set. + pub filename: Option, + /// Preferred part size, in bytes, for a large (multipart) upload. A hint; + /// the server clamps it and ignores it for single-`PUT` uploads. When unset, + /// the SDK auto-scales a hint via [`auto_part_size_hint`] (8 MiB for normal + /// files, larger only for very large ones to bound the part count). + pub part_size: Option, + /// Maximum number of part `PUT`s to keep in flight for a multipart upload. + /// `None` uses [`DEFAULT_MAX_CONCURRENCY`]. 
The effective in-flight count is + /// the MIN of this and a peak-memory budget derived from the server's actual + /// part size (see [`effective_in_flight`]), so memory stays bounded. + pub max_concurrency: Option, + /// Optional progress callback invoked with `(bytes_done_total, total)`. + pub progress: Option, +} + +impl std::fmt::Debug for UploadOptions { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("UploadOptions") + .field("content_type", &self.content_type) + .field("content_encoding", &self.content_encoding) + .field("filename", &self.filename) + .field("part_size", &self.part_size) + .field("max_concurrency", &self.max_concurrency) + .field("progress", &self.progress.as_ref().map(|_| "")) + .finish() + } +} + +/// Error returned by [`Client::upload_file`](crate::Client::upload_file). +/// +/// Marked `#[non_exhaustive]`: new variants may be added without a breaking +/// change, so downstream `match`es should carry a wildcard arm. +#[derive(Debug)] +#[non_exhaustive] +pub enum UploadError { + /// Opening or reading the local source file failed. + Io(std::io::Error), + /// Opening the upload session (`POST /v1/uploads`) failed. A `501` + /// `PRESIGN_UNSUPPORTED` lands here too — the presigned path is a hard + /// requirement and never falls back to the legacy `POST /v1/files` proxy. + CreateSession(Error), + /// A storage `PUT` (or the request building / transport around it) failed. + Storage(reqwest::Error), + /// A storage `PUT` returned a non-2xx status. Carries the status and the + /// response body for diagnosis. + StorageStatus { + /// The HTTP status the storage endpoint returned. + status: reqwest::StatusCode, + /// The 1-based part number for a multipart `PUT`, or `None` for the + /// single-`PUT` path. + part_number: Option, + /// The storage response body (often XML for S3-style errors). 
+ body: String, + }, + /// Storage accepted a part `PUT` but returned no `ETag` header, so the part + /// cannot be finalized. + MissingETag { + /// The 1-based part number whose `PUT` response lacked an `ETag`. + part_number: i32, + }, + /// The create-session response was internally inconsistent for its declared + /// `mode` (e.g. `single` without a `url`, or `multipart` without + /// `part_urls` / `part_size`). + MalformedSession(String), + /// A size (the file's declared size, or the part-size hint) did not fit the + /// wire's signed 64-bit field. Only reachable for pathological sizes beyond + /// `i64::MAX` bytes (~8 EiB). + SizeOverflow { + /// What overflowed (e.g. `"declared_size_bytes"`). + what: &'static str, + /// The offending value. + value: u64, + }, + /// Finalizing the upload (`POST /v1/uploads/{id}/finalize`) failed. + Finalize(Error), +} + +impl std::fmt::Display for UploadError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + UploadError::Io(e) => write!(f, "reading the source file failed: {e}"), + UploadError::CreateSession(e) => write!(f, "opening the upload session failed: {e}"), + UploadError::Storage(e) => write!(f, "uploading to storage failed: {e}"), + UploadError::StorageStatus { + status, + part_number, + body, + } => match part_number { + Some(n) => write!(f, "storage rejected part {n} with status {status}: {body}"), + None => write!( + f, + "storage rejected the upload with status {status}: {body}" + ), + }, + UploadError::MissingETag { part_number } => write!( + f, + "storage returned no ETag for part {part_number}; cannot finalize" + ), + UploadError::SizeOverflow { what, value } => { + write!( + f, + "{what} ({value} bytes) exceeds the maximum supported size" + ) + } + UploadError::MalformedSession(msg) => { + write!(f, "malformed upload session response: {msg}") + } + UploadError::Finalize(e) => write!(f, "finalizing the upload failed: {e}"), + } + } +} + +impl std::error::Error for 
UploadError { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + match self { + UploadError::Io(e) => Some(e), + UploadError::CreateSession(e) => Some(e), + UploadError::Storage(e) => Some(e), + UploadError::Finalize(e) => Some(e), + _ => None, + } + } +} + +impl From for UploadError { + fn from(e: std::io::Error) -> Self { + UploadError::Io(e) + } +} + +/// Upload a local file directly to object storage and finalize it. +/// +/// This is the orchestration behind [`Client::upload_file`](crate::Client::upload_file); +/// see that method for the public contract. It stats `path` for the declared +/// size, opens a session, drives the single-`PUT` or multipart path, and +/// finalizes — never touching the legacy `POST /v1/files` proxy. +pub(crate) async fn upload_file( + configuration: &Configuration, + path: &Path, + opts: UploadOptions, +) -> Result { + let metadata = tokio::fs::metadata(path).await?; + let total = metadata.len(); + + let filename = opts + .filename + .clone() + .or_else(|| path.file_name().map(|n| n.to_string_lossy().into_owned())); + + // Part-size hint: honor an explicit caller value, else auto-scale from the + // declared size so the common case stays at 8 MiB and only very large files + // grow the hint (bounding the part count). The server clamps it regardless. + let part_size_hint = opts.part_size.unwrap_or_else(|| auto_part_size_hint(total)); + + // The wire models size as a signed i64; reject (rather than silently wrap) + // a pathological size beyond i64::MAX. + let declared_size_bytes = i64::try_from(total).map_err(|_| UploadError::SizeOverflow { + what: "declared_size_bytes", + value: total, + })?; + let part_size_hint_i64 = + i64::try_from(part_size_hint).map_err(|_| UploadError::SizeOverflow { + what: "part_size", + value: part_size_hint, + })?; + + // Open the session. 
`declared_size_bytes` (the exact byte count finalize + // validates against) comes from `new(..)`, the single source; the + // struct-update base also fills the optional checksum fields with None. + let create = models::CreateUploadRequest { + content_type: opts.content_type.clone().map(Some), + content_encoding: opts.content_encoding.clone().map(Some), + filename: filename.map(Some), + part_size: Some(Some(part_size_hint_i64)), + ..models::CreateUploadRequest::new(declared_size_bytes) + }; + let session = apis::uploads_api::create_upload_session_handler(configuration, create) + .await + .map_err(UploadError::CreateSession)?; + + // Report initial progress so a 0-byte file (or an instant single PUT) still + // emits a terminal (0/0 or total/total) tick. + if let Some(ref progress) = opts.progress { + progress(0, total); + } + + let parts = match session.mode.as_str() { + "single" => { + upload_single(&session, path, total, opts.progress.as_ref()).await?; + None + } + "multipart" => { + let max_concurrency = opts.max_concurrency.unwrap_or(DEFAULT_MAX_CONCURRENCY); + Some( + upload_multipart( + configuration, + &session, + path, + total, + max_concurrency, + opts.progress.as_ref(), + ) + .await?, + ) + } + other => { + return Err(UploadError::MalformedSession(format!( + "unknown upload mode `{other}`" + ))) + } + }; + + // Finalize: single sends an empty object `{}`; multipart sends + // `{"parts": [...]}` with the ascending, non-duplicate parts list. The token + // rides the X-Upload-Finalize-Token header (handled by the generated op). + // + // The body MUST be a JSON object, never `null`: the server rejects a `null` + // finalize body ("invalid type: null, expected struct FinalizeUploadRequest") + // even though the field is logically optional for single uploads. So we wrap + // in `Some(..)` for both modes — the generated op then serializes a struct, + // and `parts` (skip_serializing_if = Option::is_none) drops out for single, + // yielding `{}`. 
+ let finalize_body = Some( + parts + .map(|parts| models::FinalizeUploadRequest { + parts: Some(Some(parts)), + }) + .unwrap_or_default(), + ); + + // Finalize is exactly-once on the server: a second finalize of the same + // upload is rejected. The generated op routes through `execute_retrying`, + // which would retry an ambiguous failure (a lost response, or a 429 the + // server actually processed) — turning a finalize that SUCCEEDED into a + // spurious "already finalized" error on the retry. So we call it with retries + // disabled (a single attempt). Part PUTs stay retryable (idempotent: storage + // overwrites a part by number); only finalize is single-shot. + let mut finalize_config = configuration.clone(); + finalize_config.retry.max_retries = 0; + + apis::uploads_api::finalize_upload_handler( + &finalize_config, + &session.upload_id, + &session.finalize_token, + finalize_body, + ) + .await + .map_err(UploadError::Finalize) +} + +/// Single-`PUT` path: stream the whole file to `session.url`, invoking the +/// progress callback incrementally as chunks are sent to storage. +/// +/// The body is a [`progress_stream`] wrapping the file reader, so progress is +/// byte-granular (a multi-GB upload reports smooth `done/total` ticks rather +/// than jumping 0% -> 100%). A streaming body is not clonable, so this single +/// `PUT` is sent once with no 429/reset retry — an intentional trade for smooth +/// progress on the large, common single-`PUT` path; a presigned storage `PUT` +/// is not expected to be admission-shed. 
+async fn upload_single( + session: &models::UploadSessionResponse, + path: &Path, + total: u64, + progress: Option<&UploadProgress>, +) -> Result<(), UploadError> { + let url = + session.url.clone().flatten().ok_or_else(|| { + UploadError::MalformedSession("single upload missing `url`".to_owned()) + })?; + + let file = tokio::fs::File::open(path).await?; + let body = progress_stream(file, total, progress.cloned()); + + put_stream_to_storage(&url, &session.headers, body, total).await?; + + // Guarantee a terminal tick at exactly `total`, even if the stream's last + // chunk boundary or an empty file left the counter short. Monotonic: the + // streamed ticks never exceed `total`. + if let Some(progress) = progress { + progress(total, total); + } + Ok(()) +} + +/// Wrap a file reader in a byte-counting stream of `Bytes` chunks. Each chunk +/// advances a running total and invokes `progress(done, total)` as it is yielded +/// to the request body, so progress reflects bytes actually handed to the +/// transport. Monotonic non-decreasing; the running total never exceeds `total`. +fn progress_stream( + file: tokio::fs::File, + total: u64, + progress: Option, +) -> ProgressStream { + use tokio_util::codec::{BytesCodec, FramedRead}; + + ProgressStream { + inner: FramedRead::new(file, BytesCodec::new()), + done: 0, + total, + progress, + } +} + +/// A [`Stream`](futures_core::Stream) of `Bytes` chunks read from a file that +/// reports cumulative byte progress as each chunk is yielded. Hand-rolled over +/// `futures_core` (the crate's only direct futures dep) rather than pulling in +/// `futures_util`, mirroring how [`Client::upload_stream`](crate::Client::upload_stream) +/// stays on `futures_core::Stream`. 
+struct ProgressStream { + inner: tokio_util::codec::FramedRead, + done: u64, + total: u64, + progress: Option, +} + +impl futures_core::Stream for ProgressStream { + type Item = std::io::Result; + + fn poll_next( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + use std::task::Poll; + // `inner` (FramedRead) is Unpin, and our other fields are too, so a + // mutable projection through `get_mut` is sound without pin-project. + let this = self.get_mut(); + match std::pin::Pin::new(&mut this.inner).poll_next(cx) { + Poll::Ready(Some(Ok(chunk))) => { + let chunk = chunk.freeze(); + this.done = (this.done + chunk.len() as u64).min(this.total); + if let Some(ref progress) = this.progress { + progress(this.done, this.total); + } + Poll::Ready(Some(Ok(chunk))) + } + Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))), + Poll::Ready(None) => Poll::Ready(None), + Poll::Pending => Poll::Pending, + } + } +} + +/// Multipart path: slice the file into `part_size`-byte chunks (the last is the +/// remainder), `PUT` each chunk to its `part_urls[i - 1]` with bounded +/// concurrency, and collect `(part_number, e_tag)` per part. +/// +/// `max_concurrency` is the caller's ceiling on in-flight parts; the effective +/// count also honors a peak-memory budget derived from the server's actual +/// `part_size` (see [`effective_in_flight`]). +/// +/// Returns the parts sorted ascending by part number, ready for finalize. 
+async fn upload_multipart( + configuration: &Configuration, + session: &models::UploadSessionResponse, + path: &Path, + total: u64, + max_concurrency: usize, + progress: Option<&UploadProgress>, +) -> Result, UploadError> { + let part_urls = session.part_urls.clone().flatten().ok_or_else(|| { + UploadError::MalformedSession("multipart upload missing `part_urls`".to_owned()) + })?; + let part_size = session.part_size.flatten().ok_or_else(|| { + UploadError::MalformedSession("multipart upload missing `part_size`".to_owned()) + })?; + if part_size <= 0 { + return Err(UploadError::MalformedSession(format!( + "multipart upload has non-positive `part_size` {part_size}" + ))); + } + let part_size = part_size as u64; + + if part_urls.is_empty() { + return Err(UploadError::MalformedSession( + "multipart upload has empty `part_urls`".to_owned(), + )); + } + + // The URL count must match the number of `part_size`-byte chunks the file + // splits into (last is the remainder). Too many URLs and we'd PUT a + // zero-length trailing part; too few and we'd finalize an incomplete list. + // Both mean a session inconsistent with our declared size, so fail loudly. + let expected_parts = total.div_ceil(part_size).max(1); + if part_urls.len() as u64 != expected_parts { + return Err(UploadError::MalformedSession(format!( + "multipart upload returned {} part URLs but the file ({total} bytes) \ + splits into {expected_parts} parts of {part_size} bytes", + part_urls.len() + ))); + } + + // Peak buffered memory is in_flight * part_size; bound in-flight by both the + // caller's max_concurrency and the memory budget, using the SERVER's actual + // part size (the same value we slice by below). + let in_flight_cap = effective_in_flight(max_concurrency, part_size); + + // Aggregate progress across parts via a shared counter; each part adds its + // own byte count as it completes. + let done = Arc::new(AtomicU64::new(0)); + + // Drive part PUTs with a bounded number in flight via a JoinSet. 
Each task + // opens its own file handle and does a positioned read of exactly its byte + // range so a retry inside `put_to_storage` re-reads cleanly and tasks never + // share a cursor. Each task carries its 0-based `index` so the completion + // order (which JoinSet does not preserve) is undone when placing results. + let mut results: Vec> = vec![None; part_urls.len()]; + let mut next = 0usize; + let mut join_set: tokio::task::JoinSet< + Result<(usize, models::FinalizeUploadPart), UploadError>, + > = tokio::task::JoinSet::new(); + + loop { + while join_set.len() < in_flight_cap && next < part_urls.len() { + let index = next; + next += 1; + let part_number = (index + 1) as i32; + let url = part_urls[index].clone(); + let offset = index as u64 * part_size; + // The part-count check above guarantees every part has bytes, but + // guard defensively: a part starting at/after EOF has no bytes to + // send, so skip it rather than PUT a zero-length object. + if offset >= total && total > 0 { + continue; + } + // The last part carries the remainder; earlier parts are exactly + // `part_size`. + let len = part_size.min(total.saturating_sub(offset)); + let headers = session.headers.clone(); + let done = Arc::clone(&done); + let progress = progress.cloned(); + // RetryPolicy is Copy. + let retry = configuration.retry; + let path = path.to_path_buf(); + + join_set.spawn(async move { + let chunk = read_range(&path, offset, len).await?; + let resp = + put_to_storage(&retry, &url, &headers, chunk, len, Some(part_number)).await?; + let e_tag = resp + .headers() + .get(reqwest::header::ETAG) + .and_then(|v| v.to_str().ok()) + .map(|s| s.to_owned()) + .ok_or(UploadError::MissingETag { part_number })?; + + // Aggregate progress: add this part's bytes once it lands. 
+ if let Some(progress) = progress.as_ref() { + let now = done.fetch_add(len, Ordering::SeqCst) + len; + progress(now, total); + } + + Ok::<_, UploadError>((index, models::FinalizeUploadPart { e_tag, part_number })) + }); + } + + match join_set.join_next().await { + // A part finished. `join_next` yields the task's `Result`; the outer + // `Result` is the JoinError (panic/cancel), the inner is our + // `UploadError`. + Some(Ok(Ok((index, part)))) => results[index] = Some(part), + Some(Ok(Err(e))) => { + join_set.abort_all(); + return Err(e); + } + Some(Err(join_err)) => { + join_set.abort_all(); + // A part task panicked or was cancelled — surface it as an I/O + // error so the upload fails loudly rather than silently dropping + // a part. + return Err(UploadError::Io(std::io::Error::other(format!( + "part upload task failed: {join_err}" + )))); + } + None => break, + } + } + + // `results` is indexed by 0-based part position, so collecting it in order + // yields parts ascending by part_number with no duplicates. + Ok(results.into_iter().flatten().collect()) +} + +/// Read exactly `len` bytes starting at `offset` from `path`. A positioned read +/// (seek + read_exact) so multipart part tasks never share a cursor and a retry +/// re-reads the same range cleanly. +async fn read_range(path: &Path, offset: u64, len: u64) -> Result { + use tokio::io::{AsyncReadExt, AsyncSeekExt}; + + let mut file = tokio::fs::File::open(path).await?; + file.seek(std::io::SeekFrom::Start(offset)).await?; + let mut buf = vec![0u8; len as usize]; + file.read_exact(&mut buf).await?; + Ok(bytes::Bytes::from(buf)) +} + +/// `PUT` a body to a presigned storage URL with strict header isolation. +/// +/// Attaches NONE of the SDK's auth/workspace/session/user-agent headers — a +/// presigned URL already carries its authorization, and an extra signed-ish +/// header makes S3-compatible storage return `403`. 
Only an explicit +/// `Content-Length` and the server-provided `headers` map (replayed verbatim; +/// currently always empty) are sent. A `Content-Type` is set ONLY when the +/// `headers` map includes one, so reqwest never auto-appends a charset. +/// +/// Sent on the dedicated, header-bare [`storage_client`] with **no request +/// timeout** (a large upload legitimately takes minutes); the body buffers in +/// memory so it clones cleanly across retries via [`crate::http::execute_retrying`]. +/// Part `PUT`s are retryable: storage overwrites a part by number, so a retried +/// part is idempotent. `retry` is the SDK's retry policy (carried on +/// `Configuration`), used only for the retry timing here. +async fn put_to_storage( + retry: &crate::query::RetryPolicy, + url: &str, + headers: &HashMap, + body: bytes::Bytes, + content_length: u64, + part_number: Option, +) -> Result { + let client = storage_client(); + + let mut req_builder = client + .request(reqwest::Method::PUT, url) + .header(reqwest::header::CONTENT_LENGTH, content_length); + + // Replay the server-provided headers verbatim. Currently always empty; this + // is the only place a Content-Type may be set, so reqwest can't auto-append + // a charset. + for (name, value) in headers { + req_builder = req_builder.header(name.as_str(), value.as_str()); + } + + // A buffered Bytes body clones cleanly, so 429 / pre-response-reset retries + // in `execute_retrying` can re-send it. 
+ req_builder = req_builder.body(reqwest::Body::from(body)); + + let req = req_builder.build().map_err(UploadError::Storage)?; + crate::http_log::log_request(&req); + let resp = crate::http::execute_retrying(&client, req, retry) + .await + .map_err(UploadError::Storage)?; + + let status = resp.status(); + crate::http_log::log_response_status(status); + if status.is_client_error() || status.is_server_error() { + let body = resp.text().await.unwrap_or_default(); + crate::http_log::log_response_body(&body); + return Err(UploadError::StorageStatus { + status, + part_number, + body, + }); + } + Ok(resp) +} + +/// `PUT` a streaming body to a presigned storage URL with the same strict +/// header isolation as [`put_to_storage`] (no SDK auth/scope headers; explicit +/// `Content-Length`; `Content-Type` only from the server `headers` map). +/// +/// Used by the single-`PUT` path so progress is byte-granular. A streamed body +/// is not clonable, so this is a SINGLE attempt with no 429/reset retry — unlike +/// the buffered, retryable [`put_to_storage`] used per multipart part. +async fn put_stream_to_storage( + url: &str, + headers: &HashMap, + body: S, + content_length: u64, +) -> Result +where + S: futures_core::Stream> + Send + 'static, +{ + let client = storage_client(); + + let mut req_builder = client + .request(reqwest::Method::PUT, url) + // Explicit Content-Length so the body is sized (not chunked) — storage + // can reject an oversized upload up front, and reqwest honors it as the + // framing for a wrapped stream. + .header(reqwest::header::CONTENT_LENGTH, content_length); + + for (name, value) in headers { + req_builder = req_builder.header(name.as_str(), value.as_str()); + } + + req_builder = req_builder.body(reqwest::Body::wrap_stream(body)); + + let req = req_builder.build().map_err(UploadError::Storage)?; + crate::http_log::log_request(&req); + // A streamed body can't be cloned, so send once (no retry helper). 
+ let resp = client.execute(req).await.map_err(UploadError::Storage)?; + + let status = resp.status(); + crate::http_log::log_response_status(status); + if status.is_client_error() || status.is_server_error() { + let body = resp.text().await.unwrap_or_default(); + crate::http_log::log_response_body(&body); + return Err(UploadError::StorageStatus { + status, + part_number: None, + body, + }); + } + Ok(resp) +} + +/// The dedicated, process-wide reqwest client used for storage `PUT`s. +/// +/// Deliberately NOT `configuration.client`: a host app may have installed +/// default headers (auth / workspace / `User-Agent` / `Content-Type`) on the +/// SDK's main client, which reqwest would then apply to the storage `PUT` — +/// making S3-compatible storage return `403 SignatureDoesNotMatch`. This client +/// is built bare: no default headers, and no request timeout (a large upload +/// legitimately takes minutes). It is built once and reused. +/// +/// Trade-off: TLS / proxy / connection-pool settings on the SDK's main client +/// do NOT apply to storage `PUT`s — they go through this independent client. +/// That is intentional; storage transfers must be header-isolated, and a +/// host-configured proxy for the API host is not assumed to front object +/// storage. +fn storage_client() -> reqwest::Client { + static STORAGE_CLIENT: std::sync::OnceLock = std::sync::OnceLock::new(); + STORAGE_CLIENT + .get_or_init(|| { + reqwest::Client::builder() + // No `default_headers`, no `timeout`. A connect timeout is fine + // (it bounds only connection establishment, not the transfer). + .build() + // Falls back to a plain default client if the builder somehow + // fails (e.g. no TLS backend); still header-bare. + .unwrap_or_default() + }) + .clone() +} + +#[cfg(test)] +mod tests { + use super::*; + + /// The part count a given hint would produce for a file of `size`. 
#[cfg(test)]
mod tests {
    use super::*;

    /// The part count a given hint would produce for a file of `size`.
    fn part_count(size: u64, part: u64) -> u64 {
        size.div_ceil(part)
    }

    #[test]
    fn auto_part_size_keeps_8mib_for_normal_files() {
        // Empty and small files get the default part size.
        assert_eq!(auto_part_size_hint(0), DEFAULT_PART_SIZE);
        assert_eq!(auto_part_size_hint(1), DEFAULT_PART_SIZE);
        assert_eq!(auto_part_size_hint(100 * MIB), DEFAULT_PART_SIZE);
        assert_eq!(auto_part_size_hint(1024 * MIB), DEFAULT_PART_SIZE); // 1 GiB
        // Right at the boundary: a file of exactly
        // DEFAULT_PART_SIZE * TARGET_MAX_PARTS bytes still fits the default.
        let boundary = DEFAULT_PART_SIZE * TARGET_MAX_PARTS;
        assert_eq!(auto_part_size_hint(boundary), DEFAULT_PART_SIZE);
    }

    #[test]
    fn auto_part_size_scales_up_for_very_large_files_and_caps_parts() {
        // Well past the boundary the hint must grow above the default.
        let big = 200 * 1024 * MIB; // 200 GiB
        let hint = auto_part_size_hint(big);
        assert!(
            hint > DEFAULT_PART_SIZE,
            "hint should scale above 8 MiB for a 200 GiB file, got {hint}"
        );
        // Hint is a whole number of MiB.
        assert_eq!(hint % MIB, 0, "hint must be a whole MiB, got {hint}");
        // Part count stays at or under the target ceiling.
        assert!(
            part_count(big, hint) <= TARGET_MAX_PARTS,
            "part count {} must be <= {TARGET_MAX_PARTS}",
            part_count(big, hint)
        );
        // And always within storage's accepted range.
        assert!((MIN_PART_SIZE..=MAX_PART_SIZE).contains(&hint));
    }

    #[test]
    fn auto_part_size_clamps_to_max_for_enormous_files() {
        // 1 TiB expressed in MiB units, so the size below reads unambiguously.
        const TIB: u64 = 1024 * 1024 * MIB;
        // A file so large that the count-driven size would exceed
        // MAX_PART_SIZE clamps to that ceiling (the part count then necessarily
        // exceeds the soft target, which is fine — it's a hint and the server
        // has the final say).
        let enormous = 100 * TIB; // 100 TiB
        assert_eq!(auto_part_size_hint(enormous), MAX_PART_SIZE);
    }

    #[test]
    fn effective_in_flight_capped_by_max_concurrency_for_small_parts() {
        // 8 MiB parts: budget allows 256/8 = 32, so max_concurrency wins.
        assert_eq!(effective_in_flight(12, 8 * MIB), 12);
        assert_eq!(effective_in_flight(10, 8 * MIB), 10);
        // A tiny part size still can't exceed max_concurrency.
        assert_eq!(effective_in_flight(12, MIB), 12);
    }

    #[test]
    fn effective_in_flight_reduced_by_memory_budget_for_large_parts() {
        // 64 MiB parts: budget allows 256/64 = 4, below max_concurrency.
        assert_eq!(effective_in_flight(12, 64 * MIB), 4);
        // 128 MiB parts: 256/128 = 2.
        assert_eq!(effective_in_flight(12, 128 * MIB), 2);
    }

    #[test]
    fn effective_in_flight_honors_explicit_low_concurrency() {
        // An explicit max_concurrency of 1 means serial uploads — NOT raised to a
        // floor of 2. (Regression guard for the Codex finding.)
        assert_eq!(effective_in_flight(1, 8 * MIB), 1);
        // 0 is normalized to 1 (you can't run zero in flight), not to 2.
        assert_eq!(effective_in_flight(0, 8 * MIB), 1);
        // 2 stays 2.
        assert_eq!(effective_in_flight(2, 8 * MIB), 2);
    }

    #[test]
    fn effective_in_flight_floors_at_1_for_huge_parts_and_handles_zero() {
        // A part larger than the whole budget still keeps at least 1 in flight
        // (the budget can't bound below a single part).
        assert_eq!(effective_in_flight(12, UPLOAD_MEMORY_BUDGET * 4), 1);
        // Zero part size doesn't divide-by-zero: max_concurrency wins.
        assert_eq!(effective_in_flight(12, 0), 12);
    }
}
([`concurrency_storage_server`]) that genuinely holds in-flight PUTs to +//! measure real overlap. All are fully local and deterministic — no real +//! backend, no credentials — so they run in CI without secrets. +//! +//! Coverage: +//! * single-`PUT` happy path (bytes, header isolation, finalize token + empty +//! parts, returned upload_id); +//! * multipart happy path (slicing by `part_size`, per-part ETag collection, +//! ascending finalize parts); +//! * progress callback monotonicity reaching exactly the file size; +//! * storage-PUT header isolation (no SDK bearer/workspace/session headers, and +//! no default headers leaking off the SDK's main client); +//! * finalize exactly-once (no retry) and per-part retry; +//! * error surfacing (missing ETag, storage 4xx/5xx, finalize failure, +//! 501 PRESIGN_UNSUPPORTED, malformed sessions); +//! * bounded in-flight concurrency and server-provided Content-Type replay. + +use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; +use std::sync::{Arc, Mutex}; +use std::time::Duration; + +use hotdata::apis::configuration::{ApiKey, Configuration}; +use hotdata::{Client, RetryPolicy, UploadError, UploadOptions}; +use wiremock::matchers::{method, path, path_regex}; +use wiremock::{Mock, MockServer, Request, ResponseTemplate}; + +const WORKSPACE_HEADER: &str = "X-Workspace-Id"; +const SESSION_HEADER: &str = "X-Session-Id"; + +/// Build a client pointed at the mock server with a static bearer token and the +/// workspace + session scope headers installed (no JWT-exchange round-trip), so +/// the upload requests carry exactly the headers a real client would. 
+fn test_client(base_url: &str) -> Client { + let mut configuration = Configuration { + base_path: base_url.to_owned(), + user_agent: Some("hotdata-rust-test".to_owned()), + bearer_access_token: Some("test-bearer".to_owned()), + ..Configuration::default() + }; + configuration.api_keys.insert( + WORKSPACE_HEADER.to_owned(), + ApiKey { + prefix: None, + key: "ws_test".to_owned(), + }, + ); + configuration.api_keys.insert( + SESSION_HEADER.to_owned(), + ApiKey { + prefix: None, + key: "sess_test".to_owned(), + }, + ); + Client::from_configuration(configuration) +} + +/// A fast, deterministic retry policy (tiny backoff, no jitter, several +/// retries) so per-part-retry tests run without real delay. +fn fast_retry(max_retries: u32) -> RetryPolicy { + RetryPolicy { + max_retries, + base_backoff: Duration::from_millis(1), + max_backoff: Duration::from_millis(5), + deadline: Duration::from_secs(30), + jitter: 0.0, + } +} + +/// Like [`test_client`] but with an explicit retry policy installed (storage +/// part PUTs route through it; finalize disables it internally). +fn test_client_with_retry(base_url: &str, retry: RetryPolicy) -> Client { + let mut client = test_client(base_url); + client.configuration_mut().retry = retry; + client +} + +/// Write `contents` to a uniquely-named temp file and return its path. +fn temp_file(contents: &[u8]) -> std::path::PathBuf { + let name = format!("hotdata-presigned-{}", uuid::Uuid::new_v4().simple()); + let path = std::env::temp_dir().join(name); + std::fs::write(&path, contents).expect("writing the temp upload file should succeed"); + path +} + +/// Assert a storage `PUT` request carries NONE of the SDK's auth/scope headers. +/// A presigned URL self-authorizes; an extra signed-ish header makes S3-style +/// storage return 403. 
+fn assert_no_sdk_headers(req: &Request) { + for forbidden in [ + "authorization", + "x-workspace-id", + "x-session-id", + "x-upload-finalize-token", + ] { + assert!( + req.headers.get(forbidden).is_none(), + "storage PUT must not carry the `{forbidden}` header, found one" + ); + } +} + +#[tokio::test] +async fn single_put_happy_path() { + let server = MockServer::start().await; + let storage_url = format!("{}/storage/single", server.uri()); + let contents = b"hello presigned world"; + + // POST /v1/uploads -> mode=single with a storage url + finalize token. + Mock::given(method("POST")) + .and(path("/v1/uploads")) + .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({ + "finalize_token": "ftok_single", + "headers": {}, + "mode": "single", + "upload_id": "upl_single", + "url": storage_url, + }))) + .mount(&server) + .await; + + // The storage PUT target. Accept any bytes; we assert on them afterwards. + Mock::given(method("PUT")) + .and(path("/storage/single")) + .respond_with(ResponseTemplate::new(200).insert_header("ETag", "\"single-etag\"")) + .mount(&server) + .await; + + // POST /v1/uploads/{id}/finalize -> the finalized upload. + Mock::given(method("POST")) + .and(path("/v1/uploads/upl_single/finalize")) + .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({ + "created_at": "2026-06-25T00:00:00Z", + "size_bytes": contents.len(), + "status": "ready", + "upload_id": "upl_single", + }))) + .mount(&server) + .await; + + let path = temp_file(contents); + let client = test_client(&server.uri()); + let result = client.upload_file(&path, UploadOptions::default()).await; + let _ = std::fs::remove_file(&path); + let result = result.expect("single upload should succeed"); + + assert_eq!(result.upload_id, "upl_single"); + assert_eq!(result.size_bytes as usize, contents.len()); + assert_eq!(result.status, "ready"); + + // Inspect the recorded requests. 
+ let requests = server.received_requests().await.expect("requests recorded"); + let put = requests + .iter() + .find(|r| r.url.path() == "/storage/single") + .expect("a storage PUT should have been made"); + // Exact bytes arrived. + assert_eq!( + put.body, contents, + "storage PUT body must be the file bytes" + ); + // Explicit Content-Length, framed (not chunked). + assert_eq!( + put.headers + .get("content-length") + .and_then(|v| v.to_str().ok()), + Some(contents.len().to_string().as_str()), + "storage PUT must set an explicit Content-Length" + ); + // Header isolation. + assert_no_sdk_headers(put); + + // Finalize carried the token in the header and an empty/absent parts body. + let finalize = requests + .iter() + .find(|r| r.url.path() == "/v1/uploads/upl_single/finalize") + .expect("a finalize request should have been made"); + assert_eq!( + finalize + .headers + .get("x-upload-finalize-token") + .and_then(|v| v.to_str().ok()), + Some("ftok_single"), + "finalize must carry the token in X-Upload-Finalize-Token" + ); + // The single-PUT finalize body MUST be a JSON object (`{}`), NOT `null`: + // prod rejects a `null` finalize body ("invalid type: null, expected struct + // FinalizeUploadRequest"). Parse the raw bytes strictly so a literal `null` + // is caught (it would parse to Value::Null and fail this assert). + let body: serde_json::Value = + serde_json::from_slice(&finalize.body).expect("finalize body must be valid JSON"); + assert!( + body.is_object(), + "single-PUT finalize body must be a JSON object, not {body}" + ); + assert!( + !body.is_null(), + "single-PUT finalize body must not be JSON null" + ); + // And it must not enumerate parts. 
+ assert!( + body.get("parts").is_none(), + "single-PUT finalize must omit the parts key, got {body}" + ); +} + +#[tokio::test] +async fn single_put_progress_is_byte_granular() { + // A single-PUT body larger than one read chunk must produce MULTIPLE + // intermediate progress ticks (not just 0 and total), so the CLI renders a + // smooth bar instead of a 0% -> 100% jump. FramedRead's BytesCodec yields + // chunks of at most a few KiB, so a 256 KiB body spans many chunks. + let server = MockServer::start().await; + let storage_url = format!("{}/storage/big", server.uri()); + let contents: Vec = (0..256 * 1024).map(|i| (i % 251) as u8).collect(); + let total = contents.len() as u64; + + Mock::given(method("POST")) + .and(path("/v1/uploads")) + .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({ + "finalize_token": "ftok_big", + "headers": {}, + "mode": "single", + "upload_id": "upl_big", + "url": storage_url, + }))) + .mount(&server) + .await; + Mock::given(method("PUT")) + .and(path("/storage/big")) + .respond_with(ResponseTemplate::new(200).insert_header("ETag", "\"big\"")) + .mount(&server) + .await; + Mock::given(method("POST")) + .and(path("/v1/uploads/upl_big/finalize")) + .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({ + "created_at": "2026-06-25T00:00:00Z", + "size_bytes": contents.len(), + "status": "ready", + "upload_id": "upl_big", + }))) + .mount(&server) + .await; + + // Record every tick. The callback runs on the body-stream task; collect into + // a shared Vec. 
+ let ticks: Arc>> = Arc::new(Mutex::new(Vec::new())); + let ticks_cb = Arc::clone(&ticks); + let progress: hotdata::UploadProgress = Arc::new(move |done, total| { + ticks_cb.lock().unwrap().push((done, total)); + }); + let opts = UploadOptions { + progress: Some(progress), + ..UploadOptions::default() + }; + + let path = temp_file(&contents); + let client = test_client(&server.uri()); + let result = client.upload_file(&path, opts).await; + let _ = std::fs::remove_file(&path); + result.expect("single upload should succeed"); + + let ticks = ticks.lock().unwrap(); + // Many intermediate updates, not just the terminal one. + let intermediate = ticks.iter().filter(|(d, _)| *d > 0 && *d < total).count(); + assert!( + intermediate >= 2, + "single-PUT progress must fire multiple intermediate ticks for a \ + multi-chunk body; saw ticks: {ticks:?}" + ); + // Total is always the file size; the sequence is monotonic non-decreasing. + let mut prev = 0u64; + for (d, t) in ticks.iter() { + assert_eq!(*t, total, "total must be the file size"); + assert!( + *d >= prev, + "progress must be non-decreasing: {d} after {prev}" + ); + assert!(*d <= total, "progress must never exceed total"); + prev = *d; + } + // The final observed value is exactly the file size. + assert_eq!( + ticks.last().map(|(d, _)| *d), + Some(total), + "single-PUT progress must reach exactly the file size" + ); +} + +#[tokio::test] +async fn multipart_happy_path() { + let server = MockServer::start().await; + let part_size = 5usize; + // 13 bytes over part_size=5 -> parts of 5, 5, 3 (last is the remainder). 
+ let contents: Vec = (0u8..13).collect(); + let part_urls: Vec = (1..=3) + .map(|i| format!("{}/storage/part/{i}", server.uri())) + .collect(); + + Mock::given(method("POST")) + .and(path("/v1/uploads")) + .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({ + "finalize_token": "ftok_multi", + "headers": {}, + "mode": "multipart", + "part_size": part_size, + "part_urls": part_urls, + "upload_id": "upl_multi", + }))) + .mount(&server) + .await; + + // Each part endpoint returns a distinct ETag so we can assert per-part + // collection. The mock echoes its part number into the ETag value. + for i in 1..=3 { + Mock::given(method("PUT")) + .and(path(format!("/storage/part/{i}"))) + .respond_with( + ResponseTemplate::new(200).insert_header("ETag", format!("\"etag-part-{i}\"")), + ) + .mount(&server) + .await; + } + + Mock::given(method("POST")) + .and(path("/v1/uploads/upl_multi/finalize")) + .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({ + "created_at": "2026-06-25T00:00:00Z", + "size_bytes": contents.len(), + "status": "ready", + "upload_id": "upl_multi", + }))) + .mount(&server) + .await; + + let path = temp_file(&contents); + let client = test_client(&server.uri()); + let result = client.upload_file(&path, UploadOptions::default()).await; + let _ = std::fs::remove_file(&path); + let result = result.expect("multipart upload should succeed"); + assert_eq!(result.upload_id, "upl_multi"); + + let requests = server.received_requests().await.expect("requests recorded"); + + // Each part received exactly its slice: part i (1-based) gets bytes + // [(i-1)*part_size, i*part_size), last is the remainder. 
+ let expected_slices = [&contents[0..5], &contents[5..10], &contents[10..13]]; + for (i, expected) in expected_slices.iter().enumerate() { + let part_path = format!("/storage/part/{}", i + 1); + let put = requests + .iter() + .find(|r| r.url.path() == part_path) + .unwrap_or_else(|| panic!("a PUT to {part_path} should have been made")); + assert_eq!( + &put.body[..], + *expected, + "part {} body must be the {}-byte slice", + i + 1, + expected.len() + ); + assert_eq!( + put.headers + .get("content-length") + .and_then(|v| v.to_str().ok()), + Some(expected.len().to_string().as_str()), + "part {} must set Content-Length to its slice length", + i + 1 + ); + assert_no_sdk_headers(put); + } + + // Finalize carried the ascending {part_number, e_tag} list, ETags + // byte-for-byte (quotes preserved). + let finalize = requests + .iter() + .find(|r| r.url.path() == "/v1/uploads/upl_multi/finalize") + .expect("a finalize request should have been made"); + assert_eq!( + finalize + .headers + .get("x-upload-finalize-token") + .and_then(|v| v.to_str().ok()), + Some("ftok_multi"), + ); + let body: serde_json::Value = serde_json::from_slice(&finalize.body).expect("finalize JSON"); + // The body must be a JSON object carrying `parts` — never `null`. 
+ assert!( + body.is_object(), + "multipart finalize body must be a JSON object, not {body}" + ); + let parts = body + .get("parts") + .and_then(|p| p.as_array()) + .expect("multipart finalize must send a parts array"); + assert_eq!(parts.len(), 3, "all three parts must be finalized"); + for (i, part) in parts.iter().enumerate() { + assert_eq!( + part.get("part_number").and_then(|v| v.as_i64()), + Some((i + 1) as i64), + "parts must be ascending and 1-based" + ); + assert_eq!( + part.get("e_tag").and_then(|v| v.as_str()), + Some(format!("\"etag-part-{}\"", i + 1).as_str()), + "ETag must be forwarded byte-for-byte with surrounding quotes" + ); + } +} + +#[tokio::test] +async fn progress_callback_reaches_total() { + let server = MockServer::start().await; + let part_size = 4usize; + let contents: Vec = (0u8..10).collect(); // 4, 4, 2 + let part_urls: Vec = (1..=3) + .map(|i| format!("{}/storage/p/{i}", server.uri())) + .collect(); + + Mock::given(method("POST")) + .and(path("/v1/uploads")) + .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({ + "finalize_token": "ftok", + "headers": {}, + "mode": "multipart", + "part_size": part_size, + "part_urls": part_urls, + "upload_id": "upl_prog", + }))) + .mount(&server) + .await; + Mock::given(method("PUT")) + .and(path_regex(r"^/storage/p/\d+$")) + .respond_with(ResponseTemplate::new(200).insert_header("ETag", "\"e\"")) + .mount(&server) + .await; + Mock::given(method("POST")) + .and(path("/v1/uploads/upl_prog/finalize")) + .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({ + "created_at": "2026-06-25T00:00:00Z", + "size_bytes": contents.len(), + "status": "ready", + "upload_id": "upl_prog", + }))) + .mount(&server) + .await; + + // Record every (done, total) tick; assert monotonic and terminal == size. 
+ let ticks: Arc>> = Arc::new(Mutex::new(Vec::new())); + let max_done = Arc::new(AtomicU64::new(0)); + let ticks_cb = Arc::clone(&ticks); + let max_cb = Arc::clone(&max_done); + let progress: hotdata::UploadProgress = Arc::new(move |done, total| { + // Monotonic non-decreasing (tasks complete concurrently, but the shared + // AtomicU64 counter only grows). + let prev = max_cb.fetch_max(done, Ordering::SeqCst); + assert!( + done >= prev, + "progress must be non-decreasing: saw {done} after {prev}" + ); + ticks_cb.lock().unwrap().push((done, total)); + }); + + let opts = UploadOptions { + progress: Some(progress), + ..UploadOptions::default() + }; + + let path = temp_file(&contents); + let client = test_client(&server.uri()); + let result = client.upload_file(&path, opts).await; + let _ = std::fs::remove_file(&path); + result.expect("upload should succeed"); + + let ticks = ticks.lock().unwrap(); + assert!(!ticks.is_empty(), "progress callback must be invoked"); + let total = contents.len() as u64; + for (_, t) in ticks.iter() { + assert_eq!(*t, total, "total passed to progress must be the file size"); + } + let final_done = ticks.iter().map(|(d, _)| *d).max().unwrap(); + assert_eq!( + final_done, total, + "progress must reach exactly the file size" + ); +} + +#[tokio::test] +async fn storage_put_header_isolation_negative_check() { + // A focused negative check on the single-PUT path: the storage PUT must not + // carry the SDK bearer or workspace/session scope headers even though the + // client is fully configured with all of them. 
+ let server = MockServer::start().await; + let storage_url = format!("{}/storage/iso", server.uri()); + let contents = b"isolation bytes"; + + Mock::given(method("POST")) + .and(path("/v1/uploads")) + .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({ + "finalize_token": "ftok_iso", + "headers": {}, + "mode": "single", + "upload_id": "upl_iso", + "url": storage_url, + }))) + .mount(&server) + .await; + Mock::given(method("PUT")) + .and(path("/storage/iso")) + .respond_with(ResponseTemplate::new(200).insert_header("ETag", "\"iso\"")) + .mount(&server) + .await; + Mock::given(method("POST")) + .and(path("/v1/uploads/upl_iso/finalize")) + .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({ + "created_at": "2026-06-25T00:00:00Z", + "size_bytes": contents.len(), + "status": "ready", + "upload_id": "upl_iso", + }))) + .mount(&server) + .await; + + let path = temp_file(contents); + let client = test_client(&server.uri()); + let result = client.upload_file(&path, UploadOptions::default()).await; + let _ = std::fs::remove_file(&path); + result.expect("upload should succeed"); + + let requests = server.received_requests().await.expect("requests recorded"); + let put = requests + .iter() + .find(|r| r.url.path() == "/storage/iso") + .expect("a storage PUT should have been made"); + assert_no_sdk_headers(put); + + // Sanity: the API requests (create/finalize) DO carry the SDK headers, so + // the isolation is specific to the storage PUT, not a client-wide accident. 
+ let create = requests + .iter() + .find(|r| r.url.path() == "/v1/uploads") + .expect("a create-session request should have been made"); + assert_eq!( + create + .headers + .get("authorization") + .and_then(|v| v.to_str().ok()), + Some("Bearer test-bearer"), + "the API create request should still carry the bearer token" + ); + assert_eq!( + create + .headers + .get("x-workspace-id") + .and_then(|v| v.to_str().ok()), + Some("ws_test"), + ); +} + +/// Drive a single-PUT upload against a fresh mock server and return the JSON +/// body the SDK sent to `POST /v1/uploads` (so tests can assert the part-size +/// hint). The file written has `file_len` bytes; `opts` is passed through. +async fn capture_create_body(file_len: usize, opts: UploadOptions) -> serde_json::Value { + let server = MockServer::start().await; + let storage_url = format!("{}/storage/hint", server.uri()); + + Mock::given(method("POST")) + .and(path("/v1/uploads")) + .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({ + "finalize_token": "ftok_hint", + "headers": {}, + "mode": "single", + "upload_id": "upl_hint", + "url": storage_url, + }))) + .mount(&server) + .await; + Mock::given(method("PUT")) + .and(path("/storage/hint")) + .respond_with(ResponseTemplate::new(200).insert_header("ETag", "\"h\"")) + .mount(&server) + .await; + Mock::given(method("POST")) + .and(path("/v1/uploads/upl_hint/finalize")) + .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({ + "created_at": "2026-06-25T00:00:00Z", + "size_bytes": file_len, + "status": "ready", + "upload_id": "upl_hint", + }))) + .mount(&server) + .await; + + let contents = vec![0u8; file_len]; + let path = temp_file(&contents); + let client = test_client(&server.uri()); + let result = client.upload_file(&path, opts).await; + let _ = std::fs::remove_file(&path); + result.expect("upload should succeed"); + + let requests = server.received_requests().await.expect("requests recorded"); + let create = requests + 
.iter() + .find(|r| r.url.path() == "/v1/uploads") + .expect("a create-session request should have been made"); + serde_json::from_slice(&create.body).expect("create body must be valid JSON") +} + +#[tokio::test] +async fn create_sends_8mib_part_size_hint_for_normal_file() { + // A normal-sized file sends the default 8 MiB hint (auto-scaling only kicks + // in for very large files). + let body = capture_create_body(4096, UploadOptions::default()).await; + assert_eq!( + body.get("part_size").and_then(|v| v.as_u64()), + Some(hotdata::DEFAULT_PART_SIZE), + "normal file must send the 8 MiB default hint, body: {body}" + ); +} + +#[tokio::test] +async fn create_part_size_hint_matches_auto_scaler() { + // Whatever the SDK sends must equal the public pure scaler for the file's + // size, so the CLI can reason about it. (The scaler's large-file behavior is + // unit-tested directly in src/uploads.rs without writing a giant file.) + let file_len = 64 * 1024usize; // 64 KiB + let body = capture_create_body(file_len, UploadOptions::default()).await; + assert_eq!( + body.get("part_size").and_then(|v| v.as_u64()), + Some(hotdata::auto_part_size_hint(file_len as u64)), + "auto hint on the wire must match auto_part_size_hint(); body: {body}" + ); + // Sanity: the auto scaler keeps the part count well under the S3 hard limit. + let hint = hotdata::auto_part_size_hint(file_len as u64); + assert!(hint.is_multiple_of(1024 * 1024), "hint must be a whole MiB"); +} + +#[tokio::test] +async fn create_explicit_part_size_overrides_auto_hint() { + // An explicit opts.part_size must be forwarded verbatim, overriding the + // auto-scaler. 
+ let explicit = 16 * 1024 * 1024u64; // 16 MiB + let opts = UploadOptions { + part_size: Some(explicit), + ..UploadOptions::default() + }; + let body = capture_create_body(4096, opts).await; + assert_eq!( + body.get("part_size").and_then(|v| v.as_u64()), + Some(explicit), + "explicit part_size must override the auto hint, body: {body}" + ); +} + +// --------------------------------------------------------------------------- +// Codex review follow-ups +// --------------------------------------------------------------------------- + +/// Mock the standard create-session response for a SINGLE upload at `storage_url`. +fn mock_single_session(upload_id: &str, token: &str, storage_url: &str) -> serde_json::Value { + serde_json::json!({ + "finalize_token": token, + "headers": {}, + "mode": "single", + "upload_id": upload_id, + "url": storage_url, + }) +} + +/// Mock a finalize success body. +fn mock_finalize_ok(upload_id: &str, size: usize) -> serde_json::Value { + serde_json::json!({ + "created_at": "2026-06-25T00:00:00Z", + "size_bytes": size, + "status": "ready", + "upload_id": upload_id, + }) +} + +/// #2: default headers set on the SDK's MAIN client must NOT reach storage PUTs. +/// We install an Authorization + X-Workspace-Id default header on the reqwest +/// client the SDK uses, then assert the storage PUT carries neither (it goes out +/// on the dedicated bare storage client). 
+#[tokio::test]
+async fn default_headers_on_main_client_do_not_reach_storage_put() {
+    let server = MockServer::start().await;
+    let storage_url = format!("{}/storage/iso2", server.uri());
+    let contents = b"bytes with poisoned client";
+
+    // Happy-path session: create -> single storage PUT -> finalize.
+    Mock::given(method("POST"))
+        .and(path("/v1/uploads"))
+        .respond_with(
+            ResponseTemplate::new(200).set_body_json(mock_single_session(
+                "upl_iso2",
+                "ftok_iso2",
+                &storage_url,
+            )),
+        )
+        .mount(&server)
+        .await;
+    Mock::given(method("PUT"))
+        .and(path("/storage/iso2"))
+        .respond_with(ResponseTemplate::new(200).insert_header("ETag", "\"iso2\""))
+        .mount(&server)
+        .await;
+    Mock::given(method("POST"))
+        .and(path("/v1/uploads/upl_iso2/finalize"))
+        .respond_with(
+            ResponseTemplate::new(200).set_body_json(mock_finalize_ok("upl_iso2", contents.len())),
+        )
+        .mount(&server)
+        .await;
+
+    // A reqwest client carrying default headers a host might set.
+    let mut default_headers = reqwest::header::HeaderMap::new();
+    default_headers.insert(
+        reqwest::header::AUTHORIZATION,
+        "Bearer host-default".parse().unwrap(),
+    );
+    default_headers.insert("X-Workspace-Id", "ws-default".parse().unwrap());
+    default_headers.insert(reqwest::header::USER_AGENT, "host-agent/9".parse().unwrap());
+    let poisoned = reqwest::Client::builder()
+        .default_headers(default_headers)
+        .build()
+        .unwrap();
+
+    // Hand the poisoned client to the SDK as its main HTTP client.
+    let mut config = Configuration {
+        base_path: server.uri(),
+        bearer_access_token: Some("test-bearer".to_owned()),
+        client: poisoned,
+        ..Configuration::default()
+    };
+    config.api_keys.insert(
+        WORKSPACE_HEADER.to_owned(),
+        ApiKey {
+            prefix: None,
+            key: "ws_test".to_owned(),
+        },
+    );
+    let client = Client::from_configuration(config);
+
+    let path = temp_file(contents);
+    let result = client.upload_file(&path, UploadOptions::default()).await;
+    // Best-effort cleanup before asserting, so a failure doesn't leak the file.
+    let _ = std::fs::remove_file(&path);
+    result.expect("upload should succeed");
+
+    let requests = server.received_requests().await.expect("requests recorded");
+    let put = requests
+        .iter()
+        .find(|r| r.url.path() == "/storage/iso2")
+        .expect("a storage PUT should have been made");
+    // The host's default headers must not be on the storage PUT.
+    assert!(
+        put.headers.get("authorization").is_none(),
+        "storage PUT must not carry the host client's default Authorization"
+    );
+    assert!(
+        put.headers.get("x-workspace-id").is_none(),
+        "storage PUT must not carry the host client's default X-Workspace-Id"
+    );
+    // The bare client sends reqwest's own default UA, not the host's; assert the
+    // host UA didn't leak.
+    assert_ne!(
+        put.headers.get("user-agent").and_then(|v| v.to_str().ok()),
+        Some("host-agent/9"),
+        "storage PUT must not carry the host client's default User-Agent"
+    );
+}
+
+/// #1: finalize must be exactly-once — it must NOT retry even when the client's
+/// retry policy allows retries. We make finalize return 429 (the status
+/// `execute_retrying` DOES retry, with Retry-After: 0) under a policy of 5
+/// retries: if finalize were routed through the retry wrapper it would hit the
+/// server 6 times; with retries disabled for finalize it must hit exactly once.
+/// This is the discriminating regression guard for the no-retry change (a 500
+/// wouldn't exercise the wrapper at all).
+#[tokio::test]
+async fn finalize_is_not_retried() {
+    let server = MockServer::start().await;
+    let storage_url = format!("{}/storage/fin", server.uri());
+    let contents = b"finalize once";
+
+    Mock::given(method("POST"))
+        .and(path("/v1/uploads"))
+        .respond_with(
+            ResponseTemplate::new(200).set_body_json(mock_single_session(
+                "upl_fin",
+                "ftok_fin",
+                &storage_url,
+            )),
+        )
+        .mount(&server)
+        .await;
+    Mock::given(method("PUT"))
+        .and(path("/storage/fin"))
+        .respond_with(ResponseTemplate::new(200).insert_header("ETag", "\"fin\""))
+        .mount(&server)
+        .await;
+    // Finalize returns 429 (Retry-After: 0) — the wrapper WOULD retry this if it
+    // were applied. With retries disabled for finalize, only one request lands.
+    Mock::given(method("POST"))
+        .and(path("/v1/uploads/upl_fin/finalize"))
+        .respond_with(ResponseTemplate::new(429).insert_header("Retry-After", "0"))
+        .mount(&server)
+        .await;
+
+    let path = temp_file(contents);
+    // 5 retries allowed at the policy level — but finalize must ignore them.
+    let client = test_client_with_retry(&server.uri(), fast_retry(5));
+    let result = client.upload_file(&path, UploadOptions::default()).await;
+    let _ = std::fs::remove_file(&path);
+
+    assert!(
+        matches!(result, Err(UploadError::Finalize(_))),
+        "a finalize 429 must surface as UploadError::Finalize (not be retried away)"
+    );
+    // Count the finalize requests the mock server actually received.
+    let finalize_hits = server
+        .received_requests()
+        .await
+        .unwrap()
+        .iter()
+        .filter(|r| r.url.path() == "/v1/uploads/upl_fin/finalize")
+        .count();
+    assert_eq!(
+        finalize_hits, 1,
+        "finalize must be attempted exactly once despite a retry policy of 5"
+    );
+}
+
+/// #1 (partner): per-part PUTs ARE retryable. Part 1 returns 429 once (with
+/// Retry-After: 0) and then 200, and the upload still completes. We use 429
+/// because that is the status `execute_retrying` retries on.
+#[tokio::test]
+async fn part_put_retries_429_then_succeeds() {
+    let server = MockServer::start().await;
+    let part_size = 5usize;
+    let contents: Vec<u8> = (0u8..10).collect(); // 2 parts: 5, 5
+    let part_urls: Vec<String> = (1..=2)
+        .map(|i| format!("{}/storage/rpart/{i}", server.uri()))
+        .collect();
+
+    Mock::given(method("POST"))
+        .and(path("/v1/uploads"))
+        .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
+            "finalize_token": "ftok_rp",
+            "headers": {},
+            "mode": "multipart",
+            "part_size": part_size,
+            "part_urls": part_urls,
+            "upload_id": "upl_rp",
+        })))
+        .mount(&server)
+        .await;
+
+    // Part 1: one 429 (Retry-After: 0) then 200.
+    Mock::given(method("PUT"))
+        .and(path("/storage/rpart/1"))
+        .respond_with(ResponseTemplate::new(429).insert_header("Retry-After", "0"))
+        .up_to_n_times(1)
+        .mount(&server)
+        .await;
+    Mock::given(method("PUT"))
+        .and(path("/storage/rpart/1"))
+        .respond_with(ResponseTemplate::new(200).insert_header("ETag", "\"rp1\""))
+        .mount(&server)
+        .await;
+    Mock::given(method("PUT"))
+        .and(path("/storage/rpart/2"))
+        .respond_with(ResponseTemplate::new(200).insert_header("ETag", "\"rp2\""))
+        .mount(&server)
+        .await;
+    Mock::given(method("POST"))
+        .and(path("/v1/uploads/upl_rp/finalize"))
+        .respond_with(
+            ResponseTemplate::new(200).set_body_json(mock_finalize_ok("upl_rp", contents.len())),
+        )
+        .mount(&server)
+        .await;
+
+    let path = temp_file(&contents);
+    let client = test_client_with_retry(&server.uri(), fast_retry(5));
+    let result = client.upload_file(&path, UploadOptions::default()).await;
+    let _ = std::fs::remove_file(&path);
+    result.expect("multipart upload should complete after a part retry");
+
+    // Part 1 was hit twice (429 then 200).
+    let p1_hits = server
+        .received_requests()
+        .await
+        .unwrap()
+        .iter()
+        .filter(|r| r.url.path() == "/storage/rpart/1")
+        .count();
+    assert_eq!(p1_hits, 2, "part 1 must be retried after the 429");
+}
+
+/// Missing ETag on a part PUT response surfaces as UploadError::MissingETag.
+#[tokio::test]
+async fn missing_etag_is_an_error() {
+    let server = MockServer::start().await;
+    let part_size = 5usize;
+    let contents: Vec<u8> = (0u8..5).collect(); // 1 part
+    let part_urls = vec![format!("{}/storage/noetag/1", server.uri())];
+
+    Mock::given(method("POST"))
+        .and(path("/v1/uploads"))
+        .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
+            "finalize_token": "ftok_ne",
+            "headers": {},
+            "mode": "multipart",
+            "part_size": part_size,
+            "part_urls": part_urls,
+            "upload_id": "upl_ne",
+        })))
+        .mount(&server)
+        .await;
+    // 200 but NO ETag header.
+    Mock::given(method("PUT"))
+        .and(path("/storage/noetag/1"))
+        .respond_with(ResponseTemplate::new(200))
+        .mount(&server)
+        .await;
+
+    let path = temp_file(&contents);
+    let client = test_client(&server.uri());
+    let result = client.upload_file(&path, UploadOptions::default()).await;
+    let _ = std::fs::remove_file(&path);
+    assert!(
+        matches!(result, Err(UploadError::MissingETag { part_number: 1 })),
+        "missing ETag must surface as MissingETag, got {result:?}"
+    );
+}
+
+/// A storage 4xx/5xx surfaces as UploadError::StorageStatus.
+#[tokio::test]
+async fn storage_error_status_is_surfaced() {
+    let server = MockServer::start().await;
+    let storage_url = format!("{}/storage/403", server.uri());
+    let contents = b"denied";
+
+    Mock::given(method("POST"))
+        .and(path("/v1/uploads"))
+        .respond_with(
+            ResponseTemplate::new(200).set_body_json(mock_single_session(
+                "upl_403",
+                "ftok_403",
+                &storage_url,
+            )),
+        )
+        .mount(&server)
+        .await;
+    // Storage rejects the PUT with a 403 and an error body.
+    Mock::given(method("PUT"))
+        .and(path("/storage/403"))
+        .respond_with(ResponseTemplate::new(403).set_body_string("AccessDenied"))
+        .mount(&server)
+        .await;
+
+    let path = temp_file(contents);
+    let client = test_client(&server.uri());
+    let result = client.upload_file(&path, UploadOptions::default()).await;
+    let _ = std::fs::remove_file(&path);
+    match result {
+        Err(UploadError::StorageStatus { status, body, .. }) => {
+            assert_eq!(status, reqwest::StatusCode::FORBIDDEN);
+            assert!(
+                body.contains("AccessDenied"),
+                "body should carry storage error: {body}"
+            );
+        }
+        other => panic!("expected StorageStatus(403), got {other:?}"),
+    }
+}
+
+/// 501 PRESIGN_UNSUPPORTED on create-session is a hard error — NO /v1/files
+/// fallback is attempted.
+#[tokio::test] +async fn presign_unsupported_is_hard_error_no_fallback() { + let server = MockServer::start().await; + let contents = b"no presign here"; + + Mock::given(method("POST")) + .and(path("/v1/uploads")) + .respond_with(ResponseTemplate::new(501).set_body_json(serde_json::json!({ + "error": { "code": "PRESIGN_UNSUPPORTED", "message": "no presign" } + }))) + .mount(&server) + .await; + + let path = temp_file(contents); + let client = test_client(&server.uri()); + let result = client.upload_file(&path, UploadOptions::default()).await; + let _ = std::fs::remove_file(&path); + assert!( + matches!(result, Err(UploadError::CreateSession(_))), + "501 must surface as CreateSession error, got {result:?}" + ); + // No request to the legacy proxy. + let hit_files = server + .received_requests() + .await + .unwrap() + .iter() + .any(|r| r.url.path() == "/v1/files"); + assert!(!hit_files, "must NOT fall back to POST /v1/files"); +} + +/// Malformed multipart sessions are rejected as MalformedSession. +#[tokio::test] +async fn malformed_multipart_sessions_are_rejected() { + // Helper: run an upload whose create-session returns the given multipart JSON + // overrides, and return the result. + async fn run(session: serde_json::Value, file_len: usize) -> Result<(), UploadError> { + let server = MockServer::start().await; + Mock::given(method("POST")) + .and(path("/v1/uploads")) + .respond_with(ResponseTemplate::new(200).set_body_json(session)) + .mount(&server) + .await; + // Accept any PUT so a (wrongly) issued part doesn't fail for another + // reason; we expect to reject BEFORE PUTting. 
+ Mock::given(method("PUT")) + .and(path_regex(r"^/storage/.*$")) + .respond_with(ResponseTemplate::new(200).insert_header("ETag", "\"x\"")) + .mount(&server) + .await; + Mock::given(method("POST")) + .and(path_regex(r"^/v1/uploads/.*/finalize$")) + .respond_with(ResponseTemplate::new(200).set_body_json(mock_finalize_ok("u", file_len))) + .mount(&server) + .await; + + let contents = vec![0u8; file_len]; + let path = temp_file(&contents); + let client = test_client(&server.uri()); + let result = client.upload_file(&path, UploadOptions::default()).await; + let _ = std::fs::remove_file(&path); + result.map(|_| ()) + } + + let su = "http://example.invalid/storage/p1"; + + // Zero part_size. + let r = run( + serde_json::json!({ + "finalize_token": "t", "headers": {}, "mode": "multipart", + "part_size": 0, "part_urls": [su], "upload_id": "u", + }), + 10, + ) + .await; + assert!( + matches!(r, Err(UploadError::MalformedSession(_))), + "zero part_size: {r:?}" + ); + + // Negative part_size. + let r = run( + serde_json::json!({ + "finalize_token": "t", "headers": {}, "mode": "multipart", + "part_size": -5, "part_urls": [su], "upload_id": "u", + }), + 10, + ) + .await; + assert!( + matches!(r, Err(UploadError::MalformedSession(_))), + "negative part_size: {r:?}" + ); + + // Empty part_urls. + let r = run( + serde_json::json!({ + "finalize_token": "t", "headers": {}, "mode": "multipart", + "part_size": 5, "part_urls": [], "upload_id": "u", + }), + 10, + ) + .await; + assert!( + matches!(r, Err(UploadError::MalformedSession(_))), + "empty part_urls: {r:?}" + ); + + // Too FEW URLs: 10 bytes / 5 = 2 parts, but only 1 URL. + let r = run( + serde_json::json!({ + "finalize_token": "t", "headers": {}, "mode": "multipart", + "part_size": 5, "part_urls": [su], "upload_id": "u", + }), + 10, + ) + .await; + assert!( + matches!(r, Err(UploadError::MalformedSession(_))), + "too few URLs: {r:?}" + ); + + // Too MANY URLs: 10 bytes / 5 = 2 parts, but 3 URLs. 
+ let r = run( + serde_json::json!({ + "finalize_token": "t", "headers": {}, "mode": "multipart", + "part_size": 5, "part_urls": [su, su, su], "upload_id": "u", + }), + 10, + ) + .await; + assert!( + matches!(r, Err(UploadError::MalformedSession(_))), + "too many URLs: {r:?}" + ); +} + +/// Server-provided Content-Type in the session `headers` map is replayed +/// verbatim on the storage PUT. +#[tokio::test] +async fn server_content_type_is_replayed_on_storage_put() { + let server = MockServer::start().await; + let storage_url = format!("{}/storage/ct", server.uri()); + let contents = b"typed bytes"; + + Mock::given(method("POST")) + .and(path("/v1/uploads")) + .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({ + "finalize_token": "ftok_ct", + "headers": { "Content-Type": "application/parquet" }, + "mode": "single", + "upload_id": "upl_ct", + "url": storage_url, + }))) + .mount(&server) + .await; + Mock::given(method("PUT")) + .and(path("/storage/ct")) + .respond_with(ResponseTemplate::new(200).insert_header("ETag", "\"ct\"")) + .mount(&server) + .await; + Mock::given(method("POST")) + .and(path("/v1/uploads/upl_ct/finalize")) + .respond_with( + ResponseTemplate::new(200).set_body_json(mock_finalize_ok("upl_ct", contents.len())), + ) + .mount(&server) + .await; + + let path = temp_file(contents); + let client = test_client(&server.uri()); + let result = client.upload_file(&path, UploadOptions::default()).await; + let _ = std::fs::remove_file(&path); + result.expect("upload should succeed"); + + let requests = server.received_requests().await.expect("requests recorded"); + let put = requests + .iter() + .find(|r| r.url.path() == "/storage/ct") + .expect("storage PUT"); + assert_eq!( + put.headers + .get("content-type") + .and_then(|v| v.to_str().ok()), + Some("application/parquet"), + "server-provided Content-Type must be replayed verbatim (no charset appended)" + ); +} + +/// A bare blocking TCP server that plays "object storage" for part 
PUTs and +/// GENUINELY measures concurrency: each connection is handled on its own thread, +/// which reads the full request (headers + Content-Length body), bumps an active +/// counter, HOLDS it for `hold` (so overlapping PUTs actually coexist), records +/// the peak, then decrements and replies `200 OK` with an `ETag`. Because the +/// in-flight count is held across the sleep — unlike wiremock's synchronous +/// `Respond`, which can't span its own response delay — `peak` reflects true +/// overlap and can be asserted to reach (not just stay under) the cap. +/// +/// Returns `(base_url, peak, served)`. Mirrors the raw-TCP pattern in +/// `src/test_support.rs`. +fn concurrency_storage_server(hold: Duration) -> (String, Arc, Arc) { + use std::io::{Read, Write}; + use std::net::TcpListener; + + let listener = TcpListener::bind("127.0.0.1:0").expect("bind ephemeral port"); + let addr = listener.local_addr().expect("local addr"); + let active = Arc::new(AtomicUsize::new(0)); + let peak = Arc::new(AtomicUsize::new(0)); + let served = Arc::new(AtomicUsize::new(0)); + let (a, p, s) = (Arc::clone(&active), Arc::clone(&peak), Arc::clone(&served)); + + std::thread::spawn(move || { + for stream in listener.incoming() { + let Ok(mut sock) = stream else { continue }; + let (a, p, s, hold) = (Arc::clone(&a), Arc::clone(&p), Arc::clone(&s), hold); + std::thread::spawn(move || { + // Read until we have the full headers, then drain the body by its + // declared Content-Length so the client finishes writing before + // we respond. 
+ let mut buf = Vec::new(); + let mut tmp = [0u8; 4096]; + let header_end = loop { + match sock.read(&mut tmp) { + Ok(0) => break None, + Ok(n) => { + buf.extend_from_slice(&tmp[..n]); + if let Some(pos) = find_subslice(&buf, b"\r\n\r\n") { + break Some(pos + 4); + } + } + Err(_) => break None, + } + }; + if let Some(body_start) = header_end { + let headers = String::from_utf8_lossy(&buf[..body_start]).to_lowercase(); + let content_len = headers + .lines() + .find_map(|l| l.strip_prefix("content-length:")) + .and_then(|v| v.trim().parse::().ok()) + .unwrap_or(0); + let mut have = buf.len() - body_start; + while have < content_len { + match sock.read(&mut tmp) { + Ok(0) => break, + Ok(n) => have += n, + Err(_) => break, + } + } + } + + // Now genuinely occupy an in-flight slot for the hold duration. + let now = a.fetch_add(1, Ordering::SeqCst) + 1; + p.fetch_max(now, Ordering::SeqCst); + std::thread::sleep(hold); + a.fetch_sub(1, Ordering::SeqCst); + s.fetch_add(1, Ordering::SeqCst); + + let resp = "HTTP/1.1 200 OK\r\nETag: \"c\"\r\ncontent-length: 0\r\n\ + connection: close\r\n\r\n"; + let _ = sock.write_all(resp.as_bytes()); + let _ = sock.flush(); + }); + } + }); + + (format!("http://{addr}"), peak, served) +} + +/// Find the first index of `needle` in `haystack`. +fn find_subslice(haystack: &[u8], needle: &[u8]) -> Option { + haystack.windows(needle.len()).position(|w| w == needle) +} + +/// Bounded in-flight concurrency, measured faithfully. With 6 parts and +/// `max_concurrency = 2` (and 8 MiB-equivalent small parts so the memory budget +/// doesn't reduce the cap), exactly 2 PUTs overlap at the peak — never more, and +/// it actually reaches 2. The storage server holds each PUT for 100ms so the +/// JoinSet bound is genuinely exercised. 
+#[tokio::test] +async fn in_flight_concurrency_is_bounded_and_reached() { + let (storage_base, peak, served) = concurrency_storage_server(Duration::from_millis(100)); + + let server = MockServer::start().await; + let part_size = 5usize; + // 6 parts of 5 bytes: 30 bytes total. + let contents: Vec = (0u8..30).collect(); + let part_urls: Vec = (1..=6) + .map(|i| format!("{storage_base}/cpart/{i}")) + .collect(); + + Mock::given(method("POST")) + .and(path("/v1/uploads")) + .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({ + "finalize_token": "ftok_c", + "headers": {}, + "mode": "multipart", + "part_size": part_size, + "part_urls": part_urls, + "upload_id": "upl_c", + }))) + .mount(&server) + .await; + Mock::given(method("POST")) + .and(path("/v1/uploads/upl_c/finalize")) + .respond_with( + ResponseTemplate::new(200).set_body_json(mock_finalize_ok("upl_c", contents.len())), + ) + .mount(&server) + .await; + + let opts = UploadOptions { + max_concurrency: Some(2), + ..UploadOptions::default() + }; + let path = temp_file(&contents); + let client = test_client(&server.uri()); + let result = client.upload_file(&path, opts).await; + let _ = std::fs::remove_file(&path); + result.expect("upload should complete"); + + // All 6 parts were served by the storage server. + assert_eq!(served.load(Ordering::SeqCst), 6, "all 6 parts must be PUT"); + // Genuine overlap: peak reaches the cap and never exceeds it. + let observed = peak.load(Ordering::SeqCst); + assert_eq!( + observed, 2, + "in-flight concurrency must reach exactly max_concurrency=2, observed {observed}" + ); +} + +/// Serial when `max_concurrency = 1`: no two PUTs ever overlap. 
+#[tokio::test] +async fn serial_when_max_concurrency_is_one() { + let (storage_base, peak, served) = concurrency_storage_server(Duration::from_millis(40)); + + let server = MockServer::start().await; + let part_size = 5usize; + let contents: Vec = (0u8..20).collect(); // 4 parts + let part_urls: Vec = (1..=4) + .map(|i| format!("{storage_base}/spart/{i}")) + .collect(); + + Mock::given(method("POST")) + .and(path("/v1/uploads")) + .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({ + "finalize_token": "ftok_s", + "headers": {}, + "mode": "multipart", + "part_size": part_size, + "part_urls": part_urls, + "upload_id": "upl_s", + }))) + .mount(&server) + .await; + Mock::given(method("POST")) + .and(path("/v1/uploads/upl_s/finalize")) + .respond_with( + ResponseTemplate::new(200).set_body_json(mock_finalize_ok("upl_s", contents.len())), + ) + .mount(&server) + .await; + + let opts = UploadOptions { + max_concurrency: Some(1), + ..UploadOptions::default() + }; + let path = temp_file(&contents); + let client = test_client(&server.uri()); + let result = client.upload_file(&path, opts).await; + let _ = std::fs::remove_file(&path); + result.expect("upload should complete"); + + assert_eq!(served.load(Ordering::SeqCst), 4, "all 4 parts must be PUT"); + assert_eq!( + peak.load(Ordering::SeqCst), + 1, + "max_concurrency=1 must keep PUTs strictly serial (no overlap)" + ); +}