Skip to content
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions crates/ark/src/console.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ use harp::utils::r_typeof;
use harp::CONSOLE_THREAD_ID;
use libr::R_BaseNamespace;
use libr::R_ProcessEvents;
use libr::R_RunPendingFinalizers;
use libr::Rf_ScalarInteger;
use libr::Rf_error;
use libr::Rf_findVarInFrame;
Expand Down Expand Up @@ -128,8 +127,7 @@ use console_filter::ConsoleFilter;
pub use console_repl::catching_panics;
pub(crate) use console_repl::console_inputs;
pub(crate) use console_repl::r_busy;
#[cfg(unix)]
pub(crate) use console_repl::r_polled_events;
pub(crate) use console_repl::r_process_events;
pub(crate) use console_repl::r_read_console;
pub(crate) use console_repl::r_show_message;
pub(crate) use console_repl::r_suicide;
Expand Down Expand Up @@ -235,7 +233,6 @@ pub struct Console {
autoprint_output: String,

/// Channel to send and receive tasks from `QueuedRTask`s
tasks_interrupt_rx: Receiver<QueuedRTask>,
tasks_idle_rx: Receiver<QueuedRTask>,
tasks_idle_any_rx: Receiver<QueuedRTask>,
pending_futures: HashMap<Uuid, (BoxFuture<'static, ()>, RTaskStartInfo, Option<String>)>,
Expand Down
214 changes: 107 additions & 107 deletions crates/ark/src/console/console_repl.rs

Large diffs are not rendered by default.

43 changes: 33 additions & 10 deletions crates/ark/src/lsp/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
use std::path::PathBuf;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;

use amalthea::comm::server_comm::ServerStartMessage;
use amalthea::comm::server_comm::ServerStartedMessage;
Expand Down Expand Up @@ -85,7 +86,7 @@ macro_rules! cast_response {
},
RequestResponse::Crashed(err) => {
// Notify user that the LSP has crashed and is no longer active
report_crash();
report_crash($self.client()).await;

// The backtrace is reported via `err` and eventually shows up
// in the LSP logs on the client side
Expand All @@ -99,21 +100,35 @@ macro_rules! cast_response {
}};
}

fn report_crash() {
/// Send via `request::ShowMessageRequest` not `notification::ShowMessage` so that we can
/// ensure that the message has been received on the frontend side. We are about to shut
/// the LSP down, and sending out a fire-and-forget notification often won't get sent out
/// before shutdown occurs. The request returns control to us when the user acknowledges
/// the message. It doesn't matter if that takes awhile because we shut down right after,
/// and we've already flipped the `LSP_HAS_CRASHED` global flag.
///
/// Bounded by a timeout so a disconnected or non-compliant frontend can't block LSP
/// shutdown.
async fn report_crash(client: &Client) {
let user_message = concat!(
"The R language server has crashed and has been disabled. ",
"Smart features such as completions will no longer work in this session. ",
"Please report this crash to https://github.com/posit-dev/positron/issues ",
"with full logs (see https://positron.posit.co/troubleshooting.html#python-and-r-logs)."
);

// NOTE: This is a legit use of interrupt-time task. No R access here, and
// we need to go through Console since it owns the UI comm.
r_task(|| {
if let Some(ui) = Console::get().ui_comm() {
ui.show_message(String::from(user_message));
}
let request = client.send_request::<request::ShowMessageRequest>(ShowMessageRequestParams {
typ: MessageType::ERROR,
message: String::from(user_message),
actions: None,
});
match tokio::time::timeout(Duration::from_secs(5), request).await {
Ok(result) => {
result.log_err();
},
Err(_) => {
log::warn!("Timed out waiting for frontend to acknowledge LSP crash notification");
},
}
Comment on lines -110 to +130
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

report_crash() moves off needing to use the ui comm!

}

#[derive(Debug)]
Expand Down Expand Up @@ -228,6 +243,9 @@ struct Backend {
/// Channel for communication with the main loop.
events_tx: TokioUnboundedSender<Event>,

/// Copy of the Client, for reporting crash messages.
client: Client,

/// Handle to main loop. Drop it to cancel the loop, all associated tasks,
/// and drop all owned state.
_main_loop: tokio::task::JoinSet<()>,
Expand Down Expand Up @@ -256,6 +274,10 @@ impl Backend {
.send(Event::Lsp(LspMessage::Notification(notif)))
.unwrap();
}

fn client(&self) -> &Client {
&self.client
}
}

#[tower_lsp::async_trait]
Expand Down Expand Up @@ -557,7 +579,7 @@ pub(crate) fn start_lsp(
let (shutdown_tx, mut shutdown_rx) = tokio::sync::mpsc::channel::<()>(1);

let init = |client: Client| {
let state = GlobalState::new(client, r_home, console_notification_tx);
let state = GlobalState::new(client.clone(), r_home, console_notification_tx);
let events_tx = state.events_tx();

// Start main loop and hold onto the handle that keeps it alive
Expand All @@ -579,6 +601,7 @@ pub(crate) fn start_lsp(
Backend {
shutdown_tx,
events_tx,
client,
_main_loop: main_loop,
}
};
Expand Down
119 changes: 47 additions & 72 deletions crates/ark/src/r_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,11 @@ use crate::console::Console;
use crate::console::ConsoleOutputCapture;
use crate::fixtures::r_test_init;

/// Task channels for interrupt-time tasks
static INTERRUPT_TASKS: LazyLock<TaskChannels> = LazyLock::new(TaskChannels::new);

/// Task channels for idle-time tasks
/// Task channels for idle-time tasks (top-level only)
static IDLE_TASKS: LazyLock<TaskChannels> = LazyLock::new(TaskChannels::new);

/// Task channels for idle tasks that run at any idle prompt (top-level or browser)
/// Task channels for idle tasks that run at any idle prompt (top-level or browser, but
/// not input)
static IDLE_ANY_TASKS: LazyLock<TaskChannels> = LazyLock::new(TaskChannels::new);

// Compared to `futures::BoxFuture`, this doesn't require the future to be Send.
Expand Down Expand Up @@ -64,19 +62,11 @@ impl TaskChannels {
}
}

/// Returns receivers for interrupt, idle, and debug-idle tasks.
/// Returns receivers for idle and idle-any tasks.
/// Initializes the task channels if they haven't been initialized yet.
/// Can only be called once (intended for `Console` during init).
pub(crate) fn take_receivers() -> (
Receiver<QueuedRTask>,
Receiver<QueuedRTask>,
Receiver<QueuedRTask>,
) {
(
INTERRUPT_TASKS.take_rx(),
IDLE_TASKS.take_rx(),
IDLE_ANY_TASKS.take_rx(),
)
pub(crate) fn take_receivers() -> (Receiver<QueuedRTask>, Receiver<QueuedRTask>) {
(IDLE_TASKS.take_rx(), IDLE_ANY_TASKS.take_rx())
}

pub enum QueuedRTask {
Expand Down Expand Up @@ -148,7 +138,7 @@ impl std::task::Wake for RTaskWaker {
}

impl RTaskStartInfo {
pub(crate) fn new(idle: bool) -> Self {
pub(crate) fn new() -> Self {
let thread = std::thread::current();
let thread_id = thread.id();
let thread_name = thread
Expand All @@ -158,7 +148,7 @@ impl RTaskStartInfo {
.to_owned();

let start_time = std::time::Instant::now();
let span = tracing::trace_span!("R task", thread = thread_name, interrupt = !idle,);
let span = tracing::trace_span!("R task", thread = thread_name);

Self {
thread_id,
Expand Down Expand Up @@ -191,6 +181,9 @@ impl RTaskStartInfo {
// running, so borrowing is allowed even though we send it to another
// thread. See also `Crossbeam::thread::ScopedThreadBuilder` (from which
// `r_task()` is adapted) for a similar approach.
//
// `r_task()`s run via `IDLE_ANY_TASKS`, i.e. they run at top level, and at a debugger
// prompt (but notably not the input prompt as a way to reduce risk of reentrancy).

pub fn r_task<'env, F, T>(f: F) -> T
where
Expand Down Expand Up @@ -242,9 +235,9 @@ where
let task = QueuedRTask::Sync(RTaskSync {
fun: closure,
status_tx: Some(status_tx),
start_info: RTaskStartInfo::new(false),
start_info: RTaskStartInfo::new(),
});
INTERRUPT_TASKS.tx().send(task).unwrap();
IDLE_ANY_TASKS.tx().send(task).unwrap();

// Block until we get the signal that the task has started
let status = status_rx.recv().unwrap();
Expand Down Expand Up @@ -289,23 +282,18 @@ where

/// An async task to be run on the R thread.
///
/// Construct via `RTask::interrupt`, `RTask::idle`, or `RTask::idle_any_prompt`
/// when spawning from the R thread. Use the `Send` variants
/// (`RTask::send_interrupt`, etc.) when spawning from other threads.
/// Construct via [RTask::idle] or [RTask::idle_any_prompt] when spawning from the R
/// thread. Use the `Send` variants ([RTask::send_idle], etc.) when spawning from other
/// threads.
///
/// For idle modes, console output is automatically captured during the task's
/// execution via a `ConsoleOutputCapture` passed to the closure.
/// Console output is automatically captured during the task's execution via a
/// `ConsoleOutputCapture` passed to the closure.
pub(crate) enum RTask {
/// Run at the next interrupt check. Must be spawned from the R thread.
Interrupt(BoxFuture<'static, ()>),
/// Run when R is at a top-level idle prompt. Must be spawned from the R thread.
Idle(BoxFuture<'static, ()>),
/// Run when R is at any idle prompt (top-level or browser). Must be spawned
/// from the R thread.
IdleAnyPrompt(BoxFuture<'static, ()>),
/// Like `Interrupt`, but can be spawned from any thread. The constructor
/// enforces `Send` on the closure.
SendInterrupt(BoxFuture<'static, ()>),
/// Like `Idle`, but can be spawned from any thread. The constructor
/// enforces `Send` on the closure.
SendIdle(BoxFuture<'static, ()>),
Expand All @@ -315,14 +303,6 @@ pub(crate) enum RTask {
}

impl RTask {
pub(crate) fn interrupt<F, Fut>(fun: F) -> Self
where
F: FnOnce() -> Fut + 'static,
Fut: Future<Output = ()> + 'static,
{
RTask::Interrupt(Box::pin(fun()))
}

pub(crate) fn idle<F, Fut>(fun: F) -> Self
where
F: FnOnce(ConsoleOutputCapture) -> Fut + 'static,
Expand All @@ -340,90 +320,85 @@ impl RTask {
RTask::IdleAnyPrompt(Self::pin_with_capture(fun))
}

fn pin_with_capture<F, Fut>(fun: F) -> BoxFuture<'static, ()>
pub(crate) fn send_idle<F, Fut>(fun: F) -> Self
where
F: FnOnce(ConsoleOutputCapture) -> Fut + 'static,
F: FnOnce(ConsoleOutputCapture) -> Fut + 'static + Send,
Fut: Future<Output = ()> + 'static,
{
Box::pin(async move {
let capture = if Console::is_initialized() {
Console::get_mut().start_capture()
} else {
// Unit tests run without a Console. The dummy capture is
// inert and doesn't interact with Console state.
debug_assert!(stdext::IS_TESTING);
ConsoleOutputCapture::dummy()
};
fun(capture).await
})
RTask::SendIdle(Self::pin_with_capture(fun))
}

pub(crate) fn send_interrupt<F, Fut>(fun: F) -> Self
/// For rare performance sensitive cases where you'd like to avoid the cost of
/// poking R options via [Self::pin_with_capture()] and you know you don't need
/// the safety of capturing output because you aren't running R code
pub(crate) fn send_idle_without_capture<F, Fut>(fun: F) -> Self
Comment on lines +331 to +334
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did end up wanting this for Drop in RThreadSafe

where
F: FnOnce() -> Fut + 'static + Send,
Fut: Future<Output = ()> + 'static,
{
RTask::SendInterrupt(Box::pin(fun()))
RTask::SendIdle(Box::pin(fun()))
}

pub(crate) fn send_idle<F, Fut>(fun: F) -> Self
pub(crate) fn send_idle_any_prompt<F, Fut>(fun: F) -> Self
where
F: FnOnce(ConsoleOutputCapture) -> Fut + 'static + Send,
Fut: Future<Output = ()> + 'static,
{
RTask::SendIdle(Self::pin_with_capture(fun))
RTask::SendIdleAnyPrompt(Self::pin_with_capture(fun))
}

pub(crate) fn send_idle_any_prompt<F, Fut>(fun: F) -> Self
fn pin_with_capture<F, Fut>(fun: F) -> BoxFuture<'static, ()>
where
F: FnOnce(ConsoleOutputCapture) -> Fut + 'static + Send,
F: FnOnce(ConsoleOutputCapture) -> Fut + 'static,
Fut: Future<Output = ()> + 'static,
{
RTask::SendIdleAnyPrompt(Self::pin_with_capture(fun))
Box::pin(async move {
let capture = if Console::is_initialized() {
Console::get_mut().start_capture()
} else {
// Unit tests run without a Console. The dummy capture is
// inert and doesn't interact with Console state.
debug_assert!(stdext::IS_TESTING);
ConsoleOutputCapture::dummy()
};
fun(capture).await
})
}
}

/// Spawn an async task on the R thread.
///
/// For `Send` variants (`RTask::send_interrupt`, etc.) this can be called from
/// For `Send` variants ([RTask::send_idle], etc.) this can be called from
/// any thread. Non-`Send` variants must be called from the R thread.
pub(crate) fn spawn(task: RTask) {
if stdext::IS_TESTING && !Console::is_initialized() {
let _lock = harp::fixtures::R_TEST_LOCK.lock();
let fut = match task {
RTask::Interrupt(fut) |
RTask::Idle(fut) |
RTask::IdleAnyPrompt(fut) |
RTask::SendInterrupt(fut) |
RTask::SendIdle(fut) |
RTask::SendIdleAnyPrompt(fut) => fut,
};
futures::executor::block_on(fut);
return;
}

let needs_r_thread = matches!(
task,
RTask::Interrupt(_) | RTask::Idle(_) | RTask::IdleAnyPrompt(_)
);
let needs_r_thread = matches!(task, RTask::Idle(_) | RTask::IdleAnyPrompt(_));
if needs_r_thread && !Console::on_main_thread() {
let thread = std::thread::current();
let name = thread.name().unwrap_or("<unnamed>");
panic!("`spawn()` must be called from the R thread, not thread '{name}'");
}

let (fut, tasks_tx, only_idle) = match task {
RTask::Interrupt(fut) | RTask::SendInterrupt(fut) => (fut, INTERRUPT_TASKS.tx(), false),
RTask::Idle(fut) | RTask::SendIdle(fut) => (fut, IDLE_TASKS.tx(), true),
RTask::IdleAnyPrompt(fut) | RTask::SendIdleAnyPrompt(fut) => {
(fut, IDLE_ANY_TASKS.tx(), true)
},
let (fut, tasks_tx) = match task {
RTask::Idle(fut) | RTask::SendIdle(fut) => (fut, IDLE_TASKS.tx()),
RTask::IdleAnyPrompt(fut) | RTask::SendIdleAnyPrompt(fut) => (fut, IDLE_ANY_TASKS.tx()),
};

let task = QueuedRTask::Async(RTaskAsync {
fut,
tasks_tx: tasks_tx.clone(),
start_info: RTaskStartInfo::new(only_idle),
start_info: RTaskStartInfo::new(),
});

tasks_tx.send(task).unwrap();
Expand Down
Loading
Loading