Files
Containarr/server/internal/api/handlers.go
2026-05-19 15:53:30 +02:00

690 lines
20 KiB
Go

package api
import (
"context"
"encoding/json"
"net/http"
"strconv"
"time"
"github.com/containarr/server/internal/broker"
grpcgateway "github.com/containarr/server/internal/grpc"
agentv1 "github.com/containarr/server/internal/proto/agentv1"
"github.com/containarr/server/internal/store"
"github.com/go-chi/chi/v5"
"github.com/google/uuid"
"github.com/gorilla/websocket"
)
var upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool { return true },
}
type Handler struct {
store *store.Store
registry *grpcgateway.Registry
broker *broker.Broker
}
func NewHandler(s *store.Store, r *grpcgateway.Registry, b *broker.Broker) *Handler {
return &Handler{store: s, registry: r, broker: b}
}
type agentDTO struct {
ID string `json:"id"`
Hostname string `json:"hostname"`
Alias string `json:"alias"`
IPAddress string `json:"ip_address"`
Arch string `json:"arch"`
OS string `json:"os"`
Online bool `json:"online"`
LastSeenAt time.Time `json:"last_seen_at"`
}
// ── Agents ────────────────────────────────────────────────────────────────────
func (h *Handler) ListAgents(w http.ResponseWriter, r *http.Request) {
persisted, err := h.store.ListAgents()
if err != nil {
http.Error(w, "store error", http.StatusInternalServerError)
return
}
liveByID := map[string]*grpcgateway.AgentState{}
for _, s := range h.registry.List() {
liveByID[s.ID] = s
}
out := make([]agentDTO, 0, len(persisted))
for _, a := range persisted {
dto := agentDTO{
ID: a.ID,
Hostname: a.Hostname,
Alias: a.Alias,
IPAddress: a.IPAddress,
Arch: a.Arch,
OS: a.OS,
}
if live, ok := liveByID[a.ID]; ok {
dto.Online = true
dto.IPAddress = live.IPAddress
dto.LastSeenAt = live.LastSeenAt
}
out = append(out, dto)
}
jsonOK(w, out)
}
func (h *Handler) DeleteAgent(w http.ResponseWriter, r *http.Request) {
agentID := chi.URLParam(r, "agentID")
if err := h.store.DeleteAgent(agentID); err != nil {
http.Error(w, "store error", http.StatusInternalServerError)
return
}
h.registry.Deregister(agentID)
w.WriteHeader(http.StatusNoContent)
}
func (h *Handler) UpdateAgent(w http.ResponseWriter, r *http.Request) {
agentID := chi.URLParam(r, "agentID")
var body struct {
Alias string `json:"alias"`
}
if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
http.Error(w, "invalid body", http.StatusBadRequest)
return
}
if err := h.store.UpdateAgentAlias(agentID, body.Alias); err != nil {
http.Error(w, "store error", http.StatusInternalServerError)
return
}
h.registry.UpdateAlias(agentID, body.Alias)
a, err := h.store.GetAgent(agentID)
if err != nil {
http.Error(w, "not found", http.StatusNotFound)
return
}
jsonOK(w, agentDTO{
ID: a.ID,
Hostname: a.Hostname,
Alias: a.Alias,
IPAddress: a.IPAddress,
Arch: a.Arch,
OS: a.OS,
Online: a.Online,
})
}
// ── Containers ────────────────────────────────────────────────────────────────
func (h *Handler) ListContainers(w http.ResponseWriter, r *http.Request) {
type containerDTO struct {
AgentID string `json:"agent_id"`
Hostname string `json:"hostname"`
Alias string `json:"alias"`
IPAddress string `json:"ip_address"`
Container *agentv1.ContainerInfo `json:"container"`
}
var out []containerDTO
for _, agent := range h.registry.List() {
for _, c := range agent.Containers {
out = append(out, containerDTO{
AgentID: agent.ID,
Hostname: agent.Hostname,
Alias: agent.Alias,
IPAddress: agent.IPAddress,
Container: c,
})
}
}
jsonOK(w, out)
}
func (h *Handler) ContainerAction(w http.ResponseWriter, r *http.Request) {
agentID := chi.URLParam(r, "agentID")
containerID := chi.URLParam(r, "containerID")
var body struct {
Action string `json:"action"`
}
if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
http.Error(w, "invalid body", http.StatusBadRequest)
return
}
action, ok := map[string]agentv1.ContainerAction{
"start": agentv1.ContainerAction_CONTAINER_ACTION_START,
"stop": agentv1.ContainerAction_CONTAINER_ACTION_STOP,
"restart": agentv1.ContainerAction_CONTAINER_ACTION_RESTART,
"remove": agentv1.ContainerAction_CONTAINER_ACTION_REMOVE,
}[body.Action]
if !ok {
http.Error(w, "unknown action", http.StatusBadRequest)
return
}
cmdID := uuid.NewString()
sent := h.registry.Send(agentID, &agentv1.ServerMessage{
Payload: &agentv1.ServerMessage_ContainerCmd{
ContainerCmd: &agentv1.ContainerCommand{
CommandId: cmdID,
ContainerId: containerID,
Action: action,
},
},
})
if !sent {
http.Error(w, "agent not connected", http.StatusServiceUnavailable)
return
}
jsonOK(w, map[string]string{"command_id": cmdID})
}
// ── Images ────────────────────────────────────────────────────────────────────
func (h *Handler) ListImages(w http.ResponseWriter, r *http.Request) {
type imageDTO struct {
AgentID string `json:"agent_id"`
Hostname string `json:"hostname"`
Alias string `json:"alias"`
IPAddress string `json:"ip_address"`
ID string `json:"id"`
Tags []string `json:"tags"`
Size int64 `json:"size"`
CreatedAt int64 `json:"created_at"`
}
var out []imageDTO
for _, agent := range h.registry.List() {
for _, img := range agent.Images {
out = append(out, imageDTO{
AgentID: agent.ID,
Hostname: agent.Hostname,
Alias: agent.Alias,
IPAddress: agent.IPAddress,
ID: img.GetId(),
Tags: func() []string { if t := img.GetTags(); t != nil { return t }; return []string{} }(),
Size: img.GetSize(),
CreatedAt: img.GetCreatedAt(),
})
}
}
jsonOK(w, out)
}
// ── Volumes ───────────────────────────────────────────────────────────────────
func (h *Handler) ListVolumes(w http.ResponseWriter, r *http.Request) {
type volumeDTO struct {
AgentID string `json:"agent_id"`
Hostname string `json:"hostname"`
Alias string `json:"alias"`
IPAddress string `json:"ip_address"`
Name string `json:"name"`
Driver string `json:"driver"`
Mountpoint string `json:"mountpoint"`
}
var out []volumeDTO
for _, agent := range h.registry.List() {
for _, vol := range agent.Volumes {
out = append(out, volumeDTO{
AgentID: agent.ID,
Hostname: agent.Hostname,
Alias: agent.Alias,
IPAddress: agent.IPAddress,
Name: vol.GetName(),
Driver: vol.GetDriver(),
Mountpoint: vol.GetMountpoint(),
})
}
}
jsonOK(w, out)
}
// ── Networks ──────────────────────────────────────────────────────────────────
func (h *Handler) ListNetworks(w http.ResponseWriter, r *http.Request) {
type networkDTO struct {
AgentID string `json:"agent_id"`
Hostname string `json:"hostname"`
Alias string `json:"alias"`
IPAddress string `json:"ip_address"`
ID string `json:"id"`
Name string `json:"name"`
Driver string `json:"driver"`
Scope string `json:"scope"`
}
var out []networkDTO
for _, agent := range h.registry.List() {
for _, net := range agent.Networks {
out = append(out, networkDTO{
AgentID: agent.ID,
Hostname: agent.Hostname,
Alias: agent.Alias,
IPAddress: agent.IPAddress,
ID: net.GetId(),
Name: net.GetName(),
Driver: net.GetDriver(),
Scope: net.GetScope(),
})
}
}
jsonOK(w, out)
}
// ── Agent token provisioning ──────────────────────────────────────────────────
func (h *Handler) CreateAgentToken(w http.ResponseWriter, r *http.Request) {
var body struct {
Hostname string `json:"hostname"`
}
if err := json.NewDecoder(r.Body).Decode(&body); err != nil || body.Hostname == "" {
http.Error(w, "hostname required", http.StatusBadRequest)
return
}
id := uuid.NewString()
token := uuid.NewString()
if err := h.store.CreateAgentToken(id, token, body.Hostname); err != nil {
http.Error(w, "store error", http.StatusInternalServerError)
return
}
jsonOK(w, map[string]string{"agent_id": id, "token": token})
}
// ── Container log stream ──────────────────────────────────────────────────────
func (h *Handler) LogsWS(w http.ResponseWriter, r *http.Request) {
agentID := chi.URLParam(r, "agentID")
containerID := chi.URLParam(r, "containerID")
follow := r.URL.Query().Get("follow") != "false"
tail := int32(100)
if t := r.URL.Query().Get("tail"); t != "" {
if n, err := strconv.Atoi(t); err == nil && n > 0 {
tail = int32(n)
}
}
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
return
}
defer conn.Close()
sent := h.registry.Send(agentID, &agentv1.ServerMessage{
Payload: &agentv1.ServerMessage_StreamLogs{
StreamLogs: &agentv1.StreamLogsCommand{
CommandId: uuid.NewString(),
ContainerId: containerID,
Follow: follow,
Tail: tail,
},
},
})
if !sent {
conn.WriteMessage(websocket.TextMessage, []byte(`{"error":"agent not connected"}`))
return
}
sub := h.broker.Subscribe()
defer h.broker.Unsubscribe(sub)
done := make(chan struct{})
go func() {
defer close(done)
for {
if _, _, err := conn.ReadMessage(); err != nil {
return
}
}
}()
for {
select {
case <-done:
return
case raw, ok := <-sub:
if !ok {
return
}
var envelope struct {
Type string `json:"type"`
AgentID string `json:"agent_id"`
Payload json.RawMessage `json:"payload"`
}
if json.Unmarshal(raw, &envelope) != nil {
continue
}
if envelope.Type != "log.chunk" || envelope.AgentID != agentID {
continue
}
var chunk struct {
ContainerID string `json:"container_id"`
Stream string `json:"stream"`
Data []byte `json:"data"`
}
if json.Unmarshal(envelope.Payload, &chunk) != nil {
continue
}
if chunk.ContainerID != containerID {
continue
}
msg, _ := json.Marshal(map[string]string{
"stream": chunk.Stream,
"line": string(chunk.Data),
})
if err := conn.WriteMessage(websocket.TextMessage, msg); err != nil {
return
}
}
}
}
// ── WebSocket event stream ────────────────────────────────────────────────────
func (h *Handler) EventsWS(w http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
return
}
defer conn.Close()
sub := h.broker.Subscribe()
defer h.broker.Unsubscribe(sub)
for data := range sub {
if err := conn.WriteMessage(websocket.TextMessage, data); err != nil {
return
}
}
}
// ── File system & Compose ─────────────────────────────────────────────────────
// sendFileCmd sends a file/compose command to an agent and waits for the response.
// It uses the request context with an added 30s deadline so the handler can be
// tested by cancelling the context.
func (h *Handler) sendFileCmd(r *http.Request, agentID string, msg *agentv1.ServerMessage, cmdID string) (*agentv1.FileResult, error) {
ctx, cancel := context.WithTimeout(r.Context(), 30*time.Second)
defer cancel()
return h.registry.SendAndWaitCtx(ctx, agentID, msg, cmdID)
}
// FsList handles GET /api/v1/agents/{agentID}/fs/list?path=/some/dir
func (h *Handler) FsList(w http.ResponseWriter, r *http.Request) {
agentID := chi.URLParam(r, "agentID")
path := r.URL.Query().Get("path")
if path == "" {
http.Error(w, "path required", http.StatusBadRequest)
return
}
cmdID := uuid.NewString()
result, err := h.sendFileCmd(r, agentID, &agentv1.ServerMessage{
Payload: &agentv1.ServerMessage_ListDir{
ListDir: &agentv1.ListDirCommand{
CommandId: cmdID,
Path: path,
},
},
}, cmdID)
if err != nil {
if err.Error() == "agent not connected" {
http.Error(w, "agent not connected", http.StatusNotFound)
return
}
http.Error(w, "timeout waiting for agent", http.StatusGatewayTimeout)
return
}
if !result.Success {
http.Error(w, result.Error, http.StatusInternalServerError)
return
}
// Content is JSON-encoded list of entries from the agent
var entries json.RawMessage = result.Content
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(entries)
}
// FsRead handles GET /api/v1/agents/{agentID}/fs/read?path=/some/file
func (h *Handler) FsRead(w http.ResponseWriter, r *http.Request) {
agentID := chi.URLParam(r, "agentID")
path := r.URL.Query().Get("path")
if path == "" {
http.Error(w, "path required", http.StatusBadRequest)
return
}
cmdID := uuid.NewString()
result, err := h.sendFileCmd(r, agentID, &agentv1.ServerMessage{
Payload: &agentv1.ServerMessage_ReadFile{
ReadFile: &agentv1.ReadFileCommand{
CommandId: cmdID,
Path: path,
},
},
}, cmdID)
if err != nil {
if err.Error() == "agent not connected" {
http.Error(w, "agent not connected", http.StatusNotFound)
return
}
http.Error(w, "timeout waiting for agent", http.StatusGatewayTimeout)
return
}
if !result.Success {
http.Error(w, result.Error, http.StatusInternalServerError)
return
}
jsonOK(w, map[string]string{"content": string(result.Content)})
}
// FsWrite handles POST /api/v1/agents/{agentID}/fs/write
func (h *Handler) FsWrite(w http.ResponseWriter, r *http.Request) {
agentID := chi.URLParam(r, "agentID")
var body struct {
Path string `json:"path"`
Content string `json:"content"`
}
if err := json.NewDecoder(r.Body).Decode(&body); err != nil || body.Path == "" {
http.Error(w, "path and content required", http.StatusBadRequest)
return
}
cmdID := uuid.NewString()
result, err := h.sendFileCmd(r, agentID, &agentv1.ServerMessage{
Payload: &agentv1.ServerMessage_WriteFile{
WriteFile: &agentv1.WriteFileCommand{
CommandId: cmdID,
Path: body.Path,
Content: []byte(body.Content),
},
},
}, cmdID)
if err != nil {
if err.Error() == "agent not connected" {
http.Error(w, "agent not connected", http.StatusNotFound)
return
}
http.Error(w, "timeout waiting for agent", http.StatusGatewayTimeout)
return
}
if !result.Success {
http.Error(w, result.Error, http.StatusInternalServerError)
return
}
jsonOK(w, map[string]bool{"ok": true})
}
// FsMkdir handles POST /api/v1/agents/{agentID}/fs/mkdir
func (h *Handler) FsMkdir(w http.ResponseWriter, r *http.Request) {
agentID := chi.URLParam(r, "agentID")
var body struct {
Path string `json:"path"`
}
if err := json.NewDecoder(r.Body).Decode(&body); err != nil || body.Path == "" {
http.Error(w, "path required", http.StatusBadRequest)
return
}
cmdID := uuid.NewString()
result, err := h.sendFileCmd(r, agentID, &agentv1.ServerMessage{
Payload: &agentv1.ServerMessage_CreateDir{
CreateDir: &agentv1.CreateDirCommand{
CommandId: cmdID,
Path: body.Path,
},
},
}, cmdID)
if err != nil {
if err.Error() == "agent not connected" {
http.Error(w, "agent not connected", http.StatusNotFound)
return
}
http.Error(w, "timeout waiting for agent", http.StatusGatewayTimeout)
return
}
if !result.Success {
http.Error(w, result.Error, http.StatusInternalServerError)
return
}
jsonOK(w, map[string]bool{"ok": true})
}
// ComposeAction handles POST /api/v1/agents/{agentID}/compose
func (h *Handler) ComposeAction(w http.ResponseWriter, r *http.Request) {
agentID := chi.URLParam(r, "agentID")
var body struct {
Path string `json:"path"`
Action string `json:"action"`
}
if err := json.NewDecoder(r.Body).Decode(&body); err != nil || body.Path == "" || body.Action == "" {
http.Error(w, "path and action required", http.StatusBadRequest)
return
}
validActions := map[string]bool{"up": true, "down": true, "pull": true}
if !validActions[body.Action] {
http.Error(w, "action must be one of: up, down, pull", http.StatusBadRequest)
return
}
cmdID := uuid.NewString()
result, err := h.sendFileCmd(r, agentID, &agentv1.ServerMessage{
Payload: &agentv1.ServerMessage_ExecCompose{
ExecCompose: &agentv1.ExecComposeCommand{
CommandId: cmdID,
Path: body.Path,
Action: body.Action,
},
},
}, cmdID)
if err != nil {
if err.Error() == "agent not connected" {
http.Error(w, "agent not connected", http.StatusNotFound)
return
}
http.Error(w, "timeout waiting for agent", http.StatusGatewayTimeout)
return
}
if !result.Success {
jsonErr, _ := json.Marshal(map[string]string{"error": result.Error, "output": string(result.Content)})
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusInternalServerError)
w.Write(jsonErr)
return
}
jsonOK(w, map[string]any{"ok": true, "output": string(result.Content)})
}
// ── Auto-update policies ──────────────────────────────────────────────────────
// GetAutoUpdatePolicy handles GET /api/v1/agents/{agentID}/containers/{containerID}/auto-update
func (h *Handler) GetAutoUpdatePolicy(w http.ResponseWriter, r *http.Request) {
agentID := chi.URLParam(r, "agentID")
containerID := chi.URLParam(r, "containerID")
p, err := h.store.GetAutoUpdatePolicy(agentID, containerID)
if err != nil {
http.Error(w, "store error", http.StatusInternalServerError)
return
}
if p == nil {
jsonOK(w, map[string]any{"enabled": false, "interval_minutes": 1440})
return
}
jsonOK(w, map[string]any{
"enabled": p.Enabled,
"interval_minutes": p.IntervalMinutes,
"last_checked_at": p.LastCheckedAt,
"last_updated_at": p.LastUpdatedAt,
})
}
// PutAutoUpdatePolicy handles PUT /api/v1/agents/{agentID}/containers/{containerID}/auto-update
func (h *Handler) PutAutoUpdatePolicy(w http.ResponseWriter, r *http.Request) {
agentID := chi.URLParam(r, "agentID")
containerID := chi.URLParam(r, "containerID")
var body struct {
Enabled bool `json:"enabled"`
IntervalMinutes int `json:"interval_minutes"`
}
if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
http.Error(w, "invalid body", http.StatusBadRequest)
return
}
if body.IntervalMinutes < 60 || body.IntervalMinutes > 43200 {
http.Error(w, "interval_minutes must be between 60 and 43200", http.StatusBadRequest)
return
}
p := &store.AutoUpdatePolicy{
AgentID: agentID,
ContainerID: containerID,
Enabled: body.Enabled,
IntervalMinutes: body.IntervalMinutes,
}
if err := h.store.UpsertAutoUpdatePolicy(p); err != nil {
http.Error(w, "store error", http.StatusInternalServerError)
return
}
jsonOK(w, map[string]any{
"enabled": p.Enabled,
"interval_minutes": p.IntervalMinutes,
})
}
// UpdateNow handles POST /api/v1/agents/{agentID}/containers/{containerID}/update-now
func (h *Handler) UpdateNow(w http.ResponseWriter, r *http.Request) {
agentID := chi.URLParam(r, "agentID")
containerID := chi.URLParam(r, "containerID")
cmdID := uuid.NewString()
sent := h.registry.Send(agentID, &agentv1.ServerMessage{
Payload: &agentv1.ServerMessage_UpdateContainer{
UpdateContainer: &agentv1.UpdateContainerCommand{
CommandId: cmdID,
ContainerId: containerID,
},
},
})
if !sent {
http.Error(w, "agent not connected", http.StatusServiceUnavailable)
return
}
h.registry.RegisterPendingUpdate(agentID, cmdID, containerID)
jsonOK(w, map[string]string{"command_id": cmdID})
}
func jsonOK(w http.ResponseWriter, v any) {
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(v)
}