248 lines
5.9 KiB
Go
248 lines
5.9 KiB
Go
package grpc
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
agentv1 "github.com/containarr/server/internal/proto/agentv1"
|
|
)
|
|
|
|
type AgentState struct {
|
|
ID string
|
|
Hostname string
|
|
Alias string
|
|
IPAddress string
|
|
Arch string
|
|
OS string
|
|
LastSeenAt time.Time
|
|
Containers []*agentv1.ContainerInfo
|
|
Images []*agentv1.ImageInfo
|
|
Volumes []*agentv1.VolumeInfo
|
|
Networks []*agentv1.NetworkInfo
|
|
|
|
Stats *agentv1.StatsSnapshot
|
|
cmdCh chan *agentv1.ServerMessage
|
|
pendingFiles map[string]chan *agentv1.FileResult
|
|
pendingUpdates map[string]string // commandID → containerID
|
|
pendingMu sync.Mutex
|
|
}
|
|
|
|
type Registry struct {
|
|
mu sync.RWMutex
|
|
agents map[string]*AgentState
|
|
}
|
|
|
|
func NewRegistry() *Registry {
|
|
return &Registry{agents: make(map[string]*AgentState)}
|
|
}
|
|
|
|
func (r *Registry) Register(id, hostname, alias, ipAddress, arch, os string) *AgentState {
|
|
state := &AgentState{
|
|
ID: id,
|
|
Hostname: hostname,
|
|
Alias: alias,
|
|
IPAddress: ipAddress,
|
|
Arch: arch,
|
|
OS: os,
|
|
cmdCh: make(chan *agentv1.ServerMessage, 16),
|
|
pendingFiles: make(map[string]chan *agentv1.FileResult),
|
|
pendingUpdates: make(map[string]string),
|
|
}
|
|
r.mu.Lock()
|
|
r.agents[id] = state
|
|
r.mu.Unlock()
|
|
return state
|
|
}
|
|
|
|
func (r *Registry) Deregister(id string) {
|
|
r.mu.Lock()
|
|
if s, ok := r.agents[id]; ok {
|
|
close(s.cmdCh)
|
|
delete(r.agents, id)
|
|
}
|
|
r.mu.Unlock()
|
|
}
|
|
|
|
func (r *Registry) Get(id string) (*AgentState, bool) {
|
|
r.mu.RLock()
|
|
defer r.mu.RUnlock()
|
|
s, ok := r.agents[id]
|
|
return s, ok
|
|
}
|
|
|
|
func (r *Registry) List() []*AgentState {
|
|
r.mu.RLock()
|
|
defer r.mu.RUnlock()
|
|
out := make([]*AgentState, 0, len(r.agents))
|
|
for _, s := range r.agents {
|
|
out = append(out, s)
|
|
}
|
|
return out
|
|
}
|
|
|
|
func (r *Registry) UpdateContainers(id string, containers []*agentv1.ContainerInfo) {
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
if s, ok := r.agents[id]; ok {
|
|
s.Containers = containers
|
|
s.LastSeenAt = time.Now()
|
|
}
|
|
}
|
|
|
|
func (r *Registry) UpdateResources(id string, containers []*agentv1.ContainerInfo, images []*agentv1.ImageInfo, volumes []*agentv1.VolumeInfo, networks []*agentv1.NetworkInfo) {
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
if s, ok := r.agents[id]; ok {
|
|
s.Containers = containers
|
|
s.Images = images
|
|
s.Volumes = volumes
|
|
s.Networks = networks
|
|
s.LastSeenAt = time.Now()
|
|
}
|
|
}
|
|
|
|
func (r *Registry) UpdateStats(id string, stats *agentv1.StatsSnapshot) {
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
if s, ok := r.agents[id]; ok {
|
|
s.Stats = stats
|
|
s.LastSeenAt = time.Now()
|
|
}
|
|
}
|
|
|
|
// UpdateAlias refreshes the alias for a live agent (called after an admin update).
|
|
func (r *Registry) UpdateAlias(id, alias string) {
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
if s, ok := r.agents[id]; ok {
|
|
s.Alias = alias
|
|
}
|
|
}
|
|
|
|
func (r *Registry) Send(agentID string, msg *agentv1.ServerMessage) bool {
|
|
r.mu.RLock()
|
|
s, ok := r.agents[agentID]
|
|
r.mu.RUnlock()
|
|
if !ok {
|
|
return false
|
|
}
|
|
select {
|
|
case s.cmdCh <- msg:
|
|
return true
|
|
default:
|
|
return false
|
|
}
|
|
}
|
|
|
|
// RegisterPending registers a channel waiting for a FileResult with the given cmdID.
|
|
func (r *Registry) RegisterPending(agentID, cmdID string) chan *agentv1.FileResult {
|
|
r.mu.RLock()
|
|
s, ok := r.agents[agentID]
|
|
r.mu.RUnlock()
|
|
if !ok {
|
|
return nil
|
|
}
|
|
ch := make(chan *agentv1.FileResult, 1)
|
|
s.pendingMu.Lock()
|
|
s.pendingFiles[cmdID] = ch
|
|
s.pendingMu.Unlock()
|
|
return ch
|
|
}
|
|
|
|
// ResolvePending sends the FileResult to the waiting channel identified by cmdID.
|
|
func (r *Registry) ResolvePending(agentID, cmdID string, result *agentv1.FileResult) {
|
|
r.mu.RLock()
|
|
s, ok := r.agents[agentID]
|
|
r.mu.RUnlock()
|
|
if !ok {
|
|
return
|
|
}
|
|
s.pendingMu.Lock()
|
|
ch, ok := s.pendingFiles[cmdID]
|
|
if ok {
|
|
delete(s.pendingFiles, cmdID)
|
|
}
|
|
s.pendingMu.Unlock()
|
|
if ok {
|
|
select {
|
|
case ch <- result:
|
|
default:
|
|
}
|
|
}
|
|
}
|
|
|
|
// CancelPending removes the pending channel for cmdID (cleanup on timeout).
|
|
func (r *Registry) CancelPending(agentID, cmdID string) {
|
|
r.mu.RLock()
|
|
s, ok := r.agents[agentID]
|
|
r.mu.RUnlock()
|
|
if !ok {
|
|
return
|
|
}
|
|
s.pendingMu.Lock()
|
|
delete(s.pendingFiles, cmdID)
|
|
s.pendingMu.Unlock()
|
|
}
|
|
|
|
// SendAndWait registers a pending channel, sends msg to the agent, and waits up
|
|
// to 30 seconds for the FileResult response identified by cmdID.
|
|
func (r *Registry) SendAndWait(agentID string, msg *agentv1.ServerMessage, cmdID string) (*agentv1.FileResult, error) {
|
|
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
|
defer cancel()
|
|
return r.SendAndWaitCtx(ctx, agentID, msg, cmdID)
|
|
}
|
|
|
|
// RegisterPendingUpdate enregistre un commandID en attente de CommandResult pour un UpdateContainer.
|
|
func (r *Registry) RegisterPendingUpdate(agentID, cmdID, containerID string) {
|
|
r.mu.RLock()
|
|
s, ok := r.agents[agentID]
|
|
r.mu.RUnlock()
|
|
if !ok {
|
|
return
|
|
}
|
|
s.pendingMu.Lock()
|
|
s.pendingUpdates[cmdID] = containerID
|
|
s.pendingMu.Unlock()
|
|
}
|
|
|
|
// ResolvePendingUpdate retourne le containerID associé au commandID et le supprime de la map.
|
|
// Retourne ("", false) si le commandID n'est pas connu.
|
|
func (r *Registry) ResolvePendingUpdate(agentID, cmdID string) (string, bool) {
|
|
r.mu.RLock()
|
|
s, ok := r.agents[agentID]
|
|
r.mu.RUnlock()
|
|
if !ok {
|
|
return "", false
|
|
}
|
|
s.pendingMu.Lock()
|
|
containerID, found := s.pendingUpdates[cmdID]
|
|
if found {
|
|
delete(s.pendingUpdates, cmdID)
|
|
}
|
|
s.pendingMu.Unlock()
|
|
return containerID, found
|
|
}
|
|
|
|
// SendAndWaitCtx is like SendAndWait but uses the provided context for timeout control.
|
|
func (r *Registry) SendAndWaitCtx(ctx context.Context, agentID string, msg *agentv1.ServerMessage, cmdID string) (*agentv1.FileResult, error) {
|
|
ch := r.RegisterPending(agentID, cmdID)
|
|
if ch == nil {
|
|
return nil, fmt.Errorf("agent not connected")
|
|
}
|
|
|
|
if !r.Send(agentID, msg) {
|
|
r.CancelPending(agentID, cmdID)
|
|
return nil, fmt.Errorf("agent not connected")
|
|
}
|
|
|
|
select {
|
|
case result := <-ch:
|
|
return result, nil
|
|
case <-ctx.Done():
|
|
r.CancelPending(agentID, cmdID)
|
|
return nil, fmt.Errorf("timeout waiting for agent response")
|
|
}
|
|
}
|