slice 10A: hub desired-state serving + signed-jobs queue (Down channel) (hub v0.9.0)

Serve operator intent to authenticated hosts: PUT /admin/hosts/{id}/desired-state
(global key) bumps desired_generation; GET /hosts/{id}/desired-state + /jobs are
per-host self-scoped; the host-report envelope now carries the real generation +
has_signed_ops. New signed_jobs table + store methods. Desired-state stored/served
opaquely (agent owns the schema). Cross-repo golden (envelope + desired-state)
byte-identical with felhom-agent; doc 03 §4/§9 updated.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-10 19:03:14 +02:00
parent f9af3243b9
commit e54f882e70
8 changed files with 669 additions and 30 deletions
+264
View File
@@ -0,0 +1,264 @@
package api
import (
"encoding/base64"
"encoding/json"
"net/http"
"os"
"testing"
"gitea.dooplex.hu/admin/felhom-hub/internal/store"
)
// The desired-state wire format is a contract DUPLICATED with felhom-agent:
// testdata/desired-state.golden.json MUST stay byte-identical with the agent's
// internal/hub/testdata copy. The hub serves desired_state OPAQUELY (it stores and emits the
// bytes), so this test proves that the golden's desired_state survives admin-set → GET with its
// JSON value unchanged (the pass-through contract). The comparison is semantic: both sides are
// re-marshalled through a generic value, so whitespace differences are ignored.
func TestDesiredStateGolden_RoundTripsThroughHub(t *testing.T) {
	h, st, _ := newTestHandler(t)
	seedHost(t, st, "h1", "c1", "HKEY1")

	raw, err := os.ReadFile("testdata/desired-state.golden.json")
	if err != nil {
		t.Fatal(err)
	}
	var golden struct {
		DesiredState json.RawMessage `json:"desired_state"`
	}
	if err := json.Unmarshal(raw, &golden); err != nil {
		t.Fatal(err)
	}

	// Operator sets the golden's desired_state.
	if rr := do(h, http.MethodPut, "/admin/hosts/h1/desired-state", globalKey, string(golden.DesiredState)); rr.Code != http.StatusOK {
		t.Fatalf("admin-set golden desired_state: %d body=%s", rr.Code, rr.Body.String())
	}

	// The host fetches it back; the hub must not have reshaped the object.
	rr := do(h, http.MethodGet, "/hosts/h1/desired-state", "HKEY1", "")
	if rr.Code != http.StatusOK {
		t.Fatalf("GET desired-state: %d", rr.Code)
	}
	var got struct {
		Generation   int64           `json:"generation"`
		DesiredState json.RawMessage `json:"desired_state"`
	}
	// A body that does not decode must fail here, not surface later as a confusing
	// "generation = 0" or an empty diff.
	if err := json.Unmarshal(rr.Body.Bytes(), &got); err != nil {
		t.Fatalf("GET desired-state: response is not valid JSON: %v (body=%s)", err, rr.Body.String())
	}
	if got.Generation != 1 {
		t.Errorf("generation = %d, want 1", got.Generation)
	}

	// Compare semantically (re-marshal both through a generic value to ignore whitespace).
	var want, have any
	if err := json.Unmarshal(golden.DesiredState, &want); err != nil {
		t.Fatalf("golden desired_state does not decode: %v", err)
	}
	if err := json.Unmarshal(got.DesiredState, &have); err != nil {
		t.Fatalf("served desired_state does not decode: %v (raw=%s)", err, got.DesiredState)
	}
	wb, err := json.Marshal(want)
	if err != nil {
		t.Fatalf("re-marshal golden desired_state: %v", err)
	}
	hb, err := json.Marshal(have)
	if err != nil {
		t.Fatalf("re-marshal served desired_state: %v", err)
	}
	if string(wb) != string(hb) {
		t.Errorf("served desired_state diverged from the set golden:\n set: %s\n served: %s", wb, hb)
	}
}
// seedHost writes a host record (ID, customer and API key) straight into the store, bypassing
// the HTTP API, so the desired-state tests have a host to address. A store failure aborts the
// calling test.
func seedHost(t *testing.T, st *store.Store, hostID, custID, apiKey string) {
	t.Helper()

	record := store.Host{
		HostID:     hostID,
		CustomerID: custID,
		APIKey:     apiKey,
	}
	err := st.UpsertHost(&record)
	if err == nil {
		return
	}
	t.Fatalf("UpsertHost(%s): %v", hostID, err)
}
// Admin-set bumps the generation on every write, and the served desired-state reflects the
// latest body together with that generation.
func TestAdminSetDesiredState_BumpsGenerationAndServes(t *testing.T) {
	h, st, _ := newTestHandler(t)
	seedHost(t, st, "h1", "c1", "HKEY1")

	const (
		adminPath = "/admin/hosts/h1/desired-state"
		hostPath  = "/hosts/h1/desired-state"
	)

	// generationOf extracts the "generation" field from a response body. It fails the test on a
	// malformed body or a missing field instead of silently yielding the zero value (or a value
	// left over from an earlier decode).
	generationOf := func(what string, body []byte) int64 {
		t.Helper()
		var resp struct {
			Generation *int64 `json:"generation"`
		}
		if err := json.Unmarshal(body, &resp); err != nil {
			t.Fatalf("%s: response is not valid JSON: %v (body=%s)", what, err, body)
		}
		if resp.Generation == nil {
			t.Fatalf("%s: response carries no generation (body=%s)", what, body)
		}
		return *resp.Generation
	}

	// Initial generation is 0 (nothing set yet).
	rr := do(h, http.MethodGet, hostPath, "HKEY1", "")
	if rr.Code != http.StatusOK {
		t.Fatalf("initial GET desired-state: %d", rr.Code)
	}
	if gen := generationOf("initial GET", rr.Body.Bytes()); gen != 0 {
		t.Fatalf("initial generation = %d, want 0", gen)
	}

	// First admin-set → generation 1.
	rr = do(h, http.MethodPut, adminPath, globalKey, `{"guests":[{"vmid":100,"run":"running"}]}`)
	if rr.Code != http.StatusOK {
		t.Fatalf("admin-set #1: %d body=%s", rr.Code, rr.Body.String())
	}
	if gen := generationOf("admin-set #1", rr.Body.Bytes()); gen != 1 {
		t.Fatalf("generation after set #1 = %d, want 1", gen)
	}

	// Second admin-set → generation 2 (monotonic).
	rr = do(h, http.MethodPut, adminPath, globalKey, `{"guests":[{"vmid":100,"run":"stopped"}]}`)
	if rr.Code != http.StatusOK {
		t.Fatalf("admin-set #2: %d body=%s", rr.Code, rr.Body.String())
	}
	if gen := generationOf("admin-set #2", rr.Body.Bytes()); gen != 2 {
		t.Fatalf("generation after set #2 = %d, want 2", gen)
	}

	// GET serves the latest body + its generation.
	rr = do(h, http.MethodGet, hostPath, "HKEY1", "")
	if rr.Code != http.StatusOK {
		t.Fatalf("GET desired-state: %d", rr.Code)
	}
	var got struct {
		Generation   int64           `json:"generation"`
		DesiredState json.RawMessage `json:"desired_state"`
	}
	if err := json.Unmarshal(rr.Body.Bytes(), &got); err != nil {
		t.Fatalf("GET desired-state: response is not valid JSON: %v (body=%s)", err, rr.Body.String())
	}
	if got.Generation != 2 {
		t.Errorf("served generation = %d, want 2", got.Generation)
	}

	var ds struct {
		Guests []struct {
			VMID int    `json:"vmid"`
			Run  string `json:"run"`
		} `json:"guests"`
	}
	if err := json.Unmarshal(got.DesiredState, &ds); err != nil {
		t.Fatalf("desired_state not valid JSON: %v", err)
	}
	if len(ds.Guests) != 1 || ds.Guests[0].VMID != 100 || ds.Guests[0].Run != "stopped" {
		t.Errorf("served desired_state = %+v, want the latest (vmid 100 stopped)", ds.Guests)
	}
}
// Admin-set is reserved for the global key: a per-host key is refused. The same table also pins
// the handler's input checks (malformed JSON → 400, unknown host → 404). The requests are issued
// in the listed order against one shared handler.
func TestAdminSetDesiredState_GlobalKeyOnly(t *testing.T) {
	h, st, _ := newTestHandler(t)
	seedHost(t, st, "h1", "c1", "HKEY1")

	const h1Path = "/admin/hosts/h1/desired-state"
	steps := []struct {
		path    string
		token   string
		body    string
		want    int
		failFmt string // takes the observed status code
	}{
		// A per-host key must not be able to author intent.
		{h1Path, "HKEY1", `{"x":1}`, http.StatusForbidden, "per-host key admin-set = %d, want 403"},
		// The global key is accepted.
		{h1Path, globalKey, `{"x":1}`, http.StatusOK, "global key admin-set = %d, want 200"},
		// Malformed JSON is rejected at the door.
		{h1Path, globalKey, `not json`, http.StatusBadRequest, "malformed admin-set = %d, want 400"},
		// Unknown host → 404.
		{"/admin/hosts/nope/desired-state", globalKey, `{}`, http.StatusNotFound, "unknown host admin-set = %d, want 404"},
	}

	for _, step := range steps {
		rr := do(h, http.MethodPut, step.path, step.token, step.body)
		if rr.Code == step.want {
			continue
		}
		t.Errorf(step.failFmt, rr.Code)
	}
}
// GET /hosts/{id}/desired-state is SELF-SCOPED: a host key may read only its own host, the
// global key may read any host, and a request without a token is unauthorized.
func TestGetDesiredState_SelfScoped(t *testing.T) {
	h, st, _ := newTestHandler(t)
	seedHost(t, st, "h1", "c1", "HKEY1")
	seedHost(t, st, "h2", "c2", "HKEY2")

	_, err := st.SetHostDesired("h1", []byte(`{"guests":[]}`))
	if err != nil {
		t.Fatal(err)
	}

	// Every probe targets h1's desired-state; only the presented token varies.
	probes := []struct {
		token   string
		want    int
		failFmt string // takes the observed status code
	}{
		// h1's key reading h1 → 200.
		{"HKEY1", http.StatusOK, "h1 reading h1 = %d, want 200"},
		// h2's key reading h1 → 403 (the headline self-scoping check).
		{"HKEY2", http.StatusForbidden, "h2 reading h1 = %d, want 403"},
		// Global key reading any → 200.
		{globalKey, http.StatusOK, "global reading h1 = %d, want 200"},
		// No token → 401.
		{"", http.StatusUnauthorized, "no token = %d, want 401"},
	}

	for _, probe := range probes {
		code := do(h, http.MethodGet, "/hosts/h1/desired-state", probe.token, "").Code
		if code != probe.want {
			t.Errorf(probe.failFmt, code)
		}
	}
}
// The control envelope returned by POST /host-report carries the host's current
// desired_generation and the has_signed_ops flag. The test walks a fresh host through an
// admin-set and a job enqueue and checks the envelope after each step.
func TestHostReportEnvelope_GenerationAndSignedOps(t *testing.T) {
	h, st, _ := newTestHandler(t)
	// NOTE(review): the result of SaveCustomerConfig (if it returns one) is not inspected here;
	// confirm its signature and assert on the error if there is one.
	st.SaveCustomerConfig(&store.CustomerConfig{CustomerID: "c1", APIKey: "ckey", RetrievalPassword: "p"})
	seedHost(t, st, "h1", "c1", "HKEY")

	// readEnvelope posts a heartbeat and returns the two control fields. Pointer fields make a
	// missing key distinguishable from a legitimate 0/false; without that, a body that is not
	// JSON or lacks the fields would satisfy the "fresh host" expectation below by accident.
	readEnvelope := func() (int64, bool) {
		t.Helper()
		rr := do(h, http.MethodPost, "/host-report", "HKEY", validReportBody("h1"))
		if rr.Code != http.StatusOK {
			t.Fatalf("host-report: %d body=%s", rr.Code, rr.Body.String())
		}
		var env struct {
			DesiredGeneration *int64 `json:"desired_generation"`
			HasSignedOps      *bool  `json:"has_signed_ops"`
		}
		if err := json.Unmarshal(rr.Body.Bytes(), &env); err != nil {
			t.Fatalf("host-report: response is not valid JSON: %v (body=%s)", err, rr.Body.String())
		}
		if env.DesiredGeneration == nil || env.HasSignedOps == nil {
			t.Fatalf("host-report: envelope lacks desired_generation/has_signed_ops (body=%s)", rr.Body.String())
		}
		return *env.DesiredGeneration, *env.HasSignedOps
	}

	// Fresh host: generation 0, no signed ops.
	if gen, signed := readEnvelope(); gen != 0 || signed {
		t.Fatalf("fresh envelope = gen %d signed %v, want 0/false", gen, signed)
	}

	// After an admin-set: generation advances on the next heartbeat.
	if _, err := st.SetHostDesired("h1", []byte(`{"guests":[]}`)); err != nil {
		t.Fatal(err)
	}
	if gen, _ := readEnvelope(); gen != 1 {
		t.Errorf("envelope generation after set = %d, want 1", gen)
	}

	// After enqueueing a signed job: has_signed_ops flips true.
	if err := st.EnqueueSignedJob("h1", "job1", []byte("opaque-signed-blob")); err != nil {
		t.Fatal(err)
	}
	if _, signed := readEnvelope(); !signed {
		t.Errorf("has_signed_ops after enqueue = false, want true")
	}
}
// GET /hosts/{id}/jobs is self-scoped and serves the enqueued opaque blobs, oldest first, each
// base64-encoded under blob_b64.
func TestGetJobs_SelfScopedAndServesBlobs(t *testing.T) {
	h, st, _ := newTestHandler(t)
	seedHost(t, st, "h1", "c1", "HKEY1")
	seedHost(t, st, "h2", "c2", "HKEY2")

	// Seed the queue; a failed enqueue must stop the test here rather than show up later as a
	// puzzling "wrong number of jobs".
	seeded := []struct{ id, blob string }{
		{"jobA", "blobA"},
		{"jobB", "blobB"},
	}
	for _, job := range seeded {
		if err := st.EnqueueSignedJob("h1", job.id, []byte(job.blob)); err != nil {
			t.Fatalf("EnqueueSignedJob(%s): %v", job.id, err)
		}
	}

	// h2 reading h1's jobs → 403.
	if rr := do(h, http.MethodGet, "/hosts/h1/jobs", "HKEY2", ""); rr.Code != http.StatusForbidden {
		t.Errorf("h2 reading h1 jobs = %d, want 403", rr.Code)
	}

	// h1 reading its own → 200 with both blobs, oldest first.
	rr := do(h, http.MethodGet, "/hosts/h1/jobs", "HKEY1", "")
	if rr.Code != http.StatusOK {
		t.Fatalf("h1 reading jobs = %d", rr.Code)
	}
	var resp struct {
		Jobs []struct {
			JobID   string `json:"job_id"`
			BlobB64 string `json:"blob_b64"`
		} `json:"jobs"`
	}
	if err := json.Unmarshal(rr.Body.Bytes(), &resp); err != nil {
		t.Fatalf("jobs response is not valid JSON: %v (body=%s)", err, rr.Body.String())
	}
	if len(resp.Jobs) != 2 || resp.Jobs[0].JobID != "jobA" || resp.Jobs[1].JobID != "jobB" {
		t.Fatalf("jobs = %+v, want jobA then jobB", resp.Jobs)
	}

	// Every served blob must decode back to exactly what was enqueued.
	for i, job := range seeded {
		blob, err := base64.StdEncoding.DecodeString(resp.Jobs[i].BlobB64)
		if err != nil {
			t.Fatalf("%s blob_b64 is not valid base64: %v", job.id, err)
		}
		if string(blob) != job.blob {
			t.Errorf("%s blob = %q, want %s", job.id, blob, job.blob)
		}
	}
}
// The admin enqueue-job endpoint accepts the global key only: a per-host key is refused and
// leaves the queue untouched, while a global-key request is answered 201 and grows the queue
// (observed through the store's CountSignedJobs).
func TestAdminEnqueueJob_GlobalKeyOnly(t *testing.T) {
	h, st, _ := newTestHandler(t)
	seedHost(t, st, "h1", "c1", "HKEY1")
	blob := base64.StdEncoding.EncodeToString([]byte("signed-op-envelope"))

	// queueDepth reads the number of queued jobs for h1, failing the test on a store error
	// instead of treating it as a depth of 0.
	queueDepth := func() int64 {
		t.Helper()
		n, err := st.CountSignedJobs("h1")
		if err != nil {
			t.Fatalf("CountSignedJobs: %v", err)
		}
		return int64(n)
	}

	// per-host key → 403, and nothing is enqueued.
	if rr := do(h, http.MethodPost, "/admin/hosts/h1/jobs", "HKEY1", `{"blob_b64":"`+blob+`"}`); rr.Code != http.StatusForbidden {
		t.Errorf("per-host enqueue = %d, want 403", rr.Code)
	}
	if n := queueDepth(); n != 0 {
		t.Errorf("queue depth after refused enqueue = %d, want 0", n)
	}

	// global key → 201.
	if rr := do(h, http.MethodPost, "/admin/hosts/h1/jobs", globalKey, `{"job_id":"j1","blob_b64":"`+blob+`"}`); rr.Code != http.StatusCreated {
		t.Fatalf("global enqueue = %d, want 201", rr.Code)
	}
	if n := queueDepth(); n != 1 {
		t.Errorf("queue depth = %d, want 1", n)
	}
}
+200 -2
View File
@@ -3,6 +3,7 @@ package api
import (
"bytes"
"crypto/subtle"
"database/sql"
"encoding/base64"
"encoding/json"
"fmt"
@@ -128,6 +129,20 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
case r.Method == http.MethodPut && strings.HasPrefix(path, "/hosts/") && strings.HasSuffix(path, "/escrow"):
hostID := strings.TrimSuffix(strings.TrimPrefix(path, "/hosts/"), "/escrow")
h.handleHostEscrowPut(w, r, hostID)
// Desired-state serving (slice 10A) — per-host-key, self-scoped (a host reads only its own).
case r.Method == http.MethodGet && strings.HasPrefix(path, "/hosts/") && strings.HasSuffix(path, "/desired-state"):
hostID := strings.TrimSuffix(strings.TrimPrefix(path, "/hosts/"), "/desired-state")
h.handleGetDesiredState(w, r, hostID)
case r.Method == http.MethodGet && strings.HasPrefix(path, "/hosts/") && strings.HasSuffix(path, "/jobs"):
hostID := strings.TrimSuffix(strings.TrimPrefix(path, "/hosts/"), "/jobs")
h.handleGetJobs(w, r, hostID)
// Admin-set (slice 10A) — global/operator key only; bumps the generation.
case r.Method == http.MethodPut && strings.HasPrefix(path, "/admin/hosts/") && strings.HasSuffix(path, "/desired-state"):
hostID := strings.TrimSuffix(strings.TrimPrefix(path, "/admin/hosts/"), "/desired-state")
h.handleAdminSetDesiredState(w, r, hostID)
case r.Method == http.MethodPost && strings.HasPrefix(path, "/admin/hosts/") && strings.HasSuffix(path, "/jobs"):
hostID := strings.TrimSuffix(strings.TrimPrefix(path, "/admin/hosts/"), "/jobs")
h.handleAdminEnqueueJob(w, r, hostID)
case r.Method == http.MethodPost && path == "/event":
h.handleEvent(w, r)
case r.Method == http.MethodPost && path == "/notify":
@@ -500,12 +515,26 @@ func (h *Handler) handleHostReport(w http.ResponseWriter, r *http.Request) {
if cc, err := h.store.GetCustomerConfig(custID); err == nil && cc != nil && cc.Status == "blocked" {
blocked = true
}
// Control envelope (slice 10A): the cheap change-notification. desired_generation is the
// host's current generation (the agent re-fetches the full desired-state only when it
// advances past its cached one); has_signed_ops flags a non-empty signed-jobs queue (the
// agent fetches/executes them in 10B). Both degrade safely to their slice-4 defaults on a
// store error — a heartbeat must never fail on the control channel.
var desiredGen int64
if host, err := h.store.GetHost(hostID); err == nil && host != nil {
desiredGen = host.DesiredGeneration
}
hasSignedOps := false
if n, err := h.store.CountSignedJobs(hostID); err == nil && n > 0 {
hasSignedOps = true
}
resp := map[string]interface{}{
"status": "ok",
"poll_interval_seconds": defaultHostPollSeconds,
"blocked": blocked,
"desired_generation": 0, // reserved (slice 4)
"has_signed_ops": false, // reserved (slice 4)
"desired_generation": desiredGen,
"has_signed_ops": hasSignedOps,
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
@@ -633,6 +662,175 @@ func (h *Handler) handleHostEscrowPut(w http.ResponseWriter, r *http.Request, pa
w.Write([]byte(`{"status":"ok"}`))
}
// handleGetDesiredState serves a host its authoritative desired-state (slice 10A). Per-host key,
// SELF-SCOPED: a host reads ONLY its own (the global operator key may read any). The agent
// fetches this when the heartbeat envelope's desired_generation has advanced past its cached
// one. The response carries the generation together with the state it corresponds to.
//
// Status codes: 401 when authentication fails, 400 for an empty host ID, 403 when a per-host
// key addresses another host, 404 for an unknown host, 500 on a store error or when the stored
// desired-state cannot be encoded, 200 otherwise. A host with no stored desired-state is served
// an empty object.
func (h *Handler) handleGetDesiredState(w http.ResponseWriter, r *http.Request, pathHostID string) {
	authHostID, _, isGlobal, ok := h.checkAuthHost(r)
	if !ok {
		http.Error(w, "Unauthorized", http.StatusUnauthorized)
		return
	}
	if pathHostID == "" {
		http.Error(w, "Missing host_id", http.StatusBadRequest)
		return
	}
	if !isGlobal && authHostID != pathHostID {
		http.Error(w, "Forbidden: host_id mismatch", http.StatusForbidden)
		return
	}

	host, err := h.store.GetHost(pathHostID)
	if err != nil {
		h.logger.Printf("[ERROR] desired-state lookup for %s: %v", pathHostID, err)
		http.Error(w, "Internal error", http.StatusInternalServerError)
		return
	}
	if host == nil {
		http.Error(w, "Unknown host_id", http.StatusNotFound)
		return
	}

	desired := host.DesiredJSON
	if strings.TrimSpace(desired) == "" {
		desired = "{}"
	}

	// Encode BEFORE committing the status line: json.RawMessage is validated while marshalling,
	// so a stored value that is not well-formed JSON fails here. Encoding straight into the
	// ResponseWriter after WriteHeader(200) would instead send a 200 with an empty body.
	payload, err := json.Marshal(map[string]interface{}{
		"generation":    host.DesiredGeneration,
		"desired_state": json.RawMessage(desired), // opaque to the hub — agent owns the schema
	})
	if err != nil {
		h.logger.Printf("[ERROR] desired-state encode for %s: %v", pathHostID, err)
		http.Error(w, "Internal error", http.StatusInternalServerError)
		return
	}

	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(http.StatusOK)
	// Trailing newline keeps the body identical to what json.Encoder.Encode emitted before.
	w.Write(append(payload, '\n'))
}
// handleGetJobs returns the signed-op blobs queued for a host (slice 10A). Access is per-host
// and SELF-SCOPED: a host key may list only its own queue, the global key any. Blobs are OPAQUE
// to the hub and are emitted base64-encoded next to their job ID and creation timestamp;
// verification and execution happen on the agent in 10B. 10A only serves the queue.
//
// Status codes: 401 when authentication fails, 400 for an empty host ID, 403 on a host-ID
// mismatch, 500 on a store error, 200 with {"jobs":[...]} otherwise.
func (h *Handler) handleGetJobs(w http.ResponseWriter, r *http.Request, pathHostID string) {
	callerHostID, _, global, authed := h.checkAuthHost(r)
	switch {
	case !authed:
		http.Error(w, "Unauthorized", http.StatusUnauthorized)
		return
	case pathHostID == "":
		http.Error(w, "Missing host_id", http.StatusBadRequest)
		return
	case !global && callerHostID != pathHostID:
		http.Error(w, "Forbidden: host_id mismatch", http.StatusForbidden)
		return
	}

	queued, lookupErr := h.store.GetSignedJobs(pathHostID)
	if lookupErr != nil {
		h.logger.Printf("[ERROR] jobs lookup for %s: %v", pathHostID, lookupErr)
		http.Error(w, "Internal error", http.StatusInternalServerError)
		return
	}

	// One entry per queued job, in the order the store returned them. The slice is allocated
	// non-nil so that an empty queue is encoded as [] rather than null.
	entries := make([]map[string]string, len(queued))
	for i := range queued {
		entry := make(map[string]string, 3)
		entry["job_id"] = queued[i].JobID
		entry["blob_b64"] = base64.StdEncoding.EncodeToString(queued[i].Blob)
		entry["created_at"] = queued[i].CreatedAt
		entries[i] = entry
	}

	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(http.StatusOK)
	enc := json.NewEncoder(w)
	enc.Encode(map[string]interface{}{"jobs": entries})
}
// handleAdminSetDesiredState sets a host's desired-state (slice 10A). GLOBAL/operator key ONLY —
// a per-host key cannot author its own intent. The body is the desired-state JSON (opaque to the
// hub: it stores + serves bytes and only checks that they are well-formed JSON — the agent/CLI
// owns the schema). Writing BUMPS desired_generation so the next heartbeat signals the agent to
// re-fetch.
//
// Status codes: 403 without a valid global key, 400 for an empty host ID, an unreadable body or
// a body that is not JSON, 413 for a body larger than 1 MiB, 404 for an unknown host, 500 on a
// store error, 200 with {"status":"ok","generation":N} otherwise.
func (h *Handler) handleAdminSetDesiredState(w http.ResponseWriter, r *http.Request, pathHostID string) {
	// Largest desired-state document the hub accepts.
	const maxBodyBytes = 1 << 20

	_, _, isGlobal, ok := h.checkAuthHost(r)
	if !ok || !isGlobal {
		http.Error(w, "Forbidden: global key required", http.StatusForbidden)
		return
	}
	if pathHostID == "" {
		http.Error(w, "Missing host_id", http.StatusBadRequest)
		return
	}

	// Read one byte past the limit so an oversized body is DETECTED rather than silently
	// truncated: a truncated prefix can still be valid JSON (e.g. a document followed by
	// padding) and would otherwise be stored as if it were what the operator sent.
	body, err := io.ReadAll(io.LimitReader(r.Body, maxBodyBytes+1))
	if err != nil {
		http.Error(w, "Bad request", http.StatusBadRequest)
		return
	}
	if len(body) > maxBodyBytes {
		http.Error(w, "Payload too large", http.StatusRequestEntityTooLarge)
		return
	}

	// Validate it is well-formed JSON (the hub does not interpret the schema, but a malformed
	// blob would break the agent's parse — reject it at the door).
	if !json.Valid(body) {
		http.Error(w, "Invalid payload: body must be JSON", http.StatusBadRequest)
		return
	}

	gen, err := h.store.SetHostDesired(pathHostID, body)
	// NOTE(review): direct comparison only matches an unwrapped sql.ErrNoRows; if the store ever
	// wraps it, switch to errors.Is (needs the "errors" import).
	if err == sql.ErrNoRows {
		http.Error(w, "Unknown host_id", http.StatusNotFound)
		return
	}
	if err != nil {
		h.logger.Printf("[ERROR] set desired-state for %s: %v", pathHostID, err)
		http.Error(w, "Internal error", http.StatusInternalServerError)
		return
	}

	h.logger.Printf("[INFO] admin-set desired-state for host %s (generation now %d, %d bytes)", pathHostID, gen, len(body))
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(http.StatusOK)
	json.NewEncoder(w).Encode(map[string]interface{}{"status": "ok", "generation": gen})
}
// handleAdminEnqueueJob appends an opaque signed-op blob to a host's queue (slice 10A). GLOBAL
// key ONLY. The blob is pre-signed off-hub (the hub holds no signing key); the hub stores it
// verbatim. This is the minimal operator path to seed the queue so HasSignedOps/serving are
// exercisable; the rich operator/signing UX is later. Execution is 10B.
//
// Request body: {"job_id": "<optional>", "blob_b64": "<base64 blob, required>"}. When job_id is
// omitted a random one is generated.
//
// Status codes: 403 without a valid global key, 400 for an empty host ID or an invalid payload,
// 404 for an unknown host, 413 for a body larger than 1 MiB, 500 on a store or ID-generation
// failure, 201 with {"status":"ok","job_id":...} otherwise.
func (h *Handler) handleAdminEnqueueJob(w http.ResponseWriter, r *http.Request, pathHostID string) {
	// Largest request body (JSON wrapper included) the hub accepts.
	const maxBodyBytes = 1 << 20

	_, _, isGlobal, ok := h.checkAuthHost(r)
	if !ok || !isGlobal {
		http.Error(w, "Forbidden: global key required", http.StatusForbidden)
		return
	}
	if pathHostID == "" {
		http.Error(w, "Missing host_id", http.StatusBadRequest)
		return
	}

	host, err := h.store.GetHost(pathHostID)
	// A genuine store failure is an internal error, not "unknown host": report and log it as
	// such. sql.ErrNoRows is still treated as "no such host" in case the store signals a miss
	// that way.
	if err != nil && err != sql.ErrNoRows {
		h.logger.Printf("[ERROR] host lookup for enqueue job on %s: %v", pathHostID, err)
		http.Error(w, "Internal error", http.StatusInternalServerError)
		return
	}
	if err != nil || host == nil {
		http.Error(w, "Unknown host_id", http.StatusNotFound)
		return
	}

	// Read one byte past the limit so an oversized body is rejected explicitly instead of being
	// silently truncated.
	body, err := io.ReadAll(io.LimitReader(r.Body, maxBodyBytes+1))
	if err != nil {
		http.Error(w, "Bad request", http.StatusBadRequest)
		return
	}
	if len(body) > maxBodyBytes {
		http.Error(w, "Payload too large", http.StatusRequestEntityTooLarge)
		return
	}

	var req struct {
		JobID   string `json:"job_id"`
		BlobB64 string `json:"blob_b64"`
	}
	if err := json.Unmarshal(body, &req); err != nil || req.BlobB64 == "" {
		http.Error(w, "Invalid payload: blob_b64 required", http.StatusBadRequest)
		return
	}
	blob, err := base64.StdEncoding.DecodeString(req.BlobB64)
	if err != nil || len(blob) == 0 {
		http.Error(w, "Invalid payload: blob_b64 not valid base64", http.StatusBadRequest)
		return
	}

	if req.JobID == "" {
		// Never enqueue under an empty ID: if ID generation fails, fail the request.
		id, err := configgen.RandomHex(8)
		if err != nil || id == "" {
			h.logger.Printf("[ERROR] generate job id for %s: %v", pathHostID, err)
			http.Error(w, "Internal error", http.StatusInternalServerError)
			return
		}
		req.JobID = id
	}

	if err := h.store.EnqueueSignedJob(pathHostID, req.JobID, blob); err != nil {
		h.logger.Printf("[ERROR] enqueue job for %s: %v", pathHostID, err)
		http.Error(w, "Internal error", http.StatusInternalServerError)
		return
	}

	h.logger.Printf("[INFO] enqueued signed-op job %s for host %s (%d bytes)", req.JobID, pathHostID, len(blob))
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(http.StatusCreated)
	json.NewEncoder(w).Encode(map[string]interface{}{"status": "ok", "job_id": req.JobID})
}
// allowedEventTypes lists all valid event_type values the Hub accepts.
var allowedEventTypes = map[string]bool{
// Controller-pushed events
@@ -0,0 +1,7 @@
{
"status": "ok",
"poll_interval_seconds": 900,
"blocked": false,
"desired_generation": 4,
"has_signed_ops": true
}
+23
View File
@@ -0,0 +1,23 @@
{
"generation": 4,
"desired_state": {
"guests": [
{
"vmid": 100,
"run": "running",
"spec": { "cores": 2, "memory_bytes": 2147483648, "disk_bytes": 21474836480 },
"description": "felhom: acme prod"
},
{
"vmid": 200,
"decommission": true
}
],
"pbs_namespace": "felhom-cust-acme",
"restore_directive": {
"mode": "guest_loss",
"archive": "local:backup/vzdump-lxc-200-2026_06_09-11_00_00.tar.zst",
"vmid": 200
}
}
}