diff --git a/REPORT.md b/REPORT.md new file mode 100644 index 0000000..a2519a6 --- /dev/null +++ b/REPORT.md @@ -0,0 +1,91 @@ +# felhom.eu — task reports + +> One section per task, **appended** (newest last) — not overwritten. Cumulative +> hub history lives in [hub/CHANGELOG.md](hub/CHANGELOG.md). + +--- + +## Hub slice 3 — host-domain ingest (v0.7.0) — 2026-06-08 + +Purely **additive** host-domain ingest in `hub/`: new tables, the agent's +`/host-report` heartbeat endpoint, per-host Bearer auth, a provisional host mint, and a +host-domain dead-man's-switch. The existing controller path is **untouched**; the schema/ +auth cutover remains **slice 10**. Pushed to `main`; build/vet/test green locally and on +the build server. + +### New tables (`store.go migrate()`, idempotent — `// v0.7.0: host-domain`) +- **`hosts`** — one per customer agent. Reality columns (`agent_version`, `last_report_at`) + + operator-intent columns **INERT until slice 10** (`desired_json`, `desired_generation`, + `dr_record_json`). +- **`guests`** — one per controller LXC, PK `guest_id = "/"` (hub-derived). + Reality columns (`display_name`, `status`, `controller_version`, `vmid`, `last_seen_at`) + + **INERT** `api_key`, `desired_spec_json`. +- **`host_reports`** — the report stream + denormalized columns (cpu/mem/disk %, guest + counts, cloudflared status); pruned by `Prune(maxDays)` alongside `reports`. + +> Inert columns exist **now** so slice 10 needs no `ALTER`; nothing reads/writes them this +> slice. Migration is additive-only (no `DROP`, no edits to `reports`/`customer_configs`) +> and idempotent. + +### New store methods +`GetHostByAPIKey`, `GetHost`, `ListHosts`, `UpsertHost` (updates only identity + `updated_at` +on conflict), `SaveHostReport` (inserts a report row + bumps reality columns only), +`UpsertGuestFromReport` (updates reality columns only — **preserves** `api_key`/ +`desired_spec_json`), `GetHostStaleness` (skips never-reported hosts), `GuestID`. +Structs: `Host`, `Guest`, `HostReportDenorm`, `HostStaleRow`. + +### Auth (added; existing path unchanged) +`checkAuthHost(r)` → `(hostID, customerID, isGlobal, ok)`: global key → trust `body.host_id`; +per-host key → bound identity; failure → not-ok. `checkAuthCustomer` is byte-for-byte unchanged. + +### Endpoints +- **`POST /api/v1/host-report`** (the heartbeat): per-host auth; 4 MiB body; computes denorm + (`guest_running` counts only `status=="running"`); `SaveHostReport` + per-guest + `UpsertGuestFromReport` (a guest upsert failure is logged, not fatal — liveness); returns the + control envelope `{status:"ok", poll_interval_seconds:900, blocked, desired_generation:0, + has_signed_ops:false}`. `blocked` reflects `customer_configs.status`; the other two are + reserved placeholders (slice 4). Global-key bootstrap requires the host to already exist + (else 400); per-host key requires `body.host_id == hostID` (else 403). +- **`POST /api/v1/admin/hosts`** — **PROVISIONAL**, global-key only. Mints `host_id` (legible + `-`) + a random `api_key` (`configgen.RandomHex(32)`); 201 `{host_id, api_key}`. + Flagged in code as the slice-3 bootstrap to be removed/locked at enrollment (slices 7–8). + +### Host dead-man's-switch +`monitor.HostStalenessChecker` (`host_staleness.go`) — a **sibling** of the controller +`StalenessChecker`, keyed on host↔`host_reports`, emitting `host_stale`/`host_down`/ +`host_recovered` (30m / 60m), attributed to the host's customer (so the existing per-customer +notification UX picks them up). Registered in `allowedEventTypes`; wired in `main.go` on the +existing 60s ticker. The controller staleness/deadline checkers are untouched and keep running. + +### Contract +The `/host-report` JSON matches the agent spec §4 field-for-field (host_id, reported_at, +agent_version, host{…}, guests[{vmid,name,status,controller_version,spec}], cloudflared{status}, +and the empty storage_targets/backups/restore_tests/pbs_snapshots/audit_tail — accepted +empty/absent). The envelope matches agent spec §5. + +### Test matrix (new, hermetic — temp SQLite, no live data) +- **store**: upsert/lookup; a report-path update **preserves** `desired_json`/`desired_generation`; + guest upsert **preserves** `api_key`/`desired_spec_json` while updating reality; `GuestID`; + staleness skips never-reported. +- **auth**: `checkAuthHost` global / per-host / unknown. +- **ingest**: valid → 200 + envelope + denorm (`guest_running` = 1 of 2); host_id mismatch → 403; + unknown host under global key → 400; blocked customer → `blocked:true`; oversize body → 400. +- **admin mint**: non-global → 403; unknown customer → 400; success → 201 + minted key + round-trips through `/host-report`. +- **host staleness**: seed emits no events; ok→stale→down→recovered transitions. + +### Untouched / deferred (explicit) +- **Controller path unchanged**: `/api/v1/report`, `reports`, `customer_configs`, + `checkAuthCustomer`, existing staleness + deadline checkers — additions only, all still green. +- **Not built** (per scope): desired-state serving, `signed_ops`, geo→hub, DR-record migration, + dashboard re-design. The cutover (drop `reports`→`guest_reports`, merge checkers, tighten the + provisional admin/global-key auth) remains **slice 10**. + +### Versioning / deploy +Hub version is the `main.Version` ldflags var (`build.sh `), default `"dev"`; recorded +**v0.7.0** in `hub/CHANGELOG.md`. The image build + ArgoCD deploy are **not** part of this task +(no deploy performed). + +### Repo state +Branch: `main`. Verified `go build/vet/test ./...` green in `hub/` locally (go1.26) and on the +build server (go1.26). diff --git a/hub/CHANGELOG.md b/hub/CHANGELOG.md index 71ea9b8..60860fe 100644 --- a/hub/CHANGELOG.md +++ b/hub/CHANGELOG.md @@ -1,5 +1,19 @@ # Felhom Hub — Changelog +## v0.7.0 (2026-06-08) + +### Added — host-domain ingest (slice 3, additive; controller path untouched) +- **New tables** `hosts`, `guests`, `host_reports` (`store.go migrate()`, idempotent). Full schema now, including columns **inert until slice 10** (`hosts.desired_json`/`desired_generation`/`dr_record_json`, `guests.api_key`/`desired_spec_json`) so the cutover needs no `ALTER`. Nothing reads/writes the inert columns this slice. +- **`POST /api/v1/host-report`** — the agent's heartbeat. Per-host Bearer auth; 4 MiB body; persists the full report + denormalized fields (cpu/mem/disk %, guest counts, cloudflared status); upserts each guest's **reality** columns (`guest_id = "/"`, hub-derived); returns the control envelope `{status, poll_interval_seconds:900, blocked, desired_generation:0, has_signed_ops:false}` (`blocked` reflects the customer's status; the latter two are reserved/placeholder for slice 4). +- **Per-host key auth** — `checkAuthHost` (Bearer → host → customer), added alongside the unchanged `checkAuthCustomer`. Global key remains a bootstrap fallback. +- **`POST /api/v1/admin/hosts`** — **PROVISIONAL** global-key-only host mint (host_id + per-host api_key); the slice-3 bootstrap until enrollment (slices 7–8) replaces it. +- **Host dead-man's-switch** — `monitor.HostStalenessChecker` over `host_reports`, emitting `host_stale`/`host_down`/`host_recovered` (30m/60m), attributed to the host's customer; registered in `allowedEventTypes`; wired in `cmd/hub/main.go` on the existing 60s ticker. A deliberate **sibling** of the controller `StalenessChecker` (both run until slice 10). +- **Store methods**: `GetHostByAPIKey`, `GetHost`, `ListHosts`, `UpsertHost`, `SaveHostReport`, `UpsertGuestFromReport` (preserves inert columns on conflict), `GetHostStaleness` (skips never-reported hosts), `GuestID`. `Prune` now also prunes `host_reports` (same retention). +- **Tests** (new, hermetic): store, auth (`checkAuthHost`), ingest (valid+envelope+denorm, host_id mismatch→403, unknown-host-under-global→400, blocked→true, oversize→400), admin mint (non-global→403, unknown customer→400, mint+round-trip), host staleness transitions. + +### Unchanged (explicit) +- The controller path — `/api/v1/report`, `reports`, `customer_configs`, `checkAuthCustomer`, the existing staleness/deadline checkers — is untouched and still green. The old controller and the new agent report in parallel during slices 3–9; the schema/auth cutover is **slice 10**. + ## v0.6.2 (2026-02-26) ### Added diff --git a/hub/cmd/hub/main.go b/hub/cmd/hub/main.go index 258594e..8b99d6c 100644 --- a/hub/cmd/hub/main.go +++ b/hub/cmd/hub/main.go @@ -206,6 +206,9 @@ func main() { // Staleness checker — runs every 60s stalenessChecker := monitor.NewStalenessChecker(dataStore, staleThreshold, dispatcher.ProcessEvent, logger) + // v0.7.0: host-domain dead-man's-switch (sibling; the controller checker above is + // unchanged and keeps running until the slice-10 cutover). Same 60s cadence. + hostStalenessChecker := monitor.NewHostStalenessChecker(dataStore, staleThreshold, dispatcher.ProcessEvent, logger) go func() { ticker := time.NewTicker(60 * time.Second) defer ticker.Stop() @@ -215,6 +218,7 @@ func main() { return case <-ticker.C: stalenessChecker.Check() + hostStalenessChecker.Check() } } }() diff --git a/hub/internal/api/handler.go b/hub/internal/api/handler.go index 357fc5a..b0f1b56 100644 --- a/hub/internal/api/handler.go +++ b/hub/internal/api/handler.go @@ -89,6 +89,30 @@ func (h *Handler) checkAuthCustomer(r *http.Request) (customerID string, isGloba return cfg.CustomerID, false, true } +// checkAuthHost resolves a Bearer token to a HOST identity (the agent's auth +// path). It is a sibling of checkAuthCustomer — the controller path is unchanged. +// - global key -> ("", "", true, true) caller trusts body.host_id +// - per-host key -> (hostID, customerID, false, true) +// - failure -> ("", "", false, false) +func (h *Handler) checkAuthHost(r *http.Request) (hostID, customerID string, isGlobal, ok bool) { + auth := r.Header.Get("Authorization") + if !strings.HasPrefix(auth, "Bearer ") { + return "", "", false, false + } + token := strings.TrimPrefix(auth, "Bearer ") + + // Global key first (same constant-time compare as checkAuthCustomer). + if h.apiKey != "" && subtle.ConstantTimeCompare([]byte(token), []byte(h.apiKey)) == 1 { + return "", "", true, true + } + + host, err := h.store.GetHostByAPIKey(token) + if err != nil || host == nil { + return "", "", false, false + } + return host.HostID, host.CustomerID, false, true +} + // ServeHTTP routes API requests. func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { path := strings.TrimPrefix(r.URL.Path, "/api/v1") @@ -96,6 +120,10 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { switch { case r.Method == http.MethodPost && path == "/report": h.handleReport(w, r) + case r.Method == http.MethodPost && path == "/host-report": + h.handleHostReport(w, r) + case r.Method == http.MethodPost && path == "/admin/hosts": + h.handleAdminCreateHost(w, r) case r.Method == http.MethodPost && path == "/event": h.handleEvent(w, r) case r.Method == http.MethodPost && path == "/notify": @@ -194,6 +222,194 @@ func (h *Handler) handleReport(w http.ResponseWriter, r *http.Request) { json.NewEncoder(w).Encode(resp) } +// defaultHostPollSeconds is the cadence the hub hands every agent this slice (no +// per-host override UI yet — that is a later slice). +const defaultHostPollSeconds = 900 + +// hostReportPayload is the subset of the agent host-report (slice-3 contract, +// §3 / agent spec §4) the hub needs for denorm + guest reality. Unknown fields +// (storage_targets/backups/restore_tests/pbs_snapshots/audit_tail) are ignored, +// so an empty or absent collection is accepted without error. +type hostReportPayload struct { + HostID string `json:"host_id"` + AgentVersion string `json:"agent_version"` + Host struct { + CPUPercent float64 `json:"cpu_percent"` + MemoryPercent float64 `json:"memory_percent"` + DiskPercent float64 `json:"disk_percent"` + } `json:"host"` + Guests []struct { + VMID int `json:"vmid"` + Name string `json:"name"` + Status string `json:"status"` + ControllerVersion string `json:"controller_version"` + } `json:"guests"` + Cloudflared struct { + Status string `json:"status"` + } `json:"cloudflared"` +} + +// handleHostReport ingests the agent's host-report (the heartbeat) and returns the +// control envelope (agent spec §5). +func (h *Handler) handleHostReport(w http.ResponseWriter, r *http.Request) { + hostID, custID, isGlobal, ok := h.checkAuthHost(r) + if !ok { + http.Error(w, "Unauthorized", http.StatusUnauthorized) + return + } + + // 4 MiB: host reports carry the full guest list + future storage/backup arrays; + // the controller path's 1 MiB is too tight here. + body, err := io.ReadAll(io.LimitReader(r.Body, 4<<20)) + if err != nil { + http.Error(w, "Bad request", http.StatusBadRequest) + return + } + var rep hostReportPayload + if err := json.Unmarshal(body, &rep); err != nil || rep.HostID == "" { + http.Error(w, "Invalid payload: host_id required", http.StatusBadRequest) + return + } + + if isGlobal { + // Global-key bootstrap: trust body.host_id but require the host to exist + // (it must be minted first) and resolve its customer from the row. + host, err := h.store.GetHost(rep.HostID) + if err != nil { + h.logger.Printf("[ERROR] host lookup failed for %s: %v", rep.HostID, err) + http.Error(w, "Internal error", http.StatusInternalServerError) + return + } + if host == nil { + http.Error(w, "Unknown host_id (mint via /admin/hosts first)", http.StatusBadRequest) + return + } + hostID, custID = rep.HostID, host.CustomerID + } else if rep.HostID != hostID { + http.Error(w, "Forbidden: host_id mismatch", http.StatusForbidden) + return + } + + running := 0 + for _, g := range rep.Guests { + if g.Status == "running" { + running++ + } + } + denorm := store.HostReportDenorm{ + AgentVersion: rep.AgentVersion, + CPUPercent: rep.Host.CPUPercent, + MemoryPercent: rep.Host.MemoryPercent, + DiskPercent: rep.Host.DiskPercent, + GuestTotal: len(rep.Guests), + GuestRunning: running, + CloudflaredStatus: rep.Cloudflared.Status, + } + if err := h.store.SaveHostReport(hostID, custID, body, denorm); err != nil { + h.logger.Printf("[ERROR] Failed to save host-report from %s: %v", hostID, err) + http.Error(w, "Internal error", http.StatusInternalServerError) + return + } + + for _, g := range rep.Guests { + status := g.Status + if status == "" { + status = "unknown" + } + guest := &store.Guest{ + GuestID: store.GuestID(hostID, g.VMID), + CustomerID: custID, + HostID: hostID, + VMID: g.VMID, + DisplayName: g.Name, + Status: status, + ControllerVersion: g.ControllerVersion, + } + if err := h.store.UpsertGuestFromReport(guest); err != nil { + // A guest upsert failure must not drop the whole report (liveness). + h.logger.Printf("[WARN] Failed to upsert guest %s: %v", guest.GuestID, err) + } + } + + h.logger.Printf("[INFO] host-report from %s (%d guests, %d bytes)", hostID, len(rep.Guests), len(body)) + + blocked := false + if cc, err := h.store.GetCustomerConfig(custID); err == nil && cc != nil && cc.Status == "blocked" { + blocked = true + } + resp := map[string]interface{}{ + "status": "ok", + "poll_interval_seconds": defaultHostPollSeconds, + "blocked": blocked, + "desired_generation": 0, // reserved (slice 4) + "has_signed_ops": false, // reserved (slice 4) + } + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(resp) +} + +// handleAdminCreateHost mints a host identity (host_id + per-host api_key). +// +// PROVISIONAL (slice-3 bootstrap): global-key only, so the demo agent can +// authenticate before enrollment (slices 7–8) exists. Enrollment will mint host +// identity + pin signing keys; this endpoint should be removed/locked down then +// (tracked under doc 05 §11 auth-tightening at cutover). +func (h *Handler) handleAdminCreateHost(w http.ResponseWriter, r *http.Request) { + _, _, isGlobal, ok := h.checkAuthHost(r) + if !ok || !isGlobal { + http.Error(w, "Forbidden: global key required", http.StatusForbidden) + return + } + body, err := io.ReadAll(io.LimitReader(r.Body, 1<<20)) + if err != nil { + http.Error(w, "Bad request", http.StatusBadRequest) + return + } + var req struct { + CustomerID string `json:"customer_id"` + HostID string `json:"host_id"` + DisplayName string `json:"display_name"` + } + if err := json.Unmarshal(body, &req); err != nil || req.CustomerID == "" { + http.Error(w, "Invalid payload: customer_id required", http.StatusBadRequest) + return + } + cc, err := h.store.GetCustomerConfig(req.CustomerID) + if err != nil { + http.Error(w, "Internal error", http.StatusInternalServerError) + return + } + if cc == nil { + http.Error(w, "Unknown customer_id", http.StatusBadRequest) + return + } + + hostID := req.HostID + if hostID == "" { + sfx, err := configgen.RandomHex(3) // 6 hex chars — human-legible for the demo + if err != nil { + http.Error(w, "Internal error", http.StatusInternalServerError) + return + } + hostID = req.CustomerID + "-" + sfx + } + apiKey, err := configgen.RandomHex(32) + if err != nil { + http.Error(w, "Internal error", http.StatusInternalServerError) + return + } + if err := h.store.UpsertHost(&store.Host{HostID: hostID, CustomerID: req.CustomerID, APIKey: apiKey}); err != nil { + h.logger.Printf("[ERROR] Failed to mint host for %s: %v", req.CustomerID, err) + http.Error(w, "Internal error", http.StatusInternalServerError) + return + } + h.logger.Printf("[INFO] provisional host mint: %s (customer %s)", hostID, req.CustomerID) + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusCreated) + json.NewEncoder(w).Encode(map[string]string{"host_id": hostID, "api_key": apiKey}) +} + // allowedEventTypes lists all valid event_type values the Hub accepts. var allowedEventTypes = map[string]bool{ // Controller-pushed events @@ -219,11 +435,15 @@ var allowedEventTypes = map[string]bool{ "disaster_recovery_started": true, "disaster_recovery_completed": true, // Hub-generated events - "node_stale": true, - "node_down": true, - "node_recovered": true, - "expected_backup_missed": true, - "expected_dbdump_missed": true, + "node_stale": true, + "node_down": true, + "node_recovered": true, + // Hub-generated host-domain events (v0.7.0, slice 3) + "host_stale": true, + "host_down": true, + "host_recovered": true, + "expected_backup_missed": true, + "expected_dbdump_missed": true, // Special "test": true, } @@ -686,10 +906,10 @@ func (h *Handler) handleRecovery(w http.ResponseWriter, r *http.Request, custome } resp := struct { - CustomerID string `json:"customer_id"` - ConfigYAML string `json:"config_yaml"` - InfraBackup json.RawMessage `json:"infra_backup"` - HasInfraBackup bool `json:"has_infra_backup"` + CustomerID string `json:"customer_id"` + ConfigYAML string `json:"config_yaml"` + InfraBackup json.RawMessage `json:"infra_backup"` + HasInfraBackup bool `json:"has_infra_backup"` BackupVersions []store.InfraBackupVersion `json:"backup_versions,omitempty"` }{ CustomerID: customerID, diff --git a/hub/internal/api/host_test.go b/hub/internal/api/host_test.go new file mode 100644 index 0000000..91f8a28 --- /dev/null +++ b/hub/internal/api/host_test.go @@ -0,0 +1,190 @@ +package api + +import ( + "database/sql" + "encoding/json" + "io" + "log" + "net/http" + "net/http/httptest" + "path/filepath" + "strings" + "testing" + + "gitea.dooplex.hu/admin/felhom-hub/internal/store" + _ "modernc.org/sqlite" +) + +const globalKey = "GLOBALKEY" + +func newTestHandler(t *testing.T) (*Handler, *store.Store, string) { + t.Helper() + path := filepath.Join(t.TempDir(), "test.db") + st, err := store.New(path, log.New(io.Discard, "", 0)) + if err != nil { + t.Fatalf("store.New: %v", err) + } + t.Cleanup(func() { st.Close() }) + h := New(st, globalKey, "", "", nil, log.New(io.Discard, "", 0)) + return h, st, path +} + +func do(h *Handler, method, path, bearer, body string) *httptest.ResponseRecorder { + req := httptest.NewRequest(method, "/api/v1"+path, strings.NewReader(body)) + if bearer != "" { + req.Header.Set("Authorization", "Bearer "+bearer) + } + rr := httptest.NewRecorder() + h.ServeHTTP(rr, req) + return rr +} + +func TestCheckAuthHost(t *testing.T) { + h, st, _ := newTestHandler(t) + st.UpsertHost(&store.Host{HostID: "h1", CustomerID: "c1", APIKey: "HKEY"}) + + mk := func(bearer string) *http.Request { + req := httptest.NewRequest(http.MethodPost, "/api/v1/host-report", nil) + if bearer != "" { + req.Header.Set("Authorization", "Bearer "+bearer) + } + return req + } + + if _, _, isGlobal, ok := h.checkAuthHost(mk(globalKey)); !ok || !isGlobal { + t.Error("global key should resolve isGlobal=true") + } + hostID, custID, isGlobal, ok := h.checkAuthHost(mk("HKEY")) + if !ok || isGlobal || hostID != "h1" || custID != "c1" { + t.Errorf("per-host key = %q/%q global=%v ok=%v", hostID, custID, isGlobal, ok) + } + if _, _, _, ok := h.checkAuthHost(mk("bogus")); ok { + t.Error("unknown key should fail") + } +} + +func validReportBody(hostID string) string { + return `{"host_id":"` + hostID + `","agent_version":"0.3.0",` + + `"host":{"cpu_percent":3.2,"memory_percent":25,"disk_percent":19,"loadavg":["0.1"],"uptime_seconds":100},` + + `"guests":[{"vmid":100,"name":"acme","status":"running","controller_version":""},` + + `{"vmid":101,"name":"beta","status":"stopped"}],` + + `"storage_targets":[],"backups":[],"cloudflared":{"status":"active"},"audit_tail":[]}` +} + +func TestHandleHostReport_ValidAndEnvelopeAndDenorm(t *testing.T) { + h, st, dbPath := newTestHandler(t) + st.SaveCustomerConfig(&store.CustomerConfig{CustomerID: "c1", APIKey: "ckey", RetrievalPassword: "p"}) + st.UpsertHost(&store.Host{HostID: "h1", CustomerID: "c1", APIKey: "HKEY"}) + + rr := do(h, http.MethodPost, "/host-report", "HKEY", validReportBody("h1")) + if rr.Code != 200 { + t.Fatalf("status = %d, body=%s", rr.Code, rr.Body.String()) + } + var env struct { + Status string `json:"status"` + PollIntervalSeconds int `json:"poll_interval_seconds"` + Blocked bool `json:"blocked"` + DesiredGeneration int `json:"desired_generation"` + HasSignedOps bool `json:"has_signed_ops"` + } + json.Unmarshal(rr.Body.Bytes(), &env) + if env.Status != "ok" || env.PollIntervalSeconds != 900 || env.Blocked || env.DesiredGeneration != 0 || env.HasSignedOps { + t.Errorf("envelope = %+v", env) + } + + // Denorm: guest_running counts only "running" (1 of 2). Read via a 2nd connection. + db, _ := sql.Open("sqlite", dbPath) + defer db.Close() + var total, running int + var cf string + db.QueryRow(`SELECT guest_total, guest_running, cloudflared_status FROM host_reports WHERE host_id='h1' ORDER BY id DESC LIMIT 1`). + Scan(&total, &running, &cf) + if total != 2 || running != 1 || cf != "active" { + t.Errorf("denorm total=%d running=%d cloudflared=%q (want 2,1,active)", total, running, cf) + } + // Guests upserted. + var gname, gstatus string + if err := db.QueryRow(`SELECT display_name, status FROM guests WHERE guest_id='h1/100'`).Scan(&gname, &gstatus); err != nil { + t.Fatalf("guest h1/100 not upserted: %v", err) + } + if gname != "acme" || gstatus != "running" { + t.Errorf("guest = %q/%q", gname, gstatus) + } +} + +func TestHandleHostReport_HostIDMismatch(t *testing.T) { + h, st, _ := newTestHandler(t) + st.UpsertHost(&store.Host{HostID: "h1", CustomerID: "c1", APIKey: "HKEY"}) + rr := do(h, http.MethodPost, "/host-report", "HKEY", validReportBody("other-host")) + if rr.Code != http.StatusForbidden { + t.Errorf("status = %d, want 403", rr.Code) + } +} + +func TestHandleHostReport_UnknownHostUnderGlobalKey(t *testing.T) { + h, _, _ := newTestHandler(t) + rr := do(h, http.MethodPost, "/host-report", globalKey, validReportBody("ghost")) + if rr.Code != http.StatusBadRequest { + t.Errorf("status = %d, want 400 (unknown host_id)", rr.Code) + } +} + +func TestHandleHostReport_BlockedCustomer(t *testing.T) { + h, st, _ := newTestHandler(t) + st.SaveCustomerConfig(&store.CustomerConfig{CustomerID: "c1", APIKey: "ckey", RetrievalPassword: "p"}) + st.SetCustomerConfigStatus("c1", "blocked") + st.UpsertHost(&store.Host{HostID: "h1", CustomerID: "c1", APIKey: "HKEY"}) + rr := do(h, http.MethodPost, "/host-report", "HKEY", validReportBody("h1")) + if rr.Code != 200 { + t.Fatalf("status = %d", rr.Code) + } + var env struct { + Blocked bool `json:"blocked"` + } + json.Unmarshal(rr.Body.Bytes(), &env) + if !env.Blocked { + t.Error("blocked customer should yield blocked:true") + } +} + +func TestHandleHostReport_OversizeRejected(t *testing.T) { + h, st, _ := newTestHandler(t) + st.UpsertHost(&store.Host{HostID: "h1", CustomerID: "c1", APIKey: "HKEY"}) + big := `{"host_id":"h1","guests":[{"vmid":1,"name":"` + strings.Repeat("a", 5<<20) + `"}]}` + rr := do(h, http.MethodPost, "/host-report", "HKEY", big) + if rr.Code != http.StatusBadRequest { + t.Errorf("oversize body status = %d, want 400", rr.Code) + } +} + +func TestAdminCreateHost(t *testing.T) { + h, st, _ := newTestHandler(t) + st.SaveCustomerConfig(&store.CustomerConfig{CustomerID: "c1", APIKey: "ckey", RetrievalPassword: "p"}) + + // non-global key (per-customer) → 403 + if rr := do(h, http.MethodPost, "/admin/hosts", "ckey", `{"customer_id":"c1"}`); rr.Code != http.StatusForbidden { + t.Errorf("per-customer key status = %d, want 403", rr.Code) + } + // missing/unknown customer → 400 + if rr := do(h, http.MethodPost, "/admin/hosts", globalKey, `{"customer_id":"nope"}`); rr.Code != http.StatusBadRequest { + t.Errorf("unknown customer status = %d, want 400", rr.Code) + } + // success → 201 + usable key (round-trip) + rr := do(h, http.MethodPost, "/admin/hosts", globalKey, `{"customer_id":"c1"}`) + if rr.Code != http.StatusCreated { + t.Fatalf("mint status = %d, body=%s", rr.Code, rr.Body.String()) + } + var minted struct { + HostID string `json:"host_id"` + APIKey string `json:"api_key"` + } + json.Unmarshal(rr.Body.Bytes(), &minted) + if minted.HostID == "" || minted.APIKey == "" { + t.Fatalf("mint response = %+v", minted) + } + // the minted key authenticates a host-report + rr2 := do(h, http.MethodPost, "/host-report", minted.APIKey, validReportBody(minted.HostID)) + if rr2.Code != 200 { + t.Errorf("round-trip host-report with minted key = %d, body=%s", rr2.Code, rr2.Body.String()) + } +} diff --git a/hub/internal/monitor/host_staleness.go b/hub/internal/monitor/host_staleness.go new file mode 100644 index 0000000..a1184b8 --- /dev/null +++ b/hub/internal/monitor/host_staleness.go @@ -0,0 +1,176 @@ +package monitor + +import ( + "log" + "sync" + "time" + + "gitea.dooplex.hu/admin/felhom-hub/internal/store" +) + +// HostStalenessChecker is the host-domain dead-man's-switch (v0.7.0, slice 3). It +// is a deliberate SIBLING of StalenessChecker, not a rename: during slices 3–9 the +// controller report stream (reports) and the agent host-report stream +// (host_reports) are both live, so both checkers run. It keys on host↔host_reports +// and emits host_stale / host_down / host_recovered. Merging is a slice-10 job. +// +// Events are attributed to the host's CUSTOMER (SaveEvent + onEvent take the +// customer_id) so the existing per-customer notification/event UX picks them up +// unchanged. +type HostStalenessChecker struct { + store *store.Store + threshold time.Duration // "stale" after this (default 30m — same as the controller checker) + downAfter time.Duration // "down" after this (2x threshold) + logger *log.Logger + onEvent EventNotifyFunc + + mu sync.Mutex + states map[string]string // hostID → "ok" | "stale" | "down" + customerOf map[string]string // hostID → customerID (for event attribution) + downtimeStart map[string]time.Time // hostID → when it first became unreachable +} + +// NewHostStalenessChecker creates the checker and seeds state from current +// host-report recency. No events are generated during initialization. +func NewHostStalenessChecker(s *store.Store, threshold time.Duration, onEvent EventNotifyFunc, logger *log.Logger) *HostStalenessChecker { + sc := &HostStalenessChecker{ + store: s, + threshold: threshold, + downAfter: 2 * threshold, + logger: logger, + onEvent: onEvent, + states: make(map[string]string), + customerOf: make(map[string]string), + downtimeStart: make(map[string]time.Time), + } + + rows, err := s.GetHostStaleness() + if err != nil { + logger.Printf("[WARN] Host staleness checker: failed to seed states: %v", err) + return sc + } + var okCount, staleCount, downCount int + for _, row := range rows { + if s.IsCustomerBlocked(row.CustomerID) { + continue + } + sc.customerOf[row.HostID] = row.CustomerID + age := time.Since(row.LastReportAt) + switch { + case age > sc.downAfter: + sc.states[row.HostID] = "down" + downCount++ + case age > sc.threshold: + sc.states[row.HostID] = "stale" + staleCount++ + default: + sc.states[row.HostID] = "ok" + okCount++ + } + } + logger.Printf("[INFO] Host staleness checker initialized: %d ok, %d stale, %d down", okCount, staleCount, downCount) + return sc +} + +// Check evaluates all hosts and emits events on state transitions. Call every 60s. +func (sc *HostStalenessChecker) Check() { + rows, err := sc.store.GetHostStaleness() + if err != nil { + sc.logger.Printf("[WARN] Host staleness check failed: %v", err) + return + } + + sc.mu.Lock() + defer sc.mu.Unlock() + + seen := make(map[string]bool, len(rows)) + for _, row := range rows { + seen[row.HostID] = true + if sc.store.IsCustomerBlocked(row.CustomerID) { + delete(sc.states, row.HostID) + continue + } + sc.customerOf[row.HostID] = row.CustomerID + + age := time.Since(row.LastReportAt) + var newState string + switch { + case age > sc.downAfter: + newState = "down" + case age > sc.threshold: + newState = "stale" + default: + newState = "ok" + } + + oldState := sc.states[row.HostID] + if oldState == "" { + sc.states[row.HostID] = newState // first observation — no event + continue + } + if oldState == newState { + continue + } + + sc.states[row.HostID] = newState + if newState == "stale" && oldState == "ok" { + sc.downtimeStart[row.HostID] = time.Now() + } + downtimeDur := age + if newState == "ok" { + if t, ok := sc.downtimeStart[row.HostID]; ok { + downtimeDur = time.Since(t) + } + delete(sc.downtimeStart, row.HostID) + } + sc.emitTransition(row.HostID, row.CustomerID, oldState, newState, downtimeDur) + } + + for id := range sc.states { + if !seen[id] { + delete(sc.states, id) + delete(sc.downtimeStart, id) + } + } +} + +// GetState returns the current staleness state for a host. +func (sc *HostStalenessChecker) GetState(hostID string) string { + sc.mu.Lock() + defer sc.mu.Unlock() + s := sc.states[hostID] + if s == "" { + return "unknown" + } + return s +} + +func (sc *HostStalenessChecker) emitTransition(hostID, customerID, oldState, newState string, age time.Duration) { + var eventType, severity, message string + switch { + case newState == "stale": + eventType = "host_stale" + severity = "warning" + message = "Host " + hostID + ": no report for " + formatDuration(age) + case newState == "down": + eventType = "host_down" + severity = "error" + message = "Host " + hostID + ": no report for " + formatDuration(age) + case newState == "ok" && (oldState == "stale" || oldState == "down"): + eventType = "host_recovered" + severity = "info" + message = "Host " + hostID + ": reports resumed (was " + oldState + " for " + formatDuration(age) + ")" + default: + return + } + + sc.logger.Printf("[INFO] Host staleness: %s %s → %s (%s)", hostID, oldState, newState, eventType) + + if _, err := sc.store.SaveEvent(customerID, eventType, severity, message, "{}", "hub"); err != nil { + sc.logger.Printf("[WARN] Failed to save host staleness event for %s: %v", hostID, err) + return + } + if sc.onEvent != nil { + sc.onEvent(customerID, eventType, severity, message, "{}", "hub") + } +} diff --git a/hub/internal/monitor/host_staleness_test.go b/hub/internal/monitor/host_staleness_test.go new file mode 100644 index 0000000..4a0952d --- /dev/null +++ b/hub/internal/monitor/host_staleness_test.go @@ -0,0 +1,88 @@ +package monitor + +import ( + "database/sql" + "fmt" + "io" + "log" + "path/filepath" + "testing" + "time" + + "gitea.dooplex.hu/admin/felhom-hub/internal/store" + _ "modernc.org/sqlite" +) + +// backdate sets a host's last_report_at to N minutes ago, simulating the passage +// of time without sleeping. Uses a second connection (the checker reads via store). +func backdate(t *testing.T, db *sql.DB, hostID string, minutesAgo int) { + t.Helper() + if _, err := db.Exec(`UPDATE hosts SET last_report_at = datetime('now', ?) WHERE host_id = ?`, + fmt.Sprintf("-%d minutes", minutesAgo), hostID); err != nil { + t.Fatal(err) + } +} + +func TestHostStalenessChecker(t *testing.T) { + path := filepath.Join(t.TempDir(), "test.db") + st, err := store.New(path, log.New(io.Discard, "", 0)) + if err != nil { + t.Fatal(err) + } + defer st.Close() + db, _ := sql.Open("sqlite", path) + defer db.Close() + + st.SaveCustomerConfig(&store.CustomerConfig{CustomerID: "c1", APIKey: "ck", RetrievalPassword: "p"}) + st.UpsertHost(&store.Host{HostID: "h1", CustomerID: "c1", APIKey: "k1"}) + st.SaveHostReport("h1", "c1", []byte(`{}`), store.HostReportDenorm{}) // sets last_report_at + + var events []string + onEvent := func(customerID, eventType, severity, message, detailsJSON, source string) { + events = append(events, eventType) + } + + // Seed already-stale (40m) → state stale, but NO event on init. + backdate(t, db, "h1", 40) + sc := NewHostStalenessChecker(st, 30*time.Minute, onEvent, log.New(io.Discard, "", 0)) + if len(events) != 0 { + t.Fatalf("seed must not emit events, got %v", events) + } + if sc.GetState("h1") != "stale" { + t.Fatalf("seeded state = %q, want stale", sc.GetState("h1")) + } + + // Same age → no transition. + sc.Check() + if len(events) != 0 { + t.Fatalf("no transition expected, got %v", events) + } + + // Fresh report → host_recovered. + backdate(t, db, "h1", 2) + sc.Check() + if last(events) != "host_recovered" { + t.Fatalf("events = %v, want last host_recovered", events) + } + + // Aged to stale → host_stale. + backdate(t, db, "h1", 40) + sc.Check() + if last(events) != "host_stale" { + t.Fatalf("events = %v, want last host_stale", events) + } + + // Aged past 2× → host_down. + backdate(t, db, "h1", 130) + sc.Check() + if last(events) != "host_down" { + t.Fatalf("events = %v, want last host_down", events) + } +} + +func last(s []string) string { + if len(s) == 0 { + return "" + } + return s[len(s)-1] +} diff --git a/hub/internal/notify/templates.go b/hub/internal/notify/templates.go index 377b5e7..2d56f28 100644 --- a/hub/internal/notify/templates.go +++ b/hub/internal/notify/templates.go @@ -52,14 +52,14 @@ Message: %s`, customerID, eventType, severity, now, message) // customerMessages maps event_type → Hungarian customer message. var customerMessages = map[string]string{ // Backup events - "backup_completed": "A biztonsági mentés sikeresen elkészült.", - "backup_failed": "A biztonsági mentés sikertelen! Kérjük, ellenőrizd a rendszert.", - "db_dump_completed": "Az adatbázis mentés sikeresen elkészült.", - "db_dump_failed": "Az adatbázis mentés sikertelen!", - "backup_integrity_ok": "A mentés integritás ellenőrzés sikeres.", - "backup_integrity_failed": "A mentés integritás ellenőrzés hibát talált!", - "crossdrive_completed": "A másodlagos mentés sikeresen elkészült.", - "crossdrive_failed": "A másodlagos mentés sikertelen!", + "backup_completed": "A biztonsági mentés sikeresen elkészült.", + "backup_failed": "A biztonsági mentés sikertelen! Kérjük, ellenőrizd a rendszert.", + "db_dump_completed": "Az adatbázis mentés sikeresen elkészült.", + "db_dump_failed": "Az adatbázis mentés sikertelen!", + "backup_integrity_ok": "A mentés integritás ellenőrzés sikeres.", + "backup_integrity_failed": "A mentés integritás ellenőrzés hibát talált!", + "crossdrive_completed": "A másodlagos mentés sikeresen elkészült.", + "crossdrive_failed": "A másodlagos mentés sikertelen!", // Disk events "disk_warning": "A lemezterület 90% felett van — kérjük, szabadíts fel helyet.", diff --git a/hub/internal/store/host_test.go b/hub/internal/store/host_test.go new file mode 100644 index 0000000..1eac1eb --- /dev/null +++ b/hub/internal/store/host_test.go @@ -0,0 +1,122 @@ +package store + +import ( + "io" + "log" + "path/filepath" + "testing" +) + +func newTestStore(t *testing.T) *Store { + t.Helper() + s, err := New(filepath.Join(t.TempDir(), "test.db"), log.New(io.Discard, "", 0)) + if err != nil { + t.Fatalf("store.New: %v", err) + } + t.Cleanup(func() { s.Close() }) + return s +} + +func TestGuestID(t *testing.T) { + if got := GuestID("demo-host-01", 100); got != "demo-host-01/100" { + t.Errorf("GuestID = %q", got) + } +} + +func TestUpsertHost_AndLookup(t *testing.T) { + s := newTestStore(t) + if err := s.UpsertHost(&Host{HostID: "h1", CustomerID: "c1", APIKey: "k1"}); err != nil { + t.Fatalf("UpsertHost: %v", err) + } + h, err := s.GetHost("h1") + if err != nil || h == nil { + t.Fatalf("GetHost: %v / %v", h, err) + } + if h.CustomerID != "c1" || h.APIKey != "k1" || h.DesiredJSON != "{}" || h.LastReportAt != nil { + t.Errorf("host = %+v", h) + } + byKey, err := s.GetHostByAPIKey("k1") + if err != nil || byKey == nil || byKey.HostID != "h1" { + t.Errorf("GetHostByAPIKey hit = %+v / %v", byKey, err) + } + miss, err := s.GetHostByAPIKey("nope") + if err != nil || miss != nil { + t.Errorf("GetHostByAPIKey miss = %+v / %v (want nil,nil)", miss, err) + } +} + +func TestSaveHostReport_BumpsRealityPreservesIntent(t *testing.T) { + s := newTestStore(t) + if err := s.UpsertHost(&Host{HostID: "h1", CustomerID: "c1", APIKey: "k1"}); err != nil { + t.Fatal(err) + } + // Operator-owned intent columns (inert this slice) set out-of-band. + if _, err := s.db.Exec(`UPDATE hosts SET desired_json='{"want":1}', desired_generation=7 WHERE host_id='h1'`); err != nil { + t.Fatal(err) + } + + denorm := HostReportDenorm{AgentVersion: "0.3.0", CPUPercent: 3.2, MemoryPercent: 25, DiskPercent: 19, GuestTotal: 2, GuestRunning: 1, CloudflaredStatus: "active"} + if err := s.SaveHostReport("h1", "c1", []byte(`{"host_id":"h1"}`), denorm); err != nil { + t.Fatalf("SaveHostReport: %v", err) + } + + h, _ := s.GetHost("h1") + if h.AgentVersion != "0.3.0" || h.LastReportAt == nil { + t.Errorf("reality not bumped: %+v", h) + } + if h.DesiredJSON != `{"want":1}` || h.DesiredGeneration != 7 { + t.Errorf("a report must NOT clobber intent columns: desired_json=%q gen=%d", h.DesiredJSON, h.DesiredGeneration) + } + var n int + s.db.QueryRow(`SELECT COUNT(*) FROM host_reports WHERE host_id='h1'`).Scan(&n) + if n != 1 { + t.Errorf("host_reports rows = %d, want 1", n) + } +} + +func TestUpsertGuestFromReport_PreservesInertColumns(t *testing.T) { + s := newTestStore(t) + gid := GuestID("h1", 100) + if err := s.UpsertGuestFromReport(&Guest{GuestID: gid, CustomerID: "c1", HostID: "h1", VMID: 100, DisplayName: "acme", Status: "running"}); err != nil { + t.Fatal(err) + } + // Slice-10 columns set out-of-band; a report upsert must not touch them. + if _, err := s.db.Exec(`UPDATE guests SET api_key='controllerkey', desired_spec_json='{"cores":4}' WHERE guest_id=?`, gid); err != nil { + t.Fatal(err) + } + + // A later report changes reality (status/name). + if err := s.UpsertGuestFromReport(&Guest{GuestID: gid, CustomerID: "c1", HostID: "h1", VMID: 100, DisplayName: "acme-renamed", Status: "stopped"}); err != nil { + t.Fatal(err) + } + + var apiKey, desiredSpec, status, name string + err := s.db.QueryRow(`SELECT api_key, desired_spec_json, status, display_name FROM guests WHERE guest_id=?`, gid). + Scan(&apiKey, &desiredSpec, &status, &name) + if err != nil { + t.Fatal(err) + } + if apiKey != "controllerkey" || desiredSpec != `{"cores":4}` { + t.Errorf("inert columns clobbered: api_key=%q desired_spec_json=%q", apiKey, desiredSpec) + } + if status != "stopped" || name != "acme-renamed" { + t.Errorf("reality not updated: status=%q name=%q", status, name) + } +} + +func TestGetHostStaleness_SkipsNeverReported(t *testing.T) { + s := newTestStore(t) + s.UpsertHost(&Host{HostID: "h1", CustomerID: "c1", APIKey: "k1"}) + rows, err := s.GetHostStaleness() + if err != nil { + t.Fatal(err) + } + if len(rows) != 0 { + t.Errorf("never-reported host should be skipped, got %d rows", len(rows)) + } + s.SaveHostReport("h1", "c1", []byte(`{}`), HostReportDenorm{}) + rows, _ = s.GetHostStaleness() + if len(rows) != 1 || rows[0].HostID != "h1" { + t.Errorf("after a report expected 1 row, got %+v", rows) + } +} diff --git a/hub/internal/store/store.go b/hub/internal/store/store.go index ca5bda7..1d8e85c 100644 --- a/hub/internal/store/store.go +++ b/hub/internal/store/store.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "log" + "strconv" "time" _ "modernc.org/sqlite" @@ -18,18 +19,18 @@ type Store struct { // CustomerSummary holds the latest status for a customer (for dashboard). type CustomerSummary struct { - CustomerID string - CustomerName string - ControllerVersion string - ReceivedAt time.Time - HealthStatus string - CPUPercent float64 - MemoryPercent float64 - ContainerTotal int - ContainerRunning int + CustomerID string + CustomerName string + ControllerVersion string + ReceivedAt time.Time + HealthStatus string + CPUPercent float64 + MemoryPercent float64 + ContainerTotal int + ContainerRunning int BackupLastSnapshot *time.Time - ReportJSON string - ControllerURL string + ReportJSON string + ControllerURL string // Computed fields (not stored) TimeSinceReport time.Duration @@ -216,6 +217,63 @@ func (s *Store) migrate() error { WHERE NOT EXISTS (SELECT 1 FROM infra_backup_versions WHERE infra_backup_versions.customer_id = infra_backups.customer_id)`) + // v0.7.0: host-domain (slice 3). Purely additive — the controller path + // (reports/customer_configs) is untouched; the schema cutover is slice 10. + // Columns marked INERT exist now so slice 10 needs no ALTER; nothing reads or + // writes them this slice. + _, err = s.db.Exec(` + CREATE TABLE IF NOT EXISTS hosts ( + host_id TEXT PRIMARY KEY, + customer_id TEXT NOT NULL, + api_key TEXT NOT NULL, + agent_version TEXT NOT NULL DEFAULT '', + last_report_at DATETIME, + desired_json TEXT NOT NULL DEFAULT '{}', + desired_generation INTEGER NOT NULL DEFAULT 0, + dr_record_json TEXT NOT NULL DEFAULT '{}', + created_at DATETIME NOT NULL DEFAULT (datetime('now')), + updated_at DATETIME NOT NULL DEFAULT (datetime('now')) + ); + CREATE INDEX IF NOT EXISTS idx_hosts_customer ON hosts(customer_id); + + CREATE TABLE IF NOT EXISTS guests ( + guest_id TEXT PRIMARY KEY, + customer_id TEXT NOT NULL, + host_id TEXT NOT NULL, + vmid INTEGER NOT NULL, + display_name TEXT NOT NULL DEFAULT '', + status TEXT NOT NULL DEFAULT 'unknown', + controller_version TEXT NOT NULL DEFAULT '', + last_seen_at DATETIME, + api_key TEXT NOT NULL DEFAULT '', + desired_spec_json TEXT NOT NULL DEFAULT '{}', + created_at DATETIME NOT NULL DEFAULT (datetime('now')), + updated_at DATETIME NOT NULL DEFAULT (datetime('now')) + ); + CREATE INDEX IF NOT EXISTS idx_guests_host ON guests(host_id); + CREATE INDEX IF NOT EXISTS idx_guests_customer ON guests(customer_id); + + CREATE TABLE IF NOT EXISTS host_reports ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + host_id TEXT NOT NULL, + customer_id TEXT NOT NULL, + received_at DATETIME NOT NULL DEFAULT (datetime('now')), + report_json TEXT NOT NULL, + agent_version TEXT, + cpu_percent REAL, + memory_percent REAL, + disk_percent REAL, + guest_total INTEGER, + guest_running INTEGER, + cloudflared_status TEXT + ); + CREATE INDEX IF NOT EXISTS idx_host_reports_host ON host_reports(host_id, received_at DESC); + CREATE INDEX IF NOT EXISTS idx_host_reports_customer ON host_reports(customer_id, received_at DESC); + `) + if err != nil { + return err + } + return nil } @@ -812,7 +870,13 @@ func (s *Store) Prune(maxDays int) (int64, error) { if err != nil { return 0, err } - return res.RowsAffected() + n, _ := res.RowsAffected() + // v0.7.0: prune the parallel host-domain report stream, same retention. + if hres, herr := s.db.Exec("DELETE FROM host_reports WHERE received_at < ?", cutoff); herr == nil { + hn, _ := hres.RowsAffected() + n += hn + } + return n, nil } // Close closes the database connection. @@ -1138,11 +1202,11 @@ func scanEvents(rows *sql.Rows) ([]Event, error) { // parseSQLiteTime tries multiple formats that modernc.org/sqlite may return. func parseSQLiteTime(s string) time.Time { formats := []string{ - "2006-01-02 15:04:05", // SQLite datetime('now') - "2006-01-02T15:04:05Z", // RFC3339 without fractional + "2006-01-02 15:04:05", // SQLite datetime('now') + "2006-01-02T15:04:05Z", // RFC3339 without fractional time.RFC3339, // 2006-01-02T15:04:05Z07:00 - time.RFC3339Nano, // with fractional seconds - "2006-01-02 15:04:05+00:00", // with explicit UTC offset + time.RFC3339Nano, // with fractional seconds + "2006-01-02 15:04:05+00:00", // with explicit UTC offset "2006-01-02 15:04:05.999999999", // with fractional, no TZ } for _, f := range formats { @@ -1180,3 +1244,200 @@ func parseDiskSummary(reportJSON string) string { } return result } + +// ---- v0.7.0: host-domain (slice 3) ---- +// Additive store surface for the agent's host-report stream. The controller-path +// methods above are untouched. + +// Host is one customer agent. Mixes operator-intent columns (Desired*, DRRecord — +// INERT until slice 10) with box-reported reality (AgentVersion, LastReportAt). +type Host struct { + HostID string + CustomerID string + APIKey string + AgentVersion string + LastReportAt *time.Time + DesiredJSON string + DesiredGeneration int64 + DRRecordJSON string + CreatedAt time.Time + UpdatedAt time.Time +} + +// Guest is one controller LXC. Reality columns are report-driven; APIKey and +// DesiredSpecJSON are INERT until slice 10 and must survive report upserts. +type Guest struct { + GuestID string + CustomerID string + HostID string + VMID int + DisplayName string + Status string + ControllerVersion string + LastSeenAt *time.Time + APIKey string + DesiredSpecJSON string + CreatedAt time.Time + UpdatedAt time.Time +} + +// HostReportDenorm are the denormalized fields pulled from a host-report for the +// dashboard / staleness, mirroring the reports table's denorm pattern. +type HostReportDenorm struct { + AgentVersion string + CPUPercent float64 + MemoryPercent float64 + DiskPercent float64 + GuestTotal int + GuestRunning int + CloudflaredStatus string +} + +// HostStaleRow is the minimal per-host recency row the dead-man's-switch reads. +type HostStaleRow struct { + HostID string + CustomerID string + LastReportAt time.Time +} + +// GuestID derives the interim guest primary key from host + vmid. The hub owns the +// id scheme (locked decision 3) so the slice-10 swap to durable ids is hub-only. +func GuestID(hostID string, vmid int) string { + return hostID + "/" + strconv.Itoa(vmid) +} + +func scanHost(scan func(dest ...any) error) (*Host, error) { + var h Host + var lastReport sql.NullString + var createdAt, updatedAt string + err := scan(&h.HostID, &h.CustomerID, &h.APIKey, &h.AgentVersion, &lastReport, + &h.DesiredJSON, &h.DesiredGeneration, &h.DRRecordJSON, &createdAt, &updatedAt) + if err != nil { + return nil, err + } + if lastReport.Valid { + t := parseSQLiteTime(lastReport.String) + h.LastReportAt = &t + } + h.CreatedAt = parseSQLiteTime(createdAt) + h.UpdatedAt = parseSQLiteTime(updatedAt) + return &h, nil +} + +const hostSelectCols = `host_id, customer_id, api_key, agent_version, last_report_at, + desired_json, desired_generation, dr_record_json, created_at, updated_at` + +// GetHostByAPIKey looks up a host by its per-host hub key. Returns nil (no error) +// if no match — parallels GetCustomerConfigByAPIKey. +func (s *Store) GetHostByAPIKey(apiKey string) (*Host, error) { + h, err := scanHost(s.db.QueryRow(`SELECT `+hostSelectCols+` FROM hosts WHERE api_key = ?`, apiKey).Scan) + if err == sql.ErrNoRows { + return nil, nil + } + return h, err +} + +// GetHost looks up a host by id. Returns nil (no error) if not found. +func (s *Store) GetHost(hostID string) (*Host, error) { + h, err := scanHost(s.db.QueryRow(`SELECT `+hostSelectCols+` FROM hosts WHERE host_id = ?`, hostID).Scan) + if err == sql.ErrNoRows { + return nil, nil + } + return h, err +} + +// ListHosts returns all hosts (debug / host-domain views). +func (s *Store) ListHosts() ([]Host, error) { + rows, err := s.db.Query(`SELECT ` + hostSelectCols + ` FROM hosts ORDER BY host_id`) + if err != nil { + return nil, err + } + defer rows.Close() + var hosts []Host + for rows.Next() { + h, err := scanHost(rows.Scan) + if err != nil { + return nil, err + } + hosts = append(hosts, *h) + } + return hosts, rows.Err() +} + +// UpsertHost creates or updates a host identity (used by the admin mint). On +// conflict it updates only operator-settable identity fields + updated_at; it does +// NOT touch the reality columns (agent_version/last_report_at) or the inert intent +// columns (desired_*/dr_record_json) — those are owned elsewhere. +func (s *Store) UpsertHost(h *Host) error { + _, err := s.db.Exec(` + INSERT INTO hosts (host_id, customer_id, api_key, updated_at) + VALUES (?, ?, ?, datetime('now')) + ON CONFLICT(host_id) DO UPDATE SET + customer_id = excluded.customer_id, + api_key = excluded.api_key, + updated_at = datetime('now')`, + h.HostID, h.CustomerID, h.APIKey, + ) + return err +} + +// SaveHostReport inserts a host_reports row and bumps the host's reality columns +// (agent_version/last_report_at/updated_at) — never the inert intent columns. +func (s *Store) SaveHostReport(hostID, customerID string, reportJSON []byte, d HostReportDenorm) error { + _, err := s.db.Exec(` + INSERT INTO host_reports (host_id, customer_id, report_json, agent_version, + cpu_percent, memory_percent, disk_percent, guest_total, guest_running, cloudflared_status) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, + hostID, customerID, string(reportJSON), d.AgentVersion, + d.CPUPercent, d.MemoryPercent, d.DiskPercent, d.GuestTotal, d.GuestRunning, d.CloudflaredStatus, + ) + if err != nil { + return err + } + _, err = s.db.Exec(` + UPDATE hosts SET agent_version = ?, last_report_at = datetime('now'), updated_at = datetime('now') + WHERE host_id = ?`, d.AgentVersion, hostID) + return err +} + +// UpsertGuestFromReport upserts the REALITY columns of a guest. On conflict it +// must NOT clobber the inert columns (api_key / desired_spec_json). +func (s *Store) UpsertGuestFromReport(g *Guest) error { + _, err := s.db.Exec(` + INSERT INTO guests (guest_id, customer_id, host_id, vmid, display_name, status, + controller_version, last_seen_at, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?, datetime('now'), datetime('now')) + ON CONFLICT(guest_id) DO UPDATE SET + vmid = excluded.vmid, + display_name = excluded.display_name, + status = excluded.status, + controller_version = excluded.controller_version, + last_seen_at = datetime('now'), + updated_at = datetime('now')`, + g.GuestID, g.CustomerID, g.HostID, g.VMID, g.DisplayName, g.Status, + g.ControllerVersion, + ) + return err +} + +// GetHostStaleness returns per-host recency for the dead-man's-switch. Hosts that +// have never reported (NULL last_report_at) are skipped — a freshly-minted host is +// not "down" until it has checked in at least once. +func (s *Store) GetHostStaleness() ([]HostStaleRow, error) { + rows, err := s.db.Query(`SELECT host_id, customer_id, last_report_at FROM hosts WHERE last_report_at IS NOT NULL`) + if err != nil { + return nil, err + } + defer rows.Close() + var out []HostStaleRow + for rows.Next() { + var r HostStaleRow + var last string + if err := rows.Scan(&r.HostID, &r.CustomerID, &last); err != nil { + return nil, err + } + r.LastReportAt = parseSQLiteTime(last) + out = append(out, r) + } + return out, rows.Err() +} diff --git a/hub/internal/store/telemetry.go b/hub/internal/store/telemetry.go index 9138554..a618ad9 100644 --- a/hub/internal/store/telemetry.go +++ b/hub/internal/store/telemetry.go @@ -10,7 +10,7 @@ import ( var ( reANSI = regexp.MustCompile(`\x1b\[[0-9;]*m`) - reTimestamp = regexp.MustCompile(`\d{4}[-/]\d{2}[-/]\d{2}[T ]\d{2}:\d{2}:\d{2}[.\d]*([+-]\d{2}:?\d{2})?[Z ]?:? ?`) + reTimestamp = regexp.MustCompile(`\d{4}[-/]\d{2}[-/]\d{2}[T ]\d{2}:\d{2}:\d{2}[.\d]*([+-]\d{2}:?\d{2})?[Z ]?:? ?`) reSyslog = regexp.MustCompile(`[A-Z][a-z]{2}\s+\d{1,2} \d{2}:\d{2}:\d{2} `) ) diff --git a/hub/internal/web/configs.go b/hub/internal/web/configs.go index b94e386..49d0530 100644 --- a/hub/internal/web/configs.go +++ b/hub/internal/web/configs.go @@ -792,10 +792,10 @@ func flattenYAML(m map[string]interface{}, prefix string) map[string]string { // configDiff represents a single key-value difference between two configs. type configDiff struct { - Key string `json:"key"` - HubValue string `json:"hub"` - CtrlValue string `json:"controller"` - Status string `json:"status"` // "changed", "hub_only", "controller_only" + Key string `json:"key"` + HubValue string `json:"hub"` + CtrlValue string `json:"controller"` + Status string `json:"status"` // "changed", "hub_only", "controller_only" } // compareYAMLValues parses two YAML strings and returns their value differences.