// Package api exposes the HTTP and WebSocket endpoints of the server.
package api

import (
	"encoding/json"
	"net/http"
	"strconv"
	"time"

	"github.com/containarr/server/internal/broker"
	grpcgateway "github.com/containarr/server/internal/grpc"
	agentv1 "github.com/containarr/server/internal/proto/agentv1"
	"github.com/containarr/server/internal/store"
	"github.com/go-chi/chi/v5"
	"github.com/google/uuid"
	"github.com/gorilla/websocket"
)

// defaultLogTail is the number of trailing log lines requested when the
// client supplies no usable "tail" query parameter.
const defaultLogTail int32 = 100

// upgrader upgrades HTTP requests to WebSocket connections.
//
// NOTE(review): CheckOrigin accepts every Origin header, which turns off the
// cross-origin check gorilla/websocket performs by default. Confirm this is
// intended (e.g. because authentication happens elsewhere) before exposing
// these endpoints to browsers.
var upgrader = websocket.Upgrader{
	CheckOrigin: func(r *http.Request) bool { return true },
}

// containerActions maps the action names accepted in the ContainerAction
// request body to their protobuf enum values.
var containerActions = map[string]agentv1.ContainerAction{
	"start":   agentv1.ContainerAction_CONTAINER_ACTION_START,
	"stop":    agentv1.ContainerAction_CONTAINER_ACTION_STOP,
	"restart": agentv1.ContainerAction_CONTAINER_ACTION_RESTART,
	"remove":  agentv1.ContainerAction_CONTAINER_ACTION_REMOVE,
}

// Handler serves the API endpoints. It reads and writes persisted agents
// through store, talks to connected agents through registry, and relays
// published events from broker to WebSocket clients.
type Handler struct {
	store    *store.Store
	registry *grpcgateway.Registry
	broker   *broker.Broker
}

// NewHandler returns a Handler wired to the given store, registry and broker.
func NewHandler(s *store.Store, r *grpcgateway.Registry, b *broker.Broker) *Handler {
	return &Handler{store: s, registry: r, broker: b}
}

// agentDTO is the JSON representation of an agent returned by the agent
// endpoints.
type agentDTO struct {
	ID         string    `json:"id"`
	Hostname   string    `json:"hostname"`
	Alias      string    `json:"alias"`
	IPAddress  string    `json:"ip_address"`
	Arch       string    `json:"arch"`
	OS         string    `json:"os"`
	Online     bool      `json:"online"`
	LastSeenAt time.Time `json:"last_seen_at"`
}

// ── Agents ────────────────────────────────────────────────────────────────────

// ListAgents writes every persisted agent as a JSON array. Agents that are
// also present in the registry are marked online and have their IP address
// and last-seen time taken from the registry entry; all others keep the
// persisted IP address and a zero last-seen time.
func (h *Handler) ListAgents(w http.ResponseWriter, r *http.Request) {
	persisted, err := h.store.ListAgents()
	if err != nil {
		http.Error(w, "store error", http.StatusInternalServerError)
		return
	}

	live := h.registry.List()
	liveByID := make(map[string]*grpcgateway.AgentState, len(live))
	for _, s := range live {
		liveByID[s.ID] = s
	}

	out := make([]agentDTO, 0, len(persisted))
	for _, a := range persisted {
		dto := agentDTO{
			ID:        a.ID,
			Hostname:  a.Hostname,
			Alias:     a.Alias,
			IPAddress: a.IPAddress,
			Arch:      a.Arch,
			OS:        a.OS,
		}
		if s, ok := liveByID[a.ID]; ok {
			dto.Online = true
			dto.IPAddress = s.IPAddress
			dto.LastSeenAt = s.LastSeenAt
		}
		out = append(out, dto)
	}
	jsonOK(w, out)
}

// UpdateAgent sets the alias of the agent named by the "agentID" URL
// parameter, both in the store and in the registry, and writes the stored
// agent back as JSON.
//
// It responds 400 when the body is not valid JSON, 500 when the alias cannot
// be stored, and 404 when the agent cannot be read back afterwards.
func (h *Handler) UpdateAgent(w http.ResponseWriter, r *http.Request) {
	agentID := chi.URLParam(r, "agentID")

	var body struct {
		Alias string `json:"alias"`
	}
	if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
		http.Error(w, "invalid body", http.StatusBadRequest)
		return
	}

	if err := h.store.UpdateAgentAlias(agentID, body.Alias); err != nil {
		http.Error(w, "store error", http.StatusInternalServerError)
		return
	}
	h.registry.UpdateAlias(agentID, body.Alias)

	a, err := h.store.GetAgent(agentID)
	if err != nil {
		http.Error(w, "not found", http.StatusNotFound)
		return
	}

	// NOTE(review): Online comes from the stored record here, whereas
	// ListAgents derives it from the registry; LastSeenAt is left at its zero
	// value. Confirm the stored flag is kept current.
	jsonOK(w, agentDTO{
		ID:        a.ID,
		Hostname:  a.Hostname,
		Alias:     a.Alias,
		IPAddress: a.IPAddress,
		Arch:      a.Arch,
		OS:        a.OS,
		Online:    a.Online,
	})
}

