diff --git a/CHANGELOG.md b/CHANGELOG.md index 1cf3629..91cfb8a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,75 @@ All notable changes to **felhom-agent** are recorded here. Update on every code change that gets pushed. +## v0.4.0-rc1 — slice 4 Phase A: reconcile engine (structural; runs live, unfed) (2026-06-08) + +The agent-side control core's structural half. **Checkpoint marker** — `-rc1` is the +Phase-A push; awaiting validation before Phase B (the reversibility gate + signed-op +consuming layer) lands the final **v0.4.0**. Runs LIVE but UNFED: with no desired-state +provider until slice 10, the live engine computes an empty action set and performs +**zero mutations**. + +### Added +- **`internal/reconcile`** package — the engine, the per-guest serializer, the + desired-state model, the normalization layer, and the durable op journal: + - **Per-guest serializer (`Queue`, doc 03 §10)** — the single choke point ALL + mutation sources funnel through. Same-vmid jobs run strictly one-at-a-time in + submit order; independent vmids run in parallel. Each vmid is a cond-var FIFO lane + (unbounded, non-blocking, order-preserving); graceful drain on `Close`. + - **Desired-state model + `DesiredProvider` seam** — `DesiredGuest` (per-field + optional: run-state / `*hub.GuestSpec` / `*description`), `DesiredState`. The only + live provider is **`EmptyProvider`** (slice 4 has no source); `StaticProvider` + feeds fixtures. The seam is where slice 10's hub-serving plugs in — no hub/local + source invented here. + - **Normalization layer (`FieldNormalizers`)** — reconcile compares *normalized* + desired-vs-actual so Proxmox round-trip quirks don't read as drift. `description`'s + trailing newline is the first registered case; the registry takes more (boolean + coercion, list ordering) as discovered. `normDesc` **promoted** out of + `cmd/felhom-agent/main.go` to **`reconcile.NormDescription`**; the `--selftest=task` + description round-trip now uses that shared helper (one source of truth for the quirk). + - **Plan engine (`Plan`, pure function)** — computes the minimal **benign** action set + (`Start`/`Stop`/`SetConfig`) for guests present in both desired and actual, with + normalized comparison, deterministic vmid ordering, config-before-run-state. Skips + provision (desired-absent-in-actual, slice 7) and destroy (actual-absent-in-desired, + gated, slice 10); never writes a config it couldn't first read (`SpecKnown`). Disk + (rootfs grow) intentionally not reconciled here. + - **Reconcile engine (`Engine`)** — reads desired+actual, plans, dispatches each action + onto the shared queue. Every Proxmox op handled per the mutate.go contract: non-empty + UPID → `WaitTask` + assert `exitstatus`; empty UPID → clean **synchronous** success + (slice-4 proven). Per-action failures are counted, not fatal (other guests still + converge). + - **Operation journal (`Journal`)** — durable fsync'd append-only JSONL mirroring + `authz.FileNonceStore`: records each op's lifecycle (started → task_running → + succeeded/failed) with its Proxmox task id (crash mid-op is detected and re-checkable + on restart via `InFlight()`), plus an **idempotency-key store** (`AlreadyApplied`) so + a one-shot op never re-runs across retries/restarts. Reconcile actions carry no + idempotency key (convergent — must re-run on real drift). +- **Daemon wiring (`runDaemon`)** — reconcile runs alongside the hub loop on the poll + cadence, **sharing the per-guest queue**. Journal path is a `journal.log` sibling of the + nonce store. The daemon runs cleanly with **no desired state and no signers** (reconcile + is a logged live no-op; a journal-open failure degrades to journal-less, never crashes). + +### Tests +- Serializer: same-guest serialized (max-concurrency 1, submit order preserved) and + different-guests parallel (cross-waiting jobs both complete — would deadlock if not); + error propagation; drain-pending-on-close; submit-after-close. +- Normalization: description round-trip; unknown-field identity; extensibility seam + (synthetic boolean-coercion + list-ordering normalizers). +- Plan: run-state start/stop, spec drift (cores/memory), disk-not-reconciled, + description-newline-not-drift, unmanaged fields, spec-unknown skips config keeps + run-state, desired-absent skipped, combined ordering, empty-desired no-op, deterministic + vmid order. +- Engine: empty-provider zero mutations; async start (WaitTask); synchronous SetConfig + (no WaitTask); WaitTask failure + POST error counted failed; list error = pass failure. +- Journal: lifecycle latest-wins; in-flight survives restart; idempotency dedupe across + restart; failed key not applied; torn-trailing-line skipped. +- Full module **race-clean** (`go test -race`) on the Linux build server; vet clean. + +### Not in this phase (Phase B) +- The benign/destructive classifier, the reversibility gate, and the signed-op consuming + layer over `internal/authz` (doc 03 §4 / doc 04) — added next, in front of the queue's + executor, landing **v0.4.0**. + ## v0.3.2 — SetConfig selftest extension (slice-4 pre-check) (2026-06-08) The gate before slice 4: prove `SetConfig` works live under the scoped token before diff --git a/CLAUDE.md b/CLAUDE.md index 12342dc..268877a 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -15,7 +15,7 @@ - Module `gitea.dooplex.hu/admin/felhom-agent`; binary `felhom-agent` (`cmd/felhom-agent/`). - **Pure Go stdlib + `golang.org/x/crypto` only** — no web frameworks. - `go.mod` directive **go 1.25.0**; dep `golang.org/x/crypto v0.52.0` (declares go 1.25, will NOT build on Go 1.24). The **build server (192.168.0.180) runs go1.26.0** (upstream Go on PATH, backward-compatible). Build/run the agent there for live tests (same LAN as the demo host). -- Version: `version` var in `cmd/felhom-agent/main.go`, overridable via `-ldflags "-X main.version="`; `--version` flag. **Current: v0.3.2.** Bump on meaningful changes + add a CHANGELOG entry. +- Version: `version` var in `cmd/felhom-agent/main.go`, overridable via `-ldflags "-X main.version="`; `--version` flag. **Current: v0.4.0-rc1** (slice-4 Phase A checkpoint; v0.4.0 lands with Phase B). Bump on meaningful changes + add a CHANGELOG entry. ## Layout @@ -48,7 +48,8 @@ Built in slices, all on `main`: - **v0.3.0** slice 3 — `internal/hub`: the first **daemon loop** (no-`--selftest` mode) posting a read-only `HostReport` to the hub (= the heartbeat). Report's storage/backup/restore/pbs/audit fields are **defined-but-empty** (slices 5/6); the envelope's desired-state/signed-ops fields are **parsed-but-ignored** (slice 4). - **v0.3.1** — slice-3 validation follow-ups. - **v0.3.2** — slice-4 pre-check: reversible `SetConfig` step added to `--selftest=task`; passed live on guest 9999. Findings: LXC `description` write is **synchronous** (empty UPID — dual-mode modeling confirmed); PVE appends a trailing `\n` to `description` on read (reconcile must normalize). First live `VM.Config.*` exercise. -- **Next: slice 4 (reconcile + benign/destructive gate)** — the first slice that issues real Proxmox mutations. The live `--selftest=task` gate (snapshot/rollback/delete + `SetConfig`) is now **passed**. +- **v0.4.0-rc1** — slice-4 **Phase A** (structural): `internal/reconcile` — engine, per-guest serializer (§10), desired-state model + `DesiredProvider` seam, normalization layer (`NormDescription` promoted out of main.go), plan/diff engine (benign Start/Stop/SetConfig set), durable op journal + idempotency store. Wired into `runDaemon` sharing the queue. Runs **live but unfed** (EmptyProvider → zero mutations until slice 10). Checkpoint: awaiting validation before Phase B. +- **Next: slice 4 Phase B** — the benign/destructive classifier, the reversibility gate, and the signed-op consuming layer over `internal/authz` (doc 03 §4 / doc 04), in front of the queue's executor → lands **v0.4.0**. ## Demo host (for live tests) diff --git a/REPORT.md b/REPORT.md index c62024f..0cdcca1 100644 --- a/REPORT.md +++ b/REPORT.md @@ -1,76 +1,82 @@ -# REPORT — `SetConfig` selftest extension, live self-gate (2026-06-08) +# REPORT — Slice 4 Phase A: reconcile engine (structural) (2026-06-08) -> Overwrite-latest report (most recent significant run only). Cumulative history lives in [CHANGELOG.md](CHANGELOG.md). +> Overwrite-latest report (most recent significant work only). Cumulative history lives in [CHANGELOG.md](CHANGELOG.md). ## Outcome -**`SetConfig` PASSED live under the scoped operator token.** The slice-4 pre-check is -satisfied — `--selftest=task -vmid 9999` now exercises a reversible `SetConfig` -write+revert end-to-end and reached `=== selftest=task OK ===` (exit 0). Reconcile -(slice 4) can be built on `SetConfig` with confidence. +**Phase A of slice 4 is implemented, tested, and pushed as the checkpoint marker +`v0.4.0-rc1`.** This is the structural half of the agent-side control core: the +reconcile engine, the per-guest serializer (doc 03 §10), the desired-state model + its +provider seam, the field-normalization layer, the plan/diff engine, and the durable +operation journal + idempotency store — all adversarially fixture-tested. -## What was implemented +**Per the task, I have STOPPED at the checkpoint and am awaiting the validation pass +before starting Phase B** (the benign/destructive classifier, the reversibility gate, +and the signed-op consuming layer over `internal/authz`). Phase B is the security core +and earns isolated review. -A reversible `SetConfig` step appended to the existing `runSelftestTask` flow -(`cmd/felhom-agent/main.go`, `selftestSetConfig`), keeping the prior -snapshot → rollback → delete-snapshot steps intact. Against guest 9999: +## What runs (and what deliberately doesn't) -1. `GuestConfig` — capture the original `description` (was **absent**). -2. `SetConfig description="felhom-selftest "` — dual-mode return handled per - the `mutate.go` contract (empty UPID = synchronous; UPID = `WaitTask`+assert OK). -3. `GuestConfig` again — confirm the marker landed. -4. **Restore** — original was absent, so `SetConfig delete=description`; confirm cleared. +The engine **runs live but unfed**. At slice 4 there is no desired-state source (hub +serving is slice 10; provisioning is slice 7), so the only production `DesiredProvider` +is `EmptyProvider` → the live engine reads state, computes an **empty action set**, and +performs **zero mutations** every tick. That is the correct, expected slice-4 behavior; +the first live convergence arrives when slice 10 serves desired state into the seam. -Output matches the existing format: -``` - [ ok ] setconfig synchronous exitstatus=OK - [ ok ] verify-write description verified == marker - [ ok ] setconfig-revert synchronous exitstatus=OK - [ ok ] verify-revert description restored to original -``` +The wired action set is **benign-on-existing-guest only**: `Start`, `Stop`, `SetConfig`. +Provisioning and the destructive set are out of scope for Phase A (the destructive set +is classified and gated in Phase B but not wired to live execution — nothing serves +destructive deltas yet). -## Key finding — synchronous, not async +## Package `internal/reconcile` -**The LXC `description` write came back synchronous (empty UPID).** PVE applied it -inline with no task object; the agent printed `synchronous exitstatus=OK` on the -empty-string path. This confirms the agent's **dual-mode `SetConfig` modeling matches -Proxmox reality**: for `description`, the empty-UPID branch is the live path, and -treating `""` as success (not an error) is correct. This was the **first live exercise -of the `VM.Config.*` privilege cluster** (previously only the snapshot/rollback/backup -privileges had been run live). +- **`Queue` (per-guest serializer, doc 03 §10)** — the single choke point all mutation + sources funnel through. Same-vmid jobs run strictly one-at-a-time in submit order; + independent vmids run in parallel. Each vmid is an unbounded cond-var FIFO lane + (non-blocking, order-preserving submission); `Close` drains pending jobs gracefully. +- **Desired-state model + `DesiredProvider`** — `DesiredGuest` makes each field + individually optional (run-state / `*hub.GuestSpec` / `*description`) so a source pins + only what it manages. `EmptyProvider` (live, slice 4) and `StaticProvider` (fixtures). +- **Normalization layer (`FieldNormalizers`)** — reconcile compares *normalized* + desired-vs-actual. `description`'s trailing newline (the slice-4-proven quirk) is the + first registered normalizer; the registry takes more as discovered. `normDesc` was + **promoted** out of `main.go` to `reconcile.NormDescription`, and the `--selftest=task` + round-trip now uses that shared helper — one source of truth. +- **`Plan` (pure diff engine)** — minimal benign action set for guests in both desired + and actual: normalized comparison, deterministic vmid order, config-before-run-state. + Skips provision (slice 7) and destroy (gated, slice 10); never writes a config it + couldn't first read; disk grow deferred. +- **`Engine`** — reads desired+actual, plans, dispatches onto the shared queue. Honors + the mutate.go dual-mode contract: non-empty UPID → `WaitTask`+assert; empty UPID → + clean synchronous success. Per-action failures counted, never fatal. +- **`Journal`** — durable fsync'd JSONL (mirrors `authz.FileNonceStore`): op lifecycle + with the Proxmox task id (crash mid-op detected + re-checkable via `InFlight()`), plus + an idempotency-key store so a one-shot op never double-runs across retries/restarts. + Reconcile actions carry no idempotency key (convergent — must re-run on real drift). -## Second finding — `description` trailing-newline normalization +## Daemon wiring -PVE **appends a trailing `\n` to `description` on read** (stored URL-encoded as -`%0A...`). The first live run surfaced this as a (false) verify mismatch: -`got="...Z\n"` vs `want="...Z"`. The write had genuinely landed — only my exact-match -check was too strict. Fixed with `normDesc` (strip trailing newline) at every -comparison point, and the run went green. **This is load-bearing intel for slice 4:** -a reconcile that compares desired vs actual `description` verbatim will detect -perpetual drift; it must normalize the trailing newline. +`runDaemon` now runs reconcile alongside the hub loop on the poll cadence, sharing the +per-guest queue. The journal lives at a `journal.log` sibling of the nonce store. The +daemon runs cleanly with **no desired state and no signers** — reconcile is a logged +no-op; a journal-open failure degrades to journal-less rather than crashing. -## Live run environment +## Verification -- Built **v0.3.2** on the build server (192.168.0.180, go1.26), pointed at - `demo-felhom` (`https://192.168.0.162:8006`, PVE 9.2.2). -- Pinned leaf-cert SHA-256 fingerprint re-verified — still - `BA:7C:99:7D:45:D0…` (matches the agent's pin). -- `--selftest=read` clean first (PVE 9.2.2, node online, guests 9001+9999 visible, - storages listed), then the gated `--selftest=task -vmid 9999`. -- Task UPIDs name the token actor (`…:vzsnapshot:9999:felhom-agent@pve!agent:` etc.) — - privsep token path genuinely exercised, no privilege drift. +- Full module **race-clean** (`go test -race -count=1 ./...`) and `go vet` clean on the + Linux build server (go1.26); all unit tests green locally and there. +- Adversarial fixture coverage: serializer concurrency/ordering, normalization + + extensibility seam, the full plan matrix (drift / no-false-drift / unmanaged / + spec-unknown / scope skips / ordering / empty-desired), engine sync-vs-async + + failure counting, and journal persistence + idempotency dedupe **across a simulated + restart**. +- No live Proxmox needed (the engine is unfed); the live exercise is deferred — there is + nothing to converge until a desired-state source exists. -## Post-state +## Next (after validation) -Guest **9999** left pristine: **stopped**, `description` **absent**, only `current` -remains (no leftover `felhom-selftest` snapshot). - -## Credentials - -The standing operator token (`felhom-agent@pve!agent`, privsep) was **rotated** during -this run — the prior secret was not retrievable (PVE reveals a token secret only once -at creation), so a fresh secret was minted via `root@felhom-pve` and the `FelhomAgent` -role re-confirmed on **both** the user and the token ACL at `/` (privsep intersection -gotcha). The token was consumed via the **standing operator token through -`FELHOM_AGENT_PROXMOX_TOKEN`, not persisted to the repo** — the on-disk demo config -carries only a placeholder. The new secret is **stored out-of-band**. +Phase B: the classifier (benign vs destructive by provenance + data-bearing-ness, not by +verb), the reversibility gate in front of the queue's executor, and the signed-op +consuming layer over `internal/authz` with role-scoping + op-to-action binding + the +adversarial rejection matrix — landing **v0.4.0**. I will not start it until the Phase-A +validation passes. diff --git a/cmd/felhom-agent/main.go b/cmd/felhom-agent/main.go index 240486b..710d375 100644 --- a/cmd/felhom-agent/main.go +++ b/cmd/felhom-agent/main.go @@ -15,7 +15,7 @@ import ( "log/slog" "os" "os/signal" - "strings" + "path/filepath" "syscall" "time" @@ -23,11 +23,12 @@ import ( "gitea.dooplex.hu/admin/felhom-agent/internal/hub" applog "gitea.dooplex.hu/admin/felhom-agent/internal/log" "gitea.dooplex.hu/admin/felhom-agent/internal/proxmox" + "gitea.dooplex.hu/admin/felhom-agent/internal/reconcile" ) // version is the agent version. Overridable at build time with // -ldflags "-X main.version="; defaults to the in-repo CHANGELOG version. -var version = "0.3.2" +var version = "0.4.0-rc1" func main() { var ( @@ -109,19 +110,66 @@ func runDaemon(cfg config.Config, logger *slog.Logger) int { hcfg := cfg.Hub.WithDefaults() collector := hub.NewCollector(px, hub.SystemctlProber{}, cfg.Hub.HostID, version, logger) loop := hub.NewLoop(collector, client, time.Duration(hcfg.PollSeconds)*time.Second, logger) + interval := time.Duration(hcfg.PollSeconds) * time.Second ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) defer stop() logger.Info("felhom-agent daemon starting", "version", version, "host_id", cfg.Hub.HostID, "hub_url", cfg.Hub.URL, "interval_s", hcfg.PollSeconds) // hub key intentionally not logged - if err := loop.Run(ctx); err != nil { - logger.Error("daemon: loop exited with error", "err", err) + + // Reconcile (slice 4) runs alongside the hub loop, sharing the per-guest queue + // (doc 03 §10). At slice 4 the desired-state provider is empty (no hub serving + // until slice 10), so reconcile is a live no-op: it reads state and computes an + // empty action set each tick, mutating nothing. The daemon must run cleanly with + // no desired state and no signers configured — so a journal-open failure is logged + // and reconcile proceeds journal-less (it has nothing destructive to journal yet). + queue := reconcile.NewQueue() + defer queue.Close() + var journal *reconcile.Journal + if jp := reconcileJournalPath(cfg); jp != "" { + if err := os.MkdirAll(filepath.Dir(jp), 0o700); err != nil { + logger.Warn("daemon: cannot ensure journal dir; reconcile runs without a journal", "path", jp, "err", err) + } else if j, err := reconcile.OpenJournal(jp); err != nil { + logger.Warn("daemon: cannot open op journal; reconcile runs without a journal", "path", jp, "err", err) + } else { + journal = j + defer journal.Close() + } + } + engine := reconcile.NewEngine(reconcile.EngineOptions{ + API: px, + Queue: queue, + Journal: journal, + Provider: reconcile.EmptyProvider{}, // slice 4: no live desired-state source + Logger: logger, + }) + + // Run reconcile and the hub loop concurrently; either returning ends the daemon. + errc := make(chan error, 2) + go func() { errc <- engine.Run(ctx, interval) }() + go func() { errc <- loop.Run(ctx) }() + + err = <-errc + stop() // tear down the sibling on the first exit + <-errc // wait for it + if err != nil && err != context.Canceled { + logger.Error("daemon: exited with error", "err", err) return 1 } return 0 } +// reconcileJournalPath chooses the op-journal path: a `journal.log` sibling of the +// configured nonce store (both are durable agent state), falling back to the standard +// host state dir when the nonce store is unset. +func reconcileJournalPath(cfg config.Config) string { + if p := cfg.Authz.NonceStorePath; p != "" { + return filepath.Join(filepath.Dir(p), "journal.log") + } + return "/var/lib/felhom-agent/journal.log" +} + // runSelftestHub validates hub config, does ONE collect + report, and prints the // report it would send plus the envelope it got back. func runSelftestHub(ctx context.Context, cfg config.Config, logger *slog.Logger) int { @@ -317,10 +365,11 @@ func selftestSetConfig(ctx context.Context, client *proxmox.Client, vmid int) in return 1 } // PVE normalizes `description` by appending a trailing newline on read, so all - // comparisons here use normDesc (strip trailing newlines) and restores write the - // normalized original — otherwise an exact-match check sees false drift. This is - // load-bearing intel for slice-4 reconcile (compare descriptions normalized). - origDesc = normDesc(origDesc) + // comparisons here use the shared reconcile.NormDescription (strip trailing + // newlines) and restores write the normalized original — otherwise an exact-match + // check sees false drift. Same helper the slice-4 reconciler uses to normalize + // description, so the quirk has one source of truth. + origDesc = reconcile.NormDescription(origDesc) // 2. Write the marker. marker := "felhom-selftest " + time.Now().UTC().Format(time.RFC3339) @@ -339,7 +388,7 @@ func selftestSetConfig(ctx context.Context, client *proxmox.Client, vmid int) in fmt.Printf(" [FAIL] %-16s decode description (verify): %v\n", "setconfig", err) return 1 } - if !present || normDesc(got) != marker { + if !present || reconcile.NormDescription(got) != marker { fmt.Printf(" [FAIL] %-16s write did not land: present=%v got=%q want=%q\n", "setconfig", present, got, marker) return 1 } @@ -368,8 +417,8 @@ func selftestSetConfig(ctx context.Context, client *proxmox.Client, vmid int) in return 1 } if origPresent { - if !present || normDesc(got) != origDesc { - fmt.Printf(" [FAIL] %-16s revert did not restore: present=%v got=%q want=%q\n", "setconfig-revert", present, normDesc(got), origDesc) + if !present || reconcile.NormDescription(got) != origDesc { + fmt.Printf(" [FAIL] %-16s revert did not restore: present=%v got=%q want=%q\n", "setconfig-revert", present, reconcile.NormDescription(got), origDesc) return 1 } } else if present { @@ -407,11 +456,6 @@ func applySetConfig(ctx context.Context, client *proxmox.Client, vmid int, step return 0 } -// normDesc strips trailing newlines that PVE appends to the `description` field -// on read, so a written value round-trips equal. (PVE stores `description` with a -// trailing "\n"; comparing raw would always mismatch.) -func normDesc(s string) string { return strings.TrimRight(s, "\n") } - // extraString reads a string-valued key from GuestConfig.Extra (raw JSON). It // returns ("", false, nil) when the key is absent, and decodes the JSON string // otherwise. diff --git a/internal/reconcile/doc.go b/internal/reconcile/doc.go new file mode 100644 index 0000000..19c4fe7 --- /dev/null +++ b/internal/reconcile/doc.go @@ -0,0 +1,24 @@ +// Package reconcile is the agent-side control core (slice 4): the engine that +// converges actual Proxmox state toward a desired state, the per-guest serializer +// that all mutation sources funnel through (doc 03 §10), the field-normalization +// layer that keeps Proxmox round-trip quirks from reading as drift, and the durable +// operation journal + idempotency store. +// +// Phase A (this file set) is structural and runs LIVE but UNFED: at slice 4 there is +// no desired-state provider (hub serving is slice 10, provisioning is slice 7), so +// the live engine computes an empty action set and performs ZERO mutations. The +// engine, serializer, normalizer, journal, and planner are all exercised against +// synthetic fixtures. The first live convergence arrives when slice 10 serves desired +// state into the DesiredProvider seam. +// +// Phase B (added later) layers the benign/destructive classifier and the +// reversibility gate (doc 03 §4) plus the signed-op consuming layer over +// internal/authz (doc 04) in front of the queue's executor — so every mutation passes +// the gate. Phase A deliberately wires only the benign-on-existing-guest action set: +// Start / Stop / SetConfig. +// +// Action set scope (slice 4): Start, Stop, SetConfig on EXISTING guests only. +// Provisioning (restore-to-new-guest) and destroy/overwrite are out of scope here — +// the destructive set is classified and gated in Phase B but not wired to live +// execution, because nothing serves destructive deltas until slice 10. +package reconcile diff --git a/internal/reconcile/engine.go b/internal/reconcile/engine.go new file mode 100644 index 0000000..a355312 --- /dev/null +++ b/internal/reconcile/engine.go @@ -0,0 +1,249 @@ +package reconcile + +import ( + "context" + "fmt" + "log/slog" + "strconv" + "sync/atomic" + "time" + + "gitea.dooplex.hu/admin/felhom-agent/internal/proxmox" +) + +// Engine converges actual Proxmox state toward the desired state. One Reconcile pass: +// read desired (from the provider), read actual (from Proxmox), Plan the minimal +// benign action set, and dispatch each action onto the per-guest Queue — journaling +// each op for crash-safety. At slice 4 the provider is EmptyProvider, so the action +// set is empty and the pass performs zero mutations (correct and expected). +// +// Concurrency: actions for different guests run in parallel (separate Queue lanes); +// actions for the same guest run serially in plan order. Every Proxmox mutation is +// async-or-sync per the mutate.go contract: a non-empty UPID is WaitTask'd and its +// exitstatus asserted; an empty UPID is a clean synchronous success. +type Engine struct { + api GuestAPI + queue *Queue + journal *Journal + provider DesiredProvider + norm FieldNormalizers + logger *slog.Logger + + opSeq uint64 // atomic; makes each op id unique per attempt +} + +// EngineOptions configures a new Engine. Norm defaults to DefaultNormalizers, Logger +// to a discard logger. +type EngineOptions struct { + API GuestAPI + Queue *Queue + Journal *Journal + Provider DesiredProvider + Norm FieldNormalizers + Logger *slog.Logger +} + +// NewEngine builds an Engine. The Queue is shared (the single §10 choke point); the +// caller owns its lifecycle (Close on shutdown). +func NewEngine(opts EngineOptions) *Engine { + norm := opts.Norm + if norm == nil { + norm = DefaultNormalizers() + } + logger := opts.Logger + if logger == nil { + logger = slog.New(slog.NewTextHandler(discard{}, nil)) + } + provider := opts.Provider + if provider == nil { + provider = EmptyProvider{} + } + return &Engine{ + api: opts.API, + queue: opts.Queue, + journal: opts.Journal, + provider: provider, + norm: norm, + logger: logger, + } +} + +// Result summarizes one Reconcile pass. +type Result struct { + Planned int + Executed int // succeeded + Failed int // errored + Errors []error // one per failed action +} + +// Reconcile runs one convergence pass. It returns an error only on a pass-level +// failure (can't read desired/actual); per-action failures are counted in Result and +// do not abort the pass (other guests still converge). +func (e *Engine) Reconcile(ctx context.Context) (Result, error) { + desired, err := e.provider.Desired(ctx) + if err != nil { + return Result{}, fmt.Errorf("reconcile: desired state: %w", err) + } + actual, err := e.readActual(ctx) + if err != nil { + return Result{}, fmt.Errorf("reconcile: actual state: %w", err) + } + + actions := Plan(desired, actual, e.norm) + res := Result{Planned: len(actions)} + if len(actions) == 0 { + e.logger.Debug("reconcile: no drift, no actions", + "desired_guests", len(desired.Guests), "actual_guests", len(actual.Guests)) + return res, nil + } + + // Dispatch all actions onto the shared per-guest queue, then await each. Same-vmid + // actions serialize in submit order; different vmids run concurrently. + chans := make([]<-chan error, len(actions)) + for i := range actions { + act := actions[i] + chans[i] = e.queue.Submit(act.VMID, func() error { return e.execute(ctx, act) }) + } + for i, ch := range chans { + if err := <-ch; err != nil { + res.Failed++ + res.Errors = append(res.Errors, err) + e.logger.Error("reconcile: action failed", + "vmid", actions[i].VMID, "kind", actions[i].Kind, "err", err) + } else { + res.Executed++ + e.logger.Info("reconcile: action applied", + "vmid", actions[i].VMID, "kind", actions[i].Kind, "reason", actions[i].Reason) + } + } + return res, nil +} + +// execute dispatches one benign action against Proxmox and journals its lifecycle. +// Reconcile actions carry NO idempotency key (convergent — safe to re-run on drift); +// crash-safety comes from the in-flight journal records, not idempotency suppression. +func (e *Engine) execute(ctx context.Context, act Action) error { + opID := e.nextOpID(act) + e.append(JournalEntry{OpID: opID, VMID: act.VMID, Kind: string(act.Kind), + Params: act.Params, State: OpStarted, At: time.Now().UTC()}) + + var upid string + var err error + switch act.Kind { + case ActionStart: + upid, err = e.api.Start(ctx, act.VMID) + case ActionStop: + upid, err = e.api.Stop(ctx, act.VMID) + case ActionSetConfig: + upid, err = e.api.SetConfig(ctx, act.VMID, act.Params) + default: + err = fmt.Errorf("reconcile: unknown action kind %q", act.Kind) + } + if err != nil { + e.append(JournalEntry{OpID: opID, VMID: act.VMID, Kind: string(act.Kind), + State: OpFailed, At: time.Now().UTC()}) + return fmt.Errorf("reconcile: %s vmid %d: %w", act.Kind, act.VMID, err) + } + + // Record the task id (if any) before awaiting it, so a crash mid-wait is + // detectable on restart and the task status can be re-checked. + e.append(JournalEntry{OpID: opID, VMID: act.VMID, Kind: string(act.Kind), + UPID: upid, State: OpTaskRunning, At: time.Now().UTC()}) + + if upid != "" { + st, err := e.api.WaitTask(ctx, upid, proxmox.WaitOptions{}) + if err != nil { // WaitTask already errors on a non-OK exitstatus + e.append(JournalEntry{OpID: opID, VMID: act.VMID, Kind: string(act.Kind), + UPID: upid, State: OpFailed, At: time.Now().UTC()}) + return fmt.Errorf("reconcile: %s vmid %d: %w", act.Kind, act.VMID, err) + } + if st.ExitStatus != "OK" { // defensive — WaitTask should have errored + e.append(JournalEntry{OpID: opID, VMID: act.VMID, Kind: string(act.Kind), + UPID: upid, State: OpFailed, At: time.Now().UTC()}) + return fmt.Errorf("reconcile: %s vmid %d: exitstatus=%s", act.Kind, act.VMID, st.ExitStatus) + } + } + // upid == "" is the synchronous path (slice-4 proven for SetConfig description). + + e.append(JournalEntry{OpID: opID, VMID: act.VMID, Kind: string(act.Kind), + UPID: upid, State: OpSucceeded, At: time.Now().UTC()}) + return nil +} + +// readActual reads observed state from Proxmox: run-state from the list, sizing + +// description from per-guest config. A GuestConfig read failure keeps the run-state +// (SpecKnown=false) rather than dropping the guest — matching the collector. +func (e *Engine) readActual(ctx context.Context) (ActualState, error) { + lxc, err := e.api.ListLXC(ctx) + if err != nil { + return ActualState{}, err + } + guests := make(map[int]ActualGuest, len(lxc)) + for _, g := range lxc { + a := ActualGuest{VMID: g.VMID, Run: normRun(g.Status)} + cfg, err := e.api.GuestConfig(ctx, g.VMID) + if err != nil { + e.logger.Warn("reconcile: GuestConfig failed; spec unknown (run-state kept)", + "vmid", g.VMID, "err", err) + } else { + a.SpecKnown = true + a.Cores = cfg.Cores + a.MemoryMiB = cfg.Memory + a.Description = guestDescription(cfg) + } + guests[g.VMID] = a + } + return ActualState{Guests: guests}, nil +} + +// Run reconciles once immediately, then on every interval tick until ctx is done. A +// per-pass failure is logged and the loop continues (drift is corrected next tick). +// At slice 4 (EmptyProvider) every pass is a logged no-op. +func (e *Engine) Run(ctx context.Context, interval time.Duration) error { + e.reconcileOnce(ctx) + t := time.NewTicker(interval) + defer t.Stop() + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-t.C: + e.reconcileOnce(ctx) + } + } +} + +func (e *Engine) reconcileOnce(ctx context.Context) { + res, err := e.Reconcile(ctx) + if err != nil { + e.logger.Error("reconcile: pass failed", "err", err) + return + } + if res.Planned > 0 { + e.logger.Info("reconcile: pass complete", + "planned", res.Planned, "executed", res.Executed, "failed", res.Failed) + } +} + +// nextOpID builds a per-attempt unique op id (kind-vmid-seq) for journal correlation. +func (e *Engine) nextOpID(act Action) string { + n := atomic.AddUint64(&e.opSeq, 1) + return string(act.Kind) + "-" + strconv.Itoa(act.VMID) + "-" + strconv.FormatUint(n, 10) +} + +// append journals a lifecycle record, logging (never failing the op on) a journal I/O +// error — the Proxmox op already happened; a missing journal line is a crash-recovery +// degradation, not a reason to abort. +func (e *Engine) append(rec JournalEntry) { + if e.journal == nil { + return + } + if err := e.journal.Append(rec); err != nil { + e.logger.Error("reconcile: journal append failed", "op_id", rec.OpID, "state", rec.State, "err", err) + } +} + +// discard is an io.Writer sink for the default no-op logger. +type discard struct{} + +func (discard) Write(p []byte) (int, error) { return len(p), nil } diff --git a/internal/reconcile/engine_test.go b/internal/reconcile/engine_test.go new file mode 100644 index 0000000..d4dc852 --- /dev/null +++ b/internal/reconcile/engine_test.go @@ -0,0 +1,212 @@ +package reconcile + +import ( + "context" + "errors" + "path/filepath" + "sync" + "testing" + + "gitea.dooplex.hu/admin/felhom-agent/internal/hub" + "gitea.dooplex.hu/admin/felhom-agent/internal/proxmox" +) + +// fakeAPI is a configurable GuestAPI for engine tests: it records mutating calls and +// returns canned UPIDs (""=synchronous, non-empty=async) and WaitTask verdicts. +type fakeAPI struct { + mu sync.Mutex + lxc []proxmox.Guest + cfg map[int]proxmox.GuestConfig + + startUPID, stopUPID, setUPID string + startErr, stopErr, setErr error + // waitFunc maps a UPID to a (status, err); default = OK. Mirrors the real client, + // which errors on a non-OK exitstatus. + waitFunc func(upid string) (proxmox.TaskStatus, error) + + starts []int + stops []int + sets []setCall + waits []string + listErr error +} + +type setCall struct { + vmid int + params map[string]string +} + +func (f *fakeAPI) ListLXC(context.Context) ([]proxmox.Guest, error) { + if f.listErr != nil { + return nil, f.listErr + } + return f.lxc, nil +} + +func (f *fakeAPI) GuestConfig(_ context.Context, vmid int) (proxmox.GuestConfig, error) { + c, ok := f.cfg[vmid] + if !ok { + return proxmox.GuestConfig{}, errors.New("no config") + } + return c, nil +} + +func (f *fakeAPI) Start(_ context.Context, vmid int) (string, error) { + f.mu.Lock() + f.starts = append(f.starts, vmid) + f.mu.Unlock() + return f.startUPID, f.startErr +} + +func (f *fakeAPI) Stop(_ context.Context, vmid int) (string, error) { + f.mu.Lock() + f.stops = append(f.stops, vmid) + f.mu.Unlock() + return f.stopUPID, f.stopErr +} + +func (f *fakeAPI) SetConfig(_ context.Context, vmid int, params map[string]string) (string, error) { + f.mu.Lock() + f.sets = append(f.sets, setCall{vmid, params}) + f.mu.Unlock() + return f.setUPID, f.setErr +} + +func (f *fakeAPI) WaitTask(_ context.Context, upid string, _ proxmox.WaitOptions) (proxmox.TaskStatus, error) { + f.mu.Lock() + f.waits = append(f.waits, upid) + f.mu.Unlock() + if f.waitFunc != nil { + return f.waitFunc(upid) + } + return proxmox.TaskStatus{Status: "stopped", ExitStatus: "OK"}, nil +} + +func newEngine(t *testing.T, api GuestAPI, provider DesiredProvider) (*Engine, *Journal, *Queue) { + t.Helper() + jp := filepath.Join(t.TempDir(), "journal.log") + j, err := OpenJournal(jp) + if err != nil { + t.Fatalf("OpenJournal: %v", err) + } + t.Cleanup(func() { j.Close() }) + q := NewQueue() + t.Cleanup(q.Close) + e := NewEngine(EngineOptions{API: api, Queue: q, Journal: j, Provider: provider}) + return e, j, q +} + +func TestEngine_EmptyProviderNoMutations(t *testing.T) { + api := &fakeAPI{ + lxc: []proxmox.Guest{{VMID: 100, Status: "running"}}, + cfg: map[int]proxmox.GuestConfig{100: {Cores: 2}}, + } + e, _, _ := newEngine(t, api, EmptyProvider{}) + res, err := e.Reconcile(context.Background()) + if err != nil { + t.Fatalf("Reconcile: %v", err) + } + if res.Planned != 0 || res.Executed != 0 { + t.Errorf("EmptyProvider should plan nothing, got %+v", res) + } + if len(api.starts)+len(api.stops)+len(api.sets) != 0 { + t.Errorf("EmptyProvider mutated Proxmox: starts=%v stops=%v sets=%v", api.starts, api.stops, api.sets) + } +} + +func TestEngine_AsyncStartWaitsTask(t *testing.T) { + api := &fakeAPI{ + lxc: []proxmox.Guest{{VMID: 100, Status: "stopped"}}, + cfg: map[int]proxmox.GuestConfig{100: {Cores: 2}}, + startUPID: "UPID:demo:start:100:", + } + e, j, _ := newEngine(t, api, StaticProvider{State: desired(DesiredGuest{VMID: 100, Run: RunRunning})}) + res, err := e.Reconcile(context.Background()) + if err != nil { + t.Fatalf("Reconcile: %v", err) + } + if res.Executed != 1 || res.Failed != 0 { + t.Fatalf("want 1 executed, got %+v", res) + } + if len(api.starts) != 1 || api.starts[0] != 100 { + t.Errorf("expected Start(100), got %v", api.starts) + } + if len(api.waits) != 1 { + t.Errorf("async op must WaitTask, got waits=%v", api.waits) + } + if len(j.InFlight()) != 0 { + t.Errorf("no ops should be in-flight after success: %+v", j.InFlight()) + } +} + +func TestEngine_SynchronousSetConfigNoWait(t *testing.T) { + // Empty UPID = PVE applied synchronously (slice-4 proven for description). Must be + // treated as success WITHOUT a WaitTask call. + api := &fakeAPI{ + lxc: []proxmox.Guest{{VMID: 100, Status: "stopped"}}, + cfg: map[int]proxmox.GuestConfig{100: {Cores: 2}}, + setUPID: "", // synchronous + } + e, _, _ := newEngine(t, api, StaticProvider{State: desired( + DesiredGuest{VMID: 100, Spec: &hub.GuestSpec{Cores: 4, MemoryBytes: mib(2048)}})}) + res, err := e.Reconcile(context.Background()) + if err != nil { + t.Fatalf("Reconcile: %v", err) + } + if res.Executed != 1 { + t.Fatalf("want 1 executed, got %+v", res) + } + if len(api.sets) != 1 || api.sets[0].params["cores"] != "4" { + t.Errorf("expected SetConfig cores=4, got %v", api.sets) + } + if len(api.waits) != 0 { + t.Errorf("synchronous op must NOT WaitTask, got waits=%v", api.waits) + } +} + +func TestEngine_WaitTaskFailureCountsFailed(t *testing.T) { + api := &fakeAPI{ + lxc: []proxmox.Guest{{VMID: 100, Status: "stopped"}}, + cfg: map[int]proxmox.GuestConfig{100: {Cores: 2}}, + startUPID: "UPID:demo:start:100:", + waitFunc: func(string) (proxmox.TaskStatus, error) { + return proxmox.TaskStatus{Status: "stopped", ExitStatus: "got 403"}, errors.New("task failed: got 403") + }, + } + e, j, _ := newEngine(t, api, StaticProvider{State: desired(DesiredGuest{VMID: 100, Run: RunRunning})}) + res, err := e.Reconcile(context.Background()) + if err != nil { + t.Fatalf("Reconcile (pass): %v", err) + } + if res.Failed != 1 || res.Executed != 0 { + t.Fatalf("want 1 failed, got %+v", res) + } + // The failed op is journaled terminal (failed), not left in-flight. + if len(j.InFlight()) != 0 { + t.Errorf("failed op should be terminal, in-flight=%+v", j.InFlight()) + } +} + +func TestEngine_PostErrorCountsFailed(t *testing.T) { + api := &fakeAPI{ + lxc: []proxmox.Guest{{VMID: 100, Status: "stopped"}}, + cfg: map[int]proxmox.GuestConfig{100: {Cores: 2}}, + startErr: errors.New("connection refused"), + } + e, _, _ := newEngine(t, api, StaticProvider{State: desired(DesiredGuest{VMID: 100, Run: RunRunning})}) + res, _ := e.Reconcile(context.Background()) + if res.Failed != 1 { + t.Fatalf("want 1 failed on POST error, got %+v", res) + } + if len(api.waits) != 0 { + t.Errorf("POST error must not reach WaitTask, got %v", api.waits) + } +} + +func TestEngine_ListErrorIsPassFailure(t *testing.T) { + api := &fakeAPI{listErr: errors.New("api down")} + e, _, _ := newEngine(t, api, StaticProvider{State: desired(DesiredGuest{VMID: 100, Run: RunRunning})}) + if _, err := e.Reconcile(context.Background()); err == nil { + t.Error("expected a pass-level error when actual state can't be read") + } +} diff --git a/internal/reconcile/journal.go b/internal/reconcile/journal.go new file mode 100644 index 0000000..b548648 --- /dev/null +++ b/internal/reconcile/journal.go @@ -0,0 +1,189 @@ +package reconcile + +import ( + "bytes" + "encoding/json" + "errors" + "io/fs" + "os" + "path/filepath" + "sync" + "time" +) + +// OpState is the lifecycle state of a journaled operation. +type OpState string + +const ( + // OpStarted: the op was planned and dispatch began (no Proxmox task yet). + OpStarted OpState = "started" + // OpTaskRunning: the Proxmox POST returned a UPID we are/were awaiting. Recorded + // so a crash mid-op is detected on restart and the task status re-checked. + OpTaskRunning OpState = "task_running" + // OpSucceeded: the op completed (task exitstatus OK, or a clean synchronous apply). + OpSucceeded OpState = "succeeded" + // OpFailed: the op errored (POST error, task non-OK, or WaitTask error). + OpFailed OpState = "failed" +) + +// terminal reports whether a state is final (no further records expected). +func (s OpState) terminal() bool { return s == OpSucceeded || s == OpFailed } + +// JournalEntry is one durable record. Multiple records share an OpID across an op's +// lifecycle (started → task_running → succeeded/failed); the latest wins in the index. +// +// IdempKey is the one-shot idempotency key: set on ops that must run AT MOST ONCE +// across retries/restarts (signed jobs, slice B). Reconcile actions leave it empty — +// reconcile is convergent and SHOULD re-run on real drift, so it is never suppressed +// by the idempotency set. A non-empty IdempKey that reaches OpSucceeded marks the key +// applied (AlreadyApplied true forever after, surviving restarts). +type JournalEntry struct { + OpID string `json:"op_id"` + VMID int `json:"vmid"` + Kind string `json:"kind"` + Params map[string]string `json:"params,omitempty"` + UPID string `json:"upid,omitempty"` + State OpState `json:"state"` + IdempKey string `json:"idemp_key,omitempty"` + At time.Time `json:"at"` +} + +// Journal is the durable operation log + idempotency store. It mirrors +// authz.FileNonceStore: an fsync'd append-only JSONL with an in-memory index. A +// record is on disk AND fsync'd before Append returns, so a crash never loses a +// committed lifecycle transition. +// +// Phase-A scope: it records single-task ops (Start/Stop/SetConfig) for crash +// detection and provides the idempotency-key dedupe. Full multi-step compensating +// rollback (provision/restore, slices 6/7) reuses this structure with richer replay. +type Journal struct { + mu sync.Mutex + path string + f *os.File + latest map[string]JournalEntry // op_id -> latest record + applied map[string]struct{} // idemp keys that reached OpSucceeded +} + +// OpenJournal opens (or creates) the journal at path, replaying any existing log into +// the index. The parent dir must exist (the daemon ensures it, sibling to the nonce +// store). +func OpenJournal(path string) (*Journal, error) { + j := &Journal{ + path: path, + latest: make(map[string]JournalEntry), + applied: make(map[string]struct{}), + } + if err := j.load(); err != nil { + return nil, err + } + f, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o600) + if err != nil { + return nil, err + } + j.f = f + syncJournalDir(filepath.Dir(path)) + return j, nil +} + +func (j *Journal) load() error { + b, err := os.ReadFile(j.path) + if errors.Is(err, fs.ErrNotExist) { + return nil + } + if err != nil { + return err + } + for _, line := range bytes.Split(b, []byte("\n")) { + line = bytes.TrimSpace(line) + if len(line) == 0 { + continue + } + var e JournalEntry + if json.Unmarshal(line, &e) != nil { + continue // skip a torn trailing line from a crash mid-append + } + j.index(e) + } + return nil +} + +// index folds one entry into the in-memory state (latest-by-op + applied set). +func (j *Journal) index(e JournalEntry) { + j.latest[e.OpID] = e + if e.State == OpSucceeded && e.IdempKey != "" { + j.applied[e.IdempKey] = struct{}{} + } +} + +// Append durably writes one lifecycle record (fsync before returning) and updates the +// index. Callers build entries via the engine's lifecycle (Begin/RecordTask/Complete +// helpers below). +func (j *Journal) Append(e JournalEntry) error { + j.mu.Lock() + defer j.mu.Unlock() + rec, err := json.Marshal(e) + if err != nil { + return err + } + rec = append(rec, '\n') + if _, err := j.f.Write(rec); err != nil { + return err + } + if err := j.f.Sync(); err != nil { + return err + } + j.index(e) + return nil +} + +// Latest returns the most recent record for an op id. +func (j *Journal) Latest(opID string) (JournalEntry, bool) { + j.mu.Lock() + defer j.mu.Unlock() + e, ok := j.latest[opID] + return e, ok +} + +// InFlight returns ops whose latest state is non-terminal — i.e. started or +// task_running with no succeeded/failed record. On restart the daemon re-checks each +// (resume-or-rollback). Order is unspecified. +func (j *Journal) InFlight() []JournalEntry { + j.mu.Lock() + defer j.mu.Unlock() + var out []JournalEntry + for _, e := range j.latest { + if !e.State.terminal() { + out = append(out, e) + } + } + return out +} + +// AlreadyApplied reports whether a one-shot op with this idempotency key has already +// succeeded (survives restarts via the replayed log). Empty key is never "applied". +func (j *Journal) AlreadyApplied(idempKey string) bool { + if idempKey == "" { + return false + } + j.mu.Lock() + defer j.mu.Unlock() + _, ok := j.applied[idempKey] + return ok +} + +// Close releases the file handle. +func (j *Journal) Close() error { + j.mu.Lock() + defer j.mu.Unlock() + if j.f != nil { + return j.f.Close() + } + return nil +} + +func syncJournalDir(dir string) { + if d, err := os.Open(dir); err == nil { + _ = d.Sync() + _ = d.Close() + } +} diff --git a/internal/reconcile/journal_test.go b/internal/reconcile/journal_test.go new file mode 100644 index 0000000..622013d --- /dev/null +++ b/internal/reconcile/journal_test.go @@ -0,0 +1,154 @@ +package reconcile + +import ( + "os" + "path/filepath" + "testing" + "time" +) + +func appendRaw(t *testing.T, path, line string) { + t.Helper() + f, err := os.OpenFile(path, os.O_WRONLY|os.O_APPEND, 0o600) + if err != nil { + t.Fatalf("open for raw append: %v", err) + } + defer f.Close() + if _, err := f.WriteString(line + "\n"); err != nil { + t.Fatalf("raw append: %v", err) + } +} + +func reopen(t *testing.T, j *Journal, path string) *Journal { + t.Helper() + if err := j.Close(); err != nil { + t.Fatalf("close: %v", err) + } + nj, err := OpenJournal(path) + if err != nil { + t.Fatalf("reopen: %v", err) + } + t.Cleanup(func() { nj.Close() }) + return nj +} + +func TestJournal_LifecycleLatestWins(t *testing.T) { + path := filepath.Join(t.TempDir(), "journal.log") + j, err := OpenJournal(path) + if err != nil { + t.Fatalf("open: %v", err) + } + t.Cleanup(func() { j.Close() }) + + now := time.Now().UTC() + for _, e := range []JournalEntry{ + {OpID: "op1", VMID: 100, Kind: "start", State: OpStarted, At: now}, + {OpID: "op1", VMID: 100, Kind: "start", UPID: "UPID:x:", State: OpTaskRunning, At: now}, + {OpID: "op1", VMID: 100, Kind: "start", UPID: "UPID:x:", State: OpSucceeded, At: now}, + } { + if err := j.Append(e); err != nil { + t.Fatalf("append: %v", err) + } + } + got, ok := j.Latest("op1") + if !ok || got.State != OpSucceeded { + t.Fatalf("Latest(op1) = %+v ok=%v, want succeeded", got, ok) + } + if len(j.InFlight()) != 0 { + t.Errorf("a succeeded op must not be in-flight: %+v", j.InFlight()) + } +} + +func TestJournal_InFlightSurvivesRestart(t *testing.T) { + path := filepath.Join(t.TempDir(), "journal.log") + j, err := OpenJournal(path) + if err != nil { + t.Fatalf("open: %v", err) + } + now := time.Now().UTC() + // op started + got a task id, but NO terminal record — simulates a crash mid-op. + mustAppend(t, j, JournalEntry{OpID: "op9", VMID: 100, Kind: "set_config", UPID: "UPID:crash:", State: OpTaskRunning, At: now}) + + j2 := reopen(t, j, path) + inflight := j2.InFlight() + if len(inflight) != 1 || inflight[0].OpID != "op9" || inflight[0].UPID != "UPID:crash:" { + t.Fatalf("crash-mid-op should replay as in-flight with its task id, got %+v", inflight) + } +} + +func TestJournal_IdempotencyDedupeAcrossRestart(t *testing.T) { + path := filepath.Join(t.TempDir(), "journal.log") + j, err := OpenJournal(path) + if err != nil { + t.Fatalf("open: %v", err) + } + now := time.Now().UTC() + + const key = "job-abc-123" + if j.AlreadyApplied(key) { + t.Fatal("key should not be applied before any record") + } + // A one-shot op succeeds carrying an idempotency key. + mustAppend(t, j, JournalEntry{OpID: "op1", VMID: 100, Kind: "restore", IdempKey: key, State: OpStarted, At: now}) + mustAppend(t, j, JournalEntry{OpID: "op1", VMID: 100, Kind: "restore", IdempKey: key, State: OpSucceeded, At: now}) + if !j.AlreadyApplied(key) { + t.Fatal("key should be applied after success") + } + + // Survives a restart (replayed from the log) — a redelivered job must not re-run. + j2 := reopen(t, j, path) + if !j2.AlreadyApplied(key) { + t.Error("idempotency key must survive an agent restart") + } + // Empty key is never 'applied'. + if j2.AlreadyApplied("") { + t.Error("empty idempotency key must never be considered applied") + } +} + +func TestJournal_FailedKeyNotApplied(t *testing.T) { + path := filepath.Join(t.TempDir(), "journal.log") + j, err := OpenJournal(path) + if err != nil { + t.Fatalf("open: %v", err) + } + t.Cleanup(func() { j.Close() }) + now := time.Now().UTC() + const key = "job-fail" + mustAppend(t, j, JournalEntry{OpID: "opF", VMID: 1, Kind: "restore", IdempKey: key, State: OpStarted, At: now}) + mustAppend(t, j, JournalEntry{OpID: "opF", VMID: 1, Kind: "restore", IdempKey: key, State: OpFailed, At: now}) + if j.AlreadyApplied(key) { + t.Error("a FAILED one-shot op must not mark its key applied (it may be retried)") + } +} + +func TestJournal_SkipsTornTrailingLine(t *testing.T) { + path := filepath.Join(t.TempDir(), "journal.log") + j, err := OpenJournal(path) + if err != nil { + t.Fatalf("open: %v", err) + } + mustAppend(t, j, JournalEntry{OpID: "ok", VMID: 1, Kind: "start", State: OpSucceeded, At: time.Now().UTC()}) + j.Close() + // Append a torn (partial) JSON line as a crash would leave. + appendRaw(t, path, `{"op_id":"torn","state":`) + + j2, err := OpenJournal(path) + if err != nil { + t.Fatalf("reopen with torn line: %v", err) + } + t.Cleanup(func() { j2.Close() }) + if _, ok := j2.Latest("ok"); !ok { + t.Error("the good record before the torn line must still load") + } + if _, ok := j2.Latest("torn"); ok { + t.Error("the torn line must be skipped") + } +} + +func mustAppend(t *testing.T, j *Journal, e JournalEntry) { + t.Helper() + if err := j.Append(e); err != nil { + t.Fatalf("append: %v", err) + } +} diff --git a/internal/reconcile/normalize.go b/internal/reconcile/normalize.go new file mode 100644 index 0000000..b0d3f9c --- /dev/null +++ b/internal/reconcile/normalize.go @@ -0,0 +1,47 @@ +package reconcile + +import "strings" + +// The normalization layer keeps Proxmox round-trip quirks from reading as drift. +// Reconcile compares NORMALIZED desired-vs-actual so a value the agent wrote and then +// read back equal does not look like a change. `description`'s trailing newline (the +// first proven case, slice-4 pre-check) is the seed; the structure is a per-field +// registry so other quirks (omitted-default fields, boolean coercions, list ordering) +// slot in as they are discovered — each is a Normalizer mapped to a field name. + +// Normalizer maps a raw field value to its canonical comparison form. It must be +// idempotent: Normalizer(Normalizer(x)) == Normalizer(x). +type Normalizer func(string) string + +// FieldNormalizers maps a Proxmox config field name to its Normalizer. A field with +// no entry compares verbatim (identity). +type FieldNormalizers map[string]Normalizer + +// DefaultNormalizers is the production set. Today only `description` needs one (PVE +// appends a trailing newline on read). New quirks are added here as they are found. +func DefaultNormalizers() FieldNormalizers { + return FieldNormalizers{ + "description": NormDescription, + } +} + +// Norm returns the canonical comparison form of value for field, applying the +// field's Normalizer if one is registered, else the value unchanged. +func (n FieldNormalizers) Norm(field, value string) string { + if f, ok := n[field]; ok && f != nil { + return f(value) + } + return value +} + +// Equal reports whether two raw values for field are equal AFTER normalization. +func (n FieldNormalizers) Equal(field, a, b string) bool { + return n.Norm(field, a) == n.Norm(field, b) +} + +// NormDescription strips the trailing newline(s) PVE appends to the LXC `description` +// field on read, so a written value round-trips equal. (Proven slice-4 pre-check: +// PVE stores `description` with a trailing "\n"; a verbatim compare always mismatches.) +// Exported so the --selftest=task description round-trip uses the SAME helper the +// reconciler does — one source of truth for the quirk. +func NormDescription(s string) string { return strings.TrimRight(s, "\n") } diff --git a/internal/reconcile/normalize_test.go b/internal/reconcile/normalize_test.go new file mode 100644 index 0000000..cd3bff0 --- /dev/null +++ b/internal/reconcile/normalize_test.go @@ -0,0 +1,87 @@ +package reconcile + +import ( + "sort" + "strings" + "testing" +) + +func TestNormDescription_TrimsTrailingNewlines(t *testing.T) { + cases := map[string]string{ + "felhom-selftest 2026": "felhom-selftest 2026", + "felhom-selftest 2026\n": "felhom-selftest 2026", // PVE's single trailing \n + "x\n\n": "x", // defensive: multiple + "": "", + "\n": "", + "keep trailing spaces ": "keep trailing spaces ", // only newlines stripped + } + for in, want := range cases { + if got := NormDescription(in); got != want { + t.Errorf("NormDescription(%q) = %q, want %q", in, got, want) + } + } + // Idempotent. + if NormDescription(NormDescription("a\n")) != NormDescription("a\n") { + t.Error("NormDescription not idempotent") + } +} + +func TestFieldNormalizers_DescriptionRoundTrip(t *testing.T) { + n := DefaultNormalizers() + // A value the agent wrote ("...Z") and read back with PVE's newline ("...Z\n") + // must compare EQUAL — the whole point of the layer. + if !n.Equal("description", "felhom-op", "felhom-op\n") { + t.Error("description round-trip should normalize equal") + } + if n.Equal("description", "felhom-op", "different") { + t.Error("genuinely different descriptions must not be equal") + } +} + +func TestFieldNormalizers_UnknownFieldIsIdentity(t *testing.T) { + n := DefaultNormalizers() + if n.Norm("cores", "2\n") != "2\n" { + t.Error("a field with no normalizer must compare verbatim") + } + if n.Equal("cores", "2", "2\n") { + t.Error("unknown field must not normalize away differences") + } +} + +// TestFieldNormalizers_ExtensibilitySeam proves the structure accepts new quirks +// (boolean coercion, list ordering) the way it will as they're discovered — the task's +// "structure it so other normalizers slot in." These are synthetic, not production. +func TestFieldNormalizers_ExtensibilitySeam(t *testing.T) { + booleanCoerce := func(s string) string { + switch strings.ToLower(strings.TrimSpace(s)) { + case "1", "true", "on", "yes": + return "1" + default: + return "0" + } + } + sortCSV := func(s string) string { + parts := strings.Split(s, ",") + sort.Strings(parts) + return strings.Join(parts, ",") + } + n := FieldNormalizers{ + "description": NormDescription, + "onboot": booleanCoerce, + "tags": sortCSV, + } + + if !n.Equal("onboot", "true", "1") || !n.Equal("onboot", "on", "1") { + t.Error("boolean coercion normalizer should equate truthy forms") + } + if n.Equal("onboot", "true", "0") { + t.Error("boolean coercion must still distinguish true from false") + } + if !n.Equal("tags", "b,a,c", "a,b,c") { + t.Error("list-ordering normalizer should equate reordered lists") + } + // The built-in still works alongside the synthetic ones. + if !n.Equal("description", "d", "d\n") { + t.Error("description normalizer should coexist with added ones") + } +} diff --git a/internal/reconcile/plan.go b/internal/reconcile/plan.go new file mode 100644 index 0000000..51ece0c --- /dev/null +++ b/internal/reconcile/plan.go @@ -0,0 +1,129 @@ +package reconcile + +import ( + "fmt" + "sort" + "strconv" +) + +// ActionKind is the benign-on-existing-guest action set wired in slice 4. The +// destructive set (guest destroy, storage wipe, restore-overwrite, decommission) is +// classified and gated in Phase B but not represented here — nothing serves +// destructive deltas until slice 10. +type ActionKind string + +const ( + // ActionStart powers on a stopped guest (proxmox VM.PowerMgmt). + ActionStart ActionKind = "start" + // ActionStop powers off a running guest (proxmox VM.PowerMgmt). + ActionStop ActionKind = "stop" + // ActionSetConfig applies benign config changes (cores/memory/description) in one + // PUT (proxmox VM.Config.*). May return synchronously (empty UPID) — slice-4 proven. + ActionSetConfig ActionKind = "set_config" +) + +// Action is one minimal mutation the engine will dispatch onto the per-guest queue. +// In Phase A every Action is benign by construction (only the benign kinds exist). +// Phase B's classifier/gate sits in front of the executor and may tag an action +// destructive (requiring a signature) without changing this shape. +type Action struct { + VMID int + Kind ActionKind + Params map[string]string // non-nil only for ActionSetConfig + Reason string // human/debug: why this action was planned +} + +// bytesPerMiB converts the desired-spec MemoryBytes (hub.GuestSpec is in bytes) to +// the MiB unit Proxmox's LXC `memory` config field uses. +const bytesPerMiB = 1024 * 1024 + +// Plan computes the minimal benign action set converging actual → desired. It is a +// pure function (deterministic, side-effect-free) so it is exhaustively fixture-test +// -able. Actions are returned sorted by vmid, then config-before-runstate per guest. +// +// Scope rules (slice 4): +// - Only guests present in BOTH desired and actual are reconciled. A guest desired +// but absent from actual would be PROVISIONING (restore-to-new-guest, slice 7) — +// skipped here. A guest actual but not desired would be a DESTROY (destructive, +// gated, slice 10) — skipped here. +// - Unmanaged desired fields (RunUnspecified / nil Spec / nil Description) produce +// no action. +// - If actual spec is unknown (GuestConfig read failed), spec/description are not +// compared (run-state still is) — we never write a config we couldn't read first. +// - Comparisons are NORMALIZED (description trailing-newline, etc.) so a faithful +// round-trip is not mistaken for drift. +func Plan(desired DesiredState, actual ActualState, norm FieldNormalizers) []Action { + if norm == nil { + norm = DefaultNormalizers() + } + vmids := make([]int, 0, len(desired.Guests)) + for vmid := range desired.Guests { + vmids = append(vmids, vmid) + } + sort.Ints(vmids) + + var actions []Action + for _, vmid := range vmids { + d := desired.Guests[vmid] + a, ok := actual.Guests[vmid] + if !ok { + // Desired but not present: provisioning (slice 7), not a slice-4 action. + continue + } + + // Benign spec/description changes → a single SetConfig, only when we could + // read the current config (else we'd write blind). + if a.SpecKnown { + params := map[string]string{} + var reasons []string + if d.Spec != nil { + if d.Spec.Cores != a.Cores { + params["cores"] = strconv.Itoa(d.Spec.Cores) + reasons = append(reasons, fmt.Sprintf("cores %d->%d", a.Cores, d.Spec.Cores)) + } + if want := d.Spec.MemoryBytes / bytesPerMiB; want != a.MemoryMiB { + params["memory"] = strconv.FormatInt(want, 10) + reasons = append(reasons, fmt.Sprintf("memory %dMiB->%dMiB", a.MemoryMiB, want)) + } + // DiskBytes is intentionally NOT reconciled here (rootfs grow is + // `pct resize`, grow-only and separate — a later slice). + } + if d.Description != nil && !norm.Equal("description", *d.Description, a.Description) { + params["description"] = *d.Description + reasons = append(reasons, "description") + } + if len(params) > 0 { + actions = append(actions, Action{ + VMID: vmid, + Kind: ActionSetConfig, + Params: params, + Reason: "spec drift: " + join(reasons), + }) + } + } + + // Run-state (Start/Stop) — always comparable from the list status. + if d.Run != RunUnspecified && d.Run != a.Run { + switch d.Run { + case RunRunning: + actions = append(actions, Action{VMID: vmid, Kind: ActionStart, + Reason: fmt.Sprintf("run %q->running", a.Run)}) + case RunStopped: + actions = append(actions, Action{VMID: vmid, Kind: ActionStop, + Reason: fmt.Sprintf("run %q->stopped", a.Run)}) + } + } + } + return actions +} + +func join(parts []string) string { + out := "" + for i, p := range parts { + if i > 0 { + out += ", " + } + out += p + } + return out +} diff --git a/internal/reconcile/plan_test.go b/internal/reconcile/plan_test.go new file mode 100644 index 0000000..f2d8e14 --- /dev/null +++ b/internal/reconcile/plan_test.go @@ -0,0 +1,211 @@ +package reconcile + +import ( + "testing" + + "gitea.dooplex.hu/admin/felhom-agent/internal/hub" +) + +func sp(s string) *string { return &s } + +func mib(n int64) int64 { return n * bytesPerMiB } + +// desired/actual builders keep the table compact. +func desired(gs ...DesiredGuest) DesiredState { + m := map[int]DesiredGuest{} + for _, g := range gs { + m[g.VMID] = g + } + return DesiredState{Guests: m} +} + +func actual(gs ...ActualGuest) ActualState { + m := map[int]ActualGuest{} + for _, g := range gs { + m[g.VMID] = g + } + return ActualState{Guests: m} +} + +func TestPlan_RunStateStartAndStop(t *testing.T) { + // stopped -> running + got := Plan( + desired(DesiredGuest{VMID: 100, Run: RunRunning}), + actual(ActualGuest{VMID: 100, Run: RunStopped, SpecKnown: true}), + nil) + mustActions(t, got, Action{VMID: 100, Kind: ActionStart}) + + // running -> stopped + got = Plan( + desired(DesiredGuest{VMID: 100, Run: RunStopped}), + actual(ActualGuest{VMID: 100, Run: RunRunning, SpecKnown: true}), + nil) + mustActions(t, got, Action{VMID: 100, Kind: ActionStop}) + + // already matches -> nothing + got = Plan( + desired(DesiredGuest{VMID: 100, Run: RunRunning}), + actual(ActualGuest{VMID: 100, Run: RunRunning, SpecKnown: true}), + nil) + mustActions(t, got) +} + +func TestPlan_SpecDrift(t *testing.T) { + // cores change + got := Plan( + desired(DesiredGuest{VMID: 100, Spec: &hub.GuestSpec{Cores: 4, MemoryBytes: mib(2048)}}), + actual(ActualGuest{VMID: 100, Run: RunStopped, SpecKnown: true, Cores: 2, MemoryMiB: 2048}), + nil) + mustActions(t, got, Action{VMID: 100, Kind: ActionSetConfig, Params: map[string]string{"cores": "4"}}) + + // memory change (bytes desired -> MiB param) + got = Plan( + desired(DesiredGuest{VMID: 100, Spec: &hub.GuestSpec{Cores: 2, MemoryBytes: mib(4096)}}), + actual(ActualGuest{VMID: 100, Run: RunStopped, SpecKnown: true, Cores: 2, MemoryMiB: 2048}), + nil) + mustActions(t, got, Action{VMID: 100, Kind: ActionSetConfig, Params: map[string]string{"memory": "4096"}}) + + // no spec drift -> nothing + got = Plan( + desired(DesiredGuest{VMID: 100, Spec: &hub.GuestSpec{Cores: 2, MemoryBytes: mib(2048)}}), + actual(ActualGuest{VMID: 100, Run: RunStopped, SpecKnown: true, Cores: 2, MemoryMiB: 2048}), + nil) + mustActions(t, got) +} + +func TestPlan_DiskNotReconciled(t *testing.T) { + // DiskBytes differs but is intentionally not reconciled (pct resize, later slice). + got := Plan( + desired(DesiredGuest{VMID: 100, Spec: &hub.GuestSpec{Cores: 2, MemoryBytes: mib(2048), DiskBytes: 1 << 40}}), + actual(ActualGuest{VMID: 100, Run: RunStopped, SpecKnown: true, Cores: 2, MemoryMiB: 2048}), + nil) + mustActions(t, got) +} + +func TestPlan_DescriptionNormalizedNoFalseDrift(t *testing.T) { + // PVE returns the description with a trailing newline; desired has none. Must NOT + // be planned as drift. + got := Plan( + desired(DesiredGuest{VMID: 100, Description: sp("felhom-managed")}), + actual(ActualGuest{VMID: 100, Run: RunStopped, SpecKnown: true, Description: "felhom-managed\n"}), + nil) + mustActions(t, got) + + // A genuine description change IS planned. + got = Plan( + desired(DesiredGuest{VMID: 100, Description: sp("new-desc")}), + actual(ActualGuest{VMID: 100, Run: RunStopped, SpecKnown: true, Description: "old-desc\n"}), + nil) + mustActions(t, got, Action{VMID: 100, Kind: ActionSetConfig, Params: map[string]string{"description": "new-desc"}}) +} + +func TestPlan_UnmanagedFieldsProduceNothing(t *testing.T) { + // Run unspecified, Spec nil, Description nil -> the reconciler leaves it alone even + // though actual differs from "defaults". + got := Plan( + desired(DesiredGuest{VMID: 100}), + actual(ActualGuest{VMID: 100, Run: RunRunning, SpecKnown: true, Cores: 8, MemoryMiB: 9999, Description: "whatever"}), + nil) + mustActions(t, got) +} + +func TestPlan_SpecUnknownSkipsConfigButKeepsRunState(t *testing.T) { + // GuestConfig read failed (SpecKnown=false): never write a config we couldn't read, + // but run-state is still comparable from the list. + got := Plan( + desired(DesiredGuest{VMID: 100, Run: RunRunning, Spec: &hub.GuestSpec{Cores: 4, MemoryBytes: mib(4096)}, Description: sp("x")}), + actual(ActualGuest{VMID: 100, Run: RunStopped, SpecKnown: false}), + nil) + mustActions(t, got, Action{VMID: 100, Kind: ActionStart}) +} + +func TestPlan_DesiredAbsentInActualSkipped(t *testing.T) { + // A guest desired but not present would be provisioning (slice 7) — not a slice-4 + // action. And a guest present but not desired would be a destroy (gated, slice 10). + got := Plan( + desired(DesiredGuest{VMID: 200, Run: RunRunning}), + actual(ActualGuest{VMID: 100, Run: RunStopped, SpecKnown: true}), + nil) + mustActions(t, got) +} + +func TestPlan_CombinedConfigBeforeRunState(t *testing.T) { + // cores + memory + description + run change: one SetConfig (all params) THEN the + // run-state action, both on the same vmid (the queue serializes them). + got := Plan( + desired(DesiredGuest{VMID: 100, Run: RunRunning, + Spec: &hub.GuestSpec{Cores: 4, MemoryBytes: mib(4096)}, Description: sp("new")}), + actual(ActualGuest{VMID: 100, Run: RunStopped, SpecKnown: true, Cores: 2, MemoryMiB: 2048, Description: "old\n"}), + nil) + if len(got) != 2 { + t.Fatalf("want 2 actions, got %d: %+v", len(got), got) + } + if got[0].Kind != ActionSetConfig { + t.Errorf("first action should be SetConfig, got %s", got[0].Kind) + } + for _, k := range []string{"cores", "memory", "description"} { + if _, ok := got[0].Params[k]; !ok { + t.Errorf("SetConfig params missing %q: %v", k, got[0].Params) + } + } + if got[1].Kind != ActionStart { + t.Errorf("second action should be Start, got %s", got[1].Kind) + } +} + +func TestPlan_EmptyDesiredNoActions(t *testing.T) { + // The slice-4 production case: empty desired -> zero actions regardless of actual. + got := Plan( + DesiredState{Guests: map[int]DesiredGuest{}}, + actual(ActualGuest{VMID: 100, Run: RunRunning, SpecKnown: true}), + nil) + mustActions(t, got) +} + +func TestPlan_DeterministicVMIDOrder(t *testing.T) { + got := Plan( + desired( + DesiredGuest{VMID: 300, Run: RunRunning}, + DesiredGuest{VMID: 100, Run: RunRunning}, + DesiredGuest{VMID: 200, Run: RunRunning}, + ), + actual( + ActualGuest{VMID: 100, Run: RunStopped, SpecKnown: true}, + ActualGuest{VMID: 200, Run: RunStopped, SpecKnown: true}, + ActualGuest{VMID: 300, Run: RunStopped, SpecKnown: true}, + ), + nil) + if len(got) != 3 || got[0].VMID != 100 || got[1].VMID != 200 || got[2].VMID != 300 { + t.Fatalf("actions not sorted by vmid: %+v", got) + } +} + +// mustActions asserts the planned actions equal want (vmid+kind+params), ignoring +// Reason (debug-only). +func mustActions(t *testing.T, got []Action, want ...Action) { + t.Helper() + if len(got) != len(want) { + t.Fatalf("got %d actions, want %d: %+v", len(got), len(want), got) + } + for i := range want { + if got[i].VMID != want[i].VMID || got[i].Kind != want[i].Kind { + t.Errorf("action[%d] = {vmid:%d kind:%s}, want {vmid:%d kind:%s}", + i, got[i].VMID, got[i].Kind, want[i].VMID, want[i].Kind) + } + if !sameParams(got[i].Params, want[i].Params) { + t.Errorf("action[%d] params = %v, want %v", i, got[i].Params, want[i].Params) + } + } +} + +func sameParams(a, b map[string]string) bool { + if len(a) != len(b) { + return false + } + for k, v := range b { + if a[k] != v { + return false + } + } + return true +} diff --git a/internal/reconcile/queue.go b/internal/reconcile/queue.go new file mode 100644 index 0000000..2cf6009 --- /dev/null +++ b/internal/reconcile/queue.go @@ -0,0 +1,137 @@ +package reconcile + +import ( + "errors" + "sync" +) + +// ErrQueueClosed is returned for a job submitted after Close. +var ErrQueueClosed = errors.New("reconcile: queue closed") + +// Queue is the per-guest serializer (doc 03 §10): the single choke point ALL +// mutation sources funnel through (reconcile now; one-shot jobs and the local API +// later). Jobs for the SAME vmid run strictly one-at-a-time in submit order; +// independent vmids proceed in parallel. This is what keeps Proxmox from ever seeing +// concurrent conflicting ops on one guest while letting a multi-guest host converge +// concurrently. +// +// Each vmid gets a "lane": a goroutine draining an unbounded FIFO of jobs. Submit +// never blocks the caller (the FIFO is unbounded) and returns a channel that delivers +// that job's error when it runs — so a caller can await a specific job while others +// proceed. +type Queue struct { + mu sync.Mutex + lanes map[int]*lane + closed bool +} + +// NewQueue builds an empty queue. Lanes are created lazily on first Submit per vmid +// and live for the queue's lifetime (a host has a bounded guest set); Close stops them. +func NewQueue() *Queue { + return &Queue{lanes: make(map[int]*lane)} +} + +// Submit enqueues fn on vmid's lane and returns a buffered channel that receives +// fn's return value (or ErrQueueClosed) exactly once. Submission preserves per-lane +// FIFO order; different lanes run concurrently. The result channel is buffered, so the +// lane never blocks delivering a result even if the caller abandons it. +func (q *Queue) Submit(vmid int, fn func() error) <-chan error { + res := make(chan error, 1) + + q.mu.Lock() + if q.closed { + q.mu.Unlock() + res <- ErrQueueClosed + return res + } + l := q.lanes[vmid] + if l == nil { + l = newLane() + q.lanes[vmid] = l + go l.run() + } + q.mu.Unlock() + + l.enqueue(&laneTask{fn: fn, res: res}) + return res +} + +// Close stops accepting new jobs and signals every lane to drain its pending jobs and +// exit. Already-queued jobs still run (graceful drain). Idempotent. +func (q *Queue) Close() { + q.mu.Lock() + if q.closed { + q.mu.Unlock() + return + } + q.closed = true + lanes := make([]*lane, 0, len(q.lanes)) + for _, l := range q.lanes { + lanes = append(lanes, l) + } + q.mu.Unlock() + + for _, l := range lanes { + l.close() + } +} + +type laneTask struct { + fn func() error + res chan error +} + +// lane is one vmid's serial worker: a goroutine draining an unbounded FIFO guarded by +// a cond var. cond-var (not a buffered channel) gives unbounded, non-blocking, +// order-preserving submission without an arbitrary capacity. +type lane struct { + mu sync.Mutex + cond *sync.Cond + pending []*laneTask + closed bool +} + +func newLane() *lane { + l := &lane{} + l.cond = sync.NewCond(&l.mu) + return l +} + +func (l *lane) enqueue(t *laneTask) { + l.mu.Lock() + if l.closed { + l.mu.Unlock() + t.res <- ErrQueueClosed + return + } + l.pending = append(l.pending, t) + l.cond.Signal() + l.mu.Unlock() +} + +func (l *lane) close() { + l.mu.Lock() + l.closed = true + l.cond.Broadcast() + l.mu.Unlock() +} + +// run drains the FIFO one job at a time. On close it finishes any already-queued jobs +// (so nothing submitted-before-close is silently dropped) and then exits. +func (l *lane) run() { + for { + l.mu.Lock() + for len(l.pending) == 0 && !l.closed { + l.cond.Wait() + } + if len(l.pending) == 0 { // closed and drained + l.mu.Unlock() + return + } + t := l.pending[0] + l.pending = l.pending[1:] + l.mu.Unlock() + + t.res <- t.fn() + } +} diff --git a/internal/reconcile/queue_test.go b/internal/reconcile/queue_test.go new file mode 100644 index 0000000..f5bc7d5 --- /dev/null +++ b/internal/reconcile/queue_test.go @@ -0,0 +1,137 @@ +package reconcile + +import ( + "errors" + "sync" + "testing" + "time" +) + +// TestQueue_SameGuestSerialized asserts that jobs for one vmid run strictly +// one-at-a-time in submit order — the core §10 guarantee that keeps Proxmox from +// seeing concurrent conflicting ops on a guest. +func TestQueue_SameGuestSerialized(t *testing.T) { + q := NewQueue() + defer q.Close() + + const n = 50 + var mu sync.Mutex + var order []int + inside := 0 + maxConcurrent := 0 + + chans := make([]<-chan error, n) + for i := 0; i < n; i++ { + i := i + chans[i] = q.Submit(100, func() error { + mu.Lock() + inside++ + if inside > maxConcurrent { + maxConcurrent = inside + } + order = append(order, i) + mu.Unlock() + + time.Sleep(time.Millisecond) // widen any overlap window + + mu.Lock() + inside-- + mu.Unlock() + return nil + }) + } + for _, ch := range chans { + if err := <-ch; err != nil { + t.Fatalf("unexpected job error: %v", err) + } + } + + if maxConcurrent != 1 { + t.Errorf("same-guest jobs overlapped: maxConcurrent=%d, want 1", maxConcurrent) + } + for i := 0; i < n; i++ { + if order[i] != i { + t.Fatalf("same-guest jobs ran out of submit order: got %v", order) + break + } + } +} + +// TestQueue_DifferentGuestsParallel asserts independent vmids proceed concurrently: +// two jobs on different lanes that each wait for the other before finishing must BOTH +// complete (they'd deadlock under a global lock / single worker). +func TestQueue_DifferentGuestsParallel(t *testing.T) { + q := NewQueue() + defer q.Close() + + aReady := make(chan struct{}) + bReady := make(chan struct{}) + + chA := q.Submit(1, func() error { + close(aReady) + select { + case <-bReady: + return nil + case <-time.After(2 * time.Second): + return errors.New("guest 1 timed out waiting for guest 2 (not parallel)") + } + }) + chB := q.Submit(2, func() error { + close(bReady) + select { + case <-aReady: + return nil + case <-time.After(2 * time.Second): + return errors.New("guest 2 timed out waiting for guest 1 (not parallel)") + } + }) + + if err := <-chA; err != nil { + t.Error(err) + } + if err := <-chB; err != nil { + t.Error(err) + } +} + +// TestQueue_PropagatesJobError confirms a job's error reaches its result channel. +func TestQueue_PropagatesJobError(t *testing.T) { + q := NewQueue() + defer q.Close() + want := errors.New("boom") + if got := <-q.Submit(7, func() error { return want }); got != want { + t.Errorf("Submit result = %v, want %v", got, want) + } +} + +// TestQueue_DrainsPendingOnClose confirms jobs already queued before Close still run. +func TestQueue_DrainsPendingOnClose(t *testing.T) { + q := NewQueue() + release := make(chan struct{}) + var ran sync.WaitGroup + ran.Add(2) + + // First job blocks until released, pinning the lane so the second sits pending. + ch1 := q.Submit(5, func() error { <-release; ran.Done(); return nil }) + ch2 := q.Submit(5, func() error { ran.Done(); return nil }) + + q.Close() // close while job1 is queued/running and job2 is pending + close(release) + + if err := <-ch1; err != nil { + t.Errorf("job1 err: %v", err) + } + if err := <-ch2; err != nil { + t.Errorf("pending job2 should still run after Close, got: %v", err) + } + ran.Wait() +} + +// TestQueue_SubmitAfterClose returns ErrQueueClosed. +func TestQueue_SubmitAfterClose(t *testing.T) { + q := NewQueue() + q.Close() + if got := <-q.Submit(1, func() error { return nil }); got != ErrQueueClosed { + t.Errorf("Submit after Close = %v, want ErrQueueClosed", got) + } +} diff --git a/internal/reconcile/state.go b/internal/reconcile/state.go new file mode 100644 index 0000000..ad7908e --- /dev/null +++ b/internal/reconcile/state.go @@ -0,0 +1,130 @@ +package reconcile + +import ( + "context" + "encoding/json" + + "gitea.dooplex.hu/admin/felhom-agent/internal/hub" + "gitea.dooplex.hu/admin/felhom-agent/internal/proxmox" +) + +// RunState is a guest's desired/actual power state. The empty value means +// "unmanaged" on the desired side (the reconciler leaves run-state alone). +type RunState string + +const ( + // RunUnspecified (the zero value) — on a DesiredGuest it means run-state is not + // managed; the reconciler never starts/stops the guest for run-state reasons. + RunUnspecified RunState = "" + // RunRunning maps to proxmox status "running". + RunRunning RunState = "running" + // RunStopped maps to proxmox status "stopped". + RunStopped RunState = "stopped" +) + +// normRun maps a raw proxmox status string to a RunState, collapsing anything +// unrecognized (e.g. "") to RunUnspecified so actual-state comparison is well-defined. +func normRun(status string) RunState { + switch status { + case "running": + return RunRunning + case "stopped": + return RunStopped + default: + return RunUnspecified + } +} + +// DesiredGuest is the target state for one existing guest. Every field is +// individually optional ("unmanaged") so a desired-state source can pin only what it +// cares about — slice 4's planner only acts on the fields that are set. +type DesiredGuest struct { + VMID int + // Run is the target power state; RunUnspecified leaves it alone. + Run RunState + // Spec, when non-nil, manages sizing. Reuses hub.GuestSpec (cores/memory/disk). + // Phase A reconciles Cores and Memory via SetConfig; DiskBytes is reported but + // NOT reconciled here (a rootfs grow is `pct resize`, grow-only and separate — + // deferred to a later slice). Nil = sizing unmanaged. + Spec *hub.GuestSpec + // Description, when non-nil, manages the cosmetic `description` field (the first + // proven SetConfig round-trip, slice-4 pre-check). Nil = unmanaged. + Description *string +} + +// DesiredState is the vmid-keyed target for this host. At slice 4 the only live +// source is the empty provider, so Guests is empty in production; fixtures inject it +// in tests. Host-level desired state (storage manifest, etc.) arrives in later slices. +type DesiredState struct { + Guests map[int]DesiredGuest +} + +// ActualGuest is one guest's observed state, read from Proxmox. +type ActualGuest struct { + VMID int + Run RunState + // SpecKnown is false when GuestConfig could not be read (the run-state from the + // list is still trusted; spec/description comparisons are skipped). Mirrors the + // collector's "keep run-status, omit spec" degradation. + SpecKnown bool + Cores int + MemoryMiB int64 // proxmox LXC `memory` is MiB + Description string // raw (may carry PVE's trailing newline; compared via normalizers) +} + +// ActualState is the vmid-keyed observed state for this host. +type ActualState struct { + Guests map[int]ActualGuest +} + +// DesiredProvider is the seam the desired-state source plugs into. At slice 4 the +// only implementation is EmptyProvider (no live source); slice 10's hub-serving path +// is the real implementation. Do NOT invent a hub/local-file source here. +type DesiredProvider interface { + Desired(ctx context.Context) (DesiredState, error) +} + +// EmptyProvider is the slice-4 production provider: no desired state, so reconcile is +// a live no-op (the engine computes an empty action set). +type EmptyProvider struct{} + +// Desired returns an empty desired state. +func (EmptyProvider) Desired(context.Context) (DesiredState, error) { + return DesiredState{Guests: map[int]DesiredGuest{}}, nil +} + +// StaticProvider serves a fixed DesiredState — used by fixtures (and usable as a +// local override later). It never mutates the value it was given. +type StaticProvider struct{ State DesiredState } + +// Desired returns the static state. +func (p StaticProvider) Desired(context.Context) (DesiredState, error) { return p.State, nil } + +// GuestAPI is the narrow Proxmox surface the engine needs: read actual state and +// dispatch the benign-on-existing-guest mutations. *proxmox.Client satisfies it; a +// fake satisfies it in tests. Every mutating call returns a UPID (or "" for the +// synchronous path) per the proxmox/mutate.go contract — the engine WaitTasks a +// non-empty UPID and treats "" as a clean synchronous success. +type GuestAPI interface { + ListLXC(ctx context.Context) ([]proxmox.Guest, error) + GuestConfig(ctx context.Context, vmid int) (proxmox.GuestConfig, error) + Start(ctx context.Context, vmid int) (string, error) + Stop(ctx context.Context, vmid int) (string, error) + SetConfig(ctx context.Context, vmid int, params map[string]string) (string, error) + WaitTask(ctx context.Context, upid string, opts proxmox.WaitOptions) (proxmox.TaskStatus, error) +} + +// guestDescription decodes the (string-valued) `description` key from a GuestConfig's +// raw Extra map, returning "" when absent. The value is returned raw — PVE appends a +// trailing newline on read, which the normalization layer strips at comparison time. +func guestDescription(cfg proxmox.GuestConfig) string { + raw, ok := cfg.Extra["description"] + if !ok || len(raw) == 0 { + return "" + } + var s string + if err := json.Unmarshal(raw, &s); err != nil { + return "" + } + return s +}