198 lines
5.3 KiB
Go
198 lines
5.3 KiB
Go
package monitor
|
|
|
|
import (
|
|
"fmt"
|
|
"log"
|
|
"sync"
|
|
"time"
|
|
|
|
"gitea.dooplex.hu/admin/felhom-hub/internal/store"
|
|
)
|
|
|
|
// EventNotifyFunc is the hook the monitor invokes once a hub-generated event
// has been persisted, so notification dispatch can be triggered without the
// monitor package depending on the notifier.
//
// Arguments, in order: customer ID, event type, severity, human-readable
// message, event details encoded as a JSON string, and the event source.
type EventNotifyFunc func(
	customerID string,
	eventType string,
	severity string,
	message string,
	detailsJSON string,
	source string,
)
|
|
|
|
// StalenessChecker monitors customer report freshness and generates
|
|
// node_stale / node_down / node_recovered events on state transitions.
|
|
type StalenessChecker struct {
|
|
store *store.Store
|
|
threshold time.Duration // "stale" after this duration (default 30m)
|
|
downAfter time.Duration // "down" after this duration (2x threshold)
|
|
logger *log.Logger
|
|
onEvent EventNotifyFunc
|
|
|
|
mu sync.Mutex
|
|
states map[string]string // customerID → "ok" | "stale" | "down"
|
|
downtimeStart map[string]time.Time // customerID → when node first became unreachable
|
|
}
|
|
|
|
// NewStalenessChecker creates a checker and initializes state from current data.
|
|
// No events are generated during initialization (binding #12).
|
|
// onEvent is called after each hub-generated event is saved (may be nil).
|
|
func NewStalenessChecker(s *store.Store, threshold time.Duration, onEvent EventNotifyFunc, logger *log.Logger) *StalenessChecker {
|
|
sc := &StalenessChecker{
|
|
store: s,
|
|
threshold: threshold,
|
|
downAfter: 2 * threshold,
|
|
logger: logger,
|
|
onEvent: onEvent,
|
|
states: make(map[string]string),
|
|
downtimeStart: make(map[string]time.Time),
|
|
}
|
|
|
|
// Seed states from current report timestamps — no events on init
|
|
customers, err := s.GetCustomers()
|
|
if err != nil {
|
|
logger.Printf("[WARN] Staleness checker: failed to seed states: %v", err)
|
|
return sc
|
|
}
|
|
|
|
var okCount, staleCount, downCount int
|
|
for _, c := range customers {
|
|
if s.IsCustomerBlocked(c.CustomerID) {
|
|
continue
|
|
}
|
|
age := time.Since(c.ReceivedAt)
|
|
switch {
|
|
case age > sc.downAfter:
|
|
sc.states[c.CustomerID] = "down"
|
|
downCount++
|
|
case age > sc.threshold:
|
|
sc.states[c.CustomerID] = "stale"
|
|
staleCount++
|
|
default:
|
|
sc.states[c.CustomerID] = "ok"
|
|
okCount++
|
|
}
|
|
}
|
|
logger.Printf("[INFO] Staleness checker initialized: %d ok, %d stale, %d down", okCount, staleCount, downCount)
|
|
|
|
return sc
|
|
}
|
|
|
|
// Check evaluates all customers and emits events on state transitions.
|
|
// Should be called periodically (every 60s).
|
|
func (sc *StalenessChecker) Check() {
|
|
customers, err := sc.store.GetCustomers()
|
|
if err != nil {
|
|
sc.logger.Printf("[WARN] Staleness check failed: %v", err)
|
|
return
|
|
}
|
|
|
|
sc.mu.Lock()
|
|
defer sc.mu.Unlock()
|
|
|
|
// Track which customers are still present (to clean up removed ones)
|
|
seen := make(map[string]bool, len(customers))
|
|
|
|
for _, c := range customers {
|
|
seen[c.CustomerID] = true
|
|
|
|
if sc.store.IsCustomerBlocked(c.CustomerID) {
|
|
delete(sc.states, c.CustomerID)
|
|
continue
|
|
}
|
|
|
|
age := time.Since(c.ReceivedAt)
|
|
var newState string
|
|
switch {
|
|
case age > sc.downAfter:
|
|
newState = "down"
|
|
case age > sc.threshold:
|
|
newState = "stale"
|
|
default:
|
|
newState = "ok"
|
|
}
|
|
|
|
oldState := sc.states[c.CustomerID]
|
|
if oldState == "" {
|
|
// New customer — set state without event
|
|
sc.states[c.CustomerID] = newState
|
|
continue
|
|
}
|
|
|
|
if oldState == newState {
|
|
continue
|
|
}
|
|
|
|
// State transition — emit event
|
|
sc.states[c.CustomerID] = newState
|
|
if newState == "stale" && oldState == "ok" {
|
|
// Record when node first became unreachable
|
|
sc.downtimeStart[c.CustomerID] = time.Now()
|
|
}
|
|
downtimeDur := age
|
|
if newState == "ok" {
|
|
if t, ok := sc.downtimeStart[c.CustomerID]; ok {
|
|
downtimeDur = time.Since(t)
|
|
}
|
|
delete(sc.downtimeStart, c.CustomerID)
|
|
}
|
|
sc.emitTransition(c.CustomerID, oldState, newState, downtimeDur)
|
|
}
|
|
|
|
// Clean up customers that no longer have reports
|
|
for id := range sc.states {
|
|
if !seen[id] {
|
|
delete(sc.states, id)
|
|
delete(sc.downtimeStart, id)
|
|
}
|
|
}
|
|
}
|
|
|
|
// GetState returns the current staleness state for a customer.
|
|
func (sc *StalenessChecker) GetState(customerID string) string {
|
|
sc.mu.Lock()
|
|
defer sc.mu.Unlock()
|
|
s := sc.states[customerID]
|
|
if s == "" {
|
|
return "unknown"
|
|
}
|
|
return s
|
|
}
|
|
|
|
func (sc *StalenessChecker) emitTransition(customerID, oldState, newState string, age time.Duration) {
|
|
var eventType, severity, message string
|
|
|
|
switch {
|
|
case newState == "stale":
|
|
eventType = "node_stale"
|
|
severity = "warning"
|
|
message = "No report received for " + formatDuration(age)
|
|
case newState == "down":
|
|
eventType = "node_down"
|
|
severity = "error"
|
|
message = "No report received for " + formatDuration(age)
|
|
case newState == "ok" && (oldState == "stale" || oldState == "down"):
|
|
eventType = "node_recovered"
|
|
severity = "info"
|
|
message = "Reports resumed (was " + oldState + " for " + formatDuration(age) + ")"
|
|
default:
|
|
return
|
|
}
|
|
|
|
sc.logger.Printf("[INFO] Staleness: %s %s → %s (%s)", customerID, oldState, newState, eventType)
|
|
|
|
if _, err := sc.store.SaveEvent(customerID, eventType, severity, message, "{}", "hub"); err != nil {
|
|
sc.logger.Printf("[WARN] Failed to save staleness event for %s: %v", customerID, err)
|
|
return
|
|
}
|
|
|
|
if sc.onEvent != nil {
|
|
sc.onEvent(customerID, eventType, severity, message, "{}", "hub")
|
|
}
|
|
}
|
|
|
|
// formatDuration renders d compactly for event messages.
//
// Below one hour it prints whole minutes ("45m"). Otherwise it prints hours
// plus any leftover minutes ("2h", "2h15m"). Partial minutes are truncated.
// A negative duration, e.g. from clock skew between a report timestamp and the
// hub clock, is clamped to zero. This avoids messages like "-120m".
func formatDuration(d time.Duration) string {
	if d < 0 {
		d = 0
	}
	if d < time.Hour {
		return fmt.Sprintf("%dm", int(d.Minutes()))
	}
	h := int(d.Hours())
	m := int(d.Minutes()) % 60
	if m == 0 {
		return fmt.Sprintf("%dh", h)
	}
	return fmt.Sprintf("%dh%dm", h, m)
}
|