package notify import ( "bytes" "encoding/json" "fmt" "io" "log" "net/http" "sync" "time" "gitea.dooplex.hu/admin/felhom-controller/internal/settings" ) // Notifier sends structured events to the hub via /api/v1/event. // Non-blocking: fires requests in goroutines, logs errors but doesn't retry aggressively. // Cooldown logic is handled by the Hub — the controller sends all events unconditionally. // EventHistoryEntry records a sent event for the debug page. type EventHistoryEntry struct { Timestamp time.Time `json:"timestamp"` EventType string `json:"event_type"` Severity string `json:"severity"` Message string `json:"message"` HubStatus int `json:"hub_status"` HubError string `json:"hub_error,omitempty"` } type Notifier struct { hubURL string apiKey string customerID string httpClient *http.Client logger *log.Logger enabled bool debug bool settings *settings.Settings mu sync.Mutex prevHealthStatus string // tracks previous health check status for change detection // Event history ring buffer (debug page) historyMu sync.RWMutex history [50]EventHistoryEntry histPos int histFull bool } // New creates a new Notifier. Returns a no-op notifier if hub is not enabled. func New(hubURL, apiKey, customerID string, sett *settings.Settings, logger *log.Logger, debug bool) *Notifier { enabled := hubURL != "" && apiKey != "" if enabled { logger.Printf("[INFO] Notifier enabled (hub: %s)", hubURL) } else { logger.Printf("[INFO] Notifier disabled (hub not configured)") } return &Notifier{ hubURL: hubURL, apiKey: apiKey, customerID: customerID, httpClient: &http.Client{Timeout: 10 * time.Second}, logger: logger, enabled: enabled, debug: debug, settings: sett, } } // IsEnabled returns whether the notifier has a configured hub connection. func (n *Notifier) IsEnabled() bool { return n.enabled } // ── Detail structs ─────────────────────────────────────────────────── // BackupDetails holds structured data for backup events. 
type BackupDetails struct {
	// Every field is tagged omitempty, so zero values are left out of the JSON.
	DriveCount  int    `json:"drive_count,omitempty"`
	SnapshotID  string `json:"snapshot_id,omitempty"`
	DurationSec int    `json:"duration_sec,omitempty"`
	DataAdded   string `json:"data_added,omitempty"`
	Error       string `json:"error,omitempty"`
}

// DBDumpDetails holds structured data for DB dump events.
// Every field is tagged omitempty, so zero values are left out of the JSON.
type DBDumpDetails struct {
	DatabaseCount int    `json:"database_count,omitempty"`
	TotalSize     string `json:"total_size,omitempty"`
	DurationSec   int    `json:"duration_sec,omitempty"`
	Error         string `json:"error,omitempty"`
}

// DiskDetails holds structured data for disk warning/critical events.
// Note that omitempty also drops a UsagePercent of exactly 0 from the JSON.
type DiskDetails struct {
	Mount        string  `json:"mount,omitempty"`
	UsagePercent float64 `json:"usage_percent,omitempty"`
	Label        string  `json:"label,omitempty"`
}

// HealthDetails holds structured data for health events.
// Nil or empty Issues/Warnings slices are omitted from the JSON.
type HealthDetails struct {
	PreviousStatus string   `json:"previous_status,omitempty"`
	CurrentStatus  string   `json:"current_status,omitempty"`
	Issues         []string `json:"issues,omitempty"`
	Warnings       []string `json:"warnings,omitempty"`
}

// StorageDetails holds structured data for storage events.
// Every field is tagged omitempty, so zero values are left out of the JSON.
type StorageDetails struct {
	DrivePath   string   `json:"drive_path,omitempty"`
	Label       string   `json:"label,omitempty"`
	StoppedApps []string `json:"stopped_apps,omitempty"`
}

// UpdateDetails holds structured data for controller update events.
// Every field is tagged omitempty, so zero values are left out of the JSON.
type UpdateDetails struct {
	FromVersion string `json:"from_version,omitempty"`
	ToVersion   string `json:"to_version,omitempty"`
	Error       string `json:"error,omitempty"`
}

// AppDetails holds structured data for app lifecycle events.
// Every field is tagged omitempty, so zero values are left out of the JSON.
type AppDetails struct {
	StackName   string `json:"stack_name,omitempty"`
	DisplayName string `json:"display_name,omitempty"`
}