// ── Containers ────────────────────────────────────────────────────────────────

// ListContainers writes one JSON object per container reported by every agent
// in the registry, each annotated with the identity of its agent. The
// response is always a JSON array, "[]" when there are no containers.
func (h *Handler) ListContainers(w http.ResponseWriter, r *http.Request) {
	type containerDTO struct {
		AgentID   string                 `json:"agent_id"`
		Hostname  string                 `json:"hostname"`
		Alias     string                 `json:"alias"`
		IPAddress string                 `json:"ip_address"`
		Container *agentv1.ContainerInfo `json:"container"`
	}

	// Start from a non-nil slice: a nil slice would be encoded as JSON null
	// instead of an empty array.
	out := []containerDTO{}
	for _, agent := range h.registry.List() {
		for _, c := range agent.Containers {
			out = append(out, containerDTO{
				AgentID:   agent.ID,
				Hostname:  agent.Hostname,
				Alias:     agent.Alias,
				IPAddress: agent.IPAddress,
				Container: c,
			})
		}
	}
	jsonOK(w, out)
}

// ContainerAction sends a start, stop, restart or remove command for the
// container named by the "containerID" URL parameter to the agent named by
// "agentID", and replies with the generated command ID.
//
// It responds 400 for an undecodable body or an unknown action, and 503 when
// the registry could not send the command to the agent.
func (h *Handler) ContainerAction(w http.ResponseWriter, r *http.Request) {
	agentID := chi.URLParam(r, "agentID")
	containerID := chi.URLParam(r, "containerID")

	var body struct {
		Action string `json:"action"`
	}
	if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
		http.Error(w, "invalid body", http.StatusBadRequest)
		return
	}

	action, ok := containerActions[body.Action]
	if !ok {
		http.Error(w, "unknown action", http.StatusBadRequest)
		return
	}

	cmdID := uuid.NewString()
	sent := h.registry.Send(agentID, &agentv1.ServerMessage{
		Payload: &agentv1.ServerMessage_ContainerCmd{
			ContainerCmd: &agentv1.ContainerCommand{
				CommandId:   cmdID,
				ContainerId: containerID,
				Action:      action,
			},
		},
	})
	if !sent {
		http.Error(w, "agent not connected", http.StatusServiceUnavailable)
		return
	}
	jsonOK(w, map[string]string{"command_id": cmdID})
}

// ── Agent token provisioning ──────────────────────────────────────────────────

// CreateAgentToken generates a new agent ID and token (both random UUID
// strings), stores them together with the hostname from the request body, and
// returns them as JSON.
//
// It responds 400 when the body cannot be decoded or the hostname is empty,
// and 500 when the store rejects the new token.
func (h *Handler) CreateAgentToken(w http.ResponseWriter, r *http.Request) {
	var body struct {
		Hostname string `json:"hostname"`
	}
	if err := json.NewDecoder(r.Body).Decode(&body); err != nil || body.Hostname == "" {
		http.Error(w, "hostname required", http.StatusBadRequest)
		return
	}

	id := uuid.NewString()
	token := uuid.NewString()
	if err := h.store.CreateAgentToken(id, token, body.Hostname); err != nil {
		http.Error(w, "store error", http.StatusInternalServerError)
		return
	}
	jsonOK(w, map[string]string{"agent_id": id, "token": token})
}

// ── Container log stream ──────────────────────────────────────────────────────

