From e54f882e70e99871a5d76f3e57414e37cbb0d63a Mon Sep 17 00:00:00 2001 From: kisfenyo Date: Wed, 10 Jun 2026 19:03:14 +0200 Subject: [PATCH] slice 10A: hub desired-state serving + signed-jobs queue (Down channel) (hub v0.9.0) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Serve operator intent to authenticated hosts: PUT /admin/hosts/{id}/desired-state (global key) bumps desired_generation; GET /hosts/{id}/desired-state + /jobs are per-host self-scoped; the host-report envelope now carries the real generation + has_signed_ops. New signed_jobs table + store methods. Desired-state stored/served opaquely (agent owns the schema). Cross-repo golden (envelope + desired-state) byte-identical with felhom-agent; doc 03 §4/§9 updated. Co-Authored-By: Claude Opus 4.8 (1M context) --- REPORT.md | 62 ++-- documentation/architecture/03-host-agent.md | 24 +- hub/CHANGELOG.md | 39 +++ hub/internal/api/desired_test.go | 264 ++++++++++++++++++ hub/internal/api/handler.go | 202 +++++++++++++- .../api/testdata/control-envelope.golden.json | 7 + .../api/testdata/desired-state.golden.json | 23 ++ hub/internal/store/store.go | 78 ++++++ 8 files changed, 669 insertions(+), 30 deletions(-) create mode 100644 hub/internal/api/desired_test.go create mode 100644 hub/internal/api/testdata/control-envelope.golden.json create mode 100644 hub/internal/api/testdata/desired-state.golden.json diff --git a/REPORT.md b/REPORT.md index 2f5f8ec..0efdab8 100644 --- a/REPORT.md +++ b/REPORT.md @@ -4,41 +4,51 @@ --- -# REPORT — Slice 9 (hub + docs): host metrics to the controller — `cpu_temp_c` wire field + docs (2026-06-10) +# REPORT — Slice 10A (hub half): desired-state serving — the "Down" channel (hub v0.9.0) (2026-06-10) ## Type -Cross-repo wire-contract + documentation update for **slice 9** (implementation: `felhom-agent` -v0.14.0 + `felhom-controller` v0.39.0). **No hub code change, no hub version bump.** +TASK (CC-implemented). The hub half of slice 10A. Pairs with `felhom-agent` v0.15.0. ## What changed (hub) -- **Cross-repo host-report golden** (`hub/internal/api/testdata/host-report.golden.json`) gained - **`host.cpu_temp_c: 47`**, kept **byte-identical** with - `felhom-agent/internal/hub/testdata/host-report.golden.json` (the duplicated-contract discipline; - manual diff confirmed identical). No code change: the full `report_json` already persists the field - verbatim, and the hub's host parse-struct ignores the extra key — the golden-contract test - (`host_test.go`) still passes. CPU temp on the operator dashboard is an optional later freebie. -- `hub/CHANGELOG.md` records the contract update (no version bump). +The hub now **serves operator intent** down to already-authenticated hosts; the control envelope stops +returning placeholders and carries the host's real generation + signed-jobs flag. -## What changed (doc 03 — host-agent) +### Store (`internal/store`) +- New `signed_jobs` table (per-host **opaque** signed-op blob queue). New methods: `SetHostDesired` + (set desired-state + **atomically bump `desired_generation`**), `EnqueueSignedJob` / `GetSignedJobs` + / `CountSignedJobs`. The `hosts` table's previously-inert `desired_json` / `desired_generation` + columns are now live. -- **§6** — added **`GET /host/metrics`** to the local-API surface: host-wide health - (cpu%/mem/load/uptime/`cpu_temp_c`) + per-storage capacity for the customer's monitoring view. - Reuses the slice-4 collector (no duplicate collection); **host-wide, token-authed, fresh** (not the - 15-min hub snapshot); noted the **one-customer-per-host** assumption. -- **§9 slice table** — **defined + marked slice 9** (the roadmap previously jumped 8→10; this fills - it), incl. the assumption + out-of-scope items (multi-tenant filtering, time-series history). Added - a slice-9 entry to the doc changelog. +### API (`internal/api`) +- **`PUT /api/v1/admin/hosts/{id}/desired-state`** (global key) — set + bump generation; body stored + + served **opaquely** (validated only as well-formed JSON — the agent owns the schema). +- **`GET /api/v1/hosts/{id}/desired-state`** (per-host key, **self-scoped**) — `{generation, + desired_state}`; host A's key cannot read host B (403); global key may read any. +- **`GET /api/v1/hosts/{id}/jobs`** (per-host key, self-scoped) — serves the host's pending opaque + signed-op blobs, oldest first (verify+execute is 10B). +- **`POST /api/v1/admin/hosts/{id}/jobs`** (global key) — enqueue a pre-signed opaque blob (the hub + holds no signing key). +- The host-report **control envelope** now reports the real `desired_generation` + `has_signed_ops`, + degrading safely to defaults on a store error. -## Why (the slice 9 thesis) +## Tests (green) +- admin-set bumps the generation + serves the latest body; global-key-only (per-host 403, malformed + 400, unknown host 404); `GET /desired-state` self-scoped (A→B 403, global any, no-token 401); + envelope carries generation + `has_signed_ops` flips on enqueue; `GET /jobs` self-scoped oldest-first; + cross-repo golden round-trip (set → fetched back unchanged), **byte-identical** with felhom-agent. -The de-privileged controller (slice 8C) sees only its own cgroup — it can't read the host. Slice 9 -re-serves the agent's existing host + storage observation to the customer, plus the one new collector -(CPU/chassis temp, graceful-null). On-ethos for a data-sovereignty product: the customer sees their -own box's health. +## Docs +- Doc 03 §4 (control loop live: heartbeat → envelope generation/jobs → fetch-on-change → reconcile + benign / gate destructive) + §9 slice table (**10A done**; 10B signed-op execution / 10C escrow + consumption / 10D DR capstone pending; the `restore_directive` field exists now, consumed in 10D). -## Deferred / not built +## Deferred / out of scope +- Signed-op **execution** + signature verification → **10B** (10A only serves the queue + flag). +- **Restore-mode / re-enroll** consumption (a new box's first directive) → **10D**; 10A serves + already-authenticated hosts only. Rich desired-state editing UX → doc-05 (10A's admin-set is minimal). -Multi-tenant host-metric filtering (one-customer-per-host assumed); historical/time-series metric -storage (this is a live snapshot view). No secrets committed. +## Pending +- Build + deploy hub v0.9.0 (+ agent v0.15.0) and live-validate against the demo host (admin-set + benign+destructive → generation bump → agent fetch → reconcile/gate; self-scope refusal). diff --git a/documentation/architecture/03-host-agent.md b/documentation/architecture/03-host-agent.md index a4cac23..c0c5362 100644 --- a/documentation/architecture/03-host-agent.md +++ b/documentation/architecture/03-host-agent.md @@ -420,8 +420,10 @@ this path — bring up + reattach external storage and it is whole. This is full | **Quiesced app-consistent backup** (`/backup/due`-driven stack-stop) | **8B** | **implemented** (agent v0.11.0 `/backup/due` cadence + `/backup/status` phases; controller v0.36.0 `internal/quiesce` — stop stacks → backup → restart, with crash-safety marker/guaranteed-unquiesce/max-bound/crash-recovery). Validated live incl. the postgres clean-vs-crash-recovery restore contrast. **8B.2 downtime optimization (resume at `snapshotted`) implemented** (agent v0.13.0 + controller v0.38.0 — §8). | | **Controller de-privileging** (retire the disk-execution subsystem; new customer disk endpoints behind the slice-4 data-bearing classifier) | **8C** | **implemented — slice 8 CLOSED** (agent v0.12.0: `/disks` endpoints + the data-bearing classifier gate + `mkfs`; controller v0.37.0: ~12.3k LOC of disk-execution retired — storage/restic/cross-drive/migrate/watchdog/scanner/infra-backup — `backup.Manager` split to app-data only, disk mgmt rewired to the agent, container de-privileged). The data-bearing format refusal (§6) is the security centerpiece. | | **Host metrics to the controller** (`GET /host/metrics` — the customer host-health view) | **9** | **implemented** (agent v0.14.0: `GET /host/metrics` reuses the slice-4 collector + a new CPU/chassis-temp collector `internal/hub/cputemp.go`, graceful-null; the shared `HostMetrics` gains `cpu_temp_c` so the hub report carries it too — cross-repo golden updated; controller v0.39.0: agentapi `HostMetrics()` + a thin `/api/host-metrics` proxy + the monitoring page's host-health card). **Host-wide, token-authed, fresh** (not the 15-min hub snapshot). **Assumption: one customer per host** (the home-server model) — host-wide CPU/mem would leak cross-customer load on a multi-customer host; revisit then. Out of scope: multi-tenant metric filtering; historical/time-series storage (this is a live snapshot). | -| **Host/hardware loss** DR — re-enroll in "restore mode"; hub serves identity / PBS namespace / tunnel token / storage manifest / restore directive | **10** | deferred — needs hub desired-state serving; hub store today holds only `{host_id, customer_id, api_key}` (slice 3) | -| PBS escrow **consumption** (recover `K` on a new box) | **10** | deferred — exercised by host-loss DR | +| **Hub desired-state serving** (the "Down" channel) — store + serve per-host desired-state, bump `desired_generation`, signed-jobs queue + `has_signed_ops`; agent activates the envelope + a hub-backed provider (benign reconciled, destructive gated pending) | **10A** | **implemented** (hub v0.9.0: `PUT /admin/hosts/{id}/desired-state` bumps the generation, `GET /hosts/{id}/desired-state` + `/jobs` self-scoped, `signed_jobs` queue; agent v0.15.0: `ControlEnvelope` fields live, `Client.FetchDesiredState`, `internal/desired` Syncer + `reconcile.CachingProvider` feeding the engine — an explicit guest `decommission` is the destructive delta, gated `pending_signature`). Serves to already-authenticated hosts only; desired-state stored opaquely (agent owns the schema). Cross-repo golden (envelope + desired-state) byte-identical. | +| **Signed-op execution** (verify + run the gated destructive op) | **10B** | deferred — 10A lays the queue/flag/serving + the gate marks pending; 10B verifies the signature (role-scoped, action-bound, idempotent — `internal/authz`/`internal/reconcile` gate already built) and runs the executor (e.g. the decommission). | +| **PBS escrow consumption** (recover `K` on a new box) | **10C** | **spike validated** (2026-06-10, `documentation/tests/slice10-escrow-consumption-spike-findings.md` — recover-from-`(blob,R)` on a key-less box + real-data restore proven, GO). Productionizing the consumption path is 10C; exercised by host-loss DR (10D). | +| **Host/hardware loss** DR — re-enroll in "restore mode"; hub serves identity / PBS namespace / tunnel token / storage manifest / restore directive (the `restore_directive` field exists in 10A's desired-state, consumed here) | **10D** | deferred — the DR capstone; consumes 10A serving + 10C escrow consumption + re-enrollment authorization | | Golden base refresh cadence + fleet versioning | post-launch | operational, non-blocking (§13) | **Host/hardware loss (design intent — slice 10).** Re-enroll the new host in **restore mode**; @@ -499,6 +501,24 @@ This doc hands the implementation three contracts it was waiting on: ## Changelog — design-review + Phase-3 fold-in (2026-06-08) +### Slice-10A implemented — hub desired-state serving (the "Down" channel) (2026-06-10) +- §4: the **control loop is live**. The report IS the heartbeat; its response — the **control + envelope** — is the Down channel. The envelope is a cheap change-notification: `desired_generation` + (version) + `has_signed_ops` (flag) + `poll_interval_seconds`. The agent **caches** the desired-state + + its generation and re-fetches the heavy state (`GET /hosts/{id}/desired-state`, self-scoped) **only + when the generation advances**. The engine reconciles **benign** deltas; an explicit **destructive** + delta (a guest `decommission`) is classified Destructive → the gate refuses it **`pending_signature`** + (no signer in 10A → never executed). **Signed-job execution is 10B**; the `restore_directive` field + is carried in desired-state now but **consumed in 10D**. +- §9 slice table: **10A done** (hub serves desired-state + bumps generation + signed-jobs queue/flag; + agent activates the envelope + a hub-backed `CachingProvider` feeding the engine). 10B/10C/10D pending. +- Wire: the envelope's now-active fields + the `desired-state` response are a cross-repo contract — + `control-envelope.golden.json` + `desired-state.golden.json`, **byte-identical** agent↔hub. Status: + implemented (hub v0.9.0; agent v0.15.0). **Out of 10A (deliberate):** the hub stores/serves + desired-state **opaquely** (the agent owns the schema); signed-op **execution** + verification is 10B; + **restore-mode/re-enroll** consumption (a new box's first directive) is 10D — 10A serves only + already-authenticated hosts. + ### Slice-9 implemented — host metrics to the controller (customer host-health view) (2026-06-10) - §6: added **`GET /host/metrics`** — host-wide health (cpu%/mem/load/uptime/**`cpu_temp_c`**) + per-storage capacity for the customer's monitoring view. **Reuses the slice-4 collector** (no diff --git a/hub/CHANGELOG.md b/hub/CHANGELOG.md index 925549d..49fb388 100644 --- a/hub/CHANGELOG.md +++ b/hub/CHANGELOG.md @@ -1,5 +1,44 @@ # Felhom Hub — Changelog +## v0.9.0 — slice 10A: desired-state serving + signed-jobs queue (the "Down" channel) (2026-06-10) + +The hub half of slice 10A: the hub now **serves operator intent** down to already-authenticated +hosts. The control envelope (the host-report response) stops returning placeholder +`desired_generation:0 / has_signed_ops:false` and carries the host's **real** generation + a +signed-jobs flag — the cheap change-notification the agent (v0.15.0) acts on. The heavy +desired-state moves only on a dedicated, self-scoped fetch. + +### Added +- **`PUT /api/v1/admin/hosts/{host_id}/desired-state`** (global/operator key only) — sets a host's + desired-state and **atomically bumps `desired_generation`**. The body is JSON the hub stores + + serves **opaquely** (it validates only that it is well-formed JSON; the agent/CLI owns the + schema). Unknown host → 404; malformed JSON → 400. Minimal admin path; rich editing UX is later. +- **`GET /api/v1/hosts/{host_id}/desired-state`** (per-host key, **self-scoped** — a host reads + only its own; the global key may read any) — returns `{generation, desired_state}`. The agent + fetches it when the envelope's generation advances past its cache. +- **`GET /api/v1/hosts/{host_id}/jobs`** (per-host key, self-scoped) — serves the host's pending + **opaque** signed-op blobs (oldest first). The hub never forged, opened, or executes them + (verify + run is slice 10B; this only serves the queue). +- **`POST /api/v1/admin/hosts/{host_id}/jobs`** (global key only) — enqueues a pre-signed opaque + job blob. The minimal operator path to seed the queue; the hub holds no signing key. +- **Store**: a new `signed_jobs` table (per-host opaque blob queue); `SetHostDesired` (set + bump + generation, atomic), `EnqueueSignedJob` / `GetSignedJobs` / `CountSignedJobs`. The `hosts` table's + previously-inert `desired_json` / `desired_generation` columns are now live. + +### Changed +- The host-report **control envelope** now reports the host's actual `desired_generation` and + `has_signed_ops` (queue non-empty), both degrading safely to their old defaults on a store error + (a heartbeat never fails on the control channel). `poll_interval_seconds` / `blocked` unchanged. + +### Tests +- admin-set bumps the generation each write + the served state reflects the latest body; admin-set + is global-key-only (per-host → 403, malformed → 400, unknown host → 404). +- `GET /desired-state` is **self-scoped** (host A's key → host B → 403; global → any; no token → 401). +- the envelope carries the current generation + `has_signed_ops` flips on enqueue; `GET /jobs` is + self-scoped + serves the blobs oldest-first; admin enqueue is global-key-only. +- cross-repo golden round-trip: `testdata/desired-state.golden.json` set → fetched back unchanged + (the opaque pass-through), **byte-identical** with felhom-agent's copy. + ## (no version bump) — slice 9 cross-repo wire-contract: `host.cpu_temp_c` (2026-06-10) Slice 9 adds a nullable **`cpu_temp_c`** field to the shared `HostMetrics` wire struct (the agent's diff --git a/hub/internal/api/desired_test.go b/hub/internal/api/desired_test.go new file mode 100644 index 0000000..0987c76 --- /dev/null +++ b/hub/internal/api/desired_test.go @@ -0,0 +1,264 @@ +package api + +import ( + "encoding/base64" + "encoding/json" + "net/http" + "os" + "testing" + + "gitea.dooplex.hu/admin/felhom-hub/internal/store" +) + +// The desired-state wire is a contract DUPLICATED with felhom-agent. testdata/desired-state.golden.json +// MUST stay byte-identical with the agent's internal/hub/testdata copy. The hub serves desired_state +// OPAQUELY (stores + emits the bytes), so this test proves the golden's desired_state round-trips +// through admin-set → GET unchanged (the pass-through contract). +func TestDesiredStateGolden_RoundTripsThroughHub(t *testing.T) { + h, st, _ := newTestHandler(t) + seedHost(t, st, "h1", "c1", "HKEY1") + + raw, err := os.ReadFile("testdata/desired-state.golden.json") + if err != nil { + t.Fatal(err) + } + var golden struct { + DesiredState json.RawMessage `json:"desired_state"` + } + if err := json.Unmarshal(raw, &golden); err != nil { + t.Fatal(err) + } + + // Operator sets the golden's desired_state. + if rr := do(h, http.MethodPut, "/admin/hosts/h1/desired-state", globalKey, string(golden.DesiredState)); rr.Code != http.StatusOK { + t.Fatalf("admin-set golden desired_state: %d body=%s", rr.Code, rr.Body.String()) + } + // The host fetches it back — byte-for-byte the same object (the hub never reshapes it). + rr := do(h, http.MethodGet, "/hosts/h1/desired-state", "HKEY1", "") + if rr.Code != 200 { + t.Fatalf("GET desired-state: %d", rr.Code) + } + var got struct { + Generation int64 `json:"generation"` + DesiredState json.RawMessage `json:"desired_state"` + } + json.Unmarshal(rr.Body.Bytes(), &got) + if got.Generation != 1 { + t.Errorf("generation = %d, want 1", got.Generation) + } + // Compare semantically (re-marshal both through a generic map to ignore whitespace). + var want, have any + json.Unmarshal(golden.DesiredState, &want) + json.Unmarshal(got.DesiredState, &have) + wb, _ := json.Marshal(want) + hb, _ := json.Marshal(have) + if string(wb) != string(hb) { + t.Errorf("served desired_state diverged from the set golden:\n set: %s\n served: %s", wb, hb) + } +} + +// seedHost mints a host directly in the store for desired-state tests. +func seedHost(t *testing.T, st *store.Store, hostID, custID, apiKey string) { + t.Helper() + if err := st.UpsertHost(&store.Host{HostID: hostID, CustomerID: custID, APIKey: apiKey}); err != nil { + t.Fatalf("UpsertHost(%s): %v", hostID, err) + } +} + +// Admin-set bumps the generation each write and the served desired-state reflects the latest body. +func TestAdminSetDesiredState_BumpsGenerationAndServes(t *testing.T) { + h, st, _ := newTestHandler(t) + seedHost(t, st, "h1", "c1", "HKEY1") + + // Initial generation is 0 (nothing set yet). + if rr := do(h, http.MethodGet, "/hosts/h1/desired-state", "HKEY1", ""); rr.Code != 200 { + t.Fatalf("initial GET desired-state: %d", rr.Code) + } + + // First admin-set → generation 1. + rr := do(h, http.MethodPut, "/admin/hosts/h1/desired-state", globalKey, `{"guests":[{"vmid":100,"run":"running"}]}`) + if rr.Code != http.StatusOK { + t.Fatalf("admin-set #1: %d body=%s", rr.Code, rr.Body.String()) + } + var set struct { + Generation int64 `json:"generation"` + } + json.Unmarshal(rr.Body.Bytes(), &set) + if set.Generation != 1 { + t.Fatalf("generation after set #1 = %d, want 1", set.Generation) + } + + // Second admin-set → generation 2 (monotonic). + rr = do(h, http.MethodPut, "/admin/hosts/h1/desired-state", globalKey, `{"guests":[{"vmid":100,"run":"stopped"}]}`) + json.Unmarshal(rr.Body.Bytes(), &set) + if set.Generation != 2 { + t.Fatalf("generation after set #2 = %d, want 2", set.Generation) + } + + // GET serves the latest body + its generation. + rr = do(h, http.MethodGet, "/hosts/h1/desired-state", "HKEY1", "") + if rr.Code != 200 { + t.Fatalf("GET desired-state: %d", rr.Code) + } + var got struct { + Generation int64 `json:"generation"` + DesiredState json.RawMessage `json:"desired_state"` + } + json.Unmarshal(rr.Body.Bytes(), &got) + if got.Generation != 2 { + t.Errorf("served generation = %d, want 2", got.Generation) + } + var ds struct { + Guests []struct { + VMID int `json:"vmid"` + Run string `json:"run"` + } `json:"guests"` + } + if err := json.Unmarshal(got.DesiredState, &ds); err != nil { + t.Fatalf("desired_state not valid JSON: %v", err) + } + if len(ds.Guests) != 1 || ds.Guests[0].VMID != 100 || ds.Guests[0].Run != "stopped" { + t.Errorf("served desired_state = %+v, want the latest (vmid 100 stopped)", ds.Guests) + } +} + +// Admin-set requires the global key; a per-host key is refused. +func TestAdminSetDesiredState_GlobalKeyOnly(t *testing.T) { + h, st, _ := newTestHandler(t) + seedHost(t, st, "h1", "c1", "HKEY1") + + if rr := do(h, http.MethodPut, "/admin/hosts/h1/desired-state", "HKEY1", `{"x":1}`); rr.Code != http.StatusForbidden { + t.Errorf("per-host key admin-set = %d, want 403", rr.Code) + } + if rr := do(h, http.MethodPut, "/admin/hosts/h1/desired-state", globalKey, `{"x":1}`); rr.Code != http.StatusOK { + t.Errorf("global key admin-set = %d, want 200", rr.Code) + } + // Malformed JSON is rejected at the door. + if rr := do(h, http.MethodPut, "/admin/hosts/h1/desired-state", globalKey, `not json`); rr.Code != http.StatusBadRequest { + t.Errorf("malformed admin-set = %d, want 400", rr.Code) + } + // Unknown host → 404. + if rr := do(h, http.MethodPut, "/admin/hosts/nope/desired-state", globalKey, `{}`); rr.Code != http.StatusNotFound { + t.Errorf("unknown host admin-set = %d, want 404", rr.Code) + } +} + +// GET /desired-state is SELF-SCOPED: host A's key cannot read host B; the global key can read any. +func TestGetDesiredState_SelfScoped(t *testing.T) { + h, st, _ := newTestHandler(t) + seedHost(t, st, "h1", "c1", "HKEY1") + seedHost(t, st, "h2", "c2", "HKEY2") + if _, err := st.SetHostDesired("h1", []byte(`{"guests":[]}`)); err != nil { + t.Fatal(err) + } + + // h1's key reading h1 → 200. + if rr := do(h, http.MethodGet, "/hosts/h1/desired-state", "HKEY1", ""); rr.Code != 200 { + t.Errorf("h1 reading h1 = %d, want 200", rr.Code) + } + // h2's key reading h1 → 403 (the headline self-scoping check). + if rr := do(h, http.MethodGet, "/hosts/h1/desired-state", "HKEY2", ""); rr.Code != http.StatusForbidden { + t.Errorf("h2 reading h1 = %d, want 403", rr.Code) + } + // Global key reading any → 200. + if rr := do(h, http.MethodGet, "/hosts/h1/desired-state", globalKey, ""); rr.Code != 200 { + t.Errorf("global reading h1 = %d, want 200", rr.Code) + } + // No token → 401. + if rr := do(h, http.MethodGet, "/hosts/h1/desired-state", "", ""); rr.Code != http.StatusUnauthorized { + t.Errorf("no token = %d, want 401", rr.Code) + } +} + +// The control envelope on a host-report carries the current generation + has_signed_ops flag. +func TestHostReportEnvelope_GenerationAndSignedOps(t *testing.T) { + h, st, _ := newTestHandler(t) + st.SaveCustomerConfig(&store.CustomerConfig{CustomerID: "c1", APIKey: "ckey", RetrievalPassword: "p"}) + seedHost(t, st, "h1", "c1", "HKEY") + + readEnvelope := func() (int64, bool) { + rr := do(h, http.MethodPost, "/host-report", "HKEY", validReportBody("h1")) + if rr.Code != 200 { + t.Fatalf("host-report: %d body=%s", rr.Code, rr.Body.String()) + } + var env struct { + DesiredGeneration int64 `json:"desired_generation"` + HasSignedOps bool `json:"has_signed_ops"` + } + json.Unmarshal(rr.Body.Bytes(), &env) + return env.DesiredGeneration, env.HasSignedOps + } + + // Fresh host: generation 0, no signed ops. + if gen, signed := readEnvelope(); gen != 0 || signed { + t.Fatalf("fresh envelope = gen %d signed %v, want 0/false", gen, signed) + } + + // After an admin-set: generation advances on the next heartbeat. + if _, err := st.SetHostDesired("h1", []byte(`{"guests":[]}`)); err != nil { + t.Fatal(err) + } + if gen, _ := readEnvelope(); gen != 1 { + t.Errorf("envelope generation after set = %d, want 1", gen) + } + + // After enqueueing a signed job: has_signed_ops flips true. + if err := st.EnqueueSignedJob("h1", "job1", []byte("opaque-signed-blob")); err != nil { + t.Fatal(err) + } + if _, signed := readEnvelope(); !signed { + t.Errorf("has_signed_ops after enqueue = false, want true") + } +} + +// GET /jobs is self-scoped and serves the enqueued opaque blobs (oldest first). +func TestGetJobs_SelfScopedAndServesBlobs(t *testing.T) { + h, st, _ := newTestHandler(t) + seedHost(t, st, "h1", "c1", "HKEY1") + seedHost(t, st, "h2", "c2", "HKEY2") + st.EnqueueSignedJob("h1", "jobA", []byte("blobA")) + st.EnqueueSignedJob("h1", "jobB", []byte("blobB")) + + // h2 reading h1's jobs → 403. + if rr := do(h, http.MethodGet, "/hosts/h1/jobs", "HKEY2", ""); rr.Code != http.StatusForbidden { + t.Errorf("h2 reading h1 jobs = %d, want 403", rr.Code) + } + // h1 reading its own → 200 with both blobs, oldest first. + rr := do(h, http.MethodGet, "/hosts/h1/jobs", "HKEY1", "") + if rr.Code != 200 { + t.Fatalf("h1 reading jobs = %d", rr.Code) + } + var resp struct { + Jobs []struct { + JobID string `json:"job_id"` + BlobB64 string `json:"blob_b64"` + } `json:"jobs"` + } + json.Unmarshal(rr.Body.Bytes(), &resp) + if len(resp.Jobs) != 2 || resp.Jobs[0].JobID != "jobA" || resp.Jobs[1].JobID != "jobB" { + t.Fatalf("jobs = %+v, want jobA then jobB", resp.Jobs) + } + blobA, _ := base64.StdEncoding.DecodeString(resp.Jobs[0].BlobB64) + if string(blobA) != "blobA" { + t.Errorf("jobA blob = %q, want blobA", blobA) + } +} + +// The admin enqueue-job endpoint (global key only) seeds the queue, reflected in has_signed_ops. +func TestAdminEnqueueJob_GlobalKeyOnly(t *testing.T) { + h, st, _ := newTestHandler(t) + seedHost(t, st, "h1", "c1", "HKEY1") + blob := base64.StdEncoding.EncodeToString([]byte("signed-op-envelope")) + + // per-host key → 403. + if rr := do(h, http.MethodPost, "/admin/hosts/h1/jobs", "HKEY1", `{"blob_b64":"`+blob+`"}`); rr.Code != http.StatusForbidden { + t.Errorf("per-host enqueue = %d, want 403", rr.Code) + } + // global key → 201. + if rr := do(h, http.MethodPost, "/admin/hosts/h1/jobs", globalKey, `{"job_id":"j1","blob_b64":"`+blob+`"}`); rr.Code != http.StatusCreated { + t.Fatalf("global enqueue = %d, want 201", rr.Code) + } + if n, _ := st.CountSignedJobs("h1"); n != 1 { + t.Errorf("queue depth = %d, want 1", n) + } +} diff --git a/hub/internal/api/handler.go b/hub/internal/api/handler.go index fca95d8..aa4a2d9 100644 --- a/hub/internal/api/handler.go +++ b/hub/internal/api/handler.go @@ -3,6 +3,7 @@ package api import ( "bytes" "crypto/subtle" + "database/sql" "encoding/base64" "encoding/json" "fmt" @@ -128,6 +129,20 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { case r.Method == http.MethodPut && strings.HasPrefix(path, "/hosts/") && strings.HasSuffix(path, "/escrow"): hostID := strings.TrimSuffix(strings.TrimPrefix(path, "/hosts/"), "/escrow") h.handleHostEscrowPut(w, r, hostID) + // Desired-state serving (slice 10A) — per-host-key, self-scoped (a host reads only its own). + case r.Method == http.MethodGet && strings.HasPrefix(path, "/hosts/") && strings.HasSuffix(path, "/desired-state"): + hostID := strings.TrimSuffix(strings.TrimPrefix(path, "/hosts/"), "/desired-state") + h.handleGetDesiredState(w, r, hostID) + case r.Method == http.MethodGet && strings.HasPrefix(path, "/hosts/") && strings.HasSuffix(path, "/jobs"): + hostID := strings.TrimSuffix(strings.TrimPrefix(path, "/hosts/"), "/jobs") + h.handleGetJobs(w, r, hostID) + // Admin-set (slice 10A) — global/operator key only; bumps the generation. + case r.Method == http.MethodPut && strings.HasPrefix(path, "/admin/hosts/") && strings.HasSuffix(path, "/desired-state"): + hostID := strings.TrimSuffix(strings.TrimPrefix(path, "/admin/hosts/"), "/desired-state") + h.handleAdminSetDesiredState(w, r, hostID) + case r.Method == http.MethodPost && strings.HasPrefix(path, "/admin/hosts/") && strings.HasSuffix(path, "/jobs"): + hostID := strings.TrimSuffix(strings.TrimPrefix(path, "/admin/hosts/"), "/jobs") + h.handleAdminEnqueueJob(w, r, hostID) case r.Method == http.MethodPost && path == "/event": h.handleEvent(w, r) case r.Method == http.MethodPost && path == "/notify": @@ -500,12 +515,26 @@ func (h *Handler) handleHostReport(w http.ResponseWriter, r *http.Request) { if cc, err := h.store.GetCustomerConfig(custID); err == nil && cc != nil && cc.Status == "blocked" { blocked = true } + + // Control envelope (slice 10A): the cheap change-notification. desired_generation is the + // host's current generation (the agent re-fetches the full desired-state only when it + // advances past its cached one); has_signed_ops flags a non-empty signed-jobs queue (the + // agent fetches/executes them in 10B). Both degrade safely to their slice-4 defaults on a + // store error — a heartbeat must never fail on the control channel. + var desiredGen int64 + if host, err := h.store.GetHost(hostID); err == nil && host != nil { + desiredGen = host.DesiredGeneration + } + hasSignedOps := false + if n, err := h.store.CountSignedJobs(hostID); err == nil && n > 0 { + hasSignedOps = true + } resp := map[string]interface{}{ "status": "ok", "poll_interval_seconds": defaultHostPollSeconds, "blocked": blocked, - "desired_generation": 0, // reserved (slice 4) - "has_signed_ops": false, // reserved (slice 4) + "desired_generation": desiredGen, + "has_signed_ops": hasSignedOps, } w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) @@ -633,6 +662,175 @@ func (h *Handler) handleHostEscrowPut(w http.ResponseWriter, r *http.Request, pa w.Write([]byte(`{"status":"ok"}`)) } +// handleGetDesiredState serves a host its authoritative desired-state (slice 10A). Per-host key, +// SELF-SCOPED: a host reads ONLY its own (the global operator key may read any). The agent fetches +// this when the heartbeat envelope's desired_generation has advanced past its cached one. The +// response carries the generation the state corresponds to, so the agent caches it atomically. +func (h *Handler) handleGetDesiredState(w http.ResponseWriter, r *http.Request, pathHostID string) { + authHostID, _, isGlobal, ok := h.checkAuthHost(r) + if !ok { + http.Error(w, "Unauthorized", http.StatusUnauthorized) + return + } + if pathHostID == "" { + http.Error(w, "Missing host_id", http.StatusBadRequest) + return + } + if !isGlobal && authHostID != pathHostID { + http.Error(w, "Forbidden: host_id mismatch", http.StatusForbidden) + return + } + host, err := h.store.GetHost(pathHostID) + if err != nil { + h.logger.Printf("[ERROR] desired-state lookup for %s: %v", pathHostID, err) + http.Error(w, "Internal error", http.StatusInternalServerError) + return + } + if host == nil { + http.Error(w, "Unknown host_id", http.StatusNotFound) + return + } + desired := host.DesiredJSON + if strings.TrimSpace(desired) == "" { + desired = "{}" + } + resp := map[string]interface{}{ + "generation": host.DesiredGeneration, + "desired_state": json.RawMessage(desired), // opaque to the hub — agent owns the schema + } + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(resp) +} + +// handleGetJobs serves a host its pending signed-op blobs (slice 10A). Per-host key, SELF-SCOPED. +// The blobs are OPAQUE (the hub never forged or opened them); the agent verifies + executes them +// in 10B. 10A only serves the queue. +func (h *Handler) handleGetJobs(w http.ResponseWriter, r *http.Request, pathHostID string) { + authHostID, _, isGlobal, ok := h.checkAuthHost(r) + if !ok { + http.Error(w, "Unauthorized", http.StatusUnauthorized) + return + } + if pathHostID == "" { + http.Error(w, "Missing host_id", http.StatusBadRequest) + return + } + if !isGlobal && authHostID != pathHostID { + http.Error(w, "Forbidden: host_id mismatch", http.StatusForbidden) + return + } + jobs, err := h.store.GetSignedJobs(pathHostID) + if err != nil { + h.logger.Printf("[ERROR] jobs lookup for %s: %v", pathHostID, err) + http.Error(w, "Internal error", http.StatusInternalServerError) + return + } + out := make([]map[string]string, 0, len(jobs)) + for _, j := range jobs { + out = append(out, map[string]string{ + "job_id": j.JobID, + "blob_b64": base64.StdEncoding.EncodeToString(j.Blob), + "created_at": j.CreatedAt, + }) + } + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(map[string]interface{}{"jobs": out}) +} + +// handleAdminSetDesiredState sets a host's desired-state (slice 10A). GLOBAL/operator key ONLY — +// a per-host key cannot author its own intent. The body is the desired-state JSON (opaque to the +// hub: it stores + serves bytes, never validates/interprets the schema — the agent/CLI owns it). +// Writing BUMPS desired_generation so the next heartbeat signals the agent to re-fetch. +func (h *Handler) handleAdminSetDesiredState(w http.ResponseWriter, r *http.Request, pathHostID string) { + _, _, isGlobal, ok := h.checkAuthHost(r) + if !ok || !isGlobal { + http.Error(w, "Forbidden: global key required", http.StatusForbidden) + return + } + if pathHostID == "" { + http.Error(w, "Missing host_id", http.StatusBadRequest) + return + } + body, err := io.ReadAll(io.LimitReader(r.Body, 1<<20)) + if err != nil { + http.Error(w, "Bad request", http.StatusBadRequest) + return + } + // Validate it is well-formed JSON (the hub does not interpret the schema, but a malformed + // blob would break the agent's parse — reject it at the door). + if !json.Valid(body) { + http.Error(w, "Invalid payload: body must be JSON", http.StatusBadRequest) + return + } + gen, err := h.store.SetHostDesired(pathHostID, body) + if err == sql.ErrNoRows { + http.Error(w, "Unknown host_id", http.StatusNotFound) + return + } + if err != nil { + h.logger.Printf("[ERROR] set desired-state for %s: %v", pathHostID, err) + http.Error(w, "Internal error", http.StatusInternalServerError) + return + } + h.logger.Printf("[INFO] admin-set desired-state for host %s (generation now %d, %d bytes)", pathHostID, gen, len(body)) + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(map[string]interface{}{"status": "ok", "generation": gen}) +} + +// handleAdminEnqueueJob appends an opaque signed-op blob to a host's queue (slice 10A). GLOBAL key +// ONLY. The blob is pre-signed off-hub (the hub holds no signing key); the hub stores it verbatim. +// This is the minimal operator path to seed the queue so HasSignedOps/serving are exercisable; the +// rich operator/signing UX is later. Execution is 10B. +func (h *Handler) handleAdminEnqueueJob(w http.ResponseWriter, r *http.Request, pathHostID string) { + _, _, isGlobal, ok := h.checkAuthHost(r) + if !ok || !isGlobal { + http.Error(w, "Forbidden: global key required", http.StatusForbidden) + return + } + if pathHostID == "" { + http.Error(w, "Missing host_id", http.StatusBadRequest) + return + } + host, err := h.store.GetHost(pathHostID) + if err != nil || host == nil { + http.Error(w, "Unknown host_id", http.StatusNotFound) + return + } + body, err := io.ReadAll(io.LimitReader(r.Body, 1<<20)) + if err != nil { + http.Error(w, "Bad request", http.StatusBadRequest) + return + } + var req struct { + JobID string `json:"job_id"` + BlobB64 string `json:"blob_b64"` + } + if err := json.Unmarshal(body, &req); err != nil || req.BlobB64 == "" { + http.Error(w, "Invalid payload: blob_b64 required", http.StatusBadRequest) + return + } + blob, err := base64.StdEncoding.DecodeString(req.BlobB64) + if err != nil || len(blob) == 0 { + http.Error(w, "Invalid payload: blob_b64 not valid base64", http.StatusBadRequest) + return + } + if req.JobID == "" { + req.JobID, _ = configgen.RandomHex(8) + } + if err := h.store.EnqueueSignedJob(pathHostID, req.JobID, blob); err != nil { + h.logger.Printf("[ERROR] enqueue job for %s: %v", pathHostID, err) + http.Error(w, "Internal error", http.StatusInternalServerError) + return + } + h.logger.Printf("[INFO] enqueued signed-op job %s for host %s (%d bytes)", req.JobID, pathHostID, len(blob)) + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusCreated) + json.NewEncoder(w).Encode(map[string]interface{}{"status": "ok", "job_id": req.JobID}) +} + // allowedEventTypes lists all valid event_type values the Hub accepts. var allowedEventTypes = map[string]bool{ // Controller-pushed events diff --git a/hub/internal/api/testdata/control-envelope.golden.json b/hub/internal/api/testdata/control-envelope.golden.json new file mode 100644 index 0000000..a935393 --- /dev/null +++ b/hub/internal/api/testdata/control-envelope.golden.json @@ -0,0 +1,7 @@ +{ + "status": "ok", + "poll_interval_seconds": 900, + "blocked": false, + "desired_generation": 4, + "has_signed_ops": true +} diff --git a/hub/internal/api/testdata/desired-state.golden.json b/hub/internal/api/testdata/desired-state.golden.json new file mode 100644 index 0000000..1df911c --- /dev/null +++ b/hub/internal/api/testdata/desired-state.golden.json @@ -0,0 +1,23 @@ +{ + "generation": 4, + "desired_state": { + "guests": [ + { + "vmid": 100, + "run": "running", + "spec": { "cores": 2, "memory_bytes": 2147483648, "disk_bytes": 21474836480 }, + "description": "felhom: acme prod" + }, + { + "vmid": 200, + "decommission": true + } + ], + "pbs_namespace": "felhom-cust-acme", + "restore_directive": { + "mode": "guest_loss", + "archive": "local:backup/vzdump-lxc-200-2026_06_09-11_00_00.tar.zst", + "vmid": 200 + } + } +} diff --git a/hub/internal/store/store.go b/hub/internal/store/store.go index 74dd55c..2d3ede1 100644 --- a/hub/internal/store/store.go +++ b/hub/internal/store/store.go @@ -282,6 +282,20 @@ func (s *Store) migrate() error { created_at DATETIME NOT NULL, updated_at DATETIME NOT NULL DEFAULT (datetime('now')) ); + + -- signed_jobs (slice 10A): the per-host queue of OPAQUE operator-signed destructive-op + -- blobs. The hub STORES + SERVES them; it never forges one (there is no signing key + -- hub-side) and never executes them (execution + signature verification is slice 10B). + -- HasSignedOps on the control envelope is "this host has >=1 pending job". A job is opaque + -- bytes (the signed-op envelope the agent verifies); the hub treats it as a blob. + CREATE TABLE IF NOT EXISTS signed_jobs ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + host_id TEXT NOT NULL, + job_id TEXT NOT NULL, + blob BLOB NOT NULL, + created_at DATETIME NOT NULL DEFAULT (datetime('now')) + ); + CREATE INDEX IF NOT EXISTS idx_signed_jobs_host ON signed_jobs(host_id, id); `) if err != nil { return err @@ -1439,6 +1453,70 @@ func (s *Store) GetHostEscrow(hostID string) (*HostEscrow, error) { return &e, nil } +// SetHostDesired sets a host's desired-state JSON and ATOMICALLY bumps its desired_generation +// (slice 10A — the operator "admin-set" write). Returns the NEW generation. The generation is +// the cheap change-signal carried on every heartbeat envelope; the agent re-fetches the full +// desired-state only when it advances. Errors with sql.ErrNoRows if the host does not exist. +func (s *Store) SetHostDesired(hostID string, desiredJSON []byte) (int64, error) { + res, err := s.db.Exec(` + UPDATE hosts SET desired_json = ?, desired_generation = desired_generation + 1, + updated_at = datetime('now') + WHERE host_id = ?`, string(desiredJSON), hostID) + if err != nil { + return 0, err + } + if n, _ := res.RowsAffected(); n == 0 { + return 0, sql.ErrNoRows // unknown host + } + var gen int64 + if err := s.db.QueryRow(`SELECT desired_generation FROM hosts WHERE host_id = ?`, hostID).Scan(&gen); err != nil { + return 0, err + } + return gen, nil +} + +// SignedJob is one OPAQUE operator-signed destructive-op blob queued for a host (slice 10A). The +// hub stores + serves the bytes; it never forges, opens, or executes them (10B owns verify+run). +type SignedJob struct { + JobID string + Blob []byte + CreatedAt string +} + +// EnqueueSignedJob appends an opaque signed-op blob to a host's queue (slice 10A). Operator-side; +// the hub holds no signing key — the blob arrives pre-signed. +func (s *Store) EnqueueSignedJob(hostID, jobID string, blob []byte) error { + _, err := s.db.Exec(`INSERT INTO signed_jobs (host_id, job_id, blob) VALUES (?, ?, ?)`, + hostID, jobID, blob) + return err +} + +// GetSignedJobs returns a host's pending signed-op blobs, oldest first (slice 10A serving). +func (s *Store) GetSignedJobs(hostID string) ([]SignedJob, error) { + rows, err := s.db.Query(`SELECT job_id, blob, created_at FROM signed_jobs WHERE host_id = ? ORDER BY id ASC`, hostID) + if err != nil { + return nil, err + } + defer rows.Close() + var jobs []SignedJob + for rows.Next() { + var j SignedJob + if err := rows.Scan(&j.JobID, &j.Blob, &j.CreatedAt); err != nil { + return nil, err + } + jobs = append(jobs, j) + } + return jobs, rows.Err() +} + +// CountSignedJobs returns the number of pending signed-op blobs for a host (drives the envelope's +// has_signed_ops flag — the cheap "fetch your jobs" notification). +func (s *Store) CountSignedJobs(hostID string) (int, error) { + var n int + err := s.db.QueryRow(`SELECT COUNT(*) FROM signed_jobs WHERE host_id = ?`, hostID).Scan(&n) + return n, err +} + // SaveHostReport inserts a host_reports row and bumps the host's reality columns // (agent_version/last_report_at/updated_at) — never the inert intent columns. func (s *Store) SaveHostReport(hostID, customerID string, reportJSON []byte, d HostReportDenorm) error {