feat(hub): host-domain ingest — tables + /host-report + per-host auth + host dead-man's-switch (v0.7.0, slice 3)

Purely additive; the controller path (reports/customer_configs/checkAuthCustomer/
existing checkers) is untouched. Cutover remains slice 10.

- store: new hosts/guests/host_reports tables (full schema incl. columns INERT
  until slice 10, so no later ALTER); GetHostByAPIKey/GetHost/ListHosts/UpsertHost/
  SaveHostReport/UpsertGuestFromReport (preserves inert cols)/GetHostStaleness/
  GuestID; Prune also prunes host_reports.
- api: checkAuthHost (sibling of checkAuthCustomer); POST /host-report (per-host
  Bearer, 4MiB, denorm + guest upsert, control envelope); POST /admin/hosts
  (PROVISIONAL global-key host mint); host_* event types registered.
- monitor: HostStalenessChecker sibling over host_reports (host_stale/down/
  recovered), wired on the existing 60s ticker; controller checkers unchanged.
- tests (hermetic): store intent/inert-column preservation, auth, ingest
  (envelope+denorm, mismatch/unknown/blocked/oversize), admin mint round-trip,
  host staleness transitions.

CHANGELOG v0.7.0. Contract matches the agent host-report spec field-for-field.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-08 16:36:16 +02:00
parent 0d832def7b
commit 7c0c75457f
12 changed files with 1204 additions and 38 deletions
+229 -9
View File
@@ -89,6 +89,30 @@ func (h *Handler) checkAuthCustomer(r *http.Request) (customerID string, isGloba
return cfg.CustomerID, false, true
}
// checkAuthHost resolves a Bearer token to a HOST identity (the agent's auth
// path). It is a sibling of checkAuthCustomer — the controller path is unchanged.
// - global key -> ("", "", true, true) caller trusts body.host_id
// - per-host key -> (hostID, customerID, false, true)
// - failure -> ("", "", false, false)
func (h *Handler) checkAuthHost(r *http.Request) (hostID, customerID string, isGlobal, ok bool) {
auth := r.Header.Get("Authorization")
if !strings.HasPrefix(auth, "Bearer ") {
return "", "", false, false
}
token := strings.TrimPrefix(auth, "Bearer ")
// Global key first (same constant-time compare as checkAuthCustomer).
if h.apiKey != "" && subtle.ConstantTimeCompare([]byte(token), []byte(h.apiKey)) == 1 {
return "", "", true, true
}
host, err := h.store.GetHostByAPIKey(token)
if err != nil || host == nil {
return "", "", false, false
}
return host.HostID, host.CustomerID, false, true
}
// ServeHTTP routes API requests.
func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
path := strings.TrimPrefix(r.URL.Path, "/api/v1")
@@ -96,6 +120,10 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
switch {
case r.Method == http.MethodPost && path == "/report":
h.handleReport(w, r)
case r.Method == http.MethodPost && path == "/host-report":
h.handleHostReport(w, r)
case r.Method == http.MethodPost && path == "/admin/hosts":
h.handleAdminCreateHost(w, r)
case r.Method == http.MethodPost && path == "/event":
h.handleEvent(w, r)
case r.Method == http.MethodPost && path == "/notify":
@@ -194,6 +222,194 @@ func (h *Handler) handleReport(w http.ResponseWriter, r *http.Request) {
json.NewEncoder(w).Encode(resp)
}
// defaultHostPollSeconds is the cadence the hub hands every agent this slice (no
// per-host override UI yet — that is a later slice).
const defaultHostPollSeconds = 900
// hostReportPayload is the subset of the agent host-report (slice-3 contract,
// §3 / agent spec §4) the hub needs for denorm + guest reality. Unknown fields
// (storage_targets/backups/restore_tests/pbs_snapshots/audit_tail) are ignored,
// so an empty or absent collection is accepted without error.
type hostReportPayload struct {
HostID string `json:"host_id"`
AgentVersion string `json:"agent_version"`
Host struct {
CPUPercent float64 `json:"cpu_percent"`
MemoryPercent float64 `json:"memory_percent"`
DiskPercent float64 `json:"disk_percent"`
} `json:"host"`
Guests []struct {
VMID int `json:"vmid"`
Name string `json:"name"`
Status string `json:"status"`
ControllerVersion string `json:"controller_version"`
} `json:"guests"`
Cloudflared struct {
Status string `json:"status"`
} `json:"cloudflared"`
}
// handleHostReport ingests the agent's host-report (the heartbeat) and returns the
// control envelope (agent spec §5).
func (h *Handler) handleHostReport(w http.ResponseWriter, r *http.Request) {
hostID, custID, isGlobal, ok := h.checkAuthHost(r)
if !ok {
http.Error(w, "Unauthorized", http.StatusUnauthorized)
return
}
// 4 MiB: host reports carry the full guest list + future storage/backup arrays;
// the controller path's 1 MiB is too tight here.
body, err := io.ReadAll(io.LimitReader(r.Body, 4<<20))
if err != nil {
http.Error(w, "Bad request", http.StatusBadRequest)
return
}
var rep hostReportPayload
if err := json.Unmarshal(body, &rep); err != nil || rep.HostID == "" {
http.Error(w, "Invalid payload: host_id required", http.StatusBadRequest)
return
}
if isGlobal {
// Global-key bootstrap: trust body.host_id but require the host to exist
// (it must be minted first) and resolve its customer from the row.
host, err := h.store.GetHost(rep.HostID)
if err != nil {
h.logger.Printf("[ERROR] host lookup failed for %s: %v", rep.HostID, err)
http.Error(w, "Internal error", http.StatusInternalServerError)
return
}
if host == nil {
http.Error(w, "Unknown host_id (mint via /admin/hosts first)", http.StatusBadRequest)
return
}
hostID, custID = rep.HostID, host.CustomerID
} else if rep.HostID != hostID {
http.Error(w, "Forbidden: host_id mismatch", http.StatusForbidden)
return
}
running := 0
for _, g := range rep.Guests {
if g.Status == "running" {
running++
}
}
denorm := store.HostReportDenorm{
AgentVersion: rep.AgentVersion,
CPUPercent: rep.Host.CPUPercent,
MemoryPercent: rep.Host.MemoryPercent,
DiskPercent: rep.Host.DiskPercent,
GuestTotal: len(rep.Guests),
GuestRunning: running,
CloudflaredStatus: rep.Cloudflared.Status,
}
if err := h.store.SaveHostReport(hostID, custID, body, denorm); err != nil {
h.logger.Printf("[ERROR] Failed to save host-report from %s: %v", hostID, err)
http.Error(w, "Internal error", http.StatusInternalServerError)
return
}
for _, g := range rep.Guests {
status := g.Status
if status == "" {
status = "unknown"
}
guest := &store.Guest{
GuestID: store.GuestID(hostID, g.VMID),
CustomerID: custID,
HostID: hostID,
VMID: g.VMID,
DisplayName: g.Name,
Status: status,
ControllerVersion: g.ControllerVersion,
}
if err := h.store.UpsertGuestFromReport(guest); err != nil {
// A guest upsert failure must not drop the whole report (liveness).
h.logger.Printf("[WARN] Failed to upsert guest %s: %v", guest.GuestID, err)
}
}
h.logger.Printf("[INFO] host-report from %s (%d guests, %d bytes)", hostID, len(rep.Guests), len(body))
blocked := false
if cc, err := h.store.GetCustomerConfig(custID); err == nil && cc != nil && cc.Status == "blocked" {
blocked = true
}
resp := map[string]interface{}{
"status": "ok",
"poll_interval_seconds": defaultHostPollSeconds,
"blocked": blocked,
"desired_generation": 0, // reserved (slice 4)
"has_signed_ops": false, // reserved (slice 4)
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(resp)
}
// handleAdminCreateHost mints a host identity (host_id + per-host api_key).
//
// PROVISIONAL (slice-3 bootstrap): global-key only, so the demo agent can
// authenticate before enrollment (slices 78) exists. Enrollment will mint host
// identity + pin signing keys; this endpoint should be removed/locked down then
// (tracked under doc 05 §11 auth-tightening at cutover).
func (h *Handler) handleAdminCreateHost(w http.ResponseWriter, r *http.Request) {
_, _, isGlobal, ok := h.checkAuthHost(r)
if !ok || !isGlobal {
http.Error(w, "Forbidden: global key required", http.StatusForbidden)
return
}
body, err := io.ReadAll(io.LimitReader(r.Body, 1<<20))
if err != nil {
http.Error(w, "Bad request", http.StatusBadRequest)
return
}
var req struct {
CustomerID string `json:"customer_id"`
HostID string `json:"host_id"`
DisplayName string `json:"display_name"`
}
if err := json.Unmarshal(body, &req); err != nil || req.CustomerID == "" {
http.Error(w, "Invalid payload: customer_id required", http.StatusBadRequest)
return
}
cc, err := h.store.GetCustomerConfig(req.CustomerID)
if err != nil {
http.Error(w, "Internal error", http.StatusInternalServerError)
return
}
if cc == nil {
http.Error(w, "Unknown customer_id", http.StatusBadRequest)
return
}
hostID := req.HostID
if hostID == "" {
sfx, err := configgen.RandomHex(3) // 6 hex chars — human-legible for the demo
if err != nil {
http.Error(w, "Internal error", http.StatusInternalServerError)
return
}
hostID = req.CustomerID + "-" + sfx
}
apiKey, err := configgen.RandomHex(32)
if err != nil {
http.Error(w, "Internal error", http.StatusInternalServerError)
return
}
if err := h.store.UpsertHost(&store.Host{HostID: hostID, CustomerID: req.CustomerID, APIKey: apiKey}); err != nil {
h.logger.Printf("[ERROR] Failed to mint host for %s: %v", req.CustomerID, err)
http.Error(w, "Internal error", http.StatusInternalServerError)
return
}
h.logger.Printf("[INFO] provisional host mint: %s (customer %s)", hostID, req.CustomerID)
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusCreated)
json.NewEncoder(w).Encode(map[string]string{"host_id": hostID, "api_key": apiKey})
}
// allowedEventTypes lists all valid event_type values the Hub accepts.
var allowedEventTypes = map[string]bool{
// Controller-pushed events
@@ -219,11 +435,15 @@ var allowedEventTypes = map[string]bool{
"disaster_recovery_started": true,
"disaster_recovery_completed": true,
// Hub-generated events
"node_stale": true,
"node_down": true,
"node_recovered": true,
"expected_backup_missed": true,
"expected_dbdump_missed": true,
"node_stale": true,
"node_down": true,
"node_recovered": true,
// Hub-generated host-domain events (v0.7.0, slice 3)
"host_stale": true,
"host_down": true,
"host_recovered": true,
"expected_backup_missed": true,
"expected_dbdump_missed": true,
// Special
"test": true,
}
@@ -686,10 +906,10 @@ func (h *Handler) handleRecovery(w http.ResponseWriter, r *http.Request, custome
}
resp := struct {
CustomerID string `json:"customer_id"`
ConfigYAML string `json:"config_yaml"`
InfraBackup json.RawMessage `json:"infra_backup"`
HasInfraBackup bool `json:"has_infra_backup"`
CustomerID string `json:"customer_id"`
ConfigYAML string `json:"config_yaml"`
InfraBackup json.RawMessage `json:"infra_backup"`
HasInfraBackup bool `json:"has_infra_backup"`
BackupVersions []store.InfraBackupVersion `json:"backup_versions,omitempty"`
}{
CustomerID: customerID,
+190
View File
@@ -0,0 +1,190 @@
package api
import (
"database/sql"
"encoding/json"
"io"
"log"
"net/http"
"net/http/httptest"
"path/filepath"
"strings"
"testing"
"gitea.dooplex.hu/admin/felhom-hub/internal/store"
_ "modernc.org/sqlite"
)
const globalKey = "GLOBALKEY"
func newTestHandler(t *testing.T) (*Handler, *store.Store, string) {
t.Helper()
path := filepath.Join(t.TempDir(), "test.db")
st, err := store.New(path, log.New(io.Discard, "", 0))
if err != nil {
t.Fatalf("store.New: %v", err)
}
t.Cleanup(func() { st.Close() })
h := New(st, globalKey, "", "", nil, log.New(io.Discard, "", 0))
return h, st, path
}
func do(h *Handler, method, path, bearer, body string) *httptest.ResponseRecorder {
req := httptest.NewRequest(method, "/api/v1"+path, strings.NewReader(body))
if bearer != "" {
req.Header.Set("Authorization", "Bearer "+bearer)
}
rr := httptest.NewRecorder()
h.ServeHTTP(rr, req)
return rr
}
func TestCheckAuthHost(t *testing.T) {
h, st, _ := newTestHandler(t)
st.UpsertHost(&store.Host{HostID: "h1", CustomerID: "c1", APIKey: "HKEY"})
mk := func(bearer string) *http.Request {
req := httptest.NewRequest(http.MethodPost, "/api/v1/host-report", nil)
if bearer != "" {
req.Header.Set("Authorization", "Bearer "+bearer)
}
return req
}
if _, _, isGlobal, ok := h.checkAuthHost(mk(globalKey)); !ok || !isGlobal {
t.Error("global key should resolve isGlobal=true")
}
hostID, custID, isGlobal, ok := h.checkAuthHost(mk("HKEY"))
if !ok || isGlobal || hostID != "h1" || custID != "c1" {
t.Errorf("per-host key = %q/%q global=%v ok=%v", hostID, custID, isGlobal, ok)
}
if _, _, _, ok := h.checkAuthHost(mk("bogus")); ok {
t.Error("unknown key should fail")
}
}
func validReportBody(hostID string) string {
return `{"host_id":"` + hostID + `","agent_version":"0.3.0",` +
`"host":{"cpu_percent":3.2,"memory_percent":25,"disk_percent":19,"loadavg":["0.1"],"uptime_seconds":100},` +
`"guests":[{"vmid":100,"name":"acme","status":"running","controller_version":""},` +
`{"vmid":101,"name":"beta","status":"stopped"}],` +
`"storage_targets":[],"backups":[],"cloudflared":{"status":"active"},"audit_tail":[]}`
}
func TestHandleHostReport_ValidAndEnvelopeAndDenorm(t *testing.T) {
h, st, dbPath := newTestHandler(t)
st.SaveCustomerConfig(&store.CustomerConfig{CustomerID: "c1", APIKey: "ckey", RetrievalPassword: "p"})
st.UpsertHost(&store.Host{HostID: "h1", CustomerID: "c1", APIKey: "HKEY"})
rr := do(h, http.MethodPost, "/host-report", "HKEY", validReportBody("h1"))
if rr.Code != 200 {
t.Fatalf("status = %d, body=%s", rr.Code, rr.Body.String())
}
var env struct {
Status string `json:"status"`
PollIntervalSeconds int `json:"poll_interval_seconds"`
Blocked bool `json:"blocked"`
DesiredGeneration int `json:"desired_generation"`
HasSignedOps bool `json:"has_signed_ops"`
}
json.Unmarshal(rr.Body.Bytes(), &env)
if env.Status != "ok" || env.PollIntervalSeconds != 900 || env.Blocked || env.DesiredGeneration != 0 || env.HasSignedOps {
t.Errorf("envelope = %+v", env)
}
// Denorm: guest_running counts only "running" (1 of 2). Read via a 2nd connection.
db, _ := sql.Open("sqlite", dbPath)
defer db.Close()
var total, running int
var cf string
db.QueryRow(`SELECT guest_total, guest_running, cloudflared_status FROM host_reports WHERE host_id='h1' ORDER BY id DESC LIMIT 1`).
Scan(&total, &running, &cf)
if total != 2 || running != 1 || cf != "active" {
t.Errorf("denorm total=%d running=%d cloudflared=%q (want 2,1,active)", total, running, cf)
}
// Guests upserted.
var gname, gstatus string
if err := db.QueryRow(`SELECT display_name, status FROM guests WHERE guest_id='h1/100'`).Scan(&gname, &gstatus); err != nil {
t.Fatalf("guest h1/100 not upserted: %v", err)
}
if gname != "acme" || gstatus != "running" {
t.Errorf("guest = %q/%q", gname, gstatus)
}
}
func TestHandleHostReport_HostIDMismatch(t *testing.T) {
h, st, _ := newTestHandler(t)
st.UpsertHost(&store.Host{HostID: "h1", CustomerID: "c1", APIKey: "HKEY"})
rr := do(h, http.MethodPost, "/host-report", "HKEY", validReportBody("other-host"))
if rr.Code != http.StatusForbidden {
t.Errorf("status = %d, want 403", rr.Code)
}
}
func TestHandleHostReport_UnknownHostUnderGlobalKey(t *testing.T) {
h, _, _ := newTestHandler(t)
rr := do(h, http.MethodPost, "/host-report", globalKey, validReportBody("ghost"))
if rr.Code != http.StatusBadRequest {
t.Errorf("status = %d, want 400 (unknown host_id)", rr.Code)
}
}
func TestHandleHostReport_BlockedCustomer(t *testing.T) {
h, st, _ := newTestHandler(t)
st.SaveCustomerConfig(&store.CustomerConfig{CustomerID: "c1", APIKey: "ckey", RetrievalPassword: "p"})
st.SetCustomerConfigStatus("c1", "blocked")
st.UpsertHost(&store.Host{HostID: "h1", CustomerID: "c1", APIKey: "HKEY"})
rr := do(h, http.MethodPost, "/host-report", "HKEY", validReportBody("h1"))
if rr.Code != 200 {
t.Fatalf("status = %d", rr.Code)
}
var env struct {
Blocked bool `json:"blocked"`
}
json.Unmarshal(rr.Body.Bytes(), &env)
if !env.Blocked {
t.Error("blocked customer should yield blocked:true")
}
}
func TestHandleHostReport_OversizeRejected(t *testing.T) {
h, st, _ := newTestHandler(t)
st.UpsertHost(&store.Host{HostID: "h1", CustomerID: "c1", APIKey: "HKEY"})
big := `{"host_id":"h1","guests":[{"vmid":1,"name":"` + strings.Repeat("a", 5<<20) + `"}]}`
rr := do(h, http.MethodPost, "/host-report", "HKEY", big)
if rr.Code != http.StatusBadRequest {
t.Errorf("oversize body status = %d, want 400", rr.Code)
}
}
func TestAdminCreateHost(t *testing.T) {
h, st, _ := newTestHandler(t)
st.SaveCustomerConfig(&store.CustomerConfig{CustomerID: "c1", APIKey: "ckey", RetrievalPassword: "p"})
// non-global key (per-customer) → 403
if rr := do(h, http.MethodPost, "/admin/hosts", "ckey", `{"customer_id":"c1"}`); rr.Code != http.StatusForbidden {
t.Errorf("per-customer key status = %d, want 403", rr.Code)
}
// missing/unknown customer → 400
if rr := do(h, http.MethodPost, "/admin/hosts", globalKey, `{"customer_id":"nope"}`); rr.Code != http.StatusBadRequest {
t.Errorf("unknown customer status = %d, want 400", rr.Code)
}
// success → 201 + usable key (round-trip)
rr := do(h, http.MethodPost, "/admin/hosts", globalKey, `{"customer_id":"c1"}`)
if rr.Code != http.StatusCreated {
t.Fatalf("mint status = %d, body=%s", rr.Code, rr.Body.String())
}
var minted struct {
HostID string `json:"host_id"`
APIKey string `json:"api_key"`
}
json.Unmarshal(rr.Body.Bytes(), &minted)
if minted.HostID == "" || minted.APIKey == "" {
t.Fatalf("mint response = %+v", minted)
}
// the minted key authenticates a host-report
rr2 := do(h, http.MethodPost, "/host-report", minted.APIKey, validReportBody(minted.HostID))
if rr2.Code != 200 {
t.Errorf("round-trip host-report with minted key = %d, body=%s", rr2.Code, rr2.Body.String())
}
}
+176
View File
@@ -0,0 +1,176 @@
package monitor
import (
"log"
"sync"
"time"
"gitea.dooplex.hu/admin/felhom-hub/internal/store"
)
// HostStalenessChecker is the host-domain dead-man's-switch (v0.7.0, slice 3). It
// is a deliberate SIBLING of StalenessChecker, not a rename: during slices 39 the
// controller report stream (reports) and the agent host-report stream
// (host_reports) are both live, so both checkers run. It keys on host↔host_reports
// and emits host_stale / host_down / host_recovered. Merging is a slice-10 job.
//
// Events are attributed to the host's CUSTOMER (SaveEvent + onEvent take the
// customer_id) so the existing per-customer notification/event UX picks them up
// unchanged.
type HostStalenessChecker struct {
store *store.Store
threshold time.Duration // "stale" after this (default 30m — same as the controller checker)
downAfter time.Duration // "down" after this (2x threshold)
logger *log.Logger
onEvent EventNotifyFunc
mu sync.Mutex
states map[string]string // hostID → "ok" | "stale" | "down"
customerOf map[string]string // hostID → customerID (for event attribution)
downtimeStart map[string]time.Time // hostID → when it first became unreachable
}
// NewHostStalenessChecker creates the checker and seeds state from current
// host-report recency. No events are generated during initialization.
func NewHostStalenessChecker(s *store.Store, threshold time.Duration, onEvent EventNotifyFunc, logger *log.Logger) *HostStalenessChecker {
sc := &HostStalenessChecker{
store: s,
threshold: threshold,
downAfter: 2 * threshold,
logger: logger,
onEvent: onEvent,
states: make(map[string]string),
customerOf: make(map[string]string),
downtimeStart: make(map[string]time.Time),
}
rows, err := s.GetHostStaleness()
if err != nil {
logger.Printf("[WARN] Host staleness checker: failed to seed states: %v", err)
return sc
}
var okCount, staleCount, downCount int
for _, row := range rows {
if s.IsCustomerBlocked(row.CustomerID) {
continue
}
sc.customerOf[row.HostID] = row.CustomerID
age := time.Since(row.LastReportAt)
switch {
case age > sc.downAfter:
sc.states[row.HostID] = "down"
downCount++
case age > sc.threshold:
sc.states[row.HostID] = "stale"
staleCount++
default:
sc.states[row.HostID] = "ok"
okCount++
}
}
logger.Printf("[INFO] Host staleness checker initialized: %d ok, %d stale, %d down", okCount, staleCount, downCount)
return sc
}
// Check evaluates all hosts and emits events on state transitions. Call every 60s.
func (sc *HostStalenessChecker) Check() {
rows, err := sc.store.GetHostStaleness()
if err != nil {
sc.logger.Printf("[WARN] Host staleness check failed: %v", err)
return
}
sc.mu.Lock()
defer sc.mu.Unlock()
seen := make(map[string]bool, len(rows))
for _, row := range rows {
seen[row.HostID] = true
if sc.store.IsCustomerBlocked(row.CustomerID) {
delete(sc.states, row.HostID)
continue
}
sc.customerOf[row.HostID] = row.CustomerID
age := time.Since(row.LastReportAt)
var newState string
switch {
case age > sc.downAfter:
newState = "down"
case age > sc.threshold:
newState = "stale"
default:
newState = "ok"
}
oldState := sc.states[row.HostID]
if oldState == "" {
sc.states[row.HostID] = newState // first observation — no event
continue
}
if oldState == newState {
continue
}
sc.states[row.HostID] = newState
if newState == "stale" && oldState == "ok" {
sc.downtimeStart[row.HostID] = time.Now()
}
downtimeDur := age
if newState == "ok" {
if t, ok := sc.downtimeStart[row.HostID]; ok {
downtimeDur = time.Since(t)
}
delete(sc.downtimeStart, row.HostID)
}
sc.emitTransition(row.HostID, row.CustomerID, oldState, newState, downtimeDur)
}
for id := range sc.states {
if !seen[id] {
delete(sc.states, id)
delete(sc.downtimeStart, id)
}
}
}
// GetState returns the current staleness state for a host.
func (sc *HostStalenessChecker) GetState(hostID string) string {
sc.mu.Lock()
defer sc.mu.Unlock()
s := sc.states[hostID]
if s == "" {
return "unknown"
}
return s
}
func (sc *HostStalenessChecker) emitTransition(hostID, customerID, oldState, newState string, age time.Duration) {
var eventType, severity, message string
switch {
case newState == "stale":
eventType = "host_stale"
severity = "warning"
message = "Host " + hostID + ": no report for " + formatDuration(age)
case newState == "down":
eventType = "host_down"
severity = "error"
message = "Host " + hostID + ": no report for " + formatDuration(age)
case newState == "ok" && (oldState == "stale" || oldState == "down"):
eventType = "host_recovered"
severity = "info"
message = "Host " + hostID + ": reports resumed (was " + oldState + " for " + formatDuration(age) + ")"
default:
return
}
sc.logger.Printf("[INFO] Host staleness: %s %s → %s (%s)", hostID, oldState, newState, eventType)
if _, err := sc.store.SaveEvent(customerID, eventType, severity, message, "{}", "hub"); err != nil {
sc.logger.Printf("[WARN] Failed to save host staleness event for %s: %v", hostID, err)
return
}
if sc.onEvent != nil {
sc.onEvent(customerID, eventType, severity, message, "{}", "hub")
}
}
@@ -0,0 +1,88 @@
package monitor
import (
"database/sql"
"fmt"
"io"
"log"
"path/filepath"
"testing"
"time"
"gitea.dooplex.hu/admin/felhom-hub/internal/store"
_ "modernc.org/sqlite"
)
// backdate sets a host's last_report_at to N minutes ago, simulating the passage
// of time without sleeping. Uses a second connection (the checker reads via store).
func backdate(t *testing.T, db *sql.DB, hostID string, minutesAgo int) {
t.Helper()
if _, err := db.Exec(`UPDATE hosts SET last_report_at = datetime('now', ?) WHERE host_id = ?`,
fmt.Sprintf("-%d minutes", minutesAgo), hostID); err != nil {
t.Fatal(err)
}
}
func TestHostStalenessChecker(t *testing.T) {
path := filepath.Join(t.TempDir(), "test.db")
st, err := store.New(path, log.New(io.Discard, "", 0))
if err != nil {
t.Fatal(err)
}
defer st.Close()
db, _ := sql.Open("sqlite", path)
defer db.Close()
st.SaveCustomerConfig(&store.CustomerConfig{CustomerID: "c1", APIKey: "ck", RetrievalPassword: "p"})
st.UpsertHost(&store.Host{HostID: "h1", CustomerID: "c1", APIKey: "k1"})
st.SaveHostReport("h1", "c1", []byte(`{}`), store.HostReportDenorm{}) // sets last_report_at
var events []string
onEvent := func(customerID, eventType, severity, message, detailsJSON, source string) {
events = append(events, eventType)
}
// Seed already-stale (40m) → state stale, but NO event on init.
backdate(t, db, "h1", 40)
sc := NewHostStalenessChecker(st, 30*time.Minute, onEvent, log.New(io.Discard, "", 0))
if len(events) != 0 {
t.Fatalf("seed must not emit events, got %v", events)
}
if sc.GetState("h1") != "stale" {
t.Fatalf("seeded state = %q, want stale", sc.GetState("h1"))
}
// Same age → no transition.
sc.Check()
if len(events) != 0 {
t.Fatalf("no transition expected, got %v", events)
}
// Fresh report → host_recovered.
backdate(t, db, "h1", 2)
sc.Check()
if last(events) != "host_recovered" {
t.Fatalf("events = %v, want last host_recovered", events)
}
// Aged to stale → host_stale.
backdate(t, db, "h1", 40)
sc.Check()
if last(events) != "host_stale" {
t.Fatalf("events = %v, want last host_stale", events)
}
// Aged past 2× → host_down.
backdate(t, db, "h1", 130)
sc.Check()
if last(events) != "host_down" {
t.Fatalf("events = %v, want last host_down", events)
}
}
func last(s []string) string {
if len(s) == 0 {
return ""
}
return s[len(s)-1]
}
+8 -8
View File
@@ -52,14 +52,14 @@ Message: %s`, customerID, eventType, severity, now, message)
// customerMessages maps event_type → Hungarian customer message.
var customerMessages = map[string]string{
// Backup events
"backup_completed": "A biztonsági mentés sikeresen elkészült.",
"backup_failed": "A biztonsági mentés sikertelen! Kérjük, ellenőrizd a rendszert.",
"db_dump_completed": "Az adatbázis mentés sikeresen elkészült.",
"db_dump_failed": "Az adatbázis mentés sikertelen!",
"backup_integrity_ok": "A mentés integritás ellenőrzés sikeres.",
"backup_integrity_failed": "A mentés integritás ellenőrzés hibát talált!",
"crossdrive_completed": "A másodlagos mentés sikeresen elkészült.",
"crossdrive_failed": "A másodlagos mentés sikertelen!",
"backup_completed": "A biztonsági mentés sikeresen elkészült.",
"backup_failed": "A biztonsági mentés sikertelen! Kérjük, ellenőrizd a rendszert.",
"db_dump_completed": "Az adatbázis mentés sikeresen elkészült.",
"db_dump_failed": "Az adatbázis mentés sikertelen!",
"backup_integrity_ok": "A mentés integritás ellenőrzés sikeres.",
"backup_integrity_failed": "A mentés integritás ellenőrzés hibát talált!",
"crossdrive_completed": "A másodlagos mentés sikeresen elkészült.",
"crossdrive_failed": "A másodlagos mentés sikertelen!",
// Disk events
"disk_warning": "A lemezterület 90% felett van — kérjük, szabadíts fel helyet.",
+122
View File
@@ -0,0 +1,122 @@
package store
import (
"io"
"log"
"path/filepath"
"testing"
)
func newTestStore(t *testing.T) *Store {
t.Helper()
s, err := New(filepath.Join(t.TempDir(), "test.db"), log.New(io.Discard, "", 0))
if err != nil {
t.Fatalf("store.New: %v", err)
}
t.Cleanup(func() { s.Close() })
return s
}
func TestGuestID(t *testing.T) {
if got := GuestID("demo-host-01", 100); got != "demo-host-01/100" {
t.Errorf("GuestID = %q", got)
}
}
func TestUpsertHost_AndLookup(t *testing.T) {
s := newTestStore(t)
if err := s.UpsertHost(&Host{HostID: "h1", CustomerID: "c1", APIKey: "k1"}); err != nil {
t.Fatalf("UpsertHost: %v", err)
}
h, err := s.GetHost("h1")
if err != nil || h == nil {
t.Fatalf("GetHost: %v / %v", h, err)
}
if h.CustomerID != "c1" || h.APIKey != "k1" || h.DesiredJSON != "{}" || h.LastReportAt != nil {
t.Errorf("host = %+v", h)
}
byKey, err := s.GetHostByAPIKey("k1")
if err != nil || byKey == nil || byKey.HostID != "h1" {
t.Errorf("GetHostByAPIKey hit = %+v / %v", byKey, err)
}
miss, err := s.GetHostByAPIKey("nope")
if err != nil || miss != nil {
t.Errorf("GetHostByAPIKey miss = %+v / %v (want nil,nil)", miss, err)
}
}
func TestSaveHostReport_BumpsRealityPreservesIntent(t *testing.T) {
s := newTestStore(t)
if err := s.UpsertHost(&Host{HostID: "h1", CustomerID: "c1", APIKey: "k1"}); err != nil {
t.Fatal(err)
}
// Operator-owned intent columns (inert this slice) set out-of-band.
if _, err := s.db.Exec(`UPDATE hosts SET desired_json='{"want":1}', desired_generation=7 WHERE host_id='h1'`); err != nil {
t.Fatal(err)
}
denorm := HostReportDenorm{AgentVersion: "0.3.0", CPUPercent: 3.2, MemoryPercent: 25, DiskPercent: 19, GuestTotal: 2, GuestRunning: 1, CloudflaredStatus: "active"}
if err := s.SaveHostReport("h1", "c1", []byte(`{"host_id":"h1"}`), denorm); err != nil {
t.Fatalf("SaveHostReport: %v", err)
}
h, _ := s.GetHost("h1")
if h.AgentVersion != "0.3.0" || h.LastReportAt == nil {
t.Errorf("reality not bumped: %+v", h)
}
if h.DesiredJSON != `{"want":1}` || h.DesiredGeneration != 7 {
t.Errorf("a report must NOT clobber intent columns: desired_json=%q gen=%d", h.DesiredJSON, h.DesiredGeneration)
}
var n int
s.db.QueryRow(`SELECT COUNT(*) FROM host_reports WHERE host_id='h1'`).Scan(&n)
if n != 1 {
t.Errorf("host_reports rows = %d, want 1", n)
}
}
func TestUpsertGuestFromReport_PreservesInertColumns(t *testing.T) {
s := newTestStore(t)
gid := GuestID("h1", 100)
if err := s.UpsertGuestFromReport(&Guest{GuestID: gid, CustomerID: "c1", HostID: "h1", VMID: 100, DisplayName: "acme", Status: "running"}); err != nil {
t.Fatal(err)
}
// Slice-10 columns set out-of-band; a report upsert must not touch them.
if _, err := s.db.Exec(`UPDATE guests SET api_key='controllerkey', desired_spec_json='{"cores":4}' WHERE guest_id=?`, gid); err != nil {
t.Fatal(err)
}
// A later report changes reality (status/name).
if err := s.UpsertGuestFromReport(&Guest{GuestID: gid, CustomerID: "c1", HostID: "h1", VMID: 100, DisplayName: "acme-renamed", Status: "stopped"}); err != nil {
t.Fatal(err)
}
var apiKey, desiredSpec, status, name string
err := s.db.QueryRow(`SELECT api_key, desired_spec_json, status, display_name FROM guests WHERE guest_id=?`, gid).
Scan(&apiKey, &desiredSpec, &status, &name)
if err != nil {
t.Fatal(err)
}
if apiKey != "controllerkey" || desiredSpec != `{"cores":4}` {
t.Errorf("inert columns clobbered: api_key=%q desired_spec_json=%q", apiKey, desiredSpec)
}
if status != "stopped" || name != "acme-renamed" {
t.Errorf("reality not updated: status=%q name=%q", status, name)
}
}
func TestGetHostStaleness_SkipsNeverReported(t *testing.T) {
s := newTestStore(t)
s.UpsertHost(&Host{HostID: "h1", CustomerID: "c1", APIKey: "k1"})
rows, err := s.GetHostStaleness()
if err != nil {
t.Fatal(err)
}
if len(rows) != 0 {
t.Errorf("never-reported host should be skipped, got %d rows", len(rows))
}
s.SaveHostReport("h1", "c1", []byte(`{}`), HostReportDenorm{})
rows, _ = s.GetHostStaleness()
if len(rows) != 1 || rows[0].HostID != "h1" {
t.Errorf("after a report expected 1 row, got %+v", rows)
}
}
+277 -16
View File
@@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"log"
"strconv"
"time"
_ "modernc.org/sqlite"
@@ -18,18 +19,18 @@ type Store struct {
// CustomerSummary holds the latest status for a customer (for dashboard).
type CustomerSummary struct {
CustomerID string
CustomerName string
ControllerVersion string
ReceivedAt time.Time
HealthStatus string
CPUPercent float64
MemoryPercent float64
ContainerTotal int
ContainerRunning int
CustomerID string
CustomerName string
ControllerVersion string
ReceivedAt time.Time
HealthStatus string
CPUPercent float64
MemoryPercent float64
ContainerTotal int
ContainerRunning int
BackupLastSnapshot *time.Time
ReportJSON string
ControllerURL string
ReportJSON string
ControllerURL string
// Computed fields (not stored)
TimeSinceReport time.Duration
@@ -216,6 +217,63 @@ func (s *Store) migrate() error {
WHERE NOT EXISTS (SELECT 1 FROM infra_backup_versions
WHERE infra_backup_versions.customer_id = infra_backups.customer_id)`)
// v0.7.0: host-domain (slice 3). Purely additive — the controller path
// (reports/customer_configs) is untouched; the schema cutover is slice 10.
// Columns marked INERT exist now so slice 10 needs no ALTER; nothing reads or
// writes them this slice.
_, err = s.db.Exec(`
CREATE TABLE IF NOT EXISTS hosts (
host_id TEXT PRIMARY KEY,
customer_id TEXT NOT NULL,
api_key TEXT NOT NULL,
agent_version TEXT NOT NULL DEFAULT '',
last_report_at DATETIME,
desired_json TEXT NOT NULL DEFAULT '{}',
desired_generation INTEGER NOT NULL DEFAULT 0,
dr_record_json TEXT NOT NULL DEFAULT '{}',
created_at DATETIME NOT NULL DEFAULT (datetime('now')),
updated_at DATETIME NOT NULL DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_hosts_customer ON hosts(customer_id);
CREATE TABLE IF NOT EXISTS guests (
guest_id TEXT PRIMARY KEY,
customer_id TEXT NOT NULL,
host_id TEXT NOT NULL,
vmid INTEGER NOT NULL,
display_name TEXT NOT NULL DEFAULT '',
status TEXT NOT NULL DEFAULT 'unknown',
controller_version TEXT NOT NULL DEFAULT '',
last_seen_at DATETIME,
api_key TEXT NOT NULL DEFAULT '',
desired_spec_json TEXT NOT NULL DEFAULT '{}',
created_at DATETIME NOT NULL DEFAULT (datetime('now')),
updated_at DATETIME NOT NULL DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_guests_host ON guests(host_id);
CREATE INDEX IF NOT EXISTS idx_guests_customer ON guests(customer_id);
CREATE TABLE IF NOT EXISTS host_reports (
id INTEGER PRIMARY KEY AUTOINCREMENT,
host_id TEXT NOT NULL,
customer_id TEXT NOT NULL,
received_at DATETIME NOT NULL DEFAULT (datetime('now')),
report_json TEXT NOT NULL,
agent_version TEXT,
cpu_percent REAL,
memory_percent REAL,
disk_percent REAL,
guest_total INTEGER,
guest_running INTEGER,
cloudflared_status TEXT
);
CREATE INDEX IF NOT EXISTS idx_host_reports_host ON host_reports(host_id, received_at DESC);
CREATE INDEX IF NOT EXISTS idx_host_reports_customer ON host_reports(customer_id, received_at DESC);
`)
if err != nil {
return err
}
return nil
}
@@ -812,7 +870,13 @@ func (s *Store) Prune(maxDays int) (int64, error) {
if err != nil {
return 0, err
}
return res.RowsAffected()
n, _ := res.RowsAffected()
// v0.7.0: prune the parallel host-domain report stream, same retention.
if hres, herr := s.db.Exec("DELETE FROM host_reports WHERE received_at < ?", cutoff); herr == nil {
hn, _ := hres.RowsAffected()
n += hn
}
return n, nil
}
// Close closes the database connection.
@@ -1138,11 +1202,11 @@ func scanEvents(rows *sql.Rows) ([]Event, error) {
// parseSQLiteTime tries multiple formats that modernc.org/sqlite may return.
func parseSQLiteTime(s string) time.Time {
formats := []string{
"2006-01-02 15:04:05", // SQLite datetime('now')
"2006-01-02T15:04:05Z", // RFC3339 without fractional
"2006-01-02 15:04:05", // SQLite datetime('now')
"2006-01-02T15:04:05Z", // RFC3339 without fractional
time.RFC3339, // 2006-01-02T15:04:05Z07:00
time.RFC3339Nano, // with fractional seconds
"2006-01-02 15:04:05+00:00", // with explicit UTC offset
time.RFC3339Nano, // with fractional seconds
"2006-01-02 15:04:05+00:00", // with explicit UTC offset
"2006-01-02 15:04:05.999999999", // with fractional, no TZ
}
for _, f := range formats {
@@ -1180,3 +1244,200 @@ func parseDiskSummary(reportJSON string) string {
}
return result
}
// ---- v0.7.0: host-domain (slice 3) ----
// Additive store surface for the agent's host-report stream. The controller-path
// methods above are untouched.
// Host is one customer agent. Mixes operator-intent columns (Desired*, DRRecord —
// INERT until slice 10) with box-reported reality (AgentVersion, LastReportAt).
type Host struct {
HostID string
CustomerID string
APIKey string
AgentVersion string
LastReportAt *time.Time
DesiredJSON string
DesiredGeneration int64
DRRecordJSON string
CreatedAt time.Time
UpdatedAt time.Time
}
// Guest is one controller LXC. Reality columns are report-driven; APIKey and
// DesiredSpecJSON are INERT until slice 10 and must survive report upserts.
type Guest struct {
GuestID string
CustomerID string
HostID string
VMID int
DisplayName string
Status string
ControllerVersion string
LastSeenAt *time.Time
APIKey string
DesiredSpecJSON string
CreatedAt time.Time
UpdatedAt time.Time
}
// HostReportDenorm are the denormalized fields pulled from a host-report for the
// dashboard / staleness, mirroring the reports table's denorm pattern.
type HostReportDenorm struct {
AgentVersion string
CPUPercent float64
MemoryPercent float64
DiskPercent float64
GuestTotal int
GuestRunning int
CloudflaredStatus string
}
// HostStaleRow is the minimal per-host recency row the dead-man's-switch reads.
type HostStaleRow struct {
HostID string
CustomerID string
LastReportAt time.Time
}
// GuestID derives the interim guest primary key from host + vmid. The hub owns the
// id scheme (locked decision 3) so the slice-10 swap to durable ids is hub-only.
func GuestID(hostID string, vmid int) string {
return hostID + "/" + strconv.Itoa(vmid)
}
func scanHost(scan func(dest ...any) error) (*Host, error) {
var h Host
var lastReport sql.NullString
var createdAt, updatedAt string
err := scan(&h.HostID, &h.CustomerID, &h.APIKey, &h.AgentVersion, &lastReport,
&h.DesiredJSON, &h.DesiredGeneration, &h.DRRecordJSON, &createdAt, &updatedAt)
if err != nil {
return nil, err
}
if lastReport.Valid {
t := parseSQLiteTime(lastReport.String)
h.LastReportAt = &t
}
h.CreatedAt = parseSQLiteTime(createdAt)
h.UpdatedAt = parseSQLiteTime(updatedAt)
return &h, nil
}
const hostSelectCols = `host_id, customer_id, api_key, agent_version, last_report_at,
desired_json, desired_generation, dr_record_json, created_at, updated_at`
// GetHostByAPIKey looks up a host by its per-host hub key. Returns nil (no error)
// if no match — parallels GetCustomerConfigByAPIKey.
func (s *Store) GetHostByAPIKey(apiKey string) (*Host, error) {
h, err := scanHost(s.db.QueryRow(`SELECT `+hostSelectCols+` FROM hosts WHERE api_key = ?`, apiKey).Scan)
if err == sql.ErrNoRows {
return nil, nil
}
return h, err
}
// GetHost looks up a host by id. Returns nil (no error) if not found.
func (s *Store) GetHost(hostID string) (*Host, error) {
h, err := scanHost(s.db.QueryRow(`SELECT `+hostSelectCols+` FROM hosts WHERE host_id = ?`, hostID).Scan)
if err == sql.ErrNoRows {
return nil, nil
}
return h, err
}
// ListHosts returns all hosts (debug / host-domain views).
func (s *Store) ListHosts() ([]Host, error) {
rows, err := s.db.Query(`SELECT ` + hostSelectCols + ` FROM hosts ORDER BY host_id`)
if err != nil {
return nil, err
}
defer rows.Close()
var hosts []Host
for rows.Next() {
h, err := scanHost(rows.Scan)
if err != nil {
return nil, err
}
hosts = append(hosts, *h)
}
return hosts, rows.Err()
}
// UpsertHost creates or updates a host identity (used by the admin mint). On
// conflict it updates only operator-settable identity fields + updated_at; it does
// NOT touch the reality columns (agent_version/last_report_at) or the inert intent
// columns (desired_*/dr_record_json) — those are owned elsewhere.
func (s *Store) UpsertHost(h *Host) error {
_, err := s.db.Exec(`
INSERT INTO hosts (host_id, customer_id, api_key, updated_at)
VALUES (?, ?, ?, datetime('now'))
ON CONFLICT(host_id) DO UPDATE SET
customer_id = excluded.customer_id,
api_key = excluded.api_key,
updated_at = datetime('now')`,
h.HostID, h.CustomerID, h.APIKey,
)
return err
}
// SaveHostReport inserts a host_reports row and bumps the host's reality columns
// (agent_version/last_report_at/updated_at) — never the inert intent columns.
func (s *Store) SaveHostReport(hostID, customerID string, reportJSON []byte, d HostReportDenorm) error {
_, err := s.db.Exec(`
INSERT INTO host_reports (host_id, customer_id, report_json, agent_version,
cpu_percent, memory_percent, disk_percent, guest_total, guest_running, cloudflared_status)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
hostID, customerID, string(reportJSON), d.AgentVersion,
d.CPUPercent, d.MemoryPercent, d.DiskPercent, d.GuestTotal, d.GuestRunning, d.CloudflaredStatus,
)
if err != nil {
return err
}
_, err = s.db.Exec(`
UPDATE hosts SET agent_version = ?, last_report_at = datetime('now'), updated_at = datetime('now')
WHERE host_id = ?`, d.AgentVersion, hostID)
return err
}
// UpsertGuestFromReport upserts the REALITY columns of a guest. On conflict it
// must NOT clobber the inert columns (api_key / desired_spec_json).
func (s *Store) UpsertGuestFromReport(g *Guest) error {
_, err := s.db.Exec(`
INSERT INTO guests (guest_id, customer_id, host_id, vmid, display_name, status,
controller_version, last_seen_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, datetime('now'), datetime('now'))
ON CONFLICT(guest_id) DO UPDATE SET
vmid = excluded.vmid,
display_name = excluded.display_name,
status = excluded.status,
controller_version = excluded.controller_version,
last_seen_at = datetime('now'),
updated_at = datetime('now')`,
g.GuestID, g.CustomerID, g.HostID, g.VMID, g.DisplayName, g.Status,
g.ControllerVersion,
)
return err
}
// GetHostStaleness returns per-host recency for the dead-man's-switch. Hosts that
// have never reported (NULL last_report_at) are skipped — a freshly-minted host is
// not "down" until it has checked in at least once.
func (s *Store) GetHostStaleness() ([]HostStaleRow, error) {
rows, err := s.db.Query(`SELECT host_id, customer_id, last_report_at FROM hosts WHERE last_report_at IS NOT NULL`)
if err != nil {
return nil, err
}
defer rows.Close()
var out []HostStaleRow
for rows.Next() {
var r HostStaleRow
var last string
if err := rows.Scan(&r.HostID, &r.CustomerID, &last); err != nil {
return nil, err
}
r.LastReportAt = parseSQLiteTime(last)
out = append(out, r)
}
return out, rows.Err()
}
+1 -1
View File
@@ -10,7 +10,7 @@ import (
var (
reANSI = regexp.MustCompile(`\x1b\[[0-9;]*m`)
reTimestamp = regexp.MustCompile(`\d{4}[-/]\d{2}[-/]\d{2}[T ]\d{2}:\d{2}:\d{2}[.\d]*([+-]\d{2}:?\d{2})?[Z ]?:? ?`)
reTimestamp = regexp.MustCompile(`\d{4}[-/]\d{2}[-/]\d{2}[T ]\d{2}:\d{2}:\d{2}[.\d]*([+-]\d{2}:?\d{2})?[Z ]?:? ?`)
reSyslog = regexp.MustCompile(`[A-Z][a-z]{2}\s+\d{1,2} \d{2}:\d{2}:\d{2} `)
)
+4 -4
View File
@@ -792,10 +792,10 @@ func flattenYAML(m map[string]interface{}, prefix string) map[string]string {
// configDiff represents a single key-value difference between two configs.
type configDiff struct {
Key string `json:"key"`
HubValue string `json:"hub"`
CtrlValue string `json:"controller"`
Status string `json:"status"` // "changed", "hub_only", "controller_only"
Key string `json:"key"`
HubValue string `json:"hub"`
CtrlValue string `json:"controller"`
Status string `json:"status"` // "changed", "hub_only", "controller_only"
}
// compareYAMLValues parses two YAML strings and returns their value differences.