Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .openapi-generator-ignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ src/resources.rs
src/field.rs
src/status.rs
src/query.rs
src/uploads.rs
CHANGELOG.md
RELEASING.md
examples/
Expand Down
22 changes: 22 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,28 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- Ergonomic presigned (direct-to-storage) file uploads: `Client::upload_file`
(and `client.uploads().upload_file`) open an upload session, `PUT` the bytes
straight to object storage — a single `PUT` for small files, bounded-
concurrency multipart `PUT`s sliced by the server's `part_size` for large
ones — then finalize, returning the `FinalizeUploadResponse`. Configurable via
`UploadOptions` (content type/encoding, filename, part-size hint, and an
`UploadProgress` callback). Never falls back to the legacy `POST /v1/files`
proxy; storage `PUT`s carry no SDK auth/scope headers. Multipart concurrency
is tunable via `UploadOptions::max_concurrency` (default 10), bounded by a
256 MiB peak-memory budget derived from the server's actual part size; when no
`part_size` is given, the SDK auto-scales the hint (8 MiB for normal files,
larger only past ~72 GiB to keep the part count under S3's 10,000-part limit).
Finalize is exactly-once (sent with retries disabled so an ambiguous failure
can't be retried into a spurious "already finalized" error); part `PUT`s stay
retryable. Storage `PUT`s use a dedicated header-bare reqwest client, so a host
app's default headers on the SDK's main client never leak to object storage.
The multipart session shape is validated (`part_urls` count must match the
file's part count) and pathological sizes (`> i64::MAX`) are rejected rather
than silently wrapped.

### Changed

- feat(uploads): add file upload endpoints
Expand Down
39 changes: 33 additions & 6 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,37 @@ impl Client {
}
}

/// Upload a local file directly to object storage and finalize it — the
/// primary, presigned upload entry point.
///
/// Opens an upload session (`POST /v1/uploads`), `PUT`s the bytes **directly
/// to storage** (a single `PUT` for a small file, or bounded-concurrency
/// multipart `PUT`s split by the server's `part_size` for a large one), then
/// finalizes (`POST /v1/uploads/{id}/finalize`) and returns the
/// [`models::FinalizeUploadResponse`] — read `upload_id` from it to load the
/// upload into a managed table.
///
/// This path NEVER falls back to the legacy `POST /v1/files` proxy: a server
/// that cannot presign (`501 PRESIGN_UNSUPPORTED`) is a hard
/// [`UploadError::CreateSession`] error.
///
/// Pass [`UploadOptions`] to record advisory metadata (`content_type`,
/// `content_encoding`, `filename`), hint a `part_size` (the server clamps
/// it), or attach a `progress` callback invoked with
/// `(bytes_done_total, total)` as bytes flow.
///
/// Large uploads legitimately take minutes; storage `PUT`s reuse the
/// configured reqwest client, so supply one with no request timeout (via
/// [`ClientBuilder::reqwest_client`](crate::ClientBuilder::reqwest_client))
/// when uploading large files.
pub async fn upload_file(
&self,
path: impl AsRef<std::path::Path>,
opts: crate::uploads::UploadOptions,
) -> Result<models::FinalizeUploadResponse, crate::uploads::UploadError> {
crate::uploads::upload_file(&self.configuration, path.as_ref(), opts).await
}

/// Stream an arbitrary byte source to `POST /v1/files`, the raw-body upload
/// endpoint.
///
Expand Down Expand Up @@ -624,8 +655,7 @@ impl Client {
.and_then(|v| v.to_str().ok())
.unwrap_or("application/octet-stream")
.to_owned();
let is_json =
content_type.starts_with("application") && content_type.contains("json");
let is_json = content_type.starts_with("application") && content_type.contains("json");

if !status.is_client_error() && !status.is_server_error() {
let content = resp.text().await?;
Expand Down Expand Up @@ -1270,10 +1300,7 @@ mod tests {
// The mock only matches when all three headers are present, so a
// successful Submitted outcome proves they reached the wire.
let outcome = client
.submit_query(
models::QueryRequest::new("select 1".into()),
Some("db_123"),
)
.submit_query(models::QueryRequest::new("select 1".into()), Some("db_123"))
.await
.expect("submit_query should succeed with scoped headers");

Expand Down
13 changes: 10 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ pub mod models;
pub mod query;
pub mod resources;
pub mod status;
pub mod uploads;

#[cfg(all(test, unix))]
mod test_support;
Expand All @@ -49,11 +50,16 @@ pub use query::{
DEFAULT_MAX_AUTO_BYTES, DEFAULT_MAX_AUTO_ROWS, OVERLOADED_ERROR_CODE,
};
pub use resources::{
ConnectionTypesApi, ConnectionsApi, DatabaseContextApi, DatabasesApi,
EmbeddingProvidersApi, IndexesApi, InformationSchemaApi, JobsApi, QueryApi, QueryRunsApi,
RefreshApi, ResultsApi, SavedQueriesApi, SecretsApi, UploadsApi, WorkspacesApi,
ConnectionTypesApi, ConnectionsApi, DatabaseContextApi, DatabasesApi, EmbeddingProvidersApi,
IndexesApi, InformationSchemaApi, JobsApi, QueryApi, QueryRunsApi, RefreshApi, ResultsApi,
SavedQueriesApi, SecretsApi, UploadsApi, WorkspacesApi,
};
pub use status::{QueryRunStatus, QueryRunStatusExt, ResultStatus, ResultStatusExt};
pub use uploads::{
auto_part_size_hint, effective_in_flight, UploadError, UploadOptions, UploadProgress,
DEFAULT_MAX_CONCURRENCY, DEFAULT_PART_SIZE, MAX_PART_SIZE, MIN_PART_SIZE, TARGET_MAX_PARTS,
UPLOAD_MEMORY_BUDGET,
};

/// Process-wide lock serializing every test that mutates `std::env`. Env is a
/// process-global resource, so per-module locks would race; all env-mutating
Expand All @@ -73,4 +79,5 @@ pub mod prelude {
};
pub use crate::resources::*;
pub use crate::status::{QueryRunStatus, QueryRunStatusExt, ResultStatus, ResultStatusExt};
pub use crate::uploads::{UploadError, UploadOptions, UploadProgress};
}
15 changes: 14 additions & 1 deletion src/resources.rs
Original file line number Diff line number Diff line change
Expand Up @@ -703,7 +703,20 @@ impl<'a> UploadsApi<'a> {
Self { config }
}

/// Upload a file from a local path.
/// Upload a local file directly to object storage (presigned) and finalize
/// it. The primary upload path; returns the finalized upload (read
/// `upload_id` from it). See [`Client::upload_file`](crate::Client::upload_file)
/// for the full contract and [`UploadOptions`](crate::uploads::UploadOptions).
pub async fn upload_file(
&self,
path: impl AsRef<std::path::Path>,
opts: crate::uploads::UploadOptions,
) -> Result<models::FinalizeUploadResponse, crate::uploads::UploadError> {
crate::uploads::upload_file(self.config, path.as_ref(), opts).await
}

/// Stream a file to the legacy raw-body `POST /v1/files` proxy. Prefer
/// [`upload_file`](Self::upload_file), the presigned direct-to-storage path.
pub async fn upload(
&self,
body: std::path::PathBuf,
Expand Down
Loading
Loading