Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions app/src/ai/ambient_agents/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ pub mod task;
pub mod telemetry;

pub use task::{
cancel_task_with_toast, AgentConfigSnapshot, AgentSource, AmbientAgentTask,
AmbientAgentTaskState, TaskStatusMessage,
cancel_task_silently, cancel_task_with_toast, AgentConfigSnapshot, AgentSource,
AmbientAgentTask, AmbientAgentTaskState, TaskStatusMessage,
};
pub const OUT_OF_CREDITS_TASK_FAILURE_MESSAGE: &str =
"Out of credits. Upgrade your Warp plan to continue running cloud agents.";
Expand Down
13 changes: 13 additions & 0 deletions app/src/ai/ambient_agents/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -531,3 +531,16 @@ pub fn cancel_task_with_toast<V: View>(task_id: AmbientAgentTaskId, ctx: &mut Vi
},
);
}

/// Cancel an ambient agent task without surfacing a toast to the user.
pub fn cancel_task_silently<V: View>(task_id: AmbientAgentTaskId, ctx: &mut ViewContext<V>) {
let ai_client = ServerApiProvider::handle(ctx).as_ref(ctx).get_ai_client();
ctx.spawn(
async move { ai_client.cancel_ambient_agent_task(&task_id).await },
move |_view, result, _| {
if let Err(e) = result {
log::error!("Failed to cancel task: {e}");
}
},
);
}
64 changes: 47 additions & 17 deletions app/src/ai/blocklist/agent_view/orchestration_pill_bar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,12 +155,9 @@ pub enum OrchestrationPillBarAction {
OpenInNewPane(AIConversationId),
/// Menu item: open this child in a new tab.
OpenInNewTab(AIConversationId),
/// Menu item: stop the in-progress task. Currently hidden; wiring kept
/// for re-enabling later.
#[allow(dead_code)]
/// Menu item: stop the in-progress task.
Stop(AIConversationId),
/// Menu item: cancel and remove from local history. Currently hidden.
#[allow(dead_code)]
/// Menu item: cancel and remove from local history.
Kill(AIConversationId),
/// Set/clear which pill the user is hovering (drives the details card).
SetHoveredPill(Option<AIConversationId>),
Expand Down Expand Up @@ -191,6 +188,23 @@ impl Entity for OrchestrationPillBar {
}

impl OrchestrationPillBar {
fn overflow_menu_item(
label: &'static str,
icon: Icon,
action: OrchestrationPillBarAction,
hover_background: Fill,
icon_color: Option<Fill>,
) -> MenuItem<OrchestrationPillBarAction> {
let mut fields = MenuItemFields::new(label)
.with_icon(icon)
.with_override_hover_background_color(hover_background)
.with_on_select_action(action);
if let Some(color) = icon_color {
fields = fields.with_override_icon_color(color);
}
MenuItem::Item(fields)
}

pub fn new(
agent_view_controller: ModelHandle<AgentViewController>,
ctx: &mut ViewContext<Self>,
Expand Down Expand Up @@ -268,16 +282,17 @@ impl OrchestrationPillBar {
let appearance = Appearance::as_ref(ctx);
let theme = appearance.theme();
let hover_background: Fill = internal_colors::neutral_4(theme).into();

let item = |label: &'static str,
icon: Icon,
action: OrchestrationPillBarAction|
-> MenuItem<OrchestrationPillBarAction> {
MenuItem::Item(
MenuItemFields::new(label)
.with_icon(icon)
.with_override_hover_background_color(hover_background)
.with_on_select_action(action),
let item = |label, icon, action| {
Self::overflow_menu_item(label, icon, action, hover_background, None)
};
let destructive_color: Fill = theme.ansi_fg_red().into();
let destructive_item = |label, icon, action| {
Self::overflow_menu_item(
label,
icon,
action,
hover_background,
Some(destructive_color),
)
};

Expand All @@ -288,8 +303,7 @@ impl OrchestrationPillBar {
let is_open_elsewhere =
is_conversation_open_in_other_visible_view(conversation_id, self_terminal_view_id, ctx);

// Stop / Kill items intentionally omitted (wiring still in place).
let items = if is_open_elsewhere {
let mut items = if is_open_elsewhere {
vec![item(
"Focus pane",
Icon::ArrowSplit,
Expand All @@ -309,6 +323,22 @@ impl OrchestrationPillBar {
),
]
};
let is_in_progress = BlocklistAIHistoryModel::as_ref(ctx)
.conversation(&conversation_id)
.is_some_and(|conversation| conversation.status().is_in_progress());
items.push(MenuItem::Separator);
if is_in_progress {
items.push(destructive_item(
"Stop agent",
Icon::StopFilled,
OrchestrationPillBarAction::Stop(conversation_id),
));
}
items.push(destructive_item(
"Kill agent",
Icon::X,
OrchestrationPillBarAction::Kill(conversation_id),
));

self.menu.update(ctx, |menu, ctx| {
menu.set_items(items, ctx);
Expand Down
9 changes: 9 additions & 0 deletions app/src/ai/blocklist/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2392,6 +2392,15 @@ impl BlocklistAIController {
.try_cancel_stream(stream_id, reason, ctx)
}

pub fn has_active_stream_for_conversation(
&self,
conversation_id: AIConversationId,
app: &AppContext,
) -> bool {
self.in_flight_response_streams
.has_active_stream_for_conversation(conversation_id, app)
}

/// Cancels 'progress' for the active conversation if there is one:
/// * If there is an in-flight request, cancels it.
/// * Else, if the request finished, but actions from the response are pending or mid-execution, cancels all of them.
Expand Down
70 changes: 61 additions & 9 deletions app/src/ai/blocklist/orchestration_event_streamer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::server::server_api::{ServerApi, ServerApiProvider};
use anyhow::anyhow;
use async_trait::async_trait;
use futures::channel::mpsc;
use std::collections::{HashMap, HashSet};
use std::collections::{HashMap, HashSet, VecDeque};
use std::sync::Arc;
use std::time::Duration;
use uuid::Uuid;
Expand All @@ -39,6 +39,8 @@ const RESTORE_FETCH_BACKOFF_STEPS: &[u64] = &[1, 2, 5, 10];
const RESTORE_FETCH_PERMANENT_BACKOFF_STEPS: &[u64] = &[30];
/// How often (milliseconds) the drain timer checks for SSE events.
const SSE_DRAIN_INTERVAL_MS: u64 = 500;
/// Cap killed-run tombstones while keeping normal sessions well below the limit.
const MAX_KILLED_RUN_IDS: usize = 1024;

/// Per-event item delivered from the SSE background task to the entity.
struct SseStreamItem {
Expand Down Expand Up @@ -188,6 +190,9 @@ pub struct OrchestrationEventStreamer {
/// Monotonic counter for wake-only listener generations. Ensures stale
/// callbacks from replaced listeners are discarded.
next_wake_generation: u64,
/// Run IDs killed locally; kept briefly to drop late server events.
killed_run_ids: HashSet<String>,
killed_run_id_order: VecDeque<String>,
}

pub enum OrchestrationEventStreamerEvent {
Expand Down Expand Up @@ -215,6 +220,8 @@ impl OrchestrationEventStreamer {
streams: HashMap::new(),
next_sse_generation: 0,
next_wake_generation: 0,
killed_run_ids: HashSet::new(),
killed_run_id_order: VecDeque::new(),
}
}

Expand All @@ -237,11 +244,41 @@ impl OrchestrationEventStreamer {
streams: HashMap::new(),
next_sse_generation: 0,
next_wake_generation: 0,
killed_run_ids: HashSet::new(),
killed_run_id_order: VecDeque::new(),
}
}

// ---- Public consumer registry API ---------------------------------

/// Tombstone a killed run so late SSE events cannot resurrect it.
pub fn mark_conversation_killed(
&mut self,
conversation_id: AIConversationId,
ctx: &mut ModelContext<Self>,
) {
let Some(run_id) = self.self_run_id(conversation_id, ctx) else {
log::info!("mark_conversation_killed: conversation {conversation_id:?} has no run_id");
return;
};
log::info!(
"Marking orchestration run as killed: conversation_id={conversation_id:?} run_id={run_id}"
);
self.remember_killed_run_id(run_id);
}

fn remember_killed_run_id(&mut self, run_id: String) {
if self.killed_run_ids.insert(run_id.clone()) {
self.killed_run_id_order.push_back(run_id);
}
while self.killed_run_ids.len() > MAX_KILLED_RUN_IDS {
let Some(evicted_run_id) = self.killed_run_id_order.pop_front() else {
break;
};
self.killed_run_ids.remove(&evicted_run_id);
}
}

/// Register a consumer for a conversation. Re-evaluates eligibility
/// and opens the SSE connection if the conversation is newly
/// eligible. Idempotent: re-registering an existing consumer is a
Expand Down Expand Up @@ -578,9 +615,6 @@ impl OrchestrationEventStreamer {
}
}

// Prune the removed conversation's run_id from every other
// tracked conversation's watched set, then re-evaluate eligibility
// for the affected parents.
if let Some(run_id) = removed_run_id.as_deref() {
let mut affected = Vec::new();
for (other_id, stream) in self.streams.iter_mut() {
Expand Down Expand Up @@ -1230,8 +1264,8 @@ impl OrchestrationEventStreamer {
conversation_id: AIConversationId,
self_run_id: &str,
previous_cursor: i64,
events: Vec<AgentRunEvent>,
messages: Vec<ReceivedMessageInput>,
mut events: Vec<AgentRunEvent>,
mut messages: Vec<ReceivedMessageInput>,
ctx: &mut ModelContext<Self>,
) {
let max_seq = events
Expand All @@ -1244,9 +1278,8 @@ impl OrchestrationEventStreamer {
.or_default()
.event_cursor = max_seq;

// Persist the cursor to SQLite so that after a restart we can
// resume event delivery from this sequence number without
// re-delivering events the parent has already acted on.
// Advance the cursor before filtering so dropped killed-run events
// are not replayed later.
BlocklistAIHistoryModel::handle(ctx).update(ctx, |model, ctx| {
model.update_event_sequence(conversation_id, max_seq, ctx);
});
Expand Down Expand Up @@ -1278,6 +1311,25 @@ impl OrchestrationEventStreamer {
);
}

if !self.killed_run_ids.is_empty() {
let dropped_message_ids: HashSet<String> = events
.iter()
.filter(|event| self.killed_run_ids.contains(&event.run_id))
.filter_map(|event| event.ref_id.clone())
.collect();
let event_count_before = events.len();
events.retain(|event| !self.killed_run_ids.contains(&event.run_id));
messages.retain(|message| {
!dropped_message_ids.contains(&message.message_id)
&& !self.killed_run_ids.contains(&message.sender_agent_id)
});
let dropped_event_count = event_count_before - events.len();
if dropped_event_count > 0 {
log::info!(
"Dropped {dropped_event_count} orchestration events for killed run IDs while handling {conversation_id:?}"
);
}
}
// Track message IDs for server-side mark_delivered calls.
let message_ids: Vec<String> = messages
.iter()
Expand Down
Loading
Loading