feat: Hub monitoring takeover — event system, dead man's switch, notifications (v0.3.0)
Replace external Healthchecks.io with Hub-native monitoring. New events table + /api/v1/event endpoint for structured events from controllers. Staleness checker (60s) detects unresponsive nodes. Backup deadline checker (daily 05:00) catches missed backups. Notification dispatcher sends operator (English) + customer (Hungarian) emails via Resend with per-event cooldowns. Event timeline on customer page, dashboard badges. Config form deprecates Monitoring UUIDs section. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,86 @@
|
||||
package monitor
|
||||
|
||||
import (
|
||||
"log"
|
||||
"time"
|
||||
|
||||
"gitea.dooplex.hu/admin/felhom-hub/internal/store"
|
||||
)
|
||||
|
||||
// budapest returns the Europe/Budapest timezone (cached).
|
||||
var budapest *time.Location
|
||||
|
||||
func init() {
|
||||
var err error
|
||||
budapest, err = time.LoadLocation("Europe/Budapest")
|
||||
if err != nil {
|
||||
// Fallback: UTC+1 (CET base; DST handled by OS if available)
|
||||
budapest = time.FixedZone("CET", 3600)
|
||||
}
|
||||
}
|
||||
|
||||
// CheckBackupDeadlines checks whether active customers had their expected
|
||||
// daily backups and DB dumps. Runs once daily at 05:00 Budapest time.
|
||||
//
|
||||
// For each active customer, it checks for backup_completed and db_dump_completed
|
||||
// events since Budapest midnight. If neither success nor failure events exist,
|
||||
// it inserts expected_backup_missed / expected_dbdump_missed events.
|
||||
//
|
||||
// Customers whose nodes are "down" (no report in >1h) are skipped — they
|
||||
// already have staleness events.
|
||||
func CheckBackupDeadlines(s *store.Store, staleness *StalenessChecker, onEvent EventNotifyFunc, logger *log.Logger) {
|
||||
customerIDs, err := s.GetActiveCustomerIDs()
|
||||
if err != nil {
|
||||
logger.Printf("[WARN] Deadline check: failed to get active customers: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
// Budapest midnight today
|
||||
now := time.Now().In(budapest)
|
||||
midnightBudapest := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, budapest)
|
||||
sinceUTC := midnightBudapest.UTC()
|
||||
|
||||
var backupMissed, dbdumpMissed, skipped int
|
||||
|
||||
for _, id := range customerIDs {
|
||||
// Skip nodes that are down — they already have staleness events
|
||||
if staleness != nil && staleness.GetState(id) == "down" {
|
||||
skipped++
|
||||
continue
|
||||
}
|
||||
|
||||
// Check blocked
|
||||
if s.IsCustomerBlocked(id) {
|
||||
continue
|
||||
}
|
||||
|
||||
// Check backup_completed / backup_failed since midnight
|
||||
backupOK, _ := s.GetEventsByType(id, "backup_completed", sinceUTC)
|
||||
backupFailed, _ := s.GetEventsByType(id, "backup_failed", sinceUTC)
|
||||
if len(backupOK) == 0 && len(backupFailed) == 0 {
|
||||
msg := "No backup completed or failed since midnight"
|
||||
if _, err := s.SaveEvent(id, "expected_backup_missed", "error", msg, "{}", "hub"); err != nil {
|
||||
logger.Printf("[WARN] Failed to save expected_backup_missed for %s: %v", id, err)
|
||||
} else if onEvent != nil {
|
||||
onEvent(id, "expected_backup_missed", "error", msg, "{}", "hub")
|
||||
}
|
||||
backupMissed++
|
||||
}
|
||||
|
||||
// Check db_dump_completed / db_dump_failed since midnight
|
||||
dumpOK, _ := s.GetEventsByType(id, "db_dump_completed", sinceUTC)
|
||||
dumpFailed, _ := s.GetEventsByType(id, "db_dump_failed", sinceUTC)
|
||||
if len(dumpOK) == 0 && len(dumpFailed) == 0 {
|
||||
msg := "No DB dump completed or failed since midnight"
|
||||
if _, err := s.SaveEvent(id, "expected_dbdump_missed", "error", msg, "{}", "hub"); err != nil {
|
||||
logger.Printf("[WARN] Failed to save expected_dbdump_missed for %s: %v", id, err)
|
||||
} else if onEvent != nil {
|
||||
onEvent(id, "expected_dbdump_missed", "error", msg, "{}", "hub")
|
||||
}
|
||||
dbdumpMissed++
|
||||
}
|
||||
}
|
||||
|
||||
logger.Printf("[INFO] Deadline check: %d customers, %d backup missed, %d dbdump missed, %d skipped (down)",
|
||||
len(customerIDs), backupMissed, dbdumpMissed, skipped)
|
||||
}
|
||||
@@ -0,0 +1,183 @@
|
||||
package monitor
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"gitea.dooplex.hu/admin/felhom-hub/internal/store"
|
||||
)
|
||||
|
||||
// EventNotifyFunc is called after a hub-generated event is saved,
|
||||
// to trigger notification dispatch. Keeps monitor decoupled from notify.
|
||||
type EventNotifyFunc func(customerID, eventType, severity, message, detailsJSON, source string)
|
||||
|
||||
// StalenessChecker monitors customer report freshness and generates
|
||||
// node_stale / node_down / node_recovered events on state transitions.
|
||||
type StalenessChecker struct {
|
||||
store *store.Store
|
||||
threshold time.Duration // "stale" after this duration (default 30m)
|
||||
downAfter time.Duration // "down" after this duration (2x threshold)
|
||||
logger *log.Logger
|
||||
onEvent EventNotifyFunc
|
||||
|
||||
mu sync.Mutex
|
||||
states map[string]string // customerID → "ok" | "stale" | "down"
|
||||
}
|
||||
|
||||
// NewStalenessChecker creates a checker and initializes state from current data.
|
||||
// No events are generated during initialization (binding #12).
|
||||
// onEvent is called after each hub-generated event is saved (may be nil).
|
||||
func NewStalenessChecker(s *store.Store, threshold time.Duration, onEvent EventNotifyFunc, logger *log.Logger) *StalenessChecker {
|
||||
sc := &StalenessChecker{
|
||||
store: s,
|
||||
threshold: threshold,
|
||||
downAfter: 2 * threshold,
|
||||
logger: logger,
|
||||
onEvent: onEvent,
|
||||
states: make(map[string]string),
|
||||
}
|
||||
|
||||
// Seed states from current report timestamps — no events on init
|
||||
customers, err := s.GetCustomers()
|
||||
if err != nil {
|
||||
logger.Printf("[WARN] Staleness checker: failed to seed states: %v", err)
|
||||
return sc
|
||||
}
|
||||
|
||||
var okCount, staleCount, downCount int
|
||||
for _, c := range customers {
|
||||
if s.IsCustomerBlocked(c.CustomerID) {
|
||||
continue
|
||||
}
|
||||
age := time.Since(c.ReceivedAt)
|
||||
switch {
|
||||
case age > sc.downAfter:
|
||||
sc.states[c.CustomerID] = "down"
|
||||
downCount++
|
||||
case age > sc.threshold:
|
||||
sc.states[c.CustomerID] = "stale"
|
||||
staleCount++
|
||||
default:
|
||||
sc.states[c.CustomerID] = "ok"
|
||||
okCount++
|
||||
}
|
||||
}
|
||||
logger.Printf("[INFO] Staleness checker initialized: %d ok, %d stale, %d down", okCount, staleCount, downCount)
|
||||
|
||||
return sc
|
||||
}
|
||||
|
||||
// Check evaluates all customers and emits events on state transitions.
|
||||
// Should be called periodically (every 60s).
|
||||
func (sc *StalenessChecker) Check() {
|
||||
customers, err := sc.store.GetCustomers()
|
||||
if err != nil {
|
||||
sc.logger.Printf("[WARN] Staleness check failed: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
sc.mu.Lock()
|
||||
defer sc.mu.Unlock()
|
||||
|
||||
// Track which customers are still present (to clean up removed ones)
|
||||
seen := make(map[string]bool, len(customers))
|
||||
|
||||
for _, c := range customers {
|
||||
seen[c.CustomerID] = true
|
||||
|
||||
if sc.store.IsCustomerBlocked(c.CustomerID) {
|
||||
delete(sc.states, c.CustomerID)
|
||||
continue
|
||||
}
|
||||
|
||||
age := time.Since(c.ReceivedAt)
|
||||
var newState string
|
||||
switch {
|
||||
case age > sc.downAfter:
|
||||
newState = "down"
|
||||
case age > sc.threshold:
|
||||
newState = "stale"
|
||||
default:
|
||||
newState = "ok"
|
||||
}
|
||||
|
||||
oldState := sc.states[c.CustomerID]
|
||||
if oldState == "" {
|
||||
// New customer — set state without event
|
||||
sc.states[c.CustomerID] = newState
|
||||
continue
|
||||
}
|
||||
|
||||
if oldState == newState {
|
||||
continue
|
||||
}
|
||||
|
||||
// State transition — emit event
|
||||
sc.states[c.CustomerID] = newState
|
||||
sc.emitTransition(c.CustomerID, oldState, newState, age)
|
||||
}
|
||||
|
||||
// Clean up customers that no longer have reports
|
||||
for id := range sc.states {
|
||||
if !seen[id] {
|
||||
delete(sc.states, id)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// GetState returns the current staleness state for a customer.
|
||||
func (sc *StalenessChecker) GetState(customerID string) string {
|
||||
sc.mu.Lock()
|
||||
defer sc.mu.Unlock()
|
||||
s := sc.states[customerID]
|
||||
if s == "" {
|
||||
return "unknown"
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
||||
func (sc *StalenessChecker) emitTransition(customerID, oldState, newState string, age time.Duration) {
|
||||
var eventType, severity, message string
|
||||
|
||||
switch {
|
||||
case newState == "stale":
|
||||
eventType = "node_stale"
|
||||
severity = "warning"
|
||||
message = "No report received for " + formatDuration(age)
|
||||
case newState == "down":
|
||||
eventType = "node_down"
|
||||
severity = "error"
|
||||
message = "No report received for " + formatDuration(age)
|
||||
case newState == "ok" && (oldState == "stale" || oldState == "down"):
|
||||
eventType = "node_recovered"
|
||||
severity = "info"
|
||||
message = "Reports resumed (was " + oldState + " for " + formatDuration(age) + ")"
|
||||
default:
|
||||
return
|
||||
}
|
||||
|
||||
sc.logger.Printf("[INFO] Staleness: %s %s → %s (%s)", customerID, oldState, newState, eventType)
|
||||
|
||||
if _, err := sc.store.SaveEvent(customerID, eventType, severity, message, "{}", "hub"); err != nil {
|
||||
sc.logger.Printf("[WARN] Failed to save staleness event for %s: %v", customerID, err)
|
||||
return
|
||||
}
|
||||
|
||||
if sc.onEvent != nil {
|
||||
sc.onEvent(customerID, eventType, severity, message, "{}", "hub")
|
||||
}
|
||||
}
|
||||
|
||||
func formatDuration(d time.Duration) string {
|
||||
if d < time.Hour {
|
||||
return fmt.Sprintf("%dm", int(d.Minutes()))
|
||||
}
|
||||
h := int(d.Hours())
|
||||
m := int(d.Minutes()) % 60
|
||||
if m == 0 {
|
||||
return fmt.Sprintf("%dh", h)
|
||||
}
|
||||
return fmt.Sprintf("%dh%dm", h, m)
|
||||
}
|
||||
Reference in New Issue
Block a user