diff --git a/hub/internal/monitor/staleness.go b/hub/internal/monitor/staleness.go index 76bf33a..2b8c7b3 100644 --- a/hub/internal/monitor/staleness.go +++ b/hub/internal/monitor/staleness.go @@ -22,8 +22,9 @@ type StalenessChecker struct { logger *log.Logger onEvent EventNotifyFunc - mu sync.Mutex - states map[string]string // customerID → "ok" | "stale" | "down" + mu sync.Mutex + states map[string]string // customerID → "ok" | "stale" | "down" + downtimeStart map[string]time.Time // customerID → when node first became unreachable } // NewStalenessChecker creates a checker and initializes state from current data. @@ -31,12 +32,13 @@ type StalenessChecker struct { // onEvent is called after each hub-generated event is saved (may be nil). func NewStalenessChecker(s *store.Store, threshold time.Duration, onEvent EventNotifyFunc, logger *log.Logger) *StalenessChecker { sc := &StalenessChecker{ - store: s, - threshold: threshold, - downAfter: 2 * threshold, - logger: logger, - onEvent: onEvent, - states: make(map[string]string), + store: s, + threshold: threshold, + downAfter: 2 * threshold, + logger: logger, + onEvent: onEvent, + states: make(map[string]string), + downtimeStart: make(map[string]time.Time), } // Seed states from current report timestamps — no events on init @@ -116,13 +118,25 @@ func (sc *StalenessChecker) Check() { // State transition — emit event sc.states[c.CustomerID] = newState - sc.emitTransition(c.CustomerID, oldState, newState, age) + if newState == "stale" && oldState == "ok" { + // Record when node first became unreachable + sc.downtimeStart[c.CustomerID] = time.Now() + } + downtimeDur := age + if newState == "ok" { + if t, ok := sc.downtimeStart[c.CustomerID]; ok { + downtimeDur = time.Since(t) + } + delete(sc.downtimeStart, c.CustomerID) + } + sc.emitTransition(c.CustomerID, oldState, newState, downtimeDur) } // Clean up customers that no longer have reports for id := range sc.states { if !seen[id] { delete(sc.states, id) + delete(sc.downtimeStart, id) } } }