87 lines
2.5 KiB
Go
87 lines
2.5 KiB
Go
package scheduler
|
|
|
|
import (
|
|
"context"
|
|
"log/slog"
|
|
"time"
|
|
|
|
agentv1 "github.com/containarr/server/internal/proto/agentv1"
|
|
"github.com/google/uuid"
|
|
)
|
|
|
|
// DuePolicy is a minimal view of an auto-update policy returned by the store.
|
|
type DuePolicy struct {
|
|
AgentID string
|
|
ContainerID string
|
|
}
|
|
|
|
// StoreInterface defines the minimal store methods used by the scheduler.
|
|
// Implementations must convert their internal policy type to DuePolicy when
|
|
// implementing ListDueAutoUpdatePolicies, or use StoreAdapter provided below.
|
|
type StoreInterface interface {
|
|
ListDueAutoUpdatePolicies(now time.Time) ([]DuePolicy, error)
|
|
UpdateAutoUpdateChecked(agentID, containerID string, at time.Time) error
|
|
}
|
|
|
|
// RegistryInterface defines the minimal registry methods used by the scheduler.
|
|
type RegistryInterface interface {
|
|
Send(agentID string, msg *agentv1.ServerMessage) bool
|
|
}
|
|
|
|
// Scheduler sends CheckUpdateCommand to agents every 60 seconds for containers
|
|
// with an active and due auto-update policy.
|
|
type Scheduler struct {
|
|
store StoreInterface
|
|
registry RegistryInterface
|
|
}
|
|
|
|
// New creates a new Scheduler.
|
|
func New(store StoreInterface, registry RegistryInterface) *Scheduler {
|
|
return &Scheduler{store: store, registry: registry}
|
|
}
|
|
|
|
// Start runs the scheduler loop until ctx is cancelled.
|
|
func (s *Scheduler) Start(ctx context.Context) {
|
|
ticker := time.NewTicker(60 * time.Second)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
slog.Info("scheduler stopped")
|
|
return
|
|
case t := <-ticker.C:
|
|
s.tick(t)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *Scheduler) tick(now time.Time) {
|
|
policies, err := s.store.ListDueAutoUpdatePolicies(now)
|
|
if err != nil {
|
|
slog.Error("scheduler: list due policies", "err", err)
|
|
return
|
|
}
|
|
|
|
for _, p := range policies {
|
|
cmdID := uuid.NewString()
|
|
msg := &agentv1.ServerMessage{
|
|
Payload: &agentv1.ServerMessage_CheckUpdate{
|
|
CheckUpdate: &agentv1.CheckUpdateCommand{
|
|
CommandId: cmdID,
|
|
ContainerId: p.ContainerID,
|
|
},
|
|
},
|
|
}
|
|
sent := s.registry.Send(p.AgentID, msg)
|
|
if !sent {
|
|
slog.Debug("scheduler: agent not connected, skipping", "agent_id", p.AgentID, "container_id", p.ContainerID)
|
|
continue
|
|
}
|
|
if err := s.store.UpdateAutoUpdateChecked(p.AgentID, p.ContainerID, now); err != nil {
|
|
slog.Error("scheduler: update last_checked_at", "agent_id", p.AgentID, "container_id", p.ContainerID, "err", err)
|
|
}
|
|
slog.Info("scheduler: sent CheckUpdateCommand", "agent_id", p.AgentID, "container_id", p.ContainerID, "command_id", cmdID)
|
|
}
|
|
}
|