Skip to content
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
567 changes: 541 additions & 26 deletions rust/cubestore/cubestore-sql-tests/src/tests.rs

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ pub(crate) struct InlineAggregateStream {

batch_size: usize,

/// Per-partition limit on the number of emitted groups, see [`InlineAggregateExec::limit`]
limit: Option<usize>,

groups_emitted: usize,

exec_state: ExecutionState,

input_done: bool,
Expand Down Expand Up @@ -100,6 +105,8 @@ impl InlineAggregateStream {
group_by: agg_group_by,
exec_state,
batch_size,
limit: agg.limit(),
groups_emitted: 0,
current_group_indices,
group_values,
input_done: false,
Expand Down Expand Up @@ -175,38 +182,45 @@ impl Stream for InlineAggregateStream {
loop {
match &self.exec_state {
ExecutionState::ReadingInput => {
// All needed groups are emitted, skip the rest of the input
if self.limit_reached() {
self.exec_state = ExecutionState::Done;
continue;
}

// Drain groups accumulated beyond the emit threshold (a single input batch
// can bring many) before reading further input
match self.emit_early_if_ready() {
Ok(Some(batch)) => {
self.exec_state = ExecutionState::ProducingOutput(batch);
continue;
}
Ok(None) => {
// Not enough groups, read further
}
Err(e) => {
return Poll::Ready(Some(Err(e)));
}
}

match ready!(self.input.poll_next_unpin(cx)) {
// New input batch to aggregate
// New input batch to aggregate; emitting happens at the top of the loop
Some(Ok(batch)) => {
// Aggregate the batch
if let Err(e) = self.group_aggregate_batch(batch) {
return Poll::Ready(Some(Err(e)));
}

// Try to emit a batch if we have enough groups
match self.emit_early_if_ready() {
Ok(Some(batch)) => {
self.exec_state = ExecutionState::ProducingOutput(batch);
}
Ok(None) => {
// Not enough groups yet, continue reading
}
Err(e) => {
return Poll::Ready(Some(Err(e)));
}
}
}

// Error from input stream
Some(Err(e)) => {
return Poll::Ready(Some(Err(e)));
}

// Input stream exhausted - emit all remaining groups
// Input stream exhausted - emit the remaining groups, up to the limit
None => {
self.input_done = true;

match self.emit(EmitTo::All) {
match self.emit_remaining() {
Ok(Some(batch)) => {
self.exec_state = ExecutionState::ProducingOutput(batch);
}
Expand Down Expand Up @@ -244,9 +258,6 @@ impl Stream for InlineAggregateStream {
}

impl InlineAggregateStream {
/// Emit groups based on EmitTo strategy.
///
/// Returns None if there are no groups to emit.
/// Emit groups based on EmitTo strategy.
///
/// Returns None if there are no groups to emit.
Expand Down Expand Up @@ -283,25 +294,61 @@ impl InlineAggregateStream {
Ok(Some(batch))
}

/// Check if we have enough groups to emit a batch, keeping the last (potentially incomplete) group.
///
/// For sorted aggregation, we emit batches of size batch_size when we have accumulated
/// more than batch_size groups. We always keep the last group as it may continue in the next input batch.
fn should_emit_early(&self) -> bool {
// Need at least (batch_size + 1) groups to emit batch_size and keep 1
self.group_values.len() > self.batch_size
fn limit_reached(&self) -> bool {
self.limit.is_some_and(|limit| self.groups_emitted >= limit)
}

/// How many groups to emit in the next early batch: full batches until the limit (if any)
/// leaves fewer groups to emit.
fn emit_early_threshold(&self) -> usize {
match self.limit {
Some(limit) => self
.batch_size
.min(limit.saturating_sub(self.groups_emitted)),
None => self.batch_size,
}
}

/// Emit a batch of groups if we have enough accumulated, keeping the last group.
///
/// For sorted aggregation, we emit when we have accumulated more than threshold groups: the
/// last group is always kept as it may continue in the next input batch, so only closed
/// groups are emitted.
///
/// Returns Some(batch) if emitted, None otherwise.
fn emit_early_if_ready(&mut self) -> DFResult<Option<RecordBatch>> {
if !self.should_emit_early() {
let threshold = self.emit_early_threshold();
// Need at least (threshold + 1) groups to emit threshold closed groups and keep 1.
// The threshold == 0 check is defensive: the poll loop checks limit_reached() before
// calling this, so the threshold is at least 1 there.
if threshold == 0 || self.group_values.len() <= threshold {
return Ok(None);
}

// Emit exactly batch_size groups, keeping the rest (including last incomplete group)
self.emit(EmitTo::First(self.batch_size))
let batch = self.emit(EmitTo::First(threshold))?;
self.groups_emitted += threshold;
Ok(batch)
}
Comment thread
claude[bot] marked this conversation as resolved.

/// Emit the groups left at the end of the input: all of them are closed at this point, but
/// no more than the limit allows.
fn emit_remaining(&mut self) -> DFResult<Option<RecordBatch>> {
let len = self.group_values.len();
let emit_count = match self.limit {
Some(limit) => len.min(limit.saturating_sub(self.groups_emitted)),
None => len,
};
if emit_count == 0 {
return Ok(None);
}
let emit_to = if emit_count < len {
EmitTo::First(emit_count)
} else {
EmitTo::All
};
let batch = self.emit(emit_to)?;
self.groups_emitted += emit_count;
Ok(batch)
}

fn group_aggregate_batch(&mut self, batch: RecordBatch) -> DFResult<()> {
Expand Down
Loading
Loading