Files
deploy-felhom-compose/controller/internal/notify/notifier.go
T
admin be7803c0ac v0.24.0 — Pre-testing observability: debug logging, diagnostic dump, startup self-test
- Add [DEBUG] logging across all modules (backup, storage, sync, selfupdate,
  monitor, notify, report, assets, setup) gated behind logging.level: "debug"
- Add /api/debug/dump endpoint returning full controller state JSON (debug only)
- Add startup self-test validating 9 subsystems (Docker, dirs, storage, hub,
  restic repos, metrics DB) with pass/warn/fail summary
- New packages: internal/selftest, internal/util
- Constructor/signature changes: debug bool params, logger params on
  RunHealthCheck and BuildReport, smart watchdog probe logging

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-21 18:32:26 +01:00

523 lines
17 KiB
Go

package notify
import (
"bytes"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"sync"
"time"
"gitea.dooplex.hu/admin/felhom-controller/internal/settings"
)
// Notifier delivers structured events from the controller to the hub's
// /api/v1/event endpoint.
//
// Event pushes run in background goroutines so callers are never blocked;
// failures are logged and retried only a small, fixed number of times.
// The controller applies no cooldown of its own: every event is sent and the
// hub decides what to suppress.
type Notifier struct {
	// Hub connection parameters.
	hubURL     string // base URL; endpoint paths are appended to it verbatim
	apiKey     string // sent as a Bearer token in the Authorization header
	customerID string // included in every payload as customer_id

	httpClient *http.Client
	logger     *log.Logger
	enabled    bool // set by New when both hubURL and apiKey are non-empty
	debug      bool // when true, extra [DEBUG] lines are logged
	settings   *settings.Settings

	mu               sync.Mutex // guards prevHealthStatus
	prevHealthStatus string     // last status recorded by NotifyHealthChange; "" until one is recorded
}
// New builds a Notifier for the given hub connection.
//
// The notifier is considered enabled only when both hubURL and apiKey are
// non-empty; otherwise the returned value is a no-op notifier. The outcome is
// logged at INFO level through logger, which must therefore be non-nil.
// The HTTP client used for all hub requests has a 10 second timeout.
func New(hubURL, apiKey, customerID string, sett *settings.Settings, logger *log.Logger, debug bool) *Notifier {
	n := &Notifier{
		hubURL:     hubURL,
		apiKey:     apiKey,
		customerID: customerID,
		httpClient: &http.Client{Timeout: 10 * time.Second},
		logger:     logger,
		enabled:    hubURL != "" && apiKey != "",
		debug:      debug,
		settings:   sett,
	}

	if !n.enabled {
		logger.Printf("[INFO] Notifier disabled (hub not configured)")
		return n
	}

	logger.Printf("[INFO] Notifier enabled (hub: %s)", hubURL)
	return n
}
// IsEnabled reports whether this notifier was constructed with a hub
// connection (both a hub URL and an API key).
func (n *Notifier) IsEnabled() bool {
	return n.enabled
}
// ── Detail structs ───────────────────────────────────────────────────
// BackupDetails is the structured "details" payload attached to backup events.
// Every field is optional on the wire: zero values are dropped (omitempty).
type BackupDetails struct {
	DriveCount  int    `json:"drive_count,omitempty"`
	SnapshotID  string `json:"snapshot_id,omitempty"`
	DurationSec int    `json:"duration_sec,omitempty"`
	DataAdded   string `json:"data_added,omitempty"`
	Error       string `json:"error,omitempty"` // set by the failure notifications
}
// DBDumpDetails is the structured "details" payload attached to database dump
// events. Every field is optional on the wire: zero values are dropped
// (omitempty).
type DBDumpDetails struct {
	DatabaseCount int    `json:"database_count,omitempty"`
	TotalSize     string `json:"total_size,omitempty"`
	DurationSec   int    `json:"duration_sec,omitempty"`
	Error         string `json:"error,omitempty"` // set by the failure notification
}
// DiskDetails is the structured "details" payload for disk warning/critical
// events. Every field is optional on the wire: zero values are dropped
// (omitempty).
type DiskDetails struct {
	Mount        string  `json:"mount,omitempty"`
	UsagePercent float64 `json:"usage_percent,omitempty"`
	Label        string  `json:"label,omitempty"`
}
// HealthDetails is the structured "details" payload for health transition
// events. Empty strings and empty/nil slices are dropped from the JSON
// (omitempty).
type HealthDetails struct {
	PreviousStatus string   `json:"previous_status,omitempty"`
	CurrentStatus  string   `json:"current_status,omitempty"`
	Issues         []string `json:"issues,omitempty"`
	Warnings       []string `json:"warnings,omitempty"`
}
// StorageDetails is the structured "details" payload for storage (drive)
// events. Empty strings and empty/nil slices are dropped from the JSON
// (omitempty).
type StorageDetails struct {
	DrivePath   string   `json:"drive_path,omitempty"`
	Label       string   `json:"label,omitempty"`
	StoppedApps []string `json:"stopped_apps,omitempty"`
}
// UpdateDetails is the structured "details" payload for controller update
// events. Every field is optional on the wire: empty strings are dropped
// (omitempty).
type UpdateDetails struct {
	FromVersion string `json:"from_version,omitempty"`
	ToVersion   string `json:"to_version,omitempty"`
	Error       string `json:"error,omitempty"`
}
// AppDetails is the structured "details" payload for app lifecycle events
// (deploy / remove). Empty strings are dropped from the JSON (omitempty).
type AppDetails struct {
	StackName   string `json:"stack_name,omitempty"`
	DisplayName string `json:"display_name,omitempty"`
}
// CrossDriveDetails is the structured "details" payload for cross-drive backup
// events. Every field is optional on the wire: empty strings are dropped
// (omitempty).
type CrossDriveDetails struct {
	StackName string `json:"stack_name,omitempty"` // also interpolated into the event message
	Method    string `json:"method,omitempty"`
	DestPath  string `json:"dest_path,omitempty"`
	Duration  string `json:"duration,omitempty"`
	Error     string `json:"error,omitempty"`
}
// ── Core event push ──────────────────────────────────────────────────
// eventRequest is the JSON body POSTed to the hub's /api/v1/event endpoint.
// The first four fields are always present; Details carries pre-encoded JSON
// and is left out entirely when empty.
type eventRequest struct {
	CustomerID string          `json:"customer_id"`
	EventType  string          `json:"event_type"`
	Severity   string          `json:"severity"`
	Message    string          `json:"message"`
	Details    json.RawMessage `json:"details,omitempty"`
}
// PushEvent sends a structured event to the hub's /api/v1/event endpoint.
//
// The call is a no-op when the notifier is disabled. Otherwise the payload is
// serialized synchronously and delivered from a background goroutine, so the
// caller never waits on the network. Delivery is attempted up to three times
// (i.e. two retries) with a fixed 3s pause between attempts; a transport error
// or any non-2xx response counts as a failed attempt. The outcome is only
// logged, never returned.
//
// details may be nil, in which case the "details" key is omitted from the
// JSON. The same applies to values that marshal to JSON null — for example a
// nil map or nil pointer wrapped in the interface — so the hub never receives
// "details": null. If details cannot be marshalled, a warning is logged and
// the event is still sent without details.
func (n *Notifier) PushEvent(eventType, severity, message string, details interface{}) {
	if !n.enabled {
		return
	}

	var detailsJSON json.RawMessage
	if details != nil {
		b, err := json.Marshal(details)
		switch {
		case err != nil:
			n.logger.Printf("[WARN] PushEvent: failed to marshal details for %s: %v", eventType, err)
		case string(b) == "null":
			// A typed nil (nil map/pointer/slice) is a non-nil interface value
			// but encodes as null. Leave detailsJSON empty so omitempty drops
			// the key, matching the behavior of an untyped nil.
		default:
			detailsJSON = b
		}
	}

	payload := eventRequest{
		CustomerID: n.customerID,
		EventType:  eventType,
		Severity:   severity,
		Message:    message,
		Details:    detailsJSON,
	}
	jsonData, err := json.Marshal(payload)
	if err != nil {
		n.logger.Printf("[ERROR] PushEvent: marshal failed for %s: %v", eventType, err)
		return
	}

	go func() {
		url := n.hubURL + "/api/v1/event"
		if n.debug {
			n.logger.Printf("[DEBUG] PushEvent: type=%s severity=%s url=%s", eventType, severity, url)
		}

		var lastErr error
		for attempt := 0; attempt < 3; attempt++ {
			if attempt > 0 {
				time.Sleep(3 * time.Second)
			}

			// A fresh request (and body reader) is needed for every attempt.
			req, err := http.NewRequest("POST", url, bytes.NewReader(jsonData))
			if err != nil {
				lastErr = err
				continue
			}
			req.Header.Set("Authorization", "Bearer "+n.apiKey)
			req.Header.Set("Content-Type", "application/json")

			resp, err := n.httpClient.Do(req)
			if err != nil {
				lastErr = err
				continue
			}
			// Drain before closing so the connection can be reused; the
			// response body itself is not interesting.
			io.Copy(io.Discard, resp.Body)
			resp.Body.Close()

			if resp.StatusCode >= 200 && resp.StatusCode < 300 {
				if n.debug {
					n.logger.Printf("[DEBUG] PushEvent: %s pushed OK (HTTP %d)", eventType, resp.StatusCode)
				}
				n.logger.Printf("[INFO] Event pushed: %s (%s) — %s", eventType, severity, message)
				return
			}
			lastErr = fmt.Errorf("HTTP %d", resp.StatusCode)
		}
		n.logger.Printf("[WARN] Event push failed after 3 attempts (%s/%s): %v", eventType, severity, lastErr)
	}()
}
// ── Convenience methods ──────────────────────────────────────────────
// NotifyHealthChange records the latest health status and, if it differs from
// the previously recorded one, pushes a transition event to the hub.
//
// Transitions are classified by statusRank ("ok" < "warn" < "fail"):
//   - higher rank, now "fail": "health_critical" (severity error)
//   - higher rank, now "warn": "health_degraded" (severity warning)
//   - lower rank:              "health_recovered" (severity info)
//
// No event is sent when the notifier is disabled, on the first recorded
// status (nothing to compare against), or when the status is unchanged.
// A change between two statuses of equal rank can only happen when an
// unrecognized status string is involved (statusRank maps those to the same
// rank as "ok"); such a change is neither a degradation nor a recovery, so it
// is logged and no event is sent.
//
// issues and warnings are attached to the event details unchanged.
func (n *Notifier) NotifyHealthChange(status string, issues, warnings []string) {
	if !n.enabled {
		return
	}

	// Swap in the new status under the lock; the event is built outside it.
	n.mu.Lock()
	prev := n.prevHealthStatus
	n.prevHealthStatus = status
	n.mu.Unlock()

	if prev == "" {
		return // First run, just record status
	}
	if status == prev {
		return
	}

	details := HealthDetails{
		PreviousStatus: prev,
		CurrentStatus:  status,
		Issues:         issues,
		Warnings:       warnings,
	}

	prevRank := statusRank(prev)
	newRank := statusRank(status)
	switch {
	case newRank > prevRank && status == "fail":
		n.PushEvent("health_critical", "error",
			fmt.Sprintf("Rendszer állapot kritikus (volt: %s)", prev), details)
	case newRank > prevRank && status == "warn":
		n.PushEvent("health_degraded", "warning",
			fmt.Sprintf("Rendszer állapot romlott (volt: %s)", prev), details)
	case newRank < prevRank:
		n.PushEvent("health_recovered", "info",
			fmt.Sprintf("Rendszer állapot helyreállt: %s (volt: %s)", status, prev), details)
	case newRank == prevRank:
		// Different strings, same rank: at least one side is not one of
		// ok/warn/fail. Reporting this as a recovery would be misleading.
		n.logger.Printf("[WARN] NotifyHealthChange: unrecognized status transition %q -> %q, no event sent", prev, status)
	}
}
// NotifyBackupFailed pushes a "backup_failed" event with severity "error".
// message is the event text; errMsg is carried in the details' error field.
func (n *Notifier) NotifyBackupFailed(message, errMsg string) {
	details := BackupDetails{Error: errMsg}
	n.PushEvent("backup_failed", "error", message, details)
}
// NotifyBackupCompleted pushes a "backup_completed" event with severity "info"
// and a fixed Hungarian message, attaching details as given.
func (n *Notifier) NotifyBackupCompleted(details BackupDetails) {
	const msg = "Biztonsági mentés elkészült"
	n.PushEvent("backup_completed", "info", msg, details)
}
// NotifyDBDumpFailed pushes a "db_dump_failed" event with severity "error".
// message is the event text; errMsg is carried in the details' error field.
func (n *Notifier) NotifyDBDumpFailed(message, errMsg string) {
	details := DBDumpDetails{Error: errMsg}
	n.PushEvent("db_dump_failed", "error", message, details)
}
// NotifyDBDumpCompleted pushes a "db_dump_completed" event with severity
// "info" and a fixed Hungarian message, attaching details as given.
func (n *Notifier) NotifyDBDumpCompleted(details DBDumpDetails) {
	const msg = "Adatbázis mentés elkészült"
	n.PushEvent("db_dump_completed", "info", msg, details)
}
// NotifyIntegrityFailed pushes a "backup_integrity_failed" event with severity
// "error". message is the event text; errMsg is carried in the details' error
// field.
func (n *Notifier) NotifyIntegrityFailed(message, errMsg string) {
	details := &BackupDetails{Error: errMsg}
	n.PushEvent("backup_integrity_failed", "error", message, details)
}
// NotifyIntegrityOK pushes a "backup_integrity_ok" event with severity "info"
// and the given message. No details are attached.
func (n *Notifier) NotifyIntegrityOK(message string) {
	const (
		eventType = "backup_integrity_ok"
		severity  = "info"
	)
	n.PushEvent(eventType, severity, message, nil)
}
// NotifyControllerUpdated pushes a "controller_updated" event describing an
// update from fromVer to toVer.
//
// The event type is the same for both outcomes; success selects the severity
// ("info" vs. "error") and the Hungarian message text. Both versions are also
// attached as structured details.
func (n *Notifier) NotifyControllerUpdated(fromVer, toVer string, success bool) {
	details := UpdateDetails{FromVersion: fromVer, ToVersion: toVer}

	if success {
		n.PushEvent("controller_updated", "info",
			fmt.Sprintf("Controller frissítve: %s → %s", fromVer, toVer), details)
		return
	}

	n.PushEvent("controller_updated", "error",
		fmt.Sprintf("Controller frissítés sikertelen: %s → %s", fromVer, toVer), details)
}
// NotifyControllerStarted pushes a "controller_started" event with severity
// "info"; version is interpolated into the message.
//
// details is forwarded to PushEvent as-is. It may carry, for example, a
// self-test summary such as
// {"selftest_pass": 8, "selftest_warn": 1, "selftest_fail": 0}.
func (n *Notifier) NotifyControllerStarted(version string, details map[string]interface{}) {
	msg := fmt.Sprintf("Controller elindult (%s)", version)
	n.PushEvent("controller_started", "info", msg, details)
}
// NotifyStorageDisconnected pushes a "storage_disconnected" event with
// severity "error". label is interpolated into the message, and both label and
// stoppedApps are attached as structured details.
func (n *Notifier) NotifyStorageDisconnected(label string, stoppedApps []string) {
	details := StorageDetails{
		Label:       label,
		StoppedApps: stoppedApps,
	}
	n.PushEvent("storage_disconnected", "error",
		fmt.Sprintf("Meghajtó váratlanul leválasztva: %s", label), details)
}
// NotifyStorageReconnected pushes a "storage_reconnected" event with severity
// "info". label is interpolated into the message and attached as details.
func (n *Notifier) NotifyStorageReconnected(label string) {
	msg := fmt.Sprintf("Meghajtó újra csatlakoztatva: %s", label)
	details := StorageDetails{Label: label}
	n.PushEvent("storage_reconnected", "info", msg, details)
}
// NotifyAppDeployed pushes an "app_deployed" event with severity "info".
// displayName appears in the message; both names are attached as details.
func (n *Notifier) NotifyAppDeployed(stackName, displayName string) {
	msg := fmt.Sprintf("Alkalmazás telepítve: %s", displayName)
	details := AppDetails{StackName: stackName, DisplayName: displayName}
	n.PushEvent("app_deployed", "info", msg, details)
}
// NotifyAppRemoved pushes an "app_removed" event with severity "info".
// displayName appears in the message; both names are attached as details.
func (n *Notifier) NotifyAppRemoved(stackName, displayName string) {
	msg := fmt.Sprintf("Alkalmazás eltávolítva: %s", displayName)
	details := AppDetails{StackName: stackName, DisplayName: displayName}
	n.PushEvent("app_removed", "info", msg, details)
}
// NotifyCrossDriveCompleted pushes a "crossdrive_completed" event with
// severity "info". The stack name from details is interpolated into the
// message, and details is attached in full.
func (n *Notifier) NotifyCrossDriveCompleted(details CrossDriveDetails) {
	msg := fmt.Sprintf("Másodlagos mentés elkészült: %s", details.StackName)
	n.PushEvent("crossdrive_completed", "info", msg, details)
}
// NotifyCrossDriveFailed pushes a "crossdrive_failed" event with severity
// "error". The stack name from details is interpolated into the message, and
// details is attached in full.
func (n *Notifier) NotifyCrossDriveFailed(details CrossDriveDetails) {
	msg := fmt.Sprintf("Másodlagos mentés sikertelen: %s", details.StackName)
	n.PushEvent("crossdrive_failed", "error", msg, details)
}
// NotifyDRStarted pushes a "disaster_recovery_started" event with severity
// "warning". appCount is interpolated into the message; no details are
// attached.
func (n *Notifier) NotifyDRStarted(appCount int) {
	msg := fmt.Sprintf("Katasztrófa helyreállítás elindítva (%d alkalmazás)", appCount)
	n.PushEvent("disaster_recovery_started", "warning", msg, nil)
}
// NotifyDRCompleted pushes a "disaster_recovery_completed" event whose message
// contains both counters. Severity is "warning" if failCount is positive and
// "info" otherwise. No details are attached.
func (n *Notifier) NotifyDRCompleted(successCount, failCount int) {
	msg := fmt.Sprintf("Katasztrófa helyreállítás befejezve (%d sikeres, %d sikertelen)", successCount, failCount)

	if failCount > 0 {
		n.PushEvent("disaster_recovery_completed", "warning", msg, nil)
		return
	}
	n.PushEvent("disaster_recovery_completed", "info", msg, nil)
}
// ── Preferences sync ─────────────────────────────────────────────────
// preferencesRequest is the JSON body POSTed to the hub's /api/v1/preferences
// endpoint. CooldownHours is left out of the JSON when zero; the other fields
// are always present (a nil EnabledEvents encodes as null).
type preferencesRequest struct {
	CustomerID    string   `json:"customer_id"`
	Email         string   `json:"email"`
	EnabledEvents []string `json:"enabled_events"`
	CooldownHours int      `json:"cooldown_hours,omitempty"`
}
// SyncPreferences pushes the current notification preferences to the hub's
// /api/v1/preferences endpoint.
//
// Unlike PushEvent this call is synchronous and makes a single attempt, so the
// calling handler can show the result to the user. It returns an error when
// the notifier is disabled, when the request cannot be built or sent, or when
// the hub answers with a status >= 400 (the first 512 bytes of the response
// body are included in that error). On success the synced values are logged.
func (n *Notifier) SyncPreferences(email string, enabledEvents []string, cooldownHours int) error {
	if !n.enabled {
		return fmt.Errorf("hub nem konfigurált")
	}

	payload := preferencesRequest{
		CustomerID:    n.customerID,
		Email:         email,
		EnabledEvents: enabledEvents,
		CooldownHours: cooldownHours,
	}
	jsonData, err := json.Marshal(payload)
	if err != nil {
		return fmt.Errorf("marshal: %w", err)
	}

	url := n.hubURL + "/api/v1/preferences"
	if n.debug {
		n.logger.Printf("[DEBUG] SyncPreferences: url=%s email=%s events=%v cooldown=%dh",
			url, email, enabledEvents, cooldownHours)
	}

	req, err := http.NewRequest("POST", url, bytes.NewReader(jsonData))
	if err != nil {
		return fmt.Errorf("request: %w", err)
	}
	req.Header.Set("Authorization", "Bearer "+n.apiKey)
	req.Header.Set("Content-Type", "application/json")

	resp, err := n.httpClient.Do(req)
	if err != nil {
		return fmt.Errorf("hub elérhetetlen: %w", err)
	}
	defer func() {
		// Read the body to EOF before closing so the transport can reuse the
		// connection. A drain error is irrelevant to the caller; the client
		// timeout bounds how long this can take.
		_, _ = io.Copy(io.Discard, resp.Body)
		resp.Body.Close()
	}()

	if resp.StatusCode >= 400 {
		// Cap the snippet so an oversized error page cannot bloat the message.
		body, _ := io.ReadAll(io.LimitReader(resp.Body, 512))
		return fmt.Errorf("hub hiba (%d): %s", resp.StatusCode, string(body))
	}

	if n.debug {
		n.logger.Printf("[DEBUG] SyncPreferences: response HTTP %d", resp.StatusCode)
	}
	n.logger.Printf("[INFO] Notification preferences synced to hub: email=%s, events=%v, cooldown=%dh", email, enabledEvents, cooldownHours)
	return nil
}
// ── Test notification ────────────────────────────────────────────────
// SendTest sends a "test" event (severity "info", no details) to the hub's
// /api/v1/event endpoint to verify the notification flow.
//
// The call is synchronous and makes a single attempt. It returns an error when
// the notifier is disabled, when the request cannot be built or sent, or when
// the hub answers with a status >= 400.
func (n *Notifier) SendTest() error {
	if !n.enabled {
		return fmt.Errorf("notifications not enabled (hub not configured)")
	}

	payload := eventRequest{
		CustomerID: n.customerID,
		EventType:  "test",
		Severity:   "info",
		Message:    "Teszt értesítés a Felhom rendszerből",
	}
	jsonData, err := json.Marshal(payload)
	if err != nil {
		return fmt.Errorf("marshal: %w", err)
	}

	url := n.hubURL + "/api/v1/event"
	req, err := http.NewRequest("POST", url, bytes.NewReader(jsonData))
	if err != nil {
		return fmt.Errorf("request: %w", err)
	}
	req.Header.Set("Authorization", "Bearer "+n.apiKey)
	req.Header.Set("Content-Type", "application/json")

	resp, err := n.httpClient.Do(req)
	if err != nil {
		return fmt.Errorf("send: %w", err)
	}
	defer func() {
		// Read the body to EOF before closing so the transport can reuse the
		// connection. A drain error is irrelevant to the caller; the client
		// timeout bounds how long this can take.
		_, _ = io.Copy(io.Discard, resp.Body)
		resp.Body.Close()
	}()

	if resp.StatusCode >= 400 {
		return fmt.Errorf("hub returned %d", resp.StatusCode)
	}
	return nil
}
// ── Backward compatibility ───────────────────────────────────────────
// notifyRequest is the JSON body POSTed to the legacy /api/v1/notify endpoint.
// In contrast to eventRequest, Details is a plain string here; it is left out
// of the JSON when empty.
type notifyRequest struct {
	CustomerID string `json:"customer_id"`
	EventType  string `json:"event_type"`
	Severity   string `json:"severity"`
	Message    string `json:"message"`
	Details    string `json:"details,omitempty"`
}
// Notify sends a legacy notification to /api/v1/notify (backward compat).
// Kept for old Hub instances that don't support /api/v1/event yet.
//
// The call is a no-op when the notifier is disabled. Otherwise the request is
// made once from a background goroutine: there is no retry and no local
// cooldown (the Hub handles cooldowns). Delivery is best-effort, but every
// failure — marshal error, request construction, transport error or a non-2xx
// response — is logged so that lost notifications can be diagnosed.
func (n *Notifier) Notify(eventType, severity, message, details string) {
	if !n.enabled {
		return
	}

	go func() {
		payload := notifyRequest{
			CustomerID: n.customerID,
			EventType:  eventType,
			Severity:   severity,
			Message:    message,
			Details:    details,
		}
		jsonData, err := json.Marshal(payload)
		if err != nil {
			n.logger.Printf("[ERROR] Failed to marshal notification: %v", err)
			return
		}

		url := n.hubURL + "/api/v1/notify"
		req, err := http.NewRequest("POST", url, bytes.NewReader(jsonData))
		if err != nil {
			n.logger.Printf("[WARN] Legacy notify: building request failed (%s/%s): %v", eventType, severity, err)
			return
		}
		req.Header.Set("Authorization", "Bearer "+n.apiKey)
		req.Header.Set("Content-Type", "application/json")

		resp, err := n.httpClient.Do(req)
		if err != nil {
			n.logger.Printf("[WARN] Legacy notify: send failed (%s/%s): %v", eventType, severity, err)
			return
		}
		// Drain before closing so the connection can be reused.
		io.Copy(io.Discard, resp.Body)
		resp.Body.Close()

		if resp.StatusCode < 200 || resp.StatusCode >= 300 {
			n.logger.Printf("[WARN] Legacy notify: hub returned HTTP %d (%s/%s)", resp.StatusCode, eventType, severity)
		}
	}()
}
// ── Helpers ──────────────────────────────────────────────────────────
// statusRank maps a health status string to a severity rank used to order
// statuses: "fail" is 2, "warn" is 1, and everything else — "ok" as well as
// any unrecognized value — is 0. The comparison is case-sensitive.
func statusRank(status string) int {
	if status == "fail" {
		return 2
	}
	if status == "warn" {
		return 1
	}
	return 0
}