Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions rust/lance-index/src/scalar/inverted/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4318,6 +4318,9 @@ pub async fn flat_bm25_search_stream_with_metrics(
elapsed_compute: Option<Time>,
) -> DataFusionResult<SendableRecordBatchStream> {
let mut tokenizer = tokenizer;

// Pre-await synchronous work: query tokenization + chunk-stream setup.
let pre_await_start = std::time::Instant::now();
let query_tokens = Arc::new(collect_query_tokens(&query, &mut tokenizer));

let input_schema = input.schema();
Expand All @@ -4336,6 +4339,9 @@ pub async fn flat_bm25_search_stream_with_metrics(
ACCUMULATE_BYTES,
SLICE_BYTES,
);
if let Some(t) = &elapsed_compute {
t.add_duration(pre_await_start.elapsed());
}

// Phase 2 - For each row we need to know the total number of tokens and the count of each
// of the query tokens. For example, if the query is "book" and the row is "the book shop"
Expand All @@ -4351,13 +4357,10 @@ pub async fn flat_bm25_search_stream_with_metrics(
.await?;

// Phase 3 - Calculate final scores (this is fairly cheap, probably don't need to parallelize).
// Synchronous and single-threaded, so we time it from this thread.
let scoring_start = std::time::Instant::now();
// All post-await work is synchronous; time the scorer + score + slicing loop together.
let post_await_start = std::time::Instant::now();
let scorer = initialize_scorer(base_scorer.as_ref(), query_tokens.as_ref(), &counted_input);
let scores = flat_bm25_score(query_tokens.as_ref(), &counted_input, &scorer)?;
if let Some(t) = &elapsed_compute {
t.add_duration(scoring_start.elapsed());
}

// Finally we emit batches according to the target batch size
let num_out_batches = scores.num_rows().div_ceil(target_batch_size);
Expand All @@ -4367,6 +4370,9 @@ pub async fn flat_bm25_search_stream_with_metrics(
let len = (scores.num_rows() - start).min(target_batch_size);
batches.push(Ok(scores.slice(start, len)));
}
if let Some(t) = &elapsed_compute {
t.add_duration(post_await_start.elapsed());
}
Ok(Box::pin(RecordBatchStreamAdapter::new(
FTS_SCHEMA.clone(),
stream::iter(batches),
Expand Down
93 changes: 41 additions & 52 deletions rust/lance/src/io/exec/fts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use lance_datafusion::utils::{ExecutionPlanMetricsSetExt, MetricsExt, PARTITIONS
use lance_table::format::IndexMetadata;

use super::PreFilterSource;
use super::utils::{IndexMetrics, InstrumentedChildInputStream, build_prefilter};
use super::utils::{IndexMetrics, build_prefilter};
use crate::index::scalar::inverted::{load_segment_details, load_segments};
use crate::{Dataset, index::DatasetIndexInternalExt};
use lance_index::metrics::MetricsCollector;
Expand Down Expand Up @@ -695,11 +695,6 @@ impl FlatMatchFilterExec {
metrics_set: ExecutionPlanMetricsSet,
) -> DataFusionResult<SendableRecordBatchStream> {
let metrics = Arc::new(FtsIndexMetrics::new(&metrics_set, partition));
let elapsed_compute = metrics.baseline_metrics.elapsed_compute().clone();
// Time the one-shot setup (tokenizer load + query tokenization) so it's
// attributed to this node's elapsed_compute. The helper itself only
// times per-batch work.
let setup_start = std::time::Instant::now();
let column = query
.column
.clone()
Expand All @@ -720,44 +715,45 @@ impl FlatMatchFilterExec {
None => Self::load_tokenizer(&dataset, &column, &metrics.index_metrics).await?,
};
let query_tokens = Arc::new(collect_query_tokens(&query.terms, &mut tokenizer));
elapsed_compute.add_duration(setup_start.elapsed());

let helper = InstrumentedChildInputStream::new(
input,
schema,
move |batch| {
// Clone per-batch so the work runs *inside* the async block
// (i.e., during the helper's timed in_flight poll, not during
// its untimed input-pulling phase).
let column = column.clone();
let query_tokens = query_tokens.clone();
let mut tokenizer = tokenizer.box_clone();
async move {
let text_column = batch.column_by_name(&column).ok_or_else(|| {
DataFusionError::Execution(format!("Column {} not found in batch", column,))
})?;
let predicate = match text_column.data_type() {
DataType::Utf8 => {
Self::find_matches::<i32>(text_column, &mut tokenizer, &query_tokens)
}
DataType::LargeUtf8 => {
Self::find_matches::<i64>(text_column, &mut tokenizer, &query_tokens)
}
_ => {
return Err(DataFusionError::Execution(format!(
"Column {} is not a string",
column,
)));
}
};
Ok(arrow::compute::filter_record_batch(&batch, &predicate)?)
}
},
1,
partition,
&metrics_set,
);
Ok(Box::pin(helper))
let baseline = BaselineMetrics::new(&metrics_set, partition);
let elapsed_compute = baseline.elapsed_compute().clone();
let stream = input.then(move |batch_result| {
let column = column.clone();
let query_tokens = query_tokens.clone();
let mut tokenizer = tokenizer.box_clone();
let elapsed_compute = elapsed_compute.clone();
async move {
let batch = batch_result?;
let _t = elapsed_compute.timer();
let text_column = batch.column_by_name(&column).ok_or_else(|| {
DataFusionError::Execution(format!("Column {} not found in batch", column,))
})?;
let predicate = match text_column.data_type() {
DataType::Utf8 => {
Self::find_matches::<i32>(text_column, &mut tokenizer, &query_tokens)
}
DataType::LargeUtf8 => {
Self::find_matches::<i64>(text_column, &mut tokenizer, &query_tokens)
}
_ => {
return Err(DataFusionError::Execution(format!(
"Column {} is not a string",
column,
)));
}
};
Ok(arrow::compute::filter_record_batch(&batch, &predicate)?)
}
});
let stream = stream.map(move |batch| {
let poll = baseline.record_poll(std::task::Poll::Ready(Some(batch)));
match poll {
std::task::Poll::Ready(Some(b)) => b,
_ => unreachable!("record_poll preserves Ready(Some) input"),
}
});
Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
}
}

Expand Down Expand Up @@ -1010,11 +1006,8 @@ impl ExecutionPlan for FlatMatchQueryExec {
// so it can attribute the spawn_cpu tokenize work and synchronous
// scoring back onto this node's `elapsed_compute`. Sharing the same
// `Time` handle that's already inside the FtsIndexMetrics avoids
// registering a duplicate metric. Cloned once for use during setup
// timing (below) and again moved into the async block for the
// streaming-phase call.
// registering a duplicate metric.
let elapsed_compute = metrics.baseline_metrics.elapsed_compute().clone();
let elapsed_compute_for_stream = elapsed_compute.clone();

let column = query.column.ok_or(DataFusionError::Execution(format!(
"column not set for MatchQuery {}",
Expand All @@ -1024,9 +1017,6 @@ impl ExecutionPlan for FlatMatchQueryExec {
document_input(self.unindexed_input.execute(partition, context)?, &column)?;

let stream = stream::once(async move {
// Time the one-shot setup (load segments / open indices / build
// scorer / acquire tokenizer) and attribute it to elapsed_compute.
let setup_start = std::time::Instant::now();
let segments = match preset_segments {
Some(segments) => Some(segments),
None => load_segments(&ds, &column).await?,
Expand Down Expand Up @@ -1061,7 +1051,6 @@ impl ExecutionPlan for FlatMatchQueryExec {
preset_base_scorer.map(|s| (*s).clone()),
),
};
elapsed_compute.add_duration(setup_start.elapsed());

flat_bm25_search_stream_with_metrics(
unindexed_input,
Expand All @@ -1070,7 +1059,7 @@ impl ExecutionPlan for FlatMatchQueryExec {
tokenizer,
base_scorer,
target_batch_size,
Some(elapsed_compute_for_stream),
Some(elapsed_compute),
)
.await
})
Expand Down
52 changes: 24 additions & 28 deletions rust/lance/src/io/exec/knn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,7 @@ use crate::{Error, Result};
use lance_arrow::*;

use super::utils::{
FilteredRowIdsToPrefilter, IndexMetrics, InstrumentedChildInputStream, PreFilterSource,
SelectionVectorToPrefilter,
FilteredRowIdsToPrefilter, IndexMetrics, PreFilterSource, SelectionVectorToPrefilter,
};

pub struct AnnPartitionMetrics {
Expand Down Expand Up @@ -253,43 +252,35 @@ impl ExecutionPlan for KNNVectorDistanceExec {
context: Arc<datafusion::execution::context::TaskContext>,
) -> DataFusionResult<SendableRecordBatchStream> {
let input_stream = self.input.execute(partition, context)?;
let input_schema = input_stream.schema();
let key = self.query.clone();
let column = self.column.clone();
let dt = self.distance_type;
let schema = self.schema();

// Empty batches don't have a vector column to score; filter them out
// before reaching the helper so the transform always sees real work.
let filtered_input = Box::pin(RecordBatchStreamAdapter::new(
input_schema,
input_stream.try_filter(|batch| future::ready(batch.num_rows() > 0)),
)) as SendableRecordBatchStream;

// Mirror of the helper's elapsed_compute counter; used to attribute
// wall-clock from the spawn_blocking distance kernel back onto the
// node's `elapsed_compute` metric.
let elapsed_compute = BaselineMetrics::new(&self.metrics, partition)
.elapsed_compute()
.clone();

let stream = InstrumentedChildInputStream::new(
filtered_input,
schema,
move |batch| {
// before the transform so it always sees real work.
let filtered_input = input_stream.try_filter(|batch| future::ready(batch.num_rows() > 0));

let baseline = BaselineMetrics::new(&self.metrics, partition);
let elapsed_compute = baseline.elapsed_compute().clone();

let stream = filtered_input
.map(move |batch_result| {
let key = key.clone();
let column = column.clone();
let elapsed_compute = elapsed_compute.clone();
async move {
let batch = batch_result?;
// Time around the .await to capture the spawn_blocking
// distance work, which otherwise runs while this future is
// Pending and is missed by the helper's own poll timer.
let start = std::time::Instant::now();
// Pending and is missed by a poll-time timer.
let start = Instant::now();
let batch = compute_distance(key, dt, &column, batch)
.await
.map_err(|e| DataFusionError::External(Box::new(e)))?;
elapsed_compute.add_duration(start.elapsed());

let _t = elapsed_compute.timer();
let distances = batch[DIST_COL].as_primitive::<Float32Type>();
let mask = BooleanArray::from_iter(
distances
Expand All @@ -299,12 +290,17 @@ impl ExecutionPlan for KNNVectorDistanceExec {
arrow::compute::filter_record_batch(&batch, &mask)
.map_err(|e| DataFusionError::ArrowError(Box::new(e), None))
}
},
get_num_compute_intensive_cpus(),
partition,
&self.metrics,
);
Ok(Box::pin(stream) as SendableRecordBatchStream)
})
.buffer_unordered(get_num_compute_intensive_cpus());

let stream = stream.map(move |batch| {
let poll = baseline.record_poll(std::task::Poll::Ready(Some(batch)));
match poll {
std::task::Poll::Ready(Some(b)) => b,
_ => unreachable!("record_poll preserves Ready(Some) input"),
}
});
Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
}

fn partition_statistics(&self, partition: Option<usize>) -> DataFusionResult<Statistics> {
Expand Down
64 changes: 35 additions & 29 deletions rust/lance/src/io/exec/rowids.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use datafusion::common::ColumnStatistics;
use datafusion::common::stats::Precision;
use datafusion::error::{DataFusionError, Result};
use datafusion::execution::SendableRecordBatchStream;
use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
use datafusion_physical_expr::EquivalenceProperties;
use datafusion_physical_plan::Statistics;
Expand All @@ -29,8 +29,6 @@ use crate::Dataset;
use crate::dataset::rowids::get_row_id_index;
use crate::utils::future::SharedPrerequisite;

use super::utils::InstrumentedChildInputStream;

/// Add a `_rowaddr` column to a stream of record batches that have a `_rowid`.
///
/// It's generally more efficient to scan the `_rowaddr` column, but this can be
Expand Down Expand Up @@ -191,33 +189,41 @@ impl AddRowAddrExec {
let rowid_pos = self.rowid_pos;
let rowaddr_pos = self.rowaddr_pos;
let output_schema = self.output_schema.clone();
let stream = InstrumentedChildInputStream::new(
input_stream,
let baseline = BaselineMetrics::new(&self.metrics, partition);
let elapsed_compute = baseline.elapsed_compute().clone();
let stream = input_stream.then(move |batch_result| {
let output_schema = output_schema.clone();
let index_prereq = index_prereq.clone();
let elapsed_compute = elapsed_compute.clone();
async move {
let batch = batch_result?;
index_prereq.wait_ready().await?;
let row_id_index = index_prereq.get_ready();
let index_ref = row_id_index.as_deref();

let _t = elapsed_compute.timer();
let row_addr = Self::compute_row_addrs(batch.column(rowid_pos), index_ref)?;

let mut columns = Vec::with_capacity(batch.num_columns() + 1);
let existing_columns = batch.columns();
columns.extend_from_slice(&existing_columns[..rowaddr_pos]);
columns.push(row_addr);
columns.extend_from_slice(&existing_columns[rowaddr_pos..]);

Ok(RecordBatch::try_new(output_schema.clone(), columns)?)
}
});
let stream = stream.map(move |batch| {
let poll = baseline.record_poll(std::task::Poll::Ready(Some(batch)));
match poll {
std::task::Poll::Ready(Some(b)) => b,
_ => unreachable!("record_poll preserves Ready(Some) input"),
}
});
Ok(Box::pin(RecordBatchStreamAdapter::new(
self.output_schema.clone(),
move |batch| {
let output_schema = output_schema.clone();
let index_prereq = index_prereq.clone();
async move {
index_prereq.wait_ready().await?;
let row_id_index = index_prereq.get_ready();
let index_ref = row_id_index.as_deref();

let row_addr = Self::compute_row_addrs(batch.column(rowid_pos), index_ref)?;

let mut columns = Vec::with_capacity(batch.num_columns() + 1);
let existing_columns = batch.columns();
columns.extend_from_slice(&existing_columns[..rowaddr_pos]);
columns.push(row_addr);
columns.extend_from_slice(&existing_columns[rowaddr_pos..]);

Ok(RecordBatch::try_new(output_schema.clone(), columns)?)
}
},
1,
partition,
&self.metrics,
);
Ok(Box::pin(stream))
stream,
)))
}
}

Expand Down
Loading
Loading