//! [`FsOrchestratorWatcher`] — filesystem driving adapter for the orchestrator //! protocol (ARCHITECTURE §14.3). //! //! An orchestrator agent drops a JSON request under //! `/.ideai/requests//*.json`. This adapter watches //! that tree, and for each request file: //! //! 1. parses + validates it into a [`domain::OrchestratorCommand`], //! 2. dispatches it through the application's [`OrchestratorService`] (which calls //! the *same* use cases as the UI — IdeA stays the single source of truth for //! the agent lifecycle; the orchestrator never spawns a process itself), //! 3. writes a sibling `.response.json` (`ok` / error), //! 4. removes the consumed request file. //! //! ## notify vs polling //! //! We use a **poll loop** as the primary trigger (a tokio interval re-scanning the //! requests tree) and use [`notify`] purely to *wake* that loop early on a change. //! Polling is the robust baseline: `notify`'s native back-ends behave //! inconsistently across the platforms IdeA targets (inotify on Linux, ReadDirectoryChangesW //! on Windows) and over network/WSL-mounted filesystems where an orchestrator's //! project root may live. The poll loop guarantees eventual processing regardless; //! notify just lowers latency. The per-file logic ([`process_request_file`]) is a //! standalone async fn, unit-tested against a temp dir independently of the watch. use std::path::{Path, PathBuf}; use std::sync::Arc; use application::OrchestratorService; use domain::{DomainEvent, OrchestratorRequest, Project}; use serde::{Deserialize, Serialize}; use tokio::sync::mpsc; /// Subdirectory (under `.ideai/`) the orchestrator drops request files into. pub const REQUESTS_SUBDIR: &str = "requests"; /// How often the poll loop re-scans the requests tree when idle. const POLL_INTERVAL_MS: u64 = 500; /// The JSON response written next to a consumed request file. /// /// Serialised camelCase to match the DTO convention used across IdeA's wire /// formats. `ok` is the single boolean an orchestrator polls for; `detail` carries /// a success summary and `error` the failure reason (mutually exclusive). #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] #[serde(rename_all = "camelCase")] pub struct OrchestratorResponse { /// Whether IdeA handled the request successfully. pub ok: bool, /// The action that was attempted (echoed back), when parseable. #[serde(skip_serializing_if = "Option::is_none")] pub action: Option, /// Human-readable success summary (`ok == true`). #[serde(skip_serializing_if = "Option::is_none")] pub detail: Option, /// Human-readable failure reason (`ok == false`). #[serde(skip_serializing_if = "Option::is_none")] pub error: Option, } impl OrchestratorResponse { fn success(action: String, detail: String) -> Self { Self { ok: true, action: Some(action), detail: Some(detail), error: None, } } fn failure(action: Option, error: String) -> Self { Self { ok: false, action, detail: None, error: Some(error), } } } /// A running watcher; dropping it stops the background task. pub struct OrchestratorWatchHandle { stop: mpsc::Sender<()>, } impl OrchestratorWatchHandle { /// Signals the watch loop to stop (best-effort; the task also stops when this /// handle is dropped). pub fn stop(&self) { let _ = self.stop.try_send(()); } } /// Filesystem driving adapter that watches a project's `.ideai/requests/` tree. pub struct FsOrchestratorWatcher; impl FsOrchestratorWatcher { /// Starts watching `project`'s `.ideai/requests/` tree, dispatching every /// request file through `service`. Returns a handle that stops the watch when /// dropped (or via [`OrchestratorWatchHandle::stop`]). /// /// The optional `events` sink (a domain [`EventBus`](domain::ports::EventBus) /// publish closure) is invoked with an [`DomainEvent::OrchestratorRequestProcessed`] /// after each handled file so the presentation layer can surface orchestration /// activity. The actual cell/tab for `spawn_agent` opens off the /// [`DomainEvent::AgentLaunched`] the dispatch already publishes. /// /// Spawns onto the ambient Tokio runtime; never blocks the caller. #[must_use] pub fn start( project: Project, service: Arc, events: Arc, ) -> OrchestratorWatchHandle { let (stop_tx, mut stop_rx) = mpsc::channel::<()>(1); let requests_root = requests_root(&project); tokio::spawn(async move { // Poll loop. A notify watcher (best-effort) wakes us early; the // interval is the robust fallback. let (wake_tx, mut wake_rx) = mpsc::channel::<()>(8); let _notify_guard = spawn_notify(&requests_root, wake_tx); let mut interval = tokio::time::interval(std::time::Duration::from_millis(POLL_INTERVAL_MS)); interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); loop { tokio::select! { _ = stop_rx.recv() => break, _ = interval.tick() => {} _ = wake_rx.recv() => {} } scan_once(&requests_root, &project, &service, events.as_ref()).await; } }); OrchestratorWatchHandle { stop: stop_tx } } } /// Resolves `/.ideai/requests/`. fn requests_root(project: &Project) -> PathBuf { Path::new(project.root.as_str()) .join(".ideai") .join(REQUESTS_SUBDIR) } /// Spawns a best-effort `notify` watcher that forwards change notifications to /// `wake`. Returns the watcher guard (kept alive by the caller); on any error /// (e.g. the directory does not exist yet) it returns `None` and the poll loop /// still covers correctness. fn spawn_notify(root: &Path, wake: mpsc::Sender<()>) -> Option { use notify::{RecursiveMode, Watcher}; // The directory may not exist yet; create it so notify has something to watch // and orchestrators have a stable drop target. Ignore failures — the poll loop // re-creates intent each scan. let _ = std::fs::create_dir_all(root); let mut watcher = notify::recommended_watcher(move |res: notify::Result| { if res.is_ok() { let _ = wake.try_send(()); } }) .ok()?; watcher.watch(root, RecursiveMode::Recursive).ok()?; Some(watcher) } /// Scans the requests tree once, processing every `*.json` request file that is /// not itself a `*.response.json`. async fn scan_once( root: &Path, project: &Project, service: &OrchestratorService, publish: &(dyn Fn(DomainEvent) + Send + Sync), ) { let Ok(requesters) = std::fs::read_dir(root) else { return; }; for requester in requesters.flatten() { let dir = requester.path(); if !dir.is_dir() { continue; } let requester_id = requester.file_name().to_string_lossy().into_owned(); let Ok(files) = std::fs::read_dir(&dir) else { continue; }; for file in files.flatten() { let path = file.path(); if !is_request_file(&path) { continue; } let outcome = process_request_file(&path, project, service).await; publish(DomainEvent::OrchestratorRequestProcessed { requester_id: requester_id.clone(), action: outcome.action.clone().unwrap_or_default(), ok: outcome.ok, }); } } } /// Whether `path` is a request file to process: a `.json` that is not a /// `.response.json` sibling we wrote ourselves. fn is_request_file(path: &Path) -> bool { let name = path.file_name().and_then(|n| n.to_str()).unwrap_or_default(); name.ends_with(".json") && !name.ends_with(".response.json") } /// Processes a single request file end to end: read → parse/validate → dispatch → /// write `.response.json` → delete the request file. /// /// Always writes a response and removes the request (even on error) so a poisoned /// request can never wedge the loop. Returns the [`OrchestratorResponse`] it wrote /// (handy for tests and for the change event). Standalone (no watch) so it is /// unit-testable against a temp directory. pub async fn process_request_file( path: &Path, project: &Project, service: &OrchestratorService, ) -> OrchestratorResponse { let response = dispatch_file(path, project, service).await; write_response(path, &response); let _ = std::fs::remove_file(path); response } /// Reads + parses + validates + dispatches a request file, mapping every failure /// to an [`OrchestratorResponse::failure`]. async fn dispatch_file( path: &Path, project: &Project, service: &OrchestratorService, ) -> OrchestratorResponse { let bytes = match std::fs::read(path) { Ok(b) => b, Err(e) => return OrchestratorResponse::failure(None, format!("read failed: {e}")), }; let request: OrchestratorRequest = match serde_json::from_slice(&bytes) { Ok(r) => r, Err(e) => return OrchestratorResponse::failure(None, format!("invalid json: {e}")), }; let action = request.action.clone(); let command = match request.validate() { Ok(c) => c, Err(e) => return OrchestratorResponse::failure(Some(action), e.to_string()), }; match service.dispatch(project, command).await { Ok(out) => OrchestratorResponse::success(action, out.detail), Err(e) => OrchestratorResponse::failure(Some(action), e.to_string()), } } /// Writes the response JSON next to the request file as `.response.json`. fn write_response(request_path: &Path, response: &OrchestratorResponse) { let response_path = response_path_for(request_path); if let Ok(json) = serde_json::to_vec_pretty(response) { let _ = std::fs::write(response_path, json); } } /// Derives `.json` → `.json.response.json` so the response is an /// unambiguous sibling that `is_request_file` skips. fn response_path_for(request_path: &Path) -> PathBuf { let mut name = request_path .file_name() .map(|n| n.to_string_lossy().into_owned()) .unwrap_or_default(); name.push_str(".response.json"); request_path.with_file_name(name) }