Files
felhom.eu/hub/internal/monitor/staleness.go
T
admin 3217cb4751 feat: Hub monitoring takeover — event system, dead man's switch, notifications (v0.3.0)
Replace external Healthchecks.io with Hub-native monitoring. New events
table + /api/v1/event endpoint for structured events from controllers.
Staleness checker (60s) detects unresponsive nodes. Backup deadline
checker (daily 05:00) catches missed backups. Notification dispatcher
sends operator (English) + customer (Hungarian) emails via Resend with
per-event cooldowns. Event timeline on customer page, dashboard badges.
Config form deprecates Monitoring UUIDs section.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-20 18:53:24 +01:00

184 lines
4.8 KiB
Go

package monitor
import (
"fmt"
"log"
"sync"
"time"
"gitea.dooplex.hu/admin/felhom-hub/internal/store"
)
// EventNotifyFunc is called after a hub-generated event is saved,
// to trigger notification dispatch. Keeps monitor decoupled from notify.
// Its arguments are the same values that were just passed to
// store.SaveEvent for that event: detailsJSON is the JSON-encoded details
// payload ("{}" for staleness events) and source names the producer
// ("hub" for events generated by this package's staleness checker).
type EventNotifyFunc func(customerID, eventType, severity, message, detailsJSON, source string)
// StalenessChecker monitors customer report freshness and generates
// node_stale / node_down / node_recovered events on state transitions.
//
// Construct it with NewStalenessChecker: the zero value has a nil store,
// logger and states map and cannot be used.
type StalenessChecker struct {
	store     *store.Store
	threshold time.Duration // "stale" after this duration (default 30m)
	downAfter time.Duration // "down" after this duration (2x threshold)
	logger    *log.Logger
	onEvent   EventNotifyFunc // invoked after each saved event; may be nil
	mu        sync.Mutex      // guards states
	states    map[string]string // customerID → "ok" | "stale" | "down"
}
// NewStalenessChecker creates a checker and initializes state from current data.
// No events are generated during initialization (binding #12): customers that
// are already stale or down when the checker is created are recorded as such,
// so the first Check sees no transition for them.
//
// threshold is the report age after which a customer counts as "stale"; a
// customer counts as "down" after twice that age. A non-positive threshold
// would flag every customer as down on the first check, so it falls back to
// the 30-minute default. A nil logger falls back to log.Default().
// onEvent is called after each hub-generated event is saved (may be nil).
//
// If the initial customer lookup fails, the checker starts with no state and
// each customer is recorded silently on the first successful Check.
func NewStalenessChecker(s *store.Store, threshold time.Duration, onEvent EventNotifyFunc, logger *log.Logger) *StalenessChecker {
	const defaultThreshold = 30 * time.Minute

	if logger == nil {
		logger = log.Default()
	}
	if threshold <= 0 {
		logger.Printf("[WARN] Staleness checker: invalid threshold %v, using %v", threshold, defaultThreshold)
		threshold = defaultThreshold
	}

	sc := &StalenessChecker{
		store:     s,
		threshold: threshold,
		downAfter: 2 * threshold,
		logger:    logger,
		onEvent:   onEvent,
		states:    make(map[string]string),
	}

	// Seed states from current report timestamps — no events on init.
	// No locking needed: sc is not shared with anyone yet.
	customers, err := s.GetCustomers()
	if err != nil {
		logger.Printf("[WARN] Staleness checker: failed to seed states: %v", err)
		return sc
	}

	var okCount, staleCount, downCount int
	for _, c := range customers {
		if s.IsCustomerBlocked(c.CustomerID) {
			continue
		}
		age := time.Since(c.ReceivedAt)
		switch {
		case age > sc.downAfter:
			sc.states[c.CustomerID] = "down"
			downCount++
		case age > sc.threshold:
			sc.states[c.CustomerID] = "stale"
			staleCount++
		default:
			sc.states[c.CustomerID] = "ok"
			okCount++
		}
	}
	logger.Printf("[INFO] Staleness checker initialized: %d ok, %d stale, %d down", okCount, staleCount, downCount)
	return sc
}
// Check evaluates all customers and emits events on state transitions.
// Should be called periodically (every 60s).
//
// Customers without prior state (new, or no longer blocked) are recorded
// without an event. Blocked customers, and customers the store no longer
// returns, are dropped from the state map.
//
// sc.mu is held only while the state map is updated. Store lookups happen
// before the lock is taken. Events are saved and dispatched after it is
// released, because SaveEvent and the onEvent callback may be slow (database
// write, notification dispatch). Holding the mutex across them would stall
// GetState, and would deadlock if the callback calls back into this checker.
func (sc *StalenessChecker) Check() {
	customers, err := sc.store.GetCustomers()
	if err != nil {
		sc.logger.Printf("[WARN] Staleness check failed: %v", err)
		return
	}

	// Phase 1 (unlocked): classify each customer by the age of its latest report.
	type observation struct {
		customerID string
		blocked    bool
		state      string
		age        time.Duration
	}
	observed := make([]observation, 0, len(customers))
	for _, c := range customers {
		obs := observation{
			customerID: c.CustomerID,
			blocked:    sc.store.IsCustomerBlocked(c.CustomerID),
		}
		if !obs.blocked {
			obs.age = time.Since(c.ReceivedAt)
			switch {
			case obs.age > sc.downAfter:
				obs.state = "down"
			case obs.age > sc.threshold:
				obs.state = "stale"
			default:
				obs.state = "ok"
			}
		}
		observed = append(observed, obs)
	}

	// Phase 2 (locked): apply new states and collect transitions to announce.
	type transition struct {
		customerID string
		oldState   string
		newState   string
		age        time.Duration
	}
	var transitions []transition

	sc.mu.Lock()
	seen := make(map[string]bool, len(observed))
	for _, obs := range observed {
		seen[obs.customerID] = true
		if obs.blocked {
			delete(sc.states, obs.customerID)
			continue
		}
		oldState := sc.states[obs.customerID]
		sc.states[obs.customerID] = obs.state
		// An empty oldState means a new customer: record it silently.
		if oldState != "" && oldState != obs.state {
			transitions = append(transitions, transition{
				customerID: obs.customerID,
				oldState:   oldState,
				newState:   obs.state,
				age:        obs.age,
			})
		}
	}
	// Clean up customers that no longer have reports.
	for id := range sc.states {
		if !seen[id] {
			delete(sc.states, id)
		}
	}
	sc.mu.Unlock()

	// Phase 3 (unlocked): persist and dispatch events in customer order.
	for _, tr := range transitions {
		sc.emitTransition(tr.customerID, tr.oldState, tr.newState, tr.age)
	}
}
// GetState reports the most recently computed staleness state for
// customerID: "ok", "stale" or "down". It returns "unknown" when the
// checker holds no state for that customer. This covers never-seen,
// blocked and removed customers.
func (sc *StalenessChecker) GetState(customerID string) string {
	sc.mu.Lock()
	state, found := sc.states[customerID]
	sc.mu.Unlock()

	if !found || state == "" {
		return "unknown"
	}
	return state
}
// emitTransition turns a staleness state change into a hub-sourced event.
// It saves the event and, if that succeeds, forwards it to onEvent when
// onEvent is set. Changes with no corresponding event are ignored; the only
// recognized recovery is stale/down → ok. A save failure is logged and
// suppresses the onEvent call.
//
// age is the time since the customer's latest report. For "stale" and
// "down" that is how long the node has been silent. On recovery it is only
// the age of the report that just arrived (at most the stale threshold), not
// the outage length. The recovery message therefore names only the previous
// state instead of reporting a misleading duration.
func (sc *StalenessChecker) emitTransition(customerID, oldState, newState string, age time.Duration) {
	var eventType, severity, message string
	switch {
	case newState == "stale":
		eventType = "node_stale"
		severity = "warning"
		message = "No report received for " + formatDuration(age)
	case newState == "down":
		eventType = "node_down"
		severity = "error"
		message = "No report received for " + formatDuration(age)
	case newState == "ok" && (oldState == "stale" || oldState == "down"):
		eventType = "node_recovered"
		severity = "info"
		message = "Reports resumed (was " + oldState + ")"
	default:
		return
	}
	sc.logger.Printf("[INFO] Staleness: %s %s → %s (%s)", customerID, oldState, newState, eventType)
	if _, err := sc.store.SaveEvent(customerID, eventType, severity, message, "{}", "hub"); err != nil {
		sc.logger.Printf("[WARN] Failed to save staleness event for %s: %v", customerID, err)
		return
	}
	if sc.onEvent != nil {
		sc.onEvent(customerID, eventType, severity, message, "{}", "hub")
	}
}
// formatDuration renders d compactly. Below one hour it uses whole minutes
// ("45m"). From one hour up it uses hours, followed by any leftover minutes
// ("2h", "3h15m"). Partial minutes are truncated rather than rounded.
func formatDuration(d time.Duration) string {
	totalMinutes := int(d.Minutes())
	if d >= time.Hour {
		hours, leftover := int(d.Hours()), totalMinutes%60
		if leftover != 0 {
			return fmt.Sprintf("%dh%dm", hours, leftover)
		}
		return fmt.Sprintf("%dh", hours)
	}
	return fmt.Sprintf("%dm", totalMinutes)
}