feat: add volume, images and networks

This commit is contained in:
2026-05-18 19:30:33 +02:00
parent 4f53aefb6e
commit bd3121d688
10 changed files with 1221 additions and 208 deletions

View File

@ -4,12 +4,15 @@ use bollard::{
ListContainersOptions, LogOutput, LogsOptions, RemoveContainerOptions,
StartContainerOptions, StopContainerOptions,
},
image::ListImagesOptions,
network::ListNetworksOptions,
volume::ListVolumesOptions,
Docker,
};
use futures_util::Stream;
use std::{collections::HashMap, pin::Pin};
use crate::proto::{ContainerInfo, ContainerPort};
use crate::proto::{ContainerInfo, ContainerPort, ImageInfo, NetworkInfo, VolumeInfo};
// ── Public trait ─────────────────────────────────────────────────────────────
@ -24,6 +27,10 @@ pub trait ContainerBackend: Clone + Send + Sync + 'static {
fn restart(&self, id: &str) -> impl std::future::Future<Output = Result<()>> + Send;
fn remove(&self, id: &str) -> impl std::future::Future<Output = Result<()>> + Send;
fn list_images(&self) -> impl std::future::Future<Output = Result<Vec<ImageInfo>>> + Send;
fn list_volumes(&self) -> impl std::future::Future<Output = Result<Vec<VolumeInfo>>> + Send;
fn list_networks(&self) -> impl std::future::Future<Output = Result<Vec<NetworkInfo>>> + Send;
fn logs(
&self,
id: &str,
@ -138,6 +145,55 @@ impl ContainerBackend for DockerClient {
Ok(())
}
async fn list_images(&self) -> Result<Vec<ImageInfo>> {
let images = self
.inner
.list_images(None::<ListImagesOptions<String>>)
.await?;
Ok(images
.into_iter()
.map(|c| ImageInfo {
id: c.id,
tags: c.repo_tags,
size: c.size,
created_at: c.created,
})
.collect())
}
async fn list_volumes(&self) -> Result<Vec<VolumeInfo>> {
let info = self
.inner
.list_volumes(None::<ListVolumesOptions<String>>)
.await?;
Ok(info
.volumes
.unwrap_or_default()
.into_iter()
.map(|v| VolumeInfo {
name: v.name,
driver: v.driver,
mountpoint: v.mountpoint,
})
.collect())
}
async fn list_networks(&self) -> Result<Vec<NetworkInfo>> {
let networks = self
.inner
.list_networks(None::<ListNetworksOptions<String>>)
.await?;
Ok(networks
.into_iter()
.map(|n| NetworkInfo {
id: n.id.unwrap_or_default(),
name: n.name.unwrap_or_default(),
driver: n.driver.unwrap_or_default(),
scope: n.scope.unwrap_or_default(),
})
.collect())
}
fn logs(
&self,
id: &str,
@ -230,6 +286,38 @@ pub mod tests {
}])
}
async fn list_images(&self) -> Result<Vec<ImageInfo>> {
self.record("list_images".to_string());
self.maybe_err()?;
Ok(vec![ImageInfo {
id: "sha256:abc".to_string(),
tags: vec!["nginx:latest".to_string()],
size: 1024,
created_at: 1_700_000_000,
}])
}
async fn list_volumes(&self) -> Result<Vec<VolumeInfo>> {
self.record("list_volumes".to_string());
self.maybe_err()?;
Ok(vec![VolumeInfo {
name: "data".to_string(),
driver: "local".to_string(),
mountpoint: "/var/lib/docker/volumes/data/_data".to_string(),
}])
}
async fn list_networks(&self) -> Result<Vec<NetworkInfo>> {
self.record("list_networks".to_string());
self.maybe_err()?;
Ok(vec![NetworkInfo {
id: "net123".to_string(),
name: "bridge".to_string(),
driver: "bridge".to_string(),
scope: "local".to_string(),
}])
}
async fn start(&self, id: &str) -> Result<()> {
self.record(format!("start:{id}"));
self.maybe_err()
@ -291,6 +379,81 @@ pub mod tests {
assert!(err.to_string().contains("docker down"));
}
// ── list_images ───────────────────────────────────────────────────────────
#[tokio::test]
async fn mock_list_images_returns_one_entry() {
let backend = MockBackend::new();
let images = backend.list_images().await.unwrap();
assert_eq!(images.len(), 1);
assert_eq!(images[0].tags, vec!["nginx:latest"]);
}
#[tokio::test]
async fn mock_list_images_records_call() {
let backend = MockBackend::new();
backend.list_images().await.unwrap();
let calls = backend.calls.lock().unwrap().clone();
assert_eq!(calls, vec!["list_images"]);
}
#[tokio::test]
async fn mock_list_images_propagates_error() {
let backend = MockBackend::failing("image daemon down");
let err = backend.list_images().await.unwrap_err();
assert!(err.to_string().contains("image daemon down"));
}
// ── list_volumes ──────────────────────────────────────────────────────────
#[tokio::test]
async fn mock_list_volumes_returns_one_entry() {
let backend = MockBackend::new();
let volumes = backend.list_volumes().await.unwrap();
assert_eq!(volumes.len(), 1);
assert_eq!(volumes[0].name, "data");
}
#[tokio::test]
async fn mock_list_volumes_records_call() {
let backend = MockBackend::new();
backend.list_volumes().await.unwrap();
let calls = backend.calls.lock().unwrap().clone();
assert_eq!(calls, vec!["list_volumes"]);
}
#[tokio::test]
async fn mock_list_volumes_propagates_error() {
let backend = MockBackend::failing("volume daemon down");
let err = backend.list_volumes().await.unwrap_err();
assert!(err.to_string().contains("volume daemon down"));
}
// ── list_networks ─────────────────────────────────────────────────────────
#[tokio::test]
async fn mock_list_networks_returns_one_entry() {
let backend = MockBackend::new();
let networks = backend.list_networks().await.unwrap();
assert_eq!(networks.len(), 1);
assert_eq!(networks[0].name, "bridge");
}
#[tokio::test]
async fn mock_list_networks_records_call() {
let backend = MockBackend::new();
backend.list_networks().await.unwrap();
let calls = backend.calls.lock().unwrap().clone();
assert_eq!(calls, vec!["list_networks"]);
}
#[tokio::test]
async fn mock_list_networks_propagates_error() {
let backend = MockBackend::failing("network daemon down");
let err = backend.list_networks().await.unwrap_err();
assert!(err.to_string().contains("network daemon down"));
}
// ── start / stop / restart / remove ──────────────────────────────────────
#[tokio::test]

View File

@ -12,6 +12,7 @@ use proto::{
agent_gateway_client::AgentGatewayClient,
agent_message, server_message,
AgentHandshake, AgentMessage, ContainerAction, ContainerSnapshot,
ImageInfo, VolumeInfo, NetworkInfo,
};
use std::{collections::HashMap, env, time::Duration};
use tokio::{sync::mpsc, task::JoinHandle, time};
@ -73,20 +74,45 @@ async fn run(url: &str, token: &str, hostname: &str, docker: DockerClient) -> Re
loop {
tokio::select! {
_ = snapshot_ticker.tick() => {
match time::timeout(Duration::from_secs(5), docker.list_containers()).await {
Err(_) => warn!("docker list timed out"),
Ok(Err(e)) => warn!("docker list failed: {:#}", e),
Ok(Ok(containers)) => {
let msg = AgentMessage {
payload: Some(agent_message::Payload::Snapshot(ContainerSnapshot {
containers,
timestamp: unix_now(),
})),
};
if tx.send(msg).await.is_err() {
break;
}
}
let (containers_res, images_res, volumes_res, networks_res) = tokio::join!(
time::timeout(Duration::from_secs(5), docker.list_containers()),
time::timeout(Duration::from_secs(5), docker.list_images()),
time::timeout(Duration::from_secs(5), docker.list_volumes()),
time::timeout(Duration::from_secs(5), docker.list_networks()),
);
let containers = match containers_res {
Err(_) => { warn!("docker list_containers timed out"); continue; }
Ok(Err(e)) => { warn!("docker list_containers failed: {:#}", e); continue; }
Ok(Ok(v)) => v,
};
let images: Vec<ImageInfo> = match images_res {
Err(_) => { warn!("docker list_images timed out"); vec![] }
Ok(Err(e)) => { warn!("docker list_images failed: {:#}", e); vec![] }
Ok(Ok(v)) => v,
};
let volumes: Vec<VolumeInfo> = match volumes_res {
Err(_) => { warn!("docker list_volumes timed out"); vec![] }
Ok(Err(e)) => { warn!("docker list_volumes failed: {:#}", e); vec![] }
Ok(Ok(v)) => v,
};
let networks: Vec<NetworkInfo> = match networks_res {
Err(_) => { warn!("docker list_networks timed out"); vec![] }
Ok(Err(e)) => { warn!("docker list_networks failed: {:#}", e); vec![] }
Ok(Ok(v)) => v,
};
let msg = AgentMessage {
payload: Some(agent_message::Payload::Snapshot(ContainerSnapshot {
containers,
timestamp: unix_now(),
images,
volumes,
networks,
})),
};
if tx.send(msg).await.is_err() {
break;
}
}