Navigating (layout/tab switch) tore the xterm view down and called handle.close(), killing the backend PTY and cutting off running AIs. Now the view's cleanup only detaches; only an explicit user action kills a PTY. Backend: - PortablePtyAdapter: per-session scrollback ring buffer (~100KB, most recent) + re-subscribable fan-out broadcast replacing the single-take output_rx. Reader thread feeds both the ring buffer and current subscribers; on EOF it closes subscribers (streams end) while keeping scrollback for late re-attach. - PtyPort: new scrollback() method; subscribe_output is now re-subscribable (all impls + test fakes updated). - reattach_terminal IPC command: returns scrollback and re-wires a fresh output channel on the live session without re-spawning. - CloseRequested hook kills all live PTYs cleanly on app shutdown. - TerminalSessions::handles() to enumerate live sessions at shutdown. Frontend: - TerminalHandle.detach(); TerminalGateway/AgentGateway.reattach() + mocks. - TerminalView cleanup detaches (never close); on mount it re-attaches to a persisted session (repainting scrollback) instead of opening a new PTY. - LayoutGrid persists the cell's session id via setSession; AgentsPanel tracks per-agent session ids — both drive reattach-vs-open. Tests: ring buffer bounds to 100KB keeping newest bytes; scrollback retained; re-subscription delivers post-reattach output; TerminalView detaches (not closes) on unmount and reattaches with a known session; mock detach/reattach. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
258 lines
9.0 KiB
Rust
258 lines
9.0 KiB
Rust
//! L3 integration tests for [`PortablePtyAdapter`] — exercising a **real** OS
|
|
//! PTY on Linux. We spawn tiny `/bin/sh` programs whose output is deterministic,
|
|
//! drain the blocking output stream on a dedicated thread, and assert on the
|
|
//! bytes / exit code.
|
|
//!
|
|
//! Robustness: every blocking drain runs on its own thread joined with a bounded
|
|
//! timeout so a misbehaving PTY can never hang the test suite/CI.
|
|
|
|
#![cfg(unix)]
|
|
|
|
use std::sync::mpsc;
|
|
use std::thread;
|
|
use std::time::Duration;
|
|
|
|
use domain::ports::{PtyPort, SpawnSpec};
|
|
use domain::{ProjectPath, PtySize};
|
|
use infrastructure::PortablePtyAdapter;
|
|
|
|
/// Hard ceiling for any single PTY interaction in these tests.
|
|
const TIMEOUT: Duration = Duration::from_secs(10);
|
|
|
|
fn sh_spec(script: &str) -> SpawnSpec {
|
|
SpawnSpec {
|
|
command: "/bin/sh".to_owned(),
|
|
args: vec!["-c".to_owned(), script.to_owned()],
|
|
cwd: ProjectPath::new("/").unwrap(),
|
|
env: Vec::new(),
|
|
context_plan: None,
|
|
}
|
|
}
|
|
|
|
fn size() -> PtySize {
|
|
PtySize::new(24, 80).unwrap()
|
|
}
|
|
|
|
/// Drains an output stream to a single `Vec<u8>` on a worker thread, returning
|
|
/// the collected bytes or panicking if it does not finish within `TIMEOUT`.
|
|
fn drain_with_timeout(
|
|
stream: domain::ports::OutputStream,
|
|
timeout: Duration,
|
|
) -> Vec<u8> {
|
|
let (tx, rx) = mpsc::channel();
|
|
let worker = thread::spawn(move || {
|
|
let mut all = Vec::new();
|
|
for chunk in stream {
|
|
all.extend_from_slice(&chunk);
|
|
}
|
|
let _ = tx.send(all);
|
|
});
|
|
let bytes = rx
|
|
.recv_timeout(timeout)
|
|
.expect("output stream drained within timeout");
|
|
worker.join().expect("drain thread joined");
|
|
bytes
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn spawn_printf_streams_expected_bytes_and_exits_zero() {
|
|
let pty = PortablePtyAdapter::new();
|
|
let handle = pty
|
|
.spawn(sh_spec("printf hello-pty"), size())
|
|
.await
|
|
.expect("spawn succeeds");
|
|
|
|
let stream = pty.subscribe_output(&handle).expect("subscribe once");
|
|
let bytes = drain_with_timeout(stream, TIMEOUT);
|
|
let text = String::from_utf8_lossy(&bytes);
|
|
assert!(
|
|
text.contains("hello-pty"),
|
|
"expected output to contain 'hello-pty', got {text:?}"
|
|
);
|
|
|
|
// Process already exited; kill collects the status. `sh` exiting cleanly → 0.
|
|
let status = pty.kill(&handle).await.expect("kill succeeds");
|
|
assert_eq!(status.code, Some(0), "clean exit reports code 0");
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn write_is_echoed_back_through_output_stream() {
|
|
// `cat` echoes its stdin back to stdout; we feed it a line then close stdin
|
|
// by killing it, and assert we saw the echoed bytes.
|
|
let pty = PortablePtyAdapter::new();
|
|
let handle = pty
|
|
.spawn(sh_spec("cat"), size())
|
|
.await
|
|
.expect("spawn cat");
|
|
|
|
let stream = pty.subscribe_output(&handle).expect("subscribe once");
|
|
|
|
// Look for the marker on a worker thread, with a timeout, so we don't block
|
|
// forever if `cat` never echoes.
|
|
let (found_tx, found_rx) = mpsc::channel();
|
|
let worker = thread::spawn(move || {
|
|
let mut all = Vec::new();
|
|
for chunk in stream {
|
|
all.extend_from_slice(&chunk);
|
|
if String::from_utf8_lossy(&all).contains("marker-123") {
|
|
let _ = found_tx.send(true);
|
|
// Keep draining until EOF so the thread can exit on kill.
|
|
}
|
|
}
|
|
});
|
|
|
|
pty.write(&handle, b"marker-123\n").expect("write to cat");
|
|
|
|
let found = found_rx
|
|
.recv_timeout(TIMEOUT)
|
|
.expect("echoed marker observed within timeout");
|
|
assert!(found, "cat echoed the written bytes back");
|
|
|
|
pty.kill(&handle).await.expect("kill cat");
|
|
worker.join().expect("drain thread joined after kill");
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn subscribe_output_is_re_subscribable_for_reattach() {
|
|
// A live PTY can be subscribed to more than once over its lifetime: the
|
|
// first view detaches (drops its stream), a second view re-attaches and
|
|
// still receives subsequent output — the core of the no-kill navigation fix.
|
|
let pty = PortablePtyAdapter::new();
|
|
let handle = pty
|
|
.spawn(sh_spec("cat"), size())
|
|
.await
|
|
.expect("spawn cat");
|
|
|
|
// First attachment: subscribe, observe an echo, then drop the stream
|
|
// (simulating a view tearing down on navigation — NOT a kill).
|
|
{
|
|
let first = pty.subscribe_output(&handle).expect("first subscribe");
|
|
let (tx, rx) = mpsc::channel();
|
|
let worker = thread::spawn(move || {
|
|
let mut all = Vec::new();
|
|
for chunk in first {
|
|
all.extend_from_slice(&chunk);
|
|
if String::from_utf8_lossy(&all).contains("first-marker") {
|
|
let _ = tx.send(());
|
|
return; // drop the stream → detach
|
|
}
|
|
}
|
|
});
|
|
pty.write(&handle, b"first-marker\n").expect("write 1");
|
|
rx.recv_timeout(TIMEOUT).expect("first view saw its marker");
|
|
worker.join().expect("first worker joined");
|
|
}
|
|
|
|
// Second attachment to the SAME live PTY (no re-spawn): must still receive
|
|
// new output produced after re-subscription.
|
|
let second = pty.subscribe_output(&handle).expect("re-subscribe");
|
|
let (tx, rx) = mpsc::channel();
|
|
let worker = thread::spawn(move || {
|
|
let mut all = Vec::new();
|
|
for chunk in second {
|
|
all.extend_from_slice(&chunk);
|
|
if String::from_utf8_lossy(&all).contains("second-marker") {
|
|
let _ = tx.send(());
|
|
}
|
|
}
|
|
});
|
|
pty.write(&handle, b"second-marker\n").expect("write 2");
|
|
rx.recv_timeout(TIMEOUT)
|
|
.expect("re-attached view saw new output");
|
|
|
|
pty.kill(&handle).await.expect("kill cat");
|
|
worker.join().expect("second worker joined after kill");
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn scrollback_retains_recent_output_for_repaint() {
|
|
// After output is produced and the process exits, the scrollback still holds
|
|
// the recent bytes so a re-attaching view can repaint them.
|
|
let pty = PortablePtyAdapter::new();
|
|
let handle = pty
|
|
.spawn(sh_spec("printf scrollback-content"), size())
|
|
.await
|
|
.expect("spawn");
|
|
|
|
// Drain to EOF so all output has been pushed into the ring buffer.
|
|
let stream = pty.subscribe_output(&handle).expect("subscribe");
|
|
drain_with_timeout(stream, TIMEOUT);
|
|
|
|
let sb = pty.scrollback(&handle).expect("scrollback readable");
|
|
let text = String::from_utf8_lossy(&sb);
|
|
assert!(
|
|
text.contains("scrollback-content"),
|
|
"scrollback should retain recent output, got {text:?}"
|
|
);
|
|
|
|
let _ = pty.kill(&handle).await;
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn scrollback_is_bounded_to_cap_and_keeps_most_recent_bytes() {
|
|
// Emit clearly more than 100 KB of deterministic output, then assert the
|
|
// retained scrollback is bounded and ends with the most recent bytes.
|
|
let pty = PortablePtyAdapter::new();
|
|
// 5000 lines of "....END<n>" → well over 100 KB; the tail is the freshest.
|
|
let script = "i=0; while [ $i -lt 5000 ]; do \
|
|
printf 'AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA-%d\\n' $i; \
|
|
i=$((i+1)); done; printf 'FINAL-LINE-MARKER'";
|
|
let handle = pty.spawn(sh_spec(script), size()).await.expect("spawn");
|
|
|
|
let stream = pty.subscribe_output(&handle).expect("subscribe");
|
|
drain_with_timeout(stream, TIMEOUT);
|
|
|
|
let sb = pty.scrollback(&handle).expect("scrollback readable");
|
|
assert!(
|
|
sb.len() <= 100 * 1024,
|
|
"scrollback must be bounded to ~100 KB, was {} bytes",
|
|
sb.len()
|
|
);
|
|
// The newest output is retained even though the oldest was dropped.
|
|
let text = String::from_utf8_lossy(&sb);
|
|
assert!(
|
|
text.contains("FINAL-LINE-MARKER"),
|
|
"the most recent bytes must be kept in the ring buffer"
|
|
);
|
|
// And the very first lines must have been evicted.
|
|
assert!(
|
|
!text.contains("-0\n") || sb.len() < 100 * 1024,
|
|
"oldest bytes should be dropped once the cap is exceeded"
|
|
);
|
|
|
|
let _ = pty.kill(&handle).await;
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn write_resize_kill_on_unknown_handle_are_not_found() {
|
|
use domain::ports::{PtyError, PtyHandle};
|
|
use domain::SessionId;
|
|
|
|
let pty = PortablePtyAdapter::new();
|
|
let ghost = PtyHandle {
|
|
session_id: SessionId::new_random(),
|
|
};
|
|
|
|
assert_eq!(pty.write(&ghost, b"x"), Err(PtyError::NotFound));
|
|
assert_eq!(pty.resize(&ghost, size()), Err(PtyError::NotFound));
|
|
assert!(pty.subscribe_output(&ghost).is_err());
|
|
assert_eq!(pty.kill(&ghost).await, Err(PtyError::NotFound));
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn resize_on_live_pty_succeeds() {
|
|
let pty = PortablePtyAdapter::new();
|
|
let handle = pty
|
|
.spawn(sh_spec("sleep 0.2"), size())
|
|
.await
|
|
.expect("spawn");
|
|
|
|
pty.resize(&handle, PtySize::new(40, 120).unwrap())
|
|
.expect("resize a live pty succeeds");
|
|
|
|
// Drain + reap so the test leaves no live process/thread behind.
|
|
let stream = pty.subscribe_output(&handle).expect("subscribe");
|
|
let _ = thread::spawn(move || stream.count());
|
|
let _ = pty.kill(&handle).await;
|
|
}
|