// CrossDriveDetails holds structured data for cross-drive backup events.
type CrossDriveDetails struct { StackName string `json:"stack_name,omitempty"` Method string `json:"method,omitempty"` DestPath string `json:"dest_path,omitempty"` Duration string `json:"duration,omitempty"` Error string `json:"error,omitempty"` } // ── Core event push ────────────────────────────────────────────────── // eventRequest is the JSON payload sent to /api/v1/event. type eventRequest struct { CustomerID string `json:"customer_id"` EventType string `json:"event_type"` Severity string `json:"severity"` Message string `json:"message"` Details json.RawMessage `json:"details,omitempty"` } // PushEvent sends a structured event to the hub's /api/v1/event endpoint. // Non-blocking (goroutine). Retries twice with 3s backoff. // details may be nil (omitted from JSON) or a struct that marshals to JSON. func (n *Notifier) PushEvent(eventType, severity, message string, details interface{}) { if !n.enabled { return } var detailsJSON json.RawMessage if details != nil { b, err := json.Marshal(details) if err != nil { n.logger.Printf("[WARN] PushEvent: failed to marshal details for %s: %v", eventType, err) } else { detailsJSON = b } } payload := eventRequest{ CustomerID: n.customerID, EventType: eventType, Severity: severity, Message: message, Details: detailsJSON, } jsonData, err := json.Marshal(payload) if err != nil { n.logger.Printf("[ERROR] PushEvent: marshal failed for %s: %v", eventType, err) return } go func() { url := n.hubURL + "/api/v1/event" if n.debug { n.logger.Printf("[DEBUG] PushEvent: type=%s severity=%s url=%s", eventType, severity, url) } var lastErr error for attempt := 0; attempt < 3; attempt++ { if attempt > 0 { time.Sleep(3 * time.Second) } req, err := http.NewRequest("POST", url, bytes.NewReader(jsonData)) if err != nil { lastErr = err continue } req.Header.Set("Authorization", "Bearer "+n.apiKey) req.Header.Set("Content-Type", "application/json") resp, err := n.httpClient.Do(req) if err != nil { lastErr = err continue } io.Copy(io.Discard, 
resp.Body) resp.Body.Close() if resp.StatusCode >= 200 && resp.StatusCode < 300 { if n.debug { n.logger.Printf("[DEBUG] PushEvent: %s pushed OK (HTTP %d)", eventType, resp.StatusCode) } n.logger.Printf("[INFO] Event pushed: %s (%s) — %s", eventType, severity, message) n.recordHistory(eventType, severity, message, resp.StatusCode, "") return } lastErr = fmt.Errorf("HTTP %d", resp.StatusCode) } n.logger.Printf("[WARN] Event push failed after 3 attempts (%s/%s): %v", eventType, severity, lastErr) errMsg := "" if lastErr != nil { errMsg = lastErr.Error() } n.recordHistory(eventType, severity, message, 0, errMsg) }() } // ── Convenience methods ────────────────────────────────────────────── // NotifyHealthChange checks if health status changed and sends appropriate events. // Detects both degradation (ok→warn, ok→fail, warn→fail) and recovery (fail→ok, warn→ok, fail→warn). func (n *Notifier) NotifyHealthChange(status string, issues, warnings []string) { if !n.enabled { return } n.mu.Lock() prev := n.prevHealthStatus n.prevHealthStatus = status n.mu.Unlock() if prev == "" { return // First run, just record status } if status == prev { return } details := HealthDetails{ PreviousStatus: prev, CurrentStatus: status, Issues: issues, Warnings: warnings, } prevRank := statusRank(prev) newRank := statusRank(status) if newRank > prevRank { // Degradation if status == "fail" { n.PushEvent("health_critical", "error", fmt.Sprintf("Rendszer állapot kritikus (volt: %s)", prev), details) } else if status == "warn" { n.PushEvent("health_degraded", "warning", fmt.Sprintf("Rendszer állapot romlott (volt: %s)", prev), details) } } else { // Recovery n.PushEvent("health_recovered", "info", fmt.Sprintf("Rendszer állapot helyreállt: %s (volt: %s)", status, prev), details) } } // NotifyBackupFailed sends a backup failure event. 
func (n *Notifier) NotifyBackupFailed(message, errMsg string) {
	details := BackupDetails{Error: errMsg}
	n.PushEvent("backup_failed", "error", message, details)
}

