Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 89 additions & 10 deletions rust/lance-index/src/vector/ivf/shuffler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use futures::stream::repeat_with;
use futures::{FutureExt, Stream, StreamExt, TryStreamExt, stream};
use lance_arrow::RecordBatchExt;
use lance_core::cache::LanceCache;
use lance_core::utils::futures::StreamOnDropExt;
use lance_core::utils::tokio::get_num_compute_intensive_cpus;
use lance_core::{Error, ROW_ID, Result, datatypes::Schema};
use lance_encoding::decoder::{DecoderPlugins, FilterExpression};
Expand All @@ -53,12 +54,12 @@ use crate::vector::transform::Transformer;
const UNSORTED_BUFFER: &str = "unsorted.lance";
const SHUFFLE_BATCH_SIZE: usize = 1024;

fn get_temp_dir() -> Result<Path> {
// Note: using keep here means we will not delete this TempDir automatically
let dir = tempfile::TempDir::new()?.keep();
/// Returns the temp dir path plus a guard whose `Drop` removes the directory.
fn get_temp_dir() -> Result<(Path, tempfile::TempDir)> {
let dir = tempfile::TempDir::new()?;
let tmp_dir_path =
Path::from_filesystem_path(dir).map_err(|e| Error::io_source(Box::new(e)))?;
Ok(tmp_dir_path)
Path::from_filesystem_path(dir.path()).map_err(|e| Error::io_source(Box::new(e)))?;
Ok((tmp_dir_path, dir))
}

/// A builder for a partition of data
Expand Down Expand Up @@ -343,11 +344,22 @@ pub async fn shuffle_dataset(

// step 3: load the sorted chunks; consumers are expected to be responsible for merging the streams
let start = std::time::Instant::now();
let stream =
let streams =
IvfShuffler::load_partitioned_shuffles(&shuffler.output_dir, partition_files).await?;
info!("merged partitioned shuffles in {:?}", start.elapsed());

Ok(stream)
// Clone the temp-dir guard into each returned stream so the shuffle
// files are removed only after the consumer drops every stream.
let temp_dir_guard = shuffler.owned_temp_dir.clone();
let guarded_streams = streams
.into_iter()
.map(|stream| {
let guard = temp_dir_guard.clone();
stream.on_drop(move || drop(guard))
})
.collect::<Vec<_>>();

Ok(guarded_streams)
}

pub async fn shuffle_vectors(
Expand Down Expand Up @@ -385,6 +397,10 @@ pub struct IvfShuffler {

output_dir: Path,

// `Some` for an auto-created `output_dir`; cleanup runs when the last
// clone of this `Arc` is dropped. `None` when the caller owns cleanup.
owned_temp_dir: Option<Arc<tempfile::TempDir>>,

// whether the lance file is v1 (legacy) or v2
is_legacy: bool,

Expand All @@ -410,9 +426,12 @@ impl IvfShuffler {
is_legacy: bool,
shuffle_output_root_filename: Option<String>,
) -> Result<Self> {
let output_dir = match output_dir {
Some(output_dir) => output_dir,
None => get_temp_dir()?,
let (output_dir, owned_temp_dir) = match output_dir {
Some(output_dir) => (output_dir, None),
None => {
let (path, dir) = get_temp_dir()?;
(path, Some(Arc::new(dir)))
}
};

let shuffle_output_root_filename = match shuffle_output_root_filename {
Expand All @@ -423,6 +442,7 @@ impl IvfShuffler {
Ok(Self {
num_partitions,
output_dir,
owned_temp_dir,
unsorted_buffers: vec![],
is_legacy,
shuffle_output_root_filename,
Expand Down Expand Up @@ -1198,4 +1218,63 @@ mod test {

assert_eq!(num_batches, NUM_PARTITIONS * expected_num_part_files);
}

// When an `IvfShuffler` is built without an explicit `output_dir` it owns its
// working directory, and that directory must be gone from disk once the
// shuffler and the streams read back from it have all been dropped.
#[tokio::test]
async fn test_shuffler_cleans_up_auto_temp_dir() {
    let (stream, mut shuffler) = make_stream_and_shuffler(false);

    // Copy the path out through a short-lived borrow of the guard. Taking an
    // extra `Arc` clone instead would keep the directory alive after the
    // shuffler itself is dropped and defeat the final assertion.
    let work_dir = {
        let guard = shuffler
            .owned_temp_dir
            .as_ref()
            .expect("shuffler built with output_dir = None should own a TempDir guard");
        guard.path().to_path_buf()
    };

    assert!(
        work_dir.is_dir(),
        "auto-created shuffler temp dir should exist while shuffler is alive: {:?}",
        work_dir,
    );

    shuffler.write_unsorted_stream(stream).await.unwrap();
    let partition_files = shuffler.write_partitioned_shuffles(100, 1).await.unwrap();
    assert_eq!(partition_files.len(), 1);

    // Both the unsorted buffer and the single sorted chunk must have landed
    // inside the auto-created directory.
    for file_name in ["unsorted.lance", "sorted_0.lance"] {
        assert!(
            work_dir.join(file_name).is_file(),
            "shuffler should have written {} into its working dir: {:?}",
            file_name,
            work_dir,
        );
    }

    // These streams come straight from `load_partitioned_shuffles`, so the
    // shuffler holds the only reference to the temp-dir guard in this test.
    let result_streams =
        IvfShuffler::load_partitioned_shuffles(&shuffler.output_dir, partition_files)
            .await
            .unwrap();

    // Drain every stream to completion, last one first; each stream is
    // dropped as soon as it has been exhausted.
    for mut partition_stream in result_streams.into_iter().rev() {
        while let Some(item) = partition_stream.next().await {
            let _batch = item.unwrap();
        }
    }

    // With the streams gone, dropping the shuffler releases the final
    // `Arc<TempDir>` and the directory is deleted.
    drop(shuffler);

    assert!(
        !work_dir.exists(),
        "auto-created shuffler temp dir should be removed once the IvfShuffler and \
         its returned streams are dropped, but it still exists: {:?}",
        work_dir,
    );
}
}
Loading