Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 2 additions & 16 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 9 additions & 15 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,29 +14,23 @@ path = "src/main.rs"
[dependencies]
# Hotdata Rust SDK. The CLI's sync wrapper (src/sdk.rs) drives this async SDK
# behind a shared multi-thread tokio runtime. The `arrow` feature backs result
# decode; `upload_stream` carries `content_length` (sized body, not chunked, so
# the server can fast-fail an oversized upload — see src/sdk.rs::Api::upload_stream).
hotdata = { version = "0.4.0", features = ["arrow"] }
# decode; `upload_file` runs the presigned direct-to-storage upload flow
# (see src/sdk.rs::Api::upload).
hotdata = { version = "0.5.0", features = ["arrow"] }
# Shared multi-thread runtime for the sync wrapper; block_on is called
# concurrently from rayon worker threads (see src/indexes.rs). `sync` backs the
# mpsc channel that bridges the blocking upload reader into an async byte stream.
tokio = { version = "1", features = ["rt-multi-thread", "sync"] }
# concurrently from rayon worker threads (see src/indexes.rs). The presigned
# upload (src/sdk.rs::Api::upload) drives the SDK's async `upload_file` on this
# same runtime.
tokio = { version = "1", features = ["rt-multi-thread"] }
# CliTokenProvider implements the SDK's #[async_trait] BearerTokenProvider.
async-trait = "0.1"
# Bridge the progress-wrapped blocking upload reader into the async
# `Stream<Item = Result<Bytes, _>>` the SDK's `upload_stream` consumes: a
# spawn_blocking task feeds chunks through a tokio mpsc channel, wrapped as a
# Stream by tokio-stream's ReceiverStream. `futures-core` names the Stream
# bound; `bytes` matches the SDK's chunk type.
bytes = "1"
futures-core = "0.3"
tokio-stream = "0.1"
anstyle = "1.0.13"
clap = { version = "4", features = ["derive"] }
directories = "6"
# Matches the SDK's reqwest 0.13 so the sdk seam can name the `reqwest::Client`
# type backing `Configuration.client`; `blocking` serves the CLI's own
# synchronous paths (PKCE/token mints in jwt.rs + the streaming /files upload).
# synchronous paths (PKCE/token mints in jwt.rs + the `--url` source download
# in databases.rs that feeds the presigned upload).
reqwest = { version = "0.13", features = ["blocking", "json"] }
rayon = "1.10"
serde = { version = "1", features = ["derive"] }
Expand Down
4 changes: 2 additions & 2 deletions src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,7 @@ pub enum DatabasesCommands {
#[arg(long, conflicts_with_all = ["file", "upload_id"])]
url: Option<String>,

/// Use a previously staged upload ID from `POST /v1/files` instead of uploading
/// Use a previously staged upload ID from `POST /v1/uploads` instead of uploading
#[arg(long, conflicts_with_all = ["file", "url"])]
upload_id: Option<String>,
},
Expand Down Expand Up @@ -640,7 +640,7 @@ pub enum DatabaseTablesCommands {
#[arg(long, conflicts_with_all = ["file", "upload_id"])]
url: Option<String>,

/// Use a previously staged upload ID from `POST /v1/files` instead of uploading
/// Use a previously staged upload ID from `POST /v1/uploads` instead of uploading
#[arg(long, conflicts_with_all = ["file", "url"])]
upload_id: Option<String>,
},
Expand Down
177 changes: 135 additions & 42 deletions src/databases.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,26 +337,38 @@ fn table_rows(catalog: &str, tables: Vec<InfoTable>) -> Vec<TableRow> {
.collect()
}

