7c0c75457f
Purely additive; the controller path (reports/customer_configs/checkAuthCustomer/ existing checkers) is untouched. Cutover remains slice 10. - store: new hosts/guests/host_reports tables (full schema incl. columns INERT until slice 10, so no later ALTER); GetHostByAPIKey/GetHost/ListHosts/UpsertHost/ SaveHostReport/UpsertGuestFromReport (preserves inert cols)/GetHostStaleness/ GuestID; Prune also prunes host_reports. - api: checkAuthHost (sibling of checkAuthCustomer); POST /host-report (per-host Bearer, 4MiB, denorm + guest upsert, control envelope); POST /admin/hosts (PROVISIONAL global-key host mint); host_* event types registered. - monitor: HostStalenessChecker sibling over host_reports (host_stale/down/ recovered), wired on the existing 60s ticker; controller checkers unchanged. - tests (hermetic): store intent/inert-column preservation, auth, ingest (envelope+denorm, mismatch/unknown/blocked/oversize), admin mint round-trip, host staleness transitions. CHANGELOG v0.7.0. Contract matches the agent host-report spec field-for-field. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
177 lines
5.2 KiB
Go
177 lines
5.2 KiB
Go
package monitor
|
||
|
||
import (
|
||
"log"
|
||
"sync"
|
||
"time"
|
||
|
||
"gitea.dooplex.hu/admin/felhom-hub/internal/store"
|
||
)
|
||
|
||
// HostStalenessChecker is the host-domain dead-man's-switch (v0.7.0, slice 3). It
|
||
// is a deliberate SIBLING of StalenessChecker, not a rename: during slices 3–9 the
|
||
// controller report stream (reports) and the agent host-report stream
|
||
// (host_reports) are both live, so both checkers run. It keys on host↔host_reports
|
||
// and emits host_stale / host_down / host_recovered. Merging is a slice-10 job.
|
||
//
|
||
// Events are attributed to the host's CUSTOMER (SaveEvent + onEvent take the
|
||
// customer_id) so the existing per-customer notification/event UX picks them up
|
||
// unchanged.
|
||
type HostStalenessChecker struct {
|
||
store *store.Store
|
||
threshold time.Duration // "stale" after this (default 30m — same as the controller checker)
|
||
downAfter time.Duration // "down" after this (2x threshold)
|
||
logger *log.Logger
|
||
onEvent EventNotifyFunc
|
||
|
||
mu sync.Mutex
|
||
states map[string]string // hostID → "ok" | "stale" | "down"
|
||
customerOf map[string]string // hostID → customerID (for event attribution)
|
||
downtimeStart map[string]time.Time // hostID → when it first became unreachable
|
||
}
|
||
|
||
// NewHostStalenessChecker creates the checker and seeds state from current
|
||
// host-report recency. No events are generated during initialization.
|
||
func NewHostStalenessChecker(s *store.Store, threshold time.Duration, onEvent EventNotifyFunc, logger *log.Logger) *HostStalenessChecker {
|
||
sc := &HostStalenessChecker{
|
||
store: s,
|
||
threshold: threshold,
|
||
downAfter: 2 * threshold,
|
||
logger: logger,
|
||
onEvent: onEvent,
|
||
states: make(map[string]string),
|
||
customerOf: make(map[string]string),
|
||
downtimeStart: make(map[string]time.Time),
|
||
}
|
||
|
||
rows, err := s.GetHostStaleness()
|
||
if err != nil {
|
||
logger.Printf("[WARN] Host staleness checker: failed to seed states: %v", err)
|
||
return sc
|
||
}
|
||
var okCount, staleCount, downCount int
|
||
for _, row := range rows {
|
||
if s.IsCustomerBlocked(row.CustomerID) {
|
||
continue
|
||
}
|
||
sc.customerOf[row.HostID] = row.CustomerID
|
||
age := time.Since(row.LastReportAt)
|
||
switch {
|
||
case age > sc.downAfter:
|
||
sc.states[row.HostID] = "down"
|
||
downCount++
|
||
case age > sc.threshold:
|
||
sc.states[row.HostID] = "stale"
|
||
staleCount++
|
||
default:
|
||
sc.states[row.HostID] = "ok"
|
||
okCount++
|
||
}
|
||
}
|
||
logger.Printf("[INFO] Host staleness checker initialized: %d ok, %d stale, %d down", okCount, staleCount, downCount)
|
||
return sc
|
||
}
|
||
|
||
// Check evaluates all hosts and emits events on state transitions. Call every 60s.
|
||
func (sc *HostStalenessChecker) Check() {
|
||
rows, err := sc.store.GetHostStaleness()
|
||
if err != nil {
|
||
sc.logger.Printf("[WARN] Host staleness check failed: %v", err)
|
||
return
|
||
}
|
||
|
||
sc.mu.Lock()
|
||
defer sc.mu.Unlock()
|
||
|
||
seen := make(map[string]bool, len(rows))
|
||
for _, row := range rows {
|
||
seen[row.HostID] = true
|
||
if sc.store.IsCustomerBlocked(row.CustomerID) {
|
||
delete(sc.states, row.HostID)
|
||
continue
|
||
}
|
||
sc.customerOf[row.HostID] = row.CustomerID
|
||
|
||
age := time.Since(row.LastReportAt)
|
||
var newState string
|
||
switch {
|
||
case age > sc.downAfter:
|
||
newState = "down"
|
||
case age > sc.threshold:
|
||
newState = "stale"
|
||
default:
|
||
newState = "ok"
|
||
}
|
||
|
||
oldState := sc.states[row.HostID]
|
||
if oldState == "" {
|
||
sc.states[row.HostID] = newState // first observation — no event
|
||
continue
|
||
}
|
||
if oldState == newState {
|
||
continue
|
||
}
|
||
|
||
sc.states[row.HostID] = newState
|
||
if newState == "stale" && oldState == "ok" {
|
||
sc.downtimeStart[row.HostID] = time.Now()
|
||
}
|
||
downtimeDur := age
|
||
if newState == "ok" {
|
||
if t, ok := sc.downtimeStart[row.HostID]; ok {
|
||
downtimeDur = time.Since(t)
|
||
}
|
||
delete(sc.downtimeStart, row.HostID)
|
||
}
|
||
sc.emitTransition(row.HostID, row.CustomerID, oldState, newState, downtimeDur)
|
||
}
|
||
|
||
for id := range sc.states {
|
||
if !seen[id] {
|
||
delete(sc.states, id)
|
||
delete(sc.downtimeStart, id)
|
||
}
|
||
}
|
||
}
|
||
|
||
// GetState returns the current staleness state for a host.
|
||
func (sc *HostStalenessChecker) GetState(hostID string) string {
|
||
sc.mu.Lock()
|
||
defer sc.mu.Unlock()
|
||
s := sc.states[hostID]
|
||
if s == "" {
|
||
return "unknown"
|
||
}
|
||
return s
|
||
}
|
||
|
||
func (sc *HostStalenessChecker) emitTransition(hostID, customerID, oldState, newState string, age time.Duration) {
|
||
var eventType, severity, message string
|
||
switch {
|
||
case newState == "stale":
|
||
eventType = "host_stale"
|
||
severity = "warning"
|
||
message = "Host " + hostID + ": no report for " + formatDuration(age)
|
||
case newState == "down":
|
||
eventType = "host_down"
|
||
severity = "error"
|
||
message = "Host " + hostID + ": no report for " + formatDuration(age)
|
||
case newState == "ok" && (oldState == "stale" || oldState == "down"):
|
||
eventType = "host_recovered"
|
||
severity = "info"
|
||
message = "Host " + hostID + ": reports resumed (was " + oldState + " for " + formatDuration(age) + ")"
|
||
default:
|
||
return
|
||
}
|
||
|
||
sc.logger.Printf("[INFO] Host staleness: %s %s → %s (%s)", hostID, oldState, newState, eventType)
|
||
|
||
if _, err := sc.store.SaveEvent(customerID, eventType, severity, message, "{}", "hub"); err != nil {
|
||
sc.logger.Printf("[WARN] Failed to save host staleness event for %s: %v", hostID, err)
|
||
return
|
||
}
|
||
if sc.onEvent != nil {
|
||
sc.onEvent(customerID, eventType, severity, message, "{}", "hub")
|
||
}
|
||
}
|