From ab77fa3544217f16fc67c9c70b650aaae1d52a13 Mon Sep 17 00:00:00 2001 From: kisfenyo Date: Mon, 8 Jun 2026 16:20:09 +0200 Subject: [PATCH] feat(hub): host-report client + collector + first daemon loop (slice 3, v0.3.0) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit internal/hub: the agent's first daemon — a periodic read-only host-report POSTed to the hub (the heartbeat; no separate ping). - HostReport wire contract (shared field-for-field with the hub ingest): host metrics, guests (vmid + spec), cloudflared status; storage/backups/restore-tests/ pbs/audit collections DEFINED but emitted empty (slices 5/6 fill). - Collector over a read-only proxmoxReader (adapted to the real proxmox surface; no proxmox changes) + a CloudflaredProber. Partial-failure: NodeStatus fail = hard (skip POST); per-guest GuestConfig fail = status "unknown", still report. - Client: Bearer-auth POST, standard TLS (system roots / optional ca_file), typed TransportError/HTTPError, token never in errors. - Loop: immediate first report, adopt hub poll_interval (clamp [60,3600]), resilient to collect/report errors, clean ctx-cancel shutdown. - ControlEnvelope: only poll_interval_seconds acted on; blocked/desired_generation/ has_signed_ops parsed-but-ignored (slice 4). - config: HubConfig + FELHOM_AGENT_HUB_* overlay + mode-aware HubConfig.Validate + WithDefaults + hub-key redaction; example config updated. - main: no-selftest mode is now the daemon; added --selftest=hub. Version -> 0.3.0. Tests: report serialization, client (incl. token-redaction), collector partial- failure, loop continuation+interval adoption, config. internal/proxmox + internal/ authz untouched. Co-Authored-By: Claude Opus 4.8 (1M context) --- CHANGELOG.md | 56 +++++++++++++ REPORT.md | 117 +++++++++++++------------- cmd/felhom-agent/main.go | 146 ++++++++++++++++++++++++++------- configs/agent.example.json | 23 ++++++ internal/config/config.go | 103 ++++++++++++++++++++++- internal/config/config_test.go | 55 +++++++++++++ internal/hub/client.go | 118 ++++++++++++++++++++++++++ internal/hub/client_test.go | 69 ++++++++++++++++ internal/hub/cloudflared.go | 46 +++++++++++ internal/hub/collect.go | 144 ++++++++++++++++++++++++++++++++ internal/hub/collect_test.go | 108 ++++++++++++++++++++++++ internal/hub/loop.go | 107 ++++++++++++++++++++++++ internal/hub/loop_test.go | 144 ++++++++++++++++++++++++++++++++ internal/hub/mock_test.go | 62 ++++++++++++++ internal/hub/report.go | 85 +++++++++++++++++++ internal/hub/report_test.go | 60 ++++++++++++++ 16 files changed, 1352 insertions(+), 91 deletions(-) create mode 100644 internal/hub/client.go create mode 100644 internal/hub/client_test.go create mode 100644 internal/hub/cloudflared.go create mode 100644 internal/hub/collect.go create mode 100644 internal/hub/collect_test.go create mode 100644 internal/hub/loop.go create mode 100644 internal/hub/loop_test.go create mode 100644 internal/hub/mock_test.go create mode 100644 internal/hub/report.go create mode 100644 internal/hub/report_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 81173d9..c42f560 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,62 @@ All notable changes to **felhom-agent** are recorded here. Update on every code change that gets pushed. +## v0.3.0 — hub client + host-report + first daemon loop (slice 3) (2026-06-08) + +The agent's first daemon: a periodic read-only host-report POSTed to the hub (the +heartbeat). No Proxmox mutations, no desired-state/signed-op consumption, no +storage/backup collection yet — those are slices 4/5/6. + +### Added +- **`internal/hub`** package: + - **`HostReport`** wire contract (`report.go`) shared field-for-field with the hub + ingest: host metrics, guests (`vmid` + spec), `cloudflared` status, and the + `storage_targets`/`backups`/`restore_tests`/`pbs_snapshots`/`audit_tail` + collections **defined but emitted empty** (typed `[]`, slices 5/6 fill them). + - **`Collector`** (`collect.go`) builds the report from a read-only `proxmoxReader` + (adapted to the real `internal/proxmox` surface — node held by the client, value + returns, `proxmox.Guest`) + a `CloudflaredProber`. Partial-failure policy: a + failed `NodeStatus` is a hard error (skip the POST); a failed per-guest + `GuestConfig` degrades that guest to `status="unknown"` (spec omitted) but still + sends; a cloudflared probe failure → `"unknown"`, never fatal. + - **`CloudflaredProber`** + `SystemctlProber` (`systemctl is-active cloudflared`; + read-only — NOT a Privileged/root op; tunnel management is a later slice). + - **`Client`** (`client.go`): `POST /api/v1/host-report` with + `Authorization: Bearer `, standard TLS (system roots or optional `ca_file`; + verification always on). Typed `*TransportError` / `*HTTPError`; the bearer token + never appears in any error. + - **`Loop`** (`loop.go`): the daemon — immediate first report then tick; adopts the + hub's `poll_interval_seconds` clamped to [60,3600]; resilient (a collect/report + error is logged and the loop continues); clean shutdown on context cancel. + - **`ControlEnvelope`**: only `poll_interval_seconds` is acted on; `blocked` / + `desired_generation` / `has_signed_ops` are parsed-but-ignored (logged at most) + pending reconcile (slice 4). +- **Config**: `HubConfig` (url/host_id/api_key/poll_seconds/timeout_seconds/ca_file), + `FELHOM_AGENT_HUB_*` env overlay, `HubConfig.Validate()` (mode-aware — proxmox-only + `--selftest=read|task` still runs without hub config), `WithDefaults()`, and + `Redacted()` now also blanks the hub key. `configs/agent.example.json` gains `hub` + (and `authz`) blocks. +- **`cmd/felhom-agent`**: the no-`--selftest` mode is now the **daemon** (poll loop); + added **`--selftest=hub`** (one collect+report, prints the report + envelope). + Version 0.2.0 → 0.3.0. + +### Tests +- Report serialization (field names; empty collections are `[]` not `null`; spec + omitted when unknown); client (Bearer header, non-2xx→`*HTTPError`, + transport→`*TransportError`, **token never in error**); collector (host mapping, + guest spec, per-guest failure degrades-but-still-reports, NodeStatus hard error, + cloudflared error→unknown); loop (immediate first report, continuation after an + injected error, interval adoption + clamp); config (hub validate/redact/env). + +### Notes +- `internal/proxmox` and `internal/authz` were **not touched** — no new proxmox + surface was needed (`ListLXC` already exposes status/maxmem/maxdisk; `GuestConfig` + exposes cores). The task's `proxmoxReader` sketch (node-arg/pointer/`LXC`) was + adapted to the real exports as instructed. +- **Defined-but-empty** this slice: `storage_targets`, `backups`, `restore_tests`, + `pbs_snapshots`, `audit_tail` (slices 5/6). **Parsed-but-ignored**: the envelope's + `blocked`/`desired_generation`/`has_signed_ops` (slice 4). + ## v0.2.0 — `authz` signed-op verifier (slice 2) (2026-06-08) Production form of the Phase-4 signing primitive: a key-type-agnostic SSHSIG diff --git a/REPORT.md b/REPORT.md index 9d57921..e8ee82e 100644 --- a/REPORT.md +++ b/REPORT.md @@ -3,74 +3,71 @@ > This file holds the report for the **most recent** change, fully overwritten each task. > Cumulative history lives in [CHANGELOG.md](CHANGELOG.md). -## Task: `authz` signed-op verifier (slice 2) — v0.2.0 +## Task: hub client + host-report + first daemon loop (slice 3) — v0.3.0 -Turned the Phase-4 reference `VerifySignedOp` into a production package -(`internal/authz`): a key-type-agnostic SSHSIG verifier for operator-signed destructive -ops, the full anti-replay/authorization pipeline, and a durable, crash-safe nonce store. -This is what slice 4 (reconcile) calls to gate destructive desired-state deltas. Pushed to -`main`. Build/vet/test green locally (Go 1.26) and on the build server. +The agent's **first daemon**: a periodic, read-only host-report POSTed to the hub — which +**is** the heartbeat (its server-side `received_at` is the dead-man's-switch signal). New +`internal/hub` package + config additions + `main.go` daemon wiring. Pushed to `main`; +build/vet/test green locally (go1.26) and on the build server. -### Public surface (`internal/authz`) -- **`Verifier`** — `New(signers []AllowedSigner, store NonceStore, hostID string) *Verifier`; - `Verify(blob, sigArmored []byte) (*VerifiedOp, error)`. Optional `ClockSkew` (default 2m, - not-yet-valid only) and `Logger` (advisory key_id-mismatch warning). -- **`OpBlob`** — canonical signed object; `Target{HostID,GuestID}` with corrected - `host_id`/`guest_id` json tags; `Params json.RawMessage`, `Nonce`, `IssuedAt`, `ExpiresAt`, `KeyID`. -- **`VerifiedOp`** — `Op, HostID, GuestID, Params, Nonce, IssuedAt, ExpiresAt, KeyID (advisory), - Signer (matched), KeyIDMatchesSigner`. -- **`AllowedSigner`** + `NewAllowedSigner(keyID, role, authorizedKeyLine)`; roles - `RoleOperational` / `RoleRecovery` (doc 04 two-key model; role-scoping enforced by the caller). -- **`NonceStore`** interface + `MemoryNonceStore` (tests) and **`FileNonceStore`** (durable). -- **Typed errors**: `ErrMalformed, ErrNamespace, ErrUnknownSigner, ErrBadSignature, ErrTarget, - ErrExpired, ErrNotYetValid, ErrReplay` (errors.Is-friendly). -- **Config**: `config.AuthzConfig` (nonce-store path + pinned `Signers`). +### `internal/hub` public surface +- **`HostReport`** + sub-types (`HostMetrics`, `Guest`, `GuestSpec`, `Cloudflared`, + `ControlEnvelope`) — the JSON wire contract shared field-for-field with the hub ingest. +- **`Collector`** — `NewCollector(px proxmoxReader, cf CloudflaredProber, hostID, agentVersion, logger)`; + `Collect(ctx) (*HostReport, error)`. +- **`CloudflaredProber`** interface + **`SystemctlProber`** (`systemctl is-active`). +- **`Client`** — `NewClient(cfg config.HubConfig, logger) (*Client, error)`; + `Report(ctx, *HostReport) (*ControlEnvelope, error)`; typed `*TransportError` / `*HTTPError`. +- **`Loop`** — `NewLoop(collector, client, interval, logger)`; `Run(ctx) error`. Constants + `MinPollSeconds=60` / `MaxPollSeconds=3600`. -### Locked pipeline (order load-bearing) -`parse armor → namespace (fixed felhom-op-v1) → parse pubkey → allow-list by key MATERIAL (not -key_id) → crypto verify over RAW received bytes → parse blob → target (host strict, guest -surfaced) → time window → nonce recorded LAST`. Each post-crypto stage rejects even with a -valid signature; an invalid signature can never consume a nonce. +### Config additions (`internal/config`) +- `HubConfig{URL, HostID, APIKey, PollSeconds, TimeoutSeconds, CAFile}` on `Config.Hub`. +- `FELHOM_AGENT_HUB_{URL,HOST_ID,API_KEY,POLL_SECONDS,TIMEOUT_SECONDS,CA_FILE}` overlay + (int parse errors warn to stderr + keep file value, never crash). +- `HubConfig.Validate()` (mode-aware — proxmox-only selftests unaffected; https required + except loopback for tests), `HubConfig.WithDefaults()` (900s/30s), `Redacted()` blanks the key. +- `configs/agent.example.json` gains `hub` (and `authz`) blocks. -### Durable nonce store — mechanism & guarantee -fsync'd append-only JSONL log + in-memory index (replayed on open) + periodic compaction. -- **Crash-safe**: a nonce is written and `fsync`'d before `SeenOrRecord` returns `false`, so the - caller acts only *after* the durable record. A crash between verify and execute drops the op - (fail-safe) and never enables a replay. I/O failure → returns seen=true (op not executed). -- **Survives restart**: the log is replayed into the index on `OpenFileNonceStore`. -- **Pruning**: expired nonces dropped only at compaction (never before exp) — and an expired op - is rejected by the time check before the nonce check, so pruning is housekeeping, not a hole. -- **Concurrency-safe**: single mutex over file handle + index. +### Daemon-loop behaviour (`main.go`) +- No `--selftest` flag → **daemon**: validate proxmox + hub config → build read-path proxmox + client, collector, hub client, loop → `signal.NotifyContext(SIGINT, SIGTERM)` → `loop.Run`. +- **Immediate first report**, then tick at the interval; adopt the hub's + `poll_interval_seconds` (clamped [60,3600], reset the ticker on change). +- **Resilient**: any collect/report error is logged and the loop continues (survives hub 5xx + and transient proxmox read errors). Clean `nil` return on context cancel. +- **`--selftest=hub`**: one collect + report; prints the report it would send + the envelope. +- Startup line logs host_id/url/interval with the **key redacted**; no secret ever logged. -### OPEN choices -- **Clock skew**: 2-minute tolerance on *not-yet-valid* only; expiry not extended (window stays an - honest bound). -- **Durable mechanism**: fsync'd append log + compaction (simple, honest, no embedded-KV dep). -- **Fixtures**: committed real `ssh-keygen -Y sign` vector (hermetic + proves OpenSSH interop) + - in-Go minting for rejection cases; the sk case is synthetic (spec-faithful, no hardware). -- **Package name**: `authz` (control-plane-authorization layer, matches doc 04). +### Explicitly deferred (defined now, not active) +- **Defined-but-EMPTY** this slice (slices 5/6 fill): `storage_targets`, `backups`, + `restore_tests`, `pbs_snapshots`, `audit_tail` — emitted as typed empty `[]`. +- **Parsed-but-IGNORED** (slice 4 / reconcile consumes): the envelope's `blocked`, + `desired_generation`, `has_signed_ops` — logged at most, never acted on. +- No per-guest work queue (zero Proxmox mutations this slice); no canonical JSON (nothing + signs the report); no controller_version (slice 8) — emitted `""`. -### Test matrix (all pass — 14 tests) -Real ssh-keygen fixture · happy path · per-stage rejection {namespace, unknown-signer, tampered, -retargeted-host, expired, not-yet-valid, replay} · **invalid-sig-does-NOT-burn-nonce** (then the -valid op with that nonce still succeeds) · replay-rejected-across-restart (durable store) · -key-type-agnostic synthetic **sk-ssh-ed25519** · byte-exactness (re-serialized blob fails crypto). +### proxmox surface +**No changes to `internal/proxmox` or `internal/authz`.** No new proxmox surface was needed: +`ListLXC` already returns status/maxmem/maxdisk and `GuestConfig` returns cores. The task's +`proxmoxReader` sketch (node-arg / pointer returns / `LXC` type) was **adapted to the real +exports** — `Node()` on the client, value returns, `proxmox.Guest` — per its instruction. -### Corrections to the Phase-4 §7 reference (for production) -- `Target` needed `host_id`/`guest_id` json tags — fixed. -- **The doc's "Go 1.24.4 / x/crypto v0.52.0" does not hold**: x/crypto v0.52.0 declares - `go 1.25.0` and won't build on Go 1.24. Resolved by upgrading the build server to **go1.26.0** - (backward-compatible — felhom-controller/hub build unchanged; distro Go package left intact, - upstream Go fronted on PATH). -- Free function → constructed `Verifier`; returns full `VerifiedOp`; typed errors; clock-skew; - durable nonce store (the net-new engineering). -- **Shared-contract flag (not built)**: the hub and `felhom-sign` CLI must produce byte-identical - canonical JSON or signatures won't verify; a shared canonicalizer both import is the right home. +### Test matrix (all green) +- **report**: field names match §4; empty collections serialize as `[]` not `null`; spec + omitted when unknown. +- **client**: sets `Bearer`; non-2xx → `*HTTPError` (status preserved); transport → `*TransportError`; + **asserts the bearer token never appears in any error string**. +- **collector**: `NodeStatus`→host block; `ListLXC`+`GuestConfig`→guest spec; a failing + `GuestConfig` → `status="unknown"` + omitted spec + **still returns a report**; a failing + `NodeStatus` → hard error; cloudflared probe error → `"unknown"`. +- **loop**: immediate first report; continues after an injected report error (≥3 cycles); + adopts + clamps the envelope interval (cycle-level) and applies a slower interval in `Run`. +- **config**: hub validate cases, key redaction, env overlay + defaults. ### Verification -- `go build/vet/test` green locally (go1.26.0) and on the build server (upgraded to go1.26.0). -- Real OpenSSH `ssh-keygen` (OpenSSH 10.0p2) minted the committed fixture and self-verified it - before commit. +- `go build/vet/test` green locally (go1.26.0) and on the build server (go1.26.0). No live hub + or `systemctl` in unit tests (mock transport + fake prober/collector/reporter). ### Repo state -- Branch: `main` only. Dep: `golang.org/x/crypto v0.52.0` (+ `x/sys` indirect); `go 1.25.0`. +- Branch: `main` only. Version 0.3.0. Dep unchanged (`golang.org/x/crypto v0.52.0`). diff --git a/cmd/felhom-agent/main.go b/cmd/felhom-agent/main.go index 189e361..b5b20e4 100644 --- a/cmd/felhom-agent/main.go +++ b/cmd/felhom-agent/main.go @@ -1,13 +1,14 @@ -// Command felhom-agent is the host agent (slice 1: scaffold + proxmox layer). +// Command felhom-agent is the host agent. // -// This slice is wiring only: it has no daemon/reconcile loop yet (slice 3/4). It -// exposes a read-only --selftest that exercises the proxmox package against a live -// host, and an explicitly-gated --selftest=task that exercises WaitTask on a -// reversible op (snapshot -> rollback -> delete-snapshot). +// With no --selftest flag it runs as the daemon: the host-report poll loop +// (slice 3) that periodically POSTs a read-only host-report to the hub (the +// heartbeat). --selftest=read|task exercise the proxmox layer; --selftest=hub does +// one collect+report against the hub and prints what it would send. package main import ( "context" + "encoding/json" "errors" "flag" "fmt" @@ -18,13 +19,14 @@ import ( "time" "gitea.dooplex.hu/admin/felhom-agent/internal/config" + "gitea.dooplex.hu/admin/felhom-agent/internal/hub" applog "gitea.dooplex.hu/admin/felhom-agent/internal/log" "gitea.dooplex.hu/admin/felhom-agent/internal/proxmox" ) // version is the agent version. Overridable at build time with // -ldflags "-X main.version="; defaults to the in-repo CHANGELOG version. -var version = "0.2.0" +var version = "0.3.0" func main() { var ( @@ -34,7 +36,7 @@ func main() { showVersion bool ) flag.StringVar(&cfgPath, "config", envOr("FELHOM_AGENT_CONFIG", "/etc/felhom-agent/agent.json"), "path to the agent config file (JSON)") - flag.Var(&selftest, "selftest", "run a self-test and exit: bare/`read` = read-only queries; `task` = reversible mutating exercise (needs -vmid)") + flag.Var(&selftest, "selftest", "run a self-test and exit: bare/`read` = read-only queries; `task` = reversible mutating exercise (needs -vmid); `hub` = one collect+report to the hub") flag.IntVar(&vmid, "vmid", 0, "guest VMID for --selftest=task (the reversible snapshot/rollback exercise)") flag.BoolVar(&showVersion, "version", false, "print version and exit") flag.Parse() @@ -58,19 +60,116 @@ func main() { switch selftest.mode { case "": - // No daemon loop yet. - logger.Info("felhom-agent scaffold; no run loop yet", - "version", version, - "hint", "use --selftest (read-only) or --selftest=task --vmid N") - // TODO: poll loop — slice 3/4. - return + os.Exit(runDaemon(cfg, logger)) case "read": os.Exit(runSelftestRead(context.Background(), cfg, logger)) case "task": os.Exit(runSelftestTask(context.Background(), cfg, logger, vmid)) + case "hub": + os.Exit(runSelftestHub(context.Background(), cfg, logger)) } } +// newProxmoxClient builds the read-path proxmox client from config. +func newProxmoxClient(cfg config.Config) (*proxmox.Client, error) { + return proxmox.NewClient(proxmox.Config{ + Endpoint: cfg.Proxmox.Endpoint, + Node: cfg.Proxmox.Node, + Token: cfg.Proxmox.Token, + TLS: proxmox.TLSConfig{ + CAFile: cfg.Proxmox.TLS.CAFile, + Fingerprint: cfg.Proxmox.TLS.Fingerprint, + InsecureSkipVerify: cfg.Proxmox.TLS.InsecureSkipVerify, + }, + }) +} + +// runDaemon is the default mode: collect a host-report and POST it to the hub on a +// loop. Requires both proxmox (to collect) and hub config. +func runDaemon(cfg config.Config, logger *slog.Logger) int { + if err := cfg.Validate(); err != nil { + fmt.Fprintln(os.Stderr, "daemon: proxmox not configured:", err) + return 2 + } + if err := cfg.Hub.Validate(); err != nil { + fmt.Fprintln(os.Stderr, "daemon: hub not configured:", err) + return 2 + } + px, err := newProxmoxClient(cfg) + if err != nil { + fmt.Fprintln(os.Stderr, "daemon: proxmox client:", err) + return 1 + } + client, err := hub.NewClient(cfg.Hub, logger) + if err != nil { + fmt.Fprintln(os.Stderr, "daemon: hub client:", err) + return 1 + } + hcfg := cfg.Hub.WithDefaults() + collector := hub.NewCollector(px, hub.SystemctlProber{}, cfg.Hub.HostID, version, logger) + loop := hub.NewLoop(collector, client, time.Duration(hcfg.PollSeconds)*time.Second, logger) + + ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) + defer stop() + logger.Info("felhom-agent daemon starting", + "version", version, "host_id", cfg.Hub.HostID, "hub_url", cfg.Hub.URL, + "interval_s", hcfg.PollSeconds) // hub key intentionally not logged + if err := loop.Run(ctx); err != nil { + logger.Error("daemon: loop exited with error", "err", err) + return 1 + } + return 0 +} + +// runSelftestHub validates hub config, does ONE collect + report, and prints the +// report it would send plus the envelope it got back. +func runSelftestHub(ctx context.Context, cfg config.Config, logger *slog.Logger) int { + if err := cfg.Validate(); err != nil { + fmt.Fprintln(os.Stderr, "selftest: proxmox not configured:", err) + return 1 + } + if err := cfg.Hub.Validate(); err != nil { + fmt.Fprintln(os.Stderr, "selftest: hub not configured:", err) + return 1 + } + px, err := newProxmoxClient(cfg) + if err != nil { + fmt.Fprintln(os.Stderr, "selftest: proxmox client:", err) + return 1 + } + client, err := hub.NewClient(cfg.Hub, logger) + if err != nil { + fmt.Fprintln(os.Stderr, "selftest: hub client:", err) + return 1 + } + collector := hub.NewCollector(px, hub.SystemctlProber{}, cfg.Hub.HostID, version, logger) + + ctx, cancel := context.WithTimeout(ctx, 60*time.Second) + defer cancel() + + fmt.Printf("=== felhom-agent %s selftest=hub (host_id=%s url=%s) ===\n", version, cfg.Hub.HostID, cfg.Hub.URL) + report, err := collector.Collect(ctx) + if err != nil { + fmt.Fprintln(os.Stderr, " [FAIL] collect:", err) + return 1 + } + if b, e := json.MarshalIndent(report, " ", " "); e == nil { + fmt.Println(" --- report it would send ---") + fmt.Println(" " + string(b)) + } + env, err := client.Report(ctx, report) + if err != nil { + fmt.Fprintln(os.Stderr, " [FAIL] report:", err) + return 1 + } + if b, e := json.MarshalIndent(env, " ", " "); e == nil { + fmt.Println(" --- control envelope received ---") + fmt.Println(" " + string(b)) + } + fmt.Println("=== selftest=hub OK ===") + return 0 +} + // runSelftestRead loads config, builds the API client, and runs the read-only // queries against the live host, printing a short health report. It mutates // nothing. Missing/invalid config is reported cleanly (no panic). @@ -81,16 +180,7 @@ func runSelftestRead(ctx context.Context, cfg config.Config, logger *slog.Logger } logger.Info("selftest (read-only) starting", "config", fmt.Sprintf("%+v", cfg.Redacted().Proxmox)) - client, err := proxmox.NewClient(proxmox.Config{ - Endpoint: cfg.Proxmox.Endpoint, - Node: cfg.Proxmox.Node, - Token: cfg.Proxmox.Token, - TLS: proxmox.TLSConfig{ - CAFile: cfg.Proxmox.TLS.CAFile, - Fingerprint: cfg.Proxmox.TLS.Fingerprint, - InsecureSkipVerify: cfg.Proxmox.TLS.InsecureSkipVerify, - }, - }) + client, err := newProxmoxClient(cfg) if err != nil { fmt.Fprintln(os.Stderr, "selftest: client init:", err) return 1 @@ -163,13 +253,7 @@ func runSelftestTask(ctx context.Context, cfg config.Config, logger *slog.Logger fmt.Fprintln(os.Stderr, "selftest=task requires -vmid N (a guest safe to snapshot/rollback)") return 2 } - client, err := proxmox.NewClient(proxmox.Config{ - Endpoint: cfg.Proxmox.Endpoint, Node: cfg.Proxmox.Node, Token: cfg.Proxmox.Token, - TLS: proxmox.TLSConfig{ - CAFile: cfg.Proxmox.TLS.CAFile, Fingerprint: cfg.Proxmox.TLS.Fingerprint, - InsecureSkipVerify: cfg.Proxmox.TLS.InsecureSkipVerify, - }, - }) + client, err := newProxmoxClient(cfg) if err != nil { fmt.Fprintln(os.Stderr, "selftest: client init:", err) return 1 @@ -239,6 +323,8 @@ func (f *selftestFlag) Set(v string) error { f.mode = "read" case "task": f.mode = "task" + case "hub": + f.mode = "hub" default: return fmt.Errorf("invalid --selftest value %q (want read|task)", v) } diff --git a/configs/agent.example.json b/configs/agent.example.json index b43a896..3e4d145 100644 --- a/configs/agent.example.json +++ b/configs/agent.example.json @@ -13,5 +13,28 @@ "mode": "sudo", "sudo_path": "sudo" }, + "authz": { + "nonce_store_path": "/var/lib/felhom-agent/nonces.log", + "signers": [ + { + "key_id": "felhom-op-1", + "role": "operational", + "public_key": "ssh-ed25519 AAAA... felhom-op-1" + }, + { + "key_id": "felhom-recovery-1", + "role": "recovery", + "public_key": "ssh-ed25519 AAAA... felhom-recovery-1" + } + ] + }, + "hub": { + "url": "https://hub.felhom.eu", + "host_id": "demo-host-01", + "api_key": "REPLACE_WITH_PER_HOST_HUB_KEY", + "poll_seconds": 900, + "timeout_seconds": 30, + "ca_file": "" + }, "log_level": "info" } diff --git a/internal/config/config.go b/internal/config/config.go index 54a3309..d7e2d80 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -12,6 +12,8 @@ package config import ( "encoding/json" "fmt" + "net" + "net/url" "os" "strconv" "strings" @@ -22,9 +24,22 @@ type Config struct { Proxmox ProxmoxConfig `json:"proxmox"` Privileged PrivilegedConfig `json:"privileged"` Authz AuthzConfig `json:"authz"` + Hub HubConfig `json:"hub"` LogLevel string `json:"log_level"` // debug|info|warn|error (default info) } +// HubConfig configures the outbound hub client + daemon poll loop (internal/hub). +// The hub serves a real cert (hub.felhom.eu, cert-manager) — this is standard TLS +// (system roots), NOT the Proxmox fingerprint-pinning path. +type HubConfig struct { + URL string `json:"url"` // e.g. "https://hub.felhom.eu" + HostID string `json:"host_id"` // the hub's PK for this host + APIKey string `json:"api_key"` // per-host hub key; SECRET — redacted + PollSeconds int `json:"poll_seconds"` // default 900; hub may override per-cycle + TimeoutSeconds int `json:"timeout_seconds"` // per-request HTTP timeout; default 30 + CAFile string `json:"ca_file"` // optional; "" = system roots +} + // AuthzConfig configures operator-signed-op verification (internal/authz). The // pinned operator public keys are kept here as raw authorized_keys-style lines // (this package stays dependency-free); the authz package parses them into its @@ -87,6 +102,7 @@ func Default() Config { Proxmox: ProxmoxConfig{Endpoint: "https://127.0.0.1:8006"}, Privileged: PrivilegedConfig{Mode: "sudo"}, Authz: AuthzConfig{NonceStorePath: "/var/lib/felhom-agent/nonces.log"}, + Hub: HubConfig{PollSeconds: 900, TimeoutSeconds: 30}, LogLevel: "info", } } @@ -134,6 +150,36 @@ func applyEnv(cfg *Config) { if v := os.Getenv("FELHOM_AGENT_LOG_LEVEL"); v != "" { cfg.LogLevel = v } + // hub + if v := os.Getenv("FELHOM_AGENT_HUB_URL"); v != "" { + cfg.Hub.URL = v + } + if v := os.Getenv("FELHOM_AGENT_HUB_HOST_ID"); v != "" { + cfg.Hub.HostID = v + } + if v := os.Getenv("FELHOM_AGENT_HUB_API_KEY"); v != "" { + cfg.Hub.APIKey = v + } + if v := os.Getenv("FELHOM_AGENT_HUB_CA_FILE"); v != "" { + cfg.Hub.CAFile = v + } + cfg.Hub.PollSeconds = envInt("FELHOM_AGENT_HUB_POLL_SECONDS", cfg.Hub.PollSeconds) + cfg.Hub.TimeoutSeconds = envInt("FELHOM_AGENT_HUB_TIMEOUT_SECONDS", cfg.Hub.TimeoutSeconds) +} + +// envInt overlays an int env var, keeping cur (with a stderr warning) on parse +// error rather than crashing. (Load runs before the slog logger exists.) +func envInt(key string, cur int) int { + v := os.Getenv(key) + if v == "" { + return cur + } + n, err := strconv.Atoi(v) + if err != nil { + fmt.Fprintf(os.Stderr, "config: %s=%q is not an integer, keeping %d\n", key, v, cur) + return cur + } + return n } // Validate checks the config is usable for talking to the API. @@ -153,14 +199,69 @@ func (c Config) Validate() error { return nil } -// Redacted returns a copy safe to log: the token secret is masked. +// Redacted returns a copy safe to log: the proxmox token and hub key are masked. func (c Config) Redacted() Config { if c.Proxmox.Token != "" { c.Proxmox.Token = redactToken(c.Proxmox.Token) } + if c.Hub.APIKey != "" { + c.Hub.APIKey = "********" + } return c } +// WithDefaults fills zero-valued hub timing fields. Applied at client/loop +// construction so programmatic configs (not from Default()) still get sane values. +func (h HubConfig) WithDefaults() HubConfig { + if h.PollSeconds == 0 { + h.PollSeconds = 900 + } + if h.TimeoutSeconds == 0 { + h.TimeoutSeconds = 30 + } + return h +} + +// Validate checks the hub config is usable for the daemon / --selftest=hub. It is +// separate from Config.Validate (proxmox-only) so --selftest=read|task still runs +// without hub config. +func (h HubConfig) Validate() error { + if h.URL == "" { + return fmt.Errorf("config: hub.url is required (set hub.url or FELHOM_AGENT_HUB_URL)") + } + if h.HostID == "" { + return fmt.Errorf("config: hub.host_id is required") + } + if h.APIKey == "" { + return fmt.Errorf("config: hub.api_key is required (set hub.api_key or FELHOM_AGENT_HUB_API_KEY)") + } + u, err := url.Parse(h.URL) + if err != nil { + return fmt.Errorf("config: hub.url is not a valid URL: %w", err) + } + switch u.Scheme { + case "https": + // always fine + case "http": + if !isLoopbackHost(u.Hostname()) { + return fmt.Errorf("config: hub.url must be https:// (http:// only allowed for loopback in tests)") + } + default: + return fmt.Errorf("config: hub.url must be https:// (got scheme %q)", u.Scheme) + } + return nil +} + +func isLoopbackHost(host string) bool { + if host == "localhost" { + return true + } + if ip := net.ParseIP(host); ip != nil { + return ip.IsLoopback() + } + return false +} + // redactToken keeps the public "USER@REALM!TOKENID=" prefix and masks the secret. func redactToken(tok string) string { if i := strings.LastIndex(tok, "="); i >= 0 { diff --git a/internal/config/config_test.go b/internal/config/config_test.go index 7da1a5e..613b741 100644 --- a/internal/config/config_test.go +++ b/internal/config/config_test.go @@ -36,6 +36,61 @@ func TestValidate(t *testing.T) { } } +func TestRedactedMasksHubKey(t *testing.T) { + c := Default() + c.Hub.APIKey = "hub-secret-abcdef" + if got := c.Redacted().Hub.APIKey; got == "hub-secret-abcdef" || got == "" { + t.Fatalf("hub key not masked: %q", got) + } + if !strings.Contains(c.Hub.APIKey, "abcdef") { + t.Error("Redacted mutated the original hub key") + } +} + +func TestHubConfigValidate(t *testing.T) { + base := HubConfig{URL: "https://hub.felhom.eu", HostID: "h1", APIKey: "k"} + if err := base.Validate(); err != nil { + t.Fatalf("valid hub config rejected: %v", err) + } + bad := []HubConfig{ + {HostID: "h", APIKey: "k"}, // no URL + {URL: "https://x", APIKey: "k"}, // no host + {URL: "https://x", HostID: "h"}, // no key + {URL: "http://hub.felhom.eu", HostID: "h", APIKey: "k"}, // http non-loopback + {URL: "ftp://x", HostID: "h", APIKey: "k"}, // bad scheme + } + for i, h := range bad { + if err := h.Validate(); err == nil { + t.Errorf("case %d: expected validation error for %+v", i, h) + } + } + // http is allowed for loopback (tests). + if err := (HubConfig{URL: "http://127.0.0.1:8443", HostID: "h", APIKey: "k"}).Validate(); err != nil { + t.Errorf("http loopback should be allowed: %v", err) + } +} + +func TestHubEnvOverlayAndDefaults(t *testing.T) { + t.Setenv("FELHOM_AGENT_HUB_URL", "https://hub.example") + t.Setenv("FELHOM_AGENT_HUB_HOST_ID", "env-host") + t.Setenv("FELHOM_AGENT_HUB_API_KEY", "env-key") + t.Setenv("FELHOM_AGENT_HUB_POLL_SECONDS", "120") + cfg, err := Load("") + if err != nil { + t.Fatal(err) + } + if cfg.Hub.URL != "https://hub.example" || cfg.Hub.HostID != "env-host" || cfg.Hub.APIKey != "env-key" { + t.Errorf("hub env overlay failed: %+v", cfg.Hub) + } + if cfg.Hub.PollSeconds != 120 { + t.Errorf("poll seconds = %d, want 120", cfg.Hub.PollSeconds) + } + // withDefaults fills zero timeout. + if (HubConfig{}).WithDefaults().TimeoutSeconds != 30 { + t.Error("WithDefaults should set TimeoutSeconds=30") + } +} + func TestLoadFileThenEnvOverride(t *testing.T) { dir := t.TempDir() path := filepath.Join(dir, "agent.json") diff --git a/internal/hub/client.go b/internal/hub/client.go new file mode 100644 index 0000000..9708ff0 --- /dev/null +++ b/internal/hub/client.go @@ -0,0 +1,118 @@ +package hub + +import ( + "bytes" + "context" + "crypto/tls" + "crypto/x509" + "encoding/json" + "fmt" + "io" + "log/slog" + "net/http" + "os" + "strings" + "time" + + "gitea.dooplex.hu/admin/felhom-agent/internal/config" +) + +const reportPath = "/api/v1/host-report" + +// Client posts host-reports to the hub. Auth is a per-host Bearer key. Transport is +// standard TLS (system roots, or a CAFile pool); verification is always on — the hub +// has a real cert (unlike the Proxmox self-signed path), so there is no insecure mode. +type Client struct { + baseURL string + apiKey string + hc *http.Client + logger *slog.Logger +} + +// NewClient builds a hub client from config (defaults applied). It never logs the key. +func NewClient(cfg config.HubConfig, logger *slog.Logger) (*Client, error) { + cfg = cfg.WithDefaults() + if logger == nil { + logger = slog.Default() + } + tlsCfg := &tls.Config{} // system roots + if cfg.CAFile != "" { + pem, err := os.ReadFile(cfg.CAFile) + if err != nil { + return nil, fmt.Errorf("hub: reading ca_file: %w", err) + } + pool := x509.NewCertPool() + if !pool.AppendCertsFromPEM(pem) { + return nil, fmt.Errorf("hub: ca_file %q contained no usable certificates", cfg.CAFile) + } + tlsCfg.RootCAs = pool + } + hc := &http.Client{ + Timeout: time.Duration(cfg.TimeoutSeconds) * time.Second, + Transport: &http.Transport{TLSClientConfig: tlsCfg}, + } + return newClient(cfg.URL, cfg.APIKey, hc, logger), nil +} + +// newClient is the shared constructor (tests inject a mock-transport *http.Client). +func newClient(baseURL, apiKey string, hc *http.Client, logger *slog.Logger) *Client { + return &Client{baseURL: strings.TrimRight(baseURL, "/"), apiKey: apiKey, hc: hc, logger: logger} +} + +// TransportError is a network/connection failure (no HTTP response). It never +// contains the bearer token. +type TransportError struct{ Err error } + +func (e *TransportError) Error() string { return "hub: transport error: " + e.Err.Error() } +func (e *TransportError) Unwrap() error { return e.Err } + +// HTTPError is a non-2xx response. BodyTail is a short, token-free excerpt. +type HTTPError struct { + StatusCode int + BodyTail string +} + +func (e *HTTPError) Error() string { + return fmt.Sprintf("hub: HTTP %d: %s", e.StatusCode, e.BodyTail) +} + +// Report POSTs the host-report and returns the parsed control envelope. The report +// IS the heartbeat (locked decision 1). Errors are typed (transport vs HTTP) and +// never include the bearer token. +func (c *Client) Report(ctx context.Context, r *HostReport) (*ControlEnvelope, error) { + body, err := json.Marshal(r) + if err != nil { + return nil, fmt.Errorf("hub: marshaling report: %w", err) + } + req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.baseURL+reportPath, bytes.NewReader(body)) + if err != nil { + return nil, fmt.Errorf("hub: building request: %w", err) + } + req.Header.Set("Authorization", "Bearer "+c.apiKey) + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Accept", "application/json") + + resp, err := c.hc.Do(req) + if err != nil { + return nil, &TransportError{Err: err} // token is in the request header, never the error + } + defer resp.Body.Close() + + raw, _ := io.ReadAll(io.LimitReader(resp.Body, 64<<10)) + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return nil, &HTTPError{StatusCode: resp.StatusCode, BodyTail: tail(raw, 256)} + } + var env ControlEnvelope + if err := json.Unmarshal(raw, &env); err != nil { + return nil, fmt.Errorf("hub: decoding control envelope: %w", err) + } + return &env, nil +} + +func tail(b []byte, max int) string { + s := strings.TrimSpace(string(b)) + if len(s) > max { + return s[:max] + "…" + } + return s +} diff --git a/internal/hub/client_test.go b/internal/hub/client_test.go new file mode 100644 index 0000000..c1a8b6d --- /dev/null +++ b/internal/hub/client_test.go @@ -0,0 +1,69 @@ +package hub + +import ( + "context" + "errors" + "net/http" + "strings" + "testing" +) + +const testBearer = "super-secret-bearer-key" + +func TestClient_SetsBearerAndParsesEnvelope(t *testing.T) { + var gotAuth, gotCT, gotPath, gotMethod string + c := testClient(func(r *http.Request) (*http.Response, error) { + gotAuth = r.Header.Get("Authorization") + gotCT = r.Header.Get("Content-Type") + gotPath = r.URL.Path + gotMethod = r.Method + return httpResp(200, `{"status":"ok","poll_interval_seconds":900}`), nil + }) + env, err := c.Report(context.Background(), &HostReport{HostID: "h"}) + if err != nil { + t.Fatalf("Report: %v", err) + } + if gotAuth != "Bearer "+testBearer { + t.Errorf("auth header = %q", gotAuth) + } + if gotCT != "application/json" { + t.Errorf("content-type = %q", gotCT) + } + if gotMethod != http.MethodPost || gotPath != reportPath { + t.Errorf("method/path = %s %s", gotMethod, gotPath) + } + if env.PollIntervalSeconds == nil || *env.PollIntervalSeconds != 900 { + t.Errorf("envelope poll = %v", env.PollIntervalSeconds) + } +} + +func TestClient_Non2xxTypedErrorRedactsToken(t *testing.T) { + c := testClient(func(r *http.Request) (*http.Response, error) { + return httpResp(503, "service unavailable"), nil + }) + _, err := c.Report(context.Background(), &HostReport{HostID: "h"}) + var he *HTTPError + if !errors.As(err, &he) { + t.Fatalf("want *HTTPError, got %T: %v", err, err) + } + if he.StatusCode != 503 { + t.Errorf("status = %d", he.StatusCode) + } + if strings.Contains(err.Error(), testBearer) { + t.Fatalf("bearer token leaked into error: %q", err.Error()) + } +} + +func TestClient_TransportErrorTypedRedactsToken(t *testing.T) { + c := testClient(func(r *http.Request) (*http.Response, error) { + return nil, errors.New("dial tcp: connection refused") + }) + _, err := c.Report(context.Background(), &HostReport{HostID: "h"}) + var te *TransportError + if !errors.As(err, &te) { + t.Fatalf("want *TransportError, got %T: %v", err, err) + } + if strings.Contains(err.Error(), testBearer) { + t.Fatalf("bearer token leaked into error: %q", err.Error()) + } +} diff --git a/internal/hub/cloudflared.go b/internal/hub/cloudflared.go new file mode 100644 index 0000000..0579610 --- /dev/null +++ b/internal/hub/cloudflared.go @@ -0,0 +1,46 @@ +package hub + +import ( + "context" + "os/exec" + "strings" +) + +// CloudflaredProber reports the cloudflared tunnel service health. It is a +// READ-ONLY probe: the agent does NOT manage or restart cloudflared in this slice +// (that is the tunnel-management slice — this is the seam for it). Injectable so +// tests use a fake and never exec. +type CloudflaredProber interface { + // Status returns one of: "active" | "inactive" | "failed" | "unknown". + Status(ctx context.Context) (string, error) +} + +// SystemctlProber runs `systemctl is-active cloudflared`. This is NOT a Privileged +// (root-CLI) op — `is-active` is non-root readable and is not one of the three +// proven root exceptions, so it does not go through internal/proxmox.Privileged. +type SystemctlProber struct { + Unit string // defaults to "cloudflared" +} + +// Status maps `systemctl is-active` output to the report vocabulary. systemctl +// exits non-zero for inactive/failed, so the output string is authoritative over +// the exit code; any exec error (binary missing, etc.) maps to "unknown". +func (p SystemctlProber) Status(ctx context.Context) (string, error) { + unit := p.Unit + if unit == "" { + unit = "cloudflared" + } + out, _ := exec.CommandContext(ctx, "systemctl", "is-active", unit).Output() + switch strings.TrimSpace(string(out)) { + case "active": + return "active", nil + case "failed": + return "failed", nil + case "inactive", "deactivating", "activating": + return "inactive", nil + case "": + return "unknown", nil // no output → systemctl/exec problem + default: + return "unknown", nil + } +} diff --git a/internal/hub/collect.go b/internal/hub/collect.go new file mode 100644 index 0000000..5749164 --- /dev/null +++ b/internal/hub/collect.go @@ -0,0 +1,144 @@ +package hub + +import ( + "context" + "fmt" + "log/slog" + "time" + + "gitea.dooplex.hu/admin/felhom-agent/internal/proxmox" +) + +// proxmoxReader is the read-only subset the collector needs. Signatures match the +// REAL internal/proxmox.Client surface (slice 1): the node is held by the Client +// (no per-call node arg), reads return values (not pointers), and the guest type is +// proxmox.Guest. (The task's sketch used node-arg/pointer/LXC shapes; adapted to +// the actual exports per its instruction — no proxmox changes were needed: ListLXC +// already carries status/maxmem/maxdisk, GuestConfig carries cores.) +type proxmoxReader interface { + Node() string + NodeStatus(ctx context.Context) (proxmox.NodeStatus, error) + ListLXC(ctx context.Context) ([]proxmox.Guest, error) + GuestConfig(ctx context.Context, vmid int) (proxmox.GuestConfig, error) +} + +// Collector builds a HostReport from read-only sources. All deps are behind narrow +// interfaces for unit testing. +type Collector struct { + px proxmoxReader + cf CloudflaredProber + hostID string + agentVersion string + logger *slog.Logger + now func() time.Time +} + +// NewCollector builds a collector. hostID echoes config.Hub.HostID; agentVersion is +// the binary version. +func NewCollector(px proxmoxReader, cf CloudflaredProber, hostID, agentVersion string, logger *slog.Logger) *Collector { + if logger == nil { + logger = slog.Default() + } + return &Collector{ + px: px, + cf: cf, + hostID: hostID, + agentVersion: agentVersion, + logger: logger, + now: func() time.Time { return time.Now().UTC() }, + } +} + +// Collect builds the report. Best-effort liveness: a failed NodeStatus is a hard +// error (no useful report — the cycle skips the POST); a failed per-guest +// GuestConfig degrades that guest to status="unknown" without spec but still sends; +// a cloudflared probe failure yields status="unknown" and is never fatal. +func (c *Collector) Collect(ctx context.Context) (*HostReport, error) { + ns, err := c.px.NodeStatus(ctx) + if err != nil { + return nil, fmt.Errorf("hub: NodeStatus failed (no useful report): %w", err) + } + + report := &HostReport{ + HostID: c.hostID, + ReportedAt: c.now().Format(time.RFC3339), + AgentVersion: c.agentVersion, + Host: hostMetrics(c.px.Node(), ns), + Guests: c.collectGuests(ctx), + // Defined-but-empty this slice (slices 5/6). Non-nil so they marshal as []. + StorageTargets: []StorageTarget{}, + Backups: []Backup{}, + RestoreTests: []RestoreTest{}, + PBSSnapshots: []PBSSnapshot{}, + AuditTail: []AuditEntry{}, + Cloudflared: Cloudflared{Status: c.cloudflaredStatus(ctx)}, + } + return report, nil +} + +func hostMetrics(node string, ns proxmox.NodeStatus) HostMetrics { + h := HostMetrics{ + Node: node, + CPUPercent: ns.CPU * 100, // PVE cpu is a 0..1 fraction + MemoryTotalBytes: ns.Memory.Total, + MemoryUsedBytes: ns.Memory.Used, + DiskTotalBytes: ns.RootFS.Total, + DiskUsedBytes: ns.RootFS.Used, + LoadAvg: ns.LoadAvg, + UptimeSeconds: ns.Uptime, + } + h.MemoryPercent = percent(ns.Memory.Used, ns.Memory.Total) + h.DiskPercent = percent(ns.RootFS.Used, ns.RootFS.Total) + if h.LoadAvg == nil { + h.LoadAvg = []string{} + } + return h +} + +func (c *Collector) collectGuests(ctx context.Context) []Guest { + lxc, err := c.px.ListLXC(ctx) + if err != nil { + // Not fatal: a report with no guest list still carries host liveness. + c.logger.Warn("hub: ListLXC failed; reporting no guests", "err", err) + return []Guest{} + } + guests := make([]Guest, 0, len(lxc)) + for _, g := range lxc { + entry := Guest{VMID: g.VMID, Name: g.Name, Status: g.Status, ControllerVersion: ""} + // GuestConfig supplies cores; memory/disk come from the list entry (bytes). + cfg, err := c.px.GuestConfig(ctx, g.VMID) + if err != nil { + c.logger.Warn("hub: GuestConfig failed; guest degraded to unknown", + "vmid", g.VMID, "err", err) + entry.Status = "unknown" + entry.Spec = nil // omitted + } else { + entry.Spec = &GuestSpec{ + Cores: cfg.Cores, + MemoryBytes: g.MaxMem, + DiskBytes: g.MaxDisk, + } + } + guests = append(guests, entry) + } + return guests +} + +func (c *Collector) cloudflaredStatus(ctx context.Context) string { + if c.cf == nil { + return "unknown" + } + st, err := c.cf.Status(ctx) + if err != nil || st == "" { + c.logger.Warn("hub: cloudflared probe failed", "err", err) + return "unknown" + } + return st +} + +func percent(used, total int64) float64 { + if total <= 0 { + return 0 + } + return float64(used) / float64(total) * 100 +} diff --git a/internal/hub/collect_test.go b/internal/hub/collect_test.go new file mode 100644 index 0000000..31ea667 --- /dev/null +++ b/internal/hub/collect_test.go @@ -0,0 +1,108 @@ +package hub + +import ( + "context" + "errors" + "testing" + + "gitea.dooplex.hu/admin/felhom-agent/internal/proxmox" +) + +func newTestNodeStatus() proxmox.NodeStatus { + var ns proxmox.NodeStatus + ns.CPU = 0.05 // → 5% + ns.Uptime = 86400 + ns.LoadAvg = []string{"0.10", "0.20", "0.15"} + ns.Memory.Total = 16000000000 + ns.Memory.Used = 4000000000 + ns.RootFS.Total = 100000000000 + ns.RootFS.Used = 20000000000 + return ns +} + +func TestCollect_HostAndGuests(t *testing.T) { + px := &fakePx{ + node: "demo-felhom", + ns: newTestNodeStatus(), + lxc: []proxmox.Guest{ + {VMID: 100, Name: "acme", Status: "running", MaxMem: 2147483648, MaxDisk: 21474836480}, + }, + cfg: map[int]proxmox.GuestConfig{100: {Cores: 2, Memory: 2048}}, + } + c := NewCollector(px, fakeProber{status: "active"}, "demo-host-01", "0.3.0", quietLogger()) + r, err := c.Collect(context.Background()) + if err != nil { + t.Fatalf("Collect: %v", err) + } + if r.HostID != "demo-host-01" || r.AgentVersion != "0.3.0" { + t.Errorf("top-level wrong: %+v", r) + } + if r.Host.Node != "demo-felhom" || r.Host.CPUPercent != 5 { + t.Errorf("host = %+v", r.Host) + } + if r.Host.MemoryPercent != 25 || r.Host.DiskPercent != 20 { + t.Errorf("percents = mem %v disk %v", r.Host.MemoryPercent, r.Host.DiskPercent) + } + if len(r.Guests) != 1 { + t.Fatalf("guests = %d", len(r.Guests)) + } + g := r.Guests[0] + if g.VMID != 100 || g.Status != "running" || g.Spec == nil { + t.Fatalf("guest = %+v", g) + } + if g.Spec.Cores != 2 || g.Spec.MemoryBytes != 2147483648 || g.Spec.DiskBytes != 21474836480 { + t.Errorf("spec = %+v", g.Spec) + } + if r.Cloudflared.Status != "active" { + t.Errorf("cloudflared = %q", r.Cloudflared.Status) + } +} + +func TestCollect_GuestConfigFailureDegradesButStillReports(t *testing.T) { + px := &fakePx{ + node: "demo-felhom", + ns: newTestNodeStatus(), + lxc: []proxmox.Guest{ + {VMID: 100, Name: "ok", Status: "running", MaxMem: 1 << 31, MaxDisk: 1 << 34}, + {VMID: 200, Name: "bad", Status: "running"}, + }, + cfg: map[int]proxmox.GuestConfig{100: {Cores: 2}}, + cfgErr: map[int]error{200: errors.New("config read failed")}, + } + c := NewCollector(px, fakeProber{status: "active"}, "h", "0.3.0", quietLogger()) + r, err := c.Collect(context.Background()) + if err != nil { + t.Fatalf("a per-guest failure must NOT fail the whole report: %v", err) + } + if len(r.Guests) != 2 { + t.Fatalf("guests = %d", len(r.Guests)) + } + bad := r.Guests[1] + if bad.Status != "unknown" || bad.Spec != nil { + t.Errorf("degraded guest = %+v (want status=unknown, spec=nil)", bad) + } +} + +func TestCollect_NodeStatusFailureIsHardError(t *testing.T) { + px := &fakePx{node: "n", nsErr: errors.New("proxmox down")} + c := NewCollector(px, fakeProber{status: "active"}, "h", "0.3.0", quietLogger()) + if _, err := c.Collect(context.Background()); err == nil { + t.Fatal("NodeStatus failure must be a hard error (no useful report)") + } +} + +func TestCollect_CloudflaredProbeErrorIsUnknown(t *testing.T) { + px := &fakePx{node: "n", ns: newTestNodeStatus()} + c := NewCollector(px, fakeProber{err: errors.New("no systemctl")}, "h", "0.3.0", quietLogger()) + r, err := c.Collect(context.Background()) + if err != nil { + t.Fatalf("cloudflared failure must not be fatal: %v", err) + } + if r.Cloudflared.Status != "unknown" { + t.Errorf("cloudflared = %q, want unknown", r.Cloudflared.Status) + } + // Empty collections still present as non-nil. + if r.Guests == nil || r.StorageTargets == nil || r.AuditTail == nil { + t.Error("empty collections must be non-nil") + } +} diff --git a/internal/hub/loop.go b/internal/hub/loop.go new file mode 100644 index 0000000..8592df9 --- /dev/null +++ b/internal/hub/loop.go @@ -0,0 +1,107 @@ +package hub + +import ( + "context" + "log/slog" + "time" +) + +// interval clamp bounds (locked decision 3). +const ( + MinPollSeconds = 60 + MaxPollSeconds = 3600 +) + +// reporter and collectorIface are the loop's deps as interfaces (tests inject fakes). +type reporter interface { + Report(ctx context.Context, r *HostReport) (*ControlEnvelope, error) +} +type collectorIface interface { + Collect(ctx context.Context) (*HostReport, error) +} + +// Loop is the agent's first daemon run loop: collect a host-report, POST it, adopt +// the hub's cadence, repeat. It is resilient — a collect or report error is logged +// and the loop continues (the data plane is independent of the agent; a hub outage +// must not kill it). There are NO Proxmox mutations here (read-only report), so no +// per-guest work queue yet (that lands with reconcile, slice 4). +type Loop struct { + collector collectorIface + client reporter + interval time.Duration + logger *slog.Logger +} + +// NewLoop builds the loop. interval is the starting cadence (the hub may override it +// per-cycle via the control envelope). +func NewLoop(collector collectorIface, client reporter, interval time.Duration, logger *slog.Logger) *Loop { + if logger == nil { + logger = slog.Default() + } + return &Loop{collector: collector, client: client, interval: interval, logger: logger} +} + +// Run reports immediately, then on each tick, until ctx is cancelled (then nil). +func (l *Loop) Run(ctx context.Context) error { + interval := l.interval + interval = l.cycle(ctx, interval) // immediate first report + + ticker := time.NewTicker(interval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + l.logger.Info("hub: loop shutting down", "reason", ctx.Err()) + return nil + case <-ticker.C: + next := l.cycle(ctx, interval) + if next != interval { + l.logger.Info("hub: poll interval changed", "from", interval, "to", next) + interval = next + ticker.Reset(interval) + } + } + } +} + +// cycle runs one collect→report→adopt. It never returns an error: failures are +// logged and the current interval is kept, so the loop keeps running. +func (l *Loop) cycle(ctx context.Context, current time.Duration) time.Duration { + report, err := l.collector.Collect(ctx) + if err != nil { + l.logger.Warn("hub: collect failed; skipping this cycle's report", "err", err) + return current + } + env, err := l.client.Report(ctx, report) + if err != nil { + l.logger.Warn("hub: report failed; keeping current interval", "err", err) + return current + } + l.logger.Debug("hub: report sent", + "guests", len(report.Guests), + // reserved/forward-compat envelope fields — logged only, never acted on (slice 4). + "blocked", env.Blocked, "desired_generation", env.DesiredGeneration, "has_signed_ops", env.HasSignedOps) + + if env.PollIntervalSeconds == nil { + return current + } + d, clamped := clampInterval(*env.PollIntervalSeconds) + if clamped { + l.logger.Warn("hub: poll_interval_seconds out of range; clamped", + "requested", *env.PollIntervalSeconds, "applied", int(d.Seconds())) + } + return d +} + +// clampInterval clamps a requested seconds value to [60,3600]; clamped reports +// whether it was out of range. +func clampInterval(sec int) (time.Duration, bool) { + clamped := false + if sec < MinPollSeconds { + sec, clamped = MinPollSeconds, true + } + if sec > MaxPollSeconds { + sec, clamped = MaxPollSeconds, true + } + return time.Duration(sec) * time.Second, clamped +} diff --git a/internal/hub/loop_test.go b/internal/hub/loop_test.go new file mode 100644 index 0000000..49e1467 --- /dev/null +++ b/internal/hub/loop_test.go @@ -0,0 +1,144 @@ +package hub + +import ( + "context" + "errors" + "sync/atomic" + "testing" + "time" +) + +func intPtr(i int) *int { return &i } + +type fakeCollector struct { + report *HostReport + err error + n *int32 +} + +func (c *fakeCollector) Collect(ctx context.Context) (*HostReport, error) { + atomic.AddInt32(c.n, 1) + return c.report, c.err +} + +type fakeReporter struct { + env *ControlEnvelope + errSeq []error // per-call error (nil = ok); calls past the slice are ok + n *int32 +} + +func (r *fakeReporter) Report(ctx context.Context, _ *HostReport) (*ControlEnvelope, error) { + i := int(atomic.AddInt32(r.n, 1) - 1) + if i < len(r.errSeq) && r.errSeq[i] != nil { + return nil, r.errSeq[i] + } + return r.env, nil +} + +func TestClampInterval(t *testing.T) { + cases := []struct { + in int + wantSec int + clamped bool + }{ + {10, 60, true}, {59, 60, true}, {60, 60, false}, {120, 120, false}, + {3600, 3600, false}, {99999, 3600, true}, + } + for _, c := range cases { + d, clamped := clampInterval(c.in) + if int(d.Seconds()) != c.wantSec || clamped != c.clamped { + t.Errorf("clampInterval(%d) = %v,%v want %ds,%v", c.in, d, clamped, c.wantSec, c.clamped) + } + } +} + +func TestLoop_CycleAdoptsAndClamps(t *testing.T) { + current := 900 * time.Second + mk := func(env *ControlEnvelope, collErr, repErr error) *Loop { + var cn, rn int32 + return NewLoop( + &fakeCollector{report: &HostReport{}, err: collErr, n: &cn}, + &fakeReporter{env: env, errSeq: []error{repErr}, n: &rn}, + current, quietLogger()) + } + tests := []struct { + name string + env *ControlEnvelope + want time.Duration + }{ + {"adopt in-range", &ControlEnvelope{PollIntervalSeconds: intPtr(120)}, 120 * time.Second}, + {"clamp low", &ControlEnvelope{PollIntervalSeconds: intPtr(10)}, 60 * time.Second}, + {"clamp high", &ControlEnvelope{PollIntervalSeconds: intPtr(99999)}, 3600 * time.Second}, + {"missing keeps current", &ControlEnvelope{PollIntervalSeconds: nil}, current}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := mk(tt.env, nil, nil).cycle(context.Background(), current) + if got != tt.want { + t.Errorf("cycle adopted %v, want %v", got, tt.want) + } + }) + } + + t.Run("collect error keeps current", func(t *testing.T) { + got := mk(&ControlEnvelope{PollIntervalSeconds: intPtr(120)}, errors.New("x"), nil).cycle(context.Background(), current) + if got != current { + t.Errorf("got %v, want current %v", got, current) + } + }) + t.Run("report error keeps current", func(t *testing.T) { + got := mk(nil, nil, errors.New("x")).cycle(context.Background(), current) + if got != current { + t.Errorf("got %v, want current %v", got, current) + } + }) +} + +func TestLoop_RunImmediateAndResilientAfterError(t *testing.T) { + var cn, rn int32 + loop := NewLoop( + &fakeCollector{report: &HostReport{}, n: &cn}, + // first report errors; subsequent ok; no interval override (keeps fast tick) + &fakeReporter{env: &ControlEnvelope{}, errSeq: []error{errors.New("hub 5xx")}, n: &rn}, + 10*time.Millisecond, quietLogger()) + + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan error, 1) + go func() { done <- loop.Run(ctx) }() + time.Sleep(90 * time.Millisecond) + cancel() + + select { + case err := <-done: + if err != nil { + t.Fatalf("Run returned error: %v", err) + } + case <-time.After(2 * time.Second): + t.Fatal("Run did not return after cancel") + } + // Immediate report + several ticks despite the first error → ≥3 collect calls. + if got := atomic.LoadInt32(&cn); got < 3 { + t.Errorf("collect calls = %d, want ≥3 (immediate + continuation after error)", got) + } +} + +func TestLoop_RunAdoptsSlowerInterval(t *testing.T) { + var cn, rn int32 + loop := NewLoop( + &fakeCollector{report: &HostReport{}, n: &cn}, + // every report tells the agent to slow to 60s → after the immediate report, + // the ticker resets to 60s and no further ticks fire within the test window. + &fakeReporter{env: &ControlEnvelope{PollIntervalSeconds: intPtr(60)}, n: &rn}, + 10*time.Millisecond, quietLogger()) + + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan error, 1) + go func() { done <- loop.Run(ctx) }() + time.Sleep(120 * time.Millisecond) + cancel() + <-done + + if got := atomic.LoadInt32(&cn); got != 1 { + t.Errorf("collect calls = %d, want 1 (immediate report adopted 60s, ticker slowed)", got) + } +} diff --git a/internal/hub/mock_test.go b/internal/hub/mock_test.go new file mode 100644 index 0000000..3147159 --- /dev/null +++ b/internal/hub/mock_test.go @@ -0,0 +1,62 @@ +package hub + +import ( + "context" + "io" + "log/slog" + "net/http" + "strings" + + "gitea.dooplex.hu/admin/felhom-agent/internal/proxmox" +) + +func quietLogger() *slog.Logger { return slog.New(slog.NewTextHandler(io.Discard, nil)) } + +// roundTripFunc is a mock http.RoundTripper. +type roundTripFunc func(*http.Request) (*http.Response, error) + +func (f roundTripFunc) RoundTrip(r *http.Request) (*http.Response, error) { return f(r) } + +// testClient builds a hub Client over a mock transport (no network). +func testClient(rt roundTripFunc) *Client { + return newClient("https://hub.example.test", "super-secret-bearer-key", &http.Client{Transport: rt}, quietLogger()) +} + +func httpResp(code int, body string) *http.Response { + return &http.Response{ + StatusCode: code, + Body: io.NopCloser(strings.NewReader(body)), + Header: http.Header{"Content-Type": []string{"application/json"}}, + } +} + +// fakePx is a fake proxmoxReader. +type fakePx struct { + node string + ns proxmox.NodeStatus + nsErr error + lxc []proxmox.Guest + lxcErr error + cfg map[int]proxmox.GuestConfig + cfgErr map[int]error +} + +func (f *fakePx) Node() string { return f.node } +func (f *fakePx) NodeStatus(ctx context.Context) (proxmox.NodeStatus, error) { + return f.ns, f.nsErr +} +func (f *fakePx) ListLXC(ctx context.Context) ([]proxmox.Guest, error) { return f.lxc, f.lxcErr } +func (f *fakePx) GuestConfig(ctx context.Context, vmid int) (proxmox.GuestConfig, error) { + if e := f.cfgErr[vmid]; e != nil { + return proxmox.GuestConfig{}, e + } + return f.cfg[vmid], nil +} + +// fakeProber is a fake CloudflaredProber. +type fakeProber struct { + status string + err error +} + +func (p fakeProber) Status(ctx context.Context) (string, error) { return p.status, p.err } diff --git a/internal/hub/report.go b/internal/hub/report.go new file mode 100644 index 0000000..f3deb70 --- /dev/null +++ b/internal/hub/report.go @@ -0,0 +1,85 @@ +package hub + +// HostReport is the wire contract shared with the hub's ingest +// (felhom.eu TASK-slice3-hub-ingest). Field NAMES must match the hub +// field-for-field. Encoding is ordinary encoding/json (no canonicalization — +// nothing signs this report; canonical JSON is a slice-10 signing concern). +// +// The report IS the heartbeat: one periodic POST /api/v1/host-report, whose +// server-side received_at is the hub's dead-man's-switch liveness signal. There is +// no separate heartbeat endpoint. +type HostReport struct { + HostID string `json:"host_id"` // echoes config.Hub.HostID + ReportedAt string `json:"reported_at"` // RFC3339, agent clock + AgentVersion string `json:"agent_version"` + + Host HostMetrics `json:"host"` + Guests []Guest `json:"guests"` + + // Defined now as the stable contract; emitted EMPTY (non-nil) this slice. + StorageTargets []StorageTarget `json:"storage_targets"` // slice 5 (storage manifest) + Backups []Backup `json:"backups"` // slice 6 + RestoreTests []RestoreTest `json:"restore_tests"` // slice 6 + PBSSnapshots []PBSSnapshot `json:"pbs_snapshots"` // slice 6 + + Cloudflared Cloudflared `json:"cloudflared"` + AuditTail []AuditEntry `json:"audit_tail"` // populated by a later slice +} + +// HostMetrics is the host block, sourced from proxmox NodeStatus. +type HostMetrics struct { + Node string `json:"node"` + CPUPercent float64 `json:"cpu_percent"` // 0–100 + MemoryTotalBytes int64 `json:"memory_total_bytes"` + MemoryUsedBytes int64 `json:"memory_used_bytes"` + MemoryPercent float64 `json:"memory_percent"` + DiskTotalBytes int64 `json:"disk_total_bytes"` // host root fs + DiskUsedBytes int64 `json:"disk_used_bytes"` + DiskPercent float64 `json:"disk_percent"` + LoadAvg []string `json:"loadavg"` // array of STRINGS (PVE shape) + UptimeSeconds int64 `json:"uptime_seconds"` +} + +// Guest is one LXC. The agent reports vmid; the hub derives the guest PK +// "/" (keeping the id scheme hub-side — locked decision 4). +type Guest struct { + VMID int `json:"vmid"` + Name string `json:"name"` + Status string `json:"status"` // running | stopped | unknown + ControllerVersion string `json:"controller_version"` // "" this slice (slice 8 fills) + Spec *GuestSpec `json:"spec,omitempty"` // omitted when status unknown +} + +// GuestSpec is the provisioned guest sizing. +type GuestSpec struct { + Cores int `json:"cores"` + MemoryBytes int64 `json:"memory_bytes"` + DiskBytes int64 `json:"disk_bytes"` +} + +// Cloudflared is the tunnel service health (read-only probe this slice). +type Cloudflared struct { + Status string `json:"status"` // active | inactive | failed | unknown +} + +// The following element types are declared now so the empty collections above are +// typed and slices 5/6 only fill them. No wire fields are committed yet. + +type StorageTarget struct{} // slice 5: storage manifest target fields TBD +type Backup struct{} // slice 6: per-target backup status fields TBD +type RestoreTest struct{} // slice 6: self-restore-test result fields TBD +type PBSSnapshot struct{} // slice 6: PBS snapshot inventory fields TBD +type AuditEntry struct{} // audit-log tail entry fields TBD + +// ControlEnvelope is the hub's 200 response to a host-report. This slice the agent +// adopts ONLY PollIntervalSeconds; the rest are reserved/forward-compat fields it +// logs at most and never acts on (reconcile, slice 4, consumes them). +type ControlEnvelope struct { + Status string `json:"status"` + // PollIntervalSeconds is a pointer so a missing field (keep current interval) is + // distinguishable from an explicit 0. + PollIntervalSeconds *int `json:"poll_interval_seconds"` + Blocked bool `json:"blocked"` // reserved — ignored (slice 4) + DesiredGeneration int64 `json:"desired_generation"` // reserved — ignored (slice 4) + HasSignedOps bool `json:"has_signed_ops"` // reserved — ignored (slice 4) +} diff --git a/internal/hub/report_test.go b/internal/hub/report_test.go new file mode 100644 index 0000000..11a242c --- /dev/null +++ b/internal/hub/report_test.go @@ -0,0 +1,60 @@ +package hub + +import ( + "encoding/json" + "strings" + "testing" +) + +func TestHostReport_FieldNamesAndEmptyCollections(t *testing.T) { + r := &HostReport{ + HostID: "demo-host-01", + ReportedAt: "2026-06-08T12:00:00Z", + AgentVersion: "0.3.0", + Host: HostMetrics{ + Node: "demo-felhom", CPUPercent: 3.2, + MemoryTotalBytes: 16777216000, MemoryUsedBytes: 4194304000, MemoryPercent: 25.0, + DiskTotalBytes: 152000000000, DiskUsedBytes: 30000000000, DiskPercent: 19.7, + LoadAvg: []string{"0.10", "0.20", "0.15"}, UptimeSeconds: 86400, + }, + Guests: []Guest{{ + VMID: 100, Name: "felhom-cust-acme", Status: "running", ControllerVersion: "", + Spec: &GuestSpec{Cores: 2, MemoryBytes: 2147483648, DiskBytes: 21474836480}, + }}, + StorageTargets: []StorageTarget{}, + Backups: []Backup{}, + RestoreTests: []RestoreTest{}, + PBSSnapshots: []PBSSnapshot{}, + AuditTail: []AuditEntry{}, + Cloudflared: Cloudflared{Status: "active"}, + } + b, err := json.Marshal(r) + if err != nil { + t.Fatal(err) + } + got := string(b) + + for _, field := range []string{ + `"host_id":"demo-host-01"`, `"reported_at":`, `"agent_version":"0.3.0"`, + `"cpu_percent":3.2`, `"memory_total_bytes":16777216000`, `"loadavg":["0.10","0.20","0.15"]`, + `"disk_percent":19.7`, `"uptime_seconds":86400`, + `"vmid":100`, `"controller_version":""`, `"memory_bytes":2147483648`, + `"cloudflared":{"status":"active"}`, + // empty collections must be [] not null + `"storage_targets":[]`, `"backups":[]`, `"restore_tests":[]`, `"pbs_snapshots":[]`, `"audit_tail":[]`, + } { + if !strings.Contains(got, field) { + t.Errorf("report JSON missing %s\n got: %s", field, got) + } + } + if strings.Contains(got, "null") { + t.Errorf("report JSON must not contain null (empty collections should be []): %s", got) + } +} + +func TestGuest_SpecOmittedWhenUnknown(t *testing.T) { + b, _ := json.Marshal(Guest{VMID: 9, Name: "g", Status: "unknown"}) + if strings.Contains(string(b), "spec") { + t.Errorf("unknown guest should omit spec, got %s", b) + } +}