package reconcile import ( "context" "fmt" "log/slog" "strconv" "sync/atomic" "time" "gitea.dooplex.hu/admin/felhom-agent/internal/proxmox" ) // Engine converges actual Proxmox state toward the desired state. One Reconcile pass: // read desired (from the provider), read actual (from Proxmox), Plan the minimal // benign action set, and dispatch each action onto the per-guest Queue — journaling // each op for crash-safety. At slice 4 the provider is EmptyProvider, so the action // set is empty and the pass performs zero mutations (correct and expected). // // Concurrency: actions for different guests run in parallel (separate Queue lanes); // actions for the same guest run serially in plan order. Every Proxmox mutation is // async-or-sync per the mutate.go contract: a non-empty UPID is WaitTask'd and its // exitstatus asserted; an empty UPID is a clean synchronous success. type Engine struct { api GuestAPI queue *Queue journal *Journal provider DesiredProvider norm FieldNormalizers logger *slog.Logger opSeq uint64 // atomic; makes each op id unique per attempt } // EngineOptions configures a new Engine. Norm defaults to DefaultNormalizers, Logger // to a discard logger. type EngineOptions struct { API GuestAPI Queue *Queue Journal *Journal Provider DesiredProvider Norm FieldNormalizers Logger *slog.Logger } // NewEngine builds an Engine. The Queue is shared (the single §10 choke point); the // caller owns its lifecycle (Close on shutdown). func NewEngine(opts EngineOptions) *Engine { norm := opts.Norm if norm == nil { norm = DefaultNormalizers() } logger := opts.Logger if logger == nil { logger = slog.New(slog.NewTextHandler(discard{}, nil)) } provider := opts.Provider if provider == nil { provider = EmptyProvider{} } return &Engine{ api: opts.API, queue: opts.Queue, journal: opts.Journal, provider: provider, norm: norm, logger: logger, } } // Result summarizes one Reconcile pass. type Result struct { Planned int Executed int // succeeded Failed int // errored Errors []error // one per failed action } // Reconcile runs one convergence pass. It returns an error only on a pass-level // failure (can't read desired/actual); per-action failures are counted in Result and // do not abort the pass (other guests still converge). func (e *Engine) Reconcile(ctx context.Context) (Result, error) { desired, err := e.provider.Desired(ctx) if err != nil { return Result{}, fmt.Errorf("reconcile: desired state: %w", err) } actual, err := e.readActual(ctx) if err != nil { return Result{}, fmt.Errorf("reconcile: actual state: %w", err) } actions := Plan(desired, actual, e.norm) res := Result{Planned: len(actions)} if len(actions) == 0 { e.logger.Debug("reconcile: no drift, no actions", "desired_guests", len(desired.Guests), "actual_guests", len(actual.Guests)) return res, nil } // Dispatch all actions onto the shared per-guest queue, then await each. Same-vmid // actions serialize in submit order; different vmids run concurrently. chans := make([]<-chan error, len(actions)) for i := range actions { act := actions[i] chans[i] = e.queue.Submit(act.VMID, func() error { return e.execute(ctx, act) }) } for i, ch := range chans { if err := <-ch; err != nil { res.Failed++ res.Errors = append(res.Errors, err) e.logger.Error("reconcile: action failed", "vmid", actions[i].VMID, "kind", actions[i].Kind, "err", err) } else { res.Executed++ e.logger.Info("reconcile: action applied", "vmid", actions[i].VMID, "kind", actions[i].Kind, "reason", actions[i].Reason) } } return res, nil } // execute dispatches one benign action against Proxmox and journals its lifecycle. // Reconcile actions carry NO idempotency key (convergent — safe to re-run on drift); // crash-safety comes from the in-flight journal records, not idempotency suppression. func (e *Engine) execute(ctx context.Context, act Action) error { opID := e.nextOpID(act) e.append(JournalEntry{OpID: opID, VMID: act.VMID, Kind: string(act.Kind), Params: act.Params, State: OpStarted, At: time.Now().UTC()}) var upid string var err error switch act.Kind { case ActionStart: upid, err = e.api.Start(ctx, act.VMID) case ActionStop: upid, err = e.api.Stop(ctx, act.VMID) case ActionSetConfig: upid, err = e.api.SetConfig(ctx, act.VMID, act.Params) default: err = fmt.Errorf("reconcile: unknown action kind %q", act.Kind) } if err != nil { e.append(JournalEntry{OpID: opID, VMID: act.VMID, Kind: string(act.Kind), State: OpFailed, At: time.Now().UTC()}) return fmt.Errorf("reconcile: %s vmid %d: %w", act.Kind, act.VMID, err) } // Record the task id (if any) before awaiting it, so a crash mid-wait is // detectable on restart and the task status can be re-checked. e.append(JournalEntry{OpID: opID, VMID: act.VMID, Kind: string(act.Kind), UPID: upid, State: OpTaskRunning, At: time.Now().UTC()}) if upid != "" { st, err := e.api.WaitTask(ctx, upid, proxmox.WaitOptions{}) if err != nil { // WaitTask already errors on a non-OK exitstatus e.append(JournalEntry{OpID: opID, VMID: act.VMID, Kind: string(act.Kind), UPID: upid, State: OpFailed, At: time.Now().UTC()}) return fmt.Errorf("reconcile: %s vmid %d: %w", act.Kind, act.VMID, err) } if st.ExitStatus != "OK" { // defensive — WaitTask should have errored e.append(JournalEntry{OpID: opID, VMID: act.VMID, Kind: string(act.Kind), UPID: upid, State: OpFailed, At: time.Now().UTC()}) return fmt.Errorf("reconcile: %s vmid %d: exitstatus=%s", act.Kind, act.VMID, st.ExitStatus) } } // upid == "" is the synchronous path (slice-4 proven for SetConfig description). e.append(JournalEntry{OpID: opID, VMID: act.VMID, Kind: string(act.Kind), UPID: upid, State: OpSucceeded, At: time.Now().UTC()}) return nil } // readActual reads observed state from Proxmox: run-state from the list, sizing + // description from per-guest config. A GuestConfig read failure keeps the run-state // (SpecKnown=false) rather than dropping the guest — matching the collector. func (e *Engine) readActual(ctx context.Context) (ActualState, error) { lxc, err := e.api.ListLXC(ctx) if err != nil { return ActualState{}, err } guests := make(map[int]ActualGuest, len(lxc)) for _, g := range lxc { a := ActualGuest{VMID: g.VMID, Run: normRun(g.Status)} cfg, err := e.api.GuestConfig(ctx, g.VMID) if err != nil { e.logger.Warn("reconcile: GuestConfig failed; spec unknown (run-state kept)", "vmid", g.VMID, "err", err) } else { a.SpecKnown = true a.Cores = cfg.Cores a.MemoryMiB = cfg.Memory a.Description = guestDescription(cfg) } guests[g.VMID] = a } return ActualState{Guests: guests}, nil } // Run reconciles once immediately, then on every interval tick until ctx is done. A // per-pass failure is logged and the loop continues (drift is corrected next tick). // At slice 4 (EmptyProvider) every pass is a logged no-op. func (e *Engine) Run(ctx context.Context, interval time.Duration) error { e.reconcileOnce(ctx) t := time.NewTicker(interval) defer t.Stop() for { select { case <-ctx.Done(): return ctx.Err() case <-t.C: e.reconcileOnce(ctx) } } } func (e *Engine) reconcileOnce(ctx context.Context) { res, err := e.Reconcile(ctx) if err != nil { e.logger.Error("reconcile: pass failed", "err", err) return } if res.Planned > 0 { e.logger.Info("reconcile: pass complete", "planned", res.Planned, "executed", res.Executed, "failed", res.Failed) } } // nextOpID builds a per-attempt unique op id (kind-vmid-seq) for journal correlation. func (e *Engine) nextOpID(act Action) string { n := atomic.AddUint64(&e.opSeq, 1) return string(act.Kind) + "-" + strconv.Itoa(act.VMID) + "-" + strconv.FormatUint(n, 10) } // append journals a lifecycle record, logging (never failing the op on) a journal I/O // error — the Proxmox op already happened; a missing journal line is a crash-recovery // degradation, not a reason to abort. func (e *Engine) append(rec JournalEntry) { if e.journal == nil { return } if err := e.journal.Append(rec); err != nil { e.logger.Error("reconcile: journal append failed", "op_id", rec.OpID, "state", rec.State, "err", err) } } // discard is an io.Writer sink for the default no-op logger. type discard struct{} func (discard) Write(p []byte) (int, error) { return len(p), nil }