//! L3 integration tests for [`PortablePtyAdapter`] — exercising a **real** OS //! PTY on Linux. We spawn tiny `/bin/sh` programs whose output is deterministic, //! drain the blocking output stream on a dedicated thread, and assert on the //! bytes / exit code. //! //! Robustness: every blocking drain runs on its own thread joined with a bounded //! timeout so a misbehaving PTY can never hang the test suite/CI. #![cfg(unix)] use std::sync::mpsc; use std::thread; use std::time::Duration; use domain::ports::{PtyPort, SpawnSpec}; use domain::{ProjectPath, PtySize}; use infrastructure::PortablePtyAdapter; /// Hard ceiling for any single PTY interaction in these tests. const TIMEOUT: Duration = Duration::from_secs(10); fn sh_spec(script: &str) -> SpawnSpec { SpawnSpec { command: "/bin/sh".to_owned(), args: vec!["-c".to_owned(), script.to_owned()], cwd: ProjectPath::new("/").unwrap(), env: Vec::new(), context_plan: None, } } fn size() -> PtySize { PtySize::new(24, 80).unwrap() } /// Drains an output stream to a single `Vec` on a worker thread, returning /// the collected bytes or panicking if it does not finish within `TIMEOUT`. fn drain_with_timeout( stream: domain::ports::OutputStream, timeout: Duration, ) -> Vec { let (tx, rx) = mpsc::channel(); let worker = thread::spawn(move || { let mut all = Vec::new(); for chunk in stream { all.extend_from_slice(&chunk); } let _ = tx.send(all); }); let bytes = rx .recv_timeout(timeout) .expect("output stream drained within timeout"); worker.join().expect("drain thread joined"); bytes } #[tokio::test] async fn spawn_printf_streams_expected_bytes_and_exits_zero() { let pty = PortablePtyAdapter::new(); let handle = pty .spawn(sh_spec("printf hello-pty"), size()) .await .expect("spawn succeeds"); let stream = pty.subscribe_output(&handle).expect("subscribe once"); let bytes = drain_with_timeout(stream, TIMEOUT); let text = String::from_utf8_lossy(&bytes); assert!( text.contains("hello-pty"), "expected output to contain 'hello-pty', got {text:?}" ); // Process already exited; kill collects the status. `sh` exiting cleanly → 0. let status = pty.kill(&handle).await.expect("kill succeeds"); assert_eq!(status.code, Some(0), "clean exit reports code 0"); } #[tokio::test] async fn write_is_echoed_back_through_output_stream() { // `cat` echoes its stdin back to stdout; we feed it a line then close stdin // by killing it, and assert we saw the echoed bytes. let pty = PortablePtyAdapter::new(); let handle = pty .spawn(sh_spec("cat"), size()) .await .expect("spawn cat"); let stream = pty.subscribe_output(&handle).expect("subscribe once"); // Look for the marker on a worker thread, with a timeout, so we don't block // forever if `cat` never echoes. let (found_tx, found_rx) = mpsc::channel(); let worker = thread::spawn(move || { let mut all = Vec::new(); for chunk in stream { all.extend_from_slice(&chunk); if String::from_utf8_lossy(&all).contains("marker-123") { let _ = found_tx.send(true); // Keep draining until EOF so the thread can exit on kill. } } }); pty.write(&handle, b"marker-123\n").expect("write to cat"); let found = found_rx .recv_timeout(TIMEOUT) .expect("echoed marker observed within timeout"); assert!(found, "cat echoed the written bytes back"); pty.kill(&handle).await.expect("kill cat"); worker.join().expect("drain thread joined after kill"); } #[tokio::test] async fn subscribe_output_twice_is_an_error() { let pty = PortablePtyAdapter::new(); let handle = pty .spawn(sh_spec("sleep 0.2"), size()) .await .expect("spawn"); let first = pty.subscribe_output(&handle); assert!(first.is_ok(), "first subscribe succeeds"); let second = pty.subscribe_output(&handle); assert!( second.is_err(), "second subscribe on the same session must error" ); // Drain the first stream so the reader thread can finish, then tidy up. let stream = first.unwrap(); drain_with_timeout(stream, TIMEOUT); let _ = pty.kill(&handle).await; } #[tokio::test] async fn write_resize_kill_on_unknown_handle_are_not_found() { use domain::ports::{PtyError, PtyHandle}; use domain::SessionId; let pty = PortablePtyAdapter::new(); let ghost = PtyHandle { session_id: SessionId::new_random(), }; assert_eq!(pty.write(&ghost, b"x"), Err(PtyError::NotFound)); assert_eq!(pty.resize(&ghost, size()), Err(PtyError::NotFound)); assert!(pty.subscribe_output(&ghost).is_err()); assert_eq!(pty.kill(&ghost).await, Err(PtyError::NotFound)); } #[tokio::test] async fn resize_on_live_pty_succeeds() { let pty = PortablePtyAdapter::new(); let handle = pty .spawn(sh_spec("sleep 0.2"), size()) .await .expect("spawn"); pty.resize(&handle, PtySize::new(40, 120).unwrap()) .expect("resize a live pty succeeds"); // Drain + reap so the test leaves no live process/thread behind. let stream = pty.subscribe_output(&handle).expect("subscribe"); let _ = thread::spawn(move || stream.count()); let _ = pty.kill(&handle).await; }