feat(hub): host-report client + collector + first daemon loop (slice 3, v0.3.0)

internal/hub: the agent's first daemon — a periodic read-only host-report POSTed to
the hub (the heartbeat; no separate ping).

- HostReport wire contract (shared field-for-field with the hub ingest): host
  metrics, guests (vmid + spec), cloudflared status; storage/backups/restore-tests/
  pbs/audit collections DEFINED but emitted empty (slices 5/6 fill).
- Collector over a read-only proxmoxReader (adapted to the real proxmox surface;
  no proxmox changes) + a CloudflaredProber. Partial-failure: NodeStatus fail = hard
  (skip POST); per-guest GuestConfig fail = status "unknown", still report.
- Client: Bearer-auth POST, standard TLS (system roots / optional ca_file), typed
  TransportError/HTTPError, token never in errors.
- Loop: immediate first report, adopt hub poll_interval (clamp [60,3600]), resilient
  to collect/report errors, clean ctx-cancel shutdown.
- ControlEnvelope: only poll_interval_seconds acted on; blocked/desired_generation/
  has_signed_ops parsed-but-ignored (slice 4).
- config: HubConfig + FELHOM_AGENT_HUB_* overlay + mode-aware HubConfig.Validate +
  WithDefaults + hub-key redaction; example config updated.
- main: no-selftest mode is now the daemon; added --selftest=hub. Version -> 0.3.0.

