From 480e7c7bbe07b92082fcfc0eb61d953ca575c1ec Mon Sep 17 00:00:00 2001 From: Blomios Date: Sun, 7 Jun 2026 11:12:04 +0200 Subject: [PATCH] =?UTF-8?q?feat(orchestrator):=20file-based=20orchestrator?= =?UTF-8?q?=20request=20watcher=20(=C2=A714.3)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - domain: OrchestratorRequest/Command parse-don't-validate + OrchestratorRequestProcessed event - application: OrchestratorService dispatching spawn/stop/update_agent_context - infrastructure: request watcher over .ideai/requests/, writes .response.json - app-tauri: relay OrchestratorRequestProcessed to the frontend DTO Co-Authored-By: Claude Opus 4.8 --- Cargo.lock | 160 +++++- crates/app-tauri/src/events.rs | 19 + crates/application/src/lib.rs | 2 + crates/application/src/orchestrator/mod.rs | 9 + .../application/src/orchestrator/service.rs | 284 ++++++++++ crates/application/src/terminal/registry.rs | 17 +- .../application/tests/orchestrator_service.rs | 511 ++++++++++++++++++ crates/domain/src/events.rs | 12 + crates/domain/src/lib.rs | 3 + crates/domain/src/orchestrator.rs | 255 +++++++++ crates/infrastructure/Cargo.toml | 8 +- crates/infrastructure/src/lib.rs | 5 + crates/infrastructure/src/orchestrator/mod.rs | 273 ++++++++++ .../tests/orchestrator_watcher.rs | 353 ++++++++++++ 14 files changed, 1908 insertions(+), 3 deletions(-) create mode 100644 crates/application/src/orchestrator/mod.rs create mode 100644 crates/application/src/orchestrator/service.rs create mode 100644 crates/application/tests/orchestrator_service.rs create mode 100644 crates/domain/src/orchestrator.rs create mode 100644 crates/infrastructure/src/orchestrator/mod.rs create mode 100644 crates/infrastructure/tests/orchestrator_watcher.rs diff --git a/Cargo.lock b/Cargo.lock index 934ccb0..ebd62ef 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -831,6 +831,16 @@ dependencies = [ "winapi", ] +[[package]] +name = "filetime" +version = "0.2.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c287a33c7f0a620c38e641e7f60827713987b3c0f26e8ddc9462cc69cf75759" +dependencies = [ + "cfg-if", + "libc", +] + [[package]] name = "find-msvc-tools" version = "0.1.9" @@ -901,6 +911,15 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "fsevent-sys" +version = "4.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76ee7a02da4d231650c7cea31349b889be2f45ddb3ef3032d2ec8185f6313fd2" +dependencies = [ + "libc", +] + [[package]] name = "futures-channel" version = "0.3.32" @@ -1594,9 +1613,11 @@ dependencies = [ name = "infrastructure" version = "0.1.0" dependencies = [ + "application", "async-trait", "domain", "git2", + "notify", "portable-pty", "serde", "serde_json", @@ -1604,6 +1625,26 @@ dependencies = [ "uuid", ] +[[package]] +name = "inotify" +version = "0.9.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8069d3ec154eb856955c1c0fbffefbf5f3c40a104ec912d4797314c1801abff" +dependencies = [ + "bitflags 1.3.2", + "inotify-sys", + "libc", +] + +[[package]] +name = "inotify-sys" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e05c02b5e89bff3b946cedeca278abc628fe811e604f027c45a8aa3cf793d0eb" +dependencies = [ + "libc", +] + [[package]] name = "ipnet" version = "2.12.0" @@ -1738,6 +1779,26 @@ dependencies = [ "unicode-segmentation", ] +[[package]] +name = "kqueue" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "273c0752728918e0ac4976f2b275b6fefb9ecd400585dec929419f3844cd87b5" +dependencies = [ + "kqueue-sys", + "libc", +] + +[[package]] +name = "kqueue-sys" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07293a4e297ac234359b510362495713f75ea345d5307140414f20c69ffeb087" +dependencies = [ + "bitflags 2.12.1", + "libc", +] + [[package]] name = "lazy_static" version = "1.5.0" @@ -1895,6 +1956,18 @@ dependencies = [ "simd-adler32", ] +[[package]] +name = "mio" +version = "0.8.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c" +dependencies = [ + "libc", + "log", + "wasi", + "windows-sys 0.48.0", +] + [[package]] name = "mio" version = "1.2.1" @@ -1969,6 +2042,25 @@ dependencies = [ "libc", ] +[[package]] +name = "notify" +version = "6.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6205bd8bb1e454ad2e27422015fb5e4f2bcc7e08fa8f27058670d208324a4d2d" +dependencies = [ + "bitflags 2.12.1", + "crossbeam-channel", + "filetime", + "fsevent-sys", + "inotify", + "kqueue", + "libc", + "log", + "mio 0.8.11", + "walkdir", + "windows-sys 0.48.0", +] + [[package]] name = "num-conv" version = "0.2.2" @@ -3608,7 +3700,7 @@ checksum = "8fc7f01b389ac15039e4dc9531aa973a135d7a4135281b12d7c1bc79fd57fffe" dependencies = [ "bytes", "libc", - "mio", + "mio 1.2.1", "pin-project-lite", "signal-hook-registry", "socket2", @@ -4452,6 +4544,15 @@ dependencies = [ "windows-targets 0.42.2", ] +[[package]] +name = "windows-sys" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9" +dependencies = [ + "windows-targets 0.48.5", +] + [[package]] name = "windows-sys" version = "0.59.0" @@ -4494,6 +4595,21 @@ dependencies = [ "windows_x86_64_msvc 0.42.2", ] +[[package]] +name = "windows-targets" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a2fa6e2155d7247be68c096456083145c183cbbbc2764150dda45a87197940c" +dependencies = [ + "windows_aarch64_gnullvm 0.48.5", + "windows_aarch64_msvc 0.48.5", + "windows_i686_gnu 0.48.5", + "windows_i686_msvc 0.48.5", + "windows_x86_64_gnu 0.48.5", + "windows_x86_64_gnullvm 0.48.5", + "windows_x86_64_msvc 0.48.5", +] + [[package]] name = "windows-targets" version = "0.52.6" @@ -4551,6 +4667,12 @@ version = "0.42.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "597a5118570b68bc08d8d59125332c54f1ba9d9adeedeef5b99b02ba2b0698f8" +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8" + [[package]] name = "windows_aarch64_gnullvm" version = "0.52.6" @@ -4569,6 +4691,12 @@ version = "0.42.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e08e8864a60f06ef0d0ff4ba04124db8b0fb3be5776a5cd47641e942e58c4d43" +[[package]] +name = "windows_aarch64_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" + [[package]] name = "windows_aarch64_msvc" version = "0.52.6" @@ -4587,6 +4715,12 @@ version = "0.42.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c61d927d8da41da96a81f029489353e68739737d3beca43145c8afec9a31a84f" +[[package]] +name = "windows_i686_gnu" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" + [[package]] name = "windows_i686_gnu" version = "0.52.6" @@ -4617,6 +4751,12 @@ version = "0.42.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "44d840b6ec649f480a41c8d80f9c65108b92d89345dd94027bfe06ac444d1060" +[[package]] +name = "windows_i686_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" + [[package]] name = "windows_i686_msvc" version = "0.52.6" @@ -4635,6 +4775,12 @@ version = "0.42.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8de912b8b8feb55c064867cf047dda097f92d51efad5b491dfb98f6bbb70cb36" +[[package]] +name = "windows_x86_64_gnu" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" + [[package]] name = "windows_x86_64_gnu" version = "0.52.6" @@ -4653,6 +4799,12 @@ version = "0.42.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "26d41b46a36d453748aedef1486d5c7a85db22e56aff34643984ea85514e94a3" +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc" + [[package]] name = "windows_x86_64_gnullvm" version = "0.52.6" @@ -4671,6 +4823,12 @@ version = "0.42.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9aec5da331524158c6d1a4ac0ab1541149c0b9505fde06423b02f5ef0106b9f0" +[[package]] +name = "windows_x86_64_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" + [[package]] name = "windows_x86_64_msvc" version = "0.52.6" diff --git a/crates/app-tauri/src/events.rs b/crates/app-tauri/src/events.rs index b3532d6..a1eb1b2 100644 --- a/crates/app-tauri/src/events.rs +++ b/crates/app-tauri/src/events.rs @@ -91,6 +91,16 @@ pub enum DomainEventDto { /// Project id. project_id: String, }, + /// An orchestrator request was processed on behalf of a requester agent. + #[serde(rename_all = "camelCase")] + OrchestratorRequestProcessed { + /// Id of the requesting (orchestrator) agent. + requester_id: String, + /// The action that was processed. + action: String, + /// Whether IdeA handled it successfully. + ok: bool, + }, /// Raw PTY output (normally routed to a per-session channel, not here). #[serde(rename_all = "camelCase")] PtyOutput { @@ -147,6 +157,15 @@ impl From<&DomainEvent> for DomainEventDto { DomainEvent::GitStateChanged { project_id } => Self::GitStateChanged { project_id: project_id.to_string(), }, + DomainEvent::OrchestratorRequestProcessed { + requester_id, + action, + ok, + } => Self::OrchestratorRequestProcessed { + requester_id: requester_id.clone(), + action: action.clone(), + ok: *ok, + }, DomainEvent::PtyOutput { session_id, bytes } => Self::PtyOutput { session_id: session_id.to_string(), bytes: bytes.clone(), diff --git a/crates/application/src/lib.rs b/crates/application/src/lib.rs index ce20dba..ec2d896 100644 --- a/crates/application/src/lib.rs +++ b/crates/application/src/lib.rs @@ -16,6 +16,7 @@ pub mod error; pub mod git; pub mod health; pub mod layout; +pub mod orchestrator; pub mod project; pub mod remote; pub mod template; @@ -33,6 +34,7 @@ pub use agent::{ SaveProfileInput, SaveProfileOutput, UpdateAgentContext, UpdateAgentContextInput, }; pub use error::AppError; +pub use orchestrator::{OrchestratorOutcome, OrchestratorService}; pub use git::{ GitBranches, GitBranchesInput, GitBranchesOutput, GitCheckout, GitCheckoutInput, GitCommit, GitCommitInput, GitCommitOutput, GitGraph, GitGraphInput, GitGraphOutput, GitInit, GitInitInput, diff --git a/crates/application/src/orchestrator/mod.rs b/crates/application/src/orchestrator/mod.rs new file mode 100644 index 0000000..cfe1fb2 --- /dev/null +++ b/crates/application/src/orchestrator/mod.rs @@ -0,0 +1,9 @@ +//! Orchestrator application service (ARCHITECTURE §14.3). +//! +//! Turns a validated [`domain::OrchestratorCommand`] into the *same* agent/terminal +//! use-case calls the UI makes, so an orchestrator agent can drive IdeA without +//! ever spawning a process itself. See [`service::OrchestratorService`]. + +mod service; + +pub use service::{OrchestratorOutcome, OrchestratorService}; diff --git a/crates/application/src/orchestrator/service.rs b/crates/application/src/orchestrator/service.rs new file mode 100644 index 0000000..c73a129 --- /dev/null +++ b/crates/application/src/orchestrator/service.rs @@ -0,0 +1,284 @@ +//! [`OrchestratorService`] — dispatches a validated [`OrchestratorCommand`] to the +//! existing agent/terminal use cases (ARCHITECTURE §14.3). +//! +//! The orchestrator agent never spawns a process itself: IdeA is the single source +//! of truth for the agent lifecycle. This service is the application-layer seam +//! that turns a request into the *same* calls the UI makes: +//! +//! - `spawn_agent` → [`CreateAgentFromScratch`] (if unknown) then [`LaunchAgent`], +//! - `stop_agent` → resolve the agent's live session, then [`CloseTerminal`], +//! - `update_agent_context` → [`UpdateAgentContext`]. +//! +//! It talks **only** to use cases and ports ([`ProfileStore`], [`TerminalSessions`]): +//! no filesystem watching, no JSON, no process spawning here — those are the +//! infrastructure adapter's job. That keeps this fully unit-testable with fakes. + +use std::sync::Arc; + +use domain::ports::ProfileStore; +use domain::{OrchestratorCommand, Project, ProfileId}; + +use crate::agent::{ + CreateAgentFromScratch, CreateAgentInput, LaunchAgent, LaunchAgentInput, ListAgents, + ListAgentsInput, UpdateAgentContext, UpdateAgentContextInput, +}; +use crate::error::AppError; +use crate::terminal::{CloseTerminal, CloseTerminalInput, TerminalSessions}; + +/// Default terminal geometry for an orchestrator-launched agent cell. The UI +/// resizes the PTY to the real cell size on attach; these are sane starting rows +/// /cols so the spawn never fails on a zero-sized terminal. +const DEFAULT_ROWS: u16 = 24; +/// See [`DEFAULT_ROWS`]. +const DEFAULT_COLS: u16 = 80; + +/// Dispatches validated orchestrator commands to the agent/terminal use cases. +pub struct OrchestratorService { + create_agent: Arc, + launch_agent: Arc, + list_agents: Arc, + close_terminal: Arc, + update_context: Arc, + profiles: Arc, + sessions: Arc, +} + +/// Outcome of dispatching a command — a short, human-readable success summary the +/// infrastructure adapter folds into the JSON response file. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct OrchestratorOutcome { + /// One-line description of what IdeA did (e.g. `"launched agent dev-backend"`). + pub detail: String, +} + +impl OrchestratorService { + /// Builds the service from the use cases and ports it dispatches to. + #[must_use] + #[allow(clippy::too_many_arguments)] + pub fn new( + create_agent: Arc, + launch_agent: Arc, + list_agents: Arc, + close_terminal: Arc, + update_context: Arc, + profiles: Arc, + sessions: Arc, + ) -> Self { + Self { + create_agent, + launch_agent, + list_agents, + close_terminal, + update_context, + profiles, + sessions, + } + } + + /// Dispatches a validated command against `project`. + /// + /// # Errors + /// Propagates the underlying use-case [`AppError`] (e.g. unknown profile, + /// unknown agent, PTY failure). For `spawn_agent` a *known* agent is launched + /// directly; an *unknown* one is created from scratch first. + pub async fn dispatch( + &self, + project: &Project, + command: OrchestratorCommand, + ) -> Result { + match command { + OrchestratorCommand::SpawnAgent { + name, + profile, + context, + } => self.spawn_agent(project, name, profile, context).await, + OrchestratorCommand::StopAgent { name } => self.stop_agent(project, name).await, + OrchestratorCommand::UpdateAgentContext { name, context } => { + self.update_agent_context(project, name, context).await + } + } + } + + /// `spawn_agent`: create the agent if the manifest doesn't already hold one by + /// that name, then launch it (which publishes `AgentLaunched` → the UI opens a + /// cell + the Agents tab). + async fn spawn_agent( + &self, + project: &Project, + name: String, + profile: String, + context: Option, + ) -> Result { + let existing = self.find_agent_id_by_name(project, &name).await?; + + let agent_id = match existing { + Some(id) => id, + None => { + let profile_id = self.resolve_profile(&profile).await?; + let created = self + .create_agent + .execute(CreateAgentInput { + project: project.clone(), + name: name.clone(), + profile_id, + initial_content: context, + }) + .await?; + created.agent.id + } + }; + + self.launch_agent + .execute(LaunchAgentInput { + project: project.clone(), + agent_id, + rows: DEFAULT_ROWS, + cols: DEFAULT_COLS, + node_id: None, + }) + .await?; + + Ok(OrchestratorOutcome { + detail: format!("launched agent {name}"), + }) + } + + /// `stop_agent`: translate the agent name → its live session → `CloseTerminal`. + async fn stop_agent( + &self, + project: &Project, + name: String, + ) -> Result { + let agent_id = self + .find_agent_id_by_name(project, &name) + .await? + .ok_or_else(|| AppError::NotFound(format!("agent {name}")))?; + + let session_id = self + .sessions + .session_for_agent(&agent_id) + .ok_or_else(|| AppError::NotFound(format!("running session for agent {name}")))?; + + self.close_terminal + .execute(CloseTerminalInput { session_id }) + .await?; + + Ok(OrchestratorOutcome { + detail: format!("stopped agent {name}"), + }) + } + + /// `update_agent_context`: overwrite the agent's `.md` body. + async fn update_agent_context( + &self, + project: &Project, + name: String, + context: String, + ) -> Result { + let agent_id = self + .find_agent_id_by_name(project, &name) + .await? + .ok_or_else(|| AppError::NotFound(format!("agent {name}")))?; + + self.update_context + .execute(UpdateAgentContextInput { + project: project.clone(), + agent_id, + content: context, + }) + .await?; + + Ok(OrchestratorOutcome { + detail: format!("updated context for agent {name}"), + }) + } + + /// Finds an agent id by display name (case-insensitive) in the project manifest. + async fn find_agent_id_by_name( + &self, + project: &Project, + name: &str, + ) -> Result, AppError> { + let listed = self + .list_agents + .execute(ListAgentsInput { + project: project.clone(), + }) + .await?; + Ok(listed + .agents + .into_iter() + .find(|a| a.name.eq_ignore_ascii_case(name)) + .map(|a| a.id)) + } + + /// Resolves a human-friendly profile reference (slug like `claude-code`, + /// command like `claude`, or display name like `Claude Code`) to a configured + /// [`ProfileId`]. Matching is universal — never hard-coded to one AI — by + /// scanning the configured profiles' command and name. + /// + /// # Errors + /// [`AppError::NotFound`] when no configured profile matches. + async fn resolve_profile(&self, reference: &str) -> Result { + let needle = normalise(reference); + let profiles = self.profiles.list().await?; + profiles + .into_iter() + .find(|p| { + normalise(&p.command) == needle + || normalise(&p.name) == needle + || p.id.to_string() == reference + }) + .map(|p| p.id) + .ok_or_else(|| AppError::NotFound(format!("profile matching '{reference}'"))) + } +} + +/// Normalises a profile reference for tolerant matching: lowercased, with spaces, +/// dashes and underscores stripped (`"Claude Code"`, `"claude-code"`, `"claude"` +/// → comparable forms; `claude` ⊂ ... handled by the command match above). +fn normalise(s: &str) -> String { + s.chars() + .filter(|c| c.is_ascii_alphanumeric()) + .map(|c| c.to_ascii_lowercase()) + .collect() +} + +#[cfg(test)] +mod tests { + use super::*; + use domain::profile::{AgentProfile, ContextInjection}; + use domain::ProfileId; + + fn profile(id: u128, name: &str, command: &str) -> AgentProfile { + AgentProfile::new( + ProfileId::from_uuid(uuid::Uuid::from_u128(id)), + name, + command, + Vec::new(), + ContextInjection::convention_file("CLAUDE.md").unwrap(), + None, + "{agentRunDir}", + ) + .unwrap() + } + + #[test] + fn normalise_makes_slug_command_and_name_comparable() { + assert_eq!(normalise("Claude Code"), "claudecode"); + assert_eq!(normalise("claude-code"), "claudecode"); + assert_eq!(normalise("claude_code"), "claudecode"); + } + + #[test] + fn resolve_matches_by_command_name_or_id() { + // We exercise the pure matching predicate the same way `resolve_profile` + // does, without standing up the whole service/ports. + let p = profile(1, "Claude Code", "claude"); + let by_command = normalise("claude") == normalise(&p.command); + let by_name = normalise("claude-code") == normalise(&p.name); + assert!(by_command); + assert!(by_name); + assert_eq!(p.id.to_string(), p.id.to_string()); + } +} diff --git a/crates/application/src/terminal/registry.rs b/crates/application/src/terminal/registry.rs index b4a4c74..dd84a53 100644 --- a/crates/application/src/terminal/registry.rs +++ b/crates/application/src/terminal/registry.rs @@ -10,7 +10,7 @@ use std::collections::HashMap; use std::sync::Mutex; use domain::ports::PtyHandle; -use domain::{SessionId, TerminalSession}; +use domain::{AgentId, SessionId, SessionKind, TerminalSession}; /// A registered, live terminal: its PTY handle plus the domain snapshot. #[derive(Debug, Clone)] @@ -59,6 +59,21 @@ impl TerminalSessions { .and_then(|m| m.get(id).map(|e| e.session.clone())) } + /// Returns the [`SessionId`] of the live session hosting a given agent, if any. + /// + /// An agent runs in a session tagged [`SessionKind::Agent`]; this is the + /// mapping the orchestrator's `stop_agent` uses to translate an agent id into + /// the [`SessionId`] that `CloseTerminal` expects. Returns `None` when the + /// agent has no live session (already stopped / never launched). + #[must_use] + pub fn session_for_agent(&self, agent_id: &AgentId) -> Option { + self.entries.lock().ok().and_then(|m| { + m.values() + .find(|e| matches!(e.session.kind, SessionKind::Agent { agent_id: a } if &a == agent_id)) + .map(|e| e.session.id) + }) + } + /// Returns the [`PtyHandle`]s of every currently-registered session. /// /// Used at application shutdown to kill all live PTYs cleanly (the diff --git a/crates/application/tests/orchestrator_service.rs b/crates/application/tests/orchestrator_service.rs new file mode 100644 index 0000000..d3aa309 --- /dev/null +++ b/crates/application/tests/orchestrator_service.rs @@ -0,0 +1,511 @@ +//! Integration tests for [`OrchestratorService`] (ARCHITECTURE §14.3). +//! +//! The service is wired over the *real* agent/terminal use cases, themselves +//! backed by in-memory fakes (the same fake patterns as `agent_lifecycle.rs`). +//! This proves the dispatch contract end-to-end without real I/O: +//! +//! - `spawn_agent` on an **unknown** agent → create + launch (manifest grows, PTY +//! spawns, `AgentLaunched` published), +//! - `spawn_agent` on a **known** agent → launch only (no second manifest entry), +//! - `stop_agent` → the agent's live session is killed and de-registered, +//! - `update_agent_context` → the agent `.md` is overwritten, +//! - unknown profile / unknown agent → `NotFound`, no spawn. + +use std::collections::HashMap; +use std::sync::{Arc, Mutex}; + +use async_trait::async_trait; +use domain::agent::{Agent, AgentManifest, AgentOrigin, ManifestEntry}; +use domain::events::DomainEvent; +use domain::ids::{AgentId, ProfileId, ProjectId}; +use domain::markdown::MarkdownDoc; +use domain::ports::{ + AgentContextStore, AgentRuntime, ContextInjectionPlan, DirEntry, EventBus, EventStream, + ExitStatus, FileSystem, FsError, IdGenerator, OutputStream, PreparedContext, ProfileStore, + PtyError, PtyHandle, PtyPort, RemotePath, RuntimeError, SpawnSpec, StoreError, +}; +use domain::profile::{AgentProfile, ContextInjection}; +use domain::project::{Project, ProjectPath}; +use domain::remote::RemoteRef; +use domain::{OrchestratorCommand, OrchestratorRequest, PtySize, SessionId}; +use uuid::Uuid; + +use application::{ + CloseTerminal, CreateAgentFromScratch, LaunchAgent, ListAgents, OrchestratorService, + TerminalSessions, UpdateAgentContext, +}; + +// --------------------------------------------------------------------------- +// Fakes (mirror agent_lifecycle.rs) +// --------------------------------------------------------------------------- + +#[derive(Default)] +struct ContextsInner { + manifest: AgentManifest, + contents: HashMap, +} + +#[derive(Clone)] +struct FakeContexts(Arc>); + +impl FakeContexts { + fn new() -> Self { + Self(Arc::new(Mutex::new(ContextsInner { + manifest: AgentManifest { + version: 1, + entries: Vec::new(), + }, + contents: HashMap::new(), + }))) + } + fn with_agent(agent: &Agent, content: &str) -> Self { + let me = Self::new(); + { + let mut inner = me.0.lock().unwrap(); + inner.manifest.entries.push(ManifestEntry::from_agent(agent)); + inner + .contents + .insert(agent.context_path.clone(), content.to_owned()); + } + me + } + fn manifest(&self) -> AgentManifest { + self.0.lock().unwrap().manifest.clone() + } + fn content(&self, md_path: &str) -> Option { + self.0.lock().unwrap().contents.get(md_path).cloned() + } + fn md_path_of(&self, agent: &AgentId) -> Option { + self.0 + .lock() + .unwrap() + .manifest + .entries + .iter() + .find(|e| &e.agent_id == agent) + .map(|e| e.md_path.clone()) + } +} + +#[async_trait] +impl AgentContextStore for FakeContexts { + async fn read_context( + &self, + _project: &Project, + agent: &AgentId, + ) -> Result { + let md_path = self.md_path_of(agent).ok_or(StoreError::NotFound)?; + self.content(&md_path) + .map(MarkdownDoc::new) + .ok_or(StoreError::NotFound) + } + async fn write_context( + &self, + _project: &Project, + agent: &AgentId, + md: &MarkdownDoc, + ) -> Result<(), StoreError> { + let md_path = self.md_path_of(agent).ok_or(StoreError::NotFound)?; + self.0 + .lock() + .unwrap() + .contents + .insert(md_path, md.as_str().to_owned()); + Ok(()) + } + async fn load_manifest(&self, _project: &Project) -> Result { + Ok(self.manifest()) + } + async fn save_manifest( + &self, + _project: &Project, + manifest: &AgentManifest, + ) -> Result<(), StoreError> { + self.0.lock().unwrap().manifest = manifest.clone(); + Ok(()) + } +} + +#[derive(Clone)] +struct FakeProfiles(Arc>); +impl FakeProfiles { + fn new(profiles: Vec) -> Self { + Self(Arc::new(profiles)) + } +} +#[async_trait] +impl ProfileStore for FakeProfiles { + async fn list(&self) -> Result, StoreError> { + Ok((*self.0).clone()) + } + async fn save(&self, _profile: &AgentProfile) -> Result<(), StoreError> { + Ok(()) + } + async fn delete(&self, _id: ProfileId) -> Result<(), StoreError> { + Ok(()) + } + async fn is_configured(&self) -> Result { + Ok(true) + } + async fn mark_configured(&self) -> Result<(), StoreError> { + Ok(()) + } +} + +struct FakeRuntime; +impl AgentRuntime for FakeRuntime { + fn detect(&self, _profile: &AgentProfile) -> Result { + Ok(true) + } + fn prepare_invocation( + &self, + profile: &AgentProfile, + _ctx: &PreparedContext, + cwd: &ProjectPath, + ) -> Result { + Ok(SpawnSpec { + command: profile.command.clone(), + args: profile.args.clone(), + cwd: cwd.clone(), + env: Vec::new(), + context_plan: Some(ContextInjectionPlan::Stdin), + }) + } +} + +#[derive(Clone, Default)] +struct FakeFs; +#[async_trait] +impl FileSystem for FakeFs { + async fn read(&self, path: &RemotePath) -> Result, FsError> { + Err(FsError::NotFound(path.as_str().to_owned())) + } + async fn write(&self, _path: &RemotePath, _data: &[u8]) -> Result<(), FsError> { + Ok(()) + } + async fn exists(&self, _path: &RemotePath) -> Result { + Ok(false) + } + async fn create_dir_all(&self, _path: &RemotePath) -> Result<(), FsError> { + Ok(()) + } + async fn list(&self, _path: &RemotePath) -> Result, FsError> { + Ok(Vec::new()) + } + async fn symlink(&self, _src: &RemotePath, _dst: &RemotePath) -> Result<(), FsError> { + Ok(()) + } +} + +#[derive(Clone)] +struct FakePty { + next_id: SessionId, + kills: Arc>>, +} +impl FakePty { + fn new(next_id: SessionId) -> Self { + Self { + next_id, + kills: Arc::new(Mutex::new(Vec::new())), + } + } + fn kills(&self) -> Vec { + self.kills.lock().unwrap().clone() + } +} +#[async_trait] +impl PtyPort for FakePty { + async fn spawn(&self, _spec: SpawnSpec, _size: PtySize) -> Result { + Ok(PtyHandle { + session_id: self.next_id, + }) + } + fn write(&self, _handle: &PtyHandle, _data: &[u8]) -> Result<(), PtyError> { + Ok(()) + } + fn resize(&self, _handle: &PtyHandle, _size: PtySize) -> Result<(), PtyError> { + Ok(()) + } + fn subscribe_output(&self, _handle: &PtyHandle) -> Result { + Ok(Box::new(std::iter::empty())) + } + fn scrollback(&self, _handle: &PtyHandle) -> Result, PtyError> { + Ok(Vec::new()) + } + async fn kill(&self, handle: &PtyHandle) -> Result { + self.kills.lock().unwrap().push(handle.session_id); + Ok(ExitStatus { code: Some(0) }) + } +} + +#[derive(Default, Clone)] +struct SpyBus(Arc>>); +impl SpyBus { + fn events(&self) -> Vec { + self.0.lock().unwrap().clone() + } +} +impl EventBus for SpyBus { + fn publish(&self, event: DomainEvent) { + self.0.lock().unwrap().push(event); + } + fn subscribe(&self) -> EventStream { + Box::new(std::iter::empty()) + } +} + +struct SeqIds(Mutex); +impl SeqIds { + fn new() -> Self { + Self(Mutex::new(1)) + } +} +impl IdGenerator for SeqIds { + fn new_uuid(&self) -> Uuid { + let mut n = self.0.lock().unwrap(); + let id = Uuid::from_u128(*n); + *n += 1; + id + } +} + +// --------------------------------------------------------------------------- +// Builders +// --------------------------------------------------------------------------- + +fn pid(n: u128) -> ProfileId { + ProfileId::from_uuid(Uuid::from_u128(n)) +} +fn aid(n: u128) -> AgentId { + AgentId::from_uuid(Uuid::from_u128(n)) +} +fn sid(n: u128) -> SessionId { + SessionId::from_uuid(Uuid::from_u128(n)) +} + +fn project() -> Project { + Project::new( + ProjectId::from_uuid(Uuid::from_u128(1000)), + "demo", + ProjectPath::new("/home/me/proj").unwrap(), + RemoteRef::local(), + 1_700_000_000_000, + ) + .unwrap() +} + +fn claude_profile() -> AgentProfile { + AgentProfile::new( + pid(9), + "Claude Code", + "claude", + Vec::new(), + ContextInjection::stdin(), + Some("claude --version".to_owned()), + "{agentRunDir}", + ) + .unwrap() +} + +fn scratch_agent(id: AgentId, name: &str, md: &str) -> Agent { + Agent::new(id, name, md, pid(9), AgentOrigin::Scratch, false).unwrap() +} + +/// Everything wired for a dispatch test. +struct Fixture { + service: OrchestratorService, + contexts: FakeContexts, + pty: FakePty, + bus: SpyBus, + sessions: Arc, +} + +fn fixture(contexts: FakeContexts) -> Fixture { + let profiles = Arc::new(FakeProfiles::new(vec![claude_profile()])); + let sessions = Arc::new(TerminalSessions::new()); + let pty = FakePty::new(sid(777)); + let bus = SpyBus::default(); + + let create = Arc::new(CreateAgentFromScratch::new( + Arc::new(contexts.clone()), + Arc::new(SeqIds::new()), + Arc::new(bus.clone()), + )); + let launch = Arc::new(LaunchAgent::new( + Arc::new(contexts.clone()), + Arc::clone(&profiles) as Arc, + Arc::new(FakeRuntime), + Arc::new(FakeFs), + Arc::new(pty.clone()), + Arc::clone(&sessions), + Arc::new(bus.clone()), + )); + let list = Arc::new(ListAgents::new(Arc::new(contexts.clone()))); + let close = Arc::new(CloseTerminal::new( + Arc::new(pty.clone()), + Arc::clone(&sessions), + )); + let update = Arc::new(UpdateAgentContext::new(Arc::new(contexts.clone()))); + + let service = OrchestratorService::new( + create, + launch, + list, + close, + update, + Arc::clone(&profiles) as Arc, + Arc::clone(&sessions), + ); + + Fixture { + service, + contexts, + pty, + bus, + sessions, + } +} + +fn cmd(json: &str) -> OrchestratorCommand { + serde_json::from_str::(json) + .unwrap() + .validate() + .unwrap() +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +#[tokio::test] +async fn spawn_unknown_agent_creates_then_launches() { + let fx = fixture(FakeContexts::new()); + + let out = fx + .service + .dispatch( + &project(), + cmd(r#"{ "action":"spawn_agent", "name":"dev-backend", "profile":"claude-code" }"#), + ) + .await + .expect("dispatch ok"); + assert!(out.detail.contains("dev-backend")); + + // The agent was created (manifest grew to one entry) and launched (session + // registered as an agent, AgentLaunched published). + let manifest = fx.contexts.manifest(); + assert_eq!(manifest.entries.len(), 1); + assert_eq!(manifest.entries[0].name, "dev-backend"); + + assert!(fx.sessions.session(&sid(777)).is_some()); + let launched = fx + .bus + .events() + .into_iter() + .any(|e| matches!(e, DomainEvent::AgentLaunched { session_id, .. } if session_id == sid(777))); + assert!(launched, "AgentLaunched must be published"); +} + +#[tokio::test] +async fn spawn_known_agent_launches_without_recreating() { + let agent = scratch_agent(aid(1), "dev-backend", "agents/dev-backend.md"); + let fx = fixture(FakeContexts::with_agent(&agent, "# persona")); + + fx.service + .dispatch( + &project(), + cmd(r#"{ "action":"spawn_agent", "name":"dev-backend", "profile":"claude-code" }"#), + ) + .await + .expect("dispatch ok"); + + // No second manifest entry — the existing agent was reused, just launched. + assert_eq!(fx.contexts.manifest().entries.len(), 1); + assert_eq!(fx.contexts.manifest().entries[0].agent_id, agent.id); + assert!(fx.sessions.session(&sid(777)).is_some()); +} + +#[tokio::test] +async fn stop_agent_kills_the_right_session() { + let agent = scratch_agent(aid(1), "dev-backend", "agents/dev-backend.md"); + let fx = fixture(FakeContexts::with_agent(&agent, "# persona")); + + // Launch it first so a session is registered for the agent. + fx.service + .dispatch( + &project(), + cmd(r#"{ "action":"spawn_agent", "name":"dev-backend", "profile":"claude" }"#), + ) + .await + .unwrap(); + assert!(fx.sessions.session(&sid(777)).is_some()); + + // Now stop it. + fx.service + .dispatch( + &project(), + cmd(r#"{ "action":"stop_agent", "name":"dev-backend" }"#), + ) + .await + .expect("stop ok"); + + // The PTY for that session was killed and the session de-registered. + assert_eq!(fx.pty.kills(), vec![sid(777)]); + assert!(fx.sessions.session(&sid(777)).is_none()); +} + +#[tokio::test] +async fn stop_agent_without_live_session_is_not_found() { + let agent = scratch_agent(aid(1), "dev-backend", "agents/dev-backend.md"); + let fx = fixture(FakeContexts::with_agent(&agent, "# persona")); + + let err = fx + .service + .dispatch( + &project(), + cmd(r#"{ "action":"stop_agent", "name":"dev-backend" }"#), + ) + .await + .unwrap_err(); + assert_eq!(err.code(), "NOT_FOUND", "got {err:?}"); +} + +#[tokio::test] +async fn update_agent_context_overwrites_md() { + let agent = scratch_agent(aid(1), "dev-backend", "agents/dev-backend.md"); + let fx = fixture(FakeContexts::with_agent(&agent, "# old")); + + fx.service + .dispatch( + &project(), + cmd( + r##"{ "action":"update_agent_context", "name":"dev-backend", "context":"# new body" }"##, + ), + ) + .await + .expect("update ok"); + + assert_eq!( + fx.contexts.content("agents/dev-backend.md").as_deref(), + Some("# new body") + ); +} + +#[tokio::test] +async fn spawn_with_unknown_profile_is_not_found_and_does_not_create() { + let fx = fixture(FakeContexts::new()); + + let err = fx + .service + .dispatch( + &project(), + cmd(r#"{ "action":"spawn_agent", "name":"x", "profile":"does-not-exist" }"#), + ) + .await + .unwrap_err(); + assert_eq!(err.code(), "NOT_FOUND", "got {err:?}"); + + // No agent was created when the profile could not be resolved. + assert!(fx.contexts.manifest().entries.is_empty()); + assert!(fx.sessions.session(&sid(777)).is_none()); +} diff --git a/crates/domain/src/events.rs b/crates/domain/src/events.rs index 3c5f501..af11aa6 100644 --- a/crates/domain/src/events.rs +++ b/crates/domain/src/events.rs @@ -69,6 +69,18 @@ pub enum DomainEvent { /// The project. project_id: ProjectId, }, + /// An orchestrator request (dropped under `.ideai/requests/`) was processed + /// by IdeA on behalf of a requester agent (ARCHITECTURE §14.3). Relayed so the + /// frontend can surface orchestration activity; the resulting cell/tab opens + /// off the [`AgentLaunched`](Self::AgentLaunched) event for `spawn_agent`. + OrchestratorRequestProcessed { + /// Id of the requesting (orchestrator) agent — the request subdirectory. + requester_id: String, + /// The action that was processed (`spawn_agent`, `stop_agent`, …). + action: String, + /// Whether IdeA handled it successfully. + ok: bool, + }, /// Raw PTY output (usually routed to a dedicated channel, not this bus). PtyOutput { /// The session. diff --git a/crates/domain/src/lib.rs b/crates/domain/src/lib.rs index 7701ce1..982e9d4 100644 --- a/crates/domain/src/lib.rs +++ b/crates/domain/src/lib.rs @@ -37,6 +37,7 @@ pub mod git; pub mod ids; pub mod layout; pub mod markdown; +pub mod orchestrator; pub mod ports; pub mod profile; pub mod project; @@ -79,6 +80,8 @@ pub use layout::{ pub use events::DomainEvent; +pub use orchestrator::{OrchestratorCommand, OrchestratorError, OrchestratorRequest}; + pub use ports::{ AgentContextStore, AgentRuntime, Clock, ContextInjectionPlan, DirEntry, EventBus, EventStream, ExitStatus, FileSystem, FsError, GitCommitInfo, GitError, GitFileStatus, GitPort, GraphCommit, diff --git a/crates/domain/src/orchestrator.rs b/crates/domain/src/orchestrator.rs new file mode 100644 index 0000000..6df293e --- /dev/null +++ b/crates/domain/src/orchestrator.rs @@ -0,0 +1,255 @@ +//! Orchestrator request model (ARCHITECTURE §14.3). +//! +//! An *orchestrator* agent does not spawn child processes itself: it **delegates** +//! agent lifecycle to IdeA (the single source of truth) by dropping a JSON request +//! file under `/.ideai/requests//*.json`. This module +//! owns the **pure** request model: the wire-level [`OrchestratorRequest`] (serde, +//! camelCase) and its validation into a well-formed [`OrchestratorCommand`]. +//! +//! It is I/O-free: parsing the file, dispatching to use cases and writing the +//! response are infrastructure/application concerns. Keeping the model here means +//! validation invariants (known action, required fields present) are unit-testable +//! without touching the filesystem. + +use serde::{Deserialize, Serialize}; + +/// Errors raised while validating a raw [`OrchestratorRequest`]. +#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)] +pub enum OrchestratorError { + /// The `action` field is not one of the supported v1 actions. + #[error("unknown orchestrator action: {0}")] + UnknownAction(String), + /// A field required by the chosen action is missing or empty. + #[error("missing required field `{field}` for action `{action}`")] + MissingField { + /// The action being validated. + action: String, + /// The required field that was absent or empty. + field: String, + }, +} + +/// The raw, wire-level orchestrator request as deserialised from a request file. +/// +/// All payload fields are optional at this layer; which ones are *required* +/// depends on `action` and is enforced by [`OrchestratorRequest::validate`]. This +/// keeps deserialisation total (any JSON object shape parses) and pushes the +/// metier invariants into one explicit, tested place. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct OrchestratorRequest { + /// The requested action (`spawn_agent`, `stop_agent`, `update_agent_context`). + pub action: String, + /// Target agent display name (required by every v1 action). + #[serde(default, skip_serializing_if = "Option::is_none")] + pub name: Option, + /// Runtime profile slug/name (required by `spawn_agent`). + #[serde(default, skip_serializing_if = "Option::is_none")] + pub profile: Option, + /// Context reference: for `spawn_agent` the relative `.md` path is informative + /// (the manifest owns the real path); for `update_agent_context` this carries + /// the **new Markdown body** to write. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub context: Option, +} + +/// A validated orchestrator command — the only thing the application layer acts on. +/// +/// Each variant carries exactly the fields its action needs; constructing one is +/// proof the request was well-formed (Parse, don't validate). +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum OrchestratorCommand { + /// Create the agent if unknown (with `profile` + optional initial context), + /// then launch it — exactly as the UI would. + SpawnAgent { + /// Target agent display name. + name: String, + /// Profile slug/name to resolve against the configured profiles. + profile: String, + /// Optional initial `.md` body for a freshly-created agent. + context: Option, + }, + /// Stop a running agent by killing its terminal session. + StopAgent { + /// Target agent display name. + name: String, + }, + /// Overwrite an agent's `.md` context with a new body. + UpdateAgentContext { + /// Target agent display name. + name: String, + /// New Markdown body. + context: String, + }, +} + +impl OrchestratorRequest { + /// Validates the raw request into a well-formed [`OrchestratorCommand`]. + /// + /// Invariants enforced here (ARCHITECTURE §14.3): + /// - `action` must be a known v1 action, + /// - `name` is required (non-empty) for every action, + /// - `spawn_agent` additionally requires a non-empty `profile`, + /// - `update_agent_context` additionally requires a `context` body. + /// + /// # Errors + /// [`OrchestratorError::UnknownAction`] for an unsupported action; + /// [`OrchestratorError::MissingField`] when a required field is absent/empty. + pub fn validate(&self) -> Result { + let action = self.action.trim(); + match action { + "spawn_agent" => Ok(OrchestratorCommand::SpawnAgent { + name: self.require_name(action)?, + profile: self.require("profile", action, self.profile.as_deref())?, + context: self + .context + .as_ref() + .filter(|c| !c.is_empty()) + .cloned(), + }), + "stop_agent" => Ok(OrchestratorCommand::StopAgent { + name: self.require_name(action)?, + }), + "update_agent_context" => Ok(OrchestratorCommand::UpdateAgentContext { + name: self.require_name(action)?, + context: self.require("context", action, self.context.as_deref())?, + }), + other => Err(OrchestratorError::UnknownAction(other.to_owned())), + } + } + + /// Requires a non-empty `name`, shared by all actions. + fn require_name(&self, action: &str) -> Result { + self.require("name", action, self.name.as_deref()) + } + + /// Requires `value` to be present and non-empty (after trimming), else a + /// [`OrchestratorError::MissingField`] naming `field`/`action`. + fn require( + &self, + field: &str, + action: &str, + value: Option<&str>, + ) -> Result { + match value { + Some(v) if !v.trim().is_empty() => Ok(v.trim().to_owned()), + _ => Err(OrchestratorError::MissingField { + action: action.to_owned(), + field: field.to_owned(), + }), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn req(json: &str) -> OrchestratorRequest { + serde_json::from_str(json).expect("valid json") + } + + #[test] + fn spawn_agent_parses_and_validates() { + let r = req( + r#"{ "action": "spawn_agent", "name": "dev-backend", "profile": "claude-code", "context": "agents/dev-backend.md" }"#, + ); + assert_eq!( + r.validate().unwrap(), + OrchestratorCommand::SpawnAgent { + name: "dev-backend".to_owned(), + profile: "claude-code".to_owned(), + context: Some("agents/dev-backend.md".to_owned()), + } + ); + } + + #[test] + fn spawn_agent_without_context_is_valid() { + let r = req(r#"{ "action": "spawn_agent", "name": "a", "profile": "claude-code" }"#); + assert_eq!( + r.validate().unwrap(), + OrchestratorCommand::SpawnAgent { + name: "a".to_owned(), + profile: "claude-code".to_owned(), + context: None, + } + ); + } + + #[test] + fn spawn_agent_missing_profile_is_rejected() { + let r = req(r#"{ "action": "spawn_agent", "name": "a" }"#); + assert_eq!( + r.validate(), + Err(OrchestratorError::MissingField { + action: "spawn_agent".to_owned(), + field: "profile".to_owned(), + }) + ); + } + + #[test] + fn stop_agent_validates() { + let r = req(r#"{ "action": "stop_agent", "name": "dev-backend" }"#); + assert_eq!( + r.validate().unwrap(), + OrchestratorCommand::StopAgent { + name: "dev-backend".to_owned() + } + ); + } + + #[test] + fn stop_agent_missing_name_is_rejected() { + let r = req(r#"{ "action": "stop_agent" }"#); + assert_eq!( + r.validate(), + Err(OrchestratorError::MissingField { + action: "stop_agent".to_owned(), + field: "name".to_owned(), + }) + ); + } + + #[test] + fn update_context_requires_a_body() { + let ok = req( + r##"{ "action": "update_agent_context", "name": "a", "context": "# new body" }"##, + ); + assert_eq!( + ok.validate().unwrap(), + OrchestratorCommand::UpdateAgentContext { + name: "a".to_owned(), + context: "# new body".to_owned(), + } + ); + + let missing = req(r#"{ "action": "update_agent_context", "name": "a" }"#); + assert_eq!( + missing.validate(), + Err(OrchestratorError::MissingField { + action: "update_agent_context".to_owned(), + field: "context".to_owned(), + }) + ); + } + + #[test] + fn unknown_action_is_rejected() { + let r = req(r#"{ "action": "delete_everything", "name": "a" }"#); + assert_eq!( + r.validate(), + Err(OrchestratorError::UnknownAction("delete_everything".to_owned())) + ); + } + + #[test] + fn blank_name_is_treated_as_missing() { + let r = req(r#"{ "action": "stop_agent", "name": " " }"#); + assert!(matches!( + r.validate(), + Err(OrchestratorError::MissingField { .. }) + )); + } +} diff --git a/crates/infrastructure/Cargo.toml b/crates/infrastructure/Cargo.toml index 450819f..9aeb9a1 100644 --- a/crates/infrastructure/Cargo.toml +++ b/crates/infrastructure/Cargo.toml @@ -8,12 +8,18 @@ description = "IdeA — infrastructure layer: concrete adapters implementing the [dependencies] domain = { workspace = true } +# The orchestrator filesystem watcher (driving adapter, ARCHITECTURE §14.3) drives +# the application's `OrchestratorService`; infrastructure may depend on application. +application = { workspace = true } # `process` (additive) powers LocalProcessSpawner; the workspace baseline keeps # rt/macros/sync/fs/io-util. -tokio = { workspace = true, features = ["process"] } +tokio = { workspace = true, features = ["process", "time"] } uuid = { workspace = true } async-trait = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } portable-pty = "0.9" git2 = { workspace = true } +# Filesystem change notifications used to *wake* the orchestrator poll loop early +# (the poll loop remains the robust cross-platform correctness guarantee). +notify = "6" diff --git a/crates/infrastructure/src/lib.rs b/crates/infrastructure/src/lib.rs index 2cb9cb4..a42e36d 100644 --- a/crates/infrastructure/src/lib.rs +++ b/crates/infrastructure/src/lib.rs @@ -17,6 +17,7 @@ pub mod eventbus; pub mod fs; pub mod git; pub mod id; +pub mod orchestrator; pub mod process; pub mod pty; pub mod remote; @@ -28,6 +29,10 @@ pub use eventbus::TokioBroadcastEventBus; pub use fs::LocalFileSystem; pub use git::Git2Repository; pub use id::UuidGenerator; +pub use orchestrator::{ + process_request_file, FsOrchestratorWatcher, OrchestratorResponse, OrchestratorWatchHandle, + REQUESTS_SUBDIR, +}; pub use process::LocalProcessSpawner; pub use pty::PortablePtyAdapter; pub use remote::{remote_host, LocalHost}; diff --git a/crates/infrastructure/src/orchestrator/mod.rs b/crates/infrastructure/src/orchestrator/mod.rs new file mode 100644 index 0000000..3a74ec0 --- /dev/null +++ b/crates/infrastructure/src/orchestrator/mod.rs @@ -0,0 +1,273 @@ +//! [`FsOrchestratorWatcher`] — filesystem driving adapter for the orchestrator +//! protocol (ARCHITECTURE §14.3). +//! +//! An orchestrator agent drops a JSON request under +//! `/.ideai/requests//*.json`. This adapter watches +//! that tree, and for each request file: +//! +//! 1. parses + validates it into a [`domain::OrchestratorCommand`], +//! 2. dispatches it through the application's [`OrchestratorService`] (which calls +//! the *same* use cases as the UI — IdeA stays the single source of truth for +//! the agent lifecycle; the orchestrator never spawns a process itself), +//! 3. writes a sibling `.response.json` (`ok` / error), +//! 4. removes the consumed request file. +//! +//! ## notify vs polling +//! +//! We use a **poll loop** as the primary trigger (a tokio interval re-scanning the +//! requests tree) and use [`notify`] purely to *wake* that loop early on a change. +//! Polling is the robust baseline: `notify`'s native back-ends behave +//! inconsistently across the platforms IdeA targets (inotify on Linux, ReadDirectoryChangesW +//! on Windows) and over network/WSL-mounted filesystems where an orchestrator's +//! project root may live. The poll loop guarantees eventual processing regardless; +//! notify just lowers latency. The per-file logic ([`process_request_file`]) is a +//! standalone async fn, unit-tested against a temp dir independently of the watch. + +use std::path::{Path, PathBuf}; +use std::sync::Arc; + +use application::OrchestratorService; +use domain::{DomainEvent, OrchestratorRequest, Project}; +use serde::{Deserialize, Serialize}; +use tokio::sync::mpsc; + +/// Subdirectory (under `.ideai/`) the orchestrator drops request files into. +pub const REQUESTS_SUBDIR: &str = "requests"; + +/// How often the poll loop re-scans the requests tree when idle. +const POLL_INTERVAL_MS: u64 = 500; + +/// The JSON response written next to a consumed request file. +/// +/// Serialised camelCase to match the DTO convention used across IdeA's wire +/// formats. `ok` is the single boolean an orchestrator polls for; `detail` carries +/// a success summary and `error` the failure reason (mutually exclusive). +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "camelCase")] +pub struct OrchestratorResponse { + /// Whether IdeA handled the request successfully. + pub ok: bool, + /// The action that was attempted (echoed back), when parseable. + #[serde(skip_serializing_if = "Option::is_none")] + pub action: Option, + /// Human-readable success summary (`ok == true`). + #[serde(skip_serializing_if = "Option::is_none")] + pub detail: Option, + /// Human-readable failure reason (`ok == false`). + #[serde(skip_serializing_if = "Option::is_none")] + pub error: Option, +} + +impl OrchestratorResponse { + fn success(action: String, detail: String) -> Self { + Self { + ok: true, + action: Some(action), + detail: Some(detail), + error: None, + } + } + fn failure(action: Option, error: String) -> Self { + Self { + ok: false, + action, + detail: None, + error: Some(error), + } + } +} + +/// A running watcher; dropping it stops the background task. +pub struct OrchestratorWatchHandle { + stop: mpsc::Sender<()>, +} + +impl OrchestratorWatchHandle { + /// Signals the watch loop to stop (best-effort; the task also stops when this + /// handle is dropped). + pub fn stop(&self) { + let _ = self.stop.try_send(()); + } +} + +/// Filesystem driving adapter that watches a project's `.ideai/requests/` tree. +pub struct FsOrchestratorWatcher; + +impl FsOrchestratorWatcher { + /// Starts watching `project`'s `.ideai/requests/` tree, dispatching every + /// request file through `service`. Returns a handle that stops the watch when + /// dropped (or via [`OrchestratorWatchHandle::stop`]). + /// + /// The optional `events` sink (a domain [`EventBus`](domain::ports::EventBus) + /// publish closure) is invoked with an [`DomainEvent::OrchestratorRequestProcessed`] + /// after each handled file so the presentation layer can surface orchestration + /// activity. The actual cell/tab for `spawn_agent` opens off the + /// [`DomainEvent::AgentLaunched`] the dispatch already publishes. + /// + /// Spawns onto the ambient Tokio runtime; never blocks the caller. + #[must_use] + pub fn start( + project: Project, + service: Arc, + events: Arc, + ) -> OrchestratorWatchHandle { + let (stop_tx, mut stop_rx) = mpsc::channel::<()>(1); + let requests_root = requests_root(&project); + + tokio::spawn(async move { + // Poll loop. A notify watcher (best-effort) wakes us early; the + // interval is the robust fallback. + let (wake_tx, mut wake_rx) = mpsc::channel::<()>(8); + let _notify_guard = spawn_notify(&requests_root, wake_tx); + + let mut interval = + tokio::time::interval(std::time::Duration::from_millis(POLL_INTERVAL_MS)); + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + + loop { + tokio::select! { + _ = stop_rx.recv() => break, + _ = interval.tick() => {} + _ = wake_rx.recv() => {} + } + scan_once(&requests_root, &project, &service, events.as_ref()).await; + } + }); + + OrchestratorWatchHandle { stop: stop_tx } + } +} + +/// Resolves `/.ideai/requests/`. +fn requests_root(project: &Project) -> PathBuf { + Path::new(project.root.as_str()) + .join(".ideai") + .join(REQUESTS_SUBDIR) +} + +/// Spawns a best-effort `notify` watcher that forwards change notifications to +/// `wake`. Returns the watcher guard (kept alive by the caller); on any error +/// (e.g. the directory does not exist yet) it returns `None` and the poll loop +/// still covers correctness. +fn spawn_notify(root: &Path, wake: mpsc::Sender<()>) -> Option { + use notify::{RecursiveMode, Watcher}; + + // The directory may not exist yet; create it so notify has something to watch + // and orchestrators have a stable drop target. Ignore failures — the poll loop + // re-creates intent each scan. + let _ = std::fs::create_dir_all(root); + + let mut watcher = notify::recommended_watcher(move |res: notify::Result| { + if res.is_ok() { + let _ = wake.try_send(()); + } + }) + .ok()?; + watcher.watch(root, RecursiveMode::Recursive).ok()?; + Some(watcher) +} + +/// Scans the requests tree once, processing every `*.json` request file that is +/// not itself a `*.response.json`. +async fn scan_once( + root: &Path, + project: &Project, + service: &OrchestratorService, + publish: &(dyn Fn(DomainEvent) + Send + Sync), +) { + let Ok(requesters) = std::fs::read_dir(root) else { + return; + }; + for requester in requesters.flatten() { + let dir = requester.path(); + if !dir.is_dir() { + continue; + } + let requester_id = requester.file_name().to_string_lossy().into_owned(); + let Ok(files) = std::fs::read_dir(&dir) else { + continue; + }; + for file in files.flatten() { + let path = file.path(); + if !is_request_file(&path) { + continue; + } + let outcome = process_request_file(&path, project, service).await; + publish(DomainEvent::OrchestratorRequestProcessed { + requester_id: requester_id.clone(), + action: outcome.action.clone().unwrap_or_default(), + ok: outcome.ok, + }); + } + } +} + +/// Whether `path` is a request file to process: a `.json` that is not a +/// `.response.json` sibling we wrote ourselves. +fn is_request_file(path: &Path) -> bool { + let name = path.file_name().and_then(|n| n.to_str()).unwrap_or_default(); + name.ends_with(".json") && !name.ends_with(".response.json") +} + +/// Processes a single request file end to end: read → parse/validate → dispatch → +/// write `.response.json` → delete the request file. +/// +/// Always writes a response and removes the request (even on error) so a poisoned +/// request can never wedge the loop. Returns the [`OrchestratorResponse`] it wrote +/// (handy for tests and for the change event). Standalone (no watch) so it is +/// unit-testable against a temp directory. +pub async fn process_request_file( + path: &Path, + project: &Project, + service: &OrchestratorService, +) -> OrchestratorResponse { + let response = dispatch_file(path, project, service).await; + write_response(path, &response); + let _ = std::fs::remove_file(path); + response +} + +/// Reads + parses + validates + dispatches a request file, mapping every failure +/// to an [`OrchestratorResponse::failure`]. +async fn dispatch_file( + path: &Path, + project: &Project, + service: &OrchestratorService, +) -> OrchestratorResponse { + let bytes = match std::fs::read(path) { + Ok(b) => b, + Err(e) => return OrchestratorResponse::failure(None, format!("read failed: {e}")), + }; + let request: OrchestratorRequest = match serde_json::from_slice(&bytes) { + Ok(r) => r, + Err(e) => return OrchestratorResponse::failure(None, format!("invalid json: {e}")), + }; + let action = request.action.clone(); + let command = match request.validate() { + Ok(c) => c, + Err(e) => return OrchestratorResponse::failure(Some(action), e.to_string()), + }; + match service.dispatch(project, command).await { + Ok(out) => OrchestratorResponse::success(action, out.detail), + Err(e) => OrchestratorResponse::failure(Some(action), e.to_string()), + } +} + +/// Writes the response JSON next to the request file as `.response.json`. +fn write_response(request_path: &Path, response: &OrchestratorResponse) { + let response_path = response_path_for(request_path); + if let Ok(json) = serde_json::to_vec_pretty(response) { + let _ = std::fs::write(response_path, json); + } +} + +/// Derives `.json` → `.json.response.json` so the response is an +/// unambiguous sibling that `is_request_file` skips. +fn response_path_for(request_path: &Path) -> PathBuf { + let mut name = request_path + .file_name() + .map(|n| n.to_string_lossy().into_owned()) + .unwrap_or_default(); + name.push_str(".response.json"); + request_path.with_file_name(name) +} diff --git a/crates/infrastructure/tests/orchestrator_watcher.rs b/crates/infrastructure/tests/orchestrator_watcher.rs new file mode 100644 index 0000000..c803b94 --- /dev/null +++ b/crates/infrastructure/tests/orchestrator_watcher.rs @@ -0,0 +1,353 @@ +//! Integration tests for the orchestrator filesystem adapter (ARCHITECTURE §14.3). +//! +//! These drive [`process_request_file`] — the standalone "parse + dispatch + write +//! response + delete request" unit — against a real temp directory, with an +//! [`OrchestratorService`] wired over in-memory fakes. We assert: +//! +//! - a **valid** `spawn_agent` request → success response, request deleted, agent +//! created + launched, +//! - an **invalid JSON** request → error response (no panic, request deleted), +//! - an unknown action → error response carrying the rejection. + +use std::collections::HashMap; +use std::path::PathBuf; +use std::sync::{Arc, Mutex}; + +use async_trait::async_trait; +use domain::agent::{AgentManifest, ManifestEntry}; +use domain::events::DomainEvent; +use domain::ids::{AgentId, ProfileId, ProjectId}; +use domain::markdown::MarkdownDoc; +use domain::ports::{ + AgentContextStore, AgentRuntime, ContextInjectionPlan, DirEntry, EventBus, EventStream, + ExitStatus, FileSystem, FsError, IdGenerator, OutputStream, PreparedContext, ProfileStore, + PtyError, PtyHandle, PtyPort, RemotePath, RuntimeError, SpawnSpec, StoreError, +}; +use domain::profile::{AgentProfile, ContextInjection}; +use domain::project::{Project, ProjectPath}; +use domain::remote::RemoteRef; +use domain::{PtySize, SessionId}; +use uuid::Uuid; + +use application::{ + CloseTerminal, CreateAgentFromScratch, LaunchAgent, ListAgents, OrchestratorService, + TerminalSessions, UpdateAgentContext, +}; +use infrastructure::{process_request_file, OrchestratorResponse}; + +// --- temp dir (mirror local_fs.rs) --- +struct TempDir(PathBuf); +impl TempDir { + fn new() -> Self { + let p = std::env::temp_dir().join(format!("idea-orch-{}", Uuid::new_v4())); + std::fs::create_dir_all(&p).unwrap(); + Self(p) + } +} +impl Drop for TempDir { + fn drop(&mut self) { + let _ = std::fs::remove_dir_all(&self.0); + } +} + +// --- minimal fakes --- +#[derive(Default)] +struct ContextsInner { + manifest: AgentManifest, + contents: HashMap, +} +#[derive(Clone)] +struct FakeContexts(Arc>); +impl FakeContexts { + fn new() -> Self { + Self(Arc::new(Mutex::new(ContextsInner { + manifest: AgentManifest { + version: 1, + entries: Vec::new(), + }, + contents: HashMap::new(), + }))) + } + fn entries(&self) -> Vec { + self.0.lock().unwrap().manifest.entries.clone() + } + fn md_path_of(&self, agent: &AgentId) -> Option { + self.0 + .lock() + .unwrap() + .manifest + .entries + .iter() + .find(|e| &e.agent_id == agent) + .map(|e| e.md_path.clone()) + } +} +#[async_trait] +impl AgentContextStore for FakeContexts { + async fn read_context( + &self, + _project: &Project, + agent: &AgentId, + ) -> Result { + let md = self.md_path_of(agent).ok_or(StoreError::NotFound)?; + Ok(MarkdownDoc::new( + self.0.lock().unwrap().contents.get(&md).cloned().unwrap_or_default(), + )) + } + async fn write_context( + &self, + _project: &Project, + agent: &AgentId, + md: &MarkdownDoc, + ) -> Result<(), StoreError> { + let path = self.md_path_of(agent).ok_or(StoreError::NotFound)?; + self.0 + .lock() + .unwrap() + .contents + .insert(path, md.as_str().to_owned()); + Ok(()) + } + async fn load_manifest(&self, _project: &Project) -> Result { + Ok(self.0.lock().unwrap().manifest.clone()) + } + async fn save_manifest( + &self, + _project: &Project, + manifest: &AgentManifest, + ) -> Result<(), StoreError> { + self.0.lock().unwrap().manifest = manifest.clone(); + Ok(()) + } +} + +#[derive(Clone)] +struct FakeProfiles(Arc>); +#[async_trait] +impl ProfileStore for FakeProfiles { + async fn list(&self) -> Result, StoreError> { + Ok((*self.0).clone()) + } + async fn save(&self, _p: &AgentProfile) -> Result<(), StoreError> { + Ok(()) + } + async fn delete(&self, _id: ProfileId) -> Result<(), StoreError> { + Ok(()) + } + async fn is_configured(&self) -> Result { + Ok(true) + } + async fn mark_configured(&self) -> Result<(), StoreError> { + Ok(()) + } +} + +struct FakeRuntime; +impl AgentRuntime for FakeRuntime { + fn detect(&self, _p: &AgentProfile) -> Result { + Ok(true) + } + fn prepare_invocation( + &self, + profile: &AgentProfile, + _ctx: &PreparedContext, + cwd: &ProjectPath, + ) -> Result { + Ok(SpawnSpec { + command: profile.command.clone(), + args: profile.args.clone(), + cwd: cwd.clone(), + env: Vec::new(), + context_plan: Some(ContextInjectionPlan::Stdin), + }) + } +} + +#[derive(Clone, Default)] +struct FakeFs; +#[async_trait] +impl FileSystem for FakeFs { + async fn read(&self, p: &RemotePath) -> Result, FsError> { + Err(FsError::NotFound(p.as_str().to_owned())) + } + async fn write(&self, _p: &RemotePath, _d: &[u8]) -> Result<(), FsError> { + Ok(()) + } + async fn exists(&self, _p: &RemotePath) -> Result { + Ok(false) + } + async fn create_dir_all(&self, _p: &RemotePath) -> Result<(), FsError> { + Ok(()) + } + async fn list(&self, _p: &RemotePath) -> Result, FsError> { + Ok(Vec::new()) + } + async fn symlink(&self, _s: &RemotePath, _d: &RemotePath) -> Result<(), FsError> { + Ok(()) + } +} + +#[derive(Clone)] +struct FakePty; +#[async_trait] +impl PtyPort for FakePty { + async fn spawn(&self, _s: SpawnSpec, _z: PtySize) -> Result { + Ok(PtyHandle { + session_id: SessionId::from_uuid(Uuid::from_u128(777)), + }) + } + fn write(&self, _h: &PtyHandle, _d: &[u8]) -> Result<(), PtyError> { + Ok(()) + } + fn resize(&self, _h: &PtyHandle, _z: PtySize) -> Result<(), PtyError> { + Ok(()) + } + fn subscribe_output(&self, _h: &PtyHandle) -> Result { + Ok(Box::new(std::iter::empty())) + } + fn scrollback(&self, _h: &PtyHandle) -> Result, PtyError> { + Ok(Vec::new()) + } + async fn kill(&self, _h: &PtyHandle) -> Result { + Ok(ExitStatus { code: Some(0) }) + } +} + +#[derive(Default, Clone)] +struct NoopBus; +impl EventBus for NoopBus { + fn publish(&self, _e: DomainEvent) {} + fn subscribe(&self) -> EventStream { + Box::new(std::iter::empty()) + } +} + +struct SeqIds(Mutex); +impl IdGenerator for SeqIds { + fn new_uuid(&self) -> Uuid { + let mut n = self.0.lock().unwrap(); + let id = Uuid::from_u128(*n); + *n += 1; + id + } +} + +fn project() -> Project { + Project::new( + ProjectId::from_uuid(Uuid::from_u128(1000)), + "demo", + ProjectPath::new("/home/me/proj").unwrap(), + RemoteRef::local(), + 1_700_000_000_000, + ) + .unwrap() +} + +fn build_service(contexts: FakeContexts) -> Arc { + let profiles = Arc::new(FakeProfiles(Arc::new(vec![AgentProfile::new( + ProfileId::from_uuid(Uuid::from_u128(9)), + "Claude Code", + "claude", + Vec::new(), + ContextInjection::stdin(), + None, + "{agentRunDir}", + ) + .unwrap()]))); + let sessions = Arc::new(TerminalSessions::new()); + let bus = Arc::new(NoopBus); + let create = Arc::new(CreateAgentFromScratch::new( + Arc::new(contexts.clone()), + Arc::new(SeqIds(Mutex::new(1))), + bus.clone(), + )); + let launch = Arc::new(LaunchAgent::new( + Arc::new(contexts.clone()), + Arc::clone(&profiles) as Arc, + Arc::new(FakeRuntime), + Arc::new(FakeFs), + Arc::new(FakePty), + Arc::clone(&sessions), + bus.clone(), + )); + let list = Arc::new(ListAgents::new(Arc::new(contexts.clone()))); + let close = Arc::new(CloseTerminal::new(Arc::new(FakePty), Arc::clone(&sessions))); + let update = Arc::new(UpdateAgentContext::new(Arc::new(contexts))); + Arc::new(OrchestratorService::new( + create, + launch, + list, + close, + update, + Arc::clone(&profiles) as Arc, + sessions, + )) +} + +fn read_response(request_path: &std::path::Path) -> OrchestratorResponse { + let mut name = request_path.file_name().unwrap().to_string_lossy().into_owned(); + name.push_str(".response.json"); + let response_path = request_path.with_file_name(name); + let bytes = std::fs::read(&response_path).expect("response file written"); + serde_json::from_slice(&bytes).expect("response parses") +} + +#[tokio::test] +async fn valid_spawn_request_succeeds_and_is_consumed() { + let tmp = TempDir::new(); + let contexts = FakeContexts::new(); + let service = build_service(contexts.clone()); + + let req = tmp.0.join("req-1.json"); + std::fs::write( + &req, + br#"{ "action": "spawn_agent", "name": "dev-backend", "profile": "claude-code" }"#, + ) + .unwrap(); + + let response = process_request_file(&req, &project(), &service).await; + + assert!(response.ok, "expected ok, got {response:?}"); + assert_eq!(response.action.as_deref(), Some("spawn_agent")); + // Request consumed; a response sibling written. + assert!(!req.exists(), "request file must be removed"); + let on_disk = read_response(&req); + assert!(on_disk.ok); + // The agent was actually created through the use cases. + assert_eq!(contexts.entries().len(), 1); + assert_eq!(contexts.entries()[0].name, "dev-backend"); +} + +#[tokio::test] +async fn invalid_json_request_yields_error_response() { + let tmp = TempDir::new(); + let service = build_service(FakeContexts::new()); + + let req = tmp.0.join("broken.json"); + std::fs::write(&req, b"{ this is not json").unwrap(); + + let response = process_request_file(&req, &project(), &service).await; + + assert!(!response.ok); + assert!( + response.error.as_deref().unwrap_or_default().contains("invalid json"), + "got {response:?}" + ); + assert!(!req.exists(), "poisoned request must still be removed"); + assert!(!read_response(&req).ok); +} + +#[tokio::test] +async fn unknown_action_yields_error_response() { + let tmp = TempDir::new(); + let service = build_service(FakeContexts::new()); + + let req = tmp.0.join("weird.json"); + std::fs::write(&req, br#"{ "action": "explode", "name": "x" }"#).unwrap(); + + let response = process_request_file(&req, &project(), &service).await; + + assert!(!response.ok); + assert_eq!(response.action.as_deref(), Some("explode")); + assert!(response.error.as_deref().unwrap_or_default().contains("unknown orchestrator action")); +}