386 lines
13 KiB
Rust
386 lines
13 KiB
Rust
use anyhow::Result;
|
|
use bollard::{
|
|
container::{
|
|
ListContainersOptions, LogOutput, LogsOptions, RemoveContainerOptions,
|
|
StartContainerOptions, StopContainerOptions,
|
|
},
|
|
Docker,
|
|
};
|
|
use futures_util::Stream;
|
|
use std::{collections::HashMap, pin::Pin};
|
|
|
|
use crate::proto::{ContainerInfo, ContainerPort};
|
|
|
|
// ── Public trait ─────────────────────────────────────────────────────────────
|
|
|
|
/// Abstraction over Docker operations, allowing tests to provide a mock backend.
|
|
pub trait ContainerBackend: Clone + Send + Sync + 'static {
|
|
fn list_containers(
|
|
&self,
|
|
) -> impl std::future::Future<Output = Result<Vec<ContainerInfo>>> + Send;
|
|
|
|
fn start(&self, id: &str) -> impl std::future::Future<Output = Result<()>> + Send;
|
|
fn stop(&self, id: &str) -> impl std::future::Future<Output = Result<()>> + Send;
|
|
fn restart(&self, id: &str) -> impl std::future::Future<Output = Result<()>> + Send;
|
|
fn remove(&self, id: &str) -> impl std::future::Future<Output = Result<()>> + Send;
|
|
|
|
fn logs(
|
|
&self,
|
|
id: &str,
|
|
follow: bool,
|
|
tail: i32,
|
|
) -> Pin<Box<dyn Stream<Item = Result<LogOutput, bollard::errors::Error>> + Send>>;
|
|
}
|
|
|
|
// ── Real implementation ───────────────────────────────────────────────────────
|
|
|
|
#[derive(Clone)]
|
|
pub struct DockerClient {
|
|
inner: Docker,
|
|
}
|
|
|
|
impl DockerClient {
|
|
pub fn new() -> Result<Self> {
|
|
Ok(Self {
|
|
inner: Docker::connect_with_socket_defaults()?,
|
|
})
|
|
}
|
|
}
|
|
|
|
impl ContainerBackend for DockerClient {
|
|
async fn list_containers(&self) -> Result<Vec<ContainerInfo>> {
|
|
let opts = ListContainersOptions::<String> {
|
|
all: true,
|
|
..Default::default()
|
|
};
|
|
let containers = self.inner.list_containers(Some(opts)).await?;
|
|
|
|
let result = containers
|
|
.into_iter()
|
|
.map(|c| {
|
|
let id = c.id.unwrap_or_default();
|
|
let name = c
|
|
.names
|
|
.unwrap_or_default()
|
|
.into_iter()
|
|
.next()
|
|
.unwrap_or_default()
|
|
.trim_start_matches('/')
|
|
.to_string();
|
|
|
|
let ports = c
|
|
.ports
|
|
.unwrap_or_default()
|
|
.into_iter()
|
|
.map(|p| ContainerPort {
|
|
host_port: p.public_port.unwrap_or(0) as u32,
|
|
container_port: p.private_port as u32,
|
|
protocol: p
|
|
.typ
|
|
.map(|t| format!("{:?}", t).to_lowercase())
|
|
.unwrap_or_default(),
|
|
host_ip: p.ip.unwrap_or_default(),
|
|
})
|
|
.collect();
|
|
|
|
let labels: HashMap<String, String> = c.labels.unwrap_or_default();
|
|
let compose_project = labels
|
|
.get("com.docker.compose.project")
|
|
.cloned()
|
|
.unwrap_or_default();
|
|
|
|
ContainerInfo {
|
|
id,
|
|
name,
|
|
image: c.image.unwrap_or_default(),
|
|
status: c.status.unwrap_or_default(),
|
|
state: c.state.unwrap_or_default(),
|
|
ports,
|
|
created_at: c.created.unwrap_or(0),
|
|
labels,
|
|
compose_project,
|
|
}
|
|
})
|
|
.collect();
|
|
|
|
Ok(result)
|
|
}
|
|
|
|
async fn start(&self, id: &str) -> Result<()> {
|
|
self.inner
|
|
.start_container(id, None::<StartContainerOptions<String>>)
|
|
.await?;
|
|
Ok(())
|
|
}
|
|
|
|
async fn stop(&self, id: &str) -> Result<()> {
|
|
self.inner
|
|
.stop_container(id, Some(StopContainerOptions { t: 10 }))
|
|
.await?;
|
|
Ok(())
|
|
}
|
|
|
|
async fn restart(&self, id: &str) -> Result<()> {
|
|
self.inner.restart_container(id, None).await?;
|
|
Ok(())
|
|
}
|
|
|
|
async fn remove(&self, id: &str) -> Result<()> {
|
|
self.inner
|
|
.remove_container(
|
|
id,
|
|
Some(RemoveContainerOptions {
|
|
force: true,
|
|
..Default::default()
|
|
}),
|
|
)
|
|
.await?;
|
|
Ok(())
|
|
}
|
|
|
|
fn logs(
|
|
&self,
|
|
id: &str,
|
|
follow: bool,
|
|
tail: i32,
|
|
) -> Pin<Box<dyn Stream<Item = Result<LogOutput, bollard::errors::Error>> + Send>> {
|
|
let tail_str = if tail > 0 {
|
|
tail.to_string()
|
|
} else {
|
|
"100".to_string()
|
|
};
|
|
Box::pin(self.inner.logs(
|
|
id,
|
|
Some(LogsOptions::<String> {
|
|
stdout: true,
|
|
stderr: true,
|
|
follow,
|
|
tail: tail_str,
|
|
timestamps: false,
|
|
..Default::default()
|
|
}),
|
|
))
|
|
}
|
|
}
|
|
|
|
// ── Tests ─────────────────────────────────────────────────────────────────────
|
|
|
|
#[cfg(test)]
|
|
pub mod tests {
|
|
use super::*;
|
|
use bytes::Bytes;
|
|
use futures_util::StreamExt;
|
|
use std::sync::{Arc, Mutex};
|
|
use tokio_stream::once as stream_once;
|
|
|
|
// ── Minimal mock backend ──────────────────────────────────────────────────
|
|
|
|
/// Records which method was last called and with what container id, so
|
|
/// tests can assert on behaviour without a real Docker daemon.
|
|
#[derive(Clone, Default)]
|
|
pub struct MockBackend {
|
|
pub calls: Arc<Mutex<Vec<String>>>,
|
|
/// When Some(msg) every async method returns Err(anyhow!(msg)).
|
|
pub fail_with: Option<String>,
|
|
}
|
|
|
|
impl MockBackend {
|
|
pub fn new() -> Self {
|
|
Self::default()
|
|
}
|
|
|
|
pub fn failing(msg: &str) -> Self {
|
|
Self {
|
|
calls: Default::default(),
|
|
fail_with: Some(msg.to_owned()),
|
|
}
|
|
}
|
|
|
|
fn record(&self, entry: String) {
|
|
self.calls.lock().unwrap().push(entry);
|
|
}
|
|
|
|
fn maybe_err(&self) -> Result<()> {
|
|
if let Some(ref m) = self.fail_with {
|
|
anyhow::bail!("{}", m);
|
|
}
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
impl ContainerBackend for MockBackend {
|
|
async fn list_containers(&self) -> Result<Vec<ContainerInfo>> {
|
|
self.record("list".to_string());
|
|
self.maybe_err()?;
|
|
Ok(vec![ContainerInfo {
|
|
id: "abc123".to_string(),
|
|
name: "test-container".to_string(),
|
|
image: "nginx:latest".to_string(),
|
|
status: "Up 2 hours".to_string(),
|
|
state: "running".to_string(),
|
|
ports: vec![ContainerPort {
|
|
host_port: 8080,
|
|
container_port: 80,
|
|
protocol: "tcp".to_string(),
|
|
host_ip: "0.0.0.0".to_string(),
|
|
}],
|
|
created_at: 1_700_000_000,
|
|
labels: HashMap::new(),
|
|
compose_project: String::new(),
|
|
}])
|
|
}
|
|
|
|
async fn start(&self, id: &str) -> Result<()> {
|
|
self.record(format!("start:{id}"));
|
|
self.maybe_err()
|
|
}
|
|
|
|
async fn stop(&self, id: &str) -> Result<()> {
|
|
self.record(format!("stop:{id}"));
|
|
self.maybe_err()
|
|
}
|
|
|
|
async fn restart(&self, id: &str) -> Result<()> {
|
|
self.record(format!("restart:{id}"));
|
|
self.maybe_err()
|
|
}
|
|
|
|
async fn remove(&self, id: &str) -> Result<()> {
|
|
self.record(format!("remove:{id}"));
|
|
self.maybe_err()
|
|
}
|
|
|
|
fn logs(
|
|
&self,
|
|
id: &str,
|
|
_follow: bool,
|
|
_tail: i32,
|
|
) -> Pin<Box<dyn Stream<Item = Result<LogOutput, bollard::errors::Error>> + Send>>
|
|
{
|
|
self.record(format!("logs:{id}"));
|
|
let chunk = LogOutput::StdOut {
|
|
message: Bytes::from("hello from mock\n"),
|
|
};
|
|
Box::pin(stream_once(Ok(chunk)))
|
|
}
|
|
}
|
|
|
|
// ── list_containers ───────────────────────────────────────────────────────
|
|
|
|
#[tokio::test]
|
|
async fn mock_list_containers_returns_one_entry() {
|
|
let backend = MockBackend::new();
|
|
let containers = backend.list_containers().await.unwrap();
|
|
assert_eq!(containers.len(), 1);
|
|
assert_eq!(containers[0].id, "abc123");
|
|
assert_eq!(containers[0].name, "test-container");
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn mock_list_containers_records_call() {
|
|
let backend = MockBackend::new();
|
|
backend.list_containers().await.unwrap();
|
|
let calls = backend.calls.lock().unwrap().clone();
|
|
assert_eq!(calls, vec!["list"]);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn mock_list_containers_propagates_error() {
|
|
let backend = MockBackend::failing("docker down");
|
|
let err = backend.list_containers().await.unwrap_err();
|
|
assert!(err.to_string().contains("docker down"));
|
|
}
|
|
|
|
// ── start / stop / restart / remove ──────────────────────────────────────
|
|
|
|
#[tokio::test]
|
|
async fn mock_start_records_id() {
|
|
let backend = MockBackend::new();
|
|
backend.start("cid-1").await.unwrap();
|
|
assert_eq!(*backend.calls.lock().unwrap(), vec!["start:cid-1"]);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn mock_stop_records_id() {
|
|
let backend = MockBackend::new();
|
|
backend.stop("cid-2").await.unwrap();
|
|
assert_eq!(*backend.calls.lock().unwrap(), vec!["stop:cid-2"]);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn mock_restart_records_id() {
|
|
let backend = MockBackend::new();
|
|
backend.restart("cid-3").await.unwrap();
|
|
assert_eq!(*backend.calls.lock().unwrap(), vec!["restart:cid-3"]);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn mock_remove_records_id() {
|
|
let backend = MockBackend::new();
|
|
backend.remove("cid-4").await.unwrap();
|
|
assert_eq!(*backend.calls.lock().unwrap(), vec!["remove:cid-4"]);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn mock_operations_propagate_errors() {
|
|
let backend = MockBackend::failing("socket gone");
|
|
assert!(backend.start("x").await.is_err());
|
|
assert!(backend.stop("x").await.is_err());
|
|
assert!(backend.restart("x").await.is_err());
|
|
assert!(backend.remove("x").await.is_err());
|
|
}
|
|
|
|
// ── logs stream ──────────────────────────────────────────────────────────
|
|
|
|
#[tokio::test]
|
|
async fn mock_logs_yields_stdout_chunk() {
|
|
let backend = MockBackend::new();
|
|
let mut stream = backend.logs("cid-5", false, 10);
|
|
let item = stream.next().await.unwrap().unwrap();
|
|
match item {
|
|
LogOutput::StdOut { message } => {
|
|
assert_eq!(message.as_ref(), b"hello from mock\n");
|
|
}
|
|
other => panic!("unexpected variant: {:?}", other),
|
|
}
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn mock_logs_records_id() {
|
|
let backend = MockBackend::new();
|
|
let mut stream = backend.logs("cid-5", false, 10);
|
|
// drain the stream
|
|
while stream.next().await.is_some() {}
|
|
assert_eq!(*backend.calls.lock().unwrap(), vec!["logs:cid-5"]);
|
|
}
|
|
|
|
// ── ContainerInfo / ContainerPort field mapping ───────────────────────────
|
|
|
|
#[test]
|
|
fn container_info_fields_are_accessible() {
|
|
let port = ContainerPort {
|
|
host_port: 443,
|
|
container_port: 8443,
|
|
protocol: "tcp".to_string(),
|
|
host_ip: "127.0.0.1".to_string(),
|
|
};
|
|
assert_eq!(port.host_port, 443);
|
|
assert_eq!(port.container_port, 8443);
|
|
assert_eq!(port.protocol, "tcp");
|
|
|
|
let info = ContainerInfo {
|
|
id: "id1".to_string(),
|
|
name: "name1".to_string(),
|
|
image: "img".to_string(),
|
|
status: "running".to_string(),
|
|
state: "running".to_string(),
|
|
ports: vec![port],
|
|
created_at: 42,
|
|
labels: HashMap::new(),
|
|
compose_project: "proj".to_string(),
|
|
};
|
|
assert_eq!(info.ports.len(), 1);
|
|
assert_eq!(info.compose_project, "proj");
|
|
}
|
|
}
|