312 lines
8.5 KiB
Go
312 lines
8.5 KiB
Go
package api
|
|
|
|
import (
|
|
"encoding/json"
|
|
"net/http"
|
|
"strconv"
|
|
"time"
|
|
|
|
"github.com/containarr/server/internal/broker"
|
|
grpcgateway "github.com/containarr/server/internal/grpc"
|
|
agentv1 "github.com/containarr/server/internal/proto/agentv1"
|
|
"github.com/containarr/server/internal/store"
|
|
"github.com/go-chi/chi/v5"
|
|
"github.com/google/uuid"
|
|
"github.com/gorilla/websocket"
|
|
)
|
|
|
|
var upgrader = websocket.Upgrader{
|
|
CheckOrigin: func(r *http.Request) bool { return true },
|
|
}
|
|
|
|
type Handler struct {
|
|
store *store.Store
|
|
registry *grpcgateway.Registry
|
|
broker *broker.Broker
|
|
}
|
|
|
|
func NewHandler(s *store.Store, r *grpcgateway.Registry, b *broker.Broker) *Handler {
|
|
return &Handler{store: s, registry: r, broker: b}
|
|
}
|
|
|
|
type agentDTO struct {
|
|
ID string `json:"id"`
|
|
Hostname string `json:"hostname"`
|
|
Alias string `json:"alias"`
|
|
IPAddress string `json:"ip_address"`
|
|
Arch string `json:"arch"`
|
|
OS string `json:"os"`
|
|
Online bool `json:"online"`
|
|
LastSeenAt time.Time `json:"last_seen_at"`
|
|
}
|
|
|
|
// ── Agents ────────────────────────────────────────────────────────────────────
|
|
|
|
func (h *Handler) ListAgents(w http.ResponseWriter, r *http.Request) {
|
|
persisted, err := h.store.ListAgents()
|
|
if err != nil {
|
|
http.Error(w, "store error", http.StatusInternalServerError)
|
|
return
|
|
}
|
|
liveByID := map[string]*grpcgateway.AgentState{}
|
|
for _, s := range h.registry.List() {
|
|
liveByID[s.ID] = s
|
|
}
|
|
out := make([]agentDTO, 0, len(persisted))
|
|
for _, a := range persisted {
|
|
dto := agentDTO{
|
|
ID: a.ID,
|
|
Hostname: a.Hostname,
|
|
Alias: a.Alias,
|
|
IPAddress: a.IPAddress,
|
|
Arch: a.Arch,
|
|
OS: a.OS,
|
|
}
|
|
if live, ok := liveByID[a.ID]; ok {
|
|
dto.Online = true
|
|
dto.IPAddress = live.IPAddress
|
|
dto.LastSeenAt = live.LastSeenAt
|
|
}
|
|
out = append(out, dto)
|
|
}
|
|
jsonOK(w, out)
|
|
}
|
|
|
|
func (h *Handler) DeleteAgent(w http.ResponseWriter, r *http.Request) {
|
|
agentID := chi.URLParam(r, "agentID")
|
|
if err := h.store.DeleteAgent(agentID); err != nil {
|
|
http.Error(w, "store error", http.StatusInternalServerError)
|
|
return
|
|
}
|
|
h.registry.Deregister(agentID)
|
|
w.WriteHeader(http.StatusNoContent)
|
|
}
|
|
|
|
func (h *Handler) UpdateAgent(w http.ResponseWriter, r *http.Request) {
|
|
agentID := chi.URLParam(r, "agentID")
|
|
var body struct {
|
|
Alias string `json:"alias"`
|
|
}
|
|
if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
|
|
http.Error(w, "invalid body", http.StatusBadRequest)
|
|
return
|
|
}
|
|
if err := h.store.UpdateAgentAlias(agentID, body.Alias); err != nil {
|
|
http.Error(w, "store error", http.StatusInternalServerError)
|
|
return
|
|
}
|
|
h.registry.UpdateAlias(agentID, body.Alias)
|
|
|
|
a, err := h.store.GetAgent(agentID)
|
|
if err != nil {
|
|
http.Error(w, "not found", http.StatusNotFound)
|
|
return
|
|
}
|
|
jsonOK(w, agentDTO{
|
|
ID: a.ID,
|
|
Hostname: a.Hostname,
|
|
Alias: a.Alias,
|
|
IPAddress: a.IPAddress,
|
|
Arch: a.Arch,
|
|
OS: a.OS,
|
|
Online: a.Online,
|
|
})
|
|
}
|
|
|
|
// ── Containers ────────────────────────────────────────────────────────────────
|
|
|
|
func (h *Handler) ListContainers(w http.ResponseWriter, r *http.Request) {
|
|
type containerDTO struct {
|
|
AgentID string `json:"agent_id"`
|
|
Hostname string `json:"hostname"`
|
|
Alias string `json:"alias"`
|
|
IPAddress string `json:"ip_address"`
|
|
Container *agentv1.ContainerInfo `json:"container"`
|
|
}
|
|
var out []containerDTO
|
|
for _, agent := range h.registry.List() {
|
|
for _, c := range agent.Containers {
|
|
out = append(out, containerDTO{
|
|
AgentID: agent.ID,
|
|
Hostname: agent.Hostname,
|
|
Alias: agent.Alias,
|
|
IPAddress: agent.IPAddress,
|
|
Container: c,
|
|
})
|
|
}
|
|
}
|
|
jsonOK(w, out)
|
|
}
|
|
|
|
func (h *Handler) ContainerAction(w http.ResponseWriter, r *http.Request) {
|
|
agentID := chi.URLParam(r, "agentID")
|
|
containerID := chi.URLParam(r, "containerID")
|
|
|
|
var body struct {
|
|
Action string `json:"action"`
|
|
}
|
|
if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
|
|
http.Error(w, "invalid body", http.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
action, ok := map[string]agentv1.ContainerAction{
|
|
"start": agentv1.ContainerAction_CONTAINER_ACTION_START,
|
|
"stop": agentv1.ContainerAction_CONTAINER_ACTION_STOP,
|
|
"restart": agentv1.ContainerAction_CONTAINER_ACTION_RESTART,
|
|
"remove": agentv1.ContainerAction_CONTAINER_ACTION_REMOVE,
|
|
}[body.Action]
|
|
if !ok {
|
|
http.Error(w, "unknown action", http.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
cmdID := uuid.NewString()
|
|
sent := h.registry.Send(agentID, &agentv1.ServerMessage{
|
|
Payload: &agentv1.ServerMessage_ContainerCmd{
|
|
ContainerCmd: &agentv1.ContainerCommand{
|
|
CommandId: cmdID,
|
|
ContainerId: containerID,
|
|
Action: action,
|
|
},
|
|
},
|
|
})
|
|
if !sent {
|
|
http.Error(w, "agent not connected", http.StatusServiceUnavailable)
|
|
return
|
|
}
|
|
jsonOK(w, map[string]string{"command_id": cmdID})
|
|
}
|
|
|
|
// ── Agent token provisioning ──────────────────────────────────────────────────
|
|
|
|
func (h *Handler) CreateAgentToken(w http.ResponseWriter, r *http.Request) {
|
|
var body struct {
|
|
Hostname string `json:"hostname"`
|
|
}
|
|
if err := json.NewDecoder(r.Body).Decode(&body); err != nil || body.Hostname == "" {
|
|
http.Error(w, "hostname required", http.StatusBadRequest)
|
|
return
|
|
}
|
|
id := uuid.NewString()
|
|
token := uuid.NewString()
|
|
if err := h.store.CreateAgentToken(id, token, body.Hostname); err != nil {
|
|
http.Error(w, "store error", http.StatusInternalServerError)
|
|
return
|
|
}
|
|
jsonOK(w, map[string]string{"agent_id": id, "token": token})
|
|
}
|
|
|
|
// ── Container log stream ──────────────────────────────────────────────────────
|
|
|
|
func (h *Handler) LogsWS(w http.ResponseWriter, r *http.Request) {
|
|
agentID := chi.URLParam(r, "agentID")
|
|
containerID := chi.URLParam(r, "containerID")
|
|
|
|
follow := r.URL.Query().Get("follow") != "false"
|
|
tail := int32(100)
|
|
if t := r.URL.Query().Get("tail"); t != "" {
|
|
if n, err := strconv.Atoi(t); err == nil && n > 0 {
|
|
tail = int32(n)
|
|
}
|
|
}
|
|
|
|
conn, err := upgrader.Upgrade(w, r, nil)
|
|
if err != nil {
|
|
return
|
|
}
|
|
defer conn.Close()
|
|
|
|
sent := h.registry.Send(agentID, &agentv1.ServerMessage{
|
|
Payload: &agentv1.ServerMessage_StreamLogs{
|
|
StreamLogs: &agentv1.StreamLogsCommand{
|
|
CommandId: uuid.NewString(),
|
|
ContainerId: containerID,
|
|
Follow: follow,
|
|
Tail: tail,
|
|
},
|
|
},
|
|
})
|
|
if !sent {
|
|
conn.WriteMessage(websocket.TextMessage, []byte(`{"error":"agent not connected"}`))
|
|
return
|
|
}
|
|
|
|
sub := h.broker.Subscribe()
|
|
defer h.broker.Unsubscribe(sub)
|
|
|
|
done := make(chan struct{})
|
|
go func() {
|
|
defer close(done)
|
|
for {
|
|
if _, _, err := conn.ReadMessage(); err != nil {
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
for {
|
|
select {
|
|
case <-done:
|
|
return
|
|
case raw, ok := <-sub:
|
|
if !ok {
|
|
return
|
|
}
|
|
var envelope struct {
|
|
Type string `json:"type"`
|
|
AgentID string `json:"agent_id"`
|
|
Payload json.RawMessage `json:"payload"`
|
|
}
|
|
if json.Unmarshal(raw, &envelope) != nil {
|
|
continue
|
|
}
|
|
if envelope.Type != "log.chunk" || envelope.AgentID != agentID {
|
|
continue
|
|
}
|
|
var chunk struct {
|
|
ContainerID string `json:"container_id"`
|
|
Stream string `json:"stream"`
|
|
Data []byte `json:"data"`
|
|
}
|
|
if json.Unmarshal(envelope.Payload, &chunk) != nil {
|
|
continue
|
|
}
|
|
if chunk.ContainerID != containerID {
|
|
continue
|
|
}
|
|
msg, _ := json.Marshal(map[string]string{
|
|
"stream": chunk.Stream,
|
|
"line": string(chunk.Data),
|
|
})
|
|
if err := conn.WriteMessage(websocket.TextMessage, msg); err != nil {
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// ── WebSocket event stream ────────────────────────────────────────────────────
|
|
|
|
func (h *Handler) EventsWS(w http.ResponseWriter, r *http.Request) {
|
|
conn, err := upgrader.Upgrade(w, r, nil)
|
|
if err != nil {
|
|
return
|
|
}
|
|
defer conn.Close()
|
|
|
|
sub := h.broker.Subscribe()
|
|
defer h.broker.Unsubscribe(sub)
|
|
|
|
for data := range sub {
|
|
if err := conn.WriteMessage(websocket.TextMessage, data); err != nil {
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func jsonOK(w http.ResponseWriter, v any) {
|
|
w.Header().Set("Content-Type", "application/json")
|
|
json.NewEncoder(w).Encode(v)
|
|
}
|