e217c3a445
After successful config apply, immediately push infra backup to Hub so the config sync status updates right away. Also fix startup event message that showed "vv0.21.2" instead of "v0.21.3". Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
507 lines
16 KiB
Go
507 lines
16 KiB
Go
package notify
|
|
|
|
import (
|
|
"bytes"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"log"
|
|
"net/http"
|
|
"sync"
|
|
"time"
|
|
|
|
"gitea.dooplex.hu/admin/felhom-controller/internal/settings"
|
|
)
|
|
|
|
// Notifier sends structured events to the hub via /api/v1/event.
|
|
// Non-blocking: fires requests in goroutines, logs errors but doesn't retry aggressively.
|
|
// Cooldown logic is handled by the Hub — the controller sends all events unconditionally.
|
|
type Notifier struct {
|
|
hubURL string
|
|
apiKey string
|
|
customerID string
|
|
httpClient *http.Client
|
|
logger *log.Logger
|
|
enabled bool
|
|
settings *settings.Settings
|
|
|
|
mu sync.Mutex
|
|
prevHealthStatus string // tracks previous health check status for change detection
|
|
}
|
|
|
|
// New creates a new Notifier. Returns a no-op notifier if hub is not enabled.
|
|
func New(hubURL, apiKey, customerID string, sett *settings.Settings, logger *log.Logger) *Notifier {
|
|
enabled := hubURL != "" && apiKey != ""
|
|
if enabled {
|
|
logger.Printf("[INFO] Notifier enabled (hub: %s)", hubURL)
|
|
} else {
|
|
logger.Printf("[INFO] Notifier disabled (hub not configured)")
|
|
}
|
|
|
|
return &Notifier{
|
|
hubURL: hubURL,
|
|
apiKey: apiKey,
|
|
customerID: customerID,
|
|
httpClient: &http.Client{Timeout: 10 * time.Second},
|
|
logger: logger,
|
|
enabled: enabled,
|
|
settings: sett,
|
|
}
|
|
}
|
|
|
|
// IsEnabled returns whether the notifier has a configured hub connection.
|
|
func (n *Notifier) IsEnabled() bool {
|
|
return n.enabled
|
|
}
|
|
|
|
// ── Detail structs ───────────────────────────────────────────────────
|
|
|
|
// BackupDetails holds structured data for backup events.
|
|
type BackupDetails struct {
|
|
DriveCount int `json:"drive_count,omitempty"`
|
|
SnapshotID string `json:"snapshot_id,omitempty"`
|
|
DurationSec int `json:"duration_sec,omitempty"`
|
|
DataAdded string `json:"data_added,omitempty"`
|
|
Error string `json:"error,omitempty"`
|
|
}
|
|
|
|
// DBDumpDetails holds structured data for DB dump events.
|
|
type DBDumpDetails struct {
|
|
DatabaseCount int `json:"database_count,omitempty"`
|
|
TotalSize string `json:"total_size,omitempty"`
|
|
DurationSec int `json:"duration_sec,omitempty"`
|
|
Error string `json:"error,omitempty"`
|
|
}
|
|
|
|
// DiskDetails holds structured data for disk warning/critical events.
|
|
type DiskDetails struct {
|
|
Mount string `json:"mount,omitempty"`
|
|
UsagePercent float64 `json:"usage_percent,omitempty"`
|
|
Label string `json:"label,omitempty"`
|
|
}
|
|
|
|
// HealthDetails holds structured data for health events.
|
|
type HealthDetails struct {
|
|
PreviousStatus string `json:"previous_status,omitempty"`
|
|
CurrentStatus string `json:"current_status,omitempty"`
|
|
Issues []string `json:"issues,omitempty"`
|
|
Warnings []string `json:"warnings,omitempty"`
|
|
}
|
|
|
|
// StorageDetails holds structured data for storage events.
|
|
type StorageDetails struct {
|
|
DrivePath string `json:"drive_path,omitempty"`
|
|
Label string `json:"label,omitempty"`
|
|
StoppedApps []string `json:"stopped_apps,omitempty"`
|
|
}
|
|
|
|
// UpdateDetails holds structured data for controller update events.
|
|
type UpdateDetails struct {
|
|
FromVersion string `json:"from_version,omitempty"`
|
|
ToVersion string `json:"to_version,omitempty"`
|
|
Error string `json:"error,omitempty"`
|
|
}
|
|
|
|
// AppDetails holds structured data for app lifecycle events.
|
|
type AppDetails struct {
|
|
StackName string `json:"stack_name,omitempty"`
|
|
DisplayName string `json:"display_name,omitempty"`
|
|
}
|
|
|
|
// CrossDriveDetails holds structured data for cross-drive backup events.
|
|
type CrossDriveDetails struct {
|
|
StackName string `json:"stack_name,omitempty"`
|
|
Method string `json:"method,omitempty"`
|
|
DestPath string `json:"dest_path,omitempty"`
|
|
Duration string `json:"duration,omitempty"`
|
|
Error string `json:"error,omitempty"`
|
|
}
|
|
|
|
// ── Core event push ──────────────────────────────────────────────────
|
|
|
|
// eventRequest is the JSON payload sent to /api/v1/event.
|
|
type eventRequest struct {
|
|
CustomerID string `json:"customer_id"`
|
|
EventType string `json:"event_type"`
|
|
Severity string `json:"severity"`
|
|
Message string `json:"message"`
|
|
Details json.RawMessage `json:"details,omitempty"`
|
|
}
|
|
|
|
// PushEvent sends a structured event to the hub's /api/v1/event endpoint.
|
|
// Non-blocking (goroutine). Retries twice with 3s backoff.
|
|
// details may be nil (omitted from JSON) or a struct that marshals to JSON.
|
|
func (n *Notifier) PushEvent(eventType, severity, message string, details interface{}) {
|
|
if !n.enabled {
|
|
return
|
|
}
|
|
|
|
var detailsJSON json.RawMessage
|
|
if details != nil {
|
|
b, err := json.Marshal(details)
|
|
if err != nil {
|
|
n.logger.Printf("[WARN] PushEvent: failed to marshal details for %s: %v", eventType, err)
|
|
} else {
|
|
detailsJSON = b
|
|
}
|
|
}
|
|
|
|
payload := eventRequest{
|
|
CustomerID: n.customerID,
|
|
EventType: eventType,
|
|
Severity: severity,
|
|
Message: message,
|
|
Details: detailsJSON,
|
|
}
|
|
|
|
jsonData, err := json.Marshal(payload)
|
|
if err != nil {
|
|
n.logger.Printf("[ERROR] PushEvent: marshal failed for %s: %v", eventType, err)
|
|
return
|
|
}
|
|
|
|
go func() {
|
|
url := n.hubURL + "/api/v1/event"
|
|
var lastErr error
|
|
for attempt := 0; attempt < 3; attempt++ {
|
|
if attempt > 0 {
|
|
time.Sleep(3 * time.Second)
|
|
}
|
|
|
|
req, err := http.NewRequest("POST", url, bytes.NewReader(jsonData))
|
|
if err != nil {
|
|
lastErr = err
|
|
continue
|
|
}
|
|
req.Header.Set("Authorization", "Bearer "+n.apiKey)
|
|
req.Header.Set("Content-Type", "application/json")
|
|
|
|
resp, err := n.httpClient.Do(req)
|
|
if err != nil {
|
|
lastErr = err
|
|
continue
|
|
}
|
|
io.Copy(io.Discard, resp.Body)
|
|
resp.Body.Close()
|
|
|
|
if resp.StatusCode >= 200 && resp.StatusCode < 300 {
|
|
n.logger.Printf("[INFO] Event pushed: %s (%s) — %s", eventType, severity, message)
|
|
return
|
|
}
|
|
lastErr = fmt.Errorf("HTTP %d", resp.StatusCode)
|
|
}
|
|
n.logger.Printf("[WARN] Event push failed after 3 attempts (%s/%s): %v", eventType, severity, lastErr)
|
|
}()
|
|
}
|
|
|
|
// ── Convenience methods ──────────────────────────────────────────────
|
|
|
|
// NotifyHealthChange checks if health status changed and sends appropriate events.
|
|
// Detects both degradation (ok→warn, ok→fail, warn→fail) and recovery (fail→ok, warn→ok, fail→warn).
|
|
func (n *Notifier) NotifyHealthChange(status string, issues, warnings []string) {
|
|
if !n.enabled {
|
|
return
|
|
}
|
|
|
|
n.mu.Lock()
|
|
prev := n.prevHealthStatus
|
|
n.prevHealthStatus = status
|
|
n.mu.Unlock()
|
|
|
|
if prev == "" {
|
|
return // First run, just record status
|
|
}
|
|
if status == prev {
|
|
return
|
|
}
|
|
|
|
details := HealthDetails{
|
|
PreviousStatus: prev,
|
|
CurrentStatus: status,
|
|
Issues: issues,
|
|
Warnings: warnings,
|
|
}
|
|
|
|
prevRank := statusRank(prev)
|
|
newRank := statusRank(status)
|
|
|
|
if newRank > prevRank {
|
|
// Degradation
|
|
if status == "fail" {
|
|
n.PushEvent("health_critical", "error",
|
|
fmt.Sprintf("Rendszer állapot kritikus (volt: %s)", prev), details)
|
|
} else if status == "warn" {
|
|
n.PushEvent("health_degraded", "warning",
|
|
fmt.Sprintf("Rendszer állapot romlott (volt: %s)", prev), details)
|
|
}
|
|
} else {
|
|
// Recovery
|
|
n.PushEvent("health_recovered", "info",
|
|
fmt.Sprintf("Rendszer állapot helyreállt: %s (volt: %s)", status, prev), details)
|
|
}
|
|
}
|
|
|
|
// NotifyBackupFailed sends a backup failure event.
|
|
func (n *Notifier) NotifyBackupFailed(message, errMsg string) {
|
|
n.PushEvent("backup_failed", "error", message, BackupDetails{Error: errMsg})
|
|
}
|
|
|
|
// NotifyBackupCompleted sends a backup success event.
|
|
func (n *Notifier) NotifyBackupCompleted(details BackupDetails) {
|
|
n.PushEvent("backup_completed", "info", "Biztonsági mentés elkészült", details)
|
|
}
|
|
|
|
// NotifyDBDumpFailed sends a DB dump failure event.
|
|
func (n *Notifier) NotifyDBDumpFailed(message, errMsg string) {
|
|
n.PushEvent("db_dump_failed", "error", message, DBDumpDetails{Error: errMsg})
|
|
}
|
|
|
|
// NotifyDBDumpCompleted sends a DB dump success event.
|
|
func (n *Notifier) NotifyDBDumpCompleted(details DBDumpDetails) {
|
|
n.PushEvent("db_dump_completed", "info", "Adatbázis mentés elkészült", details)
|
|
}
|
|
|
|
// NotifyIntegrityFailed sends a backup integrity check failure event.
|
|
func (n *Notifier) NotifyIntegrityFailed(message, errMsg string) {
|
|
n.PushEvent("backup_integrity_failed", "error", message, &BackupDetails{Error: errMsg})
|
|
}
|
|
|
|
// NotifyIntegrityOK sends a backup integrity check success event.
|
|
func (n *Notifier) NotifyIntegrityOK(message string) {
|
|
n.PushEvent("backup_integrity_ok", "info", message, nil)
|
|
}
|
|
|
|
// NotifyControllerUpdated sends a controller update event.
|
|
func (n *Notifier) NotifyControllerUpdated(fromVer, toVer string, success bool) {
|
|
severity := "info"
|
|
msg := fmt.Sprintf("Controller frissítve: %s → %s", fromVer, toVer)
|
|
details := UpdateDetails{FromVersion: fromVer, ToVersion: toVer}
|
|
if !success {
|
|
severity = "error"
|
|
msg = fmt.Sprintf("Controller frissítés sikertelen: %s → %s", fromVer, toVer)
|
|
}
|
|
n.PushEvent("controller_updated", severity, msg, details)
|
|
}
|
|
|
|
// NotifyControllerStarted sends a controller startup event.
|
|
func (n *Notifier) NotifyControllerStarted(version string) {
|
|
n.PushEvent("controller_started", "info",
|
|
fmt.Sprintf("Controller elindult (%s)", version), nil)
|
|
}
|
|
|
|
// NotifyStorageDisconnected sends a drive disconnection event.
|
|
func (n *Notifier) NotifyStorageDisconnected(label string, stoppedApps []string) {
|
|
msg := fmt.Sprintf("Meghajtó váratlanul leválasztva: %s", label)
|
|
n.PushEvent("storage_disconnected", "error", msg, StorageDetails{
|
|
Label: label,
|
|
StoppedApps: stoppedApps,
|
|
})
|
|
}
|
|
|
|
// NotifyStorageReconnected sends a drive reconnection event.
|
|
func (n *Notifier) NotifyStorageReconnected(label string) {
|
|
n.PushEvent("storage_reconnected", "info",
|
|
fmt.Sprintf("Meghajtó újra csatlakoztatva: %s", label), StorageDetails{Label: label})
|
|
}
|
|
|
|
// NotifyAppDeployed sends an app deployment event.
|
|
func (n *Notifier) NotifyAppDeployed(stackName, displayName string) {
|
|
n.PushEvent("app_deployed", "info",
|
|
fmt.Sprintf("Alkalmazás telepítve: %s", displayName),
|
|
AppDetails{StackName: stackName, DisplayName: displayName})
|
|
}
|
|
|
|
// NotifyAppRemoved sends an app removal event.
|
|
func (n *Notifier) NotifyAppRemoved(stackName, displayName string) {
|
|
n.PushEvent("app_removed", "info",
|
|
fmt.Sprintf("Alkalmazás eltávolítva: %s", displayName),
|
|
AppDetails{StackName: stackName, DisplayName: displayName})
|
|
}
|
|
|
|
// NotifyCrossDriveCompleted sends a cross-drive backup success event.
|
|
func (n *Notifier) NotifyCrossDriveCompleted(details CrossDriveDetails) {
|
|
n.PushEvent("crossdrive_completed", "info",
|
|
fmt.Sprintf("Másodlagos mentés elkészült: %s", details.StackName), details)
|
|
}
|
|
|
|
// NotifyCrossDriveFailed sends a cross-drive backup failure event.
|
|
func (n *Notifier) NotifyCrossDriveFailed(details CrossDriveDetails) {
|
|
n.PushEvent("crossdrive_failed", "error",
|
|
fmt.Sprintf("Másodlagos mentés sikertelen: %s", details.StackName), details)
|
|
}
|
|
|
|
// NotifyDRStarted sends a disaster recovery start event.
|
|
func (n *Notifier) NotifyDRStarted(appCount int) {
|
|
n.PushEvent("disaster_recovery_started", "warning",
|
|
fmt.Sprintf("Katasztrófa helyreállítás elindítva (%d alkalmazás)", appCount), nil)
|
|
}
|
|
|
|
// NotifyDRCompleted sends a disaster recovery completion event.
|
|
func (n *Notifier) NotifyDRCompleted(successCount, failCount int) {
|
|
severity := "info"
|
|
if failCount > 0 {
|
|
severity = "warning"
|
|
}
|
|
n.PushEvent("disaster_recovery_completed", severity,
|
|
fmt.Sprintf("Katasztrófa helyreállítás befejezve (%d sikeres, %d sikertelen)", successCount, failCount), nil)
|
|
}
|
|
|
|
// ── Preferences sync ─────────────────────────────────────────────────
|
|
|
|
type preferencesRequest struct {
|
|
CustomerID string `json:"customer_id"`
|
|
Email string `json:"email"`
|
|
EnabledEvents []string `json:"enabled_events"`
|
|
CooldownHours int `json:"cooldown_hours,omitempty"`
|
|
}
|
|
|
|
// SyncPreferences pushes the current notification preferences to the hub.
|
|
// Synchronous — returns error for the handler to display to the user.
|
|
func (n *Notifier) SyncPreferences(email string, enabledEvents []string, cooldownHours int) error {
|
|
if !n.enabled {
|
|
return fmt.Errorf("hub nem konfigurált")
|
|
}
|
|
|
|
payload := preferencesRequest{
|
|
CustomerID: n.customerID,
|
|
Email: email,
|
|
EnabledEvents: enabledEvents,
|
|
CooldownHours: cooldownHours,
|
|
}
|
|
|
|
jsonData, err := json.Marshal(payload)
|
|
if err != nil {
|
|
return fmt.Errorf("marshal: %w", err)
|
|
}
|
|
|
|
url := n.hubURL + "/api/v1/preferences"
|
|
req, err := http.NewRequest("POST", url, bytes.NewReader(jsonData))
|
|
if err != nil {
|
|
return fmt.Errorf("request: %w", err)
|
|
}
|
|
req.Header.Set("Authorization", "Bearer "+n.apiKey)
|
|
req.Header.Set("Content-Type", "application/json")
|
|
|
|
resp, err := n.httpClient.Do(req)
|
|
if err != nil {
|
|
return fmt.Errorf("hub elérhetetlen: %w", err)
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
if resp.StatusCode >= 400 {
|
|
body, _ := io.ReadAll(io.LimitReader(resp.Body, 512))
|
|
return fmt.Errorf("hub hiba (%d): %s", resp.StatusCode, string(body))
|
|
}
|
|
|
|
n.logger.Printf("[INFO] Notification preferences synced to hub: email=%s, events=%v, cooldown=%dh", email, enabledEvents, cooldownHours)
|
|
return nil
|
|
}
|
|
|
|
// ── Test notification ────────────────────────────────────────────────
|
|
|
|
// SendTest sends a test event for verifying the notification flow (synchronous).
|
|
func (n *Notifier) SendTest() error {
|
|
if !n.enabled {
|
|
return fmt.Errorf("notifications not enabled (hub not configured)")
|
|
}
|
|
|
|
payload := eventRequest{
|
|
CustomerID: n.customerID,
|
|
EventType: "test",
|
|
Severity: "info",
|
|
Message: "Teszt értesítés a Felhom rendszerből",
|
|
}
|
|
|
|
jsonData, err := json.Marshal(payload)
|
|
if err != nil {
|
|
return fmt.Errorf("marshal: %w", err)
|
|
}
|
|
|
|
url := n.hubURL + "/api/v1/event"
|
|
req, err := http.NewRequest("POST", url, bytes.NewReader(jsonData))
|
|
if err != nil {
|
|
return fmt.Errorf("request: %w", err)
|
|
}
|
|
req.Header.Set("Authorization", "Bearer "+n.apiKey)
|
|
req.Header.Set("Content-Type", "application/json")
|
|
|
|
resp, err := n.httpClient.Do(req)
|
|
if err != nil {
|
|
return fmt.Errorf("send: %w", err)
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
if resp.StatusCode >= 400 {
|
|
return fmt.Errorf("hub returned %d", resp.StatusCode)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// ── Backward compatibility ───────────────────────────────────────────
|
|
|
|
// notifyRequest is the JSON payload for the legacy /api/v1/notify endpoint.
|
|
type notifyRequest struct {
|
|
CustomerID string `json:"customer_id"`
|
|
EventType string `json:"event_type"`
|
|
Severity string `json:"severity"`
|
|
Message string `json:"message"`
|
|
Details string `json:"details,omitempty"`
|
|
}
|
|
|
|
// Notify sends a legacy notification to /api/v1/notify (backward compat).
|
|
// Kept for old Hub instances that don't support /api/v1/event yet.
|
|
// No local cooldown — Hub handles cooldowns.
|
|
func (n *Notifier) Notify(eventType, severity, message, details string) {
|
|
if !n.enabled {
|
|
return
|
|
}
|
|
|
|
go func() {
|
|
payload := notifyRequest{
|
|
CustomerID: n.customerID,
|
|
EventType: eventType,
|
|
Severity: severity,
|
|
Message: message,
|
|
Details: details,
|
|
}
|
|
|
|
jsonData, err := json.Marshal(payload)
|
|
if err != nil {
|
|
n.logger.Printf("[ERROR] Failed to marshal notification: %v", err)
|
|
return
|
|
}
|
|
|
|
url := n.hubURL + "/api/v1/notify"
|
|
req, err := http.NewRequest("POST", url, bytes.NewReader(jsonData))
|
|
if err != nil {
|
|
return
|
|
}
|
|
req.Header.Set("Authorization", "Bearer "+n.apiKey)
|
|
req.Header.Set("Content-Type", "application/json")
|
|
|
|
resp, err := n.httpClient.Do(req)
|
|
if err != nil {
|
|
return
|
|
}
|
|
io.Copy(io.Discard, resp.Body)
|
|
resp.Body.Close()
|
|
}()
|
|
}
|
|
|
|
// ── Helpers ──────────────────────────────────────────────────────────
|
|
|
|
func statusRank(status string) int {
|
|
switch status {
|
|
case "ok":
|
|
return 0
|
|
case "warn":
|
|
return 1
|
|
case "fail":
|
|
return 2
|
|
default:
|
|
return 0
|
|
}
|
|
}
|
|
|