Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .openapi-generator-ignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ src/resources.rs
src/field.rs
src/status.rs
src/query.rs
src/uploads.rs
CHANGELOG.md
RELEASING.md
examples/
Expand Down
22 changes: 22 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,28 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- Ergonomic presigned (direct-to-storage) file uploads: `Client::upload_file`
(and `client.uploads().upload_file`) open an upload session, `PUT` the bytes
straight to object storage — a single `PUT` for small files, bounded-
concurrency multipart `PUT`s sliced by the server's `part_size` for large
ones — then finalize, returning the `FinalizeUploadResponse`. Configurable via
`UploadOptions` (content type/encoding, filename, part-size hint, and an
`UploadProgress` callback). Never falls back to the legacy `POST /v1/files`
proxy; storage `PUT`s carry no SDK auth/scope headers. Multipart concurrency
is tunable via `UploadOptions::max_concurrency` (default 10), bounded by a
256 MiB peak-memory budget derived from the server's actual part size; when no
`part_size` is given, the SDK auto-scales the hint (8 MiB for normal files,
larger only past ~72 GiB to keep the part count under S3's 10,000-part limit).
Finalize is exactly-once (sent with retries disabled so an ambiguous failure
can't be retried into a spurious "already finalized" error); part `PUT`s stay
retryable. Storage `PUT`s use a dedicated header-bare reqwest client, so a host
app's default headers on the SDK's main client never leak to object storage.
The multipart session shape is validated (`part_urls` count must match the
file's part count) and pathological sizes (`> i64::MAX`) are rejected rather
than silently wrapped.

### Changed

- feat(uploads): add file upload endpoints
Expand Down
39 changes: 33 additions & 6 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,37 @@ impl Client {
}
}

/// Upload a local file directly to object storage and finalize it — the
/// primary, presigned upload entry point.
///
/// Opens an upload session (`POST /v1/uploads`), `PUT`s the bytes **directly
/// to storage** (a single `PUT` for a small file, or bounded-concurrency
/// multipart `PUT`s split by the server's `part_size` for a large one), then
/// finalizes (`POST /v1/uploads/{id}/finalize`) and returns the
/// [`models::FinalizeUploadResponse`] — read `upload_id` from it to load the
/// upload into a managed table.
///
/// This path NEVER falls back to the legacy `POST /v1/files` proxy: a server
/// that cannot presign (`501 PRESIGN_UNSUPPORTED`) is a hard
/// [`UploadError::CreateSession`] error.
///
/// Pass [`UploadOptions`] to record advisory metadata (`content_type`,
/// `content_encoding`, `filename`), hint a `part_size` (the server clamps
/// it), or attach a `progress` callback invoked with
/// `(bytes_done_total, total)` as bytes flow.
///
/// Large uploads legitimately take minutes; storage `PUT`s reuse the
/// configured reqwest client, so supply one with no request timeout (via
/// [`ClientBuilder::reqwest_client`](crate::ClientBuilder::reqwest_client))
/// when uploading large files.
pub async fn upload_file(
&self,
path: impl AsRef<std::path::Path>,
opts: crate::uploads::UploadOptions,
) -> Result<models::FinalizeUploadResponse, crate::uploads::UploadError> {
crate::uploads::upload_file(&self.configuration, path.as_ref(), opts).await
}

/// Stream an arbitrary byte source to `POST /v1/files`, the raw-body upload
/// endpoint.
///
Expand Down Expand Up @@ -624,8 +655,7 @@ impl Client {
.and_then(|v| v.to_str().ok())
.unwrap_or("application/octet-stream")
.to_owned();
let is_json =
content_type.starts_with("application") && content_type.contains("json");
let is_json = content_type.starts_with("application") && content_type.contains("json");

if !status.is_client_error() && !status.is_server_error() {
let content = resp.text().await?;
Expand Down Expand Up @@ -1270,10 +1300,7 @@ mod tests {
// The mock only matches when all three headers are present, so a
// successful Submitted outcome proves they reached the wire.
let outcome = client
.submit_query(
models::QueryRequest::new("select 1".into()),
Some("db_123"),
)
.submit_query(models::QueryRequest::new("select 1".into()), Some("db_123"))
.await
.expect("submit_query should succeed with scoped headers");

Expand Down
13 changes: 10 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ pub mod models;
pub mod query;
pub mod resources;
pub mod status;
pub mod uploads;

#[cfg(all(test, unix))]
mod test_support;
Expand All @@ -49,11 +50,16 @@ pub use query::{
DEFAULT_MAX_AUTO_BYTES, DEFAULT_MAX_AUTO_ROWS, OVERLOADED_ERROR_CODE,
};
pub use resources::{
ConnectionTypesApi, ConnectionsApi, DatabaseContextApi, DatabasesApi,
EmbeddingProvidersApi, IndexesApi, InformationSchemaApi, JobsApi, QueryApi, QueryRunsApi,
RefreshApi, ResultsApi, SavedQueriesApi, SecretsApi, UploadsApi, WorkspacesApi,
ConnectionTypesApi, ConnectionsApi, DatabaseContextApi, DatabasesApi, EmbeddingProvidersApi,
IndexesApi, InformationSchemaApi, JobsApi, QueryApi, QueryRunsApi, RefreshApi, ResultsApi,
SavedQueriesApi, SecretsApi, UploadsApi, WorkspacesApi,
};
pub use status::{QueryRunStatus, QueryRunStatusExt, ResultStatus, ResultStatusExt};
pub use uploads::{
auto_part_size_hint, effective_in_flight, UploadError, UploadOptions, UploadProgress,
DEFAULT_MAX_CONCURRENCY, DEFAULT_PART_SIZE, MAX_PART_SIZE, MIN_PART_SIZE, TARGET_MAX_PARTS,
UPLOAD_MEMORY_BUDGET,
};

/// Process-wide lock serializing every test that mutates `std::env`. Env is a
/// process-global resource, so per-module locks would race; all env-mutating
Expand All @@ -73,4 +79,5 @@ pub mod prelude {
};
pub use crate::resources::*;
pub use crate::status::{QueryRunStatus, QueryRunStatusExt, ResultStatus, ResultStatusExt};
pub use crate::uploads::{UploadError, UploadOptions, UploadProgress};
}
15 changes: 14 additions & 1 deletion src/resources.rs
Original file line number Diff line number Diff line change
Expand Up @@ -703,7 +703,20 @@ impl<'a> UploadsApi<'a> {
Self { config }
}

/// Upload a file from a local path.
/// Upload a local file directly to object storage (presigned) and finalize
/// it. The primary upload path; returns the finalized upload (read
/// `upload_id` from it). See [`Client::upload_file`](crate::Client::upload_file)
/// for the full contract and [`UploadOptions`](crate::uploads::UploadOptions).
pub async fn upload_file(
&self,
path: impl AsRef<std::path::Path>,
opts: crate::uploads::UploadOptions,
) -> Result<models::FinalizeUploadResponse, crate::uploads::UploadError> {
crate::uploads::upload_file(self.config, path.as_ref(), opts).await
}

/// Stream a file to the legacy raw-body `POST /v1/files` proxy. Prefer
/// [`upload_file`](Self::upload_file), the presigned direct-to-storage path.
pub async fn upload(
&self,
body: std::path::PathBuf,
Expand Down
Loading
Loading