Files
felhom.eu/hub/internal/notify/dispatcher.go
T
admin 3217cb4751 feat: Hub monitoring takeover — event system, dead man's switch, notifications (v0.3.0)
Replace external Healthchecks.io with Hub-native monitoring. New events
table + /api/v1/event endpoint for structured events from controllers.
Staleness checker (60s) detects unresponsive nodes. Backup deadline
checker (daily 05:00) catches missed backups. Notification dispatcher
sends operator (English) + customer (Hungarian) emails via Resend with
per-event cooldowns. Event timeline on customer page, dashboard badges.
Config form deprecates Monitoring UUIDs section.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-20 18:53:24 +01:00

202 lines
6.1 KiB
Go

package notify
import (
"bytes"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"sync"
"time"
"gitea.dooplex.hu/admin/felhom-hub/internal/store"
)
// Dispatcher routes events to the operator and/or customer email channels.
//
// Cooldown bookkeeping is kept purely in memory, so it is lost whenever the
// process restarts; this is an accepted trade-off.
type Dispatcher struct {
	// store supplies notification preferences and the customer blocked flag,
	// and records the outcome of every delivery attempt.
	store *store.Store

	// resendAPIKey is sent as the bearer token to the Resend API; when it is
	// empty ProcessEvent sends nothing. fromEmail is the sender address and
	// operatorEmail the recipient of operator notifications.
	resendAPIKey, fromEmail, operatorEmail string

	// operatorOn switches the operator channel on or off.
	operatorOn bool

	httpClient *http.Client
	logger     *log.Logger

	// mu guards both cooldown maps. Keys have the form "customerID:eventType"
	// and values are the time of the last notification on that channel
	// (opCooldowns: operator, custCooldowns: customer).
	mu                         sync.Mutex
	opCooldowns, custCooldowns map[string]time.Time
}
// NewDispatcher builds a notification Dispatcher from the given store, Resend
// credentials, sender/operator addresses and logger. The returned dispatcher
// uses an HTTP client with a 10 second timeout and starts with empty cooldown
// tables.
func NewDispatcher(s *store.Store, resendAPIKey, fromEmail, operatorEmail string, operatorOn bool, logger *log.Logger) *Dispatcher {
	d := new(Dispatcher)

	// Collaborators.
	d.store = s
	d.logger = logger
	d.httpClient = &http.Client{Timeout: 10 * time.Second}

	// Delivery configuration.
	d.resendAPIKey = resendAPIKey
	d.fromEmail = fromEmail
	d.operatorEmail = operatorEmail
	d.operatorOn = operatorOn

	// Cooldown tables must be non-nil before the first write.
	d.opCooldowns = map[string]time.Time{}
	d.custCooldowns = map[string]time.Time{}

	return d
}
// ProcessEvent decides which notifications, if any, an event should trigger
// and sends them. Safe to call from goroutines.
//
// Routing rules, checked in order:
//   - no Resend API key configured: nothing is sent;
//   - eventType "test": a test email goes straight to the customer, skipping
//     preferences and cooldowns;
//   - severity "warning" or "error": the operator channel is processed first,
//     then the customer channel;
//   - any other severity: nothing is sent.
func (d *Dispatcher) ProcessEvent(customerID, eventType, severity, message, detailsJSON, source string) {
	switch {
	case d.resendAPIKey == "":
		// Email delivery is not configured.
	case eventType == "test":
		d.sendTestEmail(customerID)
	case severity == "warning" || severity == "error":
		d.processOperator(customerID, eventType, severity, message, detailsJSON, source)
		d.processCustomer(customerID, eventType, severity, message, detailsJSON, source)
	}
}
// sendTestEmail delivers a fixed Hungarian test message to the notification
// address stored in the customer's preferences. It applies no enabled-event
// filter, cooldown or blocked-customer check. The outcome is written to the
// logger and recorded via store.LogNotification on the "customer" channel.
//
// A failure to load the preferences is logged with its error, separately
// from the "no email configured" case, so a store problem is not mistaken
// for missing customer configuration.
func (d *Dispatcher) sendTestEmail(customerID string) {
	prefs, err := d.store.GetNotificationPrefs(customerID)
	if err != nil {
		d.logger.Printf("[WARN] Test email: loading notification prefs for %s failed: %v", customerID, err)
		return
	}
	if prefs.Email == "" {
		d.logger.Printf("[WARN] Test email: no email configured for %s", customerID)
		return
	}

	subject := "[Felhom] Teszt értesítés"
	body := "Kedves Ügyfél!\n\nEz egy teszt értesítés a Felhom monitoring rendszerből.\nAz értesítések megfelelően működnek.\n\nÜdvözlettel,\nFelhom.eu monitoring"

	if err := d.sendEmail(prefs.Email, subject, body); err != nil {
		d.logger.Printf("[ERROR] Test email to %s failed: %v", prefs.Email, err)
		d.store.LogNotification(customerID, "test", "info", "Teszt értesítés", "failed", err.Error(), "customer")
		return
	}

	d.logger.Printf("[INFO] Test email sent to %s for %s", prefs.Email, customerID)
	d.store.LogNotification(customerID, "test", "info", "Teszt értesítés", "sent", "", "customer")
}
// processOperator sends the operator-facing email for an event, at most once
// per hour for each "customerID:eventType" pair. It does nothing when the
// operator channel is switched off or no operator address is configured.
// The outcome is logged and recorded via store.LogNotification on the
// "operator" channel. The source argument is accepted but not used here.
//
// The cooldown slot is claimed before sending so that concurrent calls for
// the same key cannot both send. If the send fails the claim is released
// again: a failed delivery must not silence the next hour of alerts.
func (d *Dispatcher) processOperator(customerID, eventType, severity, message, detailsJSON, source string) {
	if !d.operatorOn || d.operatorEmail == "" {
		return
	}

	const operatorCooldown = 1 * time.Hour
	cooldownKey := customerID + ":" + eventType
	claimedAt := time.Now()

	d.mu.Lock()
	if last, ok := d.opCooldowns[cooldownKey]; ok && claimedAt.Sub(last) < operatorCooldown {
		d.mu.Unlock()
		return
	}
	d.opCooldowns[cooldownKey] = claimedAt
	d.mu.Unlock()

	subject, body := FormatOperatorEmail(customerID, eventType, severity, message, detailsJSON)
	if err := d.sendEmail(d.operatorEmail, subject, body); err != nil {
		// Release the claim, but only if it is still ours; another goroutine
		// may have stamped a newer time for this key in the meantime.
		d.mu.Lock()
		if cur, ok := d.opCooldowns[cooldownKey]; ok && cur.Equal(claimedAt) {
			delete(d.opCooldowns, cooldownKey)
		}
		d.mu.Unlock()

		d.logger.Printf("[ERROR] Operator email failed for %s/%s: %v", customerID, eventType, err)
		d.store.LogNotification(customerID, eventType, severity, message, "failed", err.Error(), "operator")
		return
	}

	d.logger.Printf("[INFO] Operator email sent for %s/%s", customerID, eventType)
	d.store.LogNotification(customerID, eventType, severity, message, "sent", "", "operator")
}
// processCustomer sends the customer-facing email for an event. Nothing is
// sent when the customer is blocked, when preferences cannot be loaded or
// hold no email address, when the event type is not in the customer's
// enabled list, or when the same "customerID:eventType" pair was notified
// within the cooldown window. The window comes from the customer's
// preferences and falls back to 6 hours when unset or non-positive.
// The outcome is logged and recorded via store.LogNotification on the
// "customer" channel. The source argument is accepted but not used here.
//
// The cooldown slot is claimed before sending so that concurrent calls for
// the same key cannot both send. If the send fails the claim is released
// again: a failed delivery must not suppress notifications for the rest of
// the cooldown window.
func (d *Dispatcher) processCustomer(customerID, eventType, severity, message, detailsJSON, source string) {
	// Blocked customers receive no notifications.
	if d.store.IsCustomerBlocked(customerID) {
		return
	}

	// Preferences carry the recipient address, enabled events and cooldown.
	prefs, err := d.store.GetNotificationPrefs(customerID)
	if err != nil || prefs.Email == "" {
		return
	}

	if !isEventEnabled(prefs.EnabledEvents, eventType) {
		return
	}

	// Customer cooldown (from prefs, default 6h).
	cooldownHours := prefs.CooldownHours
	if cooldownHours <= 0 {
		cooldownHours = 6
	}
	cooldownDur := time.Duration(cooldownHours) * time.Hour

	cooldownKey := customerID + ":" + eventType
	claimedAt := time.Now()

	d.mu.Lock()
	if last, ok := d.custCooldowns[cooldownKey]; ok && claimedAt.Sub(last) < cooldownDur {
		d.mu.Unlock()
		return
	}
	d.custCooldowns[cooldownKey] = claimedAt
	d.mu.Unlock()

	subject, body := FormatCustomerEmail(customerID, eventType, severity, message, detailsJSON)
	if err := d.sendEmail(prefs.Email, subject, body); err != nil {
		// Release the claim, but only if it is still ours; another goroutine
		// may have stamped a newer time for this key in the meantime.
		d.mu.Lock()
		if cur, ok := d.custCooldowns[cooldownKey]; ok && cur.Equal(claimedAt) {
			delete(d.custCooldowns, cooldownKey)
		}
		d.mu.Unlock()

		d.logger.Printf("[ERROR] Customer email failed for %s/%s: %v", customerID, eventType, err)
		d.store.LogNotification(customerID, eventType, severity, message, "failed", err.Error(), "customer")
		return
	}

	d.logger.Printf("[INFO] Customer email sent to %s for %s/%s", prefs.Email, customerID, eventType)
	d.store.LogNotification(customerID, eventType, severity, message, "sent", "", "customer")
}
// sendEmail posts a single plain-text email to the Resend API
// (https://api.resend.com/emails), authenticated with the dispatcher's API
// key as a bearer token.
//
// It returns nil only for a 2xx response. Any other status yields an error
// carrying the status code and up to 1 KiB of the response body. Failures to
// marshal the payload, build the request or perform it are returned wrapped.
func (d *Dispatcher) sendEmail(to, subject, textBody string) error {
	payload := map[string]interface{}{
		"from":    d.fromEmail,
		"to":      []string{to},
		"subject": subject,
		"text":    textBody,
	}
	jsonData, err := json.Marshal(payload)
	if err != nil {
		return fmt.Errorf("marshaling email payload: %w", err)
	}

	req, err := http.NewRequest(http.MethodPost, "https://api.resend.com/emails", bytes.NewReader(jsonData))
	if err != nil {
		return fmt.Errorf("creating request: %w", err)
	}
	req.Header.Set("Authorization", "Bearer "+d.resendAPIKey)
	req.Header.Set("Content-Type", "application/json")

	resp, err := d.httpClient.Do(req)
	if err != nil {
		return fmt.Errorf("sending request: %w", err)
	}
	defer func() {
		// Drain what is left of the body (bounded) before closing so the
		// transport can reuse the keep-alive connection. Errors are ignored
		// on purpose: the delivery result is already decided at this point.
		_, _ = io.Copy(io.Discard, io.LimitReader(resp.Body, 64<<10))
		_ = resp.Body.Close()
	}()

	// Only 2xx means the email was accepted; a final 3xx must not be
	// reported as a successful send.
	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
		// Best-effort read of the error payload; a read failure only
		// shortens the message.
		respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 1024))
		return fmt.Errorf("resend API returned %d: %s", resp.StatusCode, string(respBody))
	}
	return nil
}
// isEventEnabled reports whether eventType appears in enabledEvents.
// Matching is exact (case-sensitive); an empty or nil list enables nothing.
func isEventEnabled(enabledEvents []string, eventType string) bool {
	for i := 0; i < len(enabledEvents); i++ {
		if enabledEvents[i] != eventType {
			continue
		}
		return true
	}
	return false
}