fn finish_upload(
api: &Api,
reader: impl std::io::Read + Send + 'static,
size: Option<u64>,
pb: &ProgressBar,
) -> String {
// Stream the body to `POST /v1/files` through the SDK seam, which drives the
// SDK's `upload_stream` on a dedicated no-timeout client (a 10 GB+ parquet
// far outlives the shared 300s request timeout) and bridges this blocking,
// progress-wrapped `reader` into the async byte stream the SDK consumes.
// `size` becomes the `Content-Length` so the server fast-fails an oversized
// upload before writing bytes; the `--url` source may not know it, hence
// `Option`. Carries the same auth + scope headers as every other SDK call.
let result = api.upload_stream(reader, size);
pb.finish_and_clear();
/// The shared indicatif progress-bar template for an upload: a spinner, a
/// byte-granular bar, the bytes-done / total, and an ETA.
fn upload_progress_style() -> ProgressStyle {
ProgressStyle::with_template(
"{spinner:.green} [{bar:40.cyan/blue}] {bytes}/{total_bytes} ({eta})",
)
.unwrap()
.progress_chars("=>-")
}

match result {
Ok(id) => id,
Err(e) => e.exit(),
}
/// Upload an already-on-disk parquet file via the SDK's presigned direct-to-
/// storage flow, driving a single aggregate progress bar from the SDK's
/// byte-granular progress callback. Returns the finalized upload id, or the
/// seam's error (a `501 PRESIGN_UNSUPPORTED` surfaces an actionable message,
/// not a fallback). The caller decides how to surface failure — `--url` must
/// clean up its temp file before exiting, so this returns rather than exits.
fn upload_parquet_path(api: &Api, path: &Path, size: u64) -> Result<String, ApiError> {
let pb = ProgressBar::new(size);
pb.set_style(upload_progress_style());

// The SDK reports cumulative `(done, total)`; mirror it onto the bar. We
// `set_length(total)` defensively so the bar tracks the SDK's own notion of
// total even though it equals `size` here.
let cb_pb = pb.clone();
let progress: hotdata::UploadProgress = std::sync::Arc::new(move |done, total| {
cb_pb.set_length(total);
cb_pb.set_position(done);
});

let result = api.upload(path, progress);
pb.finish_and_clear();
result
}

fn upload_parquet_file(api: &Api, path: &str) -> String {
Expand All @@ -369,25 +381,15 @@ fn upload_parquet_file(api: &Api, path: &str) -> String {
std::process::exit(1);
}

let f = match std::fs::File::open(path) {
Ok(f) => f,
let file_size = match std::fs::metadata(path) {
Ok(m) => m.len(),
Err(e) => {
eprintln!("error opening file '{path}': {e}");
std::process::exit(1);
}
};

let file_size = f.metadata().map(|m| m.len()).unwrap_or(0);
let pb = ProgressBar::new(file_size);
pb.set_style(
ProgressStyle::with_template(
"{spinner:.green} [{bar:40.cyan/blue}] {bytes}/{total_bytes} ({eta})",
)
.unwrap()
.progress_chars("=>-"),
);
let reader = pb.wrap_read(f);
finish_upload(api, reader, Some(file_size), &pb)
upload_parquet_path(api, Path::new(path), file_size).unwrap_or_else(|e| e.exit())
}

fn upload_parquet_url(api: &Api, url: &str) -> String {
Expand All @@ -398,6 +400,11 @@ fn upload_parquet_url(api: &Api, url: &str) -> String {
std::process::exit(1);
}

// The presigned upload needs a seekable, size-known source (the SDK opens
// the path, declares its byte count, and PUTs it directly to storage), so
// download the URL to a temp file first, then upload that file on the same
// path as `--file`. The temp file is removed before this returns on both
// success and failure (see `upload_temp_file`).
let resp = match reqwest::blocking::get(url) {
Ok(r) => r,
Err(e) => {
Expand All @@ -415,16 +422,11 @@ fn upload_parquet_url(api: &Api, url: &str) -> String {
}

let content_length = resp.content_length();
let pb = match content_length {
// Download progress: a byte bar when the length is known, else a spinner.
let dl_pb = match content_length {
Some(len) => {
let pb = ProgressBar::new(len);
pb.set_style(
ProgressStyle::with_template(
"{spinner:.green} [{bar:40.cyan/blue}] {bytes}/{total_bytes} ({eta})",
)
.unwrap()
.progress_chars("=>-"),
);
pb.set_style(upload_progress_style());
pb
}
None => {
Expand All @@ -436,8 +438,59 @@ fn upload_parquet_url(api: &Api, url: &str) -> String {
pb
}
};
let reader = pb.wrap_read(resp);
finish_upload(api, reader, content_length, &pb)

let temp = match download_to_temp(resp, &dl_pb) {
Ok(t) => t,
Err(e) => {
dl_pb.finish_and_clear();
eprintln!("error downloading '{url}': {e}");
std::process::exit(1);
}
};
dl_pb.finish_and_clear();

let size = std::fs::metadata(temp.path()).map(|m| m.len()).unwrap_or(0);
upload_temp_file(temp, |path| upload_parquet_path(api, path, size)).unwrap_or_else(|e| e.exit())
}

/// Upload an already-downloaded temp file, guaranteeing the file is deleted
/// before returning — on both success and failure.
///
/// The temp file MUST be cleaned up here, on return, rather than left to a
/// guard in the caller's scope: the caller exits the process via
/// [`ApiError::exit`] (`std::process::exit`) on the `Err` arm, and
/// `process::exit` runs no destructors. Owning `temp` in this function means it
/// drops (deleting a potentially multi-GB download) before the caller can exit.
fn upload_temp_file<F>(temp: tempfile::NamedTempFile, upload: F) -> Result<String, ApiError>
where
F: FnOnce(&Path) -> Result<String, ApiError>,
{
let result = upload(temp.path());
// Delete now, while still inside this function, so cleanup precedes any
// `process::exit` the caller performs on `Err`.
drop(temp);
result
}

/// Stream a blocking HTTP response body to a freshly created temp file,
/// advancing `pb` as bytes land. Returns the open [`NamedTempFile`], which
/// deletes the file on drop. Created atomically with `O_EXCL` + 0600 perms via
/// `tempfile`, so it can't be redirected by a pre-planted symlink.
fn download_to_temp(
resp: reqwest::blocking::Response,
pb: &ProgressBar,
) -> std::io::Result<tempfile::NamedTempFile> {
use std::io::Write;

let mut temp = tempfile::Builder::new()
.prefix("hotdata-upload-")
.suffix(".parquet")
.tempfile()?;

let mut reader = pb.wrap_read(resp);
std::io::copy(&mut reader, temp.as_file_mut())?;
temp.as_file_mut().flush()?;
Ok(temp)
}

fn collect_tables(api: &Api, connection_id: &str, schema: Option<&str>) -> Vec<InfoTable> {
Expand Down Expand Up @@ -1717,4 +1770,44 @@ mod tests {
assert_eq!(resp.token, "jwt-x");
assert_eq!(resp.database_id, "dbid_abc");
}

// The `--url` path downloads to a temp file and then exits via
// `process::exit` if the upload fails. Because `process::exit` runs no
// destructors, the temp file must be deleted by `upload_temp_file` before
// it returns the `Err` the caller exits on — not by a guard in the caller's
// scope. These tests pin that contract for both arms.
#[test]
fn upload_temp_file_removes_temp_on_upload_failure() {
let temp = tempfile::Builder::new()
.suffix(".parquet")
.tempfile()
.unwrap();
let path = temp.path().to_path_buf();
assert!(path.exists());

let result = upload_temp_file(temp, |p| {
assert!(p.exists(), "file present while the upload runs");
Err(ApiError::Transport("upload boom".into()))
});

assert!(result.is_err());
assert!(
!path.exists(),
"temp file must be removed before the failure is returned (caller exits without unwinding)"
);
}

#[test]
fn upload_temp_file_removes_temp_on_success() {
let temp = tempfile::Builder::new()
.suffix(".parquet")
.tempfile()
.unwrap();
let path = temp.path().to_path_buf();

let result = upload_temp_file(temp, |_p| Ok("upid_123".to_string()));

assert_eq!(result.unwrap(), "upid_123");
assert!(!path.exists(), "temp file must be removed on success");
}
}
Loading
Loading