// LogsWS upgrades the request to a WebSocket and streams log lines of one
// container to the client as {"stream": ..., "line": ...} text messages.
//
// Query parameters:
//   - follow: anything other than "false" asks the agent to keep streaming.
//   - tail:   positive number of trailing lines to request; missing, invalid
//     or out-of-int32-range values fall back to 100.
//
// If the agent cannot be reached, a single {"error":"agent not connected"}
// message is written and the connection is closed. The handler returns when
// the client goes away, a write fails, or the broker subscription is closed.
func (h *Handler) LogsWS(w http.ResponseWriter, r *http.Request) {
	agentID := chi.URLParam(r, "agentID")
	containerID := chi.URLParam(r, "containerID")

	query := r.URL.Query()
	follow := query.Get("follow") != "false"
	tail := defaultLogTail
	if t := query.Get("tail"); t != "" {
		// Parse with a 32-bit limit so oversized values are rejected instead
		// of silently wrapping when converted to int32.
		if n, err := strconv.ParseInt(t, 10, 32); err == nil && n > 0 {
			tail = int32(n)
		}
	}

	// On failure Upgrade has already replied to the client with an HTTP error.
	conn, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		return
	}
	defer conn.Close()

	// Subscribe before asking the agent to stream so that chunks published
	// between the command and the subscription are not missed.
	sub := h.broker.Subscribe()
	defer h.broker.Unsubscribe(sub)

	sent := h.registry.Send(agentID, &agentv1.ServerMessage{
		Payload: &agentv1.ServerMessage_StreamLogs{
			StreamLogs: &agentv1.StreamLogsCommand{
				CommandId:   uuid.NewString(),
				ContainerId: containerID,
				Follow:      follow,
				Tail:        tail,
			},
		},
	})
	if !sent {
		// Best effort: the connection is closed right after, so a failed
		// write changes nothing.
		_ = conn.WriteMessage(websocket.TextMessage, []byte(`{"error":"agent not connected"}`))
		return
	}

	done := readUntilClosed(conn)
	for {
		select {
		case <-done:
			return
		case raw, ok := <-sub:
			if !ok {
				return
			}
			msg, ok := logLineMessage(raw, agentID, containerID)
			if !ok {
				continue
			}
			if err := conn.WriteMessage(websocket.TextMessage, msg); err != nil {
				return
			}
		}
	}
}

// logLineMessage converts a broker event into the JSON message sent to log
// clients. It reports false when raw is not a "log.chunk" event for the given
// agent and container, or cannot be decoded.
func logLineMessage(raw []byte, agentID, containerID string) ([]byte, bool) {
	var envelope struct {
		Type    string          `json:"type"`
		AgentID string          `json:"agent_id"`
		Payload json.RawMessage `json:"payload"`
	}
	if err := json.Unmarshal(raw, &envelope); err != nil {
		return nil, false
	}
	if envelope.Type != "log.chunk" || envelope.AgentID != agentID {
		return nil, false
	}

	var chunk struct {
		ContainerID string `json:"container_id"`
		Stream      string `json:"stream"`
		Data        []byte `json:"data"` // encoding/json decodes this from a base64 string
	}
	if err := json.Unmarshal(envelope.Payload, &chunk); err != nil {
		return nil, false
	}
	if chunk.ContainerID != containerID {
		return nil, false
	}

	msg, err := json.Marshal(map[string]string{
		"stream": chunk.Stream,
		"line":   string(chunk.Data),
	})
	if err != nil {
		return nil, false
	}
	return msg, true
}

// readUntilClosed starts a goroutine that reads and discards incoming
// messages from conn, and returns a channel that is closed once a read fails.
// Reading is what lets the WebSocket library process close, ping and pong
// frames, so this is also how a client disconnect is detected. The goroutine
// ends when conn is closed.
func readUntilClosed(conn *websocket.Conn) <-chan struct{} {
	done := make(chan struct{})
	go func() {
		defer close(done)
		for {
			if _, _, err := conn.ReadMessage(); err != nil {
				return
			}
		}
	}()
	return done
}

// ── WebSocket event stream ────────────────────────────────────────────────────

// EventsWS upgrades the request to a WebSocket and forwards every message
// published on the broker to the client as a text message. The handler
// returns when the client goes away, a write fails, or the broker
// subscription is closed.
func (h *Handler) EventsWS(w http.ResponseWriter, r *http.Request) {
	// On failure Upgrade has already replied to the client with an HTTP error.
	conn, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		return
	}
	defer conn.Close()

	sub := h.broker.Subscribe()
	defer h.broker.Unsubscribe(sub)

	// Without a reader a disconnected client is only noticed on the next
	// failed write, which never happens while the broker is idle.
	done := readUntilClosed(conn)
	for {
		select {
		case <-done:
			return
		case data, ok := <-sub:
			if !ok {
				return
			}
			if err := conn.WriteMessage(websocket.TextMessage, data); err != nil {
				return
			}
		}
	}
}

// jsonOK writes v to w as a JSON document with an application/json content
// type and the default 200 status.
func jsonOK(w http.ResponseWriter, v any) {
	w.Header().Set("Content-Type", "application/json")
	// The status line and headers are sent as soon as encoding starts, so an
	// encode/write error (typically a disconnected client) can no longer be
	// turned into an error response; it is deliberately dropped.
	_ = json.NewEncoder(w).Encode(v)
}