// NotifyBackupCompleted pushes a "backup_completed" info event carrying the
// supplied backup details.
func (n *Notifier) NotifyBackupCompleted(details BackupDetails) {
	const msg = "Biztonsági mentés elkészült"
	n.PushEvent("backup_completed", "info", msg, details)
}

// NotifyDBDumpFailed pushes a "db_dump_failed" error event; errMsg is carried
// in the details' Error field.
func (n *Notifier) NotifyDBDumpFailed(message, errMsg string) {
	details := DBDumpDetails{Error: errMsg}
	n.PushEvent("db_dump_failed", "error", message, details)
}

// NotifyDBDumpCompleted pushes a "db_dump_completed" info event carrying the
// supplied dump details.
func (n *Notifier) NotifyDBDumpCompleted(details DBDumpDetails) {
	const msg = "Adatbázis mentés elkészült"
	n.PushEvent("db_dump_completed", "info", msg, details)
}

// NotifyIntegrityFailed pushes a "backup_integrity_failed" error event; errMsg
// is carried in the details' Error field.
func (n *Notifier) NotifyIntegrityFailed(message, errMsg string) {
	details := &BackupDetails{Error: errMsg}
	n.PushEvent("backup_integrity_failed", "error", message, details)
}

// NotifyIntegrityOK pushes a "backup_integrity_ok" info event without details.
func (n *Notifier) NotifyIntegrityOK(message string) {
	n.PushEvent("backup_integrity_ok", "info", message, nil)
}

// NotifyControllerUpdated pushes a "controller_updated" event describing an
// update from fromVer to toVer. Severity is "info" on success and "error"
// otherwise; the event type is the same in both cases.
func (n *Notifier) NotifyControllerUpdated(fromVer, toVer string, success bool) {
	details := UpdateDetails{FromVersion: fromVer, ToVersion: toVer}

	if success {
		n.PushEvent("controller_updated", "info",
			fmt.Sprintf("Controller frissítve: %s → %s", fromVer, toVer), details)
		return
	}
	n.PushEvent("controller_updated", "error",
		fmt.Sprintf("Controller frissítés sikertelen: %s → %s", fromVer, toVer), details)
}

// NotifyControllerStarted sends a controller startup event.
// details may include self-test summary (e.g., {"selftest_pass": 8, "selftest_warn": 1, "selftest_fail": 0}).
func (n *Notifier) NotifyControllerStarted(version string, details map[string]interface{}) {
	msg := fmt.Sprintf("Controller elindult (%s)", version)
	if details == nil {
		// A nil map boxed into interface{} is a non-nil interface value and
		// would be sent as "details":null. Pass an untyped nil so the field
		// is omitted, as for every other event without details.
		n.PushEvent("controller_started", "info", msg, nil)
		return
	}
	n.PushEvent("controller_started", "info", msg, details)
}

// NotifyStorageDisconnected sends a "storage_disconnected" error event for the
// drive with the given label, listing the apps passed in stoppedApps.
func (n *Notifier) NotifyStorageDisconnected(label string, stoppedApps []string) {
	msg := fmt.Sprintf("Meghajtó váratlanul leválasztva: %s", label)
	n.PushEvent("storage_disconnected", "error", msg, StorageDetails{
		Label:       label,
		StoppedApps: stoppedApps,
	})
}

// NotifyStorageReconnected sends a "storage_reconnected" info event for the
// drive with the given label.
func (n *Notifier) NotifyStorageReconnected(label string) {
	n.PushEvent("storage_reconnected", "info",
		fmt.Sprintf("Meghajtó újra csatlakoztatva: %s", label),
		StorageDetails{Label: label})
}

// NotifyAppDeployed sends an "app_deployed" info event. displayName is used in
// the message; both names are included in the details.
func (n *Notifier) NotifyAppDeployed(stackName, displayName string) {
	n.PushEvent("app_deployed", "info",
		fmt.Sprintf("Alkalmazás telepítve: %s", displayName),
		AppDetails{StackName: stackName, DisplayName: displayName})
}

