diff --git a/Cargo.lock b/Cargo.lock index b6f306c..dd102ac 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1165,9 +1165,9 @@ checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" [[package]] name = "hotdata" -version = "0.4.0" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a79ce05333e09411029842117c625b5350d939697a450b7b6e4cc7251b87c28b" +checksum = "b0b519aaebaee3089cf13b7eb47a64517271e881200319d51ebbb6fd3cbf3909" dependencies = [ "arrow-array", "arrow-ipc", @@ -1194,14 +1194,12 @@ dependencies = [ "arrow", "async-trait", "base64", - "bytes", "clap", "clap_complete", "crossterm 0.28.1", "directories", "dotenvy", "flate2", - "futures-core", "hotdata", "indicatif", "inquire", @@ -1224,7 +1222,6 @@ dependencies = [ "tempfile", "tiny_http", "tokio", - "tokio-stream", "toml", ] @@ -3276,17 +3273,6 @@ dependencies = [ "tokio", ] -[[package]] -name = "tokio-stream" -version = "0.1.18" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "32da49809aab5c3bc678af03902d4ccddea2a87d028d86392a4b1560c6906c70" -dependencies = [ - "futures-core", - "pin-project-lite", - "tokio", -] - [[package]] name = "tokio-util" version = "0.7.18" diff --git a/Cargo.toml b/Cargo.toml index 28acb7e..657216a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,29 +14,23 @@ path = "src/main.rs" [dependencies] # Hotdata Rust SDK. The CLI's sync wrapper (src/sdk.rs) drives this async SDK # behind a shared multi-thread tokio runtime. The `arrow` feature backs result -# decode; `upload_stream` carries `content_length` (sized body, not chunked, so -# the server can fast-fail an oversized upload — see src/sdk.rs::Api::upload_stream). -hotdata = { version = "0.4.0", features = ["arrow"] } +# decode; `upload_file` runs the presigned direct-to-storage upload flow +# (see src/sdk.rs::Api::upload). +hotdata = { version = "0.5.0", features = ["arrow"] } # Shared multi-thread runtime for the sync wrapper; block_on is called -# concurrently from rayon worker threads (see src/indexes.rs). `sync` backs the -# mpsc channel that bridges the blocking upload reader into an async byte stream. -tokio = { version = "1", features = ["rt-multi-thread", "sync"] } +# concurrently from rayon worker threads (see src/indexes.rs). The presigned +# upload (src/sdk.rs::Api::upload) drives the SDK's async `upload_file` on this +# same runtime. +tokio = { version = "1", features = ["rt-multi-thread"] } # CliTokenProvider implements the SDK's #[async_trait] BearerTokenProvider. async-trait = "0.1" -# Bridge the progress-wrapped blocking upload reader into the async -# `Stream>` the SDK's `upload_stream` consumes: a -# spawn_blocking task feeds chunks through a tokio mpsc channel, wrapped as a -# Stream by tokio-stream's ReceiverStream. `futures-core` names the Stream -# bound; `bytes` matches the SDK's chunk type. -bytes = "1" -futures-core = "0.3" -tokio-stream = "0.1" anstyle = "1.0.13" clap = { version = "4", features = ["derive"] } directories = "6" # Matches the SDK's reqwest 0.13 so the sdk seam can name the `reqwest::Client` # type backing `Configuration.client`; `blocking` serves the CLI's own -# synchronous paths (PKCE/token mints in jwt.rs + the streaming /files upload). +# synchronous paths (PKCE/token mints in jwt.rs + the `--url` source download +# in databases.rs that feeds the presigned upload). reqwest = { version = "0.13", features = ["blocking", "json"] } rayon = "1.10" serde = { version = "1", features = ["derive"] } diff --git a/src/command.rs b/src/command.rs index 32495ad..0acfc91 100644 --- a/src/command.rs +++ b/src/command.rs @@ -559,7 +559,7 @@ pub enum DatabasesCommands { #[arg(long, conflicts_with_all = ["file", "upload_id"])] url: Option, - /// Use a previously staged upload ID from `POST /v1/files` instead of uploading + /// Use a previously staged upload ID from `POST /v1/uploads` instead of uploading #[arg(long, conflicts_with_all = ["file", "url"])] upload_id: Option, }, @@ -640,7 +640,7 @@ pub enum DatabaseTablesCommands { #[arg(long, conflicts_with_all = ["file", "upload_id"])] url: Option, - /// Use a previously staged upload ID from `POST /v1/files` instead of uploading + /// Use a previously staged upload ID from `POST /v1/uploads` instead of uploading #[arg(long, conflicts_with_all = ["file", "url"])] upload_id: Option, }, diff --git a/src/databases.rs b/src/databases.rs index d2dc7da..a63794f 100644 --- a/src/databases.rs +++ b/src/databases.rs @@ -337,26 +337,38 @@ fn table_rows(catalog: &str, tables: Vec) -> Vec { .collect() } -fn finish_upload( - api: &Api, - reader: impl std::io::Read + Send + 'static, - size: Option, - pb: &ProgressBar, -) -> String { - // Stream the body to `POST /v1/files` through the SDK seam, which drives the - // SDK's `upload_stream` on a dedicated no-timeout client (a 10 GB+ parquet - // far outlives the shared 300s request timeout) and bridges this blocking, - // progress-wrapped `reader` into the async byte stream the SDK consumes. - // `size` becomes the `Content-Length` so the server fast-fails an oversized - // upload before writing bytes; the `--url` source may not know it, hence - // `Option`. Carries the same auth + scope headers as every other SDK call. - let result = api.upload_stream(reader, size); - pb.finish_and_clear(); +/// The shared indicatif progress-bar template for an upload: a spinner, a +/// byte-granular bar, the bytes-done / total, and an ETA. +fn upload_progress_style() -> ProgressStyle { + ProgressStyle::with_template( + "{spinner:.green} [{bar:40.cyan/blue}] {bytes}/{total_bytes} ({eta})", + ) + .unwrap() + .progress_chars("=>-") +} - match result { - Ok(id) => id, - Err(e) => e.exit(), - } +/// Upload an already-on-disk parquet file via the SDK's presigned direct-to- +/// storage flow, driving a single aggregate progress bar from the SDK's +/// byte-granular progress callback. Returns the finalized upload id, or the +/// seam's error (a `501 PRESIGN_UNSUPPORTED` surfaces an actionable message, +/// not a fallback). The caller decides how to surface failure — `--url` must +/// clean up its temp file before exiting, so this returns rather than exits. +fn upload_parquet_path(api: &Api, path: &Path, size: u64) -> Result { + let pb = ProgressBar::new(size); + pb.set_style(upload_progress_style()); + + // The SDK reports cumulative `(done, total)`; mirror it onto the bar. We + // `set_length(total)` defensively so the bar tracks the SDK's own notion of + // total even though it equals `size` here. + let cb_pb = pb.clone(); + let progress: hotdata::UploadProgress = std::sync::Arc::new(move |done, total| { + cb_pb.set_length(total); + cb_pb.set_position(done); + }); + + let result = api.upload(path, progress); + pb.finish_and_clear(); + result } fn upload_parquet_file(api: &Api, path: &str) -> String { @@ -369,25 +381,15 @@ fn upload_parquet_file(api: &Api, path: &str) -> String { std::process::exit(1); } - let f = match std::fs::File::open(path) { - Ok(f) => f, + let file_size = match std::fs::metadata(path) { + Ok(m) => m.len(), Err(e) => { eprintln!("error opening file '{path}': {e}"); std::process::exit(1); } }; - let file_size = f.metadata().map(|m| m.len()).unwrap_or(0); - let pb = ProgressBar::new(file_size); - pb.set_style( - ProgressStyle::with_template( - "{spinner:.green} [{bar:40.cyan/blue}] {bytes}/{total_bytes} ({eta})", - ) - .unwrap() - .progress_chars("=>-"), - ); - let reader = pb.wrap_read(f); - finish_upload(api, reader, Some(file_size), &pb) + upload_parquet_path(api, Path::new(path), file_size).unwrap_or_else(|e| e.exit()) } fn upload_parquet_url(api: &Api, url: &str) -> String { @@ -398,6 +400,11 @@ fn upload_parquet_url(api: &Api, url: &str) -> String { std::process::exit(1); } + // The presigned upload needs a seekable, size-known source (the SDK opens + // the path, declares its byte count, and PUTs it directly to storage), so + // download the URL to a temp file first, then upload that file on the same + // path as `--file`. The temp file is removed before this returns on both + // success and failure (see `upload_temp_file`). let resp = match reqwest::blocking::get(url) { Ok(r) => r, Err(e) => { @@ -415,16 +422,11 @@ fn upload_parquet_url(api: &Api, url: &str) -> String { } let content_length = resp.content_length(); - let pb = match content_length { + // Download progress: a byte bar when the length is known, else a spinner. + let dl_pb = match content_length { Some(len) => { let pb = ProgressBar::new(len); - pb.set_style( - ProgressStyle::with_template( - "{spinner:.green} [{bar:40.cyan/blue}] {bytes}/{total_bytes} ({eta})", - ) - .unwrap() - .progress_chars("=>-"), - ); + pb.set_style(upload_progress_style()); pb } None => { @@ -436,8 +438,59 @@ fn upload_parquet_url(api: &Api, url: &str) -> String { pb } }; - let reader = pb.wrap_read(resp); - finish_upload(api, reader, content_length, &pb) + + let temp = match download_to_temp(resp, &dl_pb) { + Ok(t) => t, + Err(e) => { + dl_pb.finish_and_clear(); + eprintln!("error downloading '{url}': {e}"); + std::process::exit(1); + } + }; + dl_pb.finish_and_clear(); + + let size = std::fs::metadata(temp.path()).map(|m| m.len()).unwrap_or(0); + upload_temp_file(temp, |path| upload_parquet_path(api, path, size)).unwrap_or_else(|e| e.exit()) +} + +/// Upload an already-downloaded temp file, guaranteeing the file is deleted +/// before returning — on both success and failure. +/// +/// The temp file MUST be cleaned up here, on return, rather than left to a +/// guard in the caller's scope: the caller exits the process via +/// [`ApiError::exit`] (`std::process::exit`) on the `Err` arm, and +/// `process::exit` runs no destructors. Owning `temp` in this function means it +/// drops (deleting a potentially multi-GB download) before the caller can exit. +fn upload_temp_file(temp: tempfile::NamedTempFile, upload: F) -> Result +where + F: FnOnce(&Path) -> Result, +{ + let result = upload(temp.path()); + // Delete now, while still inside this function, so cleanup precedes any + // `process::exit` the caller performs on `Err`. + drop(temp); + result +} + +/// Stream a blocking HTTP response body to a freshly created temp file, +/// advancing `pb` as bytes land. Returns the open [`NamedTempFile`], which +/// deletes the file on drop. Created atomically with `O_EXCL` + 0600 perms via +/// `tempfile`, so it can't be redirected by a pre-planted symlink. +fn download_to_temp( + resp: reqwest::blocking::Response, + pb: &ProgressBar, +) -> std::io::Result { + use std::io::Write; + + let mut temp = tempfile::Builder::new() + .prefix("hotdata-upload-") + .suffix(".parquet") + .tempfile()?; + + let mut reader = pb.wrap_read(resp); + std::io::copy(&mut reader, temp.as_file_mut())?; + temp.as_file_mut().flush()?; + Ok(temp) } fn collect_tables(api: &Api, connection_id: &str, schema: Option<&str>) -> Vec { @@ -1717,4 +1770,44 @@ mod tests { assert_eq!(resp.token, "jwt-x"); assert_eq!(resp.database_id, "dbid_abc"); } + + // The `--url` path downloads to a temp file and then exits via + // `process::exit` if the upload fails. Because `process::exit` runs no + // destructors, the temp file must be deleted by `upload_temp_file` before + // it returns the `Err` the caller exits on — not by a guard in the caller's + // scope. These tests pin that contract for both arms. + #[test] + fn upload_temp_file_removes_temp_on_upload_failure() { + let temp = tempfile::Builder::new() + .suffix(".parquet") + .tempfile() + .unwrap(); + let path = temp.path().to_path_buf(); + assert!(path.exists()); + + let result = upload_temp_file(temp, |p| { + assert!(p.exists(), "file present while the upload runs"); + Err(ApiError::Transport("upload boom".into())) + }); + + assert!(result.is_err()); + assert!( + !path.exists(), + "temp file must be removed before the failure is returned (caller exits without unwinding)" + ); + } + + #[test] + fn upload_temp_file_removes_temp_on_success() { + let temp = tempfile::Builder::new() + .suffix(".parquet") + .tempfile() + .unwrap(); + let path = temp.path().to_path_buf(); + + let result = upload_temp_file(temp, |_p| Ok("upid_123".to_string())); + + assert_eq!(result.unwrap(), "upid_123"); + assert!(!path.exists(), "temp file must be removed on success"); + } } diff --git a/src/sdk.rs b/src/sdk.rs index 454c0a9..18aab1d 100644 --- a/src/sdk.rs +++ b/src/sdk.rs @@ -24,12 +24,14 @@ //! the SDK passes through unchanged; the CLI keeps full ownership of //! session.json and the refresh table. +use std::path::Path; use std::sync::Arc; use std::sync::OnceLock; use hotdata::Client; use hotdata::apis::configuration::{ApiKey, Configuration}; use hotdata::apis::{Error, ResponseContent}; +use hotdata::{UploadError, UploadOptions, UploadProgress}; use crate::auth; use crate::config; @@ -67,8 +69,8 @@ pub struct Api { } /// Request timeout for SDK-routed calls. Mirrors the old `ApiClient` so a hung -/// server cannot stall the CLI indefinitely. The streaming `/files` upload -/// keeps its own no-timeout client on the raw-HTTP path. +/// server cannot stall the CLI indefinitely. The presigned upload keeps its own +/// no-timeout client (the storage `PUT`s can run for minutes). const HTTP_REQUEST_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(300); /// TCP keepalive probe interval, matching the old client. const TCP_KEEPALIVE_INTERVAL: std::time::Duration = std::time::Duration::from_secs(30); @@ -84,12 +86,13 @@ fn sdk_http_client() -> reqwest::Client { .expect("reqwest client with timeout should build") } -/// The `reqwest::Client` backing the streaming `/files` upload. Deliberately has -/// **no** request timeout: an upload's duration scales with file size and uplink -/// (a 10 GB parquet far outlives [`HTTP_REQUEST_TIMEOUT`], which is sized for -/// slow server-side work), so a wall-clock cap would abort a healthy-but-slow -/// transfer. TCP keepalive is kept so a genuinely dead peer is still reaped by -/// the OS; a live-but-slow upload runs to completion and the user can Ctrl-C. +/// The `reqwest::Client` backing the presigned upload's storage `PUT`s. +/// Deliberately has **no** request timeout: an upload's duration scales with +/// file size and uplink (a 10 GB parquet far outlives [`HTTP_REQUEST_TIMEOUT`], +/// which is sized for slow server-side work), so a wall-clock cap would abort a +/// healthy-but-slow transfer. TCP keepalive is kept so a genuinely dead peer is +/// still reaped by the OS; a live-but-slow upload runs to completion and the +/// user can Ctrl-C. fn upload_reqwest_client() -> reqwest::Client { reqwest::Client::builder() .tcp_keepalive(TCP_KEEPALIVE_INTERVAL) @@ -97,48 +100,30 @@ fn upload_reqwest_client() -> reqwest::Client { .expect("reqwest client should build without a timeout") } -/// Size of each chunk pulled from the blocking reader (1 MiB). Large enough to -/// keep per-chunk overhead negligible on a multi-GB upload, small enough that an -/// in-flight chunk is a trivial allocation. -const UPLOAD_CHUNK_SIZE: usize = 1 << 20; -/// Bound on chunks buffered between the blocking reader and the async sender. -/// Caps in-flight memory so a fast local disk can't outrun a slow uplink; the -/// read task blocks on a full channel (back-pressure). -const UPLOAD_CHANNEL_DEPTH: usize = 4; - -/// Bridge a blocking [`Read`](std::io::Read) source into the async -/// `Stream>` the SDK's `upload_stream` consumes. -/// -/// A `spawn_blocking` task reads fixed-size chunks and forwards them through a -/// bounded tokio mpsc channel; the returned [`ReceiverStream`] yields them to -/// the request body. The blocking task lives on the runtime's blocking pool, so -/// it does not stall an async worker, and a full channel back-pressures the -/// reader (which keeps the caller's progress bar — wrapped around `reader` — -/// honest). If the receiver is dropped (request aborted/failed) the send errors -/// and the task exits; a read error is forwarded as the stream's terminal item. -fn reader_into_stream( - mut reader: impl std::io::Read + Send + 'static, -) -> impl futures_core::Stream> + Send + 'static { - let (tx, rx) = tokio::sync::mpsc::channel(UPLOAD_CHANNEL_DEPTH); - rt().spawn_blocking(move || { - let mut buf = vec![0u8; UPLOAD_CHUNK_SIZE]; - loop { - match reader.read(&mut buf) { - Ok(0) => break, - Ok(n) => { - let chunk = bytes::Bytes::copy_from_slice(&buf[..n]); - if tx.blocking_send(Ok(chunk)).is_err() { - break; // receiver gone — request aborted - } - } - Err(e) => { - let _ = tx.blocking_send(Err(e)); - break; - } - } - } - }); - tokio_stream::wrappers::ReceiverStream::new(rx) +/// Content type recorded for a managed-table parquet upload. Advisory only — +/// the managed-table load keys off the parquet file extension, not the upload's +/// recorded content type — but a correct MIME type is the right metadata to +/// persist alongside the file. +const PARQUET_CONTENT_TYPE: &str = "application/vnd.apache.parquet"; + +/// Default number of multipart part `PUT`s the SDK keeps in flight for an +/// upload. 12 saturates a typical uplink without overwhelming the socket pool +/// or buffering too many parts (the SDK still caps effective in-flight by its +/// own memory budget — at the 8 MiB default part size, 256 MiB / 8 MiB = 32 ≥ +/// 12, so 12 is the binding limit). Overridable via +/// [`HOTDATA_UPLOAD_CONCURRENCY`](upload_concurrency). +const UPLOAD_CONCURRENCY: usize = 12; + +/// Resolve the multipart upload concurrency. Honors `HOTDATA_UPLOAD_CONCURRENCY` +/// when set to a parseable, non-zero value (a power-user knob); otherwise +/// [`UPLOAD_CONCURRENCY`]. A present-but-garbage or zero value falls back to the +/// default rather than erroring — this is a tuning hint, not a hard input. +fn upload_concurrency() -> usize { + std::env::var("HOTDATA_UPLOAD_CONCURRENCY") + .ok() + .and_then(|v| v.parse::().ok()) + .filter(|&n| n > 0) + .unwrap_or(UPLOAD_CONCURRENCY) } // Compile-time guarantee that the rayon bound can never silently regress. @@ -210,6 +195,45 @@ impl ApiError { } } + /// Map the SDK's [`UploadError`] (the presigned-upload error type, which is + /// neither an `Error` nor an `ArrowError`) into an [`ApiError`]. + /// + /// The status-bearing variants are preserved as [`ApiError::Status`] so the + /// 4xx re-auth hint in [`print`](Self::print) still fires and the server + /// body is shown verbatim. A `501 PRESIGN_UNSUPPORTED` arrives as a + /// `CreateSession` `ResponseError(501, ...)`, so it surfaces as a status + /// error with the server's actionable message — never a silent fallback. + /// The statusless variants (IO, transport, malformed/missing-ETag) collapse + /// to [`ApiError::Transport`] carrying the SDK's own descriptive message. + /// `UploadError` is `#[non_exhaustive]`, hence the wildcard arm. + pub fn from_upload_error(err: UploadError) -> Self { + // Lift an `Error` (create-session / finalize) into the same Status / + // Transport mapping every other SDK call uses, so a 4xx/5xx from those + // ops keeps its status + body. + fn from_inner(err: Error) -> ApiError { + ApiError::from_sdk(err) + } + match err { + UploadError::CreateSession(e) => from_inner(e), + UploadError::Finalize(e) => from_inner(e), + UploadError::StorageStatus { + status, + part_number, + body, + } => { + let where_ = match part_number { + Some(n) => format!("storage rejected part {n}"), + None => "storage rejected the upload".to_string(), + }; + ApiError::Status { + status, + body: format!("{where_}: {body}"), + } + } + other => ApiError::Transport(format!("upload failed: {other}")), + } + } + /// A printable, single-line description of the failure. /// /// Used where the error is surfaced inline (e.g. folded into a query @@ -637,40 +661,41 @@ impl Api { &self.client } - /// Stream a file/URL body to `POST /v1/files` through the SDK's - /// [`Client::upload_stream`], returning the upload id. + /// Upload a local parquet file directly to object storage via the SDK's + /// presigned-upload flow ([`Client::upload_file`]), returning the upload id. /// - /// Drives the async SDK from the CLI's synchronous call site, like every - /// other seam method, but on a **dedicated no-timeout client**: a 10 GB+ - /// parquet far outlives the shared client's 300s request timeout, so a + /// The flow is `POST /v1/uploads` (open a session) → direct `PUT`(s) to + /// object storage → `POST /v1/uploads/{id}/finalize`; the bytes never proxy + /// back through the API. There is **no** legacy `/v1/files` fallback — a + /// `501 PRESIGN_UNSUPPORTED` surfaces as an [`ApiError::Status`] with the + /// server's message (see [`from_upload_error`](ApiError::from_upload_error)). + /// + /// Driven from the CLI's synchronous call site on a **dedicated no-timeout + /// client**: a 10 GB+ parquet far outlives the shared client's 300s request + /// timeout, and the storage `PUT`s reuse `configuration.client`, so a /// wall-clock cap would abort a healthy-but-slow transfer. We clone the /// configured `Configuration` (same base_path, token_provider, scope - /// api_keys, user-agent) and swap only the reqwest client, so the upload - /// carries the identical auth + headers. - /// - /// `reader` is the progress-wrapped blocking source (file or URL response); - /// it is bridged into the async byte stream the SDK consumes by - /// [`reader_into_stream`]. `content_length`, when known, is sent as - /// `Content-Length` so the server can reject an oversized upload up front - /// (the `--url` path may not know the length, hence `Option`). + /// api_keys, user-agent) and swap only the reqwest client, so the + /// session/finalize calls carry the identical auth + headers. /// - /// The `Content-Type` is left to the SDK default (`application/octet-stream`): - /// the managed-table load keys off the parquet file extension, not the - /// upload's recorded content type. - pub fn upload_stream( - &self, - reader: impl std::io::Read + Send + 'static, - content_length: Option, - ) -> Result { + /// `progress` is the SDK [`UploadProgress`] callback, invoked with + /// cumulative `(bytes_done, total)` as bytes flow; the caller drives a + /// progress bar from it. The recorded content type is parquet (advisory). + pub fn upload(&self, path: &Path, progress: UploadProgress) -> Result { let mut cfg = self.client.configuration().clone(); cfg.client = upload_reqwest_client(); let upload_client = Client::from_configuration(cfg); - let stream = reader_into_stream(reader); + let opts = UploadOptions { + content_type: Some(PARQUET_CONTENT_TYPE.to_string()), + progress: Some(progress), + max_concurrency: Some(upload_concurrency()), + ..UploadOptions::default() + }; let resp = rt() - .block_on(upload_client.upload_stream(stream, None, content_length)) - .map_err(ApiError::from_sdk)?; - Ok(resp.id) + .block_on(upload_client.upload_file(path, opts)) + .map_err(ApiError::from_upload_error)?; + Ok(resp.upload_id) } /// Issue an authenticated `GET {base}/v1{path}` through the SDK @@ -1356,112 +1381,172 @@ mod tests { m.assert(); } - // --- streaming /files upload -------------------------------------------- + // --- presigned direct-to-storage upload --------------------------------- - /// A deterministic ASCII payload of `len` bytes, so a body can be matched - /// exactly to prove the bridged stream delivered every byte in order. - fn upload_payload(len: usize) -> Vec { - (0..len).map(|i| b'a' + (i % 26) as u8).collect() + /// A deterministic ASCII payload of `len` bytes written to a temp parquet + /// file, returned alongside its `tempfile::NamedTempFile` guard (kept alive + /// for the duration of the test so the path stays valid). + fn upload_temp_file(len: usize) -> (tempfile::NamedTempFile, usize) { + use std::io::Write; + let bytes: Vec = (0..len).map(|i| b'a' + (i % 26) as u8).collect(); + let mut tf = tempfile::Builder::new() + .suffix(".parquet") + .tempfile() + .expect("temp file should create"); + tf.write_all(&bytes).expect("temp write should succeed"); + tf.flush().expect("temp flush should succeed"); + (tf, len) } - fn upload_response_body(id: &str, size: usize) -> String { - format!( - r#"{{"id":"{id}","status":"ready","size_bytes":{size},"content_type":"application/octet-stream","created_at":"2026-06-05T00:00:00Z"}}"# - ) + fn noop_progress() -> UploadProgress { + Arc::new(|_done, _total| {}) } - /// A sized upload (`content_length = Some`) streams the blocking reader - /// through the bridge to `POST /v1/files`, sends a matching `Content-Length` - /// (so the server can fast-fail oversize before reading), and delivers every - /// byte intact. The 5 MiB payload spans several `UPLOAD_CHUNK_SIZE` chunks - /// and overruns `UPLOAD_CHANNEL_DEPTH`, exercising the channel back-pressure. + /// The full presigned flow against a single mock server: the seam opens a + /// session (`POST /v1/uploads`, single mode) whose `url` points back at the + /// same server's storage path, `PUT`s the bytes there, then finalizes + /// (`POST /v1/uploads/{id}/finalize`). The returned id is the finalize + /// response's `upload_id` — proving the seam went through the direct path, + /// never `POST /v1/files`. #[test] - fn upload_stream_sends_sized_body_with_content_length() { - let payload = upload_payload(5 * 1024 * 1024); - let len = payload.len(); + fn upload_runs_presigned_single_put_flow() { + let (tf, len) = upload_temp_file(4096); let mut server = mockito::Server::new(); - let m = server - .mock("POST", "/v1/files") + let put_url = format!("{}/storage/upload_test", server.url()); + + let create = server + .mock("POST", "/v1/uploads") .match_header("Authorization", "Bearer test-jwt") .match_header("X-Workspace-Id", "ws-1") - .match_header("Content-Type", "application/octet-stream") - .match_header("Content-Length", len.to_string().as_str()) - .match_body(mockito::Matcher::Exact( - String::from_utf8(payload.clone()).unwrap(), + .with_status(200) + .with_header("content-type", "application/json") + .with_body(format!( + r#"{{"finalize_token":"ft_1","headers":{{}},"mode":"single","upload_id":"upload_test","url":"{put_url}"}}"# )) - .with_status(201) + .create(); + let put = server + .mock("PUT", "/storage/upload_test") + .with_status(200) + .with_header("ETag", "\"etag-1\"") + .create(); + let finalize = server + .mock("POST", "/v1/uploads/upload_test/finalize") + .match_header("X-Upload-Finalize-Token", "ft_1") + .with_status(200) .with_header("content-type", "application/json") - .with_body(upload_response_body("upload_sized", len)) + .with_body(format!( + r#"{{"upload_id":"upload_test","status":"ready","size_bytes":{len},"created_at":"2026-06-05T00:00:00Z"}}"# + )) .create(); let api = Api::test_new(&server.url(), "test-jwt", Some("ws-1")); let id = api - .upload_stream(std::io::Cursor::new(payload), Some(len as u64)) - .expect("sized upload should succeed"); + .upload(tf.path(), noop_progress()) + .expect("presigned upload should succeed"); - assert_eq!(id, "upload_sized"); - m.assert(); + assert_eq!(id, "upload_test"); + create.assert(); + put.assert(); + finalize.assert(); } - /// With an unknown length (`content_length = None`, the `--url` source) the - /// body streams chunked and still arrives intact. Multiple chunks, so the - /// bridge is genuinely streaming rather than buffering a single read. + /// A `501 PRESIGN_UNSUPPORTED` on create-session surfaces as an + /// `ApiError::Status` carrying the server's message — never a silent + /// fallback to a legacy path — so the CLI prints an actionable error. #[test] - fn upload_stream_streams_chunked_when_length_unknown() { - let payload = upload_payload(3 * 1024 * 1024); - let len = payload.len(); + fn upload_maps_501_presign_unsupported_to_status() { + let (tf, _len) = upload_temp_file(64); let mut server = mockito::Server::new(); - let m = server - .mock("POST", "/v1/files") - .match_body(mockito::Matcher::Exact( - String::from_utf8(payload.clone()).unwrap(), - )) - .with_status(201) + let create = server + .mock("POST", "/v1/uploads") + .with_status(501) .with_header("content-type", "application/json") - .with_body(upload_response_body("upload_chunked", len)) + .with_body(r#"{"error":{"code":"PRESIGN_UNSUPPORTED","message":"presigned uploads are not enabled"}}"#) .create(); let api = Api::test_new(&server.url(), "test-jwt", Some("ws-1")); - let id = api - .upload_stream(std::io::Cursor::new(payload), None) - .expect("chunked upload should succeed"); + let err = api + .upload(tf.path(), noop_progress()) + .expect_err("a 501 must map to an error, not a fallback"); - assert_eq!(id, "upload_chunked"); - m.assert(); + match err { + ApiError::Status { status, body } => { + assert_eq!(status, reqwest::StatusCode::NOT_IMPLEMENTED); + assert!(body.contains("PRESIGN_UNSUPPORTED"), "{body}"); + } + other => panic!("expected Status error, got {other:?}"), + } + create.assert(); } - /// A non-success status surfaces as an `ApiError::Status` carrying the body, - /// the same mapping every other seam call uses (so the CLI prints the server - /// message and the 4xx re-auth hint still fires). + /// A non-2xx from the storage `PUT` surfaces as an `ApiError::Status` + /// carrying the storage status + body, so the user sees why the transfer + /// was rejected. #[test] - fn upload_stream_maps_error_status() { - let payload = upload_payload(64); - let len = payload.len(); + fn upload_maps_storage_put_failure_to_status() { + let (tf, _len) = upload_temp_file(64); let mut server = mockito::Server::new(); - let _m = server - .mock("POST", "/v1/files") - .with_status(400) + let put_url = format!("{}/storage/upload_fail", server.url()); + let _create = server + .mock("POST", "/v1/uploads") + .with_status(200) .with_header("content-type", "application/json") - .with_body(r#"{"error":"Upload exceeds maximum size"}"#) + .with_body(format!( + r#"{{"finalize_token":"ft_1","headers":{{}},"mode":"single","upload_id":"upload_fail","url":"{put_url}"}}"# + )) + .create(); + let _put = server + .mock("PUT", "/storage/upload_fail") + .with_status(403) + .with_body("SignatureDoesNotMatch") .create(); let api = Api::test_new(&server.url(), "test-jwt", Some("ws-1")); let err = api - .upload_stream(std::io::Cursor::new(payload), Some(len as u64)) - .expect_err("a 400 should map to an error"); + .upload(tf.path(), noop_progress()) + .expect_err("a storage 403 must map to an error"); match err { ApiError::Status { status, body } => { - assert_eq!(status, reqwest::StatusCode::BAD_REQUEST); - assert!(body.contains("Upload exceeds maximum size")); + assert_eq!(status, reqwest::StatusCode::FORBIDDEN); + assert!(body.contains("SignatureDoesNotMatch"), "{body}"); } other => panic!("expected Status error, got {other:?}"), } } + #[test] + fn upload_concurrency_honors_env_else_defaults() { + // Default when unset. + unsafe { + std::env::remove_var("HOTDATA_UPLOAD_CONCURRENCY"); + } + assert_eq!(upload_concurrency(), UPLOAD_CONCURRENCY); + + // A valid non-zero override wins. + unsafe { + std::env::set_var("HOTDATA_UPLOAD_CONCURRENCY", "20"); + } + assert_eq!(upload_concurrency(), 20); + + // Garbage and zero fall back to the default (tuning hint, not hard input). + unsafe { + std::env::set_var("HOTDATA_UPLOAD_CONCURRENCY", "nope"); + } + assert_eq!(upload_concurrency(), UPLOAD_CONCURRENCY); + unsafe { + std::env::set_var("HOTDATA_UPLOAD_CONCURRENCY", "0"); + } + assert_eq!(upload_concurrency(), UPLOAD_CONCURRENCY); + + unsafe { + std::env::remove_var("HOTDATA_UPLOAD_CONCURRENCY"); + } + } + #[test] fn runtime_state_parsing_and_coldness() { assert_eq!(RuntimeState::from_state_str("ready"), RuntimeState::Ready);