update
This commit is contained in:
@@ -22,8 +22,9 @@ type StalenessChecker struct {
|
|||||||
logger *log.Logger
|
logger *log.Logger
|
||||||
onEvent EventNotifyFunc
|
onEvent EventNotifyFunc
|
||||||
|
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
states map[string]string // customerID → "ok" | "stale" | "down"
|
states map[string]string // customerID → "ok" | "stale" | "down"
|
||||||
|
downtimeStart map[string]time.Time // customerID → when node first became unreachable
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewStalenessChecker creates a checker and initializes state from current data.
|
// NewStalenessChecker creates a checker and initializes state from current data.
|
||||||
@@ -31,12 +32,13 @@ type StalenessChecker struct {
|
|||||||
// onEvent is called after each hub-generated event is saved (may be nil).
|
// onEvent is called after each hub-generated event is saved (may be nil).
|
||||||
func NewStalenessChecker(s *store.Store, threshold time.Duration, onEvent EventNotifyFunc, logger *log.Logger) *StalenessChecker {
|
func NewStalenessChecker(s *store.Store, threshold time.Duration, onEvent EventNotifyFunc, logger *log.Logger) *StalenessChecker {
|
||||||
sc := &StalenessChecker{
|
sc := &StalenessChecker{
|
||||||
store: s,
|
store: s,
|
||||||
threshold: threshold,
|
threshold: threshold,
|
||||||
downAfter: 2 * threshold,
|
downAfter: 2 * threshold,
|
||||||
logger: logger,
|
logger: logger,
|
||||||
onEvent: onEvent,
|
onEvent: onEvent,
|
||||||
states: make(map[string]string),
|
states: make(map[string]string),
|
||||||
|
downtimeStart: make(map[string]time.Time),
|
||||||
}
|
}
|
||||||
|
|
||||||
// Seed states from current report timestamps — no events on init
|
// Seed states from current report timestamps — no events on init
|
||||||
@@ -116,13 +118,25 @@ func (sc *StalenessChecker) Check() {
|
|||||||
|
|
||||||
// State transition — emit event
|
// State transition — emit event
|
||||||
sc.states[c.CustomerID] = newState
|
sc.states[c.CustomerID] = newState
|
||||||
sc.emitTransition(c.CustomerID, oldState, newState, age)
|
if newState == "stale" && oldState == "ok" {
|
||||||
|
// Record when node first became unreachable
|
||||||
|
sc.downtimeStart[c.CustomerID] = time.Now()
|
||||||
|
}
|
||||||
|
downtimeDur := age
|
||||||
|
if newState == "ok" {
|
||||||
|
if t, ok := sc.downtimeStart[c.CustomerID]; ok {
|
||||||
|
downtimeDur = time.Since(t)
|
||||||
|
}
|
||||||
|
delete(sc.downtimeStart, c.CustomerID)
|
||||||
|
}
|
||||||
|
sc.emitTransition(c.CustomerID, oldState, newState, downtimeDur)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Clean up customers that no longer have reports
|
// Clean up customers that no longer have reports
|
||||||
for id := range sc.states {
|
for id := range sc.states {
|
||||||
if !seen[id] {
|
if !seen[id] {
|
||||||
delete(sc.states, id)
|
delete(sc.states, id)
|
||||||
|
delete(sc.downtimeStart, id)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user