Tests: report serialization, client (incl. token-redaction), collector partial-
failure, loop continuation+interval adoption, config. internal/proxmox + internal/
authz untouched.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-08 16:20:09 +02:00
parent f0fee7e193
commit ab77fa3544
16 changed files with 1352 additions and 91 deletions
+56
View File
@@ -3,6 +3,62 @@
All notable changes to **felhom-agent** are recorded here. Update on every code
change that gets pushed.
## v0.3.0 — hub client + host-report + first daemon loop (slice 3) (2026-06-08)
The agent's first daemon: a periodic read-only host-report POSTed to the hub (the
heartbeat). No Proxmox mutations, no desired-state/signed-op consumption, no
storage/backup collection yet — those are slices 4/5/6.
### Added
- **`internal/hub`** package:
- **`HostReport`** wire contract (`report.go`) shared field-for-field with the hub
ingest: host metrics, guests (`vmid` + spec), `cloudflared` status, and the
`storage_targets`/`backups`/`restore_tests`/`pbs_snapshots`/`audit_tail`
collections **defined but emitted empty** (typed `[]`, slices 5/6 fill them).
- **`Collector`** (`collect.go`) builds the report from a read-only `proxmoxReader`
(adapted to the real `internal/proxmox` surface — node held by the client, value
returns, `proxmox.Guest`) + a `CloudflaredProber`. Partial-failure policy: a
failed `NodeStatus` is a hard error (skip the POST); a failed per-guest
`GuestConfig` degrades that guest to `status="unknown"` (spec omitted) but still
sends; a cloudflared probe failure → `"unknown"`, never fatal.
- **`CloudflaredProber`** + `SystemctlProber` (`systemctl is-active cloudflared`;
read-only — NOT a Privileged/root op; tunnel management is a later slice).
- **`Client`** (`client.go`): `POST /api/v1/host-report` with
`Authorization: Bearer <key>`, standard TLS (system roots or optional `ca_file`;
verification always on). Typed `*TransportError` / `*HTTPError`; the bearer token
never appears in any error.
- **`Loop`** (`loop.go`): the daemon — immediate first report then tick; adopts the
hub's `poll_interval_seconds` clamped to [60,3600]; resilient (a collect/report
error is logged and the loop continues); clean shutdown on context cancel.
- **`ControlEnvelope`**: only `poll_interval_seconds` is acted on; `blocked` /
`desired_generation` / `has_signed_ops` are parsed-but-ignored (logged at most)
pending reconcile (slice 4).
- **Config**: `HubConfig` (url/host_id/api_key/poll_seconds/timeout_seconds/ca_file),
`FELHOM_AGENT_HUB_*` env overlay, `HubConfig.Validate()` (mode-aware — proxmox-only
`--selftest=read|task` still runs without hub config), `WithDefaults()`, and
`Redacted()` now also blanks the hub key. `configs/agent.example.json` gains `hub`
(and `authz`) blocks.
- **`cmd/felhom-agent`**: the no-`--selftest` mode is now the **daemon** (poll loop);
added **`--selftest=hub`** (one collect+report, prints the report + envelope).
Version 0.2.0 → 0.3.0.
### Tests
- Report serialization (field names; empty collections are `[]` not `null`; spec
omitted when unknown); client (Bearer header, non-2xx→`*HTTPError`,
transport→`*TransportError`, **token never in error**); collector (host mapping,
guest spec, per-guest failure degrades-but-still-reports, NodeStatus hard error,
cloudflared error→unknown); loop (immediate first report, continuation after an
injected error, interval adoption + clamp); config (hub validate/redact/env).
### Notes
- `internal/proxmox` and `internal/authz` were **not touched** — no new proxmox
surface was needed (`ListLXC` already exposes status/maxmem/maxdisk; `GuestConfig`
exposes cores). The task's `proxmoxReader` sketch (node-arg/pointer/`LXC`) was
adapted to the real exports as instructed.
- **Defined-but-empty** this slice: `storage_targets`, `backups`, `restore_tests`,
`pbs_snapshots`, `audit_tail` (slices 5/6). **Parsed-but-ignored**: the envelope's
`blocked`/`desired_generation`/`has_signed_ops` (slice 4).
## v0.2.0 — `authz` signed-op verifier (slice 2) (2026-06-08)
Production form of the Phase-4 signing primitive: a key-type-agnostic SSHSIG
+57 -60
View File
@@ -3,74 +3,71 @@
> This file holds the report for the **most recent** change, fully overwritten each task.
> Cumulative history lives in [CHANGELOG.md](CHANGELOG.md).
## Task: `authz` signed-op verifier (slice 2) — v0.2.0
## Task: hub client + host-report + first daemon loop (slice 3) — v0.3.0
Turned the Phase-4 reference `VerifySignedOp` into a production package
(`internal/authz`): a key-type-agnostic SSHSIG verifier for operator-signed destructive
ops, the full anti-replay/authorization pipeline, and a durable, crash-safe nonce store.
This is what slice 4 (reconcile) calls to gate destructive desired-state deltas. Pushed to
`main`. Build/vet/test green locally (Go 1.26) and on the build server.
The agent's **first daemon**: a periodic, read-only host-report POSTed to the hub — which
**is** the heartbeat (its server-side `received_at` is the dead-man's-switch signal). New
`internal/hub` package + config additions + `main.go` daemon wiring. Pushed to `main`;
build/vet/test green locally (go1.26) and on the build server.
### Public surface (`internal/authz`)
- **`Verifier`** — `New(signers []AllowedSigner, store NonceStore, hostID string) *Verifier`;
`Verify(blob, sigArmored []byte) (*VerifiedOp, error)`. Optional `ClockSkew` (default 2m,
not-yet-valid only) and `Logger` (advisory key_id-mismatch warning).
- **`OpBlob`** — canonical signed object; `Target{HostID,GuestID}` with corrected
`host_id`/`guest_id` json tags; `Params json.RawMessage`, `Nonce`, `IssuedAt`, `ExpiresAt`, `KeyID`.
- **`VerifiedOp`** — `Op, HostID, GuestID, Params, Nonce, IssuedAt, ExpiresAt, KeyID (advisory),
Signer (matched), KeyIDMatchesSigner`.
- **`AllowedSigner`** + `NewAllowedSigner(keyID, role, authorizedKeyLine)`; roles
`RoleOperational` / `RoleRecovery` (doc 04 two-key model; role-scoping enforced by the caller).
- **`NonceStore`** interface + `MemoryNonceStore` (tests) and **`FileNonceStore`** (durable).
- **Typed errors**: `ErrMalformed, ErrNamespace, ErrUnknownSigner, ErrBadSignature, ErrTarget,
ErrExpired, ErrNotYetValid, ErrReplay` (errors.Is-friendly).
- **Config**: `config.AuthzConfig` (nonce-store path + pinned `Signers`).
### `internal/hub` public surface
- **`HostReport`** + sub-types (`HostMetrics`, `Guest`, `GuestSpec`, `Cloudflared`,
`ControlEnvelope`) — the JSON wire contract shared field-for-field with the hub ingest.
- **`Collector`** — `NewCollector(px proxmoxReader, cf CloudflaredProber, hostID, agentVersion, logger)`;
`Collect(ctx) (*HostReport, error)`.
- **`CloudflaredProber`** interface + **`SystemctlProber`** (`systemctl is-active`).
- **`Client`** — `NewClient(cfg config.HubConfig, logger) (*Client, error)`;
`Report(ctx, *HostReport) (*ControlEnvelope, error)`; typed `*TransportError` / `*HTTPError`.
- **`Loop`** — `NewLoop(collector, client, interval, logger)`; `Run(ctx) error`. Constants
`MinPollSeconds=60` / `MaxPollSeconds=3600`.
### Locked pipeline (order load-bearing)
`parse armor → namespace (fixed felhom-op-v1) → parse pubkey → allow-list by key MATERIAL (not
key_id) → crypto verify over RAW received bytes → parse blob → target (host strict, guest
surfaced) → time window → nonce recorded LAST`. Each post-crypto stage rejects even with a
valid signature; an invalid signature can never consume a nonce.
### Config additions (`internal/config`)
- `HubConfig{URL, HostID, APIKey, PollSeconds, TimeoutSeconds, CAFile}` on `Config.Hub`.
- `FELHOM_AGENT_HUB_{URL,HOST_ID,API_KEY,POLL_SECONDS,TIMEOUT_SECONDS,CA_FILE}` overlay
(int parse errors warn to stderr + keep file value, never crash).
- `HubConfig.Validate()` (mode-aware — proxmox-only selftests unaffected; https required
except loopback for tests), `HubConfig.WithDefaults()` (900s/30s), `Redacted()` blanks the key.
- `configs/agent.example.json` gains `hub` (and `authz`) blocks.
### Durable nonce store — mechanism & guarantee
fsync'd append-only JSONL log + in-memory index (replayed on open) + periodic compaction.
- **Crash-safe**: a nonce is written and `fsync`'d before `SeenOrRecord` returns `false`, so the
caller acts only *after* the durable record. A crash between verify and execute drops the op
(fail-safe) and never enables a replay. I/O failure → returns seen=true (op not executed).
- **Survives restart**: the log is replayed into the index on `OpenFileNonceStore`.
- **Pruning**: expired nonces dropped only at compaction (never before exp) — and an expired op
is rejected by the time check before the nonce check, so pruning is housekeeping, not a hole.
- **Concurrency-safe**: single mutex over file handle + index.
### Daemon-loop behaviour (`main.go`)
- No `--selftest` flag → **daemon**: validate proxmox + hub config → build read-path proxmox
client, collector, hub client, loop → `signal.NotifyContext(SIGINT, SIGTERM)``loop.Run`.
- **Immediate first report**, then tick at the interval; adopt the hub's
`poll_interval_seconds` (clamped [60,3600], reset the ticker on change).
- **Resilient**: any collect/report error is logged and the loop continues (survives hub 5xx
and transient proxmox read errors). Clean `nil` return on context cancel.
- **`--selftest=hub`**: one collect + report; prints the report it would send + the envelope.
- Startup line logs host_id/url/interval with the **key redacted**; no secret ever logged.
### OPEN choices
- **Clock skew**: 2-minute tolerance on *not-yet-valid* only; expiry not extended (window stays an
honest bound).
- **Durable mechanism**: fsync'd append log + compaction (simple, honest, no embedded-KV dep).
- **Fixtures**: committed real `ssh-keygen -Y sign` vector (hermetic + proves OpenSSH interop) +
in-Go minting for rejection cases; the sk case is synthetic (spec-faithful, no hardware).
- **Package name**: `authz` (control-plane-authorization layer, matches doc 04).
### Explicitly deferred (defined now, not active)
- **Defined-but-EMPTY** this slice (slices 5/6 fill): `storage_targets`, `backups`,
`restore_tests`, `pbs_snapshots`, `audit_tail` — emitted as typed empty `[]`.
- **Parsed-but-IGNORED** (slice 4 / reconcile consumes): the envelope's `blocked`,
`desired_generation`, `has_signed_ops` — logged at most, never acted on.
- No per-guest work queue (zero Proxmox mutations this slice); no canonical JSON (nothing
signs the report); no controller_version (slice 8) — emitted `""`.
### Test matrix (all pass — 14 tests)
Real ssh-keygen fixture · happy path · per-stage rejection {namespace, unknown-signer, tampered,
retargeted-host, expired, not-yet-valid, replay} · **invalid-sig-does-NOT-burn-nonce** (then the
valid op with that nonce still succeeds) · replay-rejected-across-restart (durable store) ·
key-type-agnostic synthetic **sk-ssh-ed25519** · byte-exactness (re-serialized blob fails crypto).
### proxmox surface
**No changes to `internal/proxmox` or `internal/authz`.** No new proxmox surface was needed:
`ListLXC` already returns status/maxmem/maxdisk and `GuestConfig` returns cores. The task's
`proxmoxReader` sketch (node-arg / pointer returns / `LXC` type) was **adapted to the real
exports** — `Node()` on the client, value returns, `proxmox.Guest` — per its instruction.
### Corrections to the Phase-4 §7 reference (for production)
- `Target` needed `host_id`/`guest_id` json tags — fixed.
- **The doc's "Go 1.24.4 / x/crypto v0.52.0" does not hold**: x/crypto v0.52.0 declares
`go 1.25.0` and won't build on Go 1.24. Resolved by upgrading the build server to **go1.26.0**
(backward-compatible — felhom-controller/hub build unchanged; distro Go package left intact,
upstream Go fronted on PATH).
- Free function → constructed `Verifier`; returns full `VerifiedOp`; typed errors; clock-skew;
durable nonce store (the net-new engineering).
- **Shared-contract flag (not built)**: the hub and `felhom-sign` CLI must produce byte-identical
canonical JSON or signatures won't verify; a shared canonicalizer both import is the right home.
### Test matrix (all green)
- **report**: field names match §4; empty collections serialize as `[]` not `null`; spec
omitted when unknown.
- **client**: sets `Bearer`; non-2xx → `*HTTPError` (status preserved); transport → `*TransportError`;
**asserts the bearer token never appears in any error string**.
- **collector**: `NodeStatus`→host block; `ListLXC`+`GuestConfig`→guest spec; a failing
`GuestConfig``status="unknown"` + omitted spec + **still returns a report**; a failing
`NodeStatus` → hard error; cloudflared probe error → `"unknown"`.
- **loop**: immediate first report; continues after an injected report error (≥3 cycles);
adopts + clamps the envelope interval (cycle-level) and applies a slower interval in `Run`.
- **config**: hub validate cases, key redaction, env overlay + defaults.
### Verification
- `go build/vet/test` green locally (go1.26.0) and on the build server (upgraded to go1.26.0).
- Real OpenSSH `ssh-keygen` (OpenSSH 10.0p2) minted the committed fixture and self-verified it
before commit.
- `go build/vet/test` green locally (go1.26.0) and on the build server (go1.26.0). No live hub
or `systemctl` in unit tests (mock transport + fake prober/collector/reporter).
### Repo state
- Branch: `main` only. Dep: `golang.org/x/crypto v0.52.0` (+ `x/sys` indirect); `go 1.25.0`.
- Branch: `main` only. Version 0.3.0. Dep unchanged (`golang.org/x/crypto v0.52.0`).
+116 -30
View File
@@ -1,13 +1,14 @@
// Command felhom-agent is the host agent (slice 1: scaffold + proxmox layer).
// Command felhom-agent is the host agent.
//
// This slice is wiring only: it has no daemon/reconcile loop yet (slice 3/4). It
// exposes a read-only --selftest that exercises the proxmox package against a live
// host, and an explicitly-gated --selftest=task that exercises WaitTask on a
// reversible op (snapshot -> rollback -> delete-snapshot).
// With no --selftest flag it runs as the daemon: the host-report poll loop
// (slice 3) that periodically POSTs a read-only host-report to the hub (the
// heartbeat). --selftest=read|task exercise the proxmox layer; --selftest=hub does
// one collect+report against the hub and prints what it would send.
package main
import (
"context"
"encoding/json"
"errors"
"flag"
"fmt"
@@ -18,13 +19,14 @@ import (
"time"
"gitea.dooplex.hu/admin/felhom-agent/internal/config"
"gitea.dooplex.hu/admin/felhom-agent/internal/hub"
applog "gitea.dooplex.hu/admin/felhom-agent/internal/log"
"gitea.dooplex.hu/admin/felhom-agent/internal/proxmox"
)
// version is the agent version. Overridable at build time with
// -ldflags "-X main.version=<v>"; defaults to the in-repo CHANGELOG version.
var version = "0.2.0"
var version = "0.3.0"
func main() {
var (
@@ -34,7 +36,7 @@ func main() {
showVersion bool
)
flag.StringVar(&cfgPath, "config", envOr("FELHOM_AGENT_CONFIG", "/etc/felhom-agent/agent.json"), "path to the agent config file (JSON)")
flag.Var(&selftest, "selftest", "run a self-test and exit: bare/`read` = read-only queries; `task` = reversible mutating exercise (needs -vmid)")
flag.Var(&selftest, "selftest", "run a self-test and exit: bare/`read` = read-only queries; `task` = reversible mutating exercise (needs -vmid); `hub` = one collect+report to the hub")
flag.IntVar(&vmid, "vmid", 0, "guest VMID for --selftest=task (the reversible snapshot/rollback exercise)")
flag.BoolVar(&showVersion, "version", false, "print version and exit")
flag.Parse()
@@ -58,19 +60,116 @@ func main() {
switch selftest.mode {
case "":
// No daemon loop yet.
logger.Info("felhom-agent scaffold; no run loop yet",
"version", version,
"hint", "use --selftest (read-only) or --selftest=task --vmid N")
// TODO: poll loop — slice 3/4.
return
os.Exit(runDaemon(cfg, logger))
case "read":
os.Exit(runSelftestRead(context.Background(), cfg, logger))
case "task":
os.Exit(runSelftestTask(context.Background(), cfg, logger, vmid))
case "hub":
os.Exit(runSelftestHub(context.Background(), cfg, logger))
}
}
// newProxmoxClient builds the read-path proxmox client from config.
func newProxmoxClient(cfg config.Config) (*proxmox.Client, error) {
return proxmox.NewClient(proxmox.Config{
Endpoint: cfg.Proxmox.Endpoint,
Node: cfg.Proxmox.Node,
Token: cfg.Proxmox.Token,
TLS: proxmox.TLSConfig{
CAFile: cfg.Proxmox.TLS.CAFile,
Fingerprint: cfg.Proxmox.TLS.Fingerprint,
InsecureSkipVerify: cfg.Proxmox.TLS.InsecureSkipVerify,
},
})
}
// runDaemon is the default mode: collect a host-report and POST it to the hub on a
// loop. Requires both proxmox (to collect) and hub config.
func runDaemon(cfg config.Config, logger *slog.Logger) int {
if err := cfg.Validate(); err != nil {
fmt.Fprintln(os.Stderr, "daemon: proxmox not configured:", err)
return 2
}
if err := cfg.Hub.Validate(); err != nil {
fmt.Fprintln(os.Stderr, "daemon: hub not configured:", err)
return 2
}
px, err := newProxmoxClient(cfg)
if err != nil {
fmt.Fprintln(os.Stderr, "daemon: proxmox client:", err)
return 1
}
client, err := hub.NewClient(cfg.Hub, logger)
if err != nil {
fmt.Fprintln(os.Stderr, "daemon: hub client:", err)
return 1
}
hcfg := cfg.Hub.WithDefaults()
collector := hub.NewCollector(px, hub.SystemctlProber{}, cfg.Hub.HostID, version, logger)
loop := hub.NewLoop(collector, client, time.Duration(hcfg.PollSeconds)*time.Second, logger)
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer stop()
logger.Info("felhom-agent daemon starting",
"version", version, "host_id", cfg.Hub.HostID, "hub_url", cfg.Hub.URL,
"interval_s", hcfg.PollSeconds) // hub key intentionally not logged
if err := loop.Run(ctx); err != nil {
logger.Error("daemon: loop exited with error", "err", err)
return 1
}
return 0
}
// runSelftestHub validates hub config, does ONE collect + report, and prints the
// report it would send plus the envelope it got back.
func runSelftestHub(ctx context.Context, cfg config.Config, logger *slog.Logger) int {
if err := cfg.Validate(); err != nil {
fmt.Fprintln(os.Stderr, "selftest: proxmox not configured:", err)
return 1
}
if err := cfg.Hub.Validate(); err != nil {
fmt.Fprintln(os.Stderr, "selftest: hub not configured:", err)
return 1
}
px, err := newProxmoxClient(cfg)
if err != nil {
fmt.Fprintln(os.Stderr, "selftest: proxmox client:", err)
return 1
}
client, err := hub.NewClient(cfg.Hub, logger)
if err != nil {
fmt.Fprintln(os.Stderr, "selftest: hub client:", err)
return 1
}
collector := hub.NewCollector(px, hub.SystemctlProber{}, cfg.Hub.HostID, version, logger)
ctx, cancel := context.WithTimeout(ctx, 60*time.Second)
defer cancel()
fmt.Printf("=== felhom-agent %s selftest=hub (host_id=%s url=%s) ===\n", version, cfg.Hub.HostID, cfg.Hub.URL)
report, err := collector.Collect(ctx)
if err != nil {
fmt.Fprintln(os.Stderr, " [FAIL] collect:", err)
return 1
}
if b, e := json.MarshalIndent(report, " ", " "); e == nil {
fmt.Println(" --- report it would send ---")
fmt.Println(" " + string(b))
}
env, err := client.Report(ctx, report)
if err != nil {
fmt.Fprintln(os.Stderr, " [FAIL] report:", err)
return 1
}
if b, e := json.MarshalIndent(env, " ", " "); e == nil {
fmt.Println(" --- control envelope received ---")
fmt.Println(" " + string(b))
}
fmt.Println("=== selftest=hub OK ===")
return 0
}
// runSelftestRead loads config, builds the API client, and runs the read-only
// queries against the live host, printing a short health report. It mutates
// nothing. Missing/invalid config is reported cleanly (no panic).
@@ -81,16 +180,7 @@ func runSelftestRead(ctx context.Context, cfg config.Config, logger *slog.Logger
}
logger.Info("selftest (read-only) starting", "config", fmt.Sprintf("%+v", cfg.Redacted().Proxmox))
client, err := proxmox.NewClient(proxmox.Config{
Endpoint: cfg.Proxmox.Endpoint,
Node: cfg.Proxmox.Node,
Token: cfg.Proxmox.Token,
TLS: proxmox.TLSConfig{
CAFile: cfg.Proxmox.TLS.CAFile,
Fingerprint: cfg.Proxmox.TLS.Fingerprint,
InsecureSkipVerify: cfg.Proxmox.TLS.InsecureSkipVerify,
},
})
client, err := newProxmoxClient(cfg)
if err != nil {
fmt.Fprintln(os.Stderr, "selftest: client init:", err)
return 1
@@ -163,13 +253,7 @@ func runSelftestTask(ctx context.Context, cfg config.Config, logger *slog.Logger
fmt.Fprintln(os.Stderr, "selftest=task requires -vmid N (a guest safe to snapshot/rollback)")
return 2
}
client, err := proxmox.NewClient(proxmox.Config{
Endpoint: cfg.Proxmox.Endpoint, Node: cfg.Proxmox.Node, Token: cfg.Proxmox.Token,
TLS: proxmox.TLSConfig{
CAFile: cfg.Proxmox.TLS.CAFile, Fingerprint: cfg.Proxmox.TLS.Fingerprint,
InsecureSkipVerify: cfg.Proxmox.TLS.InsecureSkipVerify,
},
})
client, err := newProxmoxClient(cfg)
if err != nil {
fmt.Fprintln(os.Stderr, "selftest: client init:", err)
return 1
@@ -239,6 +323,8 @@ func (f *selftestFlag) Set(v string) error {
f.mode = "read"
case "task":
f.mode = "task"
case "hub":
f.mode = "hub"
default:
return fmt.Errorf("invalid --selftest value %q (want read|task)", v)
}
+23
View File
@@ -13,5 +13,28 @@
"mode": "sudo",
"sudo_path": "sudo"
},
"authz": {
"nonce_store_path": "/var/lib/felhom-agent/nonces.log",
"signers": [
{
"key_id": "felhom-op-1",
"role": "operational",
"public_key": "ssh-ed25519 AAAA... felhom-op-1"
},
{
"key_id": "felhom-recovery-1",
"role": "recovery",
"public_key": "ssh-ed25519 AAAA... felhom-recovery-1"
}
]
},
"hub": {
"url": "https://hub.felhom.eu",
"host_id": "demo-host-01",
"api_key": "REPLACE_WITH_PER_HOST_HUB_KEY",
"poll_seconds": 900,
"timeout_seconds": 30,
"ca_file": ""
},
"log_level": "info"
}
+102 -1
View File
@@ -12,6 +12,8 @@ package config
import (
"encoding/json"
"fmt"
"net"
"net/url"
"os"
"strconv"
"strings"
@@ -22,9 +24,22 @@ type Config struct {
Proxmox ProxmoxConfig `json:"proxmox"`
Privileged PrivilegedConfig `json:"privileged"`
Authz AuthzConfig `json:"authz"`
Hub HubConfig `json:"hub"`
LogLevel string `json:"log_level"` // debug|info|warn|error (default info)
}
// HubConfig configures the outbound hub client + daemon poll loop (internal/hub).
// The hub serves a real cert (hub.felhom.eu, cert-manager) — this is standard TLS
// (system roots), NOT the Proxmox fingerprint-pinning path.
type HubConfig struct {
URL string `json:"url"` // e.g. "https://hub.felhom.eu"
HostID string `json:"host_id"` // the hub's PK for this host
APIKey string `json:"api_key"` // per-host hub key; SECRET — redacted
PollSeconds int `json:"poll_seconds"` // default 900; hub may override per-cycle
TimeoutSeconds int `json:"timeout_seconds"` // per-request HTTP timeout; default 30
CAFile string `json:"ca_file"` // optional; "" = system roots
}
// AuthzConfig configures operator-signed-op verification (internal/authz). The
// pinned operator public keys are kept here as raw authorized_keys-style lines
// (this package stays dependency-free); the authz package parses them into its
@@ -87,6 +102,7 @@ func Default() Config {
Proxmox: ProxmoxConfig{Endpoint: "https://127.0.0.1:8006"},
Privileged: PrivilegedConfig{Mode: "sudo"},
Authz: AuthzConfig{NonceStorePath: "/var/lib/felhom-agent/nonces.log"},
Hub: HubConfig{PollSeconds: 900, TimeoutSeconds: 30},
LogLevel: "info",
}
}
@@ -134,6 +150,36 @@ func applyEnv(cfg *Config) {
if v := os.Getenv("FELHOM_AGENT_LOG_LEVEL"); v != "" {
cfg.LogLevel = v
}
// hub
if v := os.Getenv("FELHOM_AGENT_HUB_URL"); v != "" {
cfg.Hub.URL = v
}
if v := os.Getenv("FELHOM_AGENT_HUB_HOST_ID"); v != "" {
cfg.Hub.HostID = v
}
if v := os.Getenv("FELHOM_AGENT_HUB_API_KEY"); v != "" {
cfg.Hub.APIKey = v
}
if v := os.Getenv("FELHOM_AGENT_HUB_CA_FILE"); v != "" {
cfg.Hub.CAFile = v
}
cfg.Hub.PollSeconds = envInt("FELHOM_AGENT_HUB_POLL_SECONDS", cfg.Hub.PollSeconds)
cfg.Hub.TimeoutSeconds = envInt("FELHOM_AGENT_HUB_TIMEOUT_SECONDS", cfg.Hub.TimeoutSeconds)
}
// envInt overlays an int env var, keeping cur (with a stderr warning) on parse
// error rather than crashing. (Load runs before the slog logger exists.)
func envInt(key string, cur int) int {
v := os.Getenv(key)
if v == "" {
return cur
}
n, err := strconv.Atoi(v)
if err != nil {
fmt.Fprintf(os.Stderr, "config: %s=%q is not an integer, keeping %d\n", key, v, cur)
return cur
}
return n
}
// Validate checks the config is usable for talking to the API.
@@ -153,14 +199,69 @@ func (c Config) Validate() error {
return nil
}
// Redacted returns a copy safe to log: the token secret is masked.
// Redacted returns a copy safe to log: the proxmox token and hub key are masked.
func (c Config) Redacted() Config {
if c.Proxmox.Token != "" {
c.Proxmox.Token = redactToken(c.Proxmox.Token)
}
if c.Hub.APIKey != "" {
c.Hub.APIKey = "********"
}
return c
}
// WithDefaults fills zero-valued hub timing fields. Applied at client/loop
// construction so programmatic configs (not from Default()) still get sane values.
func (h HubConfig) WithDefaults() HubConfig {
if h.PollSeconds == 0 {
h.PollSeconds = 900
}
if h.TimeoutSeconds == 0 {
h.TimeoutSeconds = 30
}
return h
}
// Validate checks the hub config is usable for the daemon / --selftest=hub. It is
// separate from Config.Validate (proxmox-only) so --selftest=read|task still runs
// without hub config.
func (h HubConfig) Validate() error {
if h.URL == "" {
return fmt.Errorf("config: hub.url is required (set hub.url or FELHOM_AGENT_HUB_URL)")
}
if h.HostID == "" {
return fmt.Errorf("config: hub.host_id is required")
}
if h.APIKey == "" {
return fmt.Errorf("config: hub.api_key is required (set hub.api_key or FELHOM_AGENT_HUB_API_KEY)")
}
u, err := url.Parse(h.URL)
if err != nil {
return fmt.Errorf("config: hub.url is not a valid URL: %w", err)
}
switch u.Scheme {
case "https":
// always fine
case "http":
if !isLoopbackHost(u.Hostname()) {
return fmt.Errorf("config: hub.url must be https:// (http:// only allowed for loopback in tests)")
}
default:
return fmt.Errorf("config: hub.url must be https:// (got scheme %q)", u.Scheme)
}
return nil
}
func isLoopbackHost(host string) bool {
if host == "localhost" {
return true
}
if ip := net.ParseIP(host); ip != nil {
return ip.IsLoopback()
}
return false
}
// redactToken keeps the public "USER@REALM!TOKENID=" prefix and masks the secret.
func redactToken(tok string) string {
if i := strings.LastIndex(tok, "="); i >= 0 {
+55
View File
@@ -36,6 +36,61 @@ func TestValidate(t *testing.T) {
}
}
func TestRedactedMasksHubKey(t *testing.T) {
c := Default()
c.Hub.APIKey = "hub-secret-abcdef"
if got := c.Redacted().Hub.APIKey; got == "hub-secret-abcdef" || got == "" {
t.Fatalf("hub key not masked: %q", got)
}
if !strings.Contains(c.Hub.APIKey, "abcdef") {
t.Error("Redacted mutated the original hub key")
}
}
func TestHubConfigValidate(t *testing.T) {
base := HubConfig{URL: "https://hub.felhom.eu", HostID: "h1", APIKey: "k"}
if err := base.Validate(); err != nil {
t.Fatalf("valid hub config rejected: %v", err)
}
bad := []HubConfig{
{HostID: "h", APIKey: "k"}, // no URL
{URL: "https://x", APIKey: "k"}, // no host
{URL: "https://x", HostID: "h"}, // no key
{URL: "http://hub.felhom.eu", HostID: "h", APIKey: "k"}, // http non-loopback
{URL: "ftp://x", HostID: "h", APIKey: "k"}, // bad scheme
}
for i, h := range bad {
if err := h.Validate(); err == nil {
t.Errorf("case %d: expected validation error for %+v", i, h)
}
}
// http is allowed for loopback (tests).
if err := (HubConfig{URL: "http://127.0.0.1:8443", HostID: "h", APIKey: "k"}).Validate(); err != nil {
t.Errorf("http loopback should be allowed: %v", err)
}
}
func TestHubEnvOverlayAndDefaults(t *testing.T) {
t.Setenv("FELHOM_AGENT_HUB_URL", "https://hub.example")
t.Setenv("FELHOM_AGENT_HUB_HOST_ID", "env-host")
t.Setenv("FELHOM_AGENT_HUB_API_KEY", "env-key")
t.Setenv("FELHOM_AGENT_HUB_POLL_SECONDS", "120")
cfg, err := Load("")
if err != nil {
t.Fatal(err)
}
if cfg.Hub.URL != "https://hub.example" || cfg.Hub.HostID != "env-host" || cfg.Hub.APIKey != "env-key" {
t.Errorf("hub env overlay failed: %+v", cfg.Hub)
}
if cfg.Hub.PollSeconds != 120 {
t.Errorf("poll seconds = %d, want 120", cfg.Hub.PollSeconds)
}
// withDefaults fills zero timeout.
if (HubConfig{}).WithDefaults().TimeoutSeconds != 30 {
t.Error("WithDefaults should set TimeoutSeconds=30")
}
}
func TestLoadFileThenEnvOverride(t *testing.T) {
dir := t.TempDir()
path := filepath.Join(dir, "agent.json")
+118
View File
@@ -0,0 +1,118 @@
package hub
import (
"bytes"
"context"
"crypto/tls"
"crypto/x509"
"encoding/json"
"fmt"
"io"
"log/slog"
"net/http"
"os"
"strings"
"time"
"gitea.dooplex.hu/admin/felhom-agent/internal/config"
)
const reportPath = "/api/v1/host-report"
// Client posts host-reports to the hub. Auth is a per-host Bearer key. Transport is
// standard TLS (system roots, or a CAFile pool); verification is always on — the hub
// has a real cert (unlike the Proxmox self-signed path), so there is no insecure mode.
type Client struct {
baseURL string
apiKey string
hc *http.Client
logger *slog.Logger
}
// NewClient builds a hub client from config (defaults applied). It never logs the key.
func NewClient(cfg config.HubConfig, logger *slog.Logger) (*Client, error) {
cfg = cfg.WithDefaults()
if logger == nil {
logger = slog.Default()
}
tlsCfg := &tls.Config{} // system roots
if cfg.CAFile != "" {
pem, err := os.ReadFile(cfg.CAFile)
if err != nil {
return nil, fmt.Errorf("hub: reading ca_file: %w", err)
}
pool := x509.NewCertPool()
if !pool.AppendCertsFromPEM(pem) {
return nil, fmt.Errorf("hub: ca_file %q contained no usable certificates", cfg.CAFile)
}
tlsCfg.RootCAs = pool
}
hc := &http.Client{
Timeout: time.Duration(cfg.TimeoutSeconds) * time.Second,
Transport: &http.Transport{TLSClientConfig: tlsCfg},
}
return newClient(cfg.URL, cfg.APIKey, hc, logger), nil
}
// newClient is the shared constructor (tests inject a mock-transport *http.Client).
func newClient(baseURL, apiKey string, hc *http.Client, logger *slog.Logger) *Client {
return &Client{baseURL: strings.TrimRight(baseURL, "/"), apiKey: apiKey, hc: hc, logger: logger}
}
// TransportError is a network/connection failure (no HTTP response). It never
// contains the bearer token.
type TransportError struct{ Err error }
func (e *TransportError) Error() string { return "hub: transport error: " + e.Err.Error() }
func (e *TransportError) Unwrap() error { return e.Err }
// HTTPError is a non-2xx response. BodyTail is a short, token-free excerpt.
type HTTPError struct {
StatusCode int
BodyTail string
}
func (e *HTTPError) Error() string {
return fmt.Sprintf("hub: HTTP %d: %s", e.StatusCode, e.BodyTail)
}
// Report POSTs the host-report and returns the parsed control envelope. The report
// IS the heartbeat (locked decision 1). Errors are typed (transport vs HTTP) and
// never include the bearer token.
func (c *Client) Report(ctx context.Context, r *HostReport) (*ControlEnvelope, error) {
body, err := json.Marshal(r)
if err != nil {
return nil, fmt.Errorf("hub: marshaling report: %w", err)
}
req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.baseURL+reportPath, bytes.NewReader(body))
if err != nil {
return nil, fmt.Errorf("hub: building request: %w", err)
}
req.Header.Set("Authorization", "Bearer "+c.apiKey)
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Accept", "application/json")
resp, err := c.hc.Do(req)
if err != nil {
return nil, &TransportError{Err: err} // token is in the request header, never the error
}
defer resp.Body.Close()
raw, _ := io.ReadAll(io.LimitReader(resp.Body, 64<<10))
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
return nil, &HTTPError{StatusCode: resp.StatusCode, BodyTail: tail(raw, 256)}
}
var env ControlEnvelope
if err := json.Unmarshal(raw, &env); err != nil {
return nil, fmt.Errorf("hub: decoding control envelope: %w", err)
}
return &env, nil
}
func tail(b []byte, max int) string {
s := strings.TrimSpace(string(b))
if len(s) > max {
return s[:max] + "…"
}
return s
}
+69
View File
@@ -0,0 +1,69 @@
package hub
import (
"context"
"errors"
"net/http"
"strings"
"testing"
)
const testBearer = "super-secret-bearer-key"
func TestClient_SetsBearerAndParsesEnvelope(t *testing.T) {
var gotAuth, gotCT, gotPath, gotMethod string
c := testClient(func(r *http.Request) (*http.Response, error) {
gotAuth = r.Header.Get("Authorization")
gotCT = r.Header.Get("Content-Type")
gotPath = r.URL.Path
gotMethod = r.Method
return httpResp(200, `{"status":"ok","poll_interval_seconds":900}`), nil
})
env, err := c.Report(context.Background(), &HostReport{HostID: "h"})
if err != nil {
t.Fatalf("Report: %v", err)
}
if gotAuth != "Bearer "+testBearer {
t.Errorf("auth header = %q", gotAuth)
}
if gotCT != "application/json" {
t.Errorf("content-type = %q", gotCT)
}
if gotMethod != http.MethodPost || gotPath != reportPath {
t.Errorf("method/path = %s %s", gotMethod, gotPath)
}
if env.PollIntervalSeconds == nil || *env.PollIntervalSeconds != 900 {
t.Errorf("envelope poll = %v", env.PollIntervalSeconds)
}
}
func TestClient_Non2xxTypedErrorRedactsToken(t *testing.T) {
c := testClient(func(r *http.Request) (*http.Response, error) {
return httpResp(503, "service unavailable"), nil
})
_, err := c.Report(context.Background(), &HostReport{HostID: "h"})
var he *HTTPError
if !errors.As(err, &he) {
t.Fatalf("want *HTTPError, got %T: %v", err, err)
}
if he.StatusCode != 503 {
t.Errorf("status = %d", he.StatusCode)
}
if strings.Contains(err.Error(), testBearer) {
t.Fatalf("bearer token leaked into error: %q", err.Error())
}
}
func TestClient_TransportErrorTypedRedactsToken(t *testing.T) {
c := testClient(func(r *http.Request) (*http.Response, error) {
return nil, errors.New("dial tcp: connection refused")
})
_, err := c.Report(context.Background(), &HostReport{HostID: "h"})
var te *TransportError
if !errors.As(err, &te) {
t.Fatalf("want *TransportError, got %T: %v", err, err)
}
if strings.Contains(err.Error(), testBearer) {
t.Fatalf("bearer token leaked into error: %q", err.Error())
}
}
+46
View File
@@ -0,0 +1,46 @@
package hub
import (
"context"
"os/exec"
"strings"
)
// CloudflaredProber reports the cloudflared tunnel service health. It is a
// READ-ONLY probe: the agent does NOT manage or restart cloudflared in this slice
// (that is the tunnel-management slice — this is the seam for it). Injectable so
// tests use a fake and never exec.
type CloudflaredProber interface {
// Status returns one of: "active" | "inactive" | "failed" | "unknown".
Status(ctx context.Context) (string, error)
}
// SystemctlProber runs `systemctl is-active cloudflared`. This is NOT a Privileged
// (root-CLI) op — `is-active` is non-root readable and is not one of the three
// proven root exceptions, so it does not go through internal/proxmox.Privileged.
type SystemctlProber struct {
Unit string // defaults to "cloudflared"
}
// Status maps `systemctl is-active` output to the report vocabulary. systemctl
// exits non-zero for inactive/failed, so the output string is authoritative over
// the exit code; any exec error (binary missing, etc.) maps to "unknown".
func (p SystemctlProber) Status(ctx context.Context) (string, error) {
unit := p.Unit
if unit == "" {
unit = "cloudflared"
}
out, _ := exec.CommandContext(ctx, "systemctl", "is-active", unit).Output()
switch strings.TrimSpace(string(out)) {
case "active":
return "active", nil
case "failed":
return "failed", nil
case "inactive", "deactivating", "activating":
return "inactive", nil
case "":
return "unknown", nil // no output → systemctl/exec problem
default:
return "unknown", nil
}
}
+144
View File
@@ -0,0 +1,144 @@
package hub
import (
"context"
"fmt"
"log/slog"
"time"
"gitea.dooplex.hu/admin/felhom-agent/internal/proxmox"
)
// proxmoxReader is the read-only subset the collector needs. Signatures match the
// REAL internal/proxmox.Client surface (slice 1): the node is held by the Client
// (no per-call node arg), reads return values (not pointers), and the guest type is
// proxmox.Guest. (The task's sketch used node-arg/pointer/LXC shapes; adapted to
// the actual exports per its instruction — no proxmox changes were needed: ListLXC
// already carries status/maxmem/maxdisk, GuestConfig carries cores.)
type proxmoxReader interface {
Node() string
NodeStatus(ctx context.Context) (proxmox.NodeStatus, error)
ListLXC(ctx context.Context) ([]proxmox.Guest, error)
GuestConfig(ctx context.Context, vmid int) (proxmox.GuestConfig, error)
}
// Collector builds a HostReport from read-only sources. All deps are behind narrow
// interfaces for unit testing.
type Collector struct {
px proxmoxReader
cf CloudflaredProber
hostID string
agentVersion string
logger *slog.Logger
now func() time.Time
}
// NewCollector builds a collector. hostID echoes config.Hub.HostID; agentVersion is
// the binary version.
func NewCollector(px proxmoxReader, cf CloudflaredProber, hostID, agentVersion string, logger *slog.Logger) *Collector {
if logger == nil {
logger = slog.Default()
}
return &Collector{
px: px,
cf: cf,
hostID: hostID,
agentVersion: agentVersion,
logger: logger,
now: func() time.Time { return time.Now().UTC() },
}
}
// Collect builds the report. Best-effort liveness: a failed NodeStatus is a hard
// error (no useful report — the cycle skips the POST); a failed per-guest
// GuestConfig degrades that guest to status="unknown" without spec but still sends;
// a cloudflared probe failure yields status="unknown" and is never fatal.
func (c *Collector) Collect(ctx context.Context) (*HostReport, error) {
ns, err := c.px.NodeStatus(ctx)
if err != nil {
return nil, fmt.Errorf("hub: NodeStatus failed (no useful report): %w", err)
}
report := &HostReport{
HostID: c.hostID,
ReportedAt: c.now().Format(time.RFC3339),
AgentVersion: c.agentVersion,
Host: hostMetrics(c.px.Node(), ns),
Guests: c.collectGuests(ctx),
// Defined-but-empty this slice (slices 5/6). Non-nil so they marshal as [].
StorageTargets: []StorageTarget{},
Backups: []Backup{},
RestoreTests: []RestoreTest{},
PBSSnapshots: []PBSSnapshot{},
AuditTail: []AuditEntry{},
Cloudflared: Cloudflared{Status: c.cloudflaredStatus(ctx)},
}
return report, nil
}
func hostMetrics(node string, ns proxmox.NodeStatus) HostMetrics {
h := HostMetrics{
Node: node,
CPUPercent: ns.CPU * 100, // PVE cpu is a 0..1 fraction
MemoryTotalBytes: ns.Memory.Total,
MemoryUsedBytes: ns.Memory.Used,
DiskTotalBytes: ns.RootFS.Total,
DiskUsedBytes: ns.RootFS.Used,
LoadAvg: ns.LoadAvg,
UptimeSeconds: ns.Uptime,
}
h.MemoryPercent = percent(ns.Memory.Used, ns.Memory.Total)
h.DiskPercent = percent(ns.RootFS.Used, ns.RootFS.Total)
if h.LoadAvg == nil {
h.LoadAvg = []string{}
}
return h
}
func (c *Collector) collectGuests(ctx context.Context) []Guest {
lxc, err := c.px.ListLXC(ctx)
if err != nil {
// Not fatal: a report with no guest list still carries host liveness.
c.logger.Warn("hub: ListLXC failed; reporting no guests", "err", err)
return []Guest{}
}
guests := make([]Guest, 0, len(lxc))
for _, g := range lxc {
entry := Guest{VMID: g.VMID, Name: g.Name, Status: g.Status, ControllerVersion: ""}
// GuestConfig supplies cores; memory/disk come from the list entry (bytes).
cfg, err := c.px.GuestConfig(ctx, g.VMID)
if err != nil {
c.logger.Warn("hub: GuestConfig failed; guest degraded to unknown",
"vmid", g.VMID, "err", err)
entry.Status = "unknown"
entry.Spec = nil // omitted
} else {
entry.Spec = &GuestSpec{
Cores: cfg.Cores,
MemoryBytes: g.MaxMem,
DiskBytes: g.MaxDisk,
}
}
guests = append(guests, entry)
}
return guests
}
func (c *Collector) cloudflaredStatus(ctx context.Context) string {
if c.cf == nil {
return "unknown"
}
st, err := c.cf.Status(ctx)
if err != nil || st == "" {
c.logger.Warn("hub: cloudflared probe failed", "err", err)
return "unknown"
}
return st
}
func percent(used, total int64) float64 {
if total <= 0 {
return 0
}
return float64(used) / float64(total) * 100
}
+108
View File
@@ -0,0 +1,108 @@
package hub
import (
"context"
"errors"
"testing"
"gitea.dooplex.hu/admin/felhom-agent/internal/proxmox"
)
func newTestNodeStatus() proxmox.NodeStatus {
var ns proxmox.NodeStatus
ns.CPU = 0.05 // → 5%
ns.Uptime = 86400
ns.LoadAvg = []string{"0.10", "0.20", "0.15"}
ns.Memory.Total = 16000000000
ns.Memory.Used = 4000000000
ns.RootFS.Total = 100000000000
ns.RootFS.Used = 20000000000
return ns
}
func TestCollect_HostAndGuests(t *testing.T) {
px := &fakePx{
node: "demo-felhom",
ns: newTestNodeStatus(),
lxc: []proxmox.Guest{
{VMID: 100, Name: "acme", Status: "running", MaxMem: 2147483648, MaxDisk: 21474836480},
},
cfg: map[int]proxmox.GuestConfig{100: {Cores: 2, Memory: 2048}},
}
c := NewCollector(px, fakeProber{status: "active"}, "demo-host-01", "0.3.0", quietLogger())
r, err := c.Collect(context.Background())
if err != nil {
t.Fatalf("Collect: %v", err)
}
if r.HostID != "demo-host-01" || r.AgentVersion != "0.3.0" {
t.Errorf("top-level wrong: %+v", r)
}
if r.Host.Node != "demo-felhom" || r.Host.CPUPercent != 5 {
t.Errorf("host = %+v", r.Host)
}
if r.Host.MemoryPercent != 25 || r.Host.DiskPercent != 20 {
t.Errorf("percents = mem %v disk %v", r.Host.MemoryPercent, r.Host.DiskPercent)
}
if len(r.Guests) != 1 {
t.Fatalf("guests = %d", len(r.Guests))
}
g := r.Guests[0]
if g.VMID != 100 || g.Status != "running" || g.Spec == nil {
t.Fatalf("guest = %+v", g)
}
if g.Spec.Cores != 2 || g.Spec.MemoryBytes != 2147483648 || g.Spec.DiskBytes != 21474836480 {
t.Errorf("spec = %+v", g.Spec)
}
if r.Cloudflared.Status != "active" {
t.Errorf("cloudflared = %q", r.Cloudflared.Status)
}
}
func TestCollect_GuestConfigFailureDegradesButStillReports(t *testing.T) {
px := &fakePx{
node: "demo-felhom",
ns: newTestNodeStatus(),
lxc: []proxmox.Guest{
{VMID: 100, Name: "ok", Status: "running", MaxMem: 1 << 31, MaxDisk: 1 << 34},
{VMID: 200, Name: "bad", Status: "running"},
},
cfg: map[int]proxmox.GuestConfig{100: {Cores: 2}},
cfgErr: map[int]error{200: errors.New("config read failed")},
}
c := NewCollector(px, fakeProber{status: "active"}, "h", "0.3.0", quietLogger())
r, err := c.Collect(context.Background())
if err != nil {
t.Fatalf("a per-guest failure must NOT fail the whole report: %v", err)
}
if len(r.Guests) != 2 {
t.Fatalf("guests = %d", len(r.Guests))
}
bad := r.Guests[1]
if bad.Status != "unknown" || bad.Spec != nil {
t.Errorf("degraded guest = %+v (want status=unknown, spec=nil)", bad)
}
}
func TestCollect_NodeStatusFailureIsHardError(t *testing.T) {
px := &fakePx{node: "n", nsErr: errors.New("proxmox down")}
c := NewCollector(px, fakeProber{status: "active"}, "h", "0.3.0", quietLogger())
if _, err := c.Collect(context.Background()); err == nil {
t.Fatal("NodeStatus failure must be a hard error (no useful report)")
}
}
func TestCollect_CloudflaredProbeErrorIsUnknown(t *testing.T) {
px := &fakePx{node: "n", ns: newTestNodeStatus()}
c := NewCollector(px, fakeProber{err: errors.New("no systemctl")}, "h", "0.3.0", quietLogger())
r, err := c.Collect(context.Background())
if err != nil {
t.Fatalf("cloudflared failure must not be fatal: %v", err)
}
if r.Cloudflared.Status != "unknown" {
t.Errorf("cloudflared = %q, want unknown", r.Cloudflared.Status)
}
// Empty collections still present as non-nil.
if r.Guests == nil || r.StorageTargets == nil || r.AuditTail == nil {
t.Error("empty collections must be non-nil")
}
}
+107
View File
@@ -0,0 +1,107 @@
package hub
import (
"context"
"log/slog"
"time"
)
// interval clamp bounds (locked decision 3).
const (
MinPollSeconds = 60
MaxPollSeconds = 3600
)
// reporter and collectorIface are the loop's deps as interfaces (tests inject fakes).
type reporter interface {
Report(ctx context.Context, r *HostReport) (*ControlEnvelope, error)
}
type collectorIface interface {
Collect(ctx context.Context) (*HostReport, error)
}
// Loop is the agent's first daemon run loop: collect a host-report, POST it, adopt
// the hub's cadence, repeat. It is resilient — a collect or report error is logged
// and the loop continues (the data plane is independent of the agent; a hub outage
// must not kill it). There are NO Proxmox mutations here (read-only report), so no
// per-guest work queue yet (that lands with reconcile, slice 4).
type Loop struct {
collector collectorIface
client reporter
interval time.Duration
logger *slog.Logger
}
// NewLoop builds the loop. interval is the starting cadence (the hub may override it
// per-cycle via the control envelope).
func NewLoop(collector collectorIface, client reporter, interval time.Duration, logger *slog.Logger) *Loop {
if logger == nil {
logger = slog.Default()
}
return &Loop{collector: collector, client: client, interval: interval, logger: logger}
}
// Run reports immediately, then on each tick, until ctx is cancelled (then nil).
func (l *Loop) Run(ctx context.Context) error {
interval := l.interval
interval = l.cycle(ctx, interval) // immediate first report
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
l.logger.Info("hub: loop shutting down", "reason", ctx.Err())
return nil
case <-ticker.C:
next := l.cycle(ctx, interval)
if next != interval {
l.logger.Info("hub: poll interval changed", "from", interval, "to", next)
interval = next
ticker.Reset(interval)
}
}
}
}
// cycle runs one collect→report→adopt. It never returns an error: failures are
// logged and the current interval is kept, so the loop keeps running.
func (l *Loop) cycle(ctx context.Context, current time.Duration) time.Duration {
report, err := l.collector.Collect(ctx)
if err != nil {
l.logger.Warn("hub: collect failed; skipping this cycle's report", "err", err)
return current
}
env, err := l.client.Report(ctx, report)
if err != nil {
l.logger.Warn("hub: report failed; keeping current interval", "err", err)
return current
}
l.logger.Debug("hub: report sent",
"guests", len(report.Guests),
// reserved/forward-compat envelope fields — logged only, never acted on (slice 4).
"blocked", env.Blocked, "desired_generation", env.DesiredGeneration, "has_signed_ops", env.HasSignedOps)
if env.PollIntervalSeconds == nil {
return current
}
d, clamped := clampInterval(*env.PollIntervalSeconds)
if clamped {
l.logger.Warn("hub: poll_interval_seconds out of range; clamped",
"requested", *env.PollIntervalSeconds, "applied", int(d.Seconds()))
}
return d
}
// clampInterval clamps a requested seconds value to [60,3600]; clamped reports
// whether it was out of range.
func clampInterval(sec int) (time.Duration, bool) {
clamped := false
if sec < MinPollSeconds {
sec, clamped = MinPollSeconds, true
}
if sec > MaxPollSeconds {
sec, clamped = MaxPollSeconds, true
}
return time.Duration(sec) * time.Second, clamped
}
+144
View File
@@ -0,0 +1,144 @@
package hub
import (
"context"
"errors"
"sync/atomic"
"testing"
"time"
)
func intPtr(i int) *int { return &i }
type fakeCollector struct {
report *HostReport
err error
n *int32
}
func (c *fakeCollector) Collect(ctx context.Context) (*HostReport, error) {
atomic.AddInt32(c.n, 1)
return c.report, c.err
}
type fakeReporter struct {
env *ControlEnvelope
errSeq []error // per-call error (nil = ok); calls past the slice are ok
n *int32
}
func (r *fakeReporter) Report(ctx context.Context, _ *HostReport) (*ControlEnvelope, error) {
i := int(atomic.AddInt32(r.n, 1) - 1)
if i < len(r.errSeq) && r.errSeq[i] != nil {
return nil, r.errSeq[i]
}
return r.env, nil
}
func TestClampInterval(t *testing.T) {
cases := []struct {
in int
wantSec int
clamped bool
}{
{10, 60, true}, {59, 60, true}, {60, 60, false}, {120, 120, false},
{3600, 3600, false}, {99999, 3600, true},
}
for _, c := range cases {
d, clamped := clampInterval(c.in)
if int(d.Seconds()) != c.wantSec || clamped != c.clamped {
t.Errorf("clampInterval(%d) = %v,%v want %ds,%v", c.in, d, clamped, c.wantSec, c.clamped)
}
}
}
func TestLoop_CycleAdoptsAndClamps(t *testing.T) {
current := 900 * time.Second
mk := func(env *ControlEnvelope, collErr, repErr error) *Loop {
var cn, rn int32
return NewLoop(
&fakeCollector{report: &HostReport{}, err: collErr, n: &cn},
&fakeReporter{env: env, errSeq: []error{repErr}, n: &rn},
current, quietLogger())
}
tests := []struct {
name string
env *ControlEnvelope
want time.Duration
}{
{"adopt in-range", &ControlEnvelope{PollIntervalSeconds: intPtr(120)}, 120 * time.Second},
{"clamp low", &ControlEnvelope{PollIntervalSeconds: intPtr(10)}, 60 * time.Second},
{"clamp high", &ControlEnvelope{PollIntervalSeconds: intPtr(99999)}, 3600 * time.Second},
{"missing keeps current", &ControlEnvelope{PollIntervalSeconds: nil}, current},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := mk(tt.env, nil, nil).cycle(context.Background(), current)
if got != tt.want {
t.Errorf("cycle adopted %v, want %v", got, tt.want)
}
})
}
t.Run("collect error keeps current", func(t *testing.T) {
got := mk(&ControlEnvelope{PollIntervalSeconds: intPtr(120)}, errors.New("x"), nil).cycle(context.Background(), current)
if got != current {
t.Errorf("got %v, want current %v", got, current)
}
})
t.Run("report error keeps current", func(t *testing.T) {
got := mk(nil, nil, errors.New("x")).cycle(context.Background(), current)
if got != current {
t.Errorf("got %v, want current %v", got, current)
}
})
}
func TestLoop_RunImmediateAndResilientAfterError(t *testing.T) {
var cn, rn int32
loop := NewLoop(
&fakeCollector{report: &HostReport{}, n: &cn},
// first report errors; subsequent ok; no interval override (keeps fast tick)
&fakeReporter{env: &ControlEnvelope{}, errSeq: []error{errors.New("hub 5xx")}, n: &rn},
10*time.Millisecond, quietLogger())
ctx, cancel := context.WithCancel(context.Background())
done := make(chan error, 1)
go func() { done <- loop.Run(ctx) }()
time.Sleep(90 * time.Millisecond)
cancel()
select {
case err := <-done:
if err != nil {
t.Fatalf("Run returned error: %v", err)
}
case <-time.After(2 * time.Second):
t.Fatal("Run did not return after cancel")
}
// Immediate report + several ticks despite the first error → ≥3 collect calls.
if got := atomic.LoadInt32(&cn); got < 3 {
t.Errorf("collect calls = %d, want ≥3 (immediate + continuation after error)", got)
}
}
func TestLoop_RunAdoptsSlowerInterval(t *testing.T) {
var cn, rn int32
loop := NewLoop(
&fakeCollector{report: &HostReport{}, n: &cn},
// every report tells the agent to slow to 60s → after the immediate report,
// the ticker resets to 60s and no further ticks fire within the test window.
&fakeReporter{env: &ControlEnvelope{PollIntervalSeconds: intPtr(60)}, n: &rn},
10*time.Millisecond, quietLogger())
ctx, cancel := context.WithCancel(context.Background())
done := make(chan error, 1)
go func() { done <- loop.Run(ctx) }()
time.Sleep(120 * time.Millisecond)
cancel()
<-done
if got := atomic.LoadInt32(&cn); got != 1 {
t.Errorf("collect calls = %d, want 1 (immediate report adopted 60s, ticker slowed)", got)
}
}
+62
View File
@@ -0,0 +1,62 @@
package hub
import (
"context"
"io"
"log/slog"
"net/http"
"strings"
"gitea.dooplex.hu/admin/felhom-agent/internal/proxmox"
)
func quietLogger() *slog.Logger { return slog.New(slog.NewTextHandler(io.Discard, nil)) }
// roundTripFunc is a mock http.RoundTripper.
type roundTripFunc func(*http.Request) (*http.Response, error)
func (f roundTripFunc) RoundTrip(r *http.Request) (*http.Response, error) { return f(r) }
// testClient builds a hub Client over a mock transport (no network).
func testClient(rt roundTripFunc) *Client {
return newClient("https://hub.example.test", "super-secret-bearer-key", &http.Client{Transport: rt}, quietLogger())
}
func httpResp(code int, body string) *http.Response {
return &http.Response{
StatusCode: code,
Body: io.NopCloser(strings.NewReader(body)),
Header: http.Header{"Content-Type": []string{"application/json"}},
}
}
// fakePx is a fake proxmoxReader.
type fakePx struct {
node string
ns proxmox.NodeStatus
nsErr error
lxc []proxmox.Guest
lxcErr error
cfg map[int]proxmox.GuestConfig
cfgErr map[int]error
}
func (f *fakePx) Node() string { return f.node }
func (f *fakePx) NodeStatus(ctx context.Context) (proxmox.NodeStatus, error) {
return f.ns, f.nsErr
}
func (f *fakePx) ListLXC(ctx context.Context) ([]proxmox.Guest, error) { return f.lxc, f.lxcErr }
func (f *fakePx) GuestConfig(ctx context.Context, vmid int) (proxmox.GuestConfig, error) {
if e := f.cfgErr[vmid]; e != nil {
return proxmox.GuestConfig{}, e
}
return f.cfg[vmid], nil
}
// fakeProber is a fake CloudflaredProber.
type fakeProber struct {
status string
err error
}
func (p fakeProber) Status(ctx context.Context) (string, error) { return p.status, p.err }
+85
View File
@@ -0,0 +1,85 @@
package hub
// HostReport is the wire contract shared with the hub's ingest
// (felhom.eu TASK-slice3-hub-ingest). Field NAMES must match the hub
// field-for-field. Encoding is ordinary encoding/json (no canonicalization —
// nothing signs this report; canonical JSON is a slice-10 signing concern).
//
// The report IS the heartbeat: one periodic POST /api/v1/host-report, whose
// server-side received_at is the hub's dead-man's-switch liveness signal. There is
// no separate heartbeat endpoint.
type HostReport struct {
HostID string `json:"host_id"` // echoes config.Hub.HostID
ReportedAt string `json:"reported_at"` // RFC3339, agent clock
AgentVersion string `json:"agent_version"`
Host HostMetrics `json:"host"`
Guests []Guest `json:"guests"`
// Defined now as the stable contract; emitted EMPTY (non-nil) this slice.
StorageTargets []StorageTarget `json:"storage_targets"` // slice 5 (storage manifest)
Backups []Backup `json:"backups"` // slice 6
RestoreTests []RestoreTest `json:"restore_tests"` // slice 6
PBSSnapshots []PBSSnapshot `json:"pbs_snapshots"` // slice 6
Cloudflared Cloudflared `json:"cloudflared"`
AuditTail []AuditEntry `json:"audit_tail"` // populated by a later slice
}
// HostMetrics is the host block, sourced from proxmox NodeStatus.
type HostMetrics struct {
Node string `json:"node"`
CPUPercent float64 `json:"cpu_percent"` // 0100
MemoryTotalBytes int64 `json:"memory_total_bytes"`
MemoryUsedBytes int64 `json:"memory_used_bytes"`
MemoryPercent float64 `json:"memory_percent"`
DiskTotalBytes int64 `json:"disk_total_bytes"` // host root fs
DiskUsedBytes int64 `json:"disk_used_bytes"`
DiskPercent float64 `json:"disk_percent"`
LoadAvg []string `json:"loadavg"` // array of STRINGS (PVE shape)
UptimeSeconds int64 `json:"uptime_seconds"`
}
// Guest is one LXC. The agent reports vmid; the hub derives the guest PK
// "<host_id>/<vmid>" (keeping the id scheme hub-side — locked decision 4).
type Guest struct {
VMID int `json:"vmid"`
Name string `json:"name"`
Status string `json:"status"` // running | stopped | unknown
ControllerVersion string `json:"controller_version"` // "" this slice (slice 8 fills)
Spec *GuestSpec `json:"spec,omitempty"` // omitted when status unknown
}
// GuestSpec is the provisioned guest sizing.
type GuestSpec struct {
Cores int `json:"cores"`
MemoryBytes int64 `json:"memory_bytes"`
DiskBytes int64 `json:"disk_bytes"`
}
// Cloudflared is the tunnel service health (read-only probe this slice).
type Cloudflared struct {
Status string `json:"status"` // active | inactive | failed | unknown
}
// The following element types are declared now so the empty collections above are
// typed and slices 5/6 only fill them. No wire fields are committed yet.
type StorageTarget struct{} // slice 5: storage manifest target fields TBD
type Backup struct{} // slice 6: per-target backup status fields TBD
type RestoreTest struct{} // slice 6: self-restore-test result fields TBD
type PBSSnapshot struct{} // slice 6: PBS snapshot inventory fields TBD
type AuditEntry struct{} // audit-log tail entry fields TBD
// ControlEnvelope is the hub's 200 response to a host-report. This slice the agent
// adopts ONLY PollIntervalSeconds; the rest are reserved/forward-compat fields it
// logs at most and never acts on (reconcile, slice 4, consumes them).
type ControlEnvelope struct {
Status string `json:"status"`
// PollIntervalSeconds is a pointer so a missing field (keep current interval) is
// distinguishable from an explicit 0.
PollIntervalSeconds *int `json:"poll_interval_seconds"`
Blocked bool `json:"blocked"` // reserved — ignored (slice 4)
DesiredGeneration int64 `json:"desired_generation"` // reserved — ignored (slice 4)
HasSignedOps bool `json:"has_signed_ops"` // reserved — ignored (slice 4)
}
+60
View File
@@ -0,0 +1,60 @@
package hub
import (
"encoding/json"
"strings"
"testing"
)
func TestHostReport_FieldNamesAndEmptyCollections(t *testing.T) {
r := &HostReport{
HostID: "demo-host-01",
ReportedAt: "2026-06-08T12:00:00Z",
AgentVersion: "0.3.0",
Host: HostMetrics{
Node: "demo-felhom", CPUPercent: 3.2,
MemoryTotalBytes: 16777216000, MemoryUsedBytes: 4194304000, MemoryPercent: 25.0,
DiskTotalBytes: 152000000000, DiskUsedBytes: 30000000000, DiskPercent: 19.7,
LoadAvg: []string{"0.10", "0.20", "0.15"}, UptimeSeconds: 86400,
},
Guests: []Guest{{
VMID: 100, Name: "felhom-cust-acme", Status: "running", ControllerVersion: "",
Spec: &GuestSpec{Cores: 2, MemoryBytes: 2147483648, DiskBytes: 21474836480},
}},
StorageTargets: []StorageTarget{},
Backups: []Backup{},
RestoreTests: []RestoreTest{},
PBSSnapshots: []PBSSnapshot{},
AuditTail: []AuditEntry{},
Cloudflared: Cloudflared{Status: "active"},
}
b, err := json.Marshal(r)
if err != nil {
t.Fatal(err)
}
got := string(b)
for _, field := range []string{
`"host_id":"demo-host-01"`, `"reported_at":`, `"agent_version":"0.3.0"`,
`"cpu_percent":3.2`, `"memory_total_bytes":16777216000`, `"loadavg":["0.10","0.20","0.15"]`,
`"disk_percent":19.7`, `"uptime_seconds":86400`,
`"vmid":100`, `"controller_version":""`, `"memory_bytes":2147483648`,
`"cloudflared":{"status":"active"}`,
// empty collections must be [] not null
`"storage_targets":[]`, `"backups":[]`, `"restore_tests":[]`, `"pbs_snapshots":[]`, `"audit_tail":[]`,
} {
if !strings.Contains(got, field) {
t.Errorf("report JSON missing %s\n got: %s", field, got)
}
}
if strings.Contains(got, "null") {
t.Errorf("report JSON must not contain null (empty collections should be []): %s", got)
}
}
func TestGuest_SpecOmittedWhenUnknown(t *testing.T) {
b, _ := json.Marshal(Guest{VMID: 9, Name: "g", Status: "unknown"})
if strings.Contains(string(b), "spec") {
t.Errorf("unknown guest should omit spec, got %s", b)
}
}