Files
felhom-agent/internal/reconcile/journal.go
T
admin 05c450147c v0.4.0-rc1: slice 4 Phase A — reconcile engine (structural, runs live unfed)
New internal/reconcile package: the agent-side control core's structural half.

- Per-guest serializer Queue (doc 03 §10): the single choke point all mutation
  sources funnel through; same-vmid serial in submit order, different vmids
  parallel (cond-var FIFO lanes).
- Desired-state model + DesiredProvider seam; EmptyProvider is the only live
  source at slice 4 (no hub serving until slice 10) so the live engine computes
  an empty action set and performs zero mutations.
- Normalization layer (FieldNormalizers): normalized desired-vs-actual so
  Proxmox round-trip quirks don't read as drift. normDesc promoted out of
  main.go to reconcile.NormDescription; selftest uses the shared helper.
- Plan (pure diff): minimal benign action set (Start/Stop/SetConfig) for guests
  in both desired and actual; provision/destroy out of scope here.
- Engine: dispatches onto the shared queue; honors the dual-mode SetConfig
  contract (UPID -> WaitTask; empty UPID -> synchronous success).
- Durable op journal + idempotency store (mirrors authz.FileNonceStore):
  in-flight task ids for crash detection + AlreadyApplied dedupe across restart.
- Wired into runDaemon alongside the hub loop, sharing the queue; runs cleanly
  with no desired state and no signers.

Full module race-clean and vet-clean on the Linux build server.

CHECKPOINT: Phase A only. Awaiting validation before Phase B (the reversibility
gate + signed-op consuming layer, landing v0.4.0).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-08 23:21:55 +02:00

190 lines
5.5 KiB
Go

package reconcile
import (
"bytes"
"encoding/json"
"errors"
"io/fs"
"os"
"path/filepath"
"sync"
"time"
)
// OpState is the lifecycle state of a journaled operation. It is a string type, so
// a state is written into a journal record as its literal value (e.g. "started").
type OpState string
// Lifecycle states of a journaled op. OpStarted and OpTaskRunning are non-terminal;
// OpSucceeded and OpFailed are terminal. The string values are part of the on-disk
// record format.
const (
// OpStarted: the op was planned and dispatch began (no Proxmox task yet).
OpStarted OpState = "started"
// OpTaskRunning: the Proxmox POST returned a UPID we are/were awaiting. Recorded
// so a crash mid-op is detected on restart and the task status re-checked.
OpTaskRunning OpState = "task_running"
// OpSucceeded: the op completed (task exitstatus OK, or a clean synchronous apply).
OpSucceeded OpState = "succeeded"
// OpFailed: the op errored (POST error, task non-OK, or WaitTask error).
OpFailed OpState = "failed"
)
// terminal reports whether s is an end state (succeeded or failed), after which no
// further records are expected for the op.
func (s OpState) terminal() bool {
	switch s {
	case OpSucceeded, OpFailed:
		return true
	default:
		return false
	}
}
// JournalEntry is one durable record. Multiple records share an OpID across an op's
// lifecycle (started → task_running → succeeded/failed); the latest wins in the index.
//
// IdempKey is the one-shot idempotency key: set on ops that must run AT MOST ONCE
// across retries/restarts (signed jobs, slice B). Reconcile actions leave it empty —
// reconcile is convergent and SHOULD re-run on real drift, so it is never suppressed
// by the idempotency set. A non-empty IdempKey that reaches OpSucceeded marks the key
// applied (AlreadyApplied true forever after, surviving restarts).
//
// Each entry is serialized as one JSON object per line of the journal file.
type JournalEntry struct {
// OpID identifies the op; it is the key of the journal's latest-record index.
OpID string `json:"op_id"`
// VMID is presumably the id of the guest the op targets — confirm against the engine.
VMID int `json:"vmid"`
// Kind names the operation; the journal treats it as an opaque string.
Kind string `json:"kind"`
// Params carries op parameters; omitted from the JSON when empty.
Params map[string]string `json:"params,omitempty"`
// UPID is the Proxmox task id, when one exists; omitted from the JSON when empty.
UPID string `json:"upid,omitempty"`
// State is the lifecycle state this record reports.
State OpState `json:"state"`
// IdempKey is the one-shot idempotency key (see above); omitted when empty.
IdempKey string `json:"idemp_key,omitempty"`
// At is the record timestamp; it is supplied by the caller (Append does not set it).
At time.Time `json:"at"`
}
// Journal is the durable operation log + idempotency store. It mirrors
// authz.FileNonceStore: an fsync'd append-only JSONL with an in-memory index. A
// record is on disk AND fsync'd before Append returns, so a crash never loses a
// committed lifecycle transition.
//
// Phase-A scope: it records single-task ops (Start/Stop/SetConfig) for crash
// detection and provides the idempotency-key dedupe. Full multi-step compensating
// rollback (provision/restore, slices 6/7) reuses this structure with richer replay.
//
// Every exported method acquires mu, so one *Journal can be shared by goroutines.
// The struct contains a mutex and must be used through a pointer, never copied.
type Journal struct {
// mu serializes appends and guards the file handle and both index maps.
mu sync.Mutex
// path is the location of the JSONL log file.
path string
// f is the handle records are appended to; set by OpenJournal.
f *os.File
latest map[string]JournalEntry // op_id -> latest record
applied map[string]struct{} // idemp keys that reached OpSucceeded
}
// OpenJournal opens (or creates) the journal at path, replaying any existing log into
// the index. The parent dir must exist (the daemon ensures it, sibling to the nonce
// store).
//
// If the existing file does not end in a newline — the signature of a crash in the
// middle of an append — a newline is written and fsync'd before the journal is handed
// out. Without that, the first record appended after the restart would be glued onto
// the torn fragment, the combined line would fail to decode on the next replay, and a
// committed record would be silently lost.
//
// It returns the ready-to-use journal, or the first error from replaying, opening, or
// repairing the file. The directory sync at the end is best-effort.
func OpenJournal(path string) (*Journal, error) {
	j := &Journal{
		path:    path,
		latest:  make(map[string]JournalEntry),
		applied: make(map[string]struct{}),
	}
	if err := j.load(); err != nil {
		return nil, err
	}
	// O_RDWR rather than O_WRONLY so the tail byte can be inspected below; O_APPEND
	// still forces every write to the end of the file.
	f, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0o600)
	if err != nil {
		return nil, err
	}
	if err := terminateTornTail(f); err != nil {
		_ = f.Close() // the repair error is the one worth reporting
		return nil, err
	}
	j.f = f
	syncJournalDir(filepath.Dir(path))
	return j, nil
}

// terminateTornTail makes sure a non-empty journal file ends with '\n', appending and
// fsync'ing one when the last byte is anything else. An empty file is left untouched.
func terminateTornTail(f *os.File) error {
	info, err := f.Stat()
	if err != nil {
		return err
	}
	size := info.Size()
	if size == 0 {
		return nil
	}
	var last [1]byte
	if _, err := f.ReadAt(last[:], size-1); err != nil {
		return err
	}
	if last[0] == '\n' {
		return nil
	}
	if _, err := f.Write([]byte{'\n'}); err != nil {
		return err
	}
	return f.Sync()
}
// load replays the log file at j.path into the in-memory index. A missing file is
// not an error (the journal simply starts empty); any other read error is returned.
// Blank lines are ignored, and a line that does not decode as a JournalEntry is
// skipped — this is how a torn trailing line from a crash mid-append is tolerated.
func (j *Journal) load() error {
	raw, err := os.ReadFile(j.path)
	switch {
	case errors.Is(err, fs.ErrNotExist):
		return nil
	case err != nil:
		return err
	}
	for _, rec := range bytes.Split(raw, []byte("\n")) {
		rec = bytes.TrimSpace(rec)
		if len(rec) == 0 {
			continue
		}
		var entry JournalEntry
		if decodeErr := json.Unmarshal(rec, &entry); decodeErr == nil {
			j.index(entry)
		}
	}
	return nil
}
// index folds one entry into the in-memory state: the entry becomes the latest
// record for its OpID, and a succeeded entry carrying an idempotency key adds that
// key to the applied set. It does no locking itself.
func (j *Journal) index(e JournalEntry) {
	j.latest[e.OpID] = e
	if e.IdempKey == "" || e.State != OpSucceeded {
		return
	}
	j.applied[e.IdempKey] = struct{}{}
}
// Append durably writes one lifecycle record and then updates the index. The entry
// is encoded as a single JSON line, written, and fsync'd before Append returns; the
// in-memory index is only touched once all of that has succeeded. Any encode, write,
// or sync error is returned as-is and leaves the index unchanged.
func (j *Journal) Append(e JournalEntry) error {
	j.mu.Lock()
	defer j.mu.Unlock()

	line, err := json.Marshal(e)
	if err != nil {
		return err
	}
	line = append(line, '\n')

	_, err = j.f.Write(line)
	if err == nil {
		err = j.f.Sync()
	}
	if err != nil {
		return err
	}
	j.index(e)
	return nil
}
// Latest returns the most recent record for an op id, and false when the journal
// holds no record for that id.
func (j *Journal) Latest(opID string) (JournalEntry, bool) {
	j.mu.Lock()
	entry, found := j.latest[opID]
	j.mu.Unlock()
	return entry, found
}
// InFlight returns the ops whose latest record is non-terminal — started or
// task_running, with no succeeded/failed record after it. On restart the daemon
// re-checks each (resume-or-rollback). The result follows map iteration order, so
// its order is unspecified; it is nil when nothing is in flight.
func (j *Journal) InFlight() []JournalEntry {
	j.mu.Lock()
	defer j.mu.Unlock()

	var pending []JournalEntry
	for _, rec := range j.latest {
		if rec.State.terminal() {
			continue
		}
		pending = append(pending, rec)
	}
	return pending
}
// AlreadyApplied reports whether a one-shot op with this idempotency key has already
// succeeded. The applied set is rebuilt from the replayed log, so the answer survives
// restarts. The empty key is never considered applied.
func (j *Journal) AlreadyApplied(idempKey string) bool {
	if len(idempKey) == 0 {
		return false
	}
	j.mu.Lock()
	_, seen := j.applied[idempKey]
	j.mu.Unlock()
	return seen
}
// Close releases the file handle and returns the error from closing it. It returns
// nil when the journal has no file handle.
func (j *Journal) Close() error {
	j.mu.Lock()
	defer j.mu.Unlock()
	if j.f == nil {
		return nil
	}
	return j.f.Close()
}
// syncJournalDir fsyncs the directory dir. It is strictly best-effort: a directory
// that cannot be opened is ignored, and so are the sync and close results.
func syncJournalDir(dir string) {
	d, err := os.Open(dir)
	if err != nil {
		return
	}
	_ = d.Sync()  // best-effort; not every platform supports directory fsync
	_ = d.Close() // nothing useful to do with a close error here
}