Merge branch 'worktree-agent-a2650e91d2bd39ca2' into develop
This commit is contained in:
@ -8,12 +8,18 @@ description = "IdeA — infrastructure layer: concrete adapters implementing the
|
||||
|
||||
[dependencies]
|
||||
domain = { workspace = true }
|
||||
# The orchestrator filesystem watcher (driving adapter, ARCHITECTURE §14.3) drives
|
||||
# the application's `OrchestratorService`; infrastructure may depend on application.
|
||||
application = { workspace = true }
|
||||
# `process` (additive) powers LocalProcessSpawner; the workspace baseline keeps
|
||||
# rt/macros/sync/fs/io-util.
|
||||
tokio = { workspace = true, features = ["process"] }
|
||||
tokio = { workspace = true, features = ["process", "time"] }
|
||||
uuid = { workspace = true }
|
||||
async-trait = { workspace = true }
|
||||
serde = { workspace = true }
|
||||
serde_json = { workspace = true }
|
||||
portable-pty = "0.9"
|
||||
git2 = { workspace = true }
|
||||
# Filesystem change notifications used to *wake* the orchestrator poll loop early
|
||||
# (the poll loop remains the robust cross-platform correctness guarantee).
|
||||
notify = "6"
|
||||
|
||||
@ -17,6 +17,7 @@ pub mod eventbus;
|
||||
pub mod fs;
|
||||
pub mod git;
|
||||
pub mod id;
|
||||
pub mod orchestrator;
|
||||
pub mod process;
|
||||
pub mod pty;
|
||||
pub mod remote;
|
||||
@ -28,6 +29,10 @@ pub use eventbus::TokioBroadcastEventBus;
|
||||
pub use fs::LocalFileSystem;
|
||||
pub use git::Git2Repository;
|
||||
pub use id::UuidGenerator;
|
||||
pub use orchestrator::{
|
||||
process_request_file, FsOrchestratorWatcher, OrchestratorResponse, OrchestratorWatchHandle,
|
||||
REQUESTS_SUBDIR,
|
||||
};
|
||||
pub use process::LocalProcessSpawner;
|
||||
pub use pty::PortablePtyAdapter;
|
||||
pub use remote::{remote_host, LocalHost};
|
||||
|
||||
273
crates/infrastructure/src/orchestrator/mod.rs
Normal file
273
crates/infrastructure/src/orchestrator/mod.rs
Normal file
@ -0,0 +1,273 @@
|
||||
//! [`FsOrchestratorWatcher`] — filesystem driving adapter for the orchestrator
|
||||
//! protocol (ARCHITECTURE §14.3).
|
||||
//!
|
||||
//! An orchestrator agent drops a JSON request under
|
||||
//! `<project_root>/.ideai/requests/<requester-id>/*.json`. This adapter watches
|
||||
//! that tree, and for each request file:
|
||||
//!
|
||||
//! 1. parses + validates it into a [`domain::OrchestratorCommand`],
|
||||
//! 2. dispatches it through the application's [`OrchestratorService`] (which calls
|
||||
//! the *same* use cases as the UI — IdeA stays the single source of truth for
|
||||
//! the agent lifecycle; the orchestrator never spawns a process itself),
|
||||
//! 3. writes a sibling `<file>.response.json` (`ok` / error),
|
||||
//! 4. removes the consumed request file.
|
||||
//!
|
||||
//! ## notify vs polling
|
||||
//!
|
||||
//! We use a **poll loop** as the primary trigger (a tokio interval re-scanning the
|
||||
//! requests tree) and use [`notify`] purely to *wake* that loop early on a change.
|
||||
//! Polling is the robust baseline: `notify`'s native back-ends behave
|
||||
//! inconsistently across the platforms IdeA targets (inotify on Linux, ReadDirectoryChangesW
|
||||
//! on Windows) and over network/WSL-mounted filesystems where an orchestrator's
|
||||
//! project root may live. The poll loop guarantees eventual processing regardless;
|
||||
//! notify just lowers latency. The per-file logic ([`process_request_file`]) is a
|
||||
//! standalone async fn, unit-tested against a temp dir independently of the watch.
|
||||
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::Arc;
|
||||
|
||||
use application::OrchestratorService;
|
||||
use domain::{DomainEvent, OrchestratorRequest, Project};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
/// Subdirectory (under `.ideai/`) the orchestrator drops request files into.
|
||||
pub const REQUESTS_SUBDIR: &str = "requests";
|
||||
|
||||
/// How often the poll loop re-scans the requests tree when idle.
|
||||
const POLL_INTERVAL_MS: u64 = 500;
|
||||
|
||||
/// The JSON response written next to a consumed request file.
|
||||
///
|
||||
/// Serialised camelCase to match the DTO convention used across IdeA's wire
|
||||
/// formats. `ok` is the single boolean an orchestrator polls for; `detail` carries
|
||||
/// a success summary and `error` the failure reason (mutually exclusive).
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct OrchestratorResponse {
|
||||
/// Whether IdeA handled the request successfully.
|
||||
pub ok: bool,
|
||||
/// The action that was attempted (echoed back), when parseable.
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub action: Option<String>,
|
||||
/// Human-readable success summary (`ok == true`).
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub detail: Option<String>,
|
||||
/// Human-readable failure reason (`ok == false`).
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub error: Option<String>,
|
||||
}
|
||||
|
||||
impl OrchestratorResponse {
|
||||
fn success(action: String, detail: String) -> Self {
|
||||
Self {
|
||||
ok: true,
|
||||
action: Some(action),
|
||||
detail: Some(detail),
|
||||
error: None,
|
||||
}
|
||||
}
|
||||
fn failure(action: Option<String>, error: String) -> Self {
|
||||
Self {
|
||||
ok: false,
|
||||
action,
|
||||
detail: None,
|
||||
error: Some(error),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A running watcher; dropping it stops the background task.
|
||||
pub struct OrchestratorWatchHandle {
|
||||
stop: mpsc::Sender<()>,
|
||||
}
|
||||
|
||||
impl OrchestratorWatchHandle {
|
||||
/// Signals the watch loop to stop (best-effort; the task also stops when this
|
||||
/// handle is dropped).
|
||||
pub fn stop(&self) {
|
||||
let _ = self.stop.try_send(());
|
||||
}
|
||||
}
|
||||
|
||||
/// Filesystem driving adapter that watches a project's `.ideai/requests/` tree.
|
||||
pub struct FsOrchestratorWatcher;
|
||||
|
||||
impl FsOrchestratorWatcher {
|
||||
/// Starts watching `project`'s `.ideai/requests/` tree, dispatching every
|
||||
/// request file through `service`. Returns a handle that stops the watch when
|
||||
/// dropped (or via [`OrchestratorWatchHandle::stop`]).
|
||||
///
|
||||
/// The optional `events` sink (a domain [`EventBus`](domain::ports::EventBus)
|
||||
/// publish closure) is invoked with an [`DomainEvent::OrchestratorRequestProcessed`]
|
||||
/// after each handled file so the presentation layer can surface orchestration
|
||||
/// activity. The actual cell/tab for `spawn_agent` opens off the
|
||||
/// [`DomainEvent::AgentLaunched`] the dispatch already publishes.
|
||||
///
|
||||
/// Spawns onto the ambient Tokio runtime; never blocks the caller.
|
||||
#[must_use]
|
||||
pub fn start(
|
||||
project: Project,
|
||||
service: Arc<OrchestratorService>,
|
||||
events: Arc<dyn Fn(DomainEvent) + Send + Sync>,
|
||||
) -> OrchestratorWatchHandle {
|
||||
let (stop_tx, mut stop_rx) = mpsc::channel::<()>(1);
|
||||
let requests_root = requests_root(&project);
|
||||
|
||||
tokio::spawn(async move {
|
||||
// Poll loop. A notify watcher (best-effort) wakes us early; the
|
||||
// interval is the robust fallback.
|
||||
let (wake_tx, mut wake_rx) = mpsc::channel::<()>(8);
|
||||
let _notify_guard = spawn_notify(&requests_root, wake_tx);
|
||||
|
||||
let mut interval =
|
||||
tokio::time::interval(std::time::Duration::from_millis(POLL_INTERVAL_MS));
|
||||
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = stop_rx.recv() => break,
|
||||
_ = interval.tick() => {}
|
||||
_ = wake_rx.recv() => {}
|
||||
}
|
||||
scan_once(&requests_root, &project, &service, events.as_ref()).await;
|
||||
}
|
||||
});
|
||||
|
||||
OrchestratorWatchHandle { stop: stop_tx }
|
||||
}
|
||||
}
|
||||
|
||||
/// Resolves `<project_root>/.ideai/requests/`.
|
||||
fn requests_root(project: &Project) -> PathBuf {
|
||||
Path::new(project.root.as_str())
|
||||
.join(".ideai")
|
||||
.join(REQUESTS_SUBDIR)
|
||||
}
|
||||
|
||||
/// Spawns a best-effort `notify` watcher that forwards change notifications to
|
||||
/// `wake`. Returns the watcher guard (kept alive by the caller); on any error
|
||||
/// (e.g. the directory does not exist yet) it returns `None` and the poll loop
|
||||
/// still covers correctness.
|
||||
fn spawn_notify(root: &Path, wake: mpsc::Sender<()>) -> Option<notify::RecommendedWatcher> {
|
||||
use notify::{RecursiveMode, Watcher};
|
||||
|
||||
// The directory may not exist yet; create it so notify has something to watch
|
||||
// and orchestrators have a stable drop target. Ignore failures — the poll loop
|
||||
// re-creates intent each scan.
|
||||
let _ = std::fs::create_dir_all(root);
|
||||
|
||||
let mut watcher = notify::recommended_watcher(move |res: notify::Result<notify::Event>| {
|
||||
if res.is_ok() {
|
||||
let _ = wake.try_send(());
|
||||
}
|
||||
})
|
||||
.ok()?;
|
||||
watcher.watch(root, RecursiveMode::Recursive).ok()?;
|
||||
Some(watcher)
|
||||
}
|
||||
|
||||
/// Scans the requests tree once, processing every `*.json` request file that is
|
||||
/// not itself a `*.response.json`.
|
||||
async fn scan_once(
|
||||
root: &Path,
|
||||
project: &Project,
|
||||
service: &OrchestratorService,
|
||||
publish: &(dyn Fn(DomainEvent) + Send + Sync),
|
||||
) {
|
||||
let Ok(requesters) = std::fs::read_dir(root) else {
|
||||
return;
|
||||
};
|
||||
for requester in requesters.flatten() {
|
||||
let dir = requester.path();
|
||||
if !dir.is_dir() {
|
||||
continue;
|
||||
}
|
||||
let requester_id = requester.file_name().to_string_lossy().into_owned();
|
||||
let Ok(files) = std::fs::read_dir(&dir) else {
|
||||
continue;
|
||||
};
|
||||
for file in files.flatten() {
|
||||
let path = file.path();
|
||||
if !is_request_file(&path) {
|
||||
continue;
|
||||
}
|
||||
let outcome = process_request_file(&path, project, service).await;
|
||||
publish(DomainEvent::OrchestratorRequestProcessed {
|
||||
requester_id: requester_id.clone(),
|
||||
action: outcome.action.clone().unwrap_or_default(),
|
||||
ok: outcome.ok,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Whether `path` is a request file to process: a `.json` that is not a
|
||||
/// `.response.json` sibling we wrote ourselves.
|
||||
fn is_request_file(path: &Path) -> bool {
|
||||
let name = path.file_name().and_then(|n| n.to_str()).unwrap_or_default();
|
||||
name.ends_with(".json") && !name.ends_with(".response.json")
|
||||
}
|
||||
|
||||
/// Processes a single request file end to end: read → parse/validate → dispatch →
|
||||
/// write `<file>.response.json` → delete the request file.
|
||||
///
|
||||
/// Always writes a response and removes the request (even on error) so a poisoned
|
||||
/// request can never wedge the loop. Returns the [`OrchestratorResponse`] it wrote
|
||||
/// (handy for tests and for the change event). Standalone (no watch) so it is
|
||||
/// unit-testable against a temp directory.
|
||||
pub async fn process_request_file(
|
||||
path: &Path,
|
||||
project: &Project,
|
||||
service: &OrchestratorService,
|
||||
) -> OrchestratorResponse {
|
||||
let response = dispatch_file(path, project, service).await;
|
||||
write_response(path, &response);
|
||||
let _ = std::fs::remove_file(path);
|
||||
response
|
||||
}
|
||||
|
||||
/// Reads + parses + validates + dispatches a request file, mapping every failure
|
||||
/// to an [`OrchestratorResponse::failure`].
|
||||
async fn dispatch_file(
|
||||
path: &Path,
|
||||
project: &Project,
|
||||
service: &OrchestratorService,
|
||||
) -> OrchestratorResponse {
|
||||
let bytes = match std::fs::read(path) {
|
||||
Ok(b) => b,
|
||||
Err(e) => return OrchestratorResponse::failure(None, format!("read failed: {e}")),
|
||||
};
|
||||
let request: OrchestratorRequest = match serde_json::from_slice(&bytes) {
|
||||
Ok(r) => r,
|
||||
Err(e) => return OrchestratorResponse::failure(None, format!("invalid json: {e}")),
|
||||
};
|
||||
let action = request.action.clone();
|
||||
let command = match request.validate() {
|
||||
Ok(c) => c,
|
||||
Err(e) => return OrchestratorResponse::failure(Some(action), e.to_string()),
|
||||
};
|
||||
match service.dispatch(project, command).await {
|
||||
Ok(out) => OrchestratorResponse::success(action, out.detail),
|
||||
Err(e) => OrchestratorResponse::failure(Some(action), e.to_string()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Writes the response JSON next to the request file as `<file>.response.json`.
|
||||
fn write_response(request_path: &Path, response: &OrchestratorResponse) {
|
||||
let response_path = response_path_for(request_path);
|
||||
if let Ok(json) = serde_json::to_vec_pretty(response) {
|
||||
let _ = std::fs::write(response_path, json);
|
||||
}
|
||||
}
|
||||
|
||||
/// Derives `<file>.json` → `<file>.json.response.json` so the response is an
|
||||
/// unambiguous sibling that `is_request_file` skips.
|
||||
fn response_path_for(request_path: &Path) -> PathBuf {
|
||||
let mut name = request_path
|
||||
.file_name()
|
||||
.map(|n| n.to_string_lossy().into_owned())
|
||||
.unwrap_or_default();
|
||||
name.push_str(".response.json");
|
||||
request_path.with_file_name(name)
|
||||
}
|
||||
353
crates/infrastructure/tests/orchestrator_watcher.rs
Normal file
353
crates/infrastructure/tests/orchestrator_watcher.rs
Normal file
@ -0,0 +1,353 @@
|
||||
//! Integration tests for the orchestrator filesystem adapter (ARCHITECTURE §14.3).
|
||||
//!
|
||||
//! These drive [`process_request_file`] — the standalone "parse + dispatch + write
|
||||
//! response + delete request" unit — against a real temp directory, with an
|
||||
//! [`OrchestratorService`] wired over in-memory fakes. We assert:
|
||||
//!
|
||||
//! - a **valid** `spawn_agent` request → success response, request deleted, agent
|
||||
//! created + launched,
|
||||
//! - an **invalid JSON** request → error response (no panic, request deleted),
|
||||
//! - an unknown action → error response carrying the rejection.
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use domain::agent::{AgentManifest, ManifestEntry};
|
||||
use domain::events::DomainEvent;
|
||||
use domain::ids::{AgentId, ProfileId, ProjectId};
|
||||
use domain::markdown::MarkdownDoc;
|
||||
use domain::ports::{
|
||||
AgentContextStore, AgentRuntime, ContextInjectionPlan, DirEntry, EventBus, EventStream,
|
||||
ExitStatus, FileSystem, FsError, IdGenerator, OutputStream, PreparedContext, ProfileStore,
|
||||
PtyError, PtyHandle, PtyPort, RemotePath, RuntimeError, SpawnSpec, StoreError,
|
||||
};
|
||||
use domain::profile::{AgentProfile, ContextInjection};
|
||||
use domain::project::{Project, ProjectPath};
|
||||
use domain::remote::RemoteRef;
|
||||
use domain::{PtySize, SessionId};
|
||||
use uuid::Uuid;
|
||||
|
||||
use application::{
|
||||
CloseTerminal, CreateAgentFromScratch, LaunchAgent, ListAgents, OrchestratorService,
|
||||
TerminalSessions, UpdateAgentContext,
|
||||
};
|
||||
use infrastructure::{process_request_file, OrchestratorResponse};
|
||||
|
||||
// --- temp dir (mirror local_fs.rs) ---
|
||||
struct TempDir(PathBuf);
|
||||
impl TempDir {
|
||||
fn new() -> Self {
|
||||
let p = std::env::temp_dir().join(format!("idea-orch-{}", Uuid::new_v4()));
|
||||
std::fs::create_dir_all(&p).unwrap();
|
||||
Self(p)
|
||||
}
|
||||
}
|
||||
impl Drop for TempDir {
|
||||
fn drop(&mut self) {
|
||||
let _ = std::fs::remove_dir_all(&self.0);
|
||||
}
|
||||
}
|
||||
|
||||
// --- minimal fakes ---
|
||||
#[derive(Default)]
|
||||
struct ContextsInner {
|
||||
manifest: AgentManifest,
|
||||
contents: HashMap<String, String>,
|
||||
}
|
||||
#[derive(Clone)]
|
||||
struct FakeContexts(Arc<Mutex<ContextsInner>>);
|
||||
impl FakeContexts {
|
||||
fn new() -> Self {
|
||||
Self(Arc::new(Mutex::new(ContextsInner {
|
||||
manifest: AgentManifest {
|
||||
version: 1,
|
||||
entries: Vec::new(),
|
||||
},
|
||||
contents: HashMap::new(),
|
||||
})))
|
||||
}
|
||||
fn entries(&self) -> Vec<ManifestEntry> {
|
||||
self.0.lock().unwrap().manifest.entries.clone()
|
||||
}
|
||||
fn md_path_of(&self, agent: &AgentId) -> Option<String> {
|
||||
self.0
|
||||
.lock()
|
||||
.unwrap()
|
||||
.manifest
|
||||
.entries
|
||||
.iter()
|
||||
.find(|e| &e.agent_id == agent)
|
||||
.map(|e| e.md_path.clone())
|
||||
}
|
||||
}
|
||||
#[async_trait]
|
||||
impl AgentContextStore for FakeContexts {
|
||||
async fn read_context(
|
||||
&self,
|
||||
_project: &Project,
|
||||
agent: &AgentId,
|
||||
) -> Result<MarkdownDoc, StoreError> {
|
||||
let md = self.md_path_of(agent).ok_or(StoreError::NotFound)?;
|
||||
Ok(MarkdownDoc::new(
|
||||
self.0.lock().unwrap().contents.get(&md).cloned().unwrap_or_default(),
|
||||
))
|
||||
}
|
||||
async fn write_context(
|
||||
&self,
|
||||
_project: &Project,
|
||||
agent: &AgentId,
|
||||
md: &MarkdownDoc,
|
||||
) -> Result<(), StoreError> {
|
||||
let path = self.md_path_of(agent).ok_or(StoreError::NotFound)?;
|
||||
self.0
|
||||
.lock()
|
||||
.unwrap()
|
||||
.contents
|
||||
.insert(path, md.as_str().to_owned());
|
||||
Ok(())
|
||||
}
|
||||
async fn load_manifest(&self, _project: &Project) -> Result<AgentManifest, StoreError> {
|
||||
Ok(self.0.lock().unwrap().manifest.clone())
|
||||
}
|
||||
async fn save_manifest(
|
||||
&self,
|
||||
_project: &Project,
|
||||
manifest: &AgentManifest,
|
||||
) -> Result<(), StoreError> {
|
||||
self.0.lock().unwrap().manifest = manifest.clone();
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct FakeProfiles(Arc<Vec<AgentProfile>>);
|
||||
#[async_trait]
|
||||
impl ProfileStore for FakeProfiles {
|
||||
async fn list(&self) -> Result<Vec<AgentProfile>, StoreError> {
|
||||
Ok((*self.0).clone())
|
||||
}
|
||||
async fn save(&self, _p: &AgentProfile) -> Result<(), StoreError> {
|
||||
Ok(())
|
||||
}
|
||||
async fn delete(&self, _id: ProfileId) -> Result<(), StoreError> {
|
||||
Ok(())
|
||||
}
|
||||
async fn is_configured(&self) -> Result<bool, StoreError> {
|
||||
Ok(true)
|
||||
}
|
||||
async fn mark_configured(&self) -> Result<(), StoreError> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
struct FakeRuntime;
|
||||
impl AgentRuntime for FakeRuntime {
|
||||
fn detect(&self, _p: &AgentProfile) -> Result<bool, RuntimeError> {
|
||||
Ok(true)
|
||||
}
|
||||
fn prepare_invocation(
|
||||
&self,
|
||||
profile: &AgentProfile,
|
||||
_ctx: &PreparedContext,
|
||||
cwd: &ProjectPath,
|
||||
) -> Result<SpawnSpec, RuntimeError> {
|
||||
Ok(SpawnSpec {
|
||||
command: profile.command.clone(),
|
||||
args: profile.args.clone(),
|
||||
cwd: cwd.clone(),
|
||||
env: Vec::new(),
|
||||
context_plan: Some(ContextInjectionPlan::Stdin),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Default)]
|
||||
struct FakeFs;
|
||||
#[async_trait]
|
||||
impl FileSystem for FakeFs {
|
||||
async fn read(&self, p: &RemotePath) -> Result<Vec<u8>, FsError> {
|
||||
Err(FsError::NotFound(p.as_str().to_owned()))
|
||||
}
|
||||
async fn write(&self, _p: &RemotePath, _d: &[u8]) -> Result<(), FsError> {
|
||||
Ok(())
|
||||
}
|
||||
async fn exists(&self, _p: &RemotePath) -> Result<bool, FsError> {
|
||||
Ok(false)
|
||||
}
|
||||
async fn create_dir_all(&self, _p: &RemotePath) -> Result<(), FsError> {
|
||||
Ok(())
|
||||
}
|
||||
async fn list(&self, _p: &RemotePath) -> Result<Vec<DirEntry>, FsError> {
|
||||
Ok(Vec::new())
|
||||
}
|
||||
async fn symlink(&self, _s: &RemotePath, _d: &RemotePath) -> Result<(), FsError> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct FakePty;
|
||||
#[async_trait]
|
||||
impl PtyPort for FakePty {
|
||||
async fn spawn(&self, _s: SpawnSpec, _z: PtySize) -> Result<PtyHandle, PtyError> {
|
||||
Ok(PtyHandle {
|
||||
session_id: SessionId::from_uuid(Uuid::from_u128(777)),
|
||||
})
|
||||
}
|
||||
fn write(&self, _h: &PtyHandle, _d: &[u8]) -> Result<(), PtyError> {
|
||||
Ok(())
|
||||
}
|
||||
fn resize(&self, _h: &PtyHandle, _z: PtySize) -> Result<(), PtyError> {
|
||||
Ok(())
|
||||
}
|
||||
fn subscribe_output(&self, _h: &PtyHandle) -> Result<OutputStream, PtyError> {
|
||||
Ok(Box::new(std::iter::empty()))
|
||||
}
|
||||
fn scrollback(&self, _h: &PtyHandle) -> Result<Vec<u8>, PtyError> {
|
||||
Ok(Vec::new())
|
||||
}
|
||||
async fn kill(&self, _h: &PtyHandle) -> Result<ExitStatus, PtyError> {
|
||||
Ok(ExitStatus { code: Some(0) })
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default, Clone)]
|
||||
struct NoopBus;
|
||||
impl EventBus for NoopBus {
|
||||
fn publish(&self, _e: DomainEvent) {}
|
||||
fn subscribe(&self) -> EventStream {
|
||||
Box::new(std::iter::empty())
|
||||
}
|
||||
}
|
||||
|
||||
struct SeqIds(Mutex<u128>);
|
||||
impl IdGenerator for SeqIds {
|
||||
fn new_uuid(&self) -> Uuid {
|
||||
let mut n = self.0.lock().unwrap();
|
||||
let id = Uuid::from_u128(*n);
|
||||
*n += 1;
|
||||
id
|
||||
}
|
||||
}
|
||||
|
||||
fn project() -> Project {
|
||||
Project::new(
|
||||
ProjectId::from_uuid(Uuid::from_u128(1000)),
|
||||
"demo",
|
||||
ProjectPath::new("/home/me/proj").unwrap(),
|
||||
RemoteRef::local(),
|
||||
1_700_000_000_000,
|
||||
)
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
fn build_service(contexts: FakeContexts) -> Arc<OrchestratorService> {
|
||||
let profiles = Arc::new(FakeProfiles(Arc::new(vec![AgentProfile::new(
|
||||
ProfileId::from_uuid(Uuid::from_u128(9)),
|
||||
"Claude Code",
|
||||
"claude",
|
||||
Vec::new(),
|
||||
ContextInjection::stdin(),
|
||||
None,
|
||||
"{agentRunDir}",
|
||||
)
|
||||
.unwrap()])));
|
||||
let sessions = Arc::new(TerminalSessions::new());
|
||||
let bus = Arc::new(NoopBus);
|
||||
let create = Arc::new(CreateAgentFromScratch::new(
|
||||
Arc::new(contexts.clone()),
|
||||
Arc::new(SeqIds(Mutex::new(1))),
|
||||
bus.clone(),
|
||||
));
|
||||
let launch = Arc::new(LaunchAgent::new(
|
||||
Arc::new(contexts.clone()),
|
||||
Arc::clone(&profiles) as Arc<dyn ProfileStore>,
|
||||
Arc::new(FakeRuntime),
|
||||
Arc::new(FakeFs),
|
||||
Arc::new(FakePty),
|
||||
Arc::clone(&sessions),
|
||||
bus.clone(),
|
||||
));
|
||||
let list = Arc::new(ListAgents::new(Arc::new(contexts.clone())));
|
||||
let close = Arc::new(CloseTerminal::new(Arc::new(FakePty), Arc::clone(&sessions)));
|
||||
let update = Arc::new(UpdateAgentContext::new(Arc::new(contexts)));
|
||||
Arc::new(OrchestratorService::new(
|
||||
create,
|
||||
launch,
|
||||
list,
|
||||
close,
|
||||
update,
|
||||
Arc::clone(&profiles) as Arc<dyn ProfileStore>,
|
||||
sessions,
|
||||
))
|
||||
}
|
||||
|
||||
fn read_response(request_path: &std::path::Path) -> OrchestratorResponse {
|
||||
let mut name = request_path.file_name().unwrap().to_string_lossy().into_owned();
|
||||
name.push_str(".response.json");
|
||||
let response_path = request_path.with_file_name(name);
|
||||
let bytes = std::fs::read(&response_path).expect("response file written");
|
||||
serde_json::from_slice(&bytes).expect("response parses")
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn valid_spawn_request_succeeds_and_is_consumed() {
|
||||
let tmp = TempDir::new();
|
||||
let contexts = FakeContexts::new();
|
||||
let service = build_service(contexts.clone());
|
||||
|
||||
let req = tmp.0.join("req-1.json");
|
||||
std::fs::write(
|
||||
&req,
|
||||
br#"{ "action": "spawn_agent", "name": "dev-backend", "profile": "claude-code" }"#,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let response = process_request_file(&req, &project(), &service).await;
|
||||
|
||||
assert!(response.ok, "expected ok, got {response:?}");
|
||||
assert_eq!(response.action.as_deref(), Some("spawn_agent"));
|
||||
// Request consumed; a response sibling written.
|
||||
assert!(!req.exists(), "request file must be removed");
|
||||
let on_disk = read_response(&req);
|
||||
assert!(on_disk.ok);
|
||||
// The agent was actually created through the use cases.
|
||||
assert_eq!(contexts.entries().len(), 1);
|
||||
assert_eq!(contexts.entries()[0].name, "dev-backend");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn invalid_json_request_yields_error_response() {
|
||||
let tmp = TempDir::new();
|
||||
let service = build_service(FakeContexts::new());
|
||||
|
||||
let req = tmp.0.join("broken.json");
|
||||
std::fs::write(&req, b"{ this is not json").unwrap();
|
||||
|
||||
let response = process_request_file(&req, &project(), &service).await;
|
||||
|
||||
assert!(!response.ok);
|
||||
assert!(
|
||||
response.error.as_deref().unwrap_or_default().contains("invalid json"),
|
||||
"got {response:?}"
|
||||
);
|
||||
assert!(!req.exists(), "poisoned request must still be removed");
|
||||
assert!(!read_response(&req).ok);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn unknown_action_yields_error_response() {
|
||||
let tmp = TempDir::new();
|
||||
let service = build_service(FakeContexts::new());
|
||||
|
||||
let req = tmp.0.join("weird.json");
|
||||
std::fs::write(&req, br#"{ "action": "explode", "name": "x" }"#).unwrap();
|
||||
|
||||
let response = process_request_file(&req, &project(), &service).await;
|
||||
|
||||
assert!(!response.ok);
|
||||
assert_eq!(response.action.as_deref(), Some("explode"));
|
||||
assert!(response.error.as_deref().unwrap_or_default().contains("unknown orchestrator action"));
|
||||
}
|
||||
Reference in New Issue
Block a user