From 3217cb47519ee0b9aef653726ed279236d82fe2d Mon Sep 17 00:00:00 2001 From: kisfenyo Date: Fri, 20 Feb 2026 18:53:24 +0100 Subject: [PATCH] =?UTF-8?q?feat:=20Hub=20monitoring=20takeover=20=E2=80=94?= =?UTF-8?q?=20event=20system,=20dead=20man's=20switch,=20notifications=20(?= =?UTF-8?q?v0.3.0)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace external Healthchecks.io with Hub-native monitoring. New events table + /api/v1/event endpoint for structured events from controllers. Staleness checker (60s) detects unresponsive nodes. Backup deadline checker (daily 05:00) catches missed backups. Notification dispatcher sends operator (English) + customer (Hungarian) emails via Resend with per-event cooldowns. Event timeline on customer page, dashboard badges. Config form deprecates Monitoring UUIDs section. Co-Authored-By: Claude Opus 4.6 --- hub/CHANGELOG.md | 50 ++++ hub/README.md | 54 +++- hub/cmd/hub/main.go | 105 ++++++-- hub/internal/api/handler.go | 160 +++++++++++- hub/internal/monitor/deadline.go | 86 +++++++ hub/internal/monitor/staleness.go | 183 ++++++++++++++ hub/internal/notify/dispatcher.go | 201 +++++++++++++++ hub/internal/notify/templates.go | 153 +++++++++++ hub/internal/store/store.go | 237 +++++++++++++++++- hub/internal/web/configs.go | 14 +- hub/internal/web/controller.yaml.default | 12 +- hub/internal/web/server.go | 15 ++ hub/internal/web/templates/config_form.html | 5 +- .../web/templates/customer_unified.html | 57 +++++ hub/internal/web/templates/dashboard.html | 2 + hub/internal/web/templates/style.css | 49 ++++ 16 files changed, 1319 insertions(+), 64 deletions(-) create mode 100644 hub/internal/monitor/deadline.go create mode 100644 hub/internal/monitor/staleness.go create mode 100644 hub/internal/notify/dispatcher.go create mode 100644 hub/internal/notify/templates.go diff --git a/hub/CHANGELOG.md b/hub/CHANGELOG.md index 71708ac..b8e25ed 100644 --- a/hub/CHANGELOG.md +++ b/hub/CHANGELOG.md @@ -1,5 +1,55 @@ # Felhom Hub — Changelog +## v0.3.0 (2026-02-20) + +**Hub Monitoring Takeover — Event System, Dead Man's Switch, Notifications** + +Replaces external Healthchecks.io with a Hub-native event system. The Hub becomes the single source of truth for all customer monitoring, event tracking, dead man's switch alerting, and notification delivery. + +### Phase 1 — Event System +- **`events` table** in SQLite: stores all events with customer_id, event_type, severity, message, details_json, source, timestamp +- **Indexes**: `idx_events_customer_created` (customer + time DESC), `idx_events_type` (type + time DESC) +- **Store methods**: `SaveEvent`, `GetRecentEvents`, `GetEventsByType`, `GetLatestEventByType`, `GetAllRecentEvents`, `CountEventsBySeverity`, `PruneEvents`, `GetActiveCustomerIDs` +- **`POST /api/v1/event`** endpoint: accepts structured events from controllers, validates event_type against 27 allowed types, validates severity (info/warning/error), stores in DB +- **Enhanced auth**: `checkAuthCustomer()` validates per-customer API keys match the customer_id in payload; global key bypasses ownership check +- **Prune**: events pruned alongside reports at 04:30 Budapest time + +### Phase 2 — Dead Man's Switch +- **Staleness checker** (`internal/monitor/staleness.go`): runs every 60s, detects when controllers stop reporting + - ok→stale (>30min): inserts `node_stale` warning event + - any→down (>60min): inserts `node_down` error event + - stale/down→ok: inserts `node_recovered` info event + - Skips blocked customers, no false alerts on startup +- **Backup deadline checker** (`internal/monitor/deadline.go`): runs daily at 05:00 Budapest + - Detects missing `backup_completed` events since midnight → inserts `expected_backup_missed` error + - Detects missing `db_dump_completed` events → inserts `expected_dbdump_missed` error + - Grace: skips customers with `node_down` state +- **`scheduleDaily()`** helper: goroutine that sleeps until target time (Europe/Budapest), runs function, loops +- **`/healthz`** enhanced: returns 503 if SQLite Ping fails + +### Phase 3 — Notification System +- **Dispatcher** (`internal/notify/dispatcher.go`): processes events and sends emails via Resend API + - **Operator channel**: English emails to operator for warning/error events, 1h cooldown per customer:eventType + - **Customer channel**: Hungarian emails per event_type, respects customer preferences (enabled_events, cooldown_hours), blocked customers skipped + - **Test bypass**: `test` event type skips cooldown/preferences, sends directly to customer email +- **Email templates** (`internal/notify/templates.go`): operator (concise English), customer (Hungarian per event type with complete message table) +- **Cooldown tracking**: in-memory maps with per-customer:eventType granularity +- **`customer_notifications` table**: added `cooldown_hours` column (default 6) +- **`notification_log` table**: added `channel` column (operator/customer) +- Wired into `/api/v1/event` handler and staleness/deadline checkers + +### Phase 4 — Hub UI +- **Events section** on customer detail page: last 50 events, severity filter buttons (All/Errors/Warnings/Info), colored severity badges +- **Dashboard badges**: error+warning count in last 24h per customer, clickable to customer events +- **Notification log**: shows channel column (operator/customer) in customer detail page +- **Config form**: Monitoring UUIDs section marked as "Legacy" with deprecation notice, collapsed by default + +### Phase 6 — Config Cleanup +- **`controller.yaml.default`**: `monitoring.ping_uuids` section commented out (deprecated) +- **`buildConfigJSON`**: only writes `ping_uuids` to config JSON if user explicitly provides UUID values (new configs get none) + +--- + ## v0.2.2 (2026-02-20) **Config Hash Comparison** diff --git a/hub/README.md b/hub/README.md index 538c476..4b50f1a 100644 --- a/hub/README.md +++ b/hub/README.md @@ -2,9 +2,9 @@ **Central operator dashboard for monitoring and managing Felhom customer deployments.** -A lightweight Go service that receives periodic reports from felhom-controller instances, stores them in SQLite, and provides a web dashboard for fleet monitoring. Also serves as the infrastructure backup store for disaster recovery. +A lightweight Go service that receives periodic reports and structured events from felhom-controller instances, stores them in SQLite, and provides a web dashboard for fleet monitoring. Also serves as the infrastructure backup store for disaster recovery, event-based dead man's switch monitoring, and notification dispatch. -**Current version: v0.2.2** +**Current version: v0.3.0** --- @@ -72,14 +72,29 @@ The infra-backup payload contains everything needed to restore a customer deploy 4. Controller uses disk UUIDs to auto-mount surviving drives 5. Controller restores apps from local backups on those drives +### Events + +| Method | Path | Description | +|--------|------|-------------| +| `POST` | `/api/v1/event` | Controller pushes structured event (27 allowed types, severity: info/warning/error) | + +Events are the primary monitoring mechanism. Each event has: customer_id, event_type, severity, message, details_json, source. Per-customer API keys are validated against the customer_id in the payload. Stored in the `events` table with automatic pruning. + +**Hub-generated events** (source="hub"): +- `node_stale` / `node_down` / `node_recovered` — dead man's switch from staleness checker (every 60s) +- `expected_backup_missed` / `expected_dbdump_missed` — backup deadline checker (daily at 05:00 Budapest) + ### Notifications | Method | Path | Description | |--------|------|-------------| -| `POST` | `/api/v1/notify` | Controller sends event notification (backup_failed, disk_warning, etc.) | -| `POST` | `/api/v1/preferences` | Controller syncs customer notification preferences | +| `POST` | `/api/v1/notify` | Legacy notification relay (kept for backward compatibility) | +| `POST` | `/api/v1/preferences` | Controller syncs customer notification preferences (email, enabled_events, cooldown_hours) | -Notifications are sent via Resend.com email API. +Notifications are dispatched automatically when events are processed: +- **Operator channel**: English emails for warning/error events, 1h cooldown per customer:eventType +- **Customer channel**: Hungarian emails per event type, respects customer preferences and cooldown (default 6h) +- Email delivery via Resend.com API ### Customer Config Retrieval @@ -93,7 +108,7 @@ Config retrieval uses a separate per-customer retrieval password (not the API ke | Method | Path | Description | |--------|------|-------------| -| `GET` | `/healthz` | Health check (no auth required) | +| `GET` | `/healthz` | Health check (no auth required, returns 503 if SQLite ping fails) | ## Web Dashboard @@ -101,13 +116,13 @@ Protected by bcrypt password + session cookie (7-day expiry). ### Pages -- **Dashboard (`/`)** — Fleet overview table showing all customers with live status. Config-only customers (no reports yet) appear as "PENDING" with gray badge. Blocked customers are hidden. Auto-refreshes every 60 seconds. +- **Dashboard (`/`)** — Fleet overview table showing all customers with live status and event count badges (error+warning in last 24h). Config-only customers (no reports yet) appear as "PENDING" with gray badge. Blocked customers are hidden. Auto-refreshes every 60 seconds. - **Customers (`/configs`)** — Customer management list. Shows all customers (both managed and manual), their status, controller version, and config type (MANAGED/MANUAL). Blocked customers shown grayed-out with BLOCKED badge. - **Unified Customer Detail (`/customers/{id}`)** — Single page per customer combining config management and live monitoring. Adapts content based on available data: - - **Managed + reporting:** Full view — config info, system metrics, storage, containers, backup status, credentials, setup commands, YAML preview, controller update, notifications, history + - **Managed + reporting:** Full view — config info, system metrics, storage, containers, backup status, events timeline (last 50, severity filter), credentials, setup commands, YAML preview, controller update, notifications (with channel column), history - **Managed + no reports yet:** Config info, credentials, setup commands, "Waiting for first report" indicator - **Manual (report-only):** System metrics, storage, containers, backup, with "Create Config" button to convert to managed -- **Config Form (`/configs/new`, `/configs/{id}/edit`)** — Create/edit customer configurations with identity, infrastructure tokens, and monitoring overrides +- **Config Form (`/configs/new`, `/configs/{id}/edit`)** — Create/edit customer configurations with identity, infrastructure tokens, and monitoring overrides. Legacy Monitoring UUIDs section collapsed by default with deprecation notice ### Customer States @@ -144,9 +159,10 @@ SQLite with WAL mode. Tables: | Table | Purpose | |-------|---------| | `reports` | Full JSON reports with denormalized fields for dashboard queries | +| `events` | Structured events from controllers and Hub (type, severity, message, details, source) | | `infra_backups` | Per-customer infrastructure snapshots for disaster recovery | -| `customer_notifications` | Email + enabled event types per customer | -| `notification_log` | Send/skip/fail history for notifications | +| `customer_notifications` | Email, enabled event types, cooldown hours per customer | +| `notification_log` | Send/skip/fail history for notifications with channel (operator/customer) | | `customer_configs` | Pre-configured customer settings, retrieval passwords, per-customer API keys, status (active/blocked) | Retention: configurable (default 90 days), daily prune at 04:30 Budapest time. @@ -164,6 +180,8 @@ api: notifications: resend_api_key: "" # Resend.com API key for email from_email: "monitoring@felhom.eu" + operator_email: "" # Operator alert recipient + operator_enabled: true # Enable operator email notifications retention: max_days: 90 @@ -195,16 +213,26 @@ Runs on k3s (Kubernetes) in the `felhom-system` namespace: ```bash # Build and push cd hub/ -make VERSION=0.2.2 docker docker-push +make VERSION=0.3.0 docker docker-push # Deploy -kubectl set image -n felhom-system deploy/hub hub=gitea.dooplex.hu/admin/felhom-hub:v0.2.2 +kubectl set image -n felhom-system deploy/hub hub=gitea.dooplex.hu/admin/felhom-hub:v0.3.0 kubectl rollout status -n felhom-system deploy/hub # Check kubectl logs -n felhom-system -l app=hub --tail 20 ``` +## Background Services + +| Service | Schedule | Description | +|---------|----------|-------------| +| **Staleness checker** | Every 60s | Detects controllers that stopped reporting. Generates `node_stale` (>30min), `node_down` (>60min), `node_recovered` events | +| **Backup deadline checker** | Daily 05:00 Budapest | Detects missing backup/db-dump events since midnight. Generates `expected_backup_missed`, `expected_dbdump_missed` events | +| **Report/event prune** | Daily 04:30 Budapest | Deletes reports and events older than retention period (default 90 days) | +| **Registry version check** | Every 30min | Checks Gitea registry for new controller image tags | +| **Template refresh** | Every 1h | Fetches latest `controller.yaml.example` from Gitea | + ## Dependencies - `golang.org/x/crypto` — bcrypt for password hashing diff --git a/hub/cmd/hub/main.go b/hub/cmd/hub/main.go index 33cd43d..3684383 100644 --- a/hub/cmd/hub/main.go +++ b/hub/cmd/hub/main.go @@ -13,6 +13,8 @@ import ( "time" "gitea.dooplex.hu/admin/felhom-hub/internal/api" + "gitea.dooplex.hu/admin/felhom-hub/internal/monitor" + "gitea.dooplex.hu/admin/felhom-hub/internal/notify" "gitea.dooplex.hu/admin/felhom-hub/internal/store" "gitea.dooplex.hu/admin/felhom-hub/internal/web" "gopkg.in/yaml.v3" @@ -32,8 +34,10 @@ type Config struct { ReportAPIKey string `yaml:"report_api_key"` } `yaml:"api"` Notifications struct { - ResendAPIKey string `yaml:"resend_api_key"` - FromEmail string `yaml:"from_email"` + ResendAPIKey string `yaml:"resend_api_key"` + FromEmail string `yaml:"from_email"` + OperatorEmail string `yaml:"operator_email"` + OperatorEnabled bool `yaml:"operator_enabled"` } `yaml:"notifications"` Retention struct { MaxDays int `yaml:"max_days"` @@ -119,6 +123,18 @@ func main() { templateProvider = templateFetcher } apiHandler := api.New(dataStore, cfg.API.ReportAPIKey, cfg.Notifications.ResendAPIKey, cfg.Notifications.FromEmail, templateProvider, logger) + + // Initialize notification dispatcher + dispatcher := notify.NewDispatcher( + dataStore, + cfg.Notifications.ResendAPIKey, + cfg.Notifications.FromEmail, + cfg.Notifications.OperatorEmail, + cfg.Notifications.OperatorEnabled, + logger, + ) + apiHandler.SetDispatcher(dispatcher) + webServer := web.New(dataStore, cfg.Auth.PasswordHash, cfg.API.ReportAPIKey, staleThreshold, logger) webServer.SetTemplateFetcher(templateFetcher) @@ -127,6 +143,11 @@ func main() { // Health check endpoint — bypasses auth (for k8s probes) mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) { + if err := dataStore.Ping(); err != nil { + w.WriteHeader(http.StatusServiceUnavailable) + w.Write([]byte("db unhealthy")) + return + } w.WriteHeader(http.StatusOK) w.Write([]byte("ok")) }) @@ -165,10 +186,34 @@ func main() { } webServer.SetVersionChecker(versionChecker) + // Prune on startup, then daily at configured time (default 04:30) if cfg.Retention.MaxDays > 0 { - go pruneLoop(ctx, dataStore, cfg.Retention.MaxDays, logger) + pruneAll(dataStore, cfg.Retention.MaxDays, logger) + go scheduleDaily(ctx, "prune", cfg.Retention.PruneSchedule, func() { + pruneAll(dataStore, cfg.Retention.MaxDays, logger) + }, logger) } + // Staleness checker — runs every 60s + stalenessChecker := monitor.NewStalenessChecker(dataStore, staleThreshold, dispatcher.ProcessEvent, logger) + go func() { + ticker := time.NewTicker(60 * time.Second) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + stalenessChecker.Check() + } + } + }() + + // Backup deadline checker — runs daily at 05:00 Budapest + go scheduleDaily(ctx, "deadline-check", "05:00", func() { + monitor.CheckBackupDeadlines(dataStore, stalenessChecker, dispatcher.ProcessEvent, logger) + }, logger) + // Signal handling sigCh := make(chan os.Signal, 1) signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) @@ -244,28 +289,52 @@ func loadConfig(path string, logger *log.Logger) *Config { return cfg } -func pruneLoop(ctx context.Context, s *store.Store, maxDays int, logger *log.Logger) { - // Prune once on startup - if deleted, err := s.Prune(maxDays); err != nil { - logger.Printf("[WARN] Prune failed: %v", err) - } else if deleted > 0 { - logger.Printf("[INFO] Pruned %d old report rows", deleted) +// scheduleDaily runs fn once daily at the given "HH:MM" time in Europe/Budapest. +// It blocks until ctx is cancelled. +func scheduleDaily(ctx context.Context, name, timeStr string, fn func(), logger *log.Logger) { + budapest, err := time.LoadLocation("Europe/Budapest") + if err != nil { + budapest = time.FixedZone("CET", 3600) } - // Then daily - ticker := time.NewTicker(24 * time.Hour) - defer ticker.Stop() + hour, min := parseHM(timeStr) for { + now := time.Now().In(budapest) + next := time.Date(now.Year(), now.Month(), now.Day(), hour, min, 0, 0, budapest) + if !next.After(now) { + next = next.Add(24 * time.Hour) + } + delay := time.Until(next) + logger.Printf("[INFO] %s: next run at %s (in %s)", name, next.Format("2006-01-02 15:04 MST"), delay.Round(time.Second)) + select { case <-ctx.Done(): return - case <-ticker.C: - if deleted, err := s.Prune(maxDays); err != nil { - logger.Printf("[WARN] Prune failed: %v", err) - } else if deleted > 0 { - logger.Printf("[INFO] Pruned %d old report rows", deleted) - } + case <-time.After(delay): + fn() } } } + +// parseHM parses "HH:MM" into hour and minute. Returns 0, 0 on invalid input. +func parseHM(s string) (int, int) { + var h, m int + if _, err := fmt.Sscanf(s, "%d:%d", &h, &m); err != nil { + return 0, 0 + } + return h, m +} + +func pruneAll(s *store.Store, maxDays int, logger *log.Logger) { + if deleted, err := s.Prune(maxDays); err != nil { + logger.Printf("[WARN] Prune reports failed: %v", err) + } else if deleted > 0 { + logger.Printf("[INFO] Pruned %d old report rows", deleted) + } + if deleted, err := s.PruneEvents(maxDays); err != nil { + logger.Printf("[WARN] Prune events failed: %v", err) + } else if deleted > 0 { + logger.Printf("[INFO] Pruned %d old event rows", deleted) + } +} diff --git a/hub/internal/api/handler.go b/hub/internal/api/handler.go index f39d461..0f300ab 100644 --- a/hub/internal/api/handler.go +++ b/hub/internal/api/handler.go @@ -12,6 +12,7 @@ import ( "time" "gitea.dooplex.hu/admin/felhom-hub/internal/configgen" + "gitea.dooplex.hu/admin/felhom-hub/internal/notify" "gitea.dooplex.hu/admin/felhom-hub/internal/store" ) @@ -29,6 +30,7 @@ type Handler struct { logger *log.Logger httpClient *http.Client templateProvider ConfigTemplateProvider + dispatcher *notify.Dispatcher } // New creates a new API handler. @@ -44,23 +46,40 @@ func New(store *store.Store, apiKey, resendAPIKey, fromEmail string, templatePro } } +// SetDispatcher sets the notification dispatcher for event-triggered emails. +func (h *Handler) SetDispatcher(d *notify.Dispatcher) { + h.dispatcher = d +} + // checkAuth verifies the Bearer token against the global API key or a per-customer API key. // Returns true if authorized. func (h *Handler) checkAuth(r *http.Request) bool { + _, _, ok := h.checkAuthCustomer(r) + return ok +} + +// checkAuthCustomer verifies the Bearer token and returns the authenticated customer identity. +// For per-customer keys: returns (customerID, false, true). +// For global key: returns ("", true, true) — caller must allow any customer_id. +// On failure: returns ("", false, false). +func (h *Handler) checkAuthCustomer(r *http.Request) (customerID string, isGlobal bool, ok bool) { auth := r.Header.Get("Authorization") if !strings.HasPrefix(auth, "Bearer ") { - return false + return "", false, false } token := strings.TrimPrefix(auth, "Bearer ") // Check global key first if h.apiKey != "" && subtle.ConstantTimeCompare([]byte(token), []byte(h.apiKey)) == 1 { - return true + return "", true, true } // Check per-customer key cfg, err := h.store.GetCustomerConfigByAPIKey(token) - return err == nil && cfg != nil + if err != nil || cfg == nil { + return "", false, false + } + return cfg.CustomerID, false, true } // ServeHTTP routes API requests. @@ -70,6 +89,8 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { switch { case r.Method == http.MethodPost && path == "/report": h.handleReport(w, r) + case r.Method == http.MethodPost && path == "/event": + h.handleEvent(w, r) case r.Method == http.MethodPost && path == "/notify": h.handleNotify(w, r) case r.Method == http.MethodPost && path == "/infra-backup": @@ -97,7 +118,8 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } func (h *Handler) handleReport(w http.ResponseWriter, r *http.Request) { - if !h.checkAuth(r) { + authCustomerID, isGlobal, ok := h.checkAuthCustomer(r) + if !ok { http.Error(w, "Unauthorized", http.StatusUnauthorized) return } @@ -117,6 +139,12 @@ func (h *Handler) handleReport(w http.ResponseWriter, r *http.Request) { return } + // Validate customer_id matches authenticated customer (unless global key) + if !isGlobal && authCustomerID != payload.CustomerID { + http.Error(w, "Forbidden: customer_id mismatch", http.StatusForbidden) + return + } + if err := h.store.SaveReport(payload.CustomerID, body); err != nil { h.logger.Printf("[ERROR] Failed to save report from %s: %v", payload.CustomerID, err) http.Error(w, "Internal error", http.StatusInternalServerError) @@ -128,6 +156,114 @@ func (h *Handler) handleReport(w http.ResponseWriter, r *http.Request) { w.Write([]byte(`{"status":"ok"}`)) } +// allowedEventTypes lists all valid event_type values the Hub accepts. +var allowedEventTypes = map[string]bool{ + // Controller-pushed events + "controller_started": true, + "controller_updated": true, + "backup_completed": true, + "backup_failed": true, + "db_dump_completed": true, + "db_dump_failed": true, + "backup_integrity_ok": true, + "backup_integrity_failed": true, + "crossdrive_completed": true, + "crossdrive_failed": true, + "storage_disconnected": true, + "storage_reconnected": true, + "disk_warning": true, + "disk_critical": true, + "health_degraded": true, + "health_critical": true, + "health_recovered": true, + "app_deployed": true, + "app_removed": true, + "disaster_recovery_started": true, + "disaster_recovery_completed": true, + // Hub-generated events + "node_stale": true, + "node_down": true, + "node_recovered": true, + "expected_backup_missed": true, + "expected_dbdump_missed": true, + // Special + "test": true, +} + +// handleEvent processes structured events from controllers (new endpoint, replaces /notify for updated controllers). +func (h *Handler) handleEvent(w http.ResponseWriter, r *http.Request) { + authCustomerID, isGlobal, ok := h.checkAuthCustomer(r) + if !ok { + http.Error(w, "Unauthorized", http.StatusUnauthorized) + return + } + + body, err := io.ReadAll(io.LimitReader(r.Body, 1<<20)) + if err != nil { + http.Error(w, "Bad request", http.StatusBadRequest) + return + } + + var payload struct { + CustomerID string `json:"customer_id"` + EventType string `json:"event_type"` + Severity string `json:"severity"` + Message string `json:"message"` + Details json.RawMessage `json:"details"` + } + if err := json.Unmarshal(body, &payload); err != nil { + http.Error(w, "Invalid JSON", http.StatusBadRequest) + return + } + if payload.CustomerID == "" || payload.EventType == "" { + http.Error(w, "customer_id and event_type are required", http.StatusBadRequest) + return + } + + // Validate customer_id matches authenticated customer (unless global key) + if !isGlobal && authCustomerID != payload.CustomerID { + http.Error(w, "Forbidden: customer_id mismatch", http.StatusForbidden) + return + } + + // Validate event_type + if !allowedEventTypes[payload.EventType] { + http.Error(w, fmt.Sprintf("Invalid event_type: %s", payload.EventType), http.StatusBadRequest) + return + } + + // Validate/default severity + switch payload.Severity { + case "info", "warning", "error": + default: + payload.Severity = "info" + } + + // Store details as JSON string + detailsStr := "{}" + if len(payload.Details) > 0 && string(payload.Details) != "null" { + detailsStr = string(payload.Details) + } + + _, err = h.store.SaveEvent(payload.CustomerID, payload.EventType, payload.Severity, payload.Message, detailsStr, "controller") + if err != nil { + h.logger.Printf("[ERROR] Failed to save event from %s: %v", payload.CustomerID, err) + http.Error(w, "Internal error", http.StatusInternalServerError) + return + } + + h.logger.Printf("[INFO] Event from %s: %s (%s) — %s", payload.CustomerID, payload.EventType, payload.Severity, payload.Message) + + // Dispatch notifications (non-blocking) + if h.dispatcher != nil { + go h.dispatcher.ProcessEvent(payload.CustomerID, payload.EventType, payload.Severity, payload.Message, detailsStr, "controller") + } + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + w.Write([]byte(`{"ok":true}`)) +} + func (h *Handler) handleCustomers(w http.ResponseWriter, r *http.Request) { customers, err := h.store.GetCustomers() if err != nil { @@ -258,7 +394,7 @@ func (h *Handler) handleNotify(w http.ResponseWriter, r *http.Request) { // Check if customer is blocked if h.store.IsCustomerBlocked(payload.CustomerID) { h.logger.Printf("[INFO] Notification suppressed for blocked customer %s", payload.CustomerID) - h.store.LogNotification(payload.CustomerID, payload.EventType, payload.Severity, payload.Message, "skipped", "customer blocked") + h.store.LogNotification(payload.CustomerID, payload.EventType, payload.Severity, payload.Message, "skipped", "customer blocked", "customer") w.WriteHeader(http.StatusOK) w.Write([]byte(`{"status":"ok","sent":false,"reason":"blocked"}`)) return @@ -275,7 +411,7 @@ func (h *Handler) handleNotify(w http.ResponseWriter, r *http.Request) { // Check if customer has email configured and event type is enabled if prefs == nil || prefs.Email == "" { h.logger.Printf("[INFO] No email configured for %s, skipping notification", payload.CustomerID) - h.store.LogNotification(payload.CustomerID, payload.EventType, payload.Severity, payload.Message, "skipped", "no email configured") + h.store.LogNotification(payload.CustomerID, payload.EventType, payload.Severity, payload.Message, "skipped", "no email configured", "customer") w.WriteHeader(http.StatusOK) w.Write([]byte(`{"status":"ok","sent":false,"reason":"no_email"}`)) return @@ -291,7 +427,7 @@ func (h *Handler) handleNotify(w http.ResponseWriter, r *http.Request) { } if !eventEnabled { h.logger.Printf("[INFO] Event %s not enabled for %s, skipping", payload.EventType, payload.CustomerID) - h.store.LogNotification(payload.CustomerID, payload.EventType, payload.Severity, payload.Message, "skipped", "event not enabled") + h.store.LogNotification(payload.CustomerID, payload.EventType, payload.Severity, payload.Message, "skipped", "event not enabled", "customer") w.WriteHeader(http.StatusOK) w.Write([]byte(`{"status":"ok","sent":false,"reason":"event_disabled"}`)) return @@ -300,7 +436,7 @@ func (h *Handler) handleNotify(w http.ResponseWriter, r *http.Request) { // Send email via Resend API if h.resendAPIKey == "" { h.logger.Printf("[WARN] Resend API key not configured, cannot send notification email") - h.store.LogNotification(payload.CustomerID, payload.EventType, payload.Severity, payload.Message, "skipped", "resend api key not configured") + h.store.LogNotification(payload.CustomerID, payload.EventType, payload.Severity, payload.Message, "skipped", "resend api key not configured", "customer") w.WriteHeader(http.StatusOK) w.Write([]byte(`{"status":"ok","sent":false,"reason":"no_api_key"}`)) return @@ -310,13 +446,13 @@ func (h *Handler) handleNotify(w http.ResponseWriter, r *http.Request) { sendErr := h.sendResendEmail(prefs.Email, subject, emailBody) if sendErr != nil { h.logger.Printf("[ERROR] Failed to send notification email to %s: %v", prefs.Email, sendErr) - h.store.LogNotification(payload.CustomerID, payload.EventType, payload.Severity, payload.Message, "failed", sendErr.Error()) + h.store.LogNotification(payload.CustomerID, payload.EventType, payload.Severity, payload.Message, "failed", sendErr.Error(), "customer") http.Error(w, "Failed to send email", http.StatusInternalServerError) return } h.logger.Printf("[INFO] Notification email sent to %s for %s/%s", prefs.Email, payload.CustomerID, payload.EventType) - h.store.LogNotification(payload.CustomerID, payload.EventType, payload.Severity, payload.Message, "sent", "") + h.store.LogNotification(payload.CustomerID, payload.EventType, payload.Severity, payload.Message, "sent", "", "customer") w.WriteHeader(http.StatusOK) w.Write([]byte(`{"status":"ok","sent":true}`)) @@ -339,13 +475,14 @@ func (h *Handler) handleSavePreferences(w http.ResponseWriter, r *http.Request) CustomerID string `json:"customer_id"` Email string `json:"email"` EnabledEvents []string `json:"enabled_events"` + CooldownHours int `json:"cooldown_hours"` } if err := json.Unmarshal(body, &payload); err != nil || payload.CustomerID == "" { http.Error(w, "Invalid payload: customer_id required", http.StatusBadRequest) return } - if err := h.store.SaveNotificationPrefs(payload.CustomerID, payload.Email, payload.EnabledEvents); err != nil { + if err := h.store.SaveNotificationPrefs(payload.CustomerID, payload.Email, payload.EnabledEvents, payload.CooldownHours); err != nil { h.logger.Printf("[ERROR] Failed to save notification prefs for %s: %v", payload.CustomerID, err) http.Error(w, "Internal error", http.StatusInternalServerError) return @@ -503,6 +640,7 @@ func formatNotificationEmail(customerID, eventType, severity, message, details s severityLabel := map[string]string{ "info": "Információ", "warning": "Figyelmeztetés", + "error": "Hiba", "critical": "Kritikus", } label := severityLabel[severity] diff --git a/hub/internal/monitor/deadline.go b/hub/internal/monitor/deadline.go new file mode 100644 index 0000000..981591e --- /dev/null +++ b/hub/internal/monitor/deadline.go @@ -0,0 +1,86 @@ +package monitor + +import ( + "log" + "time" + + "gitea.dooplex.hu/admin/felhom-hub/internal/store" +) + +// budapest returns the Europe/Budapest timezone (cached). +var budapest *time.Location + +func init() { + var err error + budapest, err = time.LoadLocation("Europe/Budapest") + if err != nil { + // Fallback: UTC+1 (CET base; DST handled by OS if available) + budapest = time.FixedZone("CET", 3600) + } +} + +// CheckBackupDeadlines checks whether active customers had their expected +// daily backups and DB dumps. Runs once daily at 05:00 Budapest time. +// +// For each active customer, it checks for backup_completed and db_dump_completed +// events since Budapest midnight. If neither success nor failure events exist, +// it inserts expected_backup_missed / expected_dbdump_missed events. +// +// Customers whose nodes are "down" (no report in >1h) are skipped — they +// already have staleness events. +func CheckBackupDeadlines(s *store.Store, staleness *StalenessChecker, onEvent EventNotifyFunc, logger *log.Logger) { + customerIDs, err := s.GetActiveCustomerIDs() + if err != nil { + logger.Printf("[WARN] Deadline check: failed to get active customers: %v", err) + return + } + + // Budapest midnight today + now := time.Now().In(budapest) + midnightBudapest := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, budapest) + sinceUTC := midnightBudapest.UTC() + + var backupMissed, dbdumpMissed, skipped int + + for _, id := range customerIDs { + // Skip nodes that are down — they already have staleness events + if staleness != nil && staleness.GetState(id) == "down" { + skipped++ + continue + } + + // Check blocked + if s.IsCustomerBlocked(id) { + continue + } + + // Check backup_completed / backup_failed since midnight + backupOK, _ := s.GetEventsByType(id, "backup_completed", sinceUTC) + backupFailed, _ := s.GetEventsByType(id, "backup_failed", sinceUTC) + if len(backupOK) == 0 && len(backupFailed) == 0 { + msg := "No backup completed or failed since midnight" + if _, err := s.SaveEvent(id, "expected_backup_missed", "error", msg, "{}", "hub"); err != nil { + logger.Printf("[WARN] Failed to save expected_backup_missed for %s: %v", id, err) + } else if onEvent != nil { + onEvent(id, "expected_backup_missed", "error", msg, "{}", "hub") + } + backupMissed++ + } + + // Check db_dump_completed / db_dump_failed since midnight + dumpOK, _ := s.GetEventsByType(id, "db_dump_completed", sinceUTC) + dumpFailed, _ := s.GetEventsByType(id, "db_dump_failed", sinceUTC) + if len(dumpOK) == 0 && len(dumpFailed) == 0 { + msg := "No DB dump completed or failed since midnight" + if _, err := s.SaveEvent(id, "expected_dbdump_missed", "error", msg, "{}", "hub"); err != nil { + logger.Printf("[WARN] Failed to save expected_dbdump_missed for %s: %v", id, err) + } else if onEvent != nil { + onEvent(id, "expected_dbdump_missed", "error", msg, "{}", "hub") + } + dbdumpMissed++ + } + } + + logger.Printf("[INFO] Deadline check: %d customers, %d backup missed, %d dbdump missed, %d skipped (down)", + len(customerIDs), backupMissed, dbdumpMissed, skipped) +} diff --git a/hub/internal/monitor/staleness.go b/hub/internal/monitor/staleness.go new file mode 100644 index 0000000..76bf33a --- /dev/null +++ b/hub/internal/monitor/staleness.go @@ -0,0 +1,183 @@ +package monitor + +import ( + "fmt" + "log" + "sync" + "time" + + "gitea.dooplex.hu/admin/felhom-hub/internal/store" +) + +// EventNotifyFunc is called after a hub-generated event is saved, +// to trigger notification dispatch. Keeps monitor decoupled from notify. +type EventNotifyFunc func(customerID, eventType, severity, message, detailsJSON, source string) + +// StalenessChecker monitors customer report freshness and generates +// node_stale / node_down / node_recovered events on state transitions. +type StalenessChecker struct { + store *store.Store + threshold time.Duration // "stale" after this duration (default 30m) + downAfter time.Duration // "down" after this duration (2x threshold) + logger *log.Logger + onEvent EventNotifyFunc + + mu sync.Mutex + states map[string]string // customerID → "ok" | "stale" | "down" +} + +// NewStalenessChecker creates a checker and initializes state from current data. +// No events are generated during initialization (binding #12). +// onEvent is called after each hub-generated event is saved (may be nil). +func NewStalenessChecker(s *store.Store, threshold time.Duration, onEvent EventNotifyFunc, logger *log.Logger) *StalenessChecker { + sc := &StalenessChecker{ + store: s, + threshold: threshold, + downAfter: 2 * threshold, + logger: logger, + onEvent: onEvent, + states: make(map[string]string), + } + + // Seed states from current report timestamps — no events on init + customers, err := s.GetCustomers() + if err != nil { + logger.Printf("[WARN] Staleness checker: failed to seed states: %v", err) + return sc + } + + var okCount, staleCount, downCount int + for _, c := range customers { + if s.IsCustomerBlocked(c.CustomerID) { + continue + } + age := time.Since(c.ReceivedAt) + switch { + case age > sc.downAfter: + sc.states[c.CustomerID] = "down" + downCount++ + case age > sc.threshold: + sc.states[c.CustomerID] = "stale" + staleCount++ + default: + sc.states[c.CustomerID] = "ok" + okCount++ + } + } + logger.Printf("[INFO] Staleness checker initialized: %d ok, %d stale, %d down", okCount, staleCount, downCount) + + return sc +} + +// Check evaluates all customers and emits events on state transitions. +// Should be called periodically (every 60s). +func (sc *StalenessChecker) Check() { + customers, err := sc.store.GetCustomers() + if err != nil { + sc.logger.Printf("[WARN] Staleness check failed: %v", err) + return + } + + sc.mu.Lock() + defer sc.mu.Unlock() + + // Track which customers are still present (to clean up removed ones) + seen := make(map[string]bool, len(customers)) + + for _, c := range customers { + seen[c.CustomerID] = true + + if sc.store.IsCustomerBlocked(c.CustomerID) { + delete(sc.states, c.CustomerID) + continue + } + + age := time.Since(c.ReceivedAt) + var newState string + switch { + case age > sc.downAfter: + newState = "down" + case age > sc.threshold: + newState = "stale" + default: + newState = "ok" + } + + oldState := sc.states[c.CustomerID] + if oldState == "" { + // New customer — set state without event + sc.states[c.CustomerID] = newState + continue + } + + if oldState == newState { + continue + } + + // State transition — emit event + sc.states[c.CustomerID] = newState + sc.emitTransition(c.CustomerID, oldState, newState, age) + } + + // Clean up customers that no longer have reports + for id := range sc.states { + if !seen[id] { + delete(sc.states, id) + } + } +} + +// GetState returns the current staleness state for a customer. +func (sc *StalenessChecker) GetState(customerID string) string { + sc.mu.Lock() + defer sc.mu.Unlock() + s := sc.states[customerID] + if s == "" { + return "unknown" + } + return s +} + +func (sc *StalenessChecker) emitTransition(customerID, oldState, newState string, age time.Duration) { + var eventType, severity, message string + + switch { + case newState == "stale": + eventType = "node_stale" + severity = "warning" + message = "No report received for " + formatDuration(age) + case newState == "down": + eventType = "node_down" + severity = "error" + message = "No report received for " + formatDuration(age) + case newState == "ok" && (oldState == "stale" || oldState == "down"): + eventType = "node_recovered" + severity = "info" + message = "Reports resumed (was " + oldState + " for " + formatDuration(age) + ")" + default: + return + } + + sc.logger.Printf("[INFO] Staleness: %s %s → %s (%s)", customerID, oldState, newState, eventType) + + if _, err := sc.store.SaveEvent(customerID, eventType, severity, message, "{}", "hub"); err != nil { + sc.logger.Printf("[WARN] Failed to save staleness event for %s: %v", customerID, err) + return + } + + if sc.onEvent != nil { + sc.onEvent(customerID, eventType, severity, message, "{}", "hub") + } +} + +func formatDuration(d time.Duration) string { + if d < time.Hour { + return fmt.Sprintf("%dm", int(d.Minutes())) + } + h := int(d.Hours()) + m := int(d.Minutes()) % 60 + if m == 0 { + return fmt.Sprintf("%dh", h) + } + return fmt.Sprintf("%dh%dm", h, m) +} diff --git a/hub/internal/notify/dispatcher.go b/hub/internal/notify/dispatcher.go new file mode 100644 index 0000000..f99fa95 --- /dev/null +++ b/hub/internal/notify/dispatcher.go @@ -0,0 +1,201 @@ +package notify + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "log" + "net/http" + "sync" + "time" + + "gitea.dooplex.hu/admin/felhom-hub/internal/store" +) + +// Dispatcher routes events to operator and/or customer email channels. +// Cooldowns are in-memory (lost on restart, acceptable). +type Dispatcher struct { + store *store.Store + resendAPIKey string + fromEmail string + operatorEmail string + operatorOn bool + httpClient *http.Client + logger *log.Logger + + mu sync.Mutex + opCooldowns map[string]time.Time // "customerID:eventType" → last operator notify + custCooldowns map[string]time.Time // "customerID:eventType" → last customer notify +} + +// NewDispatcher creates a new notification dispatcher. +func NewDispatcher(s *store.Store, resendAPIKey, fromEmail, operatorEmail string, operatorOn bool, logger *log.Logger) *Dispatcher { + return &Dispatcher{ + store: s, + resendAPIKey: resendAPIKey, + fromEmail: fromEmail, + operatorEmail: operatorEmail, + operatorOn: operatorOn, + httpClient: &http.Client{Timeout: 10 * time.Second}, + logger: logger, + opCooldowns: make(map[string]time.Time), + custCooldowns: make(map[string]time.Time), + } +} + +// ProcessEvent evaluates an event and sends notifications as appropriate. +// Safe to call from goroutines. +func (d *Dispatcher) ProcessEvent(customerID, eventType, severity, message, detailsJSON, source string) { + if d.resendAPIKey == "" { + return + } + + // "test" bypass — send directly to customer email, skip prefs/cooldown + if eventType == "test" { + d.sendTestEmail(customerID) + return + } + + // Only warning and error severity trigger notifications + if severity != "warning" && severity != "error" { + return + } + + // Operator channel + d.processOperator(customerID, eventType, severity, message, detailsJSON, source) + + // Customer channel + d.processCustomer(customerID, eventType, severity, message, detailsJSON, source) +} + +func (d *Dispatcher) sendTestEmail(customerID string) { + prefs, err := d.store.GetNotificationPrefs(customerID) + if err != nil || prefs.Email == "" { + d.logger.Printf("[WARN] Test email: no email configured for %s", customerID) + return + } + + subject := "[Felhom] Teszt értesítés" + body := "Kedves Ügyfél!\n\nEz egy teszt értesítés a Felhom monitoring rendszerből.\nAz értesítések megfelelően működnek.\n\nÜdvözlettel,\nFelhom.eu monitoring" + + if err := d.sendEmail(prefs.Email, subject, body); err != nil { + d.logger.Printf("[ERROR] Test email to %s failed: %v", prefs.Email, err) + d.store.LogNotification(customerID, "test", "info", "Teszt értesítés", "failed", err.Error(), "customer") + return + } + d.logger.Printf("[INFO] Test email sent to %s for %s", prefs.Email, customerID) + d.store.LogNotification(customerID, "test", "info", "Teszt értesítés", "sent", "", "customer") +} + +func (d *Dispatcher) processOperator(customerID, eventType, severity, message, detailsJSON, source string) { + if !d.operatorOn || d.operatorEmail == "" { + return + } + + cooldownKey := customerID + ":" + eventType + d.mu.Lock() + if last, ok := d.opCooldowns[cooldownKey]; ok && time.Since(last) < 1*time.Hour { + d.mu.Unlock() + return + } + d.opCooldowns[cooldownKey] = time.Now() + d.mu.Unlock() + + subject, body := FormatOperatorEmail(customerID, eventType, severity, message, detailsJSON) + + if err := d.sendEmail(d.operatorEmail, subject, body); err != nil { + d.logger.Printf("[ERROR] Operator email failed for %s/%s: %v", customerID, eventType, err) + d.store.LogNotification(customerID, eventType, severity, message, "failed", err.Error(), "operator") + return + } + d.logger.Printf("[INFO] Operator email sent for %s/%s", customerID, eventType) + d.store.LogNotification(customerID, eventType, severity, message, "sent", "", "operator") +} + +func (d *Dispatcher) processCustomer(customerID, eventType, severity, message, detailsJSON, source string) { + // Check if customer is blocked + if d.store.IsCustomerBlocked(customerID) { + return + } + + // Load preferences + prefs, err := d.store.GetNotificationPrefs(customerID) + if err != nil || prefs.Email == "" { + return + } + + // Check if event type is enabled + if !isEventEnabled(prefs.EnabledEvents, eventType) { + return + } + + // Customer cooldown (from prefs, default 6h) + cooldownHours := prefs.CooldownHours + if cooldownHours <= 0 { + cooldownHours = 6 + } + cooldownDur := time.Duration(cooldownHours) * time.Hour + + cooldownKey := customerID + ":" + eventType + d.mu.Lock() + if last, ok := d.custCooldowns[cooldownKey]; ok && time.Since(last) < cooldownDur { + d.mu.Unlock() + return + } + d.custCooldowns[cooldownKey] = time.Now() + d.mu.Unlock() + + subject, body := FormatCustomerEmail(customerID, eventType, severity, message, detailsJSON) + + if err := d.sendEmail(prefs.Email, subject, body); err != nil { + d.logger.Printf("[ERROR] Customer email failed for %s/%s: %v", customerID, eventType, err) + d.store.LogNotification(customerID, eventType, severity, message, "failed", err.Error(), "customer") + return + } + d.logger.Printf("[INFO] Customer email sent to %s for %s/%s", prefs.Email, customerID, eventType) + d.store.LogNotification(customerID, eventType, severity, message, "sent", "", "customer") +} + +func (d *Dispatcher) sendEmail(to, subject, textBody string) error { + payload := map[string]interface{}{ + "from": d.fromEmail, + "to": []string{to}, + "subject": subject, + "text": textBody, + } + + jsonData, err := json.Marshal(payload) + if err != nil { + return fmt.Errorf("marshaling email payload: %w", err) + } + + req, err := http.NewRequest("POST", "https://api.resend.com/emails", bytes.NewReader(jsonData)) + if err != nil { + return fmt.Errorf("creating request: %w", err) + } + req.Header.Set("Authorization", "Bearer "+d.resendAPIKey) + req.Header.Set("Content-Type", "application/json") + + resp, err := d.httpClient.Do(req) + if err != nil { + return fmt.Errorf("sending request: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode >= 400 { + respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 1024)) + return fmt.Errorf("resend API returned %d: %s", resp.StatusCode, string(respBody)) + } + + return nil +} + +func isEventEnabled(enabledEvents []string, eventType string) bool { + for _, e := range enabledEvents { + if e == eventType { + return true + } + } + return false +} diff --git a/hub/internal/notify/templates.go b/hub/internal/notify/templates.go new file mode 100644 index 0000000..377b5e7 --- /dev/null +++ b/hub/internal/notify/templates.go @@ -0,0 +1,153 @@ +package notify + +import ( + "fmt" + "time" +) + +// budapest timezone for formatting. +var budapest *time.Location + +func init() { + var err error + budapest, err = time.LoadLocation("Europe/Budapest") + if err != nil { + budapest = time.FixedZone("CET", 3600) + } +} + +// ────────────────────────────────────────────────────────────────────── +// Operator email — concise, English +// ────────────────────────────────────────────────────────────────────── + +// FormatOperatorEmail returns (subject, textBody) for the operator channel. +func FormatOperatorEmail(customerID, eventType, severity, message, detailsJSON string) (string, string) { + icon := "⚠️" + if severity == "error" { + icon = "🔴" + } + + subject := fmt.Sprintf("[Felhom] %s %s: %s", icon, customerID, eventType) + + now := time.Now().In(budapest).Format("2006-01-02 15:04 MST") + body := fmt.Sprintf(`Customer: %s +Event: %s +Severity: %s +Time: %s +Message: %s`, customerID, eventType, severity, now, message) + + if detailsJSON != "" && detailsJSON != "{}" { + body += fmt.Sprintf("\nDetails: %s", detailsJSON) + } + + body += fmt.Sprintf("\n\nDashboard: https://hub.felhom.eu/customers/%s", customerID) + + return subject, body +} + +// ────────────────────────────────────────────────────────────────────── +// Customer email — Hungarian, friendly +// ────────────────────────────────────────────────────────────────────── + +// customerMessages maps event_type → Hungarian customer message. +var customerMessages = map[string]string{ + // Backup events + "backup_completed": "A biztonsági mentés sikeresen elkészült.", + "backup_failed": "A biztonsági mentés sikertelen! Kérjük, ellenőrizd a rendszert.", + "db_dump_completed": "Az adatbázis mentés sikeresen elkészült.", + "db_dump_failed": "Az adatbázis mentés sikertelen!", + "backup_integrity_ok": "A mentés integritás ellenőrzés sikeres.", + "backup_integrity_failed": "A mentés integritás ellenőrzés hibát talált!", + "crossdrive_completed": "A másodlagos mentés sikeresen elkészült.", + "crossdrive_failed": "A másodlagos mentés sikertelen!", + + // Disk events + "disk_warning": "A lemezterület 90% felett van — kérjük, szabadíts fel helyet.", + "disk_critical": "A lemezterület kritikusan magas (95%+) — azonnali beavatkozás szükséges!", + + // Storage events + "storage_disconnected": "Egy meghajtó leválasztva — a mentések szünetelhetnek.", + "storage_reconnected": "A meghajtó újra csatlakoztatva.", + + // Staleness events (Hub-generated) + "node_stale": "A szerver nem küldött jelentést az elmúlt időszakban.", + "node_down": "A szerver nem elérhető!", + "node_recovered": "A szerver újra elérhető.", + + // Health events + "health_degraded": "A rendszer állapota romlott.", + "health_critical": "A rendszer állapota kritikus!", + "health_recovered": "A rendszer állapota helyreállt.", + + // Controller events + "controller_started": "A vezérlő elindult.", + "controller_updated": "A vezérlő frissítve lett.", + + // Deadline events (Hub-generated) + "expected_backup_missed": "A mai biztonsági mentés nem készült el a határidőig!", + "expected_dbdump_missed": "A mai adatbázis mentés nem készült el a határidőig!", + + // App lifecycle events + "app_deployed": "Alkalmazás telepítve.", + "app_removed": "Alkalmazás eltávolítva.", + + // Disaster recovery events + "disaster_recovery_started": "Katasztrófa helyreállítás elindítva.", + "disaster_recovery_completed": "Katasztrófa helyreállítás befejezve.", + + // Test + "test": "Ez egy teszt értesítés.", +} + +// severityLabels maps severity to Hungarian labels. +var severityLabels = map[string]string{ + "info": "Információ", + "warning": "Figyelmeztetés", + "error": "Hiba", +} + +// FormatCustomerEmail returns (subject, textBody) for the customer channel. +func FormatCustomerEmail(customerID, eventType, severity, message, detailsJSON string) (string, string) { + label := severityLabels[severity] + if label == "" { + label = severity + } + + // Use the per-event-type Hungarian message if available, otherwise fall back to message + hunMessage := customerMessages[eventType] + if hunMessage == "" { + hunMessage = message + } + + subject := fmt.Sprintf("[Felhom] %s: %s", label, hunMessage) + + now := time.Now().In(budapest).Format("2006-01-02 15:04") + body := fmt.Sprintf(`Kedves Ügyfél! + +A Felhom rendszered a következő értesítést küldte: + +%s + +Részletek: +- Szerver: %s +- Időpont: %s +- Szint: %s +- Típus: %s`, hunMessage, customerID, now, label, eventType) + + if message != "" && message != hunMessage { + body += fmt.Sprintf("\n- Üzenet: %s", message) + } + + if detailsJSON != "" && detailsJSON != "{}" { + body += fmt.Sprintf("\n- Megjegyzés: %s", detailsJSON) + } + + body += ` + +Ha kérdésed van, vedd fel a kapcsolatot az üzemeltetővel. + +Üdvözlettel, +Felhom.eu monitoring` + + return subject, body +} diff --git a/hub/internal/store/store.go b/hub/internal/store/store.go index ff64a79..63f3dea 100644 --- a/hub/internal/store/store.go +++ b/hub/internal/store/store.go @@ -121,6 +121,33 @@ func (s *Store) migrate() error { // v0.2.1: add status column to customer_configs (idempotent) s.db.Exec("ALTER TABLE customer_configs ADD COLUMN status TEXT NOT NULL DEFAULT 'active'") + // v0.3.0: events table for hub-native monitoring + _, err = s.db.Exec(` + CREATE TABLE IF NOT EXISTS events ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + customer_id TEXT NOT NULL, + event_type TEXT NOT NULL, + severity TEXT NOT NULL DEFAULT 'info', + message TEXT NOT NULL DEFAULT '', + details_json TEXT NOT NULL DEFAULT '{}', + source TEXT NOT NULL DEFAULT 'controller', + created_at DATETIME NOT NULL DEFAULT (datetime('now')) + ); + CREATE INDEX IF NOT EXISTS idx_events_customer_created + ON events(customer_id, created_at DESC); + CREATE INDEX IF NOT EXISTS idx_events_type + ON events(event_type, created_at DESC); + `) + if err != nil { + return err + } + + // v0.3.0: add cooldown_hours to customer_notifications (idempotent) + s.db.Exec("ALTER TABLE customer_notifications ADD COLUMN cooldown_hours INTEGER DEFAULT 6") + + // v0.3.0: add channel column to notification_log (idempotent) + s.db.Exec("ALTER TABLE notification_log ADD COLUMN channel TEXT NOT NULL DEFAULT 'customer'") + return nil } @@ -129,15 +156,17 @@ type NotificationPrefs struct { CustomerID string Email string EnabledEvents []string + CooldownHours int } // GetNotificationPrefs returns notification preferences for a customer. func (s *Store) GetNotificationPrefs(customerID string) (*NotificationPrefs, error) { var email, eventsJSON string + var cooldownHours int err := s.db.QueryRow( - "SELECT email, enabled_events FROM customer_notifications WHERE customer_id = ?", + "SELECT email, enabled_events, COALESCE(cooldown_hours, 6) FROM customer_notifications WHERE customer_id = ?", customerID, - ).Scan(&email, &eventsJSON) + ).Scan(&email, &eventsJSON, &cooldownHours) if err != nil { if err == sql.ErrNoRows { return nil, nil @@ -150,34 +179,46 @@ func (s *Store) GetNotificationPrefs(customerID string) (*NotificationPrefs, err s.logger.Printf("[WARN] Corrupt enabled_events JSON for %s: %v", customerID, err) } + if cooldownHours <= 0 { + cooldownHours = 6 + } + return &NotificationPrefs{ CustomerID: customerID, Email: email, EnabledEvents: events, + CooldownHours: cooldownHours, }, nil } // SaveNotificationPrefs creates or updates notification preferences for a customer. -func (s *Store) SaveNotificationPrefs(customerID, email string, enabledEvents []string) error { +func (s *Store) SaveNotificationPrefs(customerID, email string, enabledEvents []string, cooldownHours int) error { eventsJSON, _ := json.Marshal(enabledEvents) + if cooldownHours <= 0 { + cooldownHours = 6 + } _, err := s.db.Exec(` - INSERT INTO customer_notifications (customer_id, email, enabled_events, updated_at) - VALUES (?, ?, ?, datetime('now')) + INSERT INTO customer_notifications (customer_id, email, enabled_events, cooldown_hours, updated_at) + VALUES (?, ?, ?, ?, datetime('now')) ON CONFLICT(customer_id) DO UPDATE SET email = excluded.email, enabled_events = excluded.enabled_events, + cooldown_hours = excluded.cooldown_hours, updated_at = datetime('now')`, - customerID, email, string(eventsJSON), + customerID, email, string(eventsJSON), cooldownHours, ) return err } // LogNotification records a notification attempt. -func (s *Store) LogNotification(customerID, eventType, severity, message, status, errorMsg string) error { +func (s *Store) LogNotification(customerID, eventType, severity, message, status, errorMsg, channel string) error { + if channel == "" { + channel = "customer" + } _, err := s.db.Exec(` - INSERT INTO notification_log (customer_id, event_type, severity, message, status, error_message) - VALUES (?, ?, ?, ?, ?, ?)`, - customerID, eventType, severity, message, status, errorMsg, + INSERT INTO notification_log (customer_id, event_type, severity, message, status, error_message, channel) + VALUES (?, ?, ?, ?, ?, ?, ?)`, + customerID, eventType, severity, message, status, errorMsg, channel, ) return err } @@ -189,13 +230,14 @@ type NotificationLogEntry struct { Message string Status string // "sent", "skipped", "failed" ErrorMessage string + Channel string // "operator" or "customer" CreatedAt time.Time } // GetRecentNotifications returns the most recent notification log entries for a customer. func (s *Store) GetRecentNotifications(customerID string, limit int) ([]NotificationLogEntry, error) { rows, err := s.db.Query(` - SELECT event_type, severity, message, status, COALESCE(error_message, ''), created_at + SELECT event_type, severity, message, status, COALESCE(error_message, ''), COALESCE(channel, 'customer'), created_at FROM notification_log WHERE customer_id = ? ORDER BY created_at DESC @@ -209,7 +251,7 @@ func (s *Store) GetRecentNotifications(customerID string, limit int) ([]Notifica for rows.Next() { var e NotificationLogEntry var createdAt, errorMsg string - if err := rows.Scan(&e.EventType, &e.Severity, &e.Message, &e.Status, &errorMsg, &createdAt); err != nil { + if err := rows.Scan(&e.EventType, &e.Severity, &e.Message, &e.Status, &errorMsg, &e.Channel, &createdAt); err != nil { return nil, err } e.CreatedAt = parseSQLiteTime(createdAt) @@ -658,6 +700,177 @@ func (s *Store) UpdateRetrievalPassword(customerID, newPassword string) error { return err } +// --- Event system --- + +// Event represents a single event record. +type Event struct { + ID int64 + CustomerID string + EventType string + Severity string // "info", "warning", "error" + Message string + DetailsJSON string // raw JSON + Source string // "controller" or "hub" + CreatedAt time.Time +} + +// SaveEvent inserts a new event and returns its ID. +func (s *Store) SaveEvent(customerID, eventType, severity, message, detailsJSON, source string) (int64, error) { + if detailsJSON == "" { + detailsJSON = "{}" + } + if source == "" { + source = "controller" + } + res, err := s.db.Exec(` + INSERT INTO events (customer_id, event_type, severity, message, details_json, source) + VALUES (?, ?, ?, ?, ?, ?)`, + customerID, eventType, severity, message, detailsJSON, source, + ) + if err != nil { + return 0, err + } + return res.LastInsertId() +} + +// GetRecentEvents returns the most recent events for a customer, newest first. +func (s *Store) GetRecentEvents(customerID string, limit int) ([]Event, error) { + rows, err := s.db.Query(` + SELECT id, customer_id, event_type, severity, message, details_json, source, created_at + FROM events + WHERE customer_id = ? + ORDER BY created_at DESC + LIMIT ?`, customerID, limit) + if err != nil { + return nil, err + } + defer rows.Close() + return scanEvents(rows) +} + +// GetEventsByType returns events of a specific type for a customer since a given time. +func (s *Store) GetEventsByType(customerID, eventType string, since time.Time) ([]Event, error) { + rows, err := s.db.Query(` + SELECT id, customer_id, event_type, severity, message, details_json, source, created_at + FROM events + WHERE customer_id = ? AND event_type = ? AND created_at >= ? + ORDER BY created_at DESC`, + customerID, eventType, since.UTC().Format("2006-01-02 15:04:05")) + if err != nil { + return nil, err + } + defer rows.Close() + return scanEvents(rows) +} + +// GetLatestEventByType returns the most recent event of a given type for a customer. +func (s *Store) GetLatestEventByType(customerID, eventType string) (*Event, error) { + var e Event + var createdAt string + err := s.db.QueryRow(` + SELECT id, customer_id, event_type, severity, message, details_json, source, created_at + FROM events + WHERE customer_id = ? AND event_type = ? + ORDER BY created_at DESC + LIMIT 1`, customerID, eventType, + ).Scan(&e.ID, &e.CustomerID, &e.EventType, &e.Severity, &e.Message, &e.DetailsJSON, &e.Source, &createdAt) + if err == sql.ErrNoRows { + return nil, nil + } + if err != nil { + return nil, err + } + e.CreatedAt = parseSQLiteTime(createdAt) + return &e, nil +} + +// GetAllRecentEvents returns the most recent events across all customers. +func (s *Store) GetAllRecentEvents(limit int) ([]Event, error) { + rows, err := s.db.Query(` + SELECT id, customer_id, event_type, severity, message, details_json, source, created_at + FROM events + ORDER BY created_at DESC + LIMIT ?`, limit) + if err != nil { + return nil, err + } + defer rows.Close() + return scanEvents(rows) +} + +// CountEventsBySeverity returns a count of events per severity for a customer since a given time. +func (s *Store) CountEventsBySeverity(customerID string, since time.Time) (map[string]int, error) { + rows, err := s.db.Query(` + SELECT severity, COUNT(*) FROM events + WHERE customer_id = ? AND created_at >= ? + GROUP BY severity`, + customerID, since.UTC().Format("2006-01-02 15:04:05")) + if err != nil { + return nil, err + } + defer rows.Close() + + counts := make(map[string]int) + for rows.Next() { + var sev string + var count int + if err := rows.Scan(&sev, &count); err != nil { + return nil, err + } + counts[sev] = count + } + return counts, rows.Err() +} + +// PruneEvents deletes events older than the given number of days. +func (s *Store) PruneEvents(maxDays int) (int64, error) { + cutoff := time.Now().AddDate(0, 0, -maxDays).UTC().Format("2006-01-02 15:04:05") + res, err := s.db.Exec("DELETE FROM events WHERE created_at < ?", cutoff) + if err != nil { + return 0, err + } + return res.RowsAffected() +} + +// GetActiveCustomerIDs returns customer IDs from customer_configs where status is 'active'. +func (s *Store) GetActiveCustomerIDs() ([]string, error) { + rows, err := s.db.Query("SELECT customer_id FROM customer_configs WHERE status = 'active'") + if err != nil { + return nil, err + } + defer rows.Close() + + var ids []string + for rows.Next() { + var id string + if err := rows.Scan(&id); err != nil { + return nil, err + } + ids = append(ids, id) + } + return ids, rows.Err() +} + +// Ping verifies the database is accessible. +func (s *Store) Ping() error { + var n int + return s.db.QueryRow("SELECT 1").Scan(&n) +} + +func scanEvents(rows *sql.Rows) ([]Event, error) { + var events []Event + for rows.Next() { + var e Event + var createdAt string + if err := rows.Scan(&e.ID, &e.CustomerID, &e.EventType, &e.Severity, &e.Message, &e.DetailsJSON, &e.Source, &createdAt); err != nil { + return nil, err + } + e.CreatedAt = parseSQLiteTime(createdAt) + events = append(events, e) + } + return events, rows.Err() +} + // parseSQLiteTime tries multiple formats that modernc.org/sqlite may return. func parseSQLiteTime(s string) time.Time { formats := []string{ diff --git a/hub/internal/web/configs.go b/hub/internal/web/configs.go index 6cc43d7..f9b5f58 100644 --- a/hub/internal/web/configs.go +++ b/hub/internal/web/configs.go @@ -223,12 +223,14 @@ func (s *Server) handleCustomerUnified(w http.ResponseWriter, r *http.Request, c } } - // History, notifications, infra backup + // History, notifications, events, infra backup var history []store.CustomerSummary var notifPrefs *store.NotificationPrefs var recentNotifs []store.NotificationLogEntry var infraMeta *store.InfraBackupMeta var infraBackupAge string + var events []store.Event + var eventCounts map[string]int if customer != nil { history, _ = s.store.GetCustomerHistory(customerID, 24*time.Hour) @@ -238,6 +240,8 @@ func (s *Server) handleCustomerUnified(w http.ResponseWriter, r *http.Request, c if infraMeta != nil { infraBackupAge = timeAgo(infraMeta.UpdatedAt) } + events, _ = s.store.GetRecentEvents(customerID, 50) + eventCounts, _ = s.store.CountEventsBySeverity(customerID, time.Now().Add(-24*time.Hour)) } type pageData struct { @@ -270,6 +274,9 @@ func (s *Server) handleCustomerUnified(w http.ResponseWriter, r *http.Request, c RecentNotifications []store.NotificationLogEntry History []store.CustomerSummary + Events []store.Event + EventCounts map[string]int // severity → count (last 24h) + Flash string ActiveNav string } @@ -304,6 +311,9 @@ func (s *Server) handleCustomerUnified(w http.ResponseWriter, r *http.Request, c RecentNotifications: recentNotifs, History: history, + Events: events, + EventCounts: eventCounts, + Flash: r.URL.Query().Get("flash"), ActiveNav: "configs", } @@ -697,7 +707,7 @@ func buildConfigJSON(r *http.Request) string { overrides["git"] = git } - // Monitoring UUIDs + // Monitoring UUIDs (legacy — only written if user explicitly provides values) uuids := make(map[string]interface{}) for _, key := range []string{"heartbeat", "system_health", "db_dump", "backup", "backup_integrity"} { if v := strings.TrimSpace(r.FormValue("uuid_" + key)); v != "" { diff --git a/hub/internal/web/controller.yaml.default b/hub/internal/web/controller.yaml.default index cf974d3..d295b18 100644 --- a/hub/internal/web/controller.yaml.default +++ b/hub/internal/web/controller.yaml.default @@ -80,12 +80,12 @@ backup: monitoring: enabled: true healthchecks_base: "https://status.felhom.eu" - ping_uuids: - heartbeat: "" # Every 5 min — controller process alive - system_health: "" # Every 5 min — comprehensive system check - db_dump: "" # Daily — after database dumps - backup: "" # Daily — after restic snapshot - backup_integrity: "" # Weekly (Sunday) — restic check + # ping_uuids: (deprecated — monitoring is now handled by the Hub event system) + # heartbeat: "" + # system_health: "" + # db_dump: "" + # backup: "" + # backup_integrity: "" system_health_interval: "5m" health_check_schedule: "06:00" thresholds: diff --git a/hub/internal/web/server.go b/hub/internal/web/server.go index d562048..cd65a73 100644 --- a/hub/internal/web/server.go +++ b/hub/internal/web/server.go @@ -40,6 +40,13 @@ func New(store *store.Store, passwordHash, apiKey string, staleThreshold time.Du b, _ := json.Marshal(v) return template.JS(b) }, + "add": func(a, b int) int { return a + b }, + "mapGet": func(m map[string]int, key string) int { + if m == nil { + return 0 + } + return m[key] + }, } tmpl := template.Must(template.New("").Funcs(funcMap).ParseFS(templateFS, "templates/*.html")) @@ -232,6 +239,8 @@ func (s *Server) handleDashboard(w http.ResponseWriter, r *http.Request) { store.CustomerSummary OverallStatus string // "ok", "warn", "down", "pending" BackupAge string + EventErrors int + EventWarnings int } // Build map of report customers keyed by ID @@ -266,6 +275,12 @@ func (s *Server) handleDashboard(w http.ResponseWriter, r *http.Request) { dc.BackupAge = "–" } + // Event counts (last 24h) + if counts, err := s.store.CountEventsBySeverity(c.CustomerID, time.Now().Add(-24*time.Hour)); err == nil { + dc.EventErrors = counts["error"] + dc.EventWarnings = counts["warning"] + } + data = append(data, dc) } diff --git a/hub/internal/web/templates/config_form.html b/hub/internal/web/templates/config_form.html index 1dae7f8..e7f82cb 100644 --- a/hub/internal/web/templates/config_form.html +++ b/hub/internal/web/templates/config_form.html @@ -96,8 +96,9 @@ -
-

Monitoring UUIDs

+
+

Monitoring UUIDs

Legacy
+

Healthchecks ping UUIDs are deprecated. Monitoring is now handled natively by the Hub event system. These fields are kept for backward compatibility with older controllers.

{{$uuids := ""}} {{with .Overrides}}{{with index . "monitoring"}}{{with index . "ping_uuids"}}{{$uuids = .}}{{end}}{{end}}{{end}} diff --git a/hub/internal/web/templates/customer_unified.html b/hub/internal/web/templates/customer_unified.html index 8660edc..485819c 100644 --- a/hub/internal/web/templates/customer_unified.html +++ b/hub/internal/web/templates/customer_unified.html @@ -403,6 +403,61 @@ {{end}} + +
+

Events + {{if .EventCounts}} + {{with mapGet .EventCounts "error"}}{{.}} error{{if gt . 1}}s{{end}}{{end}} + {{with mapGet .EventCounts "warning"}}{{.}} warning{{if gt . 1}}s{{end}}{{end}} + {{end}} + (last 24h) +

+ {{if .Events}} +
+ + + + +
+ + + + + + + + + + + + {{range .Events}} + + + + + + + + {{end}} + +
TimeSeverityTypeMessageSource
{{.CreatedAt.Format "Jan 02 15:04"}}{{.Severity}}{{.EventType}}{{.Message}}{{.Source}}
+ + {{else}} +

No events recorded yet.

+ {{end}} +
+

Notifications

@@ -424,6 +479,7 @@ Time + Channel Event Status Message @@ -433,6 +489,7 @@ {{range .RecentNotifications}} {{.CreatedAt.Format "Jan 02 15:04"}} + {{.Channel}} {{.EventType}} {{.Status}} {{.Message}} diff --git a/hub/internal/web/templates/dashboard.html b/hub/internal/web/templates/dashboard.html index 8e6b659..4db3473 100644 --- a/hub/internal/web/templates/dashboard.html +++ b/hub/internal/web/templates/dashboard.html @@ -28,6 +28,7 @@ Customer Status + Events Last Seen CPU Memory @@ -49,6 +50,7 @@ {{if eq .OverallStatus "ok"}}OK{{else if eq .OverallStatus "warn"}}WARN{{else if eq .OverallStatus "disabled"}}PAUSED{{else if eq .OverallStatus "pending"}}PENDING{{else}}DOWN{{end}} + {{if eq .OverallStatus "pending"}}—{{else}}{{if gt (add .EventErrors .EventWarnings) 0}}{{if gt .EventErrors 0}}{{.EventErrors}}{{end}}{{if gt .EventWarnings 0}}{{.EventWarnings}}{{end}}{{else}}{{end}}{{end}} {{if eq .OverallStatus "pending"}}—{{else}}{{timeAgo .ReceivedAt}}{{end}} {{if eq .OverallStatus "pending"}}—{{else}}{{formatFloat .CPUPercent}}%{{end}} {{if eq .OverallStatus "pending"}}—{{else}}{{formatFloat .MemoryPercent}}%{{end}} diff --git a/hub/internal/web/templates/style.css b/hub/internal/web/templates/style.css index d2a64f7..e882852 100644 --- a/hub/internal/web/templates/style.css +++ b/hub/internal/web/templates/style.css @@ -564,6 +564,55 @@ code { color: var(--text-muted); } +/* Severity badges */ +.severity-badge { + display: inline-block; + padding: 0.15em 0.5em; + border-radius: 4px; + font-size: 0.8em; + font-weight: 600; + line-height: 1.4; +} + +.severity-error { + background: rgba(239, 68, 68, 0.15); + color: #ef4444; +} + +.severity-warning { + background: rgba(245, 158, 11, 0.15); + color: #f59e0b; +} + +.severity-info { + background: rgba(59, 130, 246, 0.15); + color: #3b82f6; +} + +/* Event filter buttons */ +.event-filter { + font-size: 0.8em; + padding: 0.2em 0.6em; + cursor: pointer; +} + +.event-filter.active { + background: var(--accent); + color: #fff; + border-color: var(--accent); +} + +/* Notification channel badges */ +.status-badge-operator { + background: rgba(139, 92, 246, 0.15); + color: #8b5cf6; +} + +.status-badge-customer { + background: rgba(59, 130, 246, 0.15); + color: #3b82f6; +} + /* Responsive */ @media (max-width: 768px) { .container { padding: 1rem; }