Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions app/src/ai/ambient_agents/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ pub mod task;
pub mod telemetry;

pub use task::{
cancel_task_with_toast, AgentConfigSnapshot, AgentSource, AmbientAgentTask,
AmbientAgentTaskState, TaskStatusMessage,
cancel_task_silently, cancel_task_with_toast, AgentConfigSnapshot, AgentSource,
AmbientAgentTask, AmbientAgentTaskState, TaskStatusMessage,
};
pub const OUT_OF_CREDITS_TASK_FAILURE_MESSAGE: &str =
"Out of credits. Upgrade your Warp plan to continue running cloud agents.";
Expand Down
13 changes: 13 additions & 0 deletions app/src/ai/ambient_agents/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -537,3 +537,16 @@ pub fn cancel_task_with_toast<V: View>(task_id: AmbientAgentTaskId, ctx: &mut Vi
},
);
}

/// Cancel an ambient agent task without surfacing a toast to the user.
pub fn cancel_task_silently<V: View>(task_id: AmbientAgentTaskId, ctx: &mut ViewContext<V>) {
let ai_client = ServerApiProvider::handle(ctx).as_ref(ctx).get_ai_client();
ctx.spawn(
async move { ai_client.cancel_ambient_agent_task(&task_id).await },
move |_view, result, _| {
if let Err(e) = result {
log::error!("Failed to cancel task: {e}");
}
},
);
}
64 changes: 47 additions & 17 deletions app/src/ai/blocklist/agent_view/orchestration_pill_bar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,12 +189,9 @@ pub enum OrchestrationPillBarAction {
OpenInNewPane(AIConversationId),
/// Menu item: open this child in a new tab.
OpenInNewTab(AIConversationId),
/// Menu item: stop the in-progress task. Currently hidden; wiring kept
/// for re-enabling later.
#[allow(dead_code)]
/// Menu item: stop the in-progress task.
Stop(AIConversationId),
/// Menu item: cancel and remove from local history. Currently hidden.
#[allow(dead_code)]
/// Menu item: cancel and remove from local history.
Kill(AIConversationId),
/// Set/clear which pill the user is hovering (drives the details card).
SetHoveredPill(Option<AIConversationId>),
Expand Down Expand Up @@ -225,6 +222,23 @@ impl Entity for OrchestrationPillBar {
}

impl OrchestrationPillBar {
fn overflow_menu_item(
label: &'static str,
icon: Icon,
action: OrchestrationPillBarAction,
hover_background: Fill,
icon_color: Option<Fill>,
) -> MenuItem<OrchestrationPillBarAction> {
let mut fields = MenuItemFields::new(label)
.with_icon(icon)
.with_override_hover_background_color(hover_background)
.with_on_select_action(action);
if let Some(color) = icon_color {
fields = fields.with_override_icon_color(color);
}
MenuItem::Item(fields)
}

pub fn new(
agent_view_controller: ModelHandle<AgentViewController>,
ctx: &mut ViewContext<Self>,
Expand Down Expand Up @@ -302,16 +316,17 @@ impl OrchestrationPillBar {
let appearance = Appearance::as_ref(ctx);
let theme = appearance.theme();
let hover_background: Fill = internal_colors::neutral_4(theme).into();

let item = |label: &'static str,
icon: Icon,
action: OrchestrationPillBarAction|
-> MenuItem<OrchestrationPillBarAction> {
MenuItem::Item(
MenuItemFields::new(label)
.with_icon(icon)
.with_override_hover_background_color(hover_background)
.with_on_select_action(action),
let item = |label, icon, action| {
Self::overflow_menu_item(label, icon, action, hover_background, None)
};
let destructive_color: Fill = theme.ansi_fg_red().into();
let destructive_item = |label, icon, action| {
Self::overflow_menu_item(
label,
icon,
action,
hover_background,
Some(destructive_color),
)
};

Expand All @@ -322,8 +337,7 @@ impl OrchestrationPillBar {
let is_open_elsewhere =
is_conversation_open_in_other_visible_view(conversation_id, self_terminal_view_id, ctx);

// Stop / Kill items intentionally omitted (wiring still in place).
let items = if is_open_elsewhere {
let mut items = if is_open_elsewhere {
vec![item(
"Focus pane",
Icon::ArrowSplit,
Expand All @@ -343,6 +357,22 @@ impl OrchestrationPillBar {
),
]
};
let is_in_progress = BlocklistAIHistoryModel::as_ref(ctx)
.conversation(&conversation_id)
.is_some_and(|conversation| conversation.status().is_in_progress());
items.push(MenuItem::Separator);
if is_in_progress {
items.push(destructive_item(
"Stop agent",
Icon::StopFilled,
OrchestrationPillBarAction::Stop(conversation_id),
));
}
items.push(destructive_item(
"Kill agent",
Icon::X,
OrchestrationPillBarAction::Kill(conversation_id),
));

self.menu.update(ctx, |menu, ctx| {
menu.set_items(items, ctx);
Expand Down
9 changes: 9 additions & 0 deletions app/src/ai/blocklist/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2392,6 +2392,15 @@ impl BlocklistAIController {
.try_cancel_stream(stream_id, reason, ctx)
}

pub fn has_active_stream_for_conversation(
&self,
conversation_id: AIConversationId,
app: &AppContext,
) -> bool {
self.in_flight_response_streams
.has_active_stream_for_conversation(conversation_id, app)
}

/// Cancels 'progress' for the active conversation if there is one:
/// * If there is an in-flight request, cancels it.
/// * Else, if the request finished, but actions from the response are pending or mid-execution, cancels all of them.
Expand Down
70 changes: 61 additions & 9 deletions app/src/ai/blocklist/orchestration_event_streamer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::server::server_api::{ServerApi, ServerApiProvider};
use anyhow::anyhow;
use async_trait::async_trait;
use futures::channel::mpsc;
use std::collections::{HashMap, HashSet};
use std::collections::{HashMap, HashSet, VecDeque};
use std::sync::Arc;
use std::time::Duration;
use uuid::Uuid;
Expand All @@ -39,6 +39,8 @@ const RESTORE_FETCH_BACKOFF_STEPS: &[u64] = &[1, 2, 5, 10];
const RESTORE_FETCH_PERMANENT_BACKOFF_STEPS: &[u64] = &[30];
/// How often (milliseconds) the drain timer checks for SSE events.
const SSE_DRAIN_INTERVAL_MS: u64 = 500;
/// Cap killed-run tombstones while keeping normal sessions well below the limit.
const MAX_KILLED_RUN_IDS: usize = 1024;

/// Per-event item delivered from the SSE background task to the entity.
struct SseStreamItem {
Expand Down Expand Up @@ -188,6 +190,9 @@ pub struct OrchestrationEventStreamer {
/// Monotonic counter for wake-only listener generations. Ensures stale
/// callbacks from replaced listeners are discarded.
next_wake_generation: u64,
/// Run IDs killed locally; kept briefly to drop late server events.
killed_run_ids: HashSet<String>,
killed_run_id_order: VecDeque<String>,
}

pub enum OrchestrationEventStreamerEvent {
Expand Down Expand Up @@ -215,6 +220,8 @@ impl OrchestrationEventStreamer {
streams: HashMap::new(),
next_sse_generation: 0,
next_wake_generation: 0,
killed_run_ids: HashSet::new(),
killed_run_id_order: VecDeque::new(),
}
}

Expand All @@ -237,11 +244,41 @@ impl OrchestrationEventStreamer {
streams: HashMap::new(),
next_sse_generation: 0,
next_wake_generation: 0,
killed_run_ids: HashSet::new(),
killed_run_id_order: VecDeque::new(),
}
}

// ---- Public consumer registry API ---------------------------------

/// Tombstone a killed run so late SSE events cannot resurrect it.
pub fn mark_conversation_killed(
&mut self,
conversation_id: AIConversationId,
ctx: &mut ModelContext<Self>,
) {
let Some(run_id) = self.self_run_id(conversation_id, ctx) else {
log::info!("mark_conversation_killed: conversation {conversation_id:?} has no run_id");
return;
};
log::info!(
"Marking orchestration run as killed: conversation_id={conversation_id:?} run_id={run_id}"
);
self.remember_killed_run_id(run_id);
}

fn remember_killed_run_id(&mut self, run_id: String) {
if self.killed_run_ids.insert(run_id.clone()) {
self.killed_run_id_order.push_back(run_id);
}
while self.killed_run_ids.len() > MAX_KILLED_RUN_IDS {
let Some(evicted_run_id) = self.killed_run_id_order.pop_front() else {
break;
};
self.killed_run_ids.remove(&evicted_run_id);
}
}

/// Register a consumer for a conversation. Re-evaluates eligibility
/// and opens the SSE connection if the conversation is newly
/// eligible. Idempotent: re-registering an existing consumer is a
Expand Down Expand Up @@ -578,9 +615,6 @@ impl OrchestrationEventStreamer {
}
}

// Prune the removed conversation's run_id from every other
// tracked conversation's watched set, then re-evaluate eligibility
// for the affected parents.
if let Some(run_id) = removed_run_id.as_deref() {
let mut affected = Vec::new();
for (other_id, stream) in self.streams.iter_mut() {
Expand Down Expand Up @@ -1230,8 +1264,8 @@ impl OrchestrationEventStreamer {
conversation_id: AIConversationId,
self_run_id: &str,
previous_cursor: i64,
events: Vec<AgentRunEvent>,
messages: Vec<ReceivedMessageInput>,
mut events: Vec<AgentRunEvent>,
mut messages: Vec<ReceivedMessageInput>,
ctx: &mut ModelContext<Self>,
) {
let max_seq = events
Expand All @@ -1244,9 +1278,8 @@ impl OrchestrationEventStreamer {
.or_default()
.event_cursor = max_seq;

// Persist the cursor to SQLite so that after a restart we can
// resume event delivery from this sequence number without
// re-delivering events the parent has already acted on.
// Advance the cursor before filtering so dropped killed-run events
// are not replayed later.
BlocklistAIHistoryModel::handle(ctx).update(ctx, |model, ctx| {
model.update_event_sequence(conversation_id, max_seq, ctx);
});
Expand Down Expand Up @@ -1278,6 +1311,25 @@ impl OrchestrationEventStreamer {
);
}

if !self.killed_run_ids.is_empty() {
let dropped_message_ids: HashSet<String> = events
.iter()
.filter(|event| self.killed_run_ids.contains(&event.run_id))
.filter_map(|event| event.ref_id.clone())
.collect();
let event_count_before = events.len();
events.retain(|event| !self.killed_run_ids.contains(&event.run_id));
messages.retain(|message| {
!dropped_message_ids.contains(&message.message_id)
&& !self.killed_run_ids.contains(&message.sender_agent_id)
});
let dropped_event_count = event_count_before - events.len();
if dropped_event_count > 0 {
log::info!(
"Dropped {dropped_event_count} orchestration events for killed run IDs while handling {conversation_id:?}"
);
}
}
// Track message IDs for server-side mark_delivered calls.
let message_ids: Vec<String> = messages
.iter()
Expand Down
Loading
Loading