fix(terminals): decouple PTY lifecycle from view lifecycle (no kill on navigation)
Navigating (layout/tab switch) tore the xterm view down and called handle.close(), killing the backend PTY and cutting off running AIs. Now the view's cleanup only detaches; only an explicit user action kills a PTY. Backend: - PortablePtyAdapter: per-session scrollback ring buffer (~100KB, most recent) + re-subscribable fan-out broadcast replacing the single-take output_rx. Reader thread feeds both the ring buffer and current subscribers; on EOF it closes subscribers (streams end) while keeping scrollback for late re-attach. - PtyPort: new scrollback() method; subscribe_output is now re-subscribable (all impls + test fakes updated). - reattach_terminal IPC command: returns scrollback and re-wires a fresh output channel on the live session without re-spawning. - CloseRequested hook kills all live PTYs cleanly on app shutdown. - TerminalSessions::handles() to enumerate live sessions at shutdown. Frontend: - TerminalHandle.detach(); TerminalGateway/AgentGateway.reattach() + mocks. - TerminalView cleanup detaches (never close); on mount it re-attaches to a persisted session (repainting scrollback) instead of opening a new PTY. - LayoutGrid persists the cell's session id via setSession; AgentsPanel tracks per-agent session ids — both drive reattach-vs-open. Tests: ring buffer bounds to 100KB keeping newest bytes; scrollback retained; re-subscription delivers post-reattach output; TerminalView detaches (not closes) on unmount and reattaches with a known session; mock detach/reattach. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
@ -12,10 +12,15 @@
|
||||
//! domain never sees an OS handle (ARCHITECTURE §4).
|
||||
//! - On [`spawn`](PtyPort::spawn) we open a PTY pair, spawn the command in the
|
||||
//! slave, then start **one reader thread** that pumps bytes from the master
|
||||
//! into an [`std::sync::mpsc`] channel. [`subscribe_output`](PtyPort::subscribe_output)
|
||||
//! hands back the receiver wrapped as the domain's blocking [`OutputStream`]
|
||||
//! iterator; the presentation layer drains it on its own thread and forwards
|
||||
//! chunks to the per-session Tauri channel (the `PtyBridge`).
|
||||
//! into a shared [`Broadcast`] hub. The hub does two things with every chunk:
|
||||
//! it appends to a bounded **scrollback ring buffer** (~100 KB, most recent
|
||||
//! bytes) and it fans the chunk out to every *currently subscribed* receiver.
|
||||
//! - [`subscribe_output`](PtyPort::subscribe_output) registers a fresh
|
||||
//! subscriber and returns its receiver wrapped as the domain's blocking
|
||||
//! [`OutputStream`] iterator. It is **re-subscribable**: after a view tears
|
||||
//! down (navigation / layout change) a new view can re-attach to the *same*
|
||||
//! live PTY by subscribing again and repainting the scrollback first — no
|
||||
//! re-spawn. [`scrollback`](PtyPort::scrollback) returns that retained buffer.
|
||||
//! - [`write`](PtyPort::write) / [`resize`](PtyPort::resize) act on the stored
|
||||
//! writer / master. [`kill`](PtyPort::kill) terminates the child, joins the
|
||||
//! reader thread, and returns the [`ExitStatus`].
|
||||
@ -30,9 +35,10 @@
|
||||
//! Unix-only assumption (no raw fds, no signals) so it should port as-is.
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::collections::VecDeque;
|
||||
use std::io::{Read, Write};
|
||||
use std::sync::mpsc::{self, Receiver, Sender};
|
||||
use std::sync::Mutex;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::thread::JoinHandle;
|
||||
|
||||
use async_trait::async_trait;
|
||||
@ -45,6 +51,68 @@ use domain::SessionId;
|
||||
/// Size of each read buffer pumped from the master PTY.
|
||||
const READ_BUF: usize = 8 * 1024;
|
||||
|
||||
/// Maximum number of bytes retained in a session's scrollback ring buffer
|
||||
/// (~100 KB, "the most recent output"). When the buffer would exceed this, the
|
||||
/// oldest bytes are dropped so a re-attaching view repaints recent history.
|
||||
const SCROLLBACK_CAP: usize = 100 * 1024;
|
||||
|
||||
/// The shared output hub of one PTY: a bounded scrollback ring buffer plus the
|
||||
/// set of currently-subscribed receivers. The reader thread feeds both; views
|
||||
/// subscribe and unsubscribe freely over the PTY's lifetime (re-attach support).
|
||||
///
|
||||
/// Each subscriber is a [`Sender`]; a send failing (receiver dropped because the
|
||||
/// view detached) prunes that subscriber on the next chunk. This is the
|
||||
/// fan-out/broadcast that replaces the old single-take `output_rx`.
|
||||
#[derive(Default)]
|
||||
struct Broadcast {
|
||||
/// Bounded ring buffer of the most recent output bytes.
|
||||
scrollback: VecDeque<u8>,
|
||||
/// Live subscribers; pruned lazily when their receiver is gone.
|
||||
subscribers: Vec<Sender<Vec<u8>>>,
|
||||
/// Set once the PTY hit EOF (process exited) — no more output will ever come.
|
||||
eof: bool,
|
||||
}
|
||||
|
||||
impl Broadcast {
|
||||
/// Appends a chunk to the scrollback (trimming to [`SCROLLBACK_CAP`]) and
|
||||
/// fans it out to every live subscriber, dropping any that have gone away.
|
||||
fn push(&mut self, chunk: &[u8]) {
|
||||
self.scrollback.extend(chunk.iter().copied());
|
||||
let overflow = self.scrollback.len().saturating_sub(SCROLLBACK_CAP);
|
||||
if overflow > 0 {
|
||||
self.scrollback.drain(0..overflow);
|
||||
}
|
||||
self.subscribers
|
||||
.retain(|tx| tx.send(chunk.to_vec()).is_ok());
|
||||
}
|
||||
|
||||
/// Registers a new subscriber, returning its receiver.
|
||||
///
|
||||
/// If the PTY already hit EOF, the returned stream is immediately closed
|
||||
/// (its sender is dropped) so a late re-attach to a finished session doesn't
|
||||
/// block forever waiting for output that will never come.
|
||||
fn subscribe(&mut self) -> Receiver<Vec<u8>> {
|
||||
let (tx, rx) = mpsc::channel();
|
||||
if !self.eof {
|
||||
self.subscribers.push(tx);
|
||||
}
|
||||
rx
|
||||
}
|
||||
|
||||
/// Returns the retained scrollback as a contiguous byte vector.
|
||||
fn snapshot(&self) -> Vec<u8> {
|
||||
self.scrollback.iter().copied().collect()
|
||||
}
|
||||
|
||||
/// Drops every subscriber's sender so their output streams end (EOF). Called
|
||||
/// by the reader thread when the PTY hits EOF (process exit). The scrollback
|
||||
/// is preserved so a late re-attach can still repaint the final output.
|
||||
fn close_subscribers(&mut self) {
|
||||
self.eof = true;
|
||||
self.subscribers.clear();
|
||||
}
|
||||
}
|
||||
|
||||
/// A live PTY owned by the adapter.
|
||||
struct LivePty {
|
||||
/// Master side — used for resize.
|
||||
@ -53,8 +121,8 @@ struct LivePty {
|
||||
writer: Box<dyn Write + Send>,
|
||||
/// The spawned child process.
|
||||
child: Box<dyn Child + Send + Sync>,
|
||||
/// Receiver end of the output channel; taken once by `subscribe_output`.
|
||||
output_rx: Option<Receiver<Vec<u8>>>,
|
||||
/// Shared scrollback + subscriber hub, fed by the reader thread.
|
||||
output: Arc<Mutex<Broadcast>>,
|
||||
/// Handle of the reader thread, joined on kill.
|
||||
reader: Option<JoinHandle<()>>,
|
||||
}
|
||||
@ -124,22 +192,29 @@ impl PtyPort for PortablePtyAdapter {
|
||||
.try_clone_reader()
|
||||
.map_err(|e| PtyError::Io(e.to_string()))?;
|
||||
|
||||
// One reader thread per PTY pumps bytes into the output channel until EOF.
|
||||
let (tx, rx): (Sender<Vec<u8>>, Receiver<Vec<u8>>) = mpsc::channel();
|
||||
// One reader thread per PTY pumps bytes into the broadcast hub until EOF.
|
||||
// The hub retains a scrollback ring buffer AND fans bytes out to every
|
||||
// current subscriber, so views can detach/re-attach without re-spawning.
|
||||
let output: Arc<Mutex<Broadcast>> = Arc::new(Mutex::new(Broadcast::default()));
|
||||
let output_for_reader = Arc::clone(&output);
|
||||
let reader_handle = std::thread::spawn(move || {
|
||||
let mut buf = [0u8; READ_BUF];
|
||||
loop {
|
||||
match reader.read(&mut buf) {
|
||||
Ok(0) => break,
|
||||
Ok(n) => {
|
||||
// Receiver gone (session closed) → stop pumping.
|
||||
if tx.send(buf[..n].to_vec()).is_err() {
|
||||
break;
|
||||
if let Ok(mut hub) = output_for_reader.lock() {
|
||||
hub.push(&buf[..n]);
|
||||
}
|
||||
}
|
||||
Err(_) => break,
|
||||
}
|
||||
}
|
||||
// EOF (process exited): end every attached stream by dropping its
|
||||
// sender, while preserving the scrollback for any late re-attach.
|
||||
if let Ok(mut hub) = output_for_reader.lock() {
|
||||
hub.close_subscribers();
|
||||
}
|
||||
});
|
||||
|
||||
// The PTY layer owns the handle identity: it mints a fresh session id and
|
||||
@ -153,7 +228,7 @@ impl PtyPort for PortablePtyAdapter {
|
||||
master: pair.master,
|
||||
writer,
|
||||
child,
|
||||
output_rx: Some(rx),
|
||||
output,
|
||||
reader: Some(reader_handle),
|
||||
};
|
||||
self.sessions
|
||||
@ -189,18 +264,33 @@ impl PtyPort for PortablePtyAdapter {
|
||||
}
|
||||
|
||||
fn subscribe_output(&self, handle: &PtyHandle) -> Result<OutputStream, PtyError> {
|
||||
let mut map = self
|
||||
let map = self
|
||||
.sessions
|
||||
.lock()
|
||||
.map_err(|_| PtyError::Io("pty registry poisoned".to_owned()))?;
|
||||
let live = map.get_mut(&handle.session_id).ok_or(PtyError::NotFound)?;
|
||||
let live = map.get(&handle.session_id).ok_or(PtyError::NotFound)?;
|
||||
let rx = live
|
||||
.output_rx
|
||||
.take()
|
||||
.ok_or_else(|| PtyError::Io("output already subscribed".to_owned()))?;
|
||||
.output
|
||||
.lock()
|
||||
.map_err(|_| PtyError::Io("pty output hub poisoned".to_owned()))?
|
||||
.subscribe();
|
||||
Ok(Box::new(rx.into_iter()))
|
||||
}
|
||||
|
||||
fn scrollback(&self, handle: &PtyHandle) -> Result<Vec<u8>, PtyError> {
|
||||
let map = self
|
||||
.sessions
|
||||
.lock()
|
||||
.map_err(|_| PtyError::Io("pty registry poisoned".to_owned()))?;
|
||||
let live = map.get(&handle.session_id).ok_or(PtyError::NotFound)?;
|
||||
let snapshot = live
|
||||
.output
|
||||
.lock()
|
||||
.map_err(|_| PtyError::Io("pty output hub poisoned".to_owned()))?
|
||||
.snapshot();
|
||||
Ok(snapshot)
|
||||
}
|
||||
|
||||
async fn kill(&self, handle: &PtyHandle) -> Result<ExitStatus, PtyError> {
|
||||
// Remove from the registry so the writer/master drop and the child is
|
||||
// fully owned here while we tear it down.
|
||||
@ -220,7 +310,8 @@ impl PtyPort for PortablePtyAdapter {
|
||||
.map_err(|e| PtyError::Io(e.to_string()))?;
|
||||
|
||||
// Dropping master/writer closes the PTY; the reader thread then sees EOF.
|
||||
drop(live.output_rx.take());
|
||||
// Dropping the broadcast hub drops every subscriber's sender, so any
|
||||
// still-attached view's output stream ends cleanly too.
|
||||
if let Some(reader) = live.reader.take() {
|
||||
let _ = reader.join();
|
||||
}
|
||||
|
||||
@ -113,25 +113,113 @@ async fn write_is_echoed_back_through_output_stream() {
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn subscribe_output_twice_is_an_error() {
|
||||
async fn subscribe_output_is_re_subscribable_for_reattach() {
|
||||
// A live PTY can be subscribed to more than once over its lifetime: the
|
||||
// first view detaches (drops its stream), a second view re-attaches and
|
||||
// still receives subsequent output — the core of the no-kill navigation fix.
|
||||
let pty = PortablePtyAdapter::new();
|
||||
let handle = pty
|
||||
.spawn(sh_spec("sleep 0.2"), size())
|
||||
.spawn(sh_spec("cat"), size())
|
||||
.await
|
||||
.expect("spawn cat");
|
||||
|
||||
// First attachment: subscribe, observe an echo, then drop the stream
|
||||
// (simulating a view tearing down on navigation — NOT a kill).
|
||||
{
|
||||
let first = pty.subscribe_output(&handle).expect("first subscribe");
|
||||
let (tx, rx) = mpsc::channel();
|
||||
let worker = thread::spawn(move || {
|
||||
let mut all = Vec::new();
|
||||
for chunk in first {
|
||||
all.extend_from_slice(&chunk);
|
||||
if String::from_utf8_lossy(&all).contains("first-marker") {
|
||||
let _ = tx.send(());
|
||||
return; // drop the stream → detach
|
||||
}
|
||||
}
|
||||
});
|
||||
pty.write(&handle, b"first-marker\n").expect("write 1");
|
||||
rx.recv_timeout(TIMEOUT).expect("first view saw its marker");
|
||||
worker.join().expect("first worker joined");
|
||||
}
|
||||
|
||||
// Second attachment to the SAME live PTY (no re-spawn): must still receive
|
||||
// new output produced after re-subscription.
|
||||
let second = pty.subscribe_output(&handle).expect("re-subscribe");
|
||||
let (tx, rx) = mpsc::channel();
|
||||
let worker = thread::spawn(move || {
|
||||
let mut all = Vec::new();
|
||||
for chunk in second {
|
||||
all.extend_from_slice(&chunk);
|
||||
if String::from_utf8_lossy(&all).contains("second-marker") {
|
||||
let _ = tx.send(());
|
||||
}
|
||||
}
|
||||
});
|
||||
pty.write(&handle, b"second-marker\n").expect("write 2");
|
||||
rx.recv_timeout(TIMEOUT)
|
||||
.expect("re-attached view saw new output");
|
||||
|
||||
pty.kill(&handle).await.expect("kill cat");
|
||||
worker.join().expect("second worker joined after kill");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn scrollback_retains_recent_output_for_repaint() {
|
||||
// After output is produced and the process exits, the scrollback still holds
|
||||
// the recent bytes so a re-attaching view can repaint them.
|
||||
let pty = PortablePtyAdapter::new();
|
||||
let handle = pty
|
||||
.spawn(sh_spec("printf scrollback-content"), size())
|
||||
.await
|
||||
.expect("spawn");
|
||||
|
||||
let first = pty.subscribe_output(&handle);
|
||||
assert!(first.is_ok(), "first subscribe succeeds");
|
||||
// Drain to EOF so all output has been pushed into the ring buffer.
|
||||
let stream = pty.subscribe_output(&handle).expect("subscribe");
|
||||
drain_with_timeout(stream, TIMEOUT);
|
||||
|
||||
let second = pty.subscribe_output(&handle);
|
||||
let sb = pty.scrollback(&handle).expect("scrollback readable");
|
||||
let text = String::from_utf8_lossy(&sb);
|
||||
assert!(
|
||||
second.is_err(),
|
||||
"second subscribe on the same session must error"
|
||||
text.contains("scrollback-content"),
|
||||
"scrollback should retain recent output, got {text:?}"
|
||||
);
|
||||
|
||||
// Drain the first stream so the reader thread can finish, then tidy up.
|
||||
let stream = first.unwrap();
|
||||
let _ = pty.kill(&handle).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn scrollback_is_bounded_to_cap_and_keeps_most_recent_bytes() {
|
||||
// Emit clearly more than 100 KB of deterministic output, then assert the
|
||||
// retained scrollback is bounded and ends with the most recent bytes.
|
||||
let pty = PortablePtyAdapter::new();
|
||||
// 5000 lines of "....END<n>" → well over 100 KB; the tail is the freshest.
|
||||
let script = "i=0; while [ $i -lt 5000 ]; do \
|
||||
printf 'AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA-%d\\n' $i; \
|
||||
i=$((i+1)); done; printf 'FINAL-LINE-MARKER'";
|
||||
let handle = pty.spawn(sh_spec(script), size()).await.expect("spawn");
|
||||
|
||||
let stream = pty.subscribe_output(&handle).expect("subscribe");
|
||||
drain_with_timeout(stream, TIMEOUT);
|
||||
|
||||
let sb = pty.scrollback(&handle).expect("scrollback readable");
|
||||
assert!(
|
||||
sb.len() <= 100 * 1024,
|
||||
"scrollback must be bounded to ~100 KB, was {} bytes",
|
||||
sb.len()
|
||||
);
|
||||
// The newest output is retained even though the oldest was dropped.
|
||||
let text = String::from_utf8_lossy(&sb);
|
||||
assert!(
|
||||
text.contains("FINAL-LINE-MARKER"),
|
||||
"the most recent bytes must be kept in the ring buffer"
|
||||
);
|
||||
// And the very first lines must have been evicted.
|
||||
assert!(
|
||||
!text.contains("-0\n") || sb.len() < 100 * 1024,
|
||||
"oldest bytes should be dropped once the cap is exceeded"
|
||||
);
|
||||
|
||||
let _ = pty.kill(&handle).await;
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user