Files
felhom.eu/hub/internal/monitor/host_staleness.go
T
admin 7c0c75457f feat(hub): host-domain ingest — tables + /host-report + per-host auth + host dead-man's-switch (v0.7.0, slice 3)
Purely additive; the controller path (reports/customer_configs/checkAuthCustomer/
existing checkers) is untouched. Cutover remains slice 10.

- store: new hosts/guests/host_reports tables (full schema incl. columns INERT
  until slice 10, so no later ALTER); GetHostByAPIKey/GetHost/ListHosts/UpsertHost/
  SaveHostReport/UpsertGuestFromReport (preserves inert cols)/GetHostStaleness/
  GuestID; Prune also prunes host_reports.
- api: checkAuthHost (sibling of checkAuthCustomer); POST /host-report (per-host
  Bearer, 4MiB, denorm + guest upsert, control envelope); POST /admin/hosts
  (PROVISIONAL global-key host mint); host_* event types registered.
- monitor: HostStalenessChecker sibling over host_reports (host_stale/down/
  recovered), wired on the existing 60s ticker; controller checkers unchanged.
- tests (hermetic): store intent/inert-column preservation, auth, ingest
  (envelope+denorm, mismatch/unknown/blocked/oversize), admin mint round-trip,
  host staleness transitions.

CHANGELOG v0.7.0. Contract matches the agent host-report spec field-for-field.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-08 16:36:16 +02:00

177 lines
5.2 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package monitor
import (
"log"
"sync"
"time"
"gitea.dooplex.hu/admin/felhom-hub/internal/store"
)
// HostStalenessChecker is the host-domain dead-man's-switch (v0.7.0, slice 3). It
// is a deliberate SIBLING of StalenessChecker, not a rename: during slices 3-9 the
// controller report stream (reports) and the agent host-report stream
// (host_reports) are both live, so both checkers run. It keys on host↔host_reports
// and emits host_stale / host_down / host_recovered. Merging is a slice-10 job.
//
// Events are attributed to the host's CUSTOMER (SaveEvent + onEvent take the
// customer_id) so the existing per-customer notification/event UX picks them up
// unchanged.
//
// A host is "ok" while its last report is at most threshold old, "stale" once it
// is older than threshold, and "down" once it is older than downAfter.
type HostStalenessChecker struct {
// store supplies host-report recency and customer-blocked status, and persists events.
store *store.Store
threshold time.Duration // "stale" after this (default 30m — same as the controller checker)
downAfter time.Duration // "down" after this (2x threshold)
logger *log.Logger
// onEvent, when non-nil, is invoked after an event has been saved successfully.
onEvent EventNotifyFunc
// mu is held by Check and GetState while they access the maps below.
mu sync.Mutex
states map[string]string // hostID → "ok" | "stale" | "down"
customerOf map[string]string // hostID → customerID (for event attribution)
downtimeStart map[string]time.Time // hostID → when it first became unreachable
}
// NewHostStalenessChecker creates the checker and seeds per-host state from the
// current host-report recency returned by the store. No events are generated
// during initialization.
//
// threshold is the report age after which a host counts as "stale"; a host is
// "down" after twice that. onEvent may be nil. Hosts belonging to blocked
// customers are skipped and therefore start out untracked.
//
// If the seed query fails, a warning is logged and the checker is returned with
// empty state; the first Check call then records every host as a first
// observation without emitting events.
func NewHostStalenessChecker(s *store.Store, threshold time.Duration, onEvent EventNotifyFunc, logger *log.Logger) *HostStalenessChecker {
	sc := &HostStalenessChecker{
		store:         s,
		threshold:     threshold,
		downAfter:     2 * threshold,
		logger:        logger,
		onEvent:       onEvent,
		states:        make(map[string]string),
		customerOf:    make(map[string]string),
		downtimeStart: make(map[string]time.Time),
	}

	rows, err := s.GetHostStaleness()
	if err != nil {
		logger.Printf("[WARN] Host staleness checker: failed to seed states: %v", err)
		return sc
	}

	var okCount, staleCount, downCount int
	for _, row := range rows {
		if s.IsCustomerBlocked(row.CustomerID) {
			continue
		}
		sc.customerOf[row.HostID] = row.CustomerID

		age := time.Since(row.LastReportAt)
		switch {
		case age > sc.downAfter:
			sc.states[row.HostID] = "down"
			downCount++
		case age > sc.threshold:
			sc.states[row.HostID] = "stale"
			staleCount++
		default:
			sc.states[row.HostID] = "ok"
			okCount++
		}

		// A host that is already unreachable at startup needs a downtime start,
		// otherwise its eventual host_recovered event would report the age of the
		// fresh report (a few seconds) as the outage length. The real moment is
		// unknown here, so estimate it as the instant the last report crossed the
		// stale threshold — roughly when a running checker would have noticed.
		if sc.states[row.HostID] != "ok" {
			sc.downtimeStart[row.HostID] = row.LastReportAt.Add(sc.threshold)
		}
	}

	logger.Printf("[INFO] Host staleness checker initialized: %d ok, %d stale, %d down", okCount, staleCount, downCount)
	return sc
}
// Check evaluates all hosts and emits events on state transitions. Call every 60s.
//
// For every host returned by the store it derives the state from the age of the
// last report ("ok", "stale" after threshold, "down" after downAfter) and
// compares it with the previously recorded state:
//   - a host seen for the first time is recorded silently (no event);
//   - an unchanged state does nothing;
//   - any other change is passed to emitTransition.
//
// Hosts of blocked customers are dropped from the state map, so they are treated
// as a first observation again once the customer is unblocked. Hosts that the
// store no longer returns are forgotten entirely.
//
// If the store query fails, a warning is logged and no state is touched. The
// mutex is held for the whole evaluation, including event emission.
func (sc *HostStalenessChecker) Check() {
	rows, err := sc.store.GetHostStaleness()
	if err != nil {
		sc.logger.Printf("[WARN] Host staleness check failed: %v", err)
		return
	}

	sc.mu.Lock()
	defer sc.mu.Unlock()

	seen := make(map[string]bool, len(rows))
	for _, row := range rows {
		seen[row.HostID] = true
		if sc.store.IsCustomerBlocked(row.CustomerID) {
			delete(sc.states, row.HostID)
			continue
		}
		sc.customerOf[row.HostID] = row.CustomerID

		age := time.Since(row.LastReportAt)
		var newState string
		switch {
		case age > sc.downAfter:
			newState = "down"
		case age > sc.threshold:
			newState = "stale"
		default:
			newState = "ok"
		}

		oldState := sc.states[row.HostID]
		if oldState == "" {
			sc.states[row.HostID] = newState // first observation — no event
			continue
		}
		if oldState == newState {
			continue
		}
		sc.states[row.HostID] = newState

		// Leaving "ok" starts an outage. oldState != newState at this point, so
		// this covers ok→stale as well as a direct ok→down jump (possible when
		// several checks in a row were skipped or failed); without the latter the
		// recovery event would report a near-zero outage duration.
		if oldState == "ok" {
			sc.downtimeStart[row.HostID] = time.Now()
		}

		// For stale/down events the duration is the report age; for recovery it
		// is the length of the outage when its start is known.
		downtimeDur := age
		if newState == "ok" {
			if t, ok := sc.downtimeStart[row.HostID]; ok {
				downtimeDur = time.Since(t)
			}
			delete(sc.downtimeStart, row.HostID)
		}
		sc.emitTransition(row.HostID, row.CustomerID, oldState, newState, downtimeDur)
	}

	// Forget hosts the store no longer returns. Both maps are walked because a
	// host of a blocked customer has already been removed from states but may
	// still have customerOf/downtimeStart entries that would otherwise leak.
	for id := range sc.states {
		if !seen[id] {
			delete(sc.states, id)
			delete(sc.customerOf, id)
			delete(sc.downtimeStart, id)
		}
	}
	for id := range sc.customerOf {
		if !seen[id] {
			delete(sc.customerOf, id)
			delete(sc.downtimeStart, id)
		}
	}
}
// GetState reports the last recorded staleness state of the given host: "ok",
// "stale" or "down". Hosts the checker has no state for yield "unknown".
func (sc *HostStalenessChecker) GetState(hostID string) string {
	sc.mu.Lock()
	state, tracked := sc.states[hostID]
	sc.mu.Unlock()

	if !tracked || state == "" {
		return "unknown"
	}
	return state
}
// emitTransition turns a state change of one host into an event, persists it and
// notifies the listener.
//
// Mapping: entering "stale" → host_stale (warning), entering "down" → host_down
// (error), returning to "ok" from "stale" or "down" → host_recovered (info). Any
// other combination is ignored. age is rendered into the message via
// formatDuration. The event is stored under customerID; if saving fails a warning
// is logged and the onEvent callback is NOT invoked.
func (sc *HostStalenessChecker) emitTransition(hostID, customerID, oldState, newState string, age time.Duration) {
	var eventType, severity, suffix string

	switch newState {
	case "stale":
		eventType, severity = "host_stale", "warning"
		suffix = ": no report for " + formatDuration(age)
	case "down":
		eventType, severity = "host_down", "error"
		suffix = ": no report for " + formatDuration(age)
	case "ok":
		// Only a recovery from an unreachable state is worth an event.
		if oldState != "stale" && oldState != "down" {
			return
		}
		eventType, severity = "host_recovered", "info"
		suffix = ": reports resumed (was " + oldState + " for " + formatDuration(age) + ")"
	default:
		return
	}
	message := "Host " + hostID + suffix

	sc.logger.Printf("[INFO] Host staleness: %s %s → %s (%s)", hostID, oldState, newState, eventType)

	_, saveErr := sc.store.SaveEvent(customerID, eventType, severity, message, "{}", "hub")
	if saveErr != nil {
		sc.logger.Printf("[WARN] Failed to save host staleness event for %s: %v", hostID, saveErr)
		return
	}

	if sc.onEvent == nil {
		return
	}
	sc.onEvent(customerID, eventType, severity, message, "{}", "hub")
}