Files
deploy-felhom-compose/controller/internal/notify/notifier.go
T
admin bdbe170a54 feat: storage watchdog — USB disconnect detection, auto-stop, safe eject, auto-reconnect (v0.17.0)
New storage watchdog monitors registered storage paths every 5s. On disconnect
(3 consecutive probe failures), auto-stops affected apps, lazy-unmounts stale
VFS entries, fires alerts/notifications/hub report. On reconnect (UUID detected),
auto-remounts via fstab, cleans stale restic locks, offers app restart.

Safe disconnect UI for USB drives: confirmation dialog, stop apps, sync, unmount.
Disconnected state visible across all pages (dashboard, settings, backups, monitoring)
with hatched red bars and badges. Backup guards skip disconnected drives.

22 files changed (1 new: monitor/watchdog.go), ~1500 lines added.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-19 19:42:26 +01:00

356 lines
10 KiB
Go

package notify
import (
"bytes"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"strings"
"sync"
"time"
"gitea.dooplex.hu/admin/felhom-controller/internal/settings"
)
// Notifier sends notification events to the hub relay service.
// Notify is non-blocking: it fires the HTTP request in a goroutine and only
// logs delivery errors without retrying. SyncPreferences and SendTest are
// synchronous and return their errors to the caller.
type Notifier struct {
// hubURL is the hub base URL; endpoint paths such as "/api/v1/notify" are appended to it.
hubURL string
// apiKey is sent as a Bearer token in the Authorization header of hub requests.
apiKey string
// customerID is included in the JSON payloads sent to the hub.
customerID string
// httpClient is the shared client used for all hub requests.
httpClient *http.Client
logger *log.Logger
// enabled is set by New when both hubURL and apiKey are non-empty.
enabled bool
// settings supplies the notification preferences consulted by Notify.
settings *settings.Settings
// mu guards cooldowns.
mu sync.Mutex
cooldowns map[string]time.Time // event_type -> last notification time
perEventCooldown map[string]time.Duration // per-event override cooldown durations
// prevHealthStatus tracks the previous health check status for change detection
prevHealthStatus string
}
// New creates a new Notifier. Returns a no-op notifier if hub is not enabled.
//
// The notifier is enabled only when both hubURL and apiKey are non-empty; the
// outcome is logged at INFO level. Trailing slashes on hubURL are stripped so
// that appending endpoint paths (which start with "/") never produces a
// double slash in the request URL.
//
// Storage disconnect/reconnect events get a fixed one-hour cooldown override;
// every other event uses the cooldown from the notification preferences.
func New(hubURL, apiKey, customerID string, sett *settings.Settings, logger *log.Logger) *Notifier {
	// Endpoint paths are appended with a leading "/", so normalise the base
	// URL here instead of at every call site.
	hubURL = strings.TrimRight(hubURL, "/")

	enabled := hubURL != "" && apiKey != ""
	if enabled {
		logger.Printf("[INFO] Notifier enabled (hub: %s)", hubURL)
	} else {
		logger.Printf("[INFO] Notifier disabled (hub not configured)")
	}

	return &Notifier{
		hubURL:     hubURL,
		apiKey:     apiKey,
		customerID: customerID,
		httpClient: &http.Client{Timeout: 10 * time.Second},
		logger:     logger,
		enabled:    enabled,
		settings:   sett,
		cooldowns:  make(map[string]time.Time),
		perEventCooldown: map[string]time.Duration{
			"storage_disconnected": 1 * time.Hour,
			"storage_reconnected":  1 * time.Hour,
		},
	}
}
// IsEnabled reports whether the notifier has a configured hub connection.
// The flag is fixed at construction: New sets it when both the hub URL and
// the API key are non-empty.
func (n *Notifier) IsEnabled() bool {
return n.enabled
}
// preferencesRequest is the JSON payload sent to the hub preferences endpoint.
type preferencesRequest struct {
// CustomerID is filled from the Notifier's customerID.
CustomerID string `json:"customer_id"`
// Email is the notification recipient address passed to SyncPreferences.
Email string `json:"email"`
// EnabledEvents lists the event types passed to SyncPreferences.
EnabledEvents []string `json:"enabled_events"`
}
// SyncPreferences pushes the current notification preferences to the hub.
// Called after the user saves notification settings on the settings page.
//
// It is synchronous: the POST to <hubURL>/api/v1/preferences runs on the
// calling goroutine and any failure is returned so the handler can display it
// to the user. An error is returned when the hub is not configured, when the
// request cannot be built or sent, or when the hub answers with a status of
// 400 or above (up to 512 bytes of the response body are included in that
// error). On success the synced values are logged at INFO level.
func (n *Notifier) SyncPreferences(email string, enabledEvents []string) error {
	if !n.enabled {
		return fmt.Errorf("hub nem konfigurált")
	}

	jsonData, err := json.Marshal(preferencesRequest{
		CustomerID:    n.customerID,
		Email:         email,
		EnabledEvents: enabledEvents,
	})
	if err != nil {
		return fmt.Errorf("marshal: %w", err)
	}

	url := n.hubURL + "/api/v1/preferences"
	req, err := http.NewRequest("POST", url, bytes.NewReader(jsonData))
	if err != nil {
		return fmt.Errorf("request: %w", err)
	}
	req.Header.Set("Authorization", "Bearer "+n.apiKey)
	req.Header.Set("Content-Type", "application/json")

	resp, err := n.httpClient.Do(req)
	if err != nil {
		return fmt.Errorf("hub elérhetetlen: %w", err)
	}
	defer func() {
		// Drain whatever is left of the body (bounded) before closing so the
		// transport can keep the connection alive for the next hub request.
		_, _ = io.Copy(io.Discard, io.LimitReader(resp.Body, 64<<10))
		resp.Body.Close()
	}()

	if resp.StatusCode >= 400 {
		// Best effort: a truncated or unreadable body still yields a useful error.
		body, _ := io.ReadAll(io.LimitReader(resp.Body, 512))
		return fmt.Errorf("hub hiba (%d): %s", resp.StatusCode, string(body))
	}

	n.logger.Printf("[INFO] Notification preferences synced to hub: email=%s, events=%v", email, enabledEvents)
	return nil
}
// notifyRequest is the JSON payload sent to the hub.
// It is POSTed to the "/api/v1/notify" endpoint by Notify and SendTest.
type notifyRequest struct {
// CustomerID is filled from the Notifier's customerID.
CustomerID string `json:"customer_id"`
// EventType is the event identifier, e.g. "backup_failed" or "test".
EventType string `json:"event_type"`
// Severity is the level string supplied by the caller, e.g. "critical", "warning" or "info".
Severity string `json:"severity"`
// Message is the short human-readable description of the event.
Message string `json:"message"`
// Details is optional extra text; it is omitted from the JSON when empty.
Details string `json:"details,omitempty"`
}
// Notify sends a notification event to the hub relay.
//
// The event is dropped silently when the notifier is disabled, when no
// recipient email is configured, or when eventType is not among the enabled
// events in the notification preferences. It is also dropped (with a DEBUG
// log line) while the same eventType is inside its cooldown window: the
// per-event override if one exists, otherwise the configured CooldownHours,
// falling back to 6 hours when that is zero.
//
// The cooldown timestamp is recorded before the request is sent, so a failed
// delivery still consumes the cooldown window. Delivery itself is
// non-blocking: the HTTP request runs in a goroutine and failures are only
// logged.
func (n *Notifier) Notify(eventType, severity, message, details string) {
	if !n.enabled {
		return
	}

	prefs := n.settings.GetNotificationPrefs()
	if prefs.Email == "" {
		return // No email configured, skip
	}

	// Only forward event types that are enabled in the preferences.
	eventEnabled := false
	for _, e := range prefs.EnabledEvents {
		if e == eventType {
			eventEnabled = true
			break
		}
	}
	if !eventEnabled {
		return
	}

	// Resolve the cooldown: per-event override > configured hours > 6h default.
	cooldownDuration := time.Duration(prefs.CooldownHours) * time.Hour
	if cooldownDuration == 0 {
		cooldownDuration = 6 * time.Hour
	}
	if override, ok := n.perEventCooldown[eventType]; ok {
		cooldownDuration = override
	}

	n.mu.Lock()
	if lastSent, exists := n.cooldowns[eventType]; exists {
		if elapsed := time.Since(lastSent); elapsed < cooldownDuration {
			n.mu.Unlock()
			n.logger.Printf("[DEBUG] Notification cooldown active for %s (sent %s ago)", eventType, elapsed.Round(time.Minute))
			return
		}
	}
	n.cooldowns[eventType] = time.Now()
	n.mu.Unlock()

	// Fire the notification in a goroutine (non-blocking)
	go func() {
		payload := notifyRequest{
			CustomerID: n.customerID,
			EventType:  eventType,
			Severity:   severity,
			Message:    message,
			Details:    details,
		}
		jsonData, err := json.Marshal(payload)
		if err != nil {
			n.logger.Printf("[ERROR] Failed to marshal notification: %v", err)
			return
		}

		url := n.hubURL + "/api/v1/notify"
		req, err := http.NewRequest("POST", url, bytes.NewReader(jsonData))
		if err != nil {
			n.logger.Printf("[ERROR] Failed to create notification request: %v", err)
			return
		}
		req.Header.Set("Authorization", "Bearer "+n.apiKey)
		req.Header.Set("Content-Type", "application/json")

		resp, err := n.httpClient.Do(req)
		if err != nil {
			n.logger.Printf("[WARN] Failed to send notification to hub: %v", err)
			return
		}
		defer func() {
			// Drain the body (bounded) before closing so the transport can
			// reuse the connection for the next hub request.
			_, _ = io.Copy(io.Discard, io.LimitReader(resp.Body, 64<<10))
			resp.Body.Close()
		}()

		if resp.StatusCode >= 400 {
			n.logger.Printf("[WARN] Hub notification returned %d for %s/%s", resp.StatusCode, eventType, severity)
			return
		}
		n.logger.Printf("[INFO] Notification sent: %s (%s) — %s", eventType, severity, message)
	}()
}
// NotifyHealthChange checks if health status changed and sends appropriate notifications.
// Call this after each health check with the new report status/issues/warnings.
//
// The new status is always remembered (unless the notifier is disabled), but
// notifications are sent only when the status got worse than the previously
// recorded one (ok→warn, ok→fail, warn→fail). The very first call only
// records the status. Each issue is sent as a critical "container_unhealthy"
// event; each warning is sent with severity "warning" under the event type
// chosen by classifyWarning.
//
// NOTE(review): Notify applies a cooldown per event type, so when several
// issues/warnings map to the same event type only the first one in a call is
// actually delivered.
func (n *Notifier) NotifyHealthChange(status string, issues, warnings []string) {
	if !n.enabled {
		return
	}

	// Swap the remembered status under the lock, and release it before
	// calling Notify, which acquires n.mu itself.
	n.mu.Lock()
	prev := n.prevHealthStatus
	n.prevHealthStatus = status
	n.mu.Unlock()

	// Only notify on status degradation (ok→warn, ok→fail, warn→fail)
	if prev == "" {
		return // First run, just record status
	}
	if statusRank(status) <= statusRank(prev) {
		return // Status improved or stayed the same
	}

	// Notify about each issue/warning
	for _, issue := range issues {
		n.Notify("container_unhealthy", "critical", issue, "")
	}
	for _, w := range warnings {
		// Determine specific event type from warning message
		n.Notify(classifyWarning(w), "warning", w, "")
	}
}
// NotifyBackupFailed reports a backup failure to the hub as a critical
// "backup_failed" event, passing message and details through unchanged.
func (n *Notifier) NotifyBackupFailed(message, details string) {
	const (
		event = "backup_failed"
		level = "critical"
	)
	n.Notify(event, level, message, details)
}
// NotifyDBDumpFailed reports a database dump failure to the hub as a critical
// "db_dump_failed" event, passing message and details through unchanged.
func (n *Notifier) NotifyDBDumpFailed(message, details string) {
	const (
		event = "db_dump_failed"
		level = "critical"
	)
	n.Notify(event, level, message, details)
}
// NotifyIntegrityFailed reports a failed backup integrity check to the hub as
// an "integrity_failed" event with severity "warning".
func (n *Notifier) NotifyIntegrityFailed(message, details string) {
	const (
		event = "integrity_failed"
		level = "warning"
	)
	n.Notify(event, level, message, details)
}
// NotifyUpdateSuccess reports a successful controller update to the hub as an
// informational "update_success" event naming the old and new versions.
func (n *Notifier) NotifyUpdateSuccess(fromVer, toVer string) {
	msg := fmt.Sprintf("Controller frissítve: %s → %s", fromVer, toVer)
	n.Notify("update_success", "info", msg, "")
}
// NotifyUpdateFailed reports a failed controller update to the hub as an
// "update_failed" event with severity "warning", naming the target version
// and the error text.
func (n *Notifier) NotifyUpdateFailed(targetVer, errMsg string) {
	msg := fmt.Sprintf("Controller frissítés sikertelen: %s — %s", targetVer, errMsg)
	n.Notify("update_failed", "warning", msg, "")
}
// SendTest sends a test notification for verifying the notification flow.
//
// Unlike Notify it is synchronous and does not consult the notification
// preferences or the cooldown table: a fixed "test" event with severity
// "info" is POSTed straight to <hubURL>/api/v1/notify. It returns an error
// when the hub is not configured, when the request cannot be built or sent,
// or when the hub answers with a status of 400 or above.
func (n *Notifier) SendTest() error {
	if !n.enabled {
		return fmt.Errorf("notifications not enabled (hub not configured)")
	}

	jsonData, err := json.Marshal(notifyRequest{
		CustomerID: n.customerID,
		EventType:  "test",
		Severity:   "info",
		Message:    "Teszt értesítés a Felhom rendszerből",
		Details:    "Ha ezt az emailt megkapta, az értesítések megfelelően működnek.",
	})
	if err != nil {
		return fmt.Errorf("marshal: %w", err)
	}

	url := n.hubURL + "/api/v1/notify"
	req, err := http.NewRequest("POST", url, bytes.NewReader(jsonData))
	if err != nil {
		return fmt.Errorf("request: %w", err)
	}
	req.Header.Set("Authorization", "Bearer "+n.apiKey)
	req.Header.Set("Content-Type", "application/json")

	resp, err := n.httpClient.Do(req)
	if err != nil {
		return fmt.Errorf("send: %w", err)
	}
	defer func() {
		// Drain the body (bounded) before closing so the transport can reuse
		// the connection for the next hub request.
		_, _ = io.Copy(io.Discard, io.LimitReader(resp.Body, 64<<10))
		resp.Body.Close()
	}()

	if resp.StatusCode >= 400 {
		return fmt.Errorf("hub returned %d", resp.StatusCode)
	}
	return nil
}
// statusRank maps a health status string to a severity rank so two statuses
// can be compared: "fail" is 2, "warn" is 1, and "ok" or any unrecognised
// value is 0.
func statusRank(status string) int {
	if status == "fail" {
		return 2
	}
	if status == "warn" {
		return 1
	}
	// "ok" and anything unknown rank lowest.
	return 0
}
// classifyWarning picks the notification event type for a health warning by
// looking for case-sensitive keywords in the message.
//
// Disk-related messages ("disk", "Disk", "SSD", "HDD") become "disk_critical"
// when they also mention "critical"/"Critical", otherwise "disk_warning".
// Memory and temperature warnings are grouped under "disk_warning" as generic
// system warnings. Messages mentioning a container become
// "container_unhealthy". Anything else falls back to "disk_warning".
func classifyWarning(message string) string {
	// mentions reports whether the message contains any of the given keywords.
	mentions := func(keywords ...string) bool {
		for _, kw := range keywords {
			if contains(message, kw) {
				return true
			}
		}
		return false
	}

	if mentions("disk", "Disk", "SSD", "HDD") {
		if mentions("critical", "Critical") {
			return "disk_critical"
		}
		return "disk_warning"
	}
	if mentions("Memory", "memory", "Temperature", "temperature") {
		return "disk_warning" // memory and temperature are grouped under system warnings
	}
	if mentions("container", "Container") {
		return "container_unhealthy"
	}
	return "disk_warning" // fallback to generic system warning
}
// contains reports whether substr occurs within s. It is a thin,
// case-sensitive wrapper around strings.Contains.
func contains(s, substr string) bool {
return strings.Contains(s, substr)
}
// NotifyStorageDisconnected reports an unexpected drive disconnection to the
// hub as a critical "storage_disconnected" event. The message names the drive
// label; when stoppedApps is non-empty the details list those apps,
// comma-separated, otherwise the details are left empty.
func (n *Notifier) NotifyStorageDisconnected(label string, stoppedApps []string) {
	var details string
	if len(stoppedApps) != 0 {
		details = "Leállított alkalmazások: " + strings.Join(stoppedApps, ", ")
	}
	n.Notify(
		"storage_disconnected",
		"critical",
		fmt.Sprintf("Meghajtó váratlanul leválasztva: %s", label),
		details,
	)
}
// NotifyStorageReconnected reports a drive reconnection to the hub as an
// informational "storage_reconnected" event whose message names the drive
// label and states that apps can be started manually.
func (n *Notifier) NotifyStorageReconnected(label string) {
	msg := fmt.Sprintf("Meghajtó újra csatlakoztatva: %s. Az alkalmazások manuálisan indíthatók.", label)
	n.Notify("storage_reconnected", "info", msg, "")
}