// NotifyAppRemoved sends an "app_removed" info event. displayName is used in
// the message; both names are included in the details.
func (n *Notifier) NotifyAppRemoved(stackName, displayName string) {
	n.PushEvent("app_removed", "info",
		fmt.Sprintf("Alkalmazás eltávolítva: %s", displayName),
		AppDetails{StackName: stackName, DisplayName: displayName})
}

// NotifyCrossDriveCompleted sends a "crossdrive_completed" info event for
// details.StackName, attaching the full details.
func (n *Notifier) NotifyCrossDriveCompleted(details CrossDriveDetails) {
	n.PushEvent("crossdrive_completed", "info",
		fmt.Sprintf("Másodlagos mentés elkészült: %s", details.StackName),
		details)
}

// NotifyCrossDriveFailed sends a "crossdrive_failed" error event for
// details.StackName, attaching the full details.
func (n *Notifier) NotifyCrossDriveFailed(details CrossDriveDetails) {
	n.PushEvent("crossdrive_failed", "error",
		fmt.Sprintf("Másodlagos mentés sikertelen: %s", details.StackName),
		details)
}

// NotifyDRStarted sends a disaster recovery start event.
func (n *Notifier) NotifyDRStarted(appCount int) { n.PushEvent("disaster_recovery_started", "warning", fmt.Sprintf("Katasztrófa helyreállítás elindítva (%d alkalmazás)", appCount), nil) } // NotifyDRCompleted sends a disaster recovery completion event. func (n *Notifier) NotifyDRCompleted(successCount, failCount int) { severity := "info" if failCount > 0 { severity = "warning" } n.PushEvent("disaster_recovery_completed", severity, fmt.Sprintf("Katasztrófa helyreállítás befejezve (%d sikeres, %d sikertelen)", successCount, failCount), nil) } // ── Preferences sync ───────────────────────────────────────────────── type preferencesRequest struct { CustomerID string `json:"customer_id"` Email string `json:"email"` EnabledEvents []string `json:"enabled_events"` CooldownHours int `json:"cooldown_hours,omitempty"` } // SyncPreferences pushes the current notification preferences to the hub. // Synchronous — returns error for the handler to display to the user. func (n *Notifier) SyncPreferences(email string, enabledEvents []string, cooldownHours int) error { if !n.enabled { return fmt.Errorf("hub nem konfigurált") } payload := preferencesRequest{ CustomerID: n.customerID, Email: email, EnabledEvents: enabledEvents, CooldownHours: cooldownHours, } jsonData, err := json.Marshal(payload) if err != nil { return fmt.Errorf("marshal: %w", err) } url := n.hubURL + "/api/v1/preferences" if n.debug { n.logger.Printf("[DEBUG] SyncPreferences: url=%s email=%s events=%v cooldown=%dh", url, email, enabledEvents, cooldownHours) } req, err := http.NewRequest("POST", url, bytes.NewReader(jsonData)) if err != nil { return fmt.Errorf("request: %w", err) } req.Header.Set("Authorization", "Bearer "+n.apiKey) req.Header.Set("Content-Type", "application/json") resp, err := n.httpClient.Do(req) if err != nil { return fmt.Errorf("hub elérhetetlen: %w", err) } defer resp.Body.Close() if resp.StatusCode >= 400 { body, _ := io.ReadAll(io.LimitReader(resp.Body, 512)) return fmt.Errorf("hub hiba (%d): 
%s", resp.StatusCode, string(body)) } if n.debug { n.logger.Printf("[DEBUG] SyncPreferences: response HTTP %d", resp.StatusCode) } n.logger.Printf("[INFO] Notification preferences synced to hub: email=%s, events=%v, cooldown=%dh", email, enabledEvents, cooldownHours) return nil } // ── Test notification ──────────────────────────────────────────────── // SendTest sends a test event for verifying the notification flow (synchronous). func (n *Notifier) SendTest() error { if !n.enabled { return fmt.Errorf("notifications not enabled (hub not configured)") } payload := eventRequest{ CustomerID: n.customerID, EventType: "test", Severity: "info", Message: "Teszt értesítés a Felhom rendszerből", } jsonData, err := json.Marshal(payload) if err != nil { return fmt.Errorf("marshal: %w", err) } url := n.hubURL + "/api/v1/event" req, err := http.NewRequest("POST", url, bytes.NewReader(jsonData)) if err != nil { return fmt.Errorf("request: %w", err) } req.Header.Set("Authorization", "Bearer "+n.apiKey) req.Header.Set("Content-Type", "application/json") resp, err := n.httpClient.Do(req) if err != nil { return fmt.Errorf("send: %w", err) } defer resp.Body.Close() if resp.StatusCode >= 400 { return fmt.Errorf("hub returned %d", resp.StatusCode) } return nil } // ── Debug event testing ─────────────────────────────────────────────── // PushTestEventSync sends a test event synchronously and returns the Hub HTTP status code. // Used by the debug page for event testing with configurable type/severity. 
func (n *Notifier) PushTestEventSync(eventType, severity, message string) (statusCode int, err error) { if !n.enabled { return 0, fmt.Errorf("hub nem konfigurált") } payload := eventRequest{ CustomerID: n.customerID, EventType: eventType, Severity: severity, Message: message, } jsonData, err := json.Marshal(payload) if err != nil { return 0, fmt.Errorf("marshal: %w", err) } url := n.hubURL + "/api/v1/event" req, err := http.NewRequest("POST", url, bytes.NewReader(jsonData)) if err != nil { return 0, fmt.Errorf("request: %w", err) } req.Header.Set("Authorization", "Bearer "+n.apiKey) req.Header.Set("Content-Type", "application/json") resp, err := n.httpClient.Do(req) if err != nil { n.recordHistory(eventType, severity, message, 0, err.Error()) return 0, fmt.Errorf("send: %w", err) } io.Copy(io.Discard, resp.Body) resp.Body.Close() if resp.StatusCode >= 400 { n.recordHistory(eventType, severity, message, resp.StatusCode, fmt.Sprintf("HTTP %d", resp.StatusCode)) return resp.StatusCode, fmt.Errorf("hub returned %d", resp.StatusCode) } n.recordHistory(eventType, severity, message, resp.StatusCode, "") return resp.StatusCode, nil } // GetEventHistory returns the last N event history entries (newest first). func (n *Notifier) GetEventHistory(limit int) []EventHistoryEntry { n.historyMu.RLock() defer n.historyMu.RUnlock() total := n.histPos if n.histFull { total = len(n.history) } if limit <= 0 || limit > total { limit = total } result := make([]EventHistoryEntry, 0, limit) for i := 0; i < limit; i++ { idx := n.histPos - 1 - i if idx < 0 { idx += len(n.history) } result = append(result, n.history[idx]) } return result } // recordHistory appends an entry to the event history ring buffer. 
func (n *Notifier) recordHistory(eventType, severity, message string, hubStatus int, hubError string) { n.historyMu.Lock() defer n.historyMu.Unlock() n.history[n.histPos] = EventHistoryEntry{ Timestamp: time.Now(), EventType: eventType, Severity: severity, Message: message, HubStatus: hubStatus, HubError: hubError, } n.histPos++ if n.histPos >= len(n.history) { n.histPos = 0 n.histFull = true } } // ── Backward compatibility ─────────────────────────────────────────── // notifyRequest is the JSON payload for the legacy /api/v1/notify endpoint. type notifyRequest struct { CustomerID string `json:"customer_id"` EventType string `json:"event_type"` Severity string `json:"severity"` Message string `json:"message"` Details string `json:"details,omitempty"` } // Notify sends a legacy notification to /api/v1/notify (backward compat). // Kept for old Hub instances that don't support /api/v1/event yet. // No local cooldown — Hub handles cooldowns. func (n *Notifier) Notify(eventType, severity, message, details string) { if !n.enabled { return } go func() { payload := notifyRequest{ CustomerID: n.customerID, EventType: eventType, Severity: severity, Message: message, Details: details, } jsonData, err := json.Marshal(payload) if err != nil { n.logger.Printf("[ERROR] Failed to marshal notification: %v", err) return } url := n.hubURL + "/api/v1/notify" req, err := http.NewRequest("POST", url, bytes.NewReader(jsonData)) if err != nil { return } req.Header.Set("Authorization", "Bearer "+n.apiKey) req.Header.Set("Content-Type", "application/json") resp, err := n.httpClient.Do(req) if err != nil { return } io.Copy(io.Discard, resp.Body) resp.Body.Close() }() } // ── Helpers ────────────────────────────────────────────────────────── func statusRank(status string) int { switch status { case "ok": return 0 case "warn": return 1 case "fail": return 2 default: